diff --git a/config/channels/in-memory-channel/roles/controller-clusterrole.yaml b/config/channels/in-memory-channel/roles/controller-clusterrole.yaml index 9e1ab6812b4..6164e834f41 100644 --- a/config/channels/in-memory-channel/roles/controller-clusterrole.yaml +++ b/config/channels/in-memory-channel/roles/controller-clusterrole.yaml @@ -45,6 +45,14 @@ rules: - inmemorychannels verbs: - patch + - apiGroups: + - eventing.knative.dev + resources: + - eventpolicies + verbs: + - get + - list + - watch - apiGroups: - "" resources: diff --git a/pkg/apis/messaging/v1/in_memory_channel_lifecycle.go b/pkg/apis/messaging/v1/in_memory_channel_lifecycle.go index 6be9e29f338..3b6441a305a 100644 --- a/pkg/apis/messaging/v1/in_memory_channel_lifecycle.go +++ b/pkg/apis/messaging/v1/in_memory_channel_lifecycle.go @@ -33,6 +33,7 @@ var imcCondSet = apis.NewLivingConditionSet( InMemoryChannelConditionAddressable, InMemoryChannelConditionChannelServiceReady, InMemoryChannelConditionDeadLetterSinkResolved, + InMemoryChannelConditionEventPoliciesReady, ) const ( @@ -64,6 +65,10 @@ const ( // InMemoryChannelConditionDeadLetterSinkResolved has status True when there is a Dead Letter Sink ref or URI // defined in the Spec.Delivery, is a valid destination and its correctly resolved into a valid URI InMemoryChannelConditionDeadLetterSinkResolved apis.ConditionType = "DeadLetterSinkResolved" + + // InMemoryChannelConditionEventPoliciesReady has status True when all the applying EventPolicies for this + // InMemoryChannel are ready. + InMemoryChannelConditionEventPoliciesReady apis.ConditionType = "EventPoliciesReady" ) // GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface. @@ -182,3 +187,19 @@ func (imcs *InMemoryChannelStatus) MarkDeadLetterSinkResolvedFailed(reason, mess imcs.DeliveryStatus = eventingduck.DeliveryStatus{} imcCondSet.Manage(imcs).MarkFalse(InMemoryChannelConditionDeadLetterSinkResolved, reason, messageFormat, messageA...) } + +func (imcs *InMemoryChannelStatus) MarkEventPoliciesFailed(reason, messageFormat string, messageA ...interface{}) { + imcCondSet.Manage(imcs).MarkFalse(InMemoryChannelConditionEventPoliciesReady, reason, messageFormat, messageA...) +} + +func (imcs *InMemoryChannelStatus) MarkEventPoliciesUnknown(reason, messageFormat string, messageA ...interface{}) { + imcCondSet.Manage(imcs).MarkUnknown(InMemoryChannelConditionEventPoliciesReady, reason, messageFormat, messageA...) +} + +func (imcs *InMemoryChannelStatus) MarkEventPoliciesTrue() { + imcCondSet.Manage(imcs).MarkTrue(InMemoryChannelConditionEventPoliciesReady) +} + +func (imcs *InMemoryChannelStatus) MarkEventPoliciesTrueWithReason(reason, messageFormat string, messageA ...interface{}) { + imcCondSet.Manage(imcs).MarkTrueWithReason(InMemoryChannelConditionEventPoliciesReady, reason, messageFormat, messageA...) 
+} diff --git a/pkg/apis/messaging/v1/in_memory_channel_lifecycle_test.go b/pkg/apis/messaging/v1/in_memory_channel_lifecycle_test.go index 7c97a86ed21..f1785342672 100644 --- a/pkg/apis/messaging/v1/in_memory_channel_lifecycle_test.go +++ b/pkg/apis/messaging/v1/in_memory_channel_lifecycle_test.go @@ -137,6 +137,9 @@ func TestInMemoryChannelInitializeConditions(t *testing.T) { }, { Type: InMemoryChannelConditionEndpointsReady, Status: corev1.ConditionUnknown, + }, { + Type: InMemoryChannelConditionEventPoliciesReady, + Status: corev1.ConditionUnknown, }, { Type: InMemoryChannelConditionReady, Status: corev1.ConditionUnknown, @@ -177,6 +180,9 @@ func TestInMemoryChannelInitializeConditions(t *testing.T) { }, { Type: InMemoryChannelConditionEndpointsReady, Status: corev1.ConditionUnknown, + }, { + Type: InMemoryChannelConditionEventPoliciesReady, + Status: corev1.ConditionUnknown, }, { Type: InMemoryChannelConditionReady, Status: corev1.ConditionUnknown, @@ -217,6 +223,9 @@ func TestInMemoryChannelInitializeConditions(t *testing.T) { }, { Type: InMemoryChannelConditionEndpointsReady, Status: corev1.ConditionUnknown, + }, { + Type: InMemoryChannelConditionEventPoliciesReady, + Status: corev1.ConditionUnknown, }, { Type: InMemoryChannelConditionReady, Status: corev1.ConditionUnknown, @@ -244,6 +253,7 @@ func TestInMemoryChannelIsReady(t *testing.T) { name string markServiceReady bool markChannelServiceReady bool + markEventPolicyReady bool setAddress bool markEndpointsReady bool DLSResolved *bool @@ -253,6 +263,7 @@ func TestInMemoryChannelIsReady(t *testing.T) { name: "all happy", markServiceReady: true, markChannelServiceReady: true, + markEventPolicyReady: true, markEndpointsReady: true, dispatcherStatus: deploymentStatusReady, setAddress: true, @@ -262,6 +273,7 @@ func TestInMemoryChannelIsReady(t *testing.T) { name: "service not ready", markServiceReady: false, markChannelServiceReady: false, + markEventPolicyReady: true, markEndpointsReady: true, dispatcherStatus: deploymentStatusReady, setAddress: true, @@ -271,6 +283,7 @@ func TestInMemoryChannelIsReady(t *testing.T) { name: "endpoints not ready", markServiceReady: true, markChannelServiceReady: false, + markEventPolicyReady: true, markEndpointsReady: false, dispatcherStatus: deploymentStatusReady, setAddress: true, @@ -281,6 +294,7 @@ func TestInMemoryChannelIsReady(t *testing.T) { markServiceReady: true, markEndpointsReady: true, markChannelServiceReady: false, + markEventPolicyReady: true, dispatcherStatus: deploymentStatusNotReady, setAddress: true, wantReady: false, @@ -289,6 +303,7 @@ func TestInMemoryChannelIsReady(t *testing.T) { name: "address not set", markServiceReady: true, markChannelServiceReady: false, + markEventPolicyReady: true, markEndpointsReady: true, dispatcherStatus: deploymentStatusReady, setAddress: false, @@ -298,6 +313,7 @@ func TestInMemoryChannelIsReady(t *testing.T) { name: "channel service not ready", markServiceReady: true, markChannelServiceReady: false, + markEventPolicyReady: true, markEndpointsReady: true, dispatcherStatus: deploymentStatusReady, setAddress: true, @@ -307,6 +323,7 @@ func TestInMemoryChannelIsReady(t *testing.T) { name: "dls sad", markServiceReady: true, markChannelServiceReady: false, + markEventPolicyReady: true, markEndpointsReady: true, dispatcherStatus: deploymentStatusReady, setAddress: true, @@ -316,6 +333,17 @@ func TestInMemoryChannelIsReady(t *testing.T) { name: "dls not configured", markServiceReady: true, markChannelServiceReady: false, + markEventPolicyReady: true, + 
markEndpointsReady: true, + dispatcherStatus: deploymentStatusReady, + setAddress: true, + wantReady: false, + DLSResolved: &trueVal, + }, { + name: "EventPolicy not ready", + markServiceReady: true, + markChannelServiceReady: true, + markEventPolicyReady: false, markEndpointsReady: true, dispatcherStatus: deploymentStatusReady, setAddress: true, @@ -336,6 +364,11 @@ func TestInMemoryChannelIsReady(t *testing.T) { } else { cs.MarkChannelServiceFailed("NotReadyChannelService", "testing") } + if test.markEventPolicyReady { + cs.MarkEventPoliciesTrue() + } else { + cs.MarkEventPoliciesFailed("NotReadyEventPolicy", "testing") + } if test.setAddress { cs.SetAddress(&duckv1.Addressable{URL: &apis.URL{Scheme: "http", Host: "foo.bar"}}) } @@ -437,6 +470,7 @@ func TestInMemoryChannelStatus_SetAddressable(t *testing.T) { func ReadyBrokerStatusWithoutDLS() *InMemoryChannelStatus { imcs := &InMemoryChannelStatus{} imcs.MarkChannelServiceTrue() + imcs.MarkEventPoliciesTrue() imcs.MarkDeadLetterSinkNotConfigured() imcs.MarkEndpointsTrue() imcs.SetAddress(&duckv1.Addressable{URL: apis.HTTP("example.com")}) diff --git a/pkg/auth/event_policy.go b/pkg/auth/event_policy.go index 26efd163409..e049772f135 100644 --- a/pkg/auth/event_policy.go +++ b/pkg/auth/event_policy.go @@ -20,6 +20,9 @@ import ( "fmt" "strings" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/cache" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -86,6 +89,71 @@ func GetEventPoliciesForResource(lister listerseventingv1alpha1.EventPolicyListe return relevantPolicies, nil } +// GetApplyingResourcesOfEventPolicyForGK returns the names of all resources of the given GroupKind to which the given event policy applies. +// It returns only the names, as the resources are part of the same namespace as the event policy. +// +// This function is kind of the "inverse" of GetEventPoliciesForResource.
+func GetApplyingResourcesOfEventPolicyForGK(eventPolicy *v1alpha1.EventPolicy, gk schema.GroupKind, gkIndexer cache.Indexer) ([]string, error) { + applyingResources := map[string]struct{}{} + + if eventPolicy.Spec.To == nil { + // empty .spec.to matches everything in namespace + + err := cache.ListAllByNamespace(gkIndexer, eventPolicy.Namespace, labels.Everything(), func(i interface{}) { + name := i.(metav1.Object).GetName() + applyingResources[name] = struct{}{} + }) + if err != nil { + return nil, fmt.Errorf("failed to list all %s %s resources in %s: %w", gk.Group, gk.Kind, eventPolicy.Namespace, err) + } + } else { + for _, to := range eventPolicy.Spec.To { + if to.Ref != nil { + toGV, err := schema.ParseGroupVersion(to.Ref.APIVersion) + if err != nil { + return nil, fmt.Errorf("could not parse group version of %q: %w", to.Ref.APIVersion, err) + } + + if strings.EqualFold(toGV.Group, gk.Group) && + strings.EqualFold(to.Ref.Kind, gk.Kind) { + + applyingResources[to.Ref.Name] = struct{}{} + } + } + + if to.Selector != nil { + selectorGV, err := schema.ParseGroupVersion(to.Selector.APIVersion) + if err != nil { + return nil, fmt.Errorf("could not parse group version of %q: %w", to.Selector.APIVersion, err) + } + + if strings.EqualFold(selectorGV.Group, gk.Group) && + strings.EqualFold(to.Selector.Kind, gk.Kind) { + + selector, err := metav1.LabelSelectorAsSelector(to.Selector.LabelSelector) + if err != nil { + return nil, fmt.Errorf("could not parse label selector %v: %w", to.Selector.LabelSelector, err) + } + + err = cache.ListAllByNamespace(gkIndexer, eventPolicy.Namespace, selector, func(i interface{}) { + name := i.(metav1.Object).GetName() + applyingResources[name] = struct{}{} + }) + if err != nil { + return nil, fmt.Errorf("could not list resources of GK in %q namespace for selector %v: %w", eventPolicy.Namespace, selector, err) + } + } + } + } + } + + res := []string{} + for name := range applyingResources { + res = append(res, name) + } + return res, nil +} + // ResolveSubjects returns the OIDC service accounts names for the objects referenced in the EventPolicySpecFrom. func ResolveSubjects(resolver *resolver.AuthenticatableResolver, eventPolicy *v1alpha1.EventPolicy) ([]string, error) { allSAs := []string{} @@ -145,3 +213,77 @@ func SubjectContained(sub string, allowedSubs []string) bool { return false } + +func handleApplyingResourcesOfEventPolicy(eventPolicy *v1alpha1.EventPolicy, gk schema.GroupKind, indexer cache.Indexer, handlerFn func(key types.NamespacedName) error) error { + applyingResources, err := GetApplyingResourcesOfEventPolicyForGK(eventPolicy, gk, indexer) + if err != nil { + return fmt.Errorf("could not get applying resources of eventpolicy: %w", err) + } + + for _, resourceName := range applyingResources { + err := handlerFn(types.NamespacedName{ + Namespace: eventPolicy.Namespace, + Name: resourceName, + }) + + if err != nil { + return fmt.Errorf("could not handle resource %q: %w", resourceName, err) + } + } + + return nil +} + +// EventPolicyEventHandler returns an ResourceEventHandler, which passes the referencing resources of the EventPolicy +// to the enqueueFn if the EventPolicy was referencing or got updated and now is referencing the resource of the given GVK. 
+func EventPolicyEventHandler(indexer cache.Indexer, gk schema.GroupKind, enqueueFn func(key types.NamespacedName)) cache.ResourceEventHandler { + return cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + eventPolicy, ok := obj.(*v1alpha1.EventPolicy) + if !ok { + return + } + + handleApplyingResourcesOfEventPolicy(eventPolicy, gk, indexer, func(key types.NamespacedName) error { + enqueueFn(key) + return nil + }) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + // Here we need to check if the old or the new EventPolicy was referencing the given GVK + oldEventPolicy, ok := oldObj.(*v1alpha1.EventPolicy) + if !ok { + return + } + newEventPolicy, ok := newObj.(*v1alpha1.EventPolicy) + if !ok { + return + } + + // make sure, we handle the keys only once + toHandle := map[types.NamespacedName]struct{}{} + addToHandleList := func(key types.NamespacedName) error { + toHandle[key] = struct{}{} + return nil + } + + handleApplyingResourcesOfEventPolicy(oldEventPolicy, gk, indexer, addToHandleList) + handleApplyingResourcesOfEventPolicy(newEventPolicy, gk, indexer, addToHandleList) + + for k := range toHandle { + enqueueFn(k) + } + }, + DeleteFunc: func(obj interface{}) { + eventPolicy, ok := obj.(*v1alpha1.EventPolicy) + if !ok { + return + } + + handleApplyingResourcesOfEventPolicy(eventPolicy, gk, indexer, func(key types.NamespacedName) error { + enqueueFn(key) + return nil + }) + }, + } +} diff --git a/pkg/auth/event_policy_test.go b/pkg/auth/event_policy_test.go index 64f972c7ba6..124f1423173 100644 --- a/pkg/auth/event_policy_test.go +++ b/pkg/auth/event_policy_test.go @@ -18,9 +18,12 @@ package auth import ( "context" + "reflect" "strings" "testing" + "k8s.io/apimachinery/pkg/runtime/schema" + "github.com/google/go-cmp/cmp" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -30,6 +33,7 @@ import ( "knative.dev/eventing/pkg/apis/eventing/v1alpha1" sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" "knative.dev/eventing/pkg/client/clientset/versioned/scheme" + brokerinformerfake "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker/fake" eventpolicyinformerfake "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/client/injection/ducks/duck/v1/authstatus" @@ -764,3 +768,369 @@ func TestSubjectContained(t *testing.T) { }) } } + +func TestGetApplyingResourcesOfEventPolicyForGK(t *testing.T) { + tests := []struct { + name string + eventPolicySpecTo []v1alpha1.EventPolicySpecTo + gk schema.GroupKind + brokerObjects []*eventingv1.Broker + want []string + wantErr bool + }{ + { + name: "Returns resource from direct reference", + eventPolicySpecTo: []v1alpha1.EventPolicySpecTo{ + { + Ref: &v1alpha1.EventPolicyToReference{ + APIVersion: eventingv1.SchemeGroupVersion.String(), + Kind: "Broker", + Name: "my-broker", + }, + }, + }, + gk: schema.GroupKind{ + Group: "eventing.knative.dev", + Kind: "Broker", + }, + brokerObjects: []*eventingv1.Broker{}, //for a direct reference, we don't need the indexer later + want: []string{ + "my-broker", + }, + }, { + name: "Ignores resources of other Group&Kind in direct reference", + eventPolicySpecTo: []v1alpha1.EventPolicySpecTo{ + { + Ref: &v1alpha1.EventPolicyToReference{ + APIVersion: eventingv1.SchemeGroupVersion.String(), + Kind: "Broker", + Name: "my-broker", + }, + }, { + Ref: &v1alpha1.EventPolicyToReference{ + APIVersion: eventingv1.SchemeGroupVersion.String(), + Kind: "Another-Kind", + Name: 
"another-res", + }, + }, + }, + gk: schema.GroupKind{ + Group: "eventing.knative.dev", + Kind: "Broker", + }, + brokerObjects: []*eventingv1.Broker{}, + want: []string{ + "my-broker", + }, + }, { + name: "Returns object which match selector", + eventPolicySpecTo: []v1alpha1.EventPolicySpecTo{ + { + Selector: &v1alpha1.EventPolicySelector{ + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "key": "value", + }, + }, + TypeMeta: &metav1.TypeMeta{ + APIVersion: eventingv1.SchemeGroupVersion.String(), + Kind: "Broker", + }, + }, + }, + }, + gk: schema.GroupKind{ + Group: "eventing.knative.dev", + Kind: "Broker", + }, + brokerObjects: []*eventingv1.Broker{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "my-broker", + Namespace: "my-ns", + Labels: map[string]string{ + "key": "value", + }, + }, + }, { + ObjectMeta: metav1.ObjectMeta{ + Name: "my-other-broker", + Namespace: "my-ns", + Labels: map[string]string{ + "other-key": "other-value", + }, + }, + }, + }, + want: []string{ + "my-broker", + }, + }, { + name: "Checks on GKs on selector match", + eventPolicySpecTo: []v1alpha1.EventPolicySpecTo{ + { + Selector: &v1alpha1.EventPolicySelector{ + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "key": "value", + }, + }, + TypeMeta: &metav1.TypeMeta{ + APIVersion: eventingv1.SchemeGroupVersion.String(), + Kind: "Broker", + }, + }, + }, + }, + gk: schema.GroupKind{ + Group: "eventing.knative.dev", + Kind: "Other-Kind", + }, + brokerObjects: []*eventingv1.Broker{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "my-broker", + Namespace: "my-ns", + Labels: map[string]string{ + "key": "value", + }, + }, + }, + }, + want: []string{}, + }, { + name: "Empty .spec.to matches everything in namespace", + eventPolicySpecTo: nil, + gk: schema.GroupKind{ + Group: "eventing.knative.dev", + Kind: "Broker", + }, + brokerObjects: []*eventingv1.Broker{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "my-broker", + Namespace: "my-ns", + Labels: map[string]string{ + "key": "value", + }, + }, + }, + }, + want: []string{ + "my-broker", + }, + }, { + name: "Returns elements only once in slice", + eventPolicySpecTo: []v1alpha1.EventPolicySpecTo{ + { + Ref: &v1alpha1.EventPolicyToReference{ + APIVersion: eventingv1.SchemeGroupVersion.String(), + Kind: "Broker", + Name: "my-broker", + }, + }, + { + Selector: &v1alpha1.EventPolicySelector{ + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "key": "value", + }, + }, + TypeMeta: &metav1.TypeMeta{ + APIVersion: eventingv1.SchemeGroupVersion.String(), + Kind: "Broker", + }, + }, + }, + }, + gk: schema.GroupKind{ + Group: "eventing.knative.dev", + Kind: "Broker", + }, + brokerObjects: []*eventingv1.Broker{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "my-broker", + Namespace: "my-ns", + Labels: map[string]string{ + "key": "value", + }, + }, + }, + }, + want: []string{ + "my-broker", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, _ := reconcilertesting.SetupFakeContext(t) + + brokerIndexer := brokerinformerfake.Get(ctx).Informer().GetIndexer() + for _, b := range tt.brokerObjects { + err := brokerIndexer.Add(b) + if err != nil { + t.Fatalf("could not add broker object to indexer: %v", err) + } + } + + eventPolicy := &v1alpha1.EventPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-policy", + Namespace: "my-ns", + }, + Spec: v1alpha1.EventPolicySpec{ + To: tt.eventPolicySpecTo, + }, + } + + got, err := GetApplyingResourcesOfEventPolicyForGK(eventPolicy, tt.gk, brokerIndexer) + 
if (err != nil) != tt.wantErr { + t.Errorf("GetApplyingResourcesOfEventPolicyForGK() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("GetApplyingResourcesOfEventPolicyForGK() got = %v, want %v", got, tt.want) + } + }) + } +} + +func TestEventPolicyEventHandler_AddAndDelete(t *testing.T) { + eventPolicy := &v1alpha1.EventPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-policy", + Namespace: "my-ns", + }, + Spec: v1alpha1.EventPolicySpec{ + To: []v1alpha1.EventPolicySpecTo{ + { + Ref: &v1alpha1.EventPolicyToReference{ + APIVersion: eventingv1.SchemeGroupVersion.String(), + Kind: "Broker", + Name: "my-broker", + }, + }, + }, + }, + } + + gk := schema.GroupKind{ + Group: eventingv1.SchemeGroupVersion.Group, + Kind: "Broker", + } + + wantCalls := []string{ + "my-broker", + } + + calls := map[string]int{} + callbackFn := func(key types.NamespacedName) { + calls[key.Name]++ + } + + handler := EventPolicyEventHandler(nil, gk, callbackFn) + handler.OnAdd(eventPolicy, false) + + if len(calls) != len(wantCalls) { + t.Errorf("EventPolicyEventHandler() callback in ADD was called on wrong resources. Want to be called on %v, but was called on %v", wantCalls, calls) + } + for _, wantCallForResource := range wantCalls { + num, ok := calls[wantCallForResource] + if !ok { + t.Errorf("EventPolicyEventHandler() callback in ADD was called on %s 0 times. Expected to be called only once", wantCallForResource) + } + + if num != 1 { + t.Errorf("EventPolicyEventHandler() callback in ADD was called on %s %d times. Expected to be called only once", wantCallForResource, num) + } + } + + // do the same for OnDelete + calls = map[string]int{} + handler.OnDelete(eventPolicy) + + if len(calls) != len(wantCalls) { + t.Errorf("EventPolicyEventHandler() callback in DELETE was called on wrong resources. Want to be called on %v, but was called on %v", wantCalls, calls) + } + for _, wantCallForResource := range wantCalls { + num, ok := calls[wantCallForResource] + if !ok { + t.Errorf("EventPolicyEventHandler() callback in DELETE was called on %s 0 times. Expected to be called only once", wantCallForResource) + } + + if num != 1 { + t.Errorf("EventPolicyEventHandler() callback in DELETE was called on %s %d times. 
Expected to be called only once", wantCallForResource, num) + } + } + +} + +func TestEventPolicyEventHandler_Update(t *testing.T) { + oldEventPolicy := &v1alpha1.EventPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-policy", + Namespace: "my-ns", + }, + Spec: v1alpha1.EventPolicySpec{ + To: []v1alpha1.EventPolicySpecTo{ + { + Ref: &v1alpha1.EventPolicyToReference{ + APIVersion: eventingv1.SchemeGroupVersion.String(), + Kind: "Broker", + Name: "my-broker", + }, + }, + }, + }, + } + newEventPolicy := &v1alpha1.EventPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-policy", + Namespace: "my-ns", + }, + Spec: v1alpha1.EventPolicySpec{ + To: []v1alpha1.EventPolicySpecTo{ + { + Ref: &v1alpha1.EventPolicyToReference{ + APIVersion: eventingv1.SchemeGroupVersion.String(), + Kind: "Broker", + Name: "my-broker", + }, + }, + }, + }, + } + + gk := schema.GroupKind{ + Group: eventingv1.SchemeGroupVersion.Group, + Kind: "Broker", + } + + wantCalls := []string{ + "my-broker", + } + + calls := map[string]int{} + callbackFn := func(key types.NamespacedName) { + calls[key.Name]++ + } + + handler := EventPolicyEventHandler(nil, gk, callbackFn) + handler.OnUpdate(oldEventPolicy, newEventPolicy) + + if len(calls) != len(wantCalls) { + t.Errorf("EventPolicyEventHandler() callback in UPDATE was called on wrong resources. Want to be called on %v, but was called on %v", wantCalls, calls) + } + for _, wantCallForResource := range wantCalls { + num, ok := calls[wantCallForResource] + if !ok { + t.Errorf("EventPolicyEventHandler() callback in UPDATE was called on %s 0 times. Expected to be called only once", wantCallForResource) + } + + if num != 1 { + t.Errorf("EventPolicyEventHandler() callback in UPDATE was called on %s %d times. Expected to be called only once", wantCallForResource, num) + } + } +} diff --git a/pkg/reconciler/channel/channel_test.go b/pkg/reconciler/channel/channel_test.go index fe06a2dd633..91e0e469ffd 100644 --- a/pkg/reconciler/channel/channel_test.go +++ b/pkg/reconciler/channel/channel_test.go @@ -135,7 +135,8 @@ func TestReconcile(t *testing.T) { WithInMemoryChannelEndpointsReady(), WithInMemoryChannelChannelServiceReady(), WithInMemoryChannelAddress(backingChannelAddressable), - WithInMemoryChannelDLSUnknown()), + WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReady()), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewChannel(channelName, testNS, @@ -165,7 +166,8 @@ func TestReconcile(t *testing.T) { WithInMemoryChannelChannelServiceReady(), WithInMemoryChannelSubscribers(subscribers()), WithInMemoryChannelAddress(backingChannelAddressable), - WithInMemoryChannelDLSUnknown()), + WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReady()), }, }, { Name: "Backing channel created", @@ -259,7 +261,8 @@ func TestReconcile(t *testing.T) { WithInMemoryChannelChannelServiceReady(), WithInMemoryChannelSubscribers(subscribers()), WithInMemoryChannelAddress(backingChannelAddressable), - WithInMemoryChannelDLSUnknown()), + WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReady()), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewChannel(channelName, testNS, @@ -293,7 +296,8 @@ func TestReconcile(t *testing.T) { WithInMemoryChannelAddress(backingChannelAddressable), WithInMemoryChannelSubscribers(subscribers()), WithInMemoryChannelStatusSubscribers(subscriberStatuses()), - WithInMemoryChannelDLSUnknown()), + WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReady()), }, 
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewChannel(channelName, testNS, diff --git a/pkg/reconciler/inmemorychannel/controller/controller.go b/pkg/reconciler/inmemorychannel/controller/controller.go index e60a5414e7b..2102b3728e7 100644 --- a/pkg/reconciler/inmemorychannel/controller/controller.go +++ b/pkg/reconciler/inmemorychannel/controller/controller.go @@ -19,18 +19,21 @@ package controller import ( "context" - kubeclient "knative.dev/pkg/client/injection/kube/client" - "knative.dev/pkg/logging" + "knative.dev/eventing/pkg/auth" "github.com/kelseyhightower/envconfig" "k8s.io/client-go/tools/cache" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" + kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" + "knative.dev/pkg/logging" "knative.dev/pkg/system" "knative.dev/pkg/resolver" "knative.dev/eventing/pkg/apis/feature" + "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy" "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/inmemorychannel" inmemorychannelreconciler "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/inmemorychannel" "knative.dev/eventing/pkg/eventingtls" @@ -65,6 +68,7 @@ func NewController( serviceAccountInformer := serviceaccount.Get(ctx) roleBindingInformer := rolebinding.Get(ctx) secretInformer := secretinformer.Get(ctx) + eventPolicyInformer := eventpolicy.Get(ctx) r := &Reconciler{ kubeClientSet: kubeclient.Get(ctx), @@ -75,6 +79,7 @@ func NewController( serviceAccountLister: serviceAccountInformer.Lister(), roleBindingLister: roleBindingInformer.Lister(), secretLister: secretInformer.Lister(), + eventPolicyLister: eventPolicyInformer.Lister(), } env := &envConfig{} @@ -140,6 +145,12 @@ func NewController( Handler: controller.HandleAll(globalResync), }) + imcGK := messagingv1.SchemeGroupVersion.WithKind("InMemoryChannel").GroupKind() + + // Enqueue the InMemoryChannel, if we have an EventPolicy which was referencing + // or got updated and now is referencing the InMemoryChannel + eventPolicyInformer.Informer().AddEventHandler(auth.EventPolicyEventHandler(inmemorychannelInformer.Informer().GetIndexer(), imcGK, impl.EnqueueKey)) + // Setup the watch on the config map of dispatcher config configStore := config.NewEventDispatcherConfigStore(logging.FromContext(ctx)) configStore.WatchConfigs(cmw) diff --git a/pkg/reconciler/inmemorychannel/controller/controller_test.go b/pkg/reconciler/inmemorychannel/controller/controller_test.go index f6c60bda7fa..4a6f1f3d2f0 100644 --- a/pkg/reconciler/inmemorychannel/controller/controller_test.go +++ b/pkg/reconciler/inmemorychannel/controller/controller_test.go @@ -31,6 +31,7 @@ import ( . 
"knative.dev/pkg/reconciler/testing" // Fake injection informers + _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake" _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/inmemorychannel/fake" "knative.dev/eventing/pkg/reconciler/inmemorychannel/controller/config" diff --git a/pkg/reconciler/inmemorychannel/controller/inmemorychannel.go b/pkg/reconciler/inmemorychannel/controller/inmemorychannel.go index 9795dd31fff..3d818463dd6 100644 --- a/pkg/reconciler/inmemorychannel/controller/inmemorychannel.go +++ b/pkg/reconciler/inmemorychannel/controller/inmemorychannel.go @@ -20,6 +20,11 @@ import ( "context" "errors" "fmt" + "strings" + + eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" + "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1" "k8s.io/client-go/kubernetes" "k8s.io/utils/pointer" @@ -93,6 +98,8 @@ type Reconciler struct { eventDispatcherConfigStore *config.EventDispatcherConfigStore uriResolver *resolver.URIResolver + + eventPolicyLister v1alpha1.EventPolicyLister } // Check that our Reconciler implements Interface @@ -231,6 +238,44 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, imc *v1.InMemoryChannel) imc.GetConditionSet().Manage(imc.GetStatus()).MarkTrue(v1.InMemoryChannelConditionAddressable) + imc.Status.Policies = nil + applyingEvenPolicies, err := auth.GetEventPoliciesForResource(r.eventPolicyLister, messagingv1.SchemeGroupVersion.WithKind("InMemoryChannel"), imc.ObjectMeta) + if err != nil { + logging.FromContext(ctx).Errorw("Unable to get applying event policies for InMemoryChannel", zap.Error(err)) + imc.Status.MarkEventPoliciesFailed("EventPoliciesGetFailed", "Failed to get applying event policies") + } + + if len(applyingEvenPolicies) > 0 { + unreadyEventPolicies := []string{} + for _, policy := range applyingEvenPolicies { + if !policy.Status.IsReady() { + unreadyEventPolicies = append(unreadyEventPolicies, policy.Name) + } else { + // only add Ready policies to the list + imc.Status.Policies = append(imc.Status.Policies, eventingduck.AppliedEventPolicyRef{ + Name: policy.Name, + APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), + }) + } + } + + if len(unreadyEventPolicies) == 0 { + imc.Status.MarkEventPoliciesTrue() + } else { + imc.Status.MarkEventPoliciesFailed("EventPoliciesNotReady", "event policies %s are not ready", strings.Join(unreadyEventPolicies, ", ")) + } + + } else { + // we have no applying event policy. So we set the EP condition to True + if featureFlags.IsOIDCAuthentication() { + // in case of OIDC auth, we also set the message with the default authorization mode + imc.Status.MarkEventPoliciesTrueWithReason("DefaultAuthorizationMode", "Default authz mode is %q", featureFlags[feature.AuthorizationDefaultMode]) + } else { + // in case OIDC is disabled, we set EP condition to true too, but give some message that authz (EPs) require OIDC + imc.Status.MarkEventPoliciesTrueWithReason("OIDCDisabled", "Feature %q must be enabled to support Authorization", feature.OIDCAuthentication) + } + } + // Ok, so now the Dispatcher Deployment & Service have been created, we're golden since the // dispatcher watches the Channel and where it needs to dispatch events to. 
logging.FromContext(ctx).Debugw("Reconciled InMemoryChannel", zap.Any("InMemoryChannel", imc)) diff --git a/pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go b/pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go index ac06a594d52..11c0aca30cd 100644 --- a/pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go +++ b/pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go @@ -22,6 +22,8 @@ import ( "strconv" "testing" + "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" "k8s.io/utils/pointer" @@ -75,6 +77,9 @@ const ( maxIdleConnsPerHost = 200 imcGeneration = 7 + + readyEventPolicyName = "test-event-policy-ready" + unreadyEventPolicyName = "test-event-policy-unready" ) var ( @@ -183,7 +188,8 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelEndpointsReady(), WithInMemoryChannelChannelServiceReady(), WithInMemoryChannelAddress(channelServiceAddress), - WithInMemoryChannelDLSUnknown()), + WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReadyBecauseOIDCDisabled()), }}, }, { Name: "the status of deployment is unknown", @@ -206,7 +212,8 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelEndpointsReady(), WithInMemoryChannelChannelServiceReady(), WithInMemoryChannelAddress(channelServiceAddress), - WithInMemoryChannelDLSUnknown()), + WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReadyBecauseOIDCDisabled()), }}, }, { Name: "Service does not exist", @@ -326,6 +333,7 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelAddress(channelServiceAddress), WithDeadLetterSink(imcDest), WithInMemoryChannelStatusDLS(dlsStatus), + WithInMemoryChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }}, }, { @@ -368,6 +376,7 @@ func TestAllCases(t *testing.T) { URL: apis.HTTPS(dlsHost), CACerts: pointer.String(string(eventingtlstesting.CA)), }), + WithInMemoryChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }}, }, { @@ -393,6 +402,7 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelAddress(channelServiceAddress), WithDeadLetterSink(imcDest), WithInMemoryChannelStatusDLS(dlsStatus), + WithInMemoryChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }}, }, { @@ -443,6 +453,7 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelAddress(channelServiceAddress), WithDeadLetterSink(imcDest), WithInMemoryChannelStatusDLS(dlsStatus), + WithInMemoryChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }}, }, { @@ -473,6 +484,7 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelAddress(channelServiceAddress), WithDeadLetterSink(imcDest), WithInMemoryChannelStatusDLS(dlsStatus), + WithInMemoryChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }}, }, { @@ -541,6 +553,7 @@ func TestAllCases(t *testing.T) { }), WithDeadLetterSink(imcDest), WithInMemoryChannelStatusDLS(dlsStatus), + WithInMemoryChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }}, Ctx: feature.ToContext(context.Background(), feature.Flags{ @@ -588,6 +601,7 @@ func TestAllCases(t *testing.T) { }), WithDeadLetterSink(imcDest), WithInMemoryChannelStatusDLS(dlsStatus), + WithInMemoryChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }}, Ctx: feature.ToContext(context.Background(), feature.Flags{ @@ -617,18 +631,143 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelServiceReady(), WithInMemoryChannelEndpointsReady(), WithInMemoryChannelChannelServiceReady(), - WithInMemoryChannelAddress(channelServiceAddress), WithDeadLetterSink(imcDest), 
WithInMemoryChannelStatusDLS(dlsStatus), WithInMemoryChannelAddress(duckv1.Addressable{ URL: channelServiceAddress.URL, Audience: &channelAudience, }), + WithInMemoryChannelEventPoliciesReadyBecauseNoPolicyAndOIDCEnabled(), ), }}, Ctx: feature.ToContext(context.Background(), feature.Flags{ - feature.OIDCAuthentication: feature.Enabled, + feature.OIDCAuthentication: feature.Enabled, + feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, }), + }, { + Name: "Should provision applying EventPolicies", + Key: imcKey, + Objects: []runtime.Object{ + makeDLSServiceAsUnstructured(), + makeReadyDeployment(), + makeService(), + makeReadyEndpoints(), + NewInMemoryChannel(imcName, testNS, + WithDeadLetterSink(imcDest), + WithInMemoryChannelGeneration(imcGeneration), + ), + makeChannelService(NewInMemoryChannel(imcName, testNS)), + NewEventPolicy(readyEventPolicyName, testNS, + WithReadyEventPolicyCondition, + WithEventPolicyToRef(v1alpha1.EventPolicyToReference{ + APIVersion: v1.SchemeGroupVersion.String(), + Kind: "InMemoryChannel", + Name: imcName, + }), + ), + }, + WantErr: false, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewInMemoryChannel(imcName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelDeploymentReady(), + WithInMemoryChannelGeneration(imcGeneration), + WithInMemoryChannelStatusObservedGeneration(imcGeneration), + WithInMemoryChannelServiceReady(), + WithInMemoryChannelEndpointsReady(), + WithInMemoryChannelChannelServiceReady(), + WithInMemoryChannelAddress(channelServiceAddress), + WithDeadLetterSink(imcDest), + WithInMemoryChannelStatusDLS(dlsStatus), + WithInMemoryChannelEventPoliciesReady(), + WithInMemoryChannelEventPoliciesListed(readyEventPolicyName), + ), + }}, + }, { + Name: "Should mark NotReady on unready EventPolicy", + Key: imcKey, + Objects: []runtime.Object{ + makeDLSServiceAsUnstructured(), + makeReadyDeployment(), + makeService(), + makeReadyEndpoints(), + NewInMemoryChannel(imcName, testNS, + WithDeadLetterSink(imcDest), + WithInMemoryChannelGeneration(imcGeneration), + ), + makeChannelService(NewInMemoryChannel(imcName, testNS)), + NewEventPolicy(unreadyEventPolicyName, testNS, + WithUnreadyEventPolicyCondition, + WithEventPolicyToRef(v1alpha1.EventPolicyToReference{ + APIVersion: v1.SchemeGroupVersion.String(), + Kind: "InMemoryChannel", + Name: imcName, + }), + ), + }, + WantErr: false, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewInMemoryChannel(imcName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelDeploymentReady(), + WithInMemoryChannelGeneration(imcGeneration), + WithInMemoryChannelStatusObservedGeneration(imcGeneration), + WithInMemoryChannelServiceReady(), + WithInMemoryChannelEndpointsReady(), + WithInMemoryChannelChannelServiceReady(), + WithInMemoryChannelAddress(channelServiceAddress), + WithDeadLetterSink(imcDest), + WithInMemoryChannelStatusDLS(dlsStatus), + WithInMemoryChannelEventPoliciesNotReady("EventPoliciesNotReady", fmt.Sprintf("event policies %s are not ready", unreadyEventPolicyName)), + ), + }}, + }, { + Name: "Should list only Ready EventPolicy", + Key: imcKey, + Objects: []runtime.Object{ + makeDLSServiceAsUnstructured(), + makeReadyDeployment(), + makeService(), + makeReadyEndpoints(), + NewInMemoryChannel(imcName, testNS, + WithDeadLetterSink(imcDest), + WithInMemoryChannelGeneration(imcGeneration), + ), + makeChannelService(NewInMemoryChannel(imcName, testNS)), + NewEventPolicy(readyEventPolicyName, testNS, + 
WithReadyEventPolicyCondition, + WithEventPolicyToRef(v1alpha1.EventPolicyToReference{ + APIVersion: v1.SchemeGroupVersion.String(), + Kind: "InMemoryChannel", + Name: imcName, + }), + ), + NewEventPolicy(unreadyEventPolicyName, testNS, + WithUnreadyEventPolicyCondition, + WithEventPolicyToRef(v1alpha1.EventPolicyToReference{ + APIVersion: v1.SchemeGroupVersion.String(), + Kind: "InMemoryChannel", + Name: imcName, + }), + ), + }, + WantErr: false, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewInMemoryChannel(imcName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelDeploymentReady(), + WithInMemoryChannelGeneration(imcGeneration), + WithInMemoryChannelStatusObservedGeneration(imcGeneration), + WithInMemoryChannelServiceReady(), + WithInMemoryChannelEndpointsReady(), + WithInMemoryChannelChannelServiceReady(), + WithInMemoryChannelAddress(channelServiceAddress), + WithDeadLetterSink(imcDest), + WithInMemoryChannelStatusDLS(dlsStatus), + WithInMemoryChannelEventPoliciesNotReady("EventPoliciesNotReady", fmt.Sprintf("event policies %s are not ready", unreadyEventPolicyName)), + WithInMemoryChannelEventPoliciesListed(readyEventPolicyName), + ), + }}, }} logger := logtesting.TestLogger(t) @@ -645,13 +784,14 @@ func TestAllCases(t *testing.T) { } r := &Reconciler{ - kubeClientSet: fakekubeclient.Get(ctx), - systemNamespace: testNS, - deploymentLister: listers.GetDeploymentLister(), - serviceLister: listers.GetServiceLister(), - endpointsLister: listers.GetEndpointsLister(), - secretLister: listers.GetSecretLister(), - uriResolver: resolver.NewURIResolverFromTracker(ctx, tracker.New(func(types.NamespacedName) {}, 0)), + kubeClientSet: fakekubeclient.Get(ctx), + systemNamespace: testNS, + deploymentLister: listers.GetDeploymentLister(), + serviceLister: listers.GetServiceLister(), + endpointsLister: listers.GetEndpointsLister(), + secretLister: listers.GetSecretLister(), + eventPolicyLister: listers.GetEventPolicyLister(), + uriResolver: resolver.NewURIResolverFromTracker(ctx, tracker.New(func(types.NamespacedName) {}, 0)), } return inmemorychannel.NewReconciler(ctx, logger, fakeeventingclient.Get(ctx), listers.GetInMemoryChannelLister(), @@ -703,6 +843,7 @@ func TestInNamespace(t *testing.T) { WithInMemoryChannelAddress(channelServiceAddress), WithDeadLetterSink(imcDest), WithInMemoryChannelStatusDLS(dlsStatus), + WithInMemoryChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }}, WantEvents: []string{ @@ -742,6 +883,7 @@ func TestInNamespace(t *testing.T) { WithInMemoryChannelAddress(channelServiceAddress), WithDeadLetterSink(imcDest), WithInMemoryChannelStatusDLS(dlsStatus), + WithInMemoryChannelEventPoliciesReadyBecauseOIDCDisabled(), ), }}, }, @@ -763,6 +905,7 @@ func TestInNamespace(t *testing.T) { serviceAccountLister: listers.GetServiceAccountLister(), roleBindingLister: listers.GetRoleBindingLister(), secretLister: listers.GetSecretLister(), + eventPolicyLister: listers.GetEventPolicyLister(), eventDispatcherConfigStore: eventDispatcherConfigStore, uriResolver: resolver.NewURIResolverFromTracker(ctx, tracker.New(func(types.NamespacedName) {}, 0)), } diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go index 5e5d8862291..b50a27c351d 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go @@ -268,7 +268,8 @@ func TestAllCases(t *testing.T) { }, }), 
WithInMemoryChannelAddress(channelServiceAddress), - WithInMemoryChannelDLSUnknown()), + WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReady()), }, WantEvents: []string{ Eventf(corev1.EventTypeWarning, "InternalError", "failed to parse Spec.BackoffDelay: expected 'P' period mark at the start: garbage"), @@ -346,7 +347,8 @@ func TestReconciler_ReconcileKind(t *testing.T) { WithInMemoryChannelChannelServiceReady(), WithInMemoryChannelSubscribers(subscribers), WithInMemoryChannelAddress(channelServiceAddress), - WithInMemoryChannelDLSUnknown()), + WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReady()), wantSubs: []fanout.Subscription{{ Subscriber: duckv1.Addressable{ URL: apis.HTTP("call1"), @@ -375,7 +377,8 @@ func TestReconciler_ReconcileKind(t *testing.T) { WithInMemoryChannelChannelServiceReady(), WithInMemoryChannelSubscribers(subscribers), WithInMemoryChannelAddress(channelServiceAddress), - WithInMemoryChannelDLSUnknown()), + WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReady()), subs: []fanout.Subscription{*subscription1}, wantSubs: []fanout.Subscription{{ Namespace: testNS, @@ -403,7 +406,8 @@ func TestReconciler_ReconcileKind(t *testing.T) { WithInMemoryChannelChannelServiceReady(), WithInMemoryChannelSubscribers(subscribers), WithInMemoryChannelAddress(channelServiceAddress), - WithInMemoryChannelDLSUnknown()), + WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReady()), subs: []fanout.Subscription{*subscription1, *subscription2}, wantSubs: []fanout.Subscription{{ Namespace: testNS, @@ -431,7 +435,8 @@ func TestReconciler_ReconcileKind(t *testing.T) { WithInMemoryChannelChannelServiceReady(), WithInMemoryChannelSubscribers([]eventingduckv1.SubscriberSpec{subscriber1}), WithInMemoryChannelAddress(channelServiceAddress), - WithInMemoryChannelDLSUnknown()), + WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReady()), subs: []fanout.Subscription{*subscription1, *subscription2}, wantSubs: []fanout.Subscription{ { @@ -453,7 +458,8 @@ func TestReconciler_ReconcileKind(t *testing.T) { WithInMemoryChannelChannelServiceReady(), WithInMemoryChannelSubscribers([]eventingduckv1.SubscriberSpec{subscriber1, subscriber3}), WithInMemoryChannelAddress(channelServiceAddress), - WithInMemoryChannelDLSUnknown()), + WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReady()), subs: []fanout.Subscription{*subscription1, *subscription2}, wantSubs: []fanout.Subscription{ { @@ -482,7 +488,8 @@ func TestReconciler_ReconcileKind(t *testing.T) { WithInMemoryChannelChannelServiceReady(), WithInMemoryChannelSubscribers([]eventingduckv1.SubscriberSpec{subscriber1WithLinearRetry}), WithInMemoryChannelAddress(channelServiceAddress), - WithInMemoryChannelDLSUnknown()), + WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReady()), subs: []fanout.Subscription{{ Subscriber: duckv1.Addressable{ URL: apis.HTTP("call1"), @@ -537,7 +544,7 @@ func TestReconciler_ReconcileKind(t *testing.T) { } channelHandler := handler.GetChannelHandler(channelServiceAddress.URL.Host) if channelHandler == nil { - t.Errorf("Did not get handler for %s", channelServiceAddress.URL.Host) + t.Fatalf("Did not get handler for %s", channelServiceAddress.URL.Host) } if diff := cmp.Diff(tc.wantSubs, channelHandler.GetSubscriptions(context.TODO()), cmpopts.IgnoreFields(kncloudevents.RetryConfig{}, "Backoff", "CheckRetry"), cmpopts.IgnoreFields(fanout.Subscription{}, "UID")); diff != "" { t.Error("unexpected subs 
(+want/-got)", diff) diff --git a/pkg/reconciler/inmemorychannel/dispatcher/readiness_test.go b/pkg/reconciler/inmemorychannel/dispatcher/readiness_test.go index e0ec28d8ce1..9bc37da21b2 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/readiness_test.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/readiness_test.go @@ -40,6 +40,7 @@ func TestReadinessChecker(t *testing.T) { WithInMemoryChannelChannelServiceReady(), WithInMemoryChannelAddress(duckv1.Addressable{URL: apis.HTTP("fake-address")}), WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReady(), ), }) diff --git a/pkg/reconciler/testing/v1/eventpolicy.go b/pkg/reconciler/testing/v1/eventpolicy.go new file mode 100644 index 00000000000..a54b07a03e0 --- /dev/null +++ b/pkg/reconciler/testing/v1/eventpolicy.go @@ -0,0 +1,101 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "context" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + "knative.dev/pkg/apis" +) + +// EventPolicyOption enables further configuration of an EventPolicy. +type EventPolicyOption func(*v1alpha1.EventPolicy) + +// NewEventPolicy creates a EventPolicy with EventPolicyOptions. +func NewEventPolicy(name, namespace string, o ...EventPolicyOption) *v1alpha1.EventPolicy { + ep := &v1alpha1.EventPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + } + for _, opt := range o { + opt(ep) + } + ep.SetDefaults(context.Background()) + + return ep +} + +func WithInitEventPolicyConditions(et *v1alpha1.EventPolicy) { + et.Status.InitializeConditions() +} + +func WithReadyEventPolicyCondition(ep *v1alpha1.EventPolicy) { + ep.Status.Conditions = []apis.Condition{ + { + Type: v1alpha1.EventPolicyConditionReady, + Status: corev1.ConditionTrue, + }, + } +} + +func WithUnreadyEventPolicyCondition(ep *v1alpha1.EventPolicy) { + ep.Status.Conditions = []apis.Condition{ + { + Type: v1alpha1.EventPolicyConditionReady, + Status: corev1.ConditionFalse, + }, + } +} + +func WithEventPolicyTo(tos ...v1alpha1.EventPolicySpecTo) EventPolicyOption { + return func(ep *v1alpha1.EventPolicy) { + ep.Spec.To = append(ep.Spec.To, tos...) + } +} + +func WithEventPolicyToRef(ref v1alpha1.EventPolicyToReference) EventPolicyOption { + return func(ep *v1alpha1.EventPolicy) { + ep.Spec.To = append(ep.Spec.To, v1alpha1.EventPolicySpecTo{ + Ref: &ref, + }) + } +} + +func WithEventPolicyFrom(froms ...v1alpha1.EventPolicySpecFrom) EventPolicyOption { + return func(ep *v1alpha1.EventPolicy) { + ep.Spec.From = append(ep.Spec.From, froms...) 
+ } +} + +func WithEventPolicyLabels(labels map[string]string) EventPolicyOption { + return func(ep *v1alpha1.EventPolicy) { + ep.ObjectMeta.Labels = labels + } +} + +func WithEventPolicyOwnerReference(ownerRef metav1.OwnerReference) EventPolicyOption { + return func(ep *v1alpha1.EventPolicy) { + ep.ObjectMeta.OwnerReferences = []metav1.OwnerReference{ + ownerRef, + } + } +} diff --git a/pkg/reconciler/testing/v1/inmemorychannel.go b/pkg/reconciler/testing/v1/inmemorychannel.go index bd6398e67bf..4405cfaf533 100644 --- a/pkg/reconciler/testing/v1/inmemorychannel.go +++ b/pkg/reconciler/testing/v1/inmemorychannel.go @@ -18,6 +18,9 @@ import ( "fmt" "time" + "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + "knative.dev/eventing/pkg/apis/feature" + "k8s.io/apimachinery/pkg/types" appsv1 "k8s.io/api/apps/v1" @@ -25,6 +28,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" duckv1 "knative.dev/pkg/apis/duck/v1" + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" eventingv1 "knative.dev/eventing/pkg/apis/duck/v1" "knative.dev/eventing/pkg/apis/eventing" "knative.dev/eventing/pkg/apis/messaging" @@ -138,6 +142,41 @@ func WithInMemoryChannelEndpointsReady() InMemoryChannelOption { } } +func WithInMemoryChannelEventPoliciesReady() InMemoryChannelOption { + return func(imc *v1.InMemoryChannel) { + imc.Status.MarkEventPoliciesTrue() + } +} + +func WithInMemoryChannelEventPoliciesNotReady(reason, message string) InMemoryChannelOption { + return func(imc *v1.InMemoryChannel) { + imc.Status.MarkEventPoliciesFailed(reason, message) + } +} + +func WithInMemoryChannelEventPoliciesListed(policyNames ...string) InMemoryChannelOption { + return func(imc *v1.InMemoryChannel) { + for _, names := range policyNames { + imc.Status.Policies = append(imc.Status.Policies, eventingduckv1.AppliedEventPolicyRef{ + APIVersion: v1alpha1.SchemeGroupVersion.String(), + Name: names, + }) + } + } +} + +func WithInMemoryChannelEventPoliciesReadyBecauseOIDCDisabled() InMemoryChannelOption { + return func(imc *v1.InMemoryChannel) { + imc.Status.MarkEventPoliciesTrueWithReason("OIDCDisabled", "Feature %q must be enabled to support Authorization", feature.OIDCAuthentication) + } +} + +func WithInMemoryChannelEventPoliciesReadyBecauseNoPolicyAndOIDCEnabled() InMemoryChannelOption { + return func(imc *v1.InMemoryChannel) { + imc.Status.MarkEventPoliciesTrueWithReason("DefaultAuthorizationMode", "Default authz mode is %q", feature.AuthorizationAllowSameNamespace) + } +} + func WithInMemoryChannelAddress(a duckv1.Addressable) InMemoryChannelOption { return func(imc *v1.InMemoryChannel) { imc.Status.SetAddress(&a) diff --git a/pkg/reconciler/testing/v1/listers.go b/pkg/reconciler/testing/v1/listers.go index 788b2fab2cc..0c7c3546d2c 100644 --- a/pkg/reconciler/testing/v1/listers.go +++ b/pkg/reconciler/testing/v1/listers.go @@ -31,12 +31,14 @@ import ( "k8s.io/client-go/tools/cache" eventingduck "knative.dev/eventing/pkg/apis/duck/v1" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" eventingv1beta2 "knative.dev/eventing/pkg/apis/eventing/v1beta2" flowsv1 "knative.dev/eventing/pkg/apis/flows/v1" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" fakeeventingclientset "knative.dev/eventing/pkg/client/clientset/versioned/fake" eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1" + eventingv1alpha1listers "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1" 
eventingv1beta2listers "knative.dev/eventing/pkg/client/listers/eventing/v1beta2" flowslisters "knative.dev/eventing/pkg/client/listers/flows/v1" messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1" @@ -112,6 +114,10 @@ func (l *Listers) GetEventTypeLister() eventingv1beta2listers.EventTypeLister { return eventingv1beta2listers.NewEventTypeLister(l.indexerFor(&eventingv1beta2.EventType{})) } +func (l *Listers) GetEventPolicyLister() eventingv1alpha1listers.EventPolicyLister { + return eventingv1alpha1listers.NewEventPolicyLister(l.indexerFor(&eventingv1alpha1.EventPolicy{})) +} + func (l *Listers) GetPingSourceLister() sourcelisters.PingSourceLister { return sourcelisters.NewPingSourceLister(l.indexerFor(&sourcesv1.PingSource{})) }
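For reference, a minimal wiring sketch (not part of the patch) of how the auth.EventPolicyEventHandler introduced above can be hooked into a knative.dev/pkg controller for an arbitrary GroupKind, mirroring the controller.go hunk for InMemoryChannel. The setupEventPolicyWatch wrapper and its package are illustrative assumptions; the auth helper, the eventpolicy injection informer, and controller.Impl.EnqueueKey are the APIs used in this diff.

package sample // hypothetical package, for illustration only

import (
	"context"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/tools/cache"
	"knative.dev/pkg/controller"

	"knative.dev/eventing/pkg/auth"
	"knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy"
)

// setupEventPolicyWatch re-enqueues every resource of the given GroupKind that an
// added, updated, or deleted EventPolicy applies (or previously applied) to, so the
// reconciler can recompute that resource's EventPoliciesReady condition.
func setupEventPolicyWatch(ctx context.Context, impl *controller.Impl, gk schema.GroupKind, indexer cache.Indexer) {
	eventPolicyInformer := eventpolicy.Get(ctx)

	// auth.EventPolicyEventHandler resolves the policy's .spec.to (or the whole
	// namespace when .spec.to is empty) against the indexer and passes each
	// matching resource key to the enqueue function.
	eventPolicyInformer.Informer().AddEventHandler(
		auth.EventPolicyEventHandler(indexer, gk, impl.EnqueueKey),
	)
}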