Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 admission/webhooks: properly cache webhooks and cleanup #3164

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 140 additions & 54 deletions pkg/admission/mutatingwebhook/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,22 @@ import (
kcpkubernetesclientset "github.com/kcp-dev/client-go/kubernetes"
"github.com/kcp-dev/logicalcluster/v3"

kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/admission/configuration"
"k8s.io/apiserver/pkg/admission/plugin/webhook/generic"
"k8s.io/apiserver/pkg/admission/plugin/webhook/mutating"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/client-go/tools/cache"

kcpinitializers "github.com/kcp-dev/kcp/pkg/admission/initializers"
"github.com/kcp-dev/kcp/pkg/admission/validatingwebhook"
"github.com/kcp-dev/kcp/pkg/indexers"
apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1"
corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1"
kcpinformers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions"
)

Expand All @@ -56,10 +60,16 @@ type Plugin struct {
localKubeSharedInformerFactory kcpkubernetesinformers.SharedInformerFactory
globalKubeSharedInformerFactory kcpkubernetesinformers.SharedInformerFactory

getAPIBindings func(clusterName logicalcluster.Name) ([]*apisv1alpha1.APIBinding, error)
getAPIBinding func(clusterName logicalcluster.Name, gr schema.GroupResource) (*apisv1alpha1.APIBinding, error)
getLogicalCluster func(clusterName logicalcluster.Name) (*corev1alpha1.LogicalCluster, error)

managerLock sync.Mutex
managersCache map[logicalcluster.Name]generic.Source
lock sync.RWMutex
cache map[logicalcluster.Name]map[logicalcluster.Name]clusterCache // by request and hook source cluster.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate why we need sourceCluster and cluster logicalclusters in this cache?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its Source and Target? maybe name them this way if this the case

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because we have two informers giving us webhook configuration: locally in the workspace and from the APIExport workspace (cache server).

}

type clusterCache struct {
source generic.Source
plugin *mutating.Plugin
}

