Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reconcile event policies for mt-broker #8090

Merged
merged 10 commits into from
Jul 12, 2024
65 changes: 65 additions & 0 deletions pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
corev1listers "k8s.io/client-go/listers/core/v1"
Expand All @@ -46,6 +47,7 @@ import (
duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/apis/eventing"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/apis/feature"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/auth"
Expand Down Expand Up @@ -264,6 +266,11 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *eventingv1.Broker) pk
return fmt.Errorf("could not update broker status with EventPolicies: %v", err)
}

// Reconcile the EventPolicy for the Broker.
if err := r.reconcileBrokerChannelEventPolicies(ctx, b, triggerChan, featureFlags); err != nil {
return fmt.Errorf("failed to reconcile EventPolicy for Broker %s: %w", b.Name, err)
}

// So, at this point the Broker is ready and everything should be solid
// for the triggers to act upon.
return nil
Expand Down Expand Up @@ -428,6 +435,64 @@ func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterf
return channelable, nil
}

func (r *Reconciler) reconcileBrokerChannelEventPolicies(ctx context.Context, b *eventingv1.Broker, triggerChan *duckv1.Channelable, featureFlags feature.Flags) error {
logger := logging.FromContext(ctx)

expected := resources.MakeEventPolicyForBackingChannel(b, triggerChan)
if featureFlags.IsOIDCAuthentication() {
// Get the EventPolicy, create if not exists.
foundEP, err := r.eventPolicyLister.EventPolicies(expected.Namespace).Get(expected.Name)
if apierrs.IsNotFound(err) {
// Create the EventPolicy since it doesn't exist.
logger.Debugw("Creating EventPolicy for Broker %s", expected.Name)

_, err = r.eventingClientSet.EventingV1alpha1().EventPolicies(expected.Namespace).Create(ctx, expected, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create EventPolicy for Broker %s: %w", expected.Name, err)
}
return nil
}
if err != nil {
return fmt.Errorf("failed to get EventPolicy for Broker %s: %w", expected.Name, err)
}
if policyNeedsUpdate(foundEP, expected) {
// Update the EventPolicy since it exists and needs update.
logger.Debugw("Updating EventPolicy for Broker %s", expected.Name)
expected.SetResourceVersion(foundEP.GetResourceVersion())
_, err = r.eventingClientSet.EventingV1alpha1().EventPolicies(expected.Namespace).Update(ctx, expected, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update EventPolicy for Broker %s: %w", expected.Name, err)
}
}
return nil
}

// List all the orphaned EventPolicies that have owner reference set to the Broker and delete them.
selector, err := labels.ValidatedSelectorFromSet(resources.LabelsForBackingChannelsEventPolicy(b))
if err != nil {
return fmt.Errorf("could not get valid selector for broker's channel EventPolicy %s/%s: %w", b.Namespace, b.Name, err)
}
eventPolicies, err := r.eventPolicyLister.EventPolicies(expected.Namespace).List(selector)
if err != nil {
return fmt.Errorf("failed to list EventPolicies for Broker %s: %w", expected.Name, err)
}
for _, ep := range eventPolicies {
if metav1.IsControlledBy(ep, b) {
logger.Debugw("Deleting EventPolicy for Broker %s", expected.Name)
err := r.eventingClientSet.EventingV1alpha1().EventPolicies(ep.Namespace).Delete(ctx, ep.Name, metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("failed to delete EventPolicy for Broker %s: %w", expected.Name, err)
}
logger.Debugw("Deleted EventPolicy for Broker %s", expected.Name)
}
}
return nil
}

func policyNeedsUpdate(foundEP, expected *eventingv1alpha1.EventPolicy) bool {
return !equality.Semantic.DeepDerivative(expected, foundEP)
}

