Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix provisioning of sequences reply and steps audience #7501

Merged
merged 4 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions config/core/resources/sequence.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ spec:
uri:
description: URI can be an absolute URL(non-empty scheme and non-empty host) pointing to the target or a relative URI. Relative URIs will be resolved using the base URI retrieved from Ref.
type: string
CACerts:
type: string
description: Certification Authority (CA) certificates in PEM format that the source trusts when sending events to the reply.
audience:
description: Audience is the OIDC audience of the reply. This only needs to be set if the target is not an Addressable and thus the Audience can't be received from the target itself. If specified, it takes precedence over the target's Audience.
type: string
steps:
description: Steps is the list of Destinations (processors / functions) that will be called in the order provided. Each step has its own delivery options
type: array
Expand Down
12 changes: 8 additions & 4 deletions pkg/reconciler/sequence/resources/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ func NewSubscription(stepNumber int, s *v1.Sequence) *messagingv1.Subscription {
Name: SequenceChannelName(s.Name, stepNumber),
},
Subscriber: &duckv1.Destination{
Ref: s.Spec.Steps[stepNumber].Destination.Ref,
URI: s.Spec.Steps[stepNumber].Destination.URI,
Ref: s.Spec.Steps[stepNumber].Destination.Ref,
URI: s.Spec.Steps[stepNumber].Destination.URI,
Audience: s.Spec.Steps[stepNumber].Destination.Audience,
CACerts: s.Spec.Steps[stepNumber].Destination.CACerts,
},
Delivery: s.Spec.Steps[stepNumber].Delivery,
},
Expand All @@ -71,8 +73,10 @@ func NewSubscription(stepNumber int, s *v1.Sequence) *messagingv1.Subscription {
}
} else if s.Spec.Reply != nil {
r.Spec.Reply = &duckv1.Destination{
Ref: s.Spec.Reply.Ref,
URI: s.Spec.Reply.URI,
Ref: s.Spec.Reply.Ref,
URI: s.Spec.Reply.URI,
Audience: s.Spec.Reply.Audience,
CACerts: s.Spec.Reply.CACerts,
}
}
return r
Expand Down
148 changes: 148 additions & 0 deletions test/auth/features/oidc/sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,21 @@ limitations under the License.
package oidc

import (
"github.com/cloudevents/sdk-go/v2/test"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/reconciler/sequence/resources"
"knative.dev/eventing/test/rekt/resources/addressable"
"knative.dev/eventing/test/rekt/resources/channel_impl"
"knative.dev/eventing/test/rekt/resources/channel_template"
"knative.dev/eventing/test/rekt/resources/sequence"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/reconciler-test/pkg/eventshub"
"knative.dev/reconciler-test/pkg/eventshub/assert"
"knative.dev/reconciler-test/pkg/feature"
"knative.dev/reconciler-test/pkg/manifest"
"knative.dev/reconciler-test/pkg/resources/service"
)

