Skip to content

Commit

Permalink
List applying EventPolicies in Sequence status (#8012)
Browse files Browse the repository at this point in the history
* add SequenceConditionEventPoliciesReady in status

Signed-off-by: rahulii <[email protected]>

* add eventPolicy lister to the sequence reconciler

Signed-off-by: rahulii <[email protected]>

* call auth updatestatus func

Signed-off-by: rahulii <[email protected]>

* run goimports

Signed-off-by: rahulii <[email protected]>

* fix lifecyle tests

Signed-off-by: rahulii <[email protected]>

* add tests funcs

Signed-off-by: rahulii <[email protected]>

* fix sequence test cases

Signed-off-by: rahulii <[email protected]>

* add event policies test cases

Signed-off-by: rahulii <[email protected]>

* change group and add more test cases

Signed-off-by: rahulii <[email protected]>

* minor fix

Signed-off-by: rahulii <[email protected]>

* fix unit test case

Signed-off-by: rahulii <[email protected]>

* fix to review comment

Signed-off-by: rahulii <[email protected]>

* fix linting issues

Signed-off-by: rahulii <[email protected]>

---------

Signed-off-by: rahulii <[email protected]>
  • Loading branch information
rahulii authored Jul 1, 2024
1 parent 3b1bfb4 commit 96c30bd
Show file tree
Hide file tree
Showing 7 changed files with 382 additions and 38 deletions.
28 changes: 27 additions & 1 deletion pkg/apis/flows/v1/sequence_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@ import (
duckv1 "knative.dev/pkg/apis/duck/v1"
)

var sCondSet = apis.NewLivingConditionSet(SequenceConditionReady, SequenceConditionChannelsReady, SequenceConditionSubscriptionsReady, SequenceConditionAddressable)
var sCondSet = apis.NewLivingConditionSet(
SequenceConditionReady,
SequenceConditionChannelsReady,
SequenceConditionSubscriptionsReady,
SequenceConditionAddressable,
SequenceConditionEventPoliciesReady,
)

const (
// SequenceConditionReady has status True when all subconditions below have been set to True.
Expand All @@ -45,6 +51,10 @@ const (
// SequenceConditionAddressable has status true when this Sequence meets
// the Addressable contract and has a non-empty hostname.
SequenceConditionAddressable apis.ConditionType = "Addressable"

// SequenceConditionEventPoliciesReady has status True when all the applying EventPolicies for this
// Sequence are ready.
SequenceConditionEventPoliciesReady apis.ConditionType = "EventPoliciesReady"
)

// GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface.
Expand Down Expand Up @@ -189,6 +199,22 @@ func (ss *SequenceStatus) MarkAddressableNotReady(reason, messageFormat string,
sCondSet.Manage(ss).MarkUnknown(SequenceConditionAddressable, reason, messageFormat, messageA...)
}

func (ss *SequenceStatus) MarkEventPoliciesFailed(reason, messageFormat string, messageA ...interface{}) {
sCondSet.Manage(ss).MarkFalse(SequenceConditionEventPoliciesReady, reason, messageFormat, messageA...)
}

func (ss *SequenceStatus) MarkEventPoliciesUnknown(reason, messageFormat string, messageA ...interface{}) {
sCondSet.Manage(ss).MarkUnknown(SequenceConditionEventPoliciesReady, reason, messageFormat, messageA...)
}

func (ss *SequenceStatus) MarkEventPoliciesTrue() {
sCondSet.Manage(ss).MarkTrue(SequenceConditionEventPoliciesReady)
}

func (ss *SequenceStatus) MarkEventPoliciesTrueWithReason(reason, messageFormat string, messageA ...interface{}) {
sCondSet.Manage(ss).MarkTrueWithReason(SequenceConditionEventPoliciesReady, reason, messageFormat, messageA...)
}

func (ss *SequenceStatus) setAddress(address *duckv1.Addressable) {
if address == nil || address.URL == nil {
ss.Address = duckv1.Addressable{}
Expand Down
94 changes: 62 additions & 32 deletions pkg/apis/flows/v1/sequence_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ func TestSequenceInitializeConditions(t *testing.T) {
}, {
Type: SequenceConditionChannelsReady,
Status: corev1.ConditionUnknown,
}, {
Type: SequenceConditionEventPoliciesReady,
Status: corev1.ConditionUnknown,
}, {
Type: SequenceConditionReady,
Status: corev1.ConditionUnknown,
Expand Down Expand Up @@ -172,6 +175,9 @@ func TestSequenceInitializeConditions(t *testing.T) {
}, {
Type: SequenceConditionChannelsReady,
Status: corev1.ConditionFalse,
}, {
Type: SequenceConditionEventPoliciesReady,
Status: corev1.ConditionUnknown,
}, {
Type: SequenceConditionReady,
Status: corev1.ConditionUnknown,
Expand Down Expand Up @@ -199,6 +205,9 @@ func TestSequenceInitializeConditions(t *testing.T) {
}, {
Type: SequenceConditionChannelsReady,
Status: corev1.ConditionUnknown,
}, {
Type: SequenceConditionEventPoliciesReady,
Status: corev1.ConditionUnknown,
}, {
Type: SequenceConditionReady,
Status: corev1.ConditionUnknown,
Expand Down Expand Up @@ -396,52 +405,73 @@ func TestSequencePropagateChannelStatuses(t *testing.T) {

func TestSequenceReady(t *testing.T) {
tests := []struct {
name string
subs []*messagingv1.Subscription
channels []*eventingduckv1.Channelable
want bool
name string
subs []*messagingv1.Subscription
channels []*eventingduckv1.Channelable
eventPoliciesReady bool
want bool
}{{
name: "empty",
subs: []*messagingv1.Subscription{},
channels: []*eventingduckv1.Channelable{},
want: false,
name: "empty",
subs: []*messagingv1.Subscription{},
channels: []*eventingduckv1.Channelable{},
eventPoliciesReady: true,
want: false,
}, {
name: "one channelable not ready, one subscription ready",
channels: []*eventingduckv1.Channelable{getChannelable(false)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
want: false,
name: "one channelable not ready, one subscription ready",
channels: []*eventingduckv1.Channelable{getChannelable(false)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
eventPoliciesReady: true,
want: false,
}, {
name: "one channelable ready, one subscription not ready",
channels: []*eventingduckv1.Channelable{getChannelable(true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", false)},
want: false,
name: "one channelable ready, one subscription not ready",
channels: []*eventingduckv1.Channelable{getChannelable(true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", false)},
eventPoliciesReady: true,
want: false,
}, {
name: "one channelable ready, one subscription ready",
channels: []*eventingduckv1.Channelable{getChannelable(true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
want: true,
name: "one channelable ready, one subscription ready, event policy ready",
channels: []*eventingduckv1.Channelable{getChannelable(true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
eventPoliciesReady: true,
want: true,
}, {
name: "one channelable ready, one not, two subscriptions ready",
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(false)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
want: false,
name: "one channelable ready, one subscription ready, event policy not ready",
channels: []*eventingduckv1.Channelable{getChannelable(true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
eventPoliciesReady: false,
want: false,
}, {
name: "two channelables ready, one subscription ready, one not",
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", false)},
want: false,
name: "one channelable ready, one not, two subscriptions ready",
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(false)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
eventPoliciesReady: true,
want: false,
}, {
name: "two channelables ready, two subscriptions ready",
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
want: true,
name: "two channelables ready, one subscription ready, one not",
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", false)},
eventPoliciesReady: true,
want: false,
}, {
name: "two channelables ready, two subscriptions ready",
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
eventPoliciesReady: true,
want: true,
}}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ps := SequenceStatus{}
ps.PropagateChannelStatuses(test.channels)
ps.PropagateSubscriptionStatuses(test.subs)

if test.eventPoliciesReady {
ps.MarkEventPoliciesTrue()
} else {
ps.MarkEventPoliciesFailed("", "")
}

got := ps.IsReady()
want := test.want
if want != got {
Expand Down
15 changes: 15 additions & 0 deletions pkg/reconciler/sequence/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ import (

"k8s.io/client-go/tools/cache"
v1 "knative.dev/eventing/pkg/apis/flows/v1"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/duck"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"

flowsv1 "knative.dev/eventing/pkg/apis/flows/v1"
eventingclient "knative.dev/eventing/pkg/client/injection/client"
"knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable"
"knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy"
"knative.dev/eventing/pkg/client/injection/informers/flows/v1/sequence"
"knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription"
sequencereconciler "knative.dev/eventing/pkg/client/injection/reconciler/flows/v1/sequence"
Expand All @@ -42,12 +45,14 @@ func NewController(

sequenceInformer := sequence.Get(ctx)
subscriptionInformer := subscription.Get(ctx)
eventPolicyInformer := eventpolicy.Get(ctx)

r := &Reconciler{
sequenceLister: sequenceInformer.Lister(),
subscriptionLister: subscriptionInformer.Lister(),
dynamicClientSet: dynamicclient.Get(ctx),
eventingClientSet: eventingclient.Get(ctx),
eventPolicyLister: eventPolicyInformer.Lister(),
}
impl := sequencereconciler.NewImpl(ctx, r)

Expand All @@ -61,5 +66,15 @@ func NewController(
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})

sequenceGK := flowsv1.SchemeGroupVersion.WithKind("Sequence").GroupKind()

// Enqueue the Sequence, if we have an EventPolicy which was referencing
// or got updated and now is referencing the Sequence
eventPolicyInformer.Informer().AddEventHandler(auth.EventPolicyEventHandler(
sequenceInformer.Informer().GetIndexer(),
sequenceGK,
impl.EnqueueKey,
))

return impl
}
11 changes: 10 additions & 1 deletion pkg/reconciler/sequence/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,28 @@ package sequence
import (
"testing"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/pkg/configmap"
. "knative.dev/pkg/reconciler/testing"

// Fake injection informers
"knative.dev/eventing/pkg/apis/feature"
_ "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/flows/v1/sequence/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake"
)

func TestNew(t *testing.T) {
ctx, _ := SetupFakeContext(t)

c := NewController(ctx, configmap.NewStaticWatcher())
c := NewController(ctx, configmap.NewStaticWatcher(&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: feature.FlagsConfigName,
Namespace: "knative-eventing",
},
}))

if c == nil {
t.Fatal("Expected NewController to return a non-nil value")
Expand Down
12 changes: 12 additions & 0 deletions pkg/reconciler/sequence/sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@ import (
pkgreconciler "knative.dev/pkg/reconciler"

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/apis/feature"
v1 "knative.dev/eventing/pkg/apis/flows/v1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/auth"
clientset "knative.dev/eventing/pkg/client/clientset/versioned"
sequencereconciler "knative.dev/eventing/pkg/client/injection/reconciler/flows/v1/sequence"
eventingv1alpha1listers "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1"
listers "knative.dev/eventing/pkg/client/listers/flows/v1"
messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1"
"knative.dev/eventing/pkg/duck"
Expand All @@ -58,6 +61,8 @@ type Reconciler struct {

// dynamicClientSet allows us to configure pluggable Build objects
dynamicClientSet dynamic.Interface

eventPolicyLister eventingv1alpha1listers.EventPolicyLister
}

// Check that our Reconciler implements sequencereconciler.Interface
Expand All @@ -74,6 +79,8 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, s *v1.Sequence) pkgrecon
// 3. Rinse and repeat step #2 above for each Step in the list
// 4. If there's a Reply, then the last Subscription will be configured to send the reply to that.

featureFlags := feature.FromContext(ctx)

gvr, _ := meta.UnsafeGuessKindToResource(s.Spec.ChannelTemplate.GetObjectKind().GroupVersionKind())
channelResourceInterface := r.dynamicClientSet.Resource(gvr).Namespace(s.Namespace)
if channelResourceInterface == nil {
Expand Down Expand Up @@ -122,6 +129,11 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, s *v1.Sequence) pkgrecon
return err
}

err := auth.UpdateStatusWithEventPolicies(featureFlags, &s.Status.AppliedEventPoliciesStatus, &s.Status, r.eventPolicyLister, v1.SchemeGroupVersion.WithKind("Sequence"), s.ObjectMeta)
if err != nil {
return fmt.Errorf("could not update channel status with EventPolicies: %v", err)
}

return r.removeUnwantedSubscriptions(ctx, s, subs)
}

Expand Down
Loading

0 comments on commit 96c30bd

Please sign in to comment.