diff --git a/pkg/reconciler/parallel/parallel.go b/pkg/reconciler/parallel/parallel.go index eb2bb010f92..6aac944d3f3 100644 --- a/pkg/reconciler/parallel/parallel.go +++ b/pkg/reconciler/parallel/parallel.go @@ -22,6 +22,7 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" apierrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -36,6 +37,7 @@ import ( pkgreconciler "knative.dev/pkg/reconciler" duckv1 "knative.dev/eventing/pkg/apis/duck/v1" + eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" "knative.dev/eventing/pkg/apis/feature" v1 "knative.dev/eventing/pkg/apis/flows/v1" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" @@ -144,6 +146,11 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, p *v1.Parallel) pkgrecon return fmt.Errorf("error removing unwanted Subscriptions: %w", err) } + // Reconcile EventPolicies for the parallel. + if err := r.reconcileEventPolicies(ctx, p, ingressChannel, channels, filterSubs, featureFlags); err != nil { + return fmt.Errorf("failed to reconcile EventPolicies for Parallel: %w", err) + } + err := auth.UpdateStatusWithEventPolicies(featureFlags, &p.Status.AppliedEventPoliciesStatus, &p.Status, r.eventPolicyLister, v1.SchemeGroupVersion.WithKind("Parallel"), p.ObjectMeta) if err != nil { return fmt.Errorf("could not update parallel status with EventPolicies: %v", err) @@ -349,3 +356,141 @@ func (r *Reconciler) removeUnwantedSubscriptions(ctx context.Context, p *v1.Para return nil } + +func (r *Reconciler) reconcileEventPolicies(ctx context.Context, p *v1.Parallel, ingressChannel *duckv1.Channelable, + channels []*duckv1.Channelable, filterSubs []*messagingv1.Subscription, featureFlags feature.Flags) error { + + if !featureFlags.IsOIDCAuthentication() { + return r.cleanupAllEventPolicies(ctx, p) + } + // list all the existing event policies for the parallel. + existingPolicies, err := r.listEventPoliciesForParallel(p) + if err != nil { + return fmt.Errorf("failed to list existing event policies for parallel: %w", err) + } + // make a map of existing event policies for easy and efficient lookup. + existingPolicyMap := make(map[string]*eventingv1alpha1.EventPolicy) + for _, policy := range existingPolicies { + existingPolicyMap[policy.Name] = policy + } + + // prepare the list of event policies to create, update and delete. + var policiesToCreate, policiesToUpdate []*eventingv1alpha1.EventPolicy + policiesToDelete := make([]*eventingv1alpha1.EventPolicy, 0, len(existingPolicyMap)) + + for i, channel := range channels { + filterSub := filterSubs[i] + expectedPolicy := resources.MakeEventPolicyForParallelChannel(p, channel, filterSub) + if existingPolicy, ok := existingPolicyMap[expectedPolicy.Name]; ok { + if !equality.Semantic.DeepDerivative(expectedPolicy, existingPolicy) { + expectedPolicy.SetResourceVersion(existingPolicy.ResourceVersion) + policiesToUpdate = append(policiesToUpdate, expectedPolicy) + } + delete(existingPolicyMap, expectedPolicy.Name) + } else { + policiesToCreate = append(policiesToCreate, expectedPolicy) + } + } + + // prepare the event policies for the ingress channel. + ingressChannelEventPolicies, err := r.prepareIngressChannelEventpolicies(p, ingressChannel) + if err != nil { + return fmt.Errorf("failed to prepare event policies for ingress channel: %w", err) + } + + for _, policy := range ingressChannelEventPolicies { + if existingIngressChannelPolicy, ok := existingPolicyMap[policy.Name]; ok { + if !equality.Semantic.DeepDerivative(policy, existingIngressChannelPolicy) { + policy.SetResourceVersion(existingIngressChannelPolicy.ResourceVersion) + policiesToUpdate = append(policiesToUpdate, policy) + } + delete(existingPolicyMap, policy.Name) + } else { + policiesToCreate = append(policiesToCreate, policy) + } + } + + // delete the remaining event policies in the map. + for _, policy := range existingPolicyMap { + policiesToDelete = append(policiesToDelete, policy) + } + + // now that we have the list of event policies to create, update and delete, we can perform the operations. + if err := r.createEventPolicies(ctx, policiesToCreate); err != nil { + return fmt.Errorf("failed to create event policies: %w", err) + } + if err := r.updateEventPolicies(ctx, policiesToUpdate); err != nil { + return fmt.Errorf("failed to update event policies: %w", err) + } + if err := r.deleteEventPolicies(ctx, policiesToDelete); err != nil { + return fmt.Errorf("failed to delete event policies: %w", err) + } + + return nil +} + +func (r *Reconciler) createEventPolicies(ctx context.Context, policies []*eventingv1alpha1.EventPolicy) error { + for _, policy := range policies { + _, err := r.eventingClientSet.EventingV1alpha1().EventPolicies(policy.Namespace).Create(ctx, policy, metav1.CreateOptions{}) + if err != nil { + return err + } + } + return nil +} + +func (r *Reconciler) updateEventPolicies(ctx context.Context, policies []*eventingv1alpha1.EventPolicy) error { + for _, policy := range policies { + _, err := r.eventingClientSet.EventingV1alpha1().EventPolicies(policy.Namespace).Update(ctx, policy, metav1.UpdateOptions{}) + if err != nil { + return err + } + } + return nil +} + +func (r *Reconciler) deleteEventPolicies(ctx context.Context, policies []*eventingv1alpha1.EventPolicy) error { + for _, policy := range policies { + err := r.eventingClientSet.EventingV1alpha1().EventPolicies(policy.Namespace).Delete(ctx, policy.Name, metav1.DeleteOptions{}) + if err != nil && !apierrs.IsNotFound(err) { + return err + } + } + return nil +} + +func (r *Reconciler) prepareIngressChannelEventpolicies(p *v1.Parallel, ingressChannel *duckv1.Channelable) ([]*eventingv1alpha1.EventPolicy, error) { + applyingEventPoliciesForParallel, err := auth.GetEventPoliciesForResource(r.eventPolicyLister, v1.SchemeGroupVersion.WithKind("Parallel"), p.ObjectMeta) + if err != nil { + return nil, fmt.Errorf("could not get EventPolicies for Parallel %s/%s: %w", p.Namespace, p.Name, err) + } + + if len(applyingEventPoliciesForParallel) == 0 { + return nil, nil + } + + ingressChannelEventPolicies := make([]*eventingv1alpha1.EventPolicy, 0, len(applyingEventPoliciesForParallel)) + for _, eventPolicy := range applyingEventPoliciesForParallel { + ingressChannelEventPolicies = append(ingressChannelEventPolicies, resources.MakeEventPolicyForParallelIngressChannel(p, ingressChannel, eventPolicy)) + } + + return ingressChannelEventPolicies, nil +} + +func (r *Reconciler) cleanupAllEventPolicies(ctx context.Context, p *v1.Parallel) error { + // list all the event policies for the parallel. + eventPolicies, err := r.listEventPoliciesForParallel(p) + if err != nil { + return err + } + return r.deleteEventPolicies(ctx, eventPolicies) +} + +// listEventPoliciesForParallel lists all EventPolicies (e.g. the policies for the input channel and the intermediate channels) +// created during reconcileKind that are associated with the given Parallel. +func (r *Reconciler) listEventPoliciesForParallel(p *v1.Parallel) ([]*eventingv1alpha1.EventPolicy, error) { + labelSelector := labels.SelectorFromSet(map[string]string{ + resources.ParallelChannelEventPolicyLabelPrefix + "parallel-name": p.Name, + }) + return r.eventPolicyLister.EventPolicies(p.Namespace).List(labelSelector) +} diff --git a/pkg/reconciler/parallel/parallel_test.go b/pkg/reconciler/parallel/parallel_test.go index 1c4fe0310ee..e467b7b6d7c 100644 --- a/pkg/reconciler/parallel/parallel_test.go +++ b/pkg/reconciler/parallel/parallel_test.go @@ -35,6 +35,8 @@ import ( clientgotesting "k8s.io/client-go/testing" eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + "knative.dev/eventing/pkg/apis/feature" "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable" "knative.dev/eventing/pkg/duck" "knative.dev/pkg/apis" @@ -65,7 +67,7 @@ var ( subscriberGVK = metav1.GroupVersionKind{ Group: "messaging.knative.dev", Version: "v1", - Kind: "Subscriber", + Kind: "Subscription", } parallelGVK = metav1.GroupVersionKind{ @@ -73,6 +75,12 @@ var ( Version: "v1", Kind: "Parallel", } + + channelV1GVK = metav1.GroupVersionKind{ + Group: "messaging.knative.dev", + Version: "v1", + Kind: "InMemoryChannel", + } ) func TestAllBranches(t *testing.T) { @@ -708,185 +716,815 @@ func TestAllBranches(t *testing.T) { SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), }})), }}, - }, - } - - logger := logtesting.TestLogger(t) - table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher) controller.Reconciler { - ctx = channelable.WithDuck(ctx) - r := &Reconciler{ - parallelLister: listers.GetParallelLister(), - channelableTracker: duck.NewListableTrackerFromTracker(ctx, channelable.Get, tracker.New(func(types.NamespacedName) {}, 0)), - subscriptionLister: listers.GetSubscriptionLister(), - eventingClientSet: fakeeventingclient.Get(ctx), - dynamicClientSet: fakedynamicclient.Get(ctx), - eventPolicyLister: listers.GetEventPolicyLister(), - } - return parallel.NewReconciler(ctx, logging.FromContext(ctx), - fakeeventingclient.Get(ctx), listers.GetParallelLister(), - controller.GetEventRecorder(ctx), r) - }, false, logger)) -} - -func createBranchReplyChannel(caseNumber int) *duckv1.Destination { - return &duckv1.Destination{ - Ref: &duckv1.KReference{ - APIVersion: "messaging.knative.dev/v1", - Kind: "InMemoryChannel", - Name: fmt.Sprintf("%s-case-%d", replyChannelName, caseNumber), - Namespace: testNS, - }, - } -} - -func createReplyChannel(channelName string) *duckv1.Destination { - return &duckv1.Destination{ - Ref: &duckv1.KReference{ - APIVersion: "messaging.knative.dev/v1", - Kind: "InMemoryChannel", - Name: channelName, - Namespace: testNS, - }, - } -} - -func createChannel(parallelName string) *unstructured.Unstructured { - return &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "messaging.knative.dev/v1", - "kind": "InMemoryChannel", - "metadata": map[string]interface{}{ - "creationTimestamp": nil, - "namespace": testNS, - "name": resources.ParallelChannelName(parallelName), - "ownerReferences": []interface{}{ - map[string]interface{}{ - "apiVersion": "flows.knative.dev/v1", - "blockOwnerDeletion": true, - "controller": true, - "kind": "Parallel", - "name": parallelName, - "uid": "", - }, - }, + }, { + Name: "AuthZ Enabled with single branch, with filter, no EventPolicies", + Key: pKey, + Objects: []runtime.Object{ + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + }))}, + WantErr: false, + WantCreates: []runtime.Object{ + createChannel(parallelName), + createBranchChannel(parallelName, 0), + resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + }))), + resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + }))), + makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 0), 0), }, - "spec": map[string]interface{}{}, - }, - } -} - -func createBranchChannel(parallelName string, caseNumber int) *unstructured.Unstructured { - return &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "messaging.knative.dev/v1", - "kind": "InMemoryChannel", - "metadata": map[string]interface{}{ - "creationTimestamp": nil, - "namespace": testNS, - "name": resources.ParallelBranchChannelName(parallelName, caseNumber), - "ownerReferences": []interface{}{ - map[string]interface{}{ - "apiVersion": "flows.knative.dev/v1", - "blockOwnerDeletion": true, - "controller": true, - "kind": "Parallel", - "name": parallelName, - "uid": "", - }, - }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{{Filter: createFilter(0), Subscriber: createSubscriber(0)}}), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelEventPoliciesReadyBecauseNoPolicyAndOIDCEnabled(), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + }})), + }}, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, + }), + }, { + Name: "AuthZ Enabled with single branch, with filter, with Parallel EventPolicy", + Key: pKey, + Objects: []runtime.Object{ + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + })), + NewEventPolicy(readyEventPolicyName, testNS, + WithReadyEventPolicyCondition, + WithEventPolicyToRef(parallelGVK, parallelName), + ), }, - "spec": map[string]interface{}{}, - }, - } -} - -func createParallelBranchChannelStatus(parallelName string, caseNumber int, status corev1.ConditionStatus) v1.ParallelChannelStatus { - return v1.ParallelChannelStatus{ - Channel: corev1.ObjectReference{ - APIVersion: "messaging.knative.dev/v1", - Kind: "InMemoryChannel", - Name: resources.ParallelBranchChannelName(parallelName, caseNumber), - Namespace: testNS, - }, - ReadyCondition: apis.Condition{ - Type: apis.ConditionReady, - Status: status, - Reason: "NotAddressable", - Message: "Channel is not addressable", - }, - } -} - -func createParallelChannelStatus(parallelName string, status corev1.ConditionStatus) v1.ParallelChannelStatus { - return v1.ParallelChannelStatus{ - Channel: corev1.ObjectReference{ - APIVersion: "messaging.knative.dev/v1", - Kind: "InMemoryChannel", - Name: resources.ParallelChannelName(parallelName), - Namespace: testNS, - }, - ReadyCondition: apis.Condition{ - Type: apis.ConditionReady, - Status: status, - Reason: "NotAddressable", - Message: "Channel is not addressable", - }, - } -} - -func createParallelFilterSubscriptionStatus(parallelName string, caseNumber int, status corev1.ConditionStatus) v1.ParallelSubscriptionStatus { - return v1.ParallelSubscriptionStatus{ - Subscription: corev1.ObjectReference{ - APIVersion: "messaging.knative.dev/v1", - Kind: "Subscription", - Name: resources.ParallelFilterSubscriptionName(parallelName, caseNumber), - Namespace: testNS, - }, - } -} - -func createParallelSubscriptionStatus(parallelName string, caseNumber int, status corev1.ConditionStatus) v1.ParallelSubscriptionStatus { - return v1.ParallelSubscriptionStatus{ - Subscription: corev1.ObjectReference{ - APIVersion: "messaging.knative.dev/v1", - Kind: "Subscription", - Name: resources.ParallelSubscriptionName(parallelName, caseNumber), - Namespace: testNS, - }, - } -} - -func createSubscriber(caseNumber int) duckv1.Destination { - uri := apis.HTTP(fmt.Sprintf("example.com/%d", caseNumber)) - return duckv1.Destination{ - URI: uri, - } -} - -func createFilter(caseNumber int) *duckv1.Destination { - uri := apis.HTTP(fmt.Sprintf("example.com/filter-%d", caseNumber)) - return &duckv1.Destination{ - URI: uri, - } -} - -func apiVersion(gvk metav1.GroupVersionKind) string { - groupVersion := gvk.Version - if gvk.Group != "" { - groupVersion = gvk.Group + "/" + gvk.Version - } - return groupVersion -} - -func createDelivery(gvk metav1.GroupVersionKind, name, namespace string) *eventingduckv1.DeliverySpec { - return &eventingduckv1.DeliverySpec{ - DeadLetterSink: &duckv1.Destination{ - Ref: &duckv1.KReference{ - APIVersion: apiVersion(gvk), - Kind: gvk.Kind, - Name: name, - Namespace: namespace, + WantErr: false, + WantCreates: []runtime.Object{ + createChannel(parallelName), + createBranchChannel(parallelName, 0), + resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + }))), + resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + }))), + makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 0), 0), + makeIngressChannelEventPolicy(parallelName, resources.ParallelChannelName(parallelName), readyEventPolicyName), }, - }, - } + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{{Filter: createFilter(0), Subscriber: createSubscriber(0)}}), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelEventPoliciesReady(), + WithFlowsParallelEventPoliciesListed(readyEventPolicyName), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + }})), + }}, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, + }), + }, { + Name: "AuthZ Enabled two branches, no filters, no EventPolicy", + Key: pKey, + Objects: []runtime.Object{ + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))}, + WantErr: false, + WantCreates: []runtime.Object{ + createChannel(parallelName), + createBranchChannel(parallelName, 0), + createBranchChannel(parallelName, 1), + resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))), + resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))), + resources.NewFilterSubscription(1, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))), + resources.NewSubscription(1, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))), + makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 0), 0), + makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 1), 1), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelEventPoliciesReadyBecauseNoPolicyAndOIDCEnabled(), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{ + { + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + }, + { + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 1, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 1, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 1, corev1.ConditionFalse), + }})), + }}, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, + }), + }, { + Name: "AuthZ Enabled two branches, no filters, with EventPolicy", + Key: pKey, + Objects: []runtime.Object{ + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + })), + NewEventPolicy(readyEventPolicyName, testNS, + WithReadyEventPolicyCondition, + WithEventPolicyToRef(parallelGVK, parallelName), + ), + }, + WantErr: false, + WantCreates: []runtime.Object{ + createChannel(parallelName), + createBranchChannel(parallelName, 0), + createBranchChannel(parallelName, 1), + resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))), + resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))), + resources.NewFilterSubscription(1, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))), + resources.NewSubscription(1, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))), + makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 0), 0), + makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 1), 1), + makeIngressChannelEventPolicy(parallelName, resources.ParallelChannelName(parallelName), readyEventPolicyName), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelEventPoliciesReady(), + WithFlowsParallelEventPoliciesListed(readyEventPolicyName), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{ + { + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + }, + { + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 1, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 1, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 1, corev1.ConditionFalse), + }})), + }}, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, + }), + }, + { + Name: "two branches, update: remove one branch, with AuthZ enabled and parallel doesn't have EventPolicies", + Key: pKey, + Objects: []runtime.Object{ + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + }), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{ + { + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + }, + { + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 1, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 1, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 1, corev1.ConditionFalse), + }, + })), + + createChannel(parallelName), + createBranchChannel(parallelName, 0), + createBranchChannel(parallelName, 1), + + resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))), + resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))), + + resources.NewSubscription(1, NewFlowsParallel(parallelName, testNS, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))), + resources.NewFilterSubscription(1, NewFlowsParallel(parallelName, testNS, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }))), + + makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 0), 0), + makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 1), 1), + }, + WantErr: false, + WantDeletes: []clientgotesting.DeleteActionImpl{ + { + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("subscriptions"), + }, + Name: resources.ParallelSubscriptionName(parallelName, 1), + }, { + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("subscriptions"), + }, + Name: resources.ParallelFilterSubscriptionName(parallelName, 1), + }, { + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("inmemorychannels"), + }, + Name: resources.ParallelBranchChannelName(parallelName, 1), + }, { + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("eventpolicies"), + }, + Name: resources.ParallelEventPolicyName(parallelName, resources.ParallelBranchChannelName(parallelName, 1)), + }, + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + }), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelEventPoliciesReadyBecauseNoPolicyAndOIDCEnabled(), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + }})), + }}, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, + }), + }, { + Name: "three branches, update: remove one branch, with AuthZ enabled and parallel doesn't have EventPolicies", + Key: pKey, + Objects: []runtime.Object{ + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{ + { + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + }, + { + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 1, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 1, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 1, corev1.ConditionFalse), + }, + { + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 2, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 2, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 2, corev1.ConditionFalse), + }, + })), + + createChannel(parallelName), + createBranchChannel(parallelName, 0), + createBranchChannel(parallelName, 1), + createBranchChannel(parallelName, 2), + + resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + {Subscriber: createSubscriber(2)}, + }))), + resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + {Subscriber: createSubscriber(2)}, + }))), + + resources.NewSubscription(1, NewFlowsParallel(parallelName, testNS, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + {Subscriber: createSubscriber(2)}, + }))), + resources.NewFilterSubscription(1, NewFlowsParallel(parallelName, testNS, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + {Subscriber: createSubscriber(2)}, + }))), + resources.NewSubscription(2, NewFlowsParallel(parallelName, testNS, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + {Subscriber: createSubscriber(2)}, + }))), + resources.NewFilterSubscription(2, NewFlowsParallel(parallelName, testNS, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + {Subscriber: createSubscriber(2)}, + }))), + makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 0), 0), + makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 1), 1), + makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 2), 2), + }, + WantErr: false, + WantDeletes: []clientgotesting.DeleteActionImpl{ + { + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("subscriptions"), + }, + Name: resources.ParallelSubscriptionName(parallelName, 2), + }, { + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("subscriptions"), + }, + Name: resources.ParallelFilterSubscriptionName(parallelName, 2), + }, { + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("inmemorychannels"), + }, + Name: resources.ParallelBranchChannelName(parallelName, 2), + }, { + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("eventpolicies"), + }, + Name: resources.ParallelEventPolicyName(parallelName, resources.ParallelBranchChannelName(parallelName, 2)), + }, + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Subscriber: createSubscriber(0)}, + {Subscriber: createSubscriber(1)}, + }), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelEventPoliciesReadyBecauseNoPolicyAndOIDCEnabled(), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + }, { + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 1, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 1, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 1, corev1.ConditionFalse), + }})), + }}, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, + }), + }, { + Name: "Parallel Event Policy Deleted, corresponding Ingress Channel Policy should be deleted", + Key: pKey, + Objects: []runtime.Object{ + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + })), + NewEventPolicy(readyEventPolicyName, testNS, + WithReadyEventPolicyCondition, + WithEventPolicyToRef(parallelGVK, parallelName), + ), + makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 0), 0), + makeIngressChannelEventPolicy(parallelName, resources.ParallelChannelName(parallelName), readyEventPolicyName), + makeIngressChannelEventPolicy(parallelName, resources.ParallelChannelName(parallelName), readyEventPolicyName+"-1"), + }, + WantErr: false, + WantCreates: []runtime.Object{ + createChannel(parallelName), + createBranchChannel(parallelName, 0), + resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + }))), + resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + }))), + }, + WantDeletes: []clientgotesting.DeleteActionImpl{ + { + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + Resource: v1.SchemeGroupVersion.WithResource("eventpolicies"), + }, + Name: resources.ParallelEventPolicyName(parallelName, readyEventPolicyName+"-1"), + }, + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{{Filter: createFilter(0), Subscriber: createSubscriber(0)}}), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelEventPoliciesReady(), + WithFlowsParallelEventPoliciesListed(readyEventPolicyName), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + }})), + }}, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, + }), + }, { + Name: "Parallel with multiple event policies", + Key: pKey, + Objects: []runtime.Object{ + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + })), + NewEventPolicy(readyEventPolicyName, testNS, + WithReadyEventPolicyCondition, + WithEventPolicyToRef(parallelGVK, parallelName), + ), + NewEventPolicy(readyEventPolicyName+"-1", testNS, + WithReadyEventPolicyCondition, + WithEventPolicyToRef(parallelGVK, parallelName), + ), + NewEventPolicy(readyEventPolicyName+"-2", testNS, + WithReadyEventPolicyCondition, + WithEventPolicyToRef(parallelGVK, parallelName), + ), + }, + WantErr: false, + WantCreates: []runtime.Object{ + createChannel(parallelName), + createBranchChannel(parallelName, 0), + resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + }))), + resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + }))), + makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 0), 0), + makeIngressChannelEventPolicy(parallelName, resources.ParallelChannelName(parallelName), readyEventPolicyName), + makeIngressChannelEventPolicy(parallelName, resources.ParallelChannelName(parallelName), readyEventPolicyName+"-1"), + makeIngressChannelEventPolicy(parallelName, resources.ParallelChannelName(parallelName), readyEventPolicyName+"-2"), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{{Filter: createFilter(0), Subscriber: createSubscriber(0)}}), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelEventPoliciesReady(), + WithFlowsParallelEventPoliciesListed(readyEventPolicyName, readyEventPolicyName+"-1", readyEventPolicyName+"-2"), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + }})), + }}, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, + }), + }, + } + + logger := logtesting.TestLogger(t) + table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher) controller.Reconciler { + ctx = channelable.WithDuck(ctx) + r := &Reconciler{ + parallelLister: listers.GetParallelLister(), + channelableTracker: duck.NewListableTrackerFromTracker(ctx, channelable.Get, tracker.New(func(types.NamespacedName) {}, 0)), + subscriptionLister: listers.GetSubscriptionLister(), + eventingClientSet: fakeeventingclient.Get(ctx), + dynamicClientSet: fakedynamicclient.Get(ctx), + eventPolicyLister: listers.GetEventPolicyLister(), + } + return parallel.NewReconciler(ctx, logging.FromContext(ctx), + fakeeventingclient.Get(ctx), listers.GetParallelLister(), + controller.GetEventRecorder(ctx), r) + }, false, logger)) +} + +func createBranchReplyChannel(caseNumber int) *duckv1.Destination { + return &duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: "messaging.knative.dev/v1", + Kind: "InMemoryChannel", + Name: fmt.Sprintf("%s-case-%d", replyChannelName, caseNumber), + Namespace: testNS, + }, + } +} + +func createReplyChannel(channelName string) *duckv1.Destination { + return &duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: "messaging.knative.dev/v1", + Kind: "InMemoryChannel", + Name: channelName, + Namespace: testNS, + }, + } +} + +func createChannel(parallelName string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "messaging.knative.dev/v1", + "kind": "InMemoryChannel", + "metadata": map[string]interface{}{ + "creationTimestamp": nil, + "namespace": testNS, + "name": resources.ParallelChannelName(parallelName), + "ownerReferences": []interface{}{ + map[string]interface{}{ + "apiVersion": "flows.knative.dev/v1", + "blockOwnerDeletion": true, + "controller": true, + "kind": "Parallel", + "name": parallelName, + "uid": "", + }, + }, + }, + "spec": map[string]interface{}{}, + }, + } +} + +func createBranchChannel(parallelName string, caseNumber int) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "messaging.knative.dev/v1", + "kind": "InMemoryChannel", + "metadata": map[string]interface{}{ + "creationTimestamp": nil, + "namespace": testNS, + "name": resources.ParallelBranchChannelName(parallelName, caseNumber), + "ownerReferences": []interface{}{ + map[string]interface{}{ + "apiVersion": "flows.knative.dev/v1", + "blockOwnerDeletion": true, + "controller": true, + "kind": "Parallel", + "name": parallelName, + "uid": "", + }, + }, + }, + "spec": map[string]interface{}{}, + }, + } +} + +func createParallelBranchChannelStatus(parallelName string, caseNumber int, status corev1.ConditionStatus) v1.ParallelChannelStatus { + return v1.ParallelChannelStatus{ + Channel: corev1.ObjectReference{ + APIVersion: "messaging.knative.dev/v1", + Kind: "InMemoryChannel", + Name: resources.ParallelBranchChannelName(parallelName, caseNumber), + Namespace: testNS, + }, + ReadyCondition: apis.Condition{ + Type: apis.ConditionReady, + Status: status, + Reason: "NotAddressable", + Message: "Channel is not addressable", + }, + } +} + +func createParallelChannelStatus(parallelName string, status corev1.ConditionStatus) v1.ParallelChannelStatus { + return v1.ParallelChannelStatus{ + Channel: corev1.ObjectReference{ + APIVersion: "messaging.knative.dev/v1", + Kind: "InMemoryChannel", + Name: resources.ParallelChannelName(parallelName), + Namespace: testNS, + }, + ReadyCondition: apis.Condition{ + Type: apis.ConditionReady, + Status: status, + Reason: "NotAddressable", + Message: "Channel is not addressable", + }, + } +} + +func createParallelFilterSubscriptionStatus(parallelName string, caseNumber int, status corev1.ConditionStatus) v1.ParallelSubscriptionStatus { + return v1.ParallelSubscriptionStatus{ + Subscription: corev1.ObjectReference{ + APIVersion: "messaging.knative.dev/v1", + Kind: "Subscription", + Name: resources.ParallelFilterSubscriptionName(parallelName, caseNumber), + Namespace: testNS, + }, + } +} + +func createParallelSubscriptionStatus(parallelName string, caseNumber int, status corev1.ConditionStatus) v1.ParallelSubscriptionStatus { + return v1.ParallelSubscriptionStatus{ + Subscription: corev1.ObjectReference{ + APIVersion: "messaging.knative.dev/v1", + Kind: "Subscription", + Name: resources.ParallelSubscriptionName(parallelName, caseNumber), + Namespace: testNS, + }, + } +} + +func createSubscriber(caseNumber int) duckv1.Destination { + uri := apis.HTTP(fmt.Sprintf("example.com/%d", caseNumber)) + return duckv1.Destination{ + URI: uri, + } +} + +func createFilter(caseNumber int) *duckv1.Destination { + uri := apis.HTTP(fmt.Sprintf("example.com/filter-%d", caseNumber)) + return &duckv1.Destination{ + URI: uri, + } +} + +func apiVersion(gvk metav1.GroupVersionKind) string { + groupVersion := gvk.Version + if gvk.Group != "" { + groupVersion = gvk.Group + "/" + gvk.Version + } + return groupVersion +} + +func createDelivery(gvk metav1.GroupVersionKind, name, namespace string) *eventingduckv1.DeliverySpec { + return &eventingduckv1.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: apiVersion(gvk), + Kind: gvk.Kind, + Name: name, + Namespace: namespace, + }, + }, + } +} + +func makeEventPolicy(parallelName, channelName string, branch int) *eventingv1alpha1.EventPolicy { + return NewEventPolicy(resources.ParallelEventPolicyName(parallelName, channelName), testNS, + WithEventPolicyToRef(channelV1GVK, channelName), + // from a subscription + WithEventPolicyFrom(subscriberGVK, resources.ParallelFilterSubscriptionName(parallelName, branch), testNS), + WithEventPolicyOwnerReferences([]metav1.OwnerReference{ + { + APIVersion: "flows.knative.dev/v1", + Kind: "Parallel", + Name: parallelName, + }, + }...), + WithEventPolicyLabels(resources.LabelsForParallelChannelsEventPolicy(parallelName)), + ) +} + +func makeIngressChannelEventPolicy(parallelName, channelName, parallelEventPolicyName string) *eventingv1alpha1.EventPolicy { + return NewEventPolicy(resources.ParallelEventPolicyName(parallelName, parallelEventPolicyName), testNS, + WithEventPolicyToRef(channelV1GVK, channelName), + // from a subscription + WithEventPolicyOwnerReferences([]metav1.OwnerReference{ + { + APIVersion: "flows.knative.dev/v1", + Kind: "Parallel", + Name: parallelName, + }, { + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "EventPolicy", + Name: parallelEventPolicyName, + }, + }...), + WithEventPolicyLabels(resources.LabelsForParallelChannelsEventPolicy(parallelName)), + ) } diff --git a/pkg/reconciler/parallel/resources/eventpolicy.go b/pkg/reconciler/parallel/resources/eventpolicy.go new file mode 100644 index 00000000000..b2c1b0a71f6 --- /dev/null +++ b/pkg/reconciler/parallel/resources/eventpolicy.go @@ -0,0 +1,118 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + flowsv1 "knative.dev/eventing/pkg/apis/flows/v1" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" + "knative.dev/pkg/kmeta" +) + +const ( + ParallelChannelEventPolicyLabelPrefix = "flows.knative.dev/" + parallelKind = "Parallel" +) + +func MakeEventPolicyForParallelChannel(p *flowsv1.Parallel, channel *eventingduckv1.Channelable, subscription *messagingv1.Subscription) *eventingv1alpha1.EventPolicy { + return &eventingv1alpha1.EventPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: channel.Namespace, + Name: ParallelEventPolicyName(p.Name, channel.Name), + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: flowsv1.SchemeGroupVersion.String(), + Kind: parallelKind, + Name: p.Name, + UID: p.UID, + }, + }, + Labels: LabelsForParallelChannelsEventPolicy(p.Name), + }, + Spec: eventingv1alpha1.EventPolicySpec{ + To: []eventingv1alpha1.EventPolicySpecTo{ + { + Ref: &eventingv1alpha1.EventPolicyToReference{ + APIVersion: channel.APIVersion, + Kind: channel.Kind, + Name: channel.Name, + }, + }, + }, + From: []eventingv1alpha1.EventPolicySpecFrom{ + { + Ref: &eventingv1alpha1.EventPolicyFromReference{ + APIVersion: messagingv1.SchemeGroupVersion.String(), + Kind: "Subscription", + Name: subscription.Name, + Namespace: subscription.Namespace, + }, + }, + }, + }, + } +} + +func LabelsForParallelChannelsEventPolicy(parallelName string) map[string]string { + return map[string]string{ + ParallelChannelEventPolicyLabelPrefix + "parallel-name": parallelName, + } +} + +// ParallelEventPolicyName returns the name of the EventPolicy for the Parallel. +// suffix is either channel name or parent event policy name. +func ParallelEventPolicyName(parallelName, suffix string) string { + return kmeta.ChildName(parallelName, "-ep-"+suffix) +} + +// MakeEventPolicyForParallelIngressChannel creates an EventPolicy for the ingress channel of a Parallel. +func MakeEventPolicyForParallelIngressChannel(p *flowsv1.Parallel, ingressChannel *eventingduckv1.Channelable, parallelPolicy *eventingv1alpha1.EventPolicy) *eventingv1alpha1.EventPolicy { + return &eventingv1alpha1.EventPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: ingressChannel.Namespace, + Name: ParallelEventPolicyName(p.Name, parallelPolicy.Name), + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: flowsv1.SchemeGroupVersion.String(), + Kind: parallelKind, + Name: p.Name, + UID: p.UID, + }, { + APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), + Kind: "EventPolicy", + Name: parallelPolicy.Name, + UID: parallelPolicy.UID, + }, + }, + Labels: LabelsForParallelChannelsEventPolicy(p.Name), + }, + Spec: eventingv1alpha1.EventPolicySpec{ + To: []eventingv1alpha1.EventPolicySpecTo{ + { + Ref: &eventingv1alpha1.EventPolicyToReference{ + APIVersion: ingressChannel.APIVersion, + Kind: ingressChannel.Kind, + Name: ingressChannel.Name, + }, + }, + }, + From: parallelPolicy.Spec.From, + }, + } +}