func SequenceHasAudienceOfInputChannel(sequenceName, sequenceNamespace string, channelGVR schema.GroupVersionResource, channelKind string) *feature.Feature {
Expand All @@ -40,3 +48,143 @@ func SequenceHasAudienceOfInputChannel(sequenceName, sequenceNamespace string, c

return f
}

func SequenceSendsEventWithOIDC() *feature.FeatureSet {
return &feature.FeatureSet{
Name: "Sequence send events with OIDC support",
Features: []*feature.Feature{
SequenceSendsEventWithOIDCTokenToSteps(),
SequenceSendsEventWithOIDCTokenToReply(),
},
}
}

func SequenceSendsEventWithOIDCTokenToSteps() *feature.Feature {
f := feature.NewFeatureNamed("Sequence supports OIDC in internal flow between steps")

channelTemplate := channel_template.ChannelTemplate{
TypeMeta: channel_impl.TypeMeta(),
Spec: map[string]interface{}{},
}

sequenceName := feature.MakeRandomK8sName("sequence")
step1Name := feature.MakeRandomK8sName("step1")
step2Name := feature.MakeRandomK8sName("step2")
sourceName := feature.MakeRandomK8sName("source")

step1Audience := "step1-aud"
step2Audience := "step2-aud"

step1Append := "-step1"
step2Append := "-step2"

f.Setup("install step 1", eventshub.Install(step1Name,
eventshub.ReplyWithAppendedData(step1Append),
eventshub.OIDCReceiverAudience(step1Audience),
eventshub.StartReceiver))
f.Setup("install step 2", eventshub.Install(step2Name,
eventshub.ReplyWithAppendedData(step2Append),
eventshub.OIDCReceiverAudience(step2Audience),
eventshub.StartReceiver))

cfg := []manifest.CfgFn{
sequence.WithChannelTemplate(channelTemplate),
sequence.WithStepFromDestination(&duckv1.Destination{
Ref: service.AsKReference(step1Name),
Audience: &step1Audience,
}),
sequence.WithStepFromDestination(&duckv1.Destination{
Ref: service.AsKReference(step2Name),
Audience: &step2Audience,
}),
}

f.Setup("Install Sequence", sequence.Install(sequenceName, cfg...))
f.Setup("Sequence goes ready", sequence.IsReady(sequenceName))

event := test.FullEvent()
event.SetData("text/plain", "hello")
f.Requirement("install source", eventshub.Install(sourceName,
eventshub.StartSenderToResource(sequence.GVR(), sequenceName),
eventshub.InputEvent(event)))

expectedMsg := string(event.Data())
expectedMsg += step1Append
f.Alpha("Sequence with steps having an OIDC audience").
Must("Delivers events correctly to steps",
assert.OnStore(step2Name).MatchEvent(
test.HasData([]byte(expectedMsg)),
).AtLeast(1))

return f
}

func SequenceSendsEventWithOIDCTokenToReply() *feature.Feature {
f := feature.NewFeatureNamed("Sequence supports OIDC for reply")

channelTemplate := channel_template.ChannelTemplate{
TypeMeta: channel_impl.TypeMeta(),
Spec: map[string]interface{}{},
}

sequenceName := feature.MakeRandomK8sName("sequence")
step1Name := feature.MakeRandomK8sName("step1")
step2Name := feature.MakeRandomK8sName("step2")
replySinkName := feature.MakeRandomK8sName("reply-sink")
sourceName := feature.MakeRandomK8sName("source")

step1Audience := "step1-aud"
step2Audience := "step2-aud"
replySinkAudience := "reply-sink-aud"

step1Append := "-step1"
step2Append := "-step2"

f.Setup("install step 1", eventshub.Install(step1Name,
eventshub.ReplyWithAppendedData(step1Append),
eventshub.OIDCReceiverAudience(step1Audience),
eventshub.StartReceiver))
f.Setup("install step 2", eventshub.Install(step2Name,
eventshub.ReplyWithAppendedData(step2Append),
eventshub.OIDCReceiverAudience(step2Audience),
eventshub.StartReceiver))
f.Setup("install sink", eventshub.Install(replySinkName,
eventshub.OIDCReceiverAudience(replySinkAudience),
eventshub.StartReceiver))

cfg := []manifest.CfgFn{
sequence.WithChannelTemplate(channelTemplate),
sequence.WithReplyFromDestination(&duckv1.Destination{
Ref: service.AsKReference(replySinkName),
Audience: &replySinkAudience,
}),
sequence.WithStepFromDestination(&duckv1.Destination{
Ref: service.AsKReference(step1Name),
Audience: &step1Audience,
}),
sequence.WithStepFromDestination(&duckv1.Destination{
Ref: service.AsKReference(step2Name),
Audience: &step2Audience,
}),
}

f.Setup("Install Sequence", sequence.Install(sequenceName, cfg...))
f.Setup("Sequence goes ready", sequence.IsReady(sequenceName))

event := test.FullEvent()
event.SetData("text/plain", "hello")
f.Requirement("install source", eventshub.Install(sourceName,
eventshub.StartSenderToResource(sequence.GVR(), sequenceName),
eventshub.InputEvent(event)))

expectedMsg := string(event.Data())
expectedMsg += step1Append
expectedMsg += step2Append
f.Alpha("Sequence with steps having an OIDC audience").
Must("Delivers events correctly to reply",
assert.OnStore(replySinkName).MatchEvent(
test.HasData([]byte(expectedMsg)),
).AtLeast(1))

return f
}
16 changes: 15 additions & 1 deletion test/auth/oidc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"knative.dev/eventing/test/auth/features/oidc"
brokerfeatures "knative.dev/eventing/test/rekt/features/broker"
"knative.dev/eventing/test/rekt/features/channel"
containersource "knative.dev/eventing/test/rekt/features/containersource"
"knative.dev/eventing/test/rekt/features/containersource"
parallelfeatures "knative.dev/eventing/test/rekt/features/parallel"
sequencefeatures "knative.dev/eventing/test/rekt/features/sequence"
"knative.dev/eventing/test/rekt/resources/broker"
Expand Down Expand Up @@ -159,3 +159,17 @@ func TestContainerSourceSendsEventsWithOIDCSupport(t *testing.T) {

env.Test(ctx, t, containersource.SendsEventsWithSinkRefOIDC())
}

func TestSequenceSendsEventsWithOIDCSupport(t *testing.T) {
t.Parallel()

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.Managed(t),
)

env.TestSet(ctx, t, oidc.SequenceSendsEventWithOIDC())
}
79 changes: 79 additions & 0 deletions test/rekt/resources/sequence/sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package sequence
import (
"context"
"embed"
"strings"
"time"

"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -112,6 +113,48 @@ func WithStep(ref *duckv1.KReference, uri string) manifest.CfgFn {
}
}

