From 6493b39402e00b8460ded0f5b532beb66fab7c35 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Tue, 17 Sep 2024 09:59:35 +0200 Subject: [PATCH] admission/webhooks: cache whole webhook and cleanup Signed-off-by: Dr. Stefan Schimanski --- pkg/admission/mutatingwebhook/plugin.go | 194 +++++++++++++++------ pkg/admission/validatingwebhook/plugin.go | 196 ++++++++++++++++------ pkg/indexers/apibinding.go | 16 ++ 3 files changed, 300 insertions(+), 106 deletions(-) diff --git a/pkg/admission/mutatingwebhook/plugin.go b/pkg/admission/mutatingwebhook/plugin.go index b316c290759..7b2f184adf2 100644 --- a/pkg/admission/mutatingwebhook/plugin.go +++ b/pkg/admission/mutatingwebhook/plugin.go @@ -28,18 +28,22 @@ import ( kcpkubernetesclientset "github.com/kcp-dev/client-go/kubernetes" "github.com/kcp-dev/logicalcluster/v3" + kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/admission/configuration" "k8s.io/apiserver/pkg/admission/plugin/webhook/generic" "k8s.io/apiserver/pkg/admission/plugin/webhook/mutating" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/client-go/tools/cache" kcpinitializers "github.com/kcp-dev/kcp/pkg/admission/initializers" "github.com/kcp-dev/kcp/pkg/admission/validatingwebhook" + "github.com/kcp-dev/kcp/pkg/indexers" apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1" + corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" kcpinformers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions" ) @@ -56,10 +60,16 @@ type Plugin struct { localKubeSharedInformerFactory kcpkubernetesinformers.SharedInformerFactory globalKubeSharedInformerFactory kcpkubernetesinformers.SharedInformerFactory - getAPIBindings func(clusterName logicalcluster.Name) ([]*apisv1alpha1.APIBinding, error) + getAPIBinding func(clusterName logicalcluster.Name, gr schema.GroupResource) (*apisv1alpha1.APIBinding, error) + getLogicalCluster func(clusterName logicalcluster.Name) (*corev1alpha1.LogicalCluster, error) - managerLock sync.Mutex - managersCache map[logicalcluster.Name]generic.Source + lock sync.RWMutex + cache map[logicalcluster.Name]map[logicalcluster.Name]clusterCache // by request and hook source cluster. +} + +type clusterCache struct { + source generic.Source + plugin *mutating.Plugin } var ( @@ -72,9 +82,8 @@ var ( func NewMutatingAdmissionWebhook(configFile io.Reader) (*Plugin, error) { p := &Plugin{ - managerLock: sync.Mutex{}, - managersCache: make(map[logicalcluster.Name]generic.Source), - Handler: admission.NewHandler(admission.Connect, admission.Create, admission.Delete, admission.Update), + cache: make(map[logicalcluster.Name]map[logicalcluster.Name]clusterCache), + Handler: admission.NewHandler(admission.Connect, admission.Create, admission.Delete, admission.Update), } if configFile != nil { config, err := io.ReadAll(configFile) @@ -100,30 +109,11 @@ func (p *Plugin) Admit(ctx context.Context, attr admission.Attributes, o admissi } clusterName := cluster.Name - var config io.Reader - if len(p.config) > 0 { - config = bytes.NewReader(p.config) - } - - hookSource, err := p.getHookSource(clusterName, attr.GetResource().GroupResource()) + plugin, err := p.getPlugin(clusterName, attr) if err != nil { return err } - plugin, err := mutating.NewMutatingWebhook(config) - if err != nil { - return fmt.Errorf("error creating mutating admission webhook: %w", err) - } - - plugin.SetExternalKubeClientSet(p.kubeClusterClient.Cluster(clusterName.Path())) - plugin.SetNamespaceInformer(p.localKubeSharedInformerFactory.Core().V1().Namespaces().Cluster(clusterName)) - plugin.SetHookSource(hookSource) - plugin.SetReadyFuncFromKCP(p.localKubeSharedInformerFactory.Core().V1().Namespaces().Cluster(clusterName)) - - if err := plugin.ValidateInitialization(); err != nil { - return fmt.Errorf("error validating MutatingWebhook initialization: %w", err) - } - // Add cluster annotation on create if attr.GetOperation() == admission.Create { u, ok := attr.GetObject().(metav1.Object) @@ -138,40 +128,73 @@ func (p *Plugin) Admit(ctx context.Context, attr admission.Attributes, o admissi return plugin.Admit(ctx, attr, o) } -func (p *Plugin) getHookSource(clusterName logicalcluster.Name, groupResource schema.GroupResource) (generic.Source, error) { - clusterNameForGroupResource, err := p.getSourceClusterForGroupResource(clusterName, groupResource) - if err != nil { - return nil, err +func (p *Plugin) getPlugin(clusterName logicalcluster.Name, attr admission.Attributes) (*mutating.Plugin, error) { + var config io.Reader + if len(p.config) > 0 { + config = bytes.NewReader(p.config) } - p.managerLock.Lock() - defer p.managerLock.Unlock() - if _, ok := p.managersCache[clusterNameForGroupResource]; !ok { - p.managersCache[clusterNameForGroupResource] = configuration.NewMutatingWebhookConfigurationManagerForInformer( - p.globalKubeSharedInformerFactory.Admissionregistration().V1().MutatingWebhookConfigurations().Cluster(clusterNameForGroupResource), - ) + // get the APIBinding for the resource, or nil for local resources + gr := attr.GetResource().GroupResource() + binding, err := p.getAPIBinding(clusterName, gr) + if err != nil && !kerrors.IsNotFound(err) { + return nil, fmt.Errorf("error getting APIBinding for %q: %w", gr, err) + } + sourceClusterName := clusterName + if binding != nil { + sourceClusterName = logicalcluster.Name(binding.Status.APIExportClusterName) } - return p.managersCache[clusterNameForGroupResource], nil -} + // fast path + p.lock.RLock() + c, ok := p.cache[clusterName][sourceClusterName] + p.lock.RUnlock() + if ok { + return c.plugin, nil + } + + // slow path + p.lock.Lock() + defer p.lock.Unlock() + c, ok = p.cache[clusterName][sourceClusterName] + if ok { + return c.plugin, nil + } -func (p *Plugin) getSourceClusterForGroupResource(clusterName logicalcluster.Name, groupResource schema.GroupResource) (logicalcluster.Name, error) { - objs, err := p.getAPIBindings(clusterName) + // double check that the logical cluster is still alive + if _, err := p.getLogicalCluster(clusterName); err != nil { + return nil, fmt.Errorf("error getting LogicalCluster %q: %w", clusterName, err) + } + + // create new plugin for this logical cluster and source cluster + source := configuration.NewMutatingWebhookConfigurationManagerForInformer( + // TODO(sttts): fix supporting local admission webhooks for bound resources as well + p.globalKubeSharedInformerFactory.Admissionregistration().V1().MutatingWebhookConfigurations().Cluster(sourceClusterName), + ) + plugin, err := mutating.NewMutatingWebhook(config) if err != nil { - return "", err + return nil, fmt.Errorf("error creating mutaing admission webhook: %w", err) } + plugin.SetExternalKubeClientSet(p.kubeClusterClient.Cluster(clusterName.Path())) + plugin.SetNamespaceInformer(p.localKubeSharedInformerFactory.Core().V1().Namespaces().Cluster(clusterName)) + plugin.SetHookSource(source) + plugin.SetReadyFuncFromKCP(p.localKubeSharedInformerFactory.Core().V1().Namespaces().Cluster(clusterName)) - for _, apiBinding := range objs { - for _, br := range apiBinding.Status.BoundResources { - if br.Group == groupResource.Group && br.Resource == groupResource.Resource { - // GroupResource comes from an APIBinding/APIExport - return logicalcluster.Name(apiBinding.Status.APIExportClusterName), nil - } - } + if err := plugin.ValidateInitialization(); err != nil { + return nil, fmt.Errorf("error mutaing MutatingAdmissionWebhook initialization: %w", err) + } + + // store in cache + c = clusterCache{ + source: source, + plugin: plugin, + } + if _, ok := p.cache[clusterName]; !ok { + p.cache[clusterName] = map[logicalcluster.Name]clusterCache{} } + p.cache[clusterName][sourceClusterName] = c - // GroupResource is local to this cluster - return clusterName, nil + return c.plugin, nil } func (p *Plugin) ValidateInitialization() error { @@ -196,8 +219,71 @@ func (p *Plugin) SetKubeInformers(local, global kcpkubernetesinformers.SharedInf p.globalKubeSharedInformerFactory = global } -func (p *Plugin) SetKcpInformers(local, global kcpinformers.SharedInformerFactory) { - p.getAPIBindings = func(clusterName logicalcluster.Name) ([]*apisv1alpha1.APIBinding, error) { - return local.Apis().V1alpha1().APIBindings().Lister().Cluster(clusterName).List(labels.Everything()) +func (p *Plugin) SetKcpInformers(local, _ kcpinformers.SharedInformerFactory) { + // watch APIBindings + _ = local.Apis().V1alpha1().APIBindings().Informer().AddIndexers(cache.Indexers{ + indexers.APIBindingByBoundResources: indexers.IndexAPIBindingByBoundResources, + indexers.APIBindingsByAPIExportCluster: indexers.IndexAPIBindingsByAPIExportCluster, + }) // ignore conflict + p.getAPIBinding = func(clusterName logicalcluster.Name, gr schema.GroupResource) (*apisv1alpha1.APIBinding, error) { + key := indexers.APIBindingBoundResourceValue(clusterName, gr.Resource, gr.Group) + objs, err := local.Apis().V1alpha1().APIBindings().Informer().GetIndexer().ByIndex(indexers.APIBindingByBoundResources, key) + if err != nil { + return nil, fmt.Errorf("error getting APIBindings by bound resources: %w", err) + } + switch len(objs) { + case 0: + return nil, kerrors.NewNotFound(apisv1alpha1.Resource("APIBinding"), key) + case 1: + return objs[0].(*apisv1alpha1.APIBinding), nil + default: + // should never happen + return nil, fmt.Errorf("found multiple APIBindings for bound resources %q", key) + } + } + _, err := local.Apis().V1alpha1().APIBindings().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + DeleteFunc: func(obj interface{}) { + p.lock.Lock() + defer p.lock.Unlock() + + // delete if there is no other binding from the same logical cluster + binding := obj.(*apisv1alpha1.APIBinding) + key := indexers.APIBindingByBoundResourceValue(logicalcluster.From(binding), logicalcluster.Name(binding.Status.APIExportClusterName)) + objs, err := local.Apis().V1alpha1().APIBindings().Informer().GetIndexer().ByIndex(indexers.APIBindingsByAPIExportCluster, key) + if err != nil { + runtime.HandleError(fmt.Errorf("error getting APIBindings by APIExportCluster: %w", err)) + return + } + foundOther := false + for _, obj := range objs { + otherBinding := obj.(*apisv1alpha1.APIBinding) + if otherBinding.Name != binding.Name { + foundOther = true + break + } + } + if !foundOther { + delete(p.cache[logicalcluster.From(binding)], logicalcluster.Name(binding.Status.APIExportClusterName)) + } + }, + }) + if err != nil { + runtime.HandleError(fmt.Errorf("error adding APIBinding delete event handler: %w", err)) + } + + // watch logical clusters + p.getLogicalCluster = func(clusterName logicalcluster.Name) (*corev1alpha1.LogicalCluster, error) { + return local.Core().V1alpha1().LogicalClusters().Lister().Cluster(clusterName).Get(corev1alpha1.LogicalClusterName) + } + _, err = local.Core().V1alpha1().LogicalClusters().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + DeleteFunc: func(obj interface{}) { + clusterName := logicalcluster.From(obj.(*corev1alpha1.LogicalCluster)) + p.lock.Lock() + defer p.lock.Unlock() + delete(p.cache, clusterName) + }, + }) + if err != nil { + runtime.HandleError(fmt.Errorf("error adding LogicalCluster delete event handler: %w", err)) } } diff --git a/pkg/admission/validatingwebhook/plugin.go b/pkg/admission/validatingwebhook/plugin.go index 5c19d9f3fe8..70ed5a9e1b1 100644 --- a/pkg/admission/validatingwebhook/plugin.go +++ b/pkg/admission/validatingwebhook/plugin.go @@ -28,17 +28,21 @@ import ( kcpkubernetesclientset "github.com/kcp-dev/client-go/kubernetes" "github.com/kcp-dev/logicalcluster/v3" + kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/admission/configuration" "k8s.io/apiserver/pkg/admission/plugin/webhook/generic" "k8s.io/apiserver/pkg/admission/plugin/webhook/validating" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/client-go/tools/cache" kcpinitializers "github.com/kcp-dev/kcp/pkg/admission/initializers" + "github.com/kcp-dev/kcp/pkg/indexers" apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1" + corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" kcpinformers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions" ) @@ -55,10 +59,16 @@ type Plugin struct { localKubeSharedInformerFactory kcpkubernetesinformers.SharedInformerFactory globalKubeSharedInformerFactory kcpkubernetesinformers.SharedInformerFactory - getAPIBindings func(clusterName logicalcluster.Name) ([]*apisv1alpha1.APIBinding, error) + getAPIBinding func(clusterName logicalcluster.Name, gr schema.GroupResource) (*apisv1alpha1.APIBinding, error) + getLogicalCluster func(clusterName logicalcluster.Name) (*corev1alpha1.LogicalCluster, error) - managerLock sync.Mutex - managersCache map[logicalcluster.Name]generic.Source + lock sync.RWMutex + cache map[logicalcluster.Name]map[logicalcluster.Name]clusterCache // by request and hook source cluster. +} + +type clusterCache struct { + source generic.Source + plugin *validating.Plugin } var ( @@ -71,9 +81,8 @@ var ( func NewValidatingAdmissionWebhook(configFile io.Reader) (*Plugin, error) { p := &Plugin{ - managerLock: sync.Mutex{}, - managersCache: make(map[logicalcluster.Name]generic.Source), - Handler: admission.NewHandler(admission.Connect, admission.Create, admission.Delete, admission.Update), + cache: make(map[logicalcluster.Name]map[logicalcluster.Name]clusterCache), + Handler: admission.NewHandler(admission.Connect, admission.Create, admission.Delete, admission.Update), } if configFile != nil { config, err := io.ReadAll(configFile) @@ -99,30 +108,16 @@ func (p *Plugin) Validate(ctx context.Context, attr admission.Attributes, o admi } clusterName := cluster.Name - var config io.Reader - if len(p.config) > 0 { - config = bytes.NewReader(p.config) + if attr.GetResource().GroupResource() == corev1alpha1.Resource("logicalclusters") { + // logical clusters are excluded from admission webhooks + return nil } - hookSource, err := p.getHookSource(clusterName, attr.GetResource().GroupResource()) + plugin, err := p.getPlugin(clusterName, attr) if err != nil { return err } - plugin, err := validating.NewValidatingAdmissionWebhook(config) - if err != nil { - return fmt.Errorf("error creating validating admission webhook: %w", err) - } - - plugin.SetExternalKubeClientSet(p.kubeClusterClient.Cluster(clusterName.Path())) - plugin.SetNamespaceInformer(p.localKubeSharedInformerFactory.Core().V1().Namespaces().Cluster(clusterName)) - plugin.SetHookSource(hookSource) - plugin.SetReadyFuncFromKCP(p.localKubeSharedInformerFactory.Core().V1().Namespaces().Cluster(clusterName)) - - if err := plugin.ValidateInitialization(); err != nil { - return fmt.Errorf("error validating ValidatingAdmissionWebhook initialization: %w", err) - } - // Add cluster annotation on create if attr.GetOperation() == admission.Create { u, ok := attr.GetObject().(metav1.Object) @@ -137,40 +132,74 @@ func (p *Plugin) Validate(ctx context.Context, attr admission.Attributes, o admi return plugin.Validate(ctx, attr, o) } -func (p *Plugin) getHookSource(clusterName logicalcluster.Name, groupResource schema.GroupResource) (generic.Source, error) { - clusterNameForGroupResource, err := p.getSourceClusterForGroupResource(clusterName, groupResource) - if err != nil { - return nil, err +func (p *Plugin) getPlugin(clusterName logicalcluster.Name, attr admission.Attributes) (*validating.Plugin, error) { + var config io.Reader + if len(p.config) > 0 { + config = bytes.NewReader(p.config) } - p.managerLock.Lock() - defer p.managerLock.Unlock() - if _, ok := p.managersCache[clusterNameForGroupResource]; !ok { - p.managersCache[clusterNameForGroupResource] = configuration.NewValidatingWebhookConfigurationManagerForInformer( - p.globalKubeSharedInformerFactory.Admissionregistration().V1().ValidatingWebhookConfigurations().Cluster(clusterNameForGroupResource), - ) + // get the APIBinding for the resource, or nil for local resources + gr := attr.GetResource().GroupResource() + binding, err := p.getAPIBinding(clusterName, gr) + if err != nil && !kerrors.IsNotFound(err) { + return nil, fmt.Errorf("error getting APIBinding for %q: %w", gr, err) + } + sourceClusterName := clusterName + if binding != nil { + sourceClusterName = logicalcluster.Name(binding.Status.APIExportClusterName) } - return p.managersCache[clusterNameForGroupResource], nil -} + // fast path + p.lock.RLock() + c, ok := p.cache[clusterName][sourceClusterName] + p.lock.RUnlock() + if ok { + return c.plugin, nil + } + + // slow path + p.lock.Lock() + defer p.lock.Unlock() + c, ok = p.cache[clusterName][sourceClusterName] + if ok { + return c.plugin, nil + } + + // double check that the logical cluster is still alive + if _, err := p.getLogicalCluster(clusterName); err != nil { + p.lock.Unlock() + return nil, fmt.Errorf("error getting LogicalCluster %q: %w", clusterName, err) + } -func (p *Plugin) getSourceClusterForGroupResource(clusterName logicalcluster.Name, groupResource schema.GroupResource) (logicalcluster.Name, error) { - objs, err := p.getAPIBindings(clusterName) + // create new plugin for this logical cluster and source cluster + source := configuration.NewValidatingWebhookConfigurationManagerForInformer( + // TODO(sttts): fix supporting local admission webhooks for bound resources as well + p.globalKubeSharedInformerFactory.Admissionregistration().V1().ValidatingWebhookConfigurations().Cluster(sourceClusterName), + ) + plugin, err := validating.NewValidatingAdmissionWebhook(config) if err != nil { - return "", err + return nil, fmt.Errorf("error creating mutaing admission webhook: %w", err) + } + plugin.SetExternalKubeClientSet(p.kubeClusterClient.Cluster(clusterName.Path())) + plugin.SetNamespaceInformer(p.localKubeSharedInformerFactory.Core().V1().Namespaces().Cluster(clusterName)) + plugin.SetHookSource(source) + plugin.SetReadyFuncFromKCP(p.localKubeSharedInformerFactory.Core().V1().Namespaces().Cluster(clusterName)) + + if err := plugin.ValidateInitialization(); err != nil { + return nil, fmt.Errorf("error mutaing ValidatingAdmissionWebhook initialization: %w", err) } - for _, apiBinding := range objs { - for _, br := range apiBinding.Status.BoundResources { - if br.Group == groupResource.Group && br.Resource == groupResource.Resource { - // GroupResource comes from an APIBinding/APIExport - return logicalcluster.Name(apiBinding.Status.APIExportClusterName), nil - } - } + // store in cache + c = clusterCache{ + source: source, + plugin: plugin, + } + if _, ok := p.cache[clusterName]; !ok { + p.cache[clusterName] = map[logicalcluster.Name]clusterCache{} } + p.cache[clusterName][sourceClusterName] = c - // GroupResource is local to this cluster - return clusterName, nil + return c.plugin, nil } func (p *Plugin) ValidateInitialization() error { @@ -195,9 +224,72 @@ func (p *Plugin) SetKubeInformers(local, global kcpkubernetesinformers.SharedInf p.globalKubeSharedInformerFactory = global } -func (p *Plugin) SetKcpInformers(local, global kcpinformers.SharedInformerFactory) { - p.getAPIBindings = func(clusterName logicalcluster.Name) ([]*apisv1alpha1.APIBinding, error) { - return local.Apis().V1alpha1().APIBindings().Lister().Cluster(clusterName).List(labels.Everything()) +func (p *Plugin) SetKcpInformers(local, _ kcpinformers.SharedInformerFactory) { + // watch APIBindings + _ = local.Apis().V1alpha1().APIBindings().Informer().AddIndexers(cache.Indexers{ + indexers.APIBindingByBoundResources: indexers.IndexAPIBindingByBoundResources, + indexers.APIBindingsByAPIExportCluster: indexers.IndexAPIBindingsByAPIExportCluster, + }) // ignore conflict + p.getAPIBinding = func(clusterName logicalcluster.Name, gr schema.GroupResource) (*apisv1alpha1.APIBinding, error) { + key := indexers.APIBindingBoundResourceValue(clusterName, gr.Resource, gr.Group) + objs, err := local.Apis().V1alpha1().APIBindings().Informer().GetIndexer().ByIndex(indexers.APIBindingByBoundResources, key) + if err != nil { + return nil, fmt.Errorf("error getting APIBindings by bound resources: %w", err) + } + switch len(objs) { + case 0: + return nil, kerrors.NewNotFound(apisv1alpha1.Resource("APIBinding"), key) + case 1: + return objs[0].(*apisv1alpha1.APIBinding), nil + default: + // should never happen + return nil, fmt.Errorf("found multiple APIBindings for bound resources %q", key) + } + } + _, err := local.Apis().V1alpha1().APIBindings().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + DeleteFunc: func(obj interface{}) { + p.lock.Lock() + defer p.lock.Unlock() + + // delete if there is no other binding from the same logical cluster + binding := obj.(*apisv1alpha1.APIBinding) + key := indexers.APIBindingByBoundResourceValue(logicalcluster.From(binding), logicalcluster.Name(binding.Status.APIExportClusterName)) + objs, err := local.Apis().V1alpha1().APIBindings().Informer().GetIndexer().ByIndex(indexers.APIBindingsByAPIExportCluster, key) + if err != nil { + runtime.HandleError(fmt.Errorf("error getting APIBindings by APIExportCluster: %w", err)) + return + } + foundOther := false + for _, obj := range objs { + otherBinding := obj.(*apisv1alpha1.APIBinding) + if otherBinding.Name != binding.Name { + foundOther = true + break + } + } + if !foundOther { + delete(p.cache[logicalcluster.From(binding)], logicalcluster.Name(binding.Status.APIExportClusterName)) + } + }, + }) + if err != nil { + runtime.HandleError(fmt.Errorf("error adding APIBinding delete event handler: %w", err)) + } + + // watch logical clusters + p.getLogicalCluster = func(clusterName logicalcluster.Name) (*corev1alpha1.LogicalCluster, error) { + return local.Core().V1alpha1().LogicalClusters().Lister().Cluster(clusterName).Get(corev1alpha1.LogicalClusterName) + } + _, err = local.Core().V1alpha1().LogicalClusters().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + DeleteFunc: func(obj interface{}) { + clusterName := logicalcluster.From(obj.(*corev1alpha1.LogicalCluster)) + p.lock.Lock() + defer p.lock.Unlock() + delete(p.cache, clusterName) + }, + }) + if err != nil { + runtime.HandleError(fmt.Errorf("error adding LogicalCluster delete event handler: %w", err)) } } diff --git a/pkg/indexers/apibinding.go b/pkg/indexers/apibinding.go index 332970ff833..7512d3ca0b0 100644 --- a/pkg/indexers/apibinding.go +++ b/pkg/indexers/apibinding.go @@ -107,3 +107,19 @@ func IndexAPIBindingByAPIExport(obj interface{}) ([]string, error) { return []string{path.Join(apiBinding.Spec.Reference.Export.Name).String()}, nil } + +const APIBindingsByAPIExportCluster = "APIBindingsByAPIExportCluster" + +// IndexAPIBindingsByAPIExportCluster indexes the APIBindings by their APIExport's logical cluster. +func IndexAPIBindingsByAPIExportCluster(obj interface{}) ([]string, error) { + apiBinding, ok := obj.(*apisv1alpha1.APIBinding) + if !ok { + return []string{}, fmt.Errorf("obj %T is not an APIBinding", obj) + } + + return []string{APIBindingByBoundResourceValue(logicalcluster.From(apiBinding), logicalcluster.Name(apiBinding.Status.APIExportClusterName))}, nil +} + +func APIBindingByBoundResourceValue(clusterName, exportClusterName logicalcluster.Name) string { + return fmt.Sprintf("%s|%s", clusterName, exportClusterName) +}