// TriggerChannelLabels are all the labels placed on the Trigger Channel for the given brokerName. This
// should only be used by Broker and Trigger code.
func TriggerChannelLabels(brokerName, brokerNamespace string) map[string]string {
Expand Down
81 changes: 81 additions & 0 deletions pkg/reconciler/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (

"knative.dev/eventing/pkg/apis/eventing"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/auth"
fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake"
Expand All @@ -57,6 +58,7 @@ import (
. "knative.dev/pkg/reconciler/testing"

_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger/fake"
"knative.dev/eventing/pkg/reconciler/broker/resources"
. "knative.dev/eventing/pkg/reconciler/testing/v1"
)

Expand Down Expand Up @@ -144,6 +146,12 @@ var (
Version: "v1",
Kind: "Broker",
}

channelV1GVK = metav1.GroupVersionKind{
Group: "messaging.knative.dev",
Version: "v1",
Kind: "InMemoryChannel",
}
)

func TestReconcile(t *testing.T) {
Expand Down Expand Up @@ -780,6 +788,9 @@ func TestReconcile(t *testing.T) {
WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})),
},
WantErr: false,
WantCreates: []runtime.Object{
makeEventPolicy(),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewBroker(brokerName, testNS,
WithBrokerClass(eventing.MTChannelBrokerClassValue),
Expand Down Expand Up @@ -824,6 +835,9 @@ func TestReconcile(t *testing.T) {
WithEventPolicyToRef(brokerV1GVK, brokerName),
),
},
WantCreates: []runtime.Object{
makeEventPolicy(),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewBroker(brokerName, testNS,
WithBrokerClass(eventing.MTChannelBrokerClassValue),
Expand Down Expand Up @@ -866,6 +880,9 @@ func TestReconcile(t *testing.T) {
WithEventPolicyToRef(brokerV1GVK, brokerName),
),
},
WantCreates: []runtime.Object{
makeEventPolicy(),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewBroker(brokerName, testNS,
WithBrokerClass(eventing.MTChannelBrokerClassValue),
Expand Down Expand Up @@ -911,6 +928,9 @@ func TestReconcile(t *testing.T) {
WithEventPolicyToRef(brokerV1GVK, brokerName),
),
},
WantCreates: []runtime.Object{
makeEventPolicy(),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewBroker(brokerName, testNS,
WithBrokerClass(eventing.MTChannelBrokerClassValue),
Expand All @@ -932,6 +952,47 @@ func TestReconcile(t *testing.T) {
Ctx: feature.ToContext(context.Background(), feature.Flags{
feature.OIDCAuthentication: feature.Enabled,
}),
}, {
Name: "Should create an Event Policy for a Broker's underlying Channel",
Key: testKey,
Objects: []runtime.Object{
NewBroker(brokerName, testNS,
WithBrokerClass(eventing.MTChannelBrokerClassValue),
WithBrokerConfig(config()),
WithInitBrokerConditions),
createChannel(withChannelReady),
imcConfigMap(),
NewEndpoints(filterServiceName, systemNS,
WithEndpointsLabels(FilterLabels()),
WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})),
NewEndpoints(ingressServiceName, systemNS,
WithEndpointsLabels(IngressLabels()),
WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})),
},
WantCreates: []runtime.Object{
makeEventPolicy(),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewBroker(brokerName, testNS,
WithBrokerClass(eventing.MTChannelBrokerClassValue),
WithBrokerConfig(config()),
WithBrokerReady,
WithBrokerAddress(&duckv1.Addressable{
URL: brokerAddress,
Audience: &brokerAudience,
}),
WithChannelAddressAnnotation(triggerChannelURL),
WithChannelAPIVersionAnnotation(triggerChannelAPIVersion),
WithChannelKindAnnotation(triggerChannelKind),
WithChannelNameAnnotation(triggerChannelName),
WithDLSNotConfigured(),
WithBrokerEventPoliciesReadyBecauseNoPolicyAndOIDCEnabled(),
),
}},
Ctx: feature.ToContext(context.Background(), feature.Flags{
feature.OIDCAuthentication: feature.Enabled,
feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace,
}),
},
}

Expand Down Expand Up @@ -1227,3 +1288,23 @@ func makeTLSSecret() *corev1.Secret {
Type: corev1.SecretTypeTLS,
}
}