var (
Expand All @@ -72,9 +82,8 @@ var (

func NewMutatingAdmissionWebhook(configFile io.Reader) (*Plugin, error) {
p := &Plugin{
managerLock: sync.Mutex{},
managersCache: make(map[logicalcluster.Name]generic.Source),
Handler: admission.NewHandler(admission.Connect, admission.Create, admission.Delete, admission.Update),
cache: make(map[logicalcluster.Name]map[logicalcluster.Name]clusterCache),
Handler: admission.NewHandler(admission.Connect, admission.Create, admission.Delete, admission.Update),
}
if configFile != nil {
config, err := io.ReadAll(configFile)
Expand All @@ -100,30 +109,11 @@ func (p *Plugin) Admit(ctx context.Context, attr admission.Attributes, o admissi
}
clusterName := cluster.Name

var config io.Reader
if len(p.config) > 0 {
config = bytes.NewReader(p.config)
}

hookSource, err := p.getHookSource(clusterName, attr.GetResource().GroupResource())
plugin, err := p.getPlugin(clusterName, attr)
if err != nil {
return err
}

plugin, err := mutating.NewMutatingWebhook(config)
if err != nil {
return fmt.Errorf("error creating mutating admission webhook: %w", err)
}

plugin.SetExternalKubeClientSet(p.kubeClusterClient.Cluster(clusterName.Path()))
plugin.SetNamespaceInformer(p.localKubeSharedInformerFactory.Core().V1().Namespaces().Cluster(clusterName))
plugin.SetHookSource(hookSource)
plugin.SetReadyFuncFromKCP(p.localKubeSharedInformerFactory.Core().V1().Namespaces().Cluster(clusterName))

if err := plugin.ValidateInitialization(); err != nil {
return fmt.Errorf("error validating MutatingWebhook initialization: %w", err)
}

// Add cluster annotation on create
if attr.GetOperation() == admission.Create {
u, ok := attr.GetObject().(metav1.Object)
Expand All @@ -138,40 +128,73 @@ func (p *Plugin) Admit(ctx context.Context, attr admission.Attributes, o admissi
return plugin.Admit(ctx, attr, o)
}

func (p *Plugin) getHookSource(clusterName logicalcluster.Name, groupResource schema.GroupResource) (generic.Source, error) {
clusterNameForGroupResource, err := p.getSourceClusterForGroupResource(clusterName, groupResource)
if err != nil {
return nil, err
func (p *Plugin) getPlugin(clusterName logicalcluster.Name, attr admission.Attributes) (*mutating.Plugin, error) {
var config io.Reader
if len(p.config) > 0 {
config = bytes.NewReader(p.config)
}

p.managerLock.Lock()
defer p.managerLock.Unlock()
if _, ok := p.managersCache[clusterNameForGroupResource]; !ok {
p.managersCache[clusterNameForGroupResource] = configuration.NewMutatingWebhookConfigurationManagerForInformer(
p.globalKubeSharedInformerFactory.Admissionregistration().V1().MutatingWebhookConfigurations().Cluster(clusterNameForGroupResource),
)
// get the APIBinding for the resource, or nil for local resources
gr := attr.GetResource().GroupResource()
binding, err := p.getAPIBinding(clusterName, gr)
if err != nil && !kerrors.IsNotFound(err) {
return nil, fmt.Errorf("error getting APIBinding for %q: %w", gr, err)
}
sourceClusterName := clusterName
if binding != nil {
sourceClusterName = logicalcluster.Name(binding.Status.APIExportClusterName)
}

return p.managersCache[clusterNameForGroupResource], nil
}
// fast path
p.lock.RLock()
c, ok := p.cache[clusterName][sourceClusterName]
p.lock.RUnlock()
if ok {
return c.plugin, nil
}

// slow path
p.lock.Lock()
defer p.lock.Unlock()
c, ok = p.cache[clusterName][sourceClusterName]
if ok {
return c.plugin, nil
}

func (p *Plugin) getSourceClusterForGroupResource(clusterName logicalcluster.Name, groupResource schema.GroupResource) (logicalcluster.Name, error) {
objs, err := p.getAPIBindings(clusterName)
// double check that the logical cluster is still alive
if _, err := p.getLogicalCluster(clusterName); err != nil {
return nil, fmt.Errorf("error getting LogicalCluster %q: %w", clusterName, err)
}

// create new plugin for this logical cluster and source cluster
source := configuration.NewMutatingWebhookConfigurationManagerForInformer(
// TODO(sttts): fix supporting local admission webhooks for bound resources as well
p.globalKubeSharedInformerFactory.Admissionregistration().V1().MutatingWebhookConfigurations().Cluster(sourceClusterName),
)
plugin, err := mutating.NewMutatingWebhook(config)
if err != nil {
return "", err
return nil, fmt.Errorf("error creating mutaing admission webhook: %w", err)
}
plugin.SetExternalKubeClientSet(p.kubeClusterClient.Cluster(clusterName.Path()))
plugin.SetNamespaceInformer(p.localKubeSharedInformerFactory.Core().V1().Namespaces().Cluster(clusterName))
plugin.SetHookSource(source)
plugin.SetReadyFuncFromKCP(p.localKubeSharedInformerFactory.Core().V1().Namespaces().Cluster(clusterName))

for _, apiBinding := range objs {
for _, br := range apiBinding.Status.BoundResources {
if br.Group == groupResource.Group && br.Resource == groupResource.Resource {
// GroupResource comes from an APIBinding/APIExport
return logicalcluster.Name(apiBinding.Status.APIExportClusterName), nil
}
}
if err := plugin.ValidateInitialization(); err != nil {
return nil, fmt.Errorf("error mutaing MutatingAdmissionWebhook initialization: %w", err)
}

// store in cache
c = clusterCache{
source: source,
plugin: plugin,
}
if _, ok := p.cache[clusterName]; !ok {
p.cache[clusterName] = map[logicalcluster.Name]clusterCache{}
}
p.cache[clusterName][sourceClusterName] = c

// GroupResource is local to this cluster
return clusterName, nil
return c.plugin, nil
}

func (p *Plugin) ValidateInitialization() error {
Expand All @@ -196,8 +219,71 @@ func (p *Plugin) SetKubeInformers(local, global kcpkubernetesinformers.SharedInf
p.globalKubeSharedInformerFactory = global
}

func (p *Plugin) SetKcpInformers(local, global kcpinformers.SharedInformerFactory) {
p.getAPIBindings = func(clusterName logicalcluster.Name) ([]*apisv1alpha1.APIBinding, error) {
return local.Apis().V1alpha1().APIBindings().Lister().Cluster(clusterName).List(labels.Everything())
func (p *Plugin) SetKcpInformers(local, _ kcpinformers.SharedInformerFactory) {
// watch APIBindings
_ = local.Apis().V1alpha1().APIBindings().Informer().AddIndexers(cache.Indexers{
indexers.APIBindingByBoundResources: indexers.IndexAPIBindingByBoundResources,
indexers.APIBindingsByAPIExportCluster: indexers.IndexAPIBindingsByAPIExportCluster,
}) // ignore conflict
p.getAPIBinding = func(clusterName logicalcluster.Name, gr schema.GroupResource) (*apisv1alpha1.APIBinding, error) {
key := indexers.APIBindingBoundResourceValue(clusterName, gr.Resource, gr.Group)
objs, err := local.Apis().V1alpha1().APIBindings().Informer().GetIndexer().ByIndex(indexers.APIBindingByBoundResources, key)
if err != nil {
return nil, fmt.Errorf("error getting APIBindings by bound resources: %w", err)
}
switch len(objs) {
case 0:
return nil, kerrors.NewNotFound(apisv1alpha1.Resource("APIBinding"), key)
case 1:
return objs[0].(*apisv1alpha1.APIBinding), nil
default:
// should never happen
return nil, fmt.Errorf("found multiple APIBindings for bound resources %q", key)
}
}
_, err := local.Apis().V1alpha1().APIBindings().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
p.lock.Lock()
defer p.lock.Unlock()

// delete if there is no other binding from the same logical cluster
binding := obj.(*apisv1alpha1.APIBinding)
key := indexers.APIBindingByBoundResourceValue(logicalcluster.From(binding), logicalcluster.Name(binding.Status.APIExportClusterName))
objs, err := local.Apis().V1alpha1().APIBindings().Informer().GetIndexer().ByIndex(indexers.APIBindingsByAPIExportCluster, key)
if err != nil {
runtime.HandleError(fmt.Errorf("error getting APIBindings by APIExportCluster: %w", err))
return
}
foundOther := false
for _, obj := range objs {
otherBinding := obj.(*apisv1alpha1.APIBinding)
if otherBinding.Name != binding.Name {
foundOther = true
break
}
}
if !foundOther {
delete(p.cache[logicalcluster.From(binding)], logicalcluster.Name(binding.Status.APIExportClusterName))
}
},
})
if err != nil {
runtime.HandleError(fmt.Errorf("error adding APIBinding delete event handler: %w", err))
}

// watch logical clusters
p.getLogicalCluster = func(clusterName logicalcluster.Name) (*corev1alpha1.LogicalCluster, error) {
return local.Core().V1alpha1().LogicalClusters().Lister().Cluster(clusterName).Get(corev1alpha1.LogicalClusterName)
}
_, err = local.Core().V1alpha1().LogicalClusters().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
clusterName := logicalcluster.From(obj.(*corev1alpha1.LogicalCluster))
p.lock.Lock()
defer p.lock.Unlock()
delete(p.cache, clusterName)
},
})
if err != nil {
runtime.HandleError(fmt.Errorf("error adding LogicalCluster delete event handler: %w", err))
}
}
Loading
Loading