Skip to content

Commit

Permalink
Add EventPolicy reconciliation for Parallel (#8112)
Browse files Browse the repository at this point in the history
* create eventpolicies for parallel channel

Signed-off-by: rahulii <[email protected]>

* reconcile event policies for parallel

Signed-off-by: rahulii <[email protected]>

* add unit test cases

Signed-off-by: rahulii <[email protected]>

* add more unit test cases

Signed-off-by: rahulii <[email protected]>

* remove unused function

Signed-off-by: rahulii <[email protected]>

* add more unit test cases

Signed-off-by: rahulii <[email protected]>

* add more unit test case - with multiple event policies

Signed-off-by: rahulii <[email protected]>

* fix gofmt

Signed-off-by: rahulii <[email protected]>

* add more test cases

Signed-off-by: rahulii <[email protected]>

* sort the slice for deterministic order in unit tests

Signed-off-by: rahulii <[email protected]>

* minor fix

Signed-off-by: rahulii <[email protected]>

* add more test cases

Signed-off-by: rahulii <[email protected]>

* uid in ownerref

Signed-off-by: rahulii <[email protected]>

* fix review comments from Chris

Signed-off-by: rahulii <[email protected]>

* add parallel policy as owner references

Signed-off-by: rahulii <[email protected]>

* fix e2e test

Signed-off-by: rahulii <[email protected]>

* remove sorting of eventpolicies

Signed-off-by: rahulii <[email protected]>

---------

Signed-off-by: rahulii <[email protected]>
  • Loading branch information
rahulii authored Aug 9, 2024
1 parent d69b8b4 commit c521efb
Show file tree
Hide file tree
Showing 3 changed files with 1,080 additions and 179 deletions.
145 changes: 145 additions & 0 deletions pkg/reconciler/parallel/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -36,6 +37,7 @@ import (
pkgreconciler "knative.dev/pkg/reconciler"

duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/apis/feature"
v1 "knative.dev/eventing/pkg/apis/flows/v1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
Expand Down Expand Up @@ -144,6 +146,11 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, p *v1.Parallel) pkgrecon
return fmt.Errorf("error removing unwanted Subscriptions: %w", err)
}

// Reconcile EventPolicies for the parallel.
if err := r.reconcileEventPolicies(ctx, p, ingressChannel, channels, filterSubs, featureFlags); err != nil {
return fmt.Errorf("failed to reconcile EventPolicies for Parallel: %w", err)
}