func makeEventPolicy() *eventingv1alpha1.EventPolicy {
return NewEventPolicy(resources.BrokerEventPolicyName(brokerName, triggerChannelName), testNS,
WithEventPolicyToRef(channelV1GVK, triggerChannelName),
WithEventPolicyFromSub(resources.OIDCBrokerSub),
WithEventPolicyOwnerReferences([]metav1.OwnerReference{
{
APIVersion: "eventing.knative.dev/v1",
Kind: "Broker",
Name: brokerName,
},
}...),
WithEventPolicyLabels(map[string]string{
"eventing.knative.dev/" + "broker-group": brokerV1GVK.Group,
"eventing.knative.dev/" + "broker-version": brokerV1GVK.Version,
"eventing.knative.dev/" + "broker-kind": brokerV1GVK.Kind,
"eventing.knative.dev/" + "broker-name": brokerName,
}),
)
}
78 changes: 78 additions & 0 deletions pkg/reconciler/broker/resources/eventpolicy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
Copyright 2024 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package resources

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/pkg/kmeta"
)

const (
BackingChannelEventPolicyLabelPrefix = "eventing.knative.dev/"
OIDCBrokerSub = "system:serviceaccount:knative-eventing:mt-broker-ingress-oidc"
brokerKind = "Broker"
)

func MakeEventPolicyForBackingChannel(b *eventingv1.Broker, backingChannel *eventingduckv1.Channelable) *eventingv1alpha1.EventPolicy {
return &eventingv1alpha1.EventPolicy{
ObjectMeta: metav1.ObjectMeta{
Namespace: backingChannel.Namespace,
Name: BrokerEventPolicyName(b.Name, backingChannel.Name),
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: eventingv1.SchemeGroupVersion.String(),
Kind: brokerKind,
Name: b.Name,
},
},
Labels: LabelsForBackingChannelsEventPolicy(b),
},
Spec: eventingv1alpha1.EventPolicySpec{
To: []eventingv1alpha1.EventPolicySpecTo{
{
Ref: &eventingv1alpha1.EventPolicyToReference{
APIVersion: backingChannel.APIVersion,
Kind: backingChannel.Kind,
Name: backingChannel.Name,
},
},
},
From: []eventingv1alpha1.EventPolicySpecFrom{
{
Sub: ptr.To(OIDCBrokerSub),
},
},
},
}
}

func LabelsForBackingChannelsEventPolicy(broker *eventingv1.Broker) map[string]string {
return map[string]string{
BackingChannelEventPolicyLabelPrefix + "broker-group": eventingv1.SchemeGroupVersion.Group,
BackingChannelEventPolicyLabelPrefix + "broker-version": eventingv1.SchemeGroupVersion.Version,
BackingChannelEventPolicyLabelPrefix + "broker-kind": brokerKind,
BackingChannelEventPolicyLabelPrefix + "broker-name": broker.Name,
}
}

func BrokerEventPolicyName(brokerName, channelName string) string {
return kmeta.ChildName(brokerName, "-ep-"+channelName)
}
77 changes: 77 additions & 0 deletions pkg/reconciler/broker/resources/eventpolicy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
Copyright 2024 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package resources

import (
"testing"

"github.com/google/go-cmp/cmp"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
)

func TestMakeEventPolicyForBackingChannel(t *testing.T) {
broker := &eventingv1.Broker{
ObjectMeta: metav1.ObjectMeta{
Name: "test-broker",
Namespace: "test-namespace",
},
}
backingChannel := &eventingduckv1.Channelable{
ObjectMeta: metav1.ObjectMeta{
Name: "test-channel",
Namespace: "test-namespace",
},
}
want := &eventingv1alpha1.EventPolicy{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test-namespace",
Name: BrokerEventPolicyName(broker.Name, backingChannel.Name),
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "eventing.knative.dev/v1",
Kind: "Broker",
Name: "test-broker",
},
},
Labels: LabelsForBackingChannelsEventPolicy(broker),
},
Spec: eventingv1alpha1.EventPolicySpec{
To: []eventingv1alpha1.EventPolicySpecTo{
{
Ref: &eventingv1alpha1.EventPolicyToReference{
APIVersion: backingChannel.APIVersion,
Kind: backingChannel.Kind,
Name: backingChannel.Name,
},
},
},
From: []eventingv1alpha1.EventPolicySpecFrom{
{
Sub: ptr.To(OIDCBrokerSub),
},
},
},
}
got := MakeEventPolicyForBackingChannel(broker, backingChannel)
if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("MakeEventPolicyForBackingChannel() (-want, +got) = %v", diff)
}
}
Loading
Loading