diff --git a/pkg/generated/controllers/core/v1/node.go b/pkg/generated/controllers/core/v1/node.go index 4107968b..46fc835e 100644 --- a/pkg/generated/controllers/core/v1/node.go +++ b/pkg/generated/controllers/core/v1/node.go @@ -20,6 +20,7 @@ package v1 import ( "context" + "sync" "time" "github.com/rancher/wrangler/v2/pkg/apply" @@ -48,10 +49,14 @@ type NodeCache interface { generic.NonNamespacedCacheInterface[*v1.Node] } +// NodeStatusHandler is executed for every added or modified Node. Should return the new status to be updated type NodeStatusHandler func(obj *v1.Node, status v1.NodeStatus) (v1.NodeStatus, error) +// NodeGeneratingHandler is the top-level handler that is executed for every Node event. It extends NodeStatusHandler by a returning a slice of child objects to be passed to apply.Apply type NodeGeneratingHandler func(obj *v1.Node, status v1.NodeStatus) ([]runtime.Object, v1.NodeStatus, error) +// RegisterNodeStatusHandler configures a NodeController to execute a NodeStatusHandler for every events observed. +// If a non-empty condition is provided, it will be updated in the status conditions for every handler execution func RegisterNodeStatusHandler(ctx context.Context, controller NodeController, condition condition.Cond, name string, handler NodeStatusHandler) { statusHandler := &nodeStatusHandler{ client: controller, @@ -61,6 +66,8 @@ func RegisterNodeStatusHandler(ctx context.Context, controller NodeController, c controller.AddGenericHandler(ctx, name, generic.FromObjectHandlerToHandler(statusHandler.sync)) } +// RegisterNodeGeneratingHandler configures a NodeController to execute a NodeGeneratingHandler for every events observed, passing the returned objects to the provided apply.Apply. +// If a non-empty condition is provided, it will be updated in the status conditions for every handler execution func RegisterNodeGeneratingHandler(ctx context.Context, controller NodeController, apply apply.Apply, condition condition.Cond, name string, handler NodeGeneratingHandler, opts *generic.GeneratingHandlerOptions) { statusHandler := &nodeGeneratingHandler{ @@ -82,6 +89,7 @@ type nodeStatusHandler struct { handler NodeStatusHandler } +// sync is executed on every resource addition or modification. Executes the configured handlers and sends the updated status to the Kubernetes API func (a *nodeStatusHandler) sync(key string, obj *v1.Node) (*v1.Node, error) { if obj == nil { return obj, nil @@ -127,8 +135,10 @@ type nodeGeneratingHandler struct { opts generic.GeneratingHandlerOptions gvk schema.GroupVersionKind name string + seen sync.Map } +// Remove handles the observed deletion of a resource, cascade deleting every associated resource previously applied func (a *nodeGeneratingHandler) Remove(key string, obj *v1.Node) (*v1.Node, error) { if obj != nil { return obj, nil @@ -138,12 +148,17 @@ func (a *nodeGeneratingHandler) Remove(key string, obj *v1.Node) (*v1.Node, erro obj.Namespace, obj.Name = kv.RSplit(key, "/") obj.SetGroupVersionKind(a.gvk) + if a.opts.UniqueApplyForResourceVersion { + a.seen.Delete(key) + } + return nil, generic.ConfigureApplyForObject(a.apply, obj, &a.opts). WithOwner(obj). WithSetID(a.name). ApplyObjects() } +// Handle executes the configured NodeGeneratingHandler and pass the resulting objects to apply.Apply, finally returning the new status of the resource func (a *nodeGeneratingHandler) Handle(obj *v1.Node, status v1.NodeStatus) (v1.NodeStatus, error) { if !obj.DeletionTimestamp.IsZero() { return status, nil @@ -153,9 +168,41 @@ func (a *nodeGeneratingHandler) Handle(obj *v1.Node, status v1.NodeStatus) (v1.N if err != nil { return newStatus, err } + if !a.isNewResourceVersion(obj) { + return newStatus, nil + } - return newStatus, generic.ConfigureApplyForObject(a.apply, obj, &a.opts). + err = generic.ConfigureApplyForObject(a.apply, obj, &a.opts). WithOwner(obj). WithSetID(a.name). ApplyObjects(objs...) + if err != nil { + return newStatus, err + } + a.storeResourceVersion(obj) + return newStatus, nil +} + +// isNewResourceVersion detects if a specific resource version was already successfully processed. +// Only used if UniqueApplyForResourceVersion is set in generic.GeneratingHandlerOptions +func (a *nodeGeneratingHandler) isNewResourceVersion(obj *v1.Node) bool { + if !a.opts.UniqueApplyForResourceVersion { + return true + } + + // Apply once per resource version + key := obj.Namespace + "/" + obj.Name + previous, ok := a.seen.Load(key) + return !ok || previous != obj.ResourceVersion +} + +// storeResourceVersion keeps track of the latest resource version of an object for which Apply was executed +// Only used if UniqueApplyForResourceVersion is set in generic.GeneratingHandlerOptions +func (a *nodeGeneratingHandler) storeResourceVersion(obj *v1.Node) { + if !a.opts.UniqueApplyForResourceVersion { + return + } + + key := obj.Namespace + "/" + obj.Name + a.seen.Store(key, obj.ResourceVersion) } diff --git a/pkg/generated/controllers/core/v1/pod.go b/pkg/generated/controllers/core/v1/pod.go index b132e51a..91765104 100644 --- a/pkg/generated/controllers/core/v1/pod.go +++ b/pkg/generated/controllers/core/v1/pod.go @@ -20,6 +20,7 @@ package v1 import ( "context" + "sync" "time" "github.com/rancher/wrangler/v2/pkg/apply" @@ -48,10 +49,14 @@ type PodCache interface { generic.CacheInterface[*v1.Pod] } +// PodStatusHandler is executed for every added or modified Pod. Should return the new status to be updated type PodStatusHandler func(obj *v1.Pod, status v1.PodStatus) (v1.PodStatus, error) +// PodGeneratingHandler is the top-level handler that is executed for every Pod event. It extends PodStatusHandler by a returning a slice of child objects to be passed to apply.Apply type PodGeneratingHandler func(obj *v1.Pod, status v1.PodStatus) ([]runtime.Object, v1.PodStatus, error) +// RegisterPodStatusHandler configures a PodController to execute a PodStatusHandler for every events observed. +// If a non-empty condition is provided, it will be updated in the status conditions for every handler execution func RegisterPodStatusHandler(ctx context.Context, controller PodController, condition condition.Cond, name string, handler PodStatusHandler) { statusHandler := &podStatusHandler{ client: controller, @@ -61,6 +66,8 @@ func RegisterPodStatusHandler(ctx context.Context, controller PodController, con controller.AddGenericHandler(ctx, name, generic.FromObjectHandlerToHandler(statusHandler.sync)) } +// RegisterPodGeneratingHandler configures a PodController to execute a PodGeneratingHandler for every events observed, passing the returned objects to the provided apply.Apply. +// If a non-empty condition is provided, it will be updated in the status conditions for every handler execution func RegisterPodGeneratingHandler(ctx context.Context, controller PodController, apply apply.Apply, condition condition.Cond, name string, handler PodGeneratingHandler, opts *generic.GeneratingHandlerOptions) { statusHandler := &podGeneratingHandler{ @@ -82,6 +89,7 @@ type podStatusHandler struct { handler PodStatusHandler } +// sync is executed on every resource addition or modification. Executes the configured handlers and sends the updated status to the Kubernetes API func (a *podStatusHandler) sync(key string, obj *v1.Pod) (*v1.Pod, error) { if obj == nil { return obj, nil @@ -127,8 +135,10 @@ type podGeneratingHandler struct { opts generic.GeneratingHandlerOptions gvk schema.GroupVersionKind name string + seen sync.Map } +// Remove handles the observed deletion of a resource, cascade deleting every associated resource previously applied func (a *podGeneratingHandler) Remove(key string, obj *v1.Pod) (*v1.Pod, error) { if obj != nil { return obj, nil @@ -138,12 +148,17 @@ func (a *podGeneratingHandler) Remove(key string, obj *v1.Pod) (*v1.Pod, error) obj.Namespace, obj.Name = kv.RSplit(key, "/") obj.SetGroupVersionKind(a.gvk) + if a.opts.UniqueApplyForResourceVersion { + a.seen.Delete(key) + } + return nil, generic.ConfigureApplyForObject(a.apply, obj, &a.opts). WithOwner(obj). WithSetID(a.name). ApplyObjects() } +// Handle executes the configured PodGeneratingHandler and pass the resulting objects to apply.Apply, finally returning the new status of the resource func (a *podGeneratingHandler) Handle(obj *v1.Pod, status v1.PodStatus) (v1.PodStatus, error) { if !obj.DeletionTimestamp.IsZero() { return status, nil @@ -153,9 +168,41 @@ func (a *podGeneratingHandler) Handle(obj *v1.Pod, status v1.PodStatus) (v1.PodS if err != nil { return newStatus, err } + if !a.isNewResourceVersion(obj) { + return newStatus, nil + } - return newStatus, generic.ConfigureApplyForObject(a.apply, obj, &a.opts). + err = generic.ConfigureApplyForObject(a.apply, obj, &a.opts). WithOwner(obj). WithSetID(a.name). ApplyObjects(objs...) + if err != nil { + return newStatus, err + } + a.storeResourceVersion(obj) + return newStatus, nil +} + +// isNewResourceVersion detects if a specific resource version was already successfully processed. +// Only used if UniqueApplyForResourceVersion is set in generic.GeneratingHandlerOptions +func (a *podGeneratingHandler) isNewResourceVersion(obj *v1.Pod) bool { + if !a.opts.UniqueApplyForResourceVersion { + return true + } + + // Apply once per resource version + key := obj.Namespace + "/" + obj.Name + previous, ok := a.seen.Load(key) + return !ok || previous != obj.ResourceVersion +} + +// storeResourceVersion keeps track of the latest resource version of an object for which Apply was executed +// Only used if UniqueApplyForResourceVersion is set in generic.GeneratingHandlerOptions +func (a *podGeneratingHandler) storeResourceVersion(obj *v1.Pod) { + if !a.opts.UniqueApplyForResourceVersion { + return + } + + key := obj.Namespace + "/" + obj.Name + a.seen.Store(key, obj.ResourceVersion) } diff --git a/pkg/generated/controllers/gke.cattle.io/v1/gkeclusterconfig.go b/pkg/generated/controllers/gke.cattle.io/v1/gkeclusterconfig.go index 343b5deb..2e7ca486 100644 --- a/pkg/generated/controllers/gke.cattle.io/v1/gkeclusterconfig.go +++ b/pkg/generated/controllers/gke.cattle.io/v1/gkeclusterconfig.go @@ -20,6 +20,7 @@ package v1 import ( "context" + "sync" "time" v1 "github.com/rancher/gke-operator/pkg/apis/gke.cattle.io/v1" @@ -48,10 +49,14 @@ type GKEClusterConfigCache interface { generic.CacheInterface[*v1.GKEClusterConfig] } +// GKEClusterConfigStatusHandler is executed for every added or modified GKEClusterConfig. Should return the new status to be updated type GKEClusterConfigStatusHandler func(obj *v1.GKEClusterConfig, status v1.GKEClusterConfigStatus) (v1.GKEClusterConfigStatus, error) +// GKEClusterConfigGeneratingHandler is the top-level handler that is executed for every GKEClusterConfig event. It extends GKEClusterConfigStatusHandler by a returning a slice of child objects to be passed to apply.Apply type GKEClusterConfigGeneratingHandler func(obj *v1.GKEClusterConfig, status v1.GKEClusterConfigStatus) ([]runtime.Object, v1.GKEClusterConfigStatus, error) +// RegisterGKEClusterConfigStatusHandler configures a GKEClusterConfigController to execute a GKEClusterConfigStatusHandler for every events observed. +// If a non-empty condition is provided, it will be updated in the status conditions for every handler execution func RegisterGKEClusterConfigStatusHandler(ctx context.Context, controller GKEClusterConfigController, condition condition.Cond, name string, handler GKEClusterConfigStatusHandler) { statusHandler := &gKEClusterConfigStatusHandler{ client: controller, @@ -61,6 +66,8 @@ func RegisterGKEClusterConfigStatusHandler(ctx context.Context, controller GKECl controller.AddGenericHandler(ctx, name, generic.FromObjectHandlerToHandler(statusHandler.sync)) } +// RegisterGKEClusterConfigGeneratingHandler configures a GKEClusterConfigController to execute a GKEClusterConfigGeneratingHandler for every events observed, passing the returned objects to the provided apply.Apply. +// If a non-empty condition is provided, it will be updated in the status conditions for every handler execution func RegisterGKEClusterConfigGeneratingHandler(ctx context.Context, controller GKEClusterConfigController, apply apply.Apply, condition condition.Cond, name string, handler GKEClusterConfigGeneratingHandler, opts *generic.GeneratingHandlerOptions) { statusHandler := &gKEClusterConfigGeneratingHandler{ @@ -82,6 +89,7 @@ type gKEClusterConfigStatusHandler struct { handler GKEClusterConfigStatusHandler } +// sync is executed on every resource addition or modification. Executes the configured handlers and sends the updated status to the Kubernetes API func (a *gKEClusterConfigStatusHandler) sync(key string, obj *v1.GKEClusterConfig) (*v1.GKEClusterConfig, error) { if obj == nil { return obj, nil @@ -127,8 +135,10 @@ type gKEClusterConfigGeneratingHandler struct { opts generic.GeneratingHandlerOptions gvk schema.GroupVersionKind name string + seen sync.Map } +// Remove handles the observed deletion of a resource, cascade deleting every associated resource previously applied func (a *gKEClusterConfigGeneratingHandler) Remove(key string, obj *v1.GKEClusterConfig) (*v1.GKEClusterConfig, error) { if obj != nil { return obj, nil @@ -138,12 +148,17 @@ func (a *gKEClusterConfigGeneratingHandler) Remove(key string, obj *v1.GKECluste obj.Namespace, obj.Name = kv.RSplit(key, "/") obj.SetGroupVersionKind(a.gvk) + if a.opts.UniqueApplyForResourceVersion { + a.seen.Delete(key) + } + return nil, generic.ConfigureApplyForObject(a.apply, obj, &a.opts). WithOwner(obj). WithSetID(a.name). ApplyObjects() } +// Handle executes the configured GKEClusterConfigGeneratingHandler and pass the resulting objects to apply.Apply, finally returning the new status of the resource func (a *gKEClusterConfigGeneratingHandler) Handle(obj *v1.GKEClusterConfig, status v1.GKEClusterConfigStatus) (v1.GKEClusterConfigStatus, error) { if !obj.DeletionTimestamp.IsZero() { return status, nil @@ -153,9 +168,41 @@ func (a *gKEClusterConfigGeneratingHandler) Handle(obj *v1.GKEClusterConfig, sta if err != nil { return newStatus, err } + if !a.isNewResourceVersion(obj) { + return newStatus, nil + } - return newStatus, generic.ConfigureApplyForObject(a.apply, obj, &a.opts). + err = generic.ConfigureApplyForObject(a.apply, obj, &a.opts). WithOwner(obj). WithSetID(a.name). ApplyObjects(objs...) + if err != nil { + return newStatus, err + } + a.storeResourceVersion(obj) + return newStatus, nil +} + +// isNewResourceVersion detects if a specific resource version was already successfully processed. +// Only used if UniqueApplyForResourceVersion is set in generic.GeneratingHandlerOptions +func (a *gKEClusterConfigGeneratingHandler) isNewResourceVersion(obj *v1.GKEClusterConfig) bool { + if !a.opts.UniqueApplyForResourceVersion { + return true + } + + // Apply once per resource version + key := obj.Namespace + "/" + obj.Name + previous, ok := a.seen.Load(key) + return !ok || previous != obj.ResourceVersion +} + +// storeResourceVersion keeps track of the latest resource version of an object for which Apply was executed +// Only used if UniqueApplyForResourceVersion is set in generic.GeneratingHandlerOptions +func (a *gKEClusterConfigGeneratingHandler) storeResourceVersion(obj *v1.GKEClusterConfig) { + if !a.opts.UniqueApplyForResourceVersion { + return + } + + key := obj.Namespace + "/" + obj.Name + a.seen.Store(key, obj.ResourceVersion) }