diff --git a/config/core/resources/sequence.yaml b/config/core/resources/sequence.yaml index 636fcf9b7a5..6ef3641caa9 100644 --- a/config/core/resources/sequence.yaml +++ b/config/core/resources/sequence.yaml @@ -74,6 +74,12 @@ spec: uri: description: URI can be an absolute URL(non-empty scheme and non-empty host) pointing to the target or a relative URI. Relative URIs will be resolved using the base URI retrieved from Ref. type: string + CACerts: + type: string + description: Certification Authority (CA) certificates in PEM format that the source trusts when sending events to the reply. + audience: + description: Audience is the OIDC audience of the reply. This only needs to be set if the target is not an Addressable and thus the Audience can't be received from the target itself. If specified, it takes precedence over the target's Audience. + type: string steps: description: Steps is the list of Destinations (processors / functions) that will be called in the order provided. Each step has its own delivery options type: array diff --git a/pkg/reconciler/sequence/resources/subscription.go b/pkg/reconciler/sequence/resources/subscription.go index 5cf5b3b58e4..4f9c365ceaa 100644 --- a/pkg/reconciler/sequence/resources/subscription.go +++ b/pkg/reconciler/sequence/resources/subscription.go @@ -52,8 +52,10 @@ func NewSubscription(stepNumber int, s *v1.Sequence) *messagingv1.Subscription { Name: SequenceChannelName(s.Name, stepNumber), }, Subscriber: &duckv1.Destination{ - Ref: s.Spec.Steps[stepNumber].Destination.Ref, - URI: s.Spec.Steps[stepNumber].Destination.URI, + Ref: s.Spec.Steps[stepNumber].Destination.Ref, + URI: s.Spec.Steps[stepNumber].Destination.URI, + Audience: s.Spec.Steps[stepNumber].Destination.Audience, + CACerts: s.Spec.Steps[stepNumber].Destination.CACerts, }, Delivery: s.Spec.Steps[stepNumber].Delivery, }, @@ -71,8 +73,10 @@ func NewSubscription(stepNumber int, s *v1.Sequence) *messagingv1.Subscription { } } else if s.Spec.Reply != nil { r.Spec.Reply = &duckv1.Destination{ - Ref: s.Spec.Reply.Ref, - URI: s.Spec.Reply.URI, + Ref: s.Spec.Reply.Ref, + URI: s.Spec.Reply.URI, + Audience: s.Spec.Reply.Audience, + CACerts: s.Spec.Reply.CACerts, } } return r diff --git a/test/auth/features/oidc/sequence.go b/test/auth/features/oidc/sequence.go index 1c0013cb5f0..e126e8da2c8 100644 --- a/test/auth/features/oidc/sequence.go +++ b/test/auth/features/oidc/sequence.go @@ -17,13 +17,21 @@ limitations under the License. package oidc import ( + "github.com/cloudevents/sdk-go/v2/test" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "knative.dev/eventing/pkg/auth" "knative.dev/eventing/pkg/reconciler/sequence/resources" "knative.dev/eventing/test/rekt/resources/addressable" + "knative.dev/eventing/test/rekt/resources/channel_impl" + "knative.dev/eventing/test/rekt/resources/channel_template" "knative.dev/eventing/test/rekt/resources/sequence" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/reconciler-test/pkg/eventshub" + "knative.dev/reconciler-test/pkg/eventshub/assert" "knative.dev/reconciler-test/pkg/feature" + "knative.dev/reconciler-test/pkg/manifest" + "knative.dev/reconciler-test/pkg/resources/service" ) func SequenceHasAudienceOfInputChannel(sequenceName, sequenceNamespace string, channelGVR schema.GroupVersionResource, channelKind string) *feature.Feature { @@ -40,3 +48,143 @@ func SequenceHasAudienceOfInputChannel(sequenceName, sequenceNamespace string, c return f } + +func SequenceSendsEventWithOIDC() *feature.FeatureSet { + return &feature.FeatureSet{ + Name: "Sequence send events with OIDC support", + Features: []*feature.Feature{ + SequenceSendsEventWithOIDCTokenToSteps(), + SequenceSendsEventWithOIDCTokenToReply(), + }, + } +} + +func SequenceSendsEventWithOIDCTokenToSteps() *feature.Feature { + f := feature.NewFeatureNamed("Sequence supports OIDC in internal flow between steps") + + channelTemplate := channel_template.ChannelTemplate{ + TypeMeta: channel_impl.TypeMeta(), + Spec: map[string]interface{}{}, + } + + sequenceName := feature.MakeRandomK8sName("sequence") + step1Name := feature.MakeRandomK8sName("step1") + step2Name := feature.MakeRandomK8sName("step2") + sourceName := feature.MakeRandomK8sName("source") + + step1Audience := "step1-aud" + step2Audience := "step2-aud" + + step1Append := "-step1" + step2Append := "-step2" + + f.Setup("install step 1", eventshub.Install(step1Name, + eventshub.ReplyWithAppendedData(step1Append), + eventshub.OIDCReceiverAudience(step1Audience), + eventshub.StartReceiver)) + f.Setup("install step 2", eventshub.Install(step2Name, + eventshub.ReplyWithAppendedData(step2Append), + eventshub.OIDCReceiverAudience(step2Audience), + eventshub.StartReceiver)) + + cfg := []manifest.CfgFn{ + sequence.WithChannelTemplate(channelTemplate), + sequence.WithStepFromDestination(&duckv1.Destination{ + Ref: service.AsKReference(step1Name), + Audience: &step1Audience, + }), + sequence.WithStepFromDestination(&duckv1.Destination{ + Ref: service.AsKReference(step2Name), + Audience: &step2Audience, + }), + } + + f.Setup("Install Sequence", sequence.Install(sequenceName, cfg...)) + f.Setup("Sequence goes ready", sequence.IsReady(sequenceName)) + + event := test.FullEvent() + event.SetData("text/plain", "hello") + f.Requirement("install source", eventshub.Install(sourceName, + eventshub.StartSenderToResource(sequence.GVR(), sequenceName), + eventshub.InputEvent(event))) + + expectedMsg := string(event.Data()) + expectedMsg += step1Append + f.Alpha("Sequence with steps having an OIDC audience"). + Must("Delivers events correctly to steps", + assert.OnStore(step2Name).MatchEvent( + test.HasData([]byte(expectedMsg)), + ).AtLeast(1)) + + return f +} + +func SequenceSendsEventWithOIDCTokenToReply() *feature.Feature { + f := feature.NewFeatureNamed("Sequence supports OIDC for reply") + + channelTemplate := channel_template.ChannelTemplate{ + TypeMeta: channel_impl.TypeMeta(), + Spec: map[string]interface{}{}, + } + + sequenceName := feature.MakeRandomK8sName("sequence") + step1Name := feature.MakeRandomK8sName("step1") + step2Name := feature.MakeRandomK8sName("step2") + replySinkName := feature.MakeRandomK8sName("reply-sink") + sourceName := feature.MakeRandomK8sName("source") + + step1Audience := "step1-aud" + step2Audience := "step2-aud" + replySinkAudience := "reply-sink-aud" + + step1Append := "-step1" + step2Append := "-step2" + + f.Setup("install step 1", eventshub.Install(step1Name, + eventshub.ReplyWithAppendedData(step1Append), + eventshub.OIDCReceiverAudience(step1Audience), + eventshub.StartReceiver)) + f.Setup("install step 2", eventshub.Install(step2Name, + eventshub.ReplyWithAppendedData(step2Append), + eventshub.OIDCReceiverAudience(step2Audience), + eventshub.StartReceiver)) + f.Setup("install sink", eventshub.Install(replySinkName, + eventshub.OIDCReceiverAudience(replySinkAudience), + eventshub.StartReceiver)) + + cfg := []manifest.CfgFn{ + sequence.WithChannelTemplate(channelTemplate), + sequence.WithReplyFromDestination(&duckv1.Destination{ + Ref: service.AsKReference(replySinkName), + Audience: &replySinkAudience, + }), + sequence.WithStepFromDestination(&duckv1.Destination{ + Ref: service.AsKReference(step1Name), + Audience: &step1Audience, + }), + sequence.WithStepFromDestination(&duckv1.Destination{ + Ref: service.AsKReference(step2Name), + Audience: &step2Audience, + }), + } + + f.Setup("Install Sequence", sequence.Install(sequenceName, cfg...)) + f.Setup("Sequence goes ready", sequence.IsReady(sequenceName)) + + event := test.FullEvent() + event.SetData("text/plain", "hello") + f.Requirement("install source", eventshub.Install(sourceName, + eventshub.StartSenderToResource(sequence.GVR(), sequenceName), + eventshub.InputEvent(event))) + + expectedMsg := string(event.Data()) + expectedMsg += step1Append + expectedMsg += step2Append + f.Alpha("Sequence with steps having an OIDC audience"). + Must("Delivers events correctly to reply", + assert.OnStore(replySinkName).MatchEvent( + test.HasData([]byte(expectedMsg)), + ).AtLeast(1)) + + return f +} diff --git a/test/auth/oidc_test.go b/test/auth/oidc_test.go index eda31347168..b8c217df4e6 100644 --- a/test/auth/oidc_test.go +++ b/test/auth/oidc_test.go @@ -32,7 +32,7 @@ import ( "knative.dev/eventing/test/auth/features/oidc" brokerfeatures "knative.dev/eventing/test/rekt/features/broker" "knative.dev/eventing/test/rekt/features/channel" - containersource "knative.dev/eventing/test/rekt/features/containersource" + "knative.dev/eventing/test/rekt/features/containersource" parallelfeatures "knative.dev/eventing/test/rekt/features/parallel" sequencefeatures "knative.dev/eventing/test/rekt/features/sequence" "knative.dev/eventing/test/rekt/resources/broker" @@ -159,3 +159,17 @@ func TestContainerSourceSendsEventsWithOIDCSupport(t *testing.T) { env.Test(ctx, t, containersource.SendsEventsWithSinkRefOIDC()) } + +func TestSequenceSendsEventsWithOIDCSupport(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + ) + + env.TestSet(ctx, t, oidc.SequenceSendsEventWithOIDC()) +} diff --git a/test/rekt/resources/sequence/sequence.go b/test/rekt/resources/sequence/sequence.go index d29ce9afdc5..d740fe98092 100644 --- a/test/rekt/resources/sequence/sequence.go +++ b/test/rekt/resources/sequence/sequence.go @@ -19,6 +19,7 @@ package sequence import ( "context" "embed" + "strings" "time" "k8s.io/apimachinery/pkg/runtime/schema" @@ -112,6 +113,48 @@ func WithStep(ref *duckv1.KReference, uri string) manifest.CfgFn { } } +func WithStepFromDestination(dest *duckv1.Destination) manifest.CfgFn { + return func(cfg map[string]interface{}) { + if _, set := cfg["steps"]; !set { + cfg["steps"] = []map[string]interface{}{} + } + + step := map[string]interface{}{} + + uri := dest.URI + ref := dest.Ref + + if dest.CACerts != nil { + // This is a multi-line string and should be indented accordingly. + // Replace "new line" with "new line + spaces". + step["CACerts"] = strings.ReplaceAll(*dest.CACerts, "\n", "\n ") + } + + if dest.Audience != nil { + step["audience"] = *dest.Audience + } + + if uri != nil { + step["uri"] = uri.String() + } + if ref != nil { + if _, set := step["ref"]; !set { + step["ref"] = map[string]interface{}{} + } + sref := step["ref"].(map[string]interface{}) + sref["apiVersion"] = ref.APIVersion + sref["kind"] = ref.Kind + sref["namespace"] = ref.Namespace + sref["name"] = ref.Name + } + + steps := cfg["steps"].([]map[string]interface{}) + steps = append(steps, step) + + cfg["steps"] = steps + } +} + // WithReply adds the top level reply config to a Parallel spec. func WithReply(ref *duckv1.KReference, uri string) manifest.CfgFn { return func(cfg map[string]interface{}) { @@ -136,6 +179,42 @@ func WithReply(ref *duckv1.KReference, uri string) manifest.CfgFn { } } +func WithReplyFromDestination(dest *duckv1.Destination) manifest.CfgFn { + return func(cfg map[string]interface{}) { + if _, set := cfg["reply"]; !set { + cfg["reply"] = map[string]interface{}{} + } + reply := cfg["reply"].(map[string]interface{}) + + uri := dest.URI + ref := dest.Ref + + if dest.CACerts != nil { + // This is a multi-line string and should be indented accordingly. + // Replace "new line" with "new line + spaces". + reply["CACerts"] = strings.ReplaceAll(*dest.CACerts, "\n", "\n ") + } + + if dest.Audience != nil { + reply["audience"] = *dest.Audience + } + + if uri != nil { + reply["uri"] = uri.String() + } + if ref != nil { + if _, set := reply["ref"]; !set { + reply["ref"] = map[string]interface{}{} + } + rref := reply["ref"].(map[string]interface{}) + rref["apiVersion"] = ref.APIVersion + rref["kind"] = ref.Kind + rref["namespace"] = ref.Namespace + rref["name"] = ref.Name + } + } +} + // WithChannelTemplate adds the top level channel references. func WithChannelTemplate(template channel_template.ChannelTemplate) manifest.CfgFn { return func(cfg map[string]interface{}) { diff --git a/test/rekt/resources/sequence/sequence.yaml b/test/rekt/resources/sequence/sequence.yaml index 32f89ba2cd0..59e6c9bd581 100644 --- a/test/rekt/resources/sequence/sequence.yaml +++ b/test/rekt/resources/sequence/sequence.yaml @@ -51,6 +51,13 @@ spec: {{ if .uri }} uri: {{ .uri }} {{ end }} + {{ if .CACerts }} + CACerts: |- + {{ .CACerts }} + {{ end }} + {{ if .audience }} + audience: {{ .audience }} + {{ end }} {{ end }} {{ if .reply }} reply: @@ -64,4 +71,11 @@ spec: {{ if .reply.uri }} uri: {{ .reply.uri }} {{ end }} + {{ if .reply.CACerts }} + CACerts: |- + {{ .reply.CACerts }} + {{ end }} + {{ if .reply.audience }} + audience: {{ .reply.audience }} + {{ end }} {{ end }}