func WithStepFromDestination(dest *duckv1.Destination) manifest.CfgFn {
return func(cfg map[string]interface{}) {
if _, set := cfg["steps"]; !set {
cfg["steps"] = []map[string]interface{}{}
}

step := map[string]interface{}{}

uri := dest.URI
ref := dest.Ref

if dest.CACerts != nil {
// This is a multi-line string and should be indented accordingly.
// Replace "new line" with "new line + spaces".
step["CACerts"] = strings.ReplaceAll(*dest.CACerts, "\n", "\n ")
}

if dest.Audience != nil {
step["audience"] = *dest.Audience
}

if uri != nil {
step["uri"] = uri.String()
}
if ref != nil {
if _, set := step["ref"]; !set {
step["ref"] = map[string]interface{}{}
}
sref := step["ref"].(map[string]interface{})
sref["apiVersion"] = ref.APIVersion
sref["kind"] = ref.Kind
sref["namespace"] = ref.Namespace
sref["name"] = ref.Name
}

steps := cfg["steps"].([]map[string]interface{})
steps = append(steps, step)

cfg["steps"] = steps
}
}

// WithReply adds the top level reply config to a Parallel spec.
func WithReply(ref *duckv1.KReference, uri string) manifest.CfgFn {
return func(cfg map[string]interface{}) {
Expand All @@ -136,6 +179,42 @@ func WithReply(ref *duckv1.KReference, uri string) manifest.CfgFn {
}
}

func WithReplyFromDestination(dest *duckv1.Destination) manifest.CfgFn {
return func(cfg map[string]interface{}) {
if _, set := cfg["reply"]; !set {
cfg["reply"] = map[string]interface{}{}
}
reply := cfg["reply"].(map[string]interface{})

uri := dest.URI
ref := dest.Ref

if dest.CACerts != nil {
// This is a multi-line string and should be indented accordingly.
// Replace "new line" with "new line + spaces".
reply["CACerts"] = strings.ReplaceAll(*dest.CACerts, "\n", "\n ")
}

if dest.Audience != nil {
reply["audience"] = *dest.Audience
}

if uri != nil {
reply["uri"] = uri.String()
}
if ref != nil {
if _, set := reply["ref"]; !set {
reply["ref"] = map[string]interface{}{}
}
rref := reply["ref"].(map[string]interface{})
rref["apiVersion"] = ref.APIVersion
rref["kind"] = ref.Kind
rref["namespace"] = ref.Namespace
rref["name"] = ref.Name
}
}
}

// WithChannelTemplate adds the top level channel references.
func WithChannelTemplate(template channel_template.ChannelTemplate) manifest.CfgFn {
return func(cfg map[string]interface{}) {
Expand Down
14 changes: 14 additions & 0 deletions test/rekt/resources/sequence/sequence.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ spec:
{{ if .uri }}
uri: {{ .uri }}
{{ end }}
{{ if .CACerts }}
CACerts: |-
{{ .CACerts }}
{{ end }}
{{ if .audience }}
audience: {{ .audience }}
{{ end }}
{{ end }}
{{ if .reply }}
reply:
Expand All @@ -64,4 +71,11 @@ spec:
{{ if .reply.uri }}
uri: {{ .reply.uri }}
{{ end }}
{{ if .reply.CACerts }}
CACerts: |-
{{ .reply.CACerts }}
{{ end }}
{{ if .reply.audience }}
audience: {{ .reply.audience }}
{{ end }}
{{ end }}
Loading