Skip to content

Commit

Permalink
Make EventPolicy EventHandler generic
Browse files Browse the repository at this point in the history
  • Loading branch information
creydr committed Jun 19, 2024
1 parent fd35309 commit 039a5b9
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 82 deletions.
113 changes: 113 additions & 0 deletions pkg/reconciler/eventpolicy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
Copyright 2023 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package reconciler

import (
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/apis/eventing/v1alpha1"
)

// enqueueApplyingResourcesOfEventPolicy checks if the given GVK is referenced in the given EventPolicy.
// If so, it enqueues it into the enqueueFn().
func enqueueApplyingResourcesOfEventPolicy(indexer cache.Indexer, gvk schema.GroupVersionKind, policyObj interface{}, enqueueFn func(key types.NamespacedName)) {
eventPolicy, ok := policyObj.(*v1alpha1.EventPolicy)
if !ok {
return
}

for _, to := range eventPolicy.Spec.To {
if to.Ref != nil {
toGV, err := schema.ParseGroupVersion(to.Ref.APIVersion)
if err != nil {
continue
}

if strings.EqualFold(toGV.Group, gvk.Group) &&
strings.EqualFold(to.Ref.Kind, gvk.Kind) {

enqueueFn(types.NamespacedName{
Namespace: eventPolicy.Namespace,
Name: to.Ref.Name,
})
}
}

if to.Selector != nil {
selectorGV, err := schema.ParseGroupVersion(to.Selector.APIVersion)
if err != nil {
continue
}

if strings.EqualFold(selectorGV.Group, gvk.Group) &&
strings.EqualFold(to.Selector.Kind, gvk.Kind) {

selector, err := metav1.LabelSelectorAsSelector(to.Selector.LabelSelector)
if err != nil {
continue
}

resources := []metav1.Object{}
err = cache.ListAllByNamespace(indexer, eventPolicy.Namespace, selector, func(i interface{}) {
resources = append(resources, i.(metav1.Object))
})
if err != nil {
continue
}

for _, resource := range resources {
enqueueFn(types.NamespacedName{
Namespace: resource.GetNamespace(),
Name: resource.GetName(),
})
}
}
}
}
}

// EventPolicyEventHandler returns an ResourceEventHandler, which enqueues the referencing resources of the EventPolicy
// if the EventPolicy was referencing or got updated and now is referencing the resource of the given GVK.
func EventPolicyEventHandler(indexer cache.Indexer, gvk schema.GroupVersionKind, enqueueFn func(key types.NamespacedName)) cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
enqueueApplyingResourcesOfEventPolicy(indexer, gvk, obj, enqueueFn)
},
UpdateFunc: func(oldObj, newObj interface{}) {
// Here we need to check if the old or the new EventPolicy was referencing the InMemoryChannel

// make sure, we enqueue the keys only once
toEnqueue := map[types.NamespacedName]struct{}{}
addToEnqueueList := func(key types.NamespacedName) {
toEnqueue[key] = struct{}{}
}
enqueueApplyingResourcesOfEventPolicy(indexer, gvk, oldObj, addToEnqueueList)
enqueueApplyingResourcesOfEventPolicy(indexer, gvk, newObj, addToEnqueueList)

for k := range toEnqueue {
enqueueFn(k)
}
},
DeleteFunc: func(obj interface{}) {
enqueueApplyingResourcesOfEventPolicy(indexer, gvk, obj, enqueueFn)
},
}
}
87 changes: 5 additions & 82 deletions pkg/reconciler/inmemorychannel/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,11 @@ package controller

import (
"context"
"strings"

"github.com/kelseyhightower/envconfig"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/apis/messaging"
v1 "knative.dev/eventing/pkg/client/listers/messaging/v1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/reconciler"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
Expand Down Expand Up @@ -149,24 +144,11 @@ func NewController(
Handler: controller.HandleAll(globalResync),
})

imcGVK := messagingv1.SchemeGroupVersion.WithKind("InMemoryChannel")

// Enqueue the InMemoryChannel, if we have an EventPolicy which was referencing
// or got updated and now is referencing the InMemoryChannel
eventPolicyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
enqueueApplyingChannelOfEventPolicy(inmemorychannelInformer.Lister(), obj, impl.EnqueueKey)
},
UpdateFunc: func(oldObj, newObj interface{}) {
// Here we need to check if either the old or the new EventPolicy was referencing the InMemoryChannel

alreadyEnqueued := enqueueApplyingChannelOfEventPolicy(inmemorychannelInformer.Lister(), oldObj, impl.EnqueueKey)
if !alreadyEnqueued {
enqueueApplyingChannelOfEventPolicy(inmemorychannelInformer.Lister(), newObj, impl.EnqueueKey)
}
},
DeleteFunc: func(obj interface{}) {
enqueueApplyingChannelOfEventPolicy(inmemorychannelInformer.Lister(), obj, impl.EnqueueKey)
},
})
eventPolicyInformer.Informer().AddEventHandler(reconciler.EventPolicyEventHandler(inmemorychannelInformer.Informer().GetIndexer(), imcGVK, impl.EnqueueKey))

// Setup the watch on the config map of dispatcher config
configStore := config.NewEventDispatcherConfigStore(logging.FromContext(ctx))
Expand All @@ -175,62 +157,3 @@ func NewController(

return impl
}

// enqueueApplyingChannelOfEventPolicy checks if an InMemoryChannel is referenced in the given EventPolicy.
// If so, it enqueues the channel into the enqueueFn() and returns true.
func enqueueApplyingChannelOfEventPolicy(imcLister v1.InMemoryChannelLister, obj interface{}, enqueueFn func(key types.NamespacedName)) bool {
eventPolicy, ok := obj.(*v1alpha1.EventPolicy)
if !ok {
return false
}

for _, to := range eventPolicy.Spec.To {
if to.Ref != nil {
toGV, err := schema.ParseGroupVersion(to.Ref.APIVersion)
if err != nil {
return false
}

if strings.EqualFold(toGV.Group, messaging.GroupName) &&
strings.EqualFold(to.Ref.Kind, "InMemoryChannel") {

enqueueFn(types.NamespacedName{
Namespace: eventPolicy.Namespace,
Name: to.Ref.Name,
})
return true
}
}

if to.Selector != nil {
selectorGV, err := schema.ParseGroupVersion(to.Selector.APIVersion)
if err != nil {
return false
}

if strings.EqualFold(selectorGV.Group, messaging.GroupName) &&
strings.EqualFold(to.Selector.Kind, "InMemoryChannel") {

selector, err := metav1.LabelSelectorAsSelector(to.Selector.LabelSelector)
if err != nil {
return false
}

imcs, err := imcLister.InMemoryChannels(eventPolicy.Namespace).List(selector)
if err != nil {
return false
}

for _, imc := range imcs {
enqueueFn(types.NamespacedName{
Namespace: eventPolicy.Namespace,
Name: imc.Name,
})
}
return true
}
}
}

return false
}

0 comments on commit 039a5b9

Please sign in to comment.