err := auth.UpdateStatusWithEventPolicies(featureFlags, &p.Status.AppliedEventPoliciesStatus, &p.Status, r.eventPolicyLister, v1.SchemeGroupVersion.WithKind("Parallel"), p.ObjectMeta)
if err != nil {
return fmt.Errorf("could not update parallel status with EventPolicies: %v", err)
Expand Down Expand Up @@ -349,3 +356,141 @@ func (r *Reconciler) removeUnwantedSubscriptions(ctx context.Context, p *v1.Para

return nil
}

func (r *Reconciler) reconcileEventPolicies(ctx context.Context, p *v1.Parallel, ingressChannel *duckv1.Channelable,
channels []*duckv1.Channelable, filterSubs []*messagingv1.Subscription, featureFlags feature.Flags) error {

if !featureFlags.IsOIDCAuthentication() {
return r.cleanupAllEventPolicies(ctx, p)
}
// list all the existing event policies for the parallel.
existingPolicies, err := r.listEventPoliciesForParallel(p)
if err != nil {
return fmt.Errorf("failed to list existing event policies for parallel: %w", err)
}
// make a map of existing event policies for easy and efficient lookup.
existingPolicyMap := make(map[string]*eventingv1alpha1.EventPolicy)
for _, policy := range existingPolicies {
existingPolicyMap[policy.Name] = policy
}

// prepare the list of event policies to create, update and delete.
var policiesToCreate, policiesToUpdate []*eventingv1alpha1.EventPolicy
policiesToDelete := make([]*eventingv1alpha1.EventPolicy, 0, len(existingPolicyMap))

for i, channel := range channels {
filterSub := filterSubs[i]
expectedPolicy := resources.MakeEventPolicyForParallelChannel(p, channel, filterSub)
if existingPolicy, ok := existingPolicyMap[expectedPolicy.Name]; ok {
if !equality.Semantic.DeepDerivative(expectedPolicy, existingPolicy) {
expectedPolicy.SetResourceVersion(existingPolicy.ResourceVersion)
policiesToUpdate = append(policiesToUpdate, expectedPolicy)
}
delete(existingPolicyMap, expectedPolicy.Name)
} else {
policiesToCreate = append(policiesToCreate, expectedPolicy)
}
}

// prepare the event policies for the ingress channel.
ingressChannelEventPolicies, err := r.prepareIngressChannelEventpolicies(p, ingressChannel)
if err != nil {
return fmt.Errorf("failed to prepare event policies for ingress channel: %w", err)
}

for _, policy := range ingressChannelEventPolicies {
if existingIngressChannelPolicy, ok := existingPolicyMap[policy.Name]; ok {
if !equality.Semantic.DeepDerivative(policy, existingIngressChannelPolicy) {
policy.SetResourceVersion(existingIngressChannelPolicy.ResourceVersion)
policiesToUpdate = append(policiesToUpdate, policy)
}
delete(existingPolicyMap, policy.Name)
} else {
policiesToCreate = append(policiesToCreate, policy)
}
}

// delete the remaining event policies in the map.
for _, policy := range existingPolicyMap {
policiesToDelete = append(policiesToDelete, policy)
}

// now that we have the list of event policies to create, update and delete, we can perform the operations.
if err := r.createEventPolicies(ctx, policiesToCreate); err != nil {
return fmt.Errorf("failed to create event policies: %w", err)
}
if err := r.updateEventPolicies(ctx, policiesToUpdate); err != nil {
return fmt.Errorf("failed to update event policies: %w", err)
}
if err := r.deleteEventPolicies(ctx, policiesToDelete); err != nil {
return fmt.Errorf("failed to delete event policies: %w", err)
}

return nil
}

func (r *Reconciler) createEventPolicies(ctx context.Context, policies []*eventingv1alpha1.EventPolicy) error {
for _, policy := range policies {
_, err := r.eventingClientSet.EventingV1alpha1().EventPolicies(policy.Namespace).Create(ctx, policy, metav1.CreateOptions{})
if err != nil {
return err
}
}
return nil
}

func (r *Reconciler) updateEventPolicies(ctx context.Context, policies []*eventingv1alpha1.EventPolicy) error {
for _, policy := range policies {
_, err := r.eventingClientSet.EventingV1alpha1().EventPolicies(policy.Namespace).Update(ctx, policy, metav1.UpdateOptions{})
if err != nil {
return err
}
}
return nil
}

func (r *Reconciler) deleteEventPolicies(ctx context.Context, policies []*eventingv1alpha1.EventPolicy) error {
for _, policy := range policies {
err := r.eventingClientSet.EventingV1alpha1().EventPolicies(policy.Namespace).Delete(ctx, policy.Name, metav1.DeleteOptions{})
if err != nil && !apierrs.IsNotFound(err) {
return err
}
}
return nil
}

func (r *Reconciler) prepareIngressChannelEventpolicies(p *v1.Parallel, ingressChannel *duckv1.Channelable) ([]*eventingv1alpha1.EventPolicy, error) {
applyingEventPoliciesForParallel, err := auth.GetEventPoliciesForResource(r.eventPolicyLister, v1.SchemeGroupVersion.WithKind("Parallel"), p.ObjectMeta)
if err != nil {
return nil, fmt.Errorf("could not get EventPolicies for Parallel %s/%s: %w", p.Namespace, p.Name, err)
}

if len(applyingEventPoliciesForParallel) == 0 {
return nil, nil
}

ingressChannelEventPolicies := make([]*eventingv1alpha1.EventPolicy, 0, len(applyingEventPoliciesForParallel))
for _, eventPolicy := range applyingEventPoliciesForParallel {
ingressChannelEventPolicies = append(ingressChannelEventPolicies, resources.MakeEventPolicyForParallelIngressChannel(p, ingressChannel, eventPolicy))
}

return ingressChannelEventPolicies, nil
}

func (r *Reconciler) cleanupAllEventPolicies(ctx context.Context, p *v1.Parallel) error {
// list all the event policies for the parallel.
eventPolicies, err := r.listEventPoliciesForParallel(p)
if err != nil {
return err
}
return r.deleteEventPolicies(ctx, eventPolicies)
}

// listEventPoliciesForParallel lists all EventPolicies (e.g. the policies for the input channel and the intermediate channels)
// created during reconcileKind that are associated with the given Parallel.
func (r *Reconciler) listEventPoliciesForParallel(p *v1.Parallel) ([]*eventingv1alpha1.EventPolicy, error) {
labelSelector := labels.SelectorFromSet(map[string]string{
resources.ParallelChannelEventPolicyLabelPrefix + "parallel-name": p.Name,
})
return r.eventPolicyLister.EventPolicies(p.Namespace).List(labelSelector)
}
Loading

0 comments on commit c521efb

Please sign in to comment.