Skip to content

Commit

Permalink
Adding the rekt test for pingsource with broker as sink
Browse files Browse the repository at this point in the history
  • Loading branch information
Leo6Leo committed Oct 27, 2023
1 parent 0684dbe commit 3c67516
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 0 deletions.
75 changes: 75 additions & 0 deletions test/rekt/features/pingsource/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ package pingsource

import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/util/sets"
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
"knative.dev/eventing/test/rekt/resources/addressable"
"knative.dev/eventing/test/rekt/resources/broker"
"knative.dev/eventing/test/rekt/resources/eventtype"
"knative.dev/eventing/test/rekt/resources/trigger"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/reconciler-test/pkg/manifest"
"knative.dev/reconciler-test/pkg/resources/pod"

"github.com/cloudevents/sdk-go/v2/test"
"knative.dev/reconciler-test/pkg/environment"
Expand All @@ -42,6 +45,10 @@ import (
"knative.dev/eventing/test/rekt/resources/pingsource"
)

const (
exampleImage = "ko://knative.dev/eventing/test/test_images/print"
)

func SendsEventsWithSinkRef() *feature.Feature {
source := feature.MakeRandomK8sName("pingsource")
sink := feature.MakeRandomK8sName("sink")
Expand Down Expand Up @@ -191,3 +198,71 @@ func SendsEventsWithEventTypes() *feature.Feature {

return f
}

func SendsEventsWithBrokerAsSinkTLS() *feature.Feature {
src := feature.MakeRandomK8sName("pingsource")
brokerName := feature.MakeRandomK8sName("broker")
sinkName := feature.MakeRandomK8sName("sink")
triggerName := feature.MakeRandomK8sName("trigger")
f := feature.NewFeature()

f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict())
f.Prerequisite("should not run when Istio is enabled", featureflags.IstioDisabled())

f.Setup("install broker", broker.Install(brokerName, broker.WithEnvConfig()...))
f.Setup("broker is ready", broker.IsReady(brokerName))
f.Setup("broker is addressable", broker.IsAddressable(brokerName))
f.Setup("Broker has HTTPS address", broker.ValidateAddress(brokerName, addressable.AssertHTTPSAddress))

f.Setup("install sink", eventshub.Install(sinkName, eventshub.StartReceiverTLS))

f.Setup("install trigger", func(ctx context.Context, t feature.T) {
d := service.AsDestinationRef(sinkName)
d.CACerts = eventshub.GetCaCerts(ctx)
trigger.Install(triggerName, brokerName, trigger.WithSubscriberFromDestination(d))(ctx, t)
})
f.Setup("Wait for Trigger to become ready", trigger.IsReady(triggerName))

f.Requirement("install PingSource", func(ctx context.Context, t feature.T) {
d := broker.AsDestinationRef(brokerName)

brokerAddr, err := broker.Address(ctx, brokerName)
if err != nil {
t.Fatal("failed to get the address of the broker service", brokerName, err)
}

d.CACerts = brokerAddr.CACerts

cfg := []manifest.CfgFn{
pingsource.WithSink(&duckv1.Destination{URI: brokerAddr.URL, CACerts: d.CACerts}),
pingsource.WithData("text/plain", "hello, world!"),
}

pingsource.Install(src, cfg...)(ctx, t)
})

f.Requirement("PingSource goes ready", pingsource.IsReady(src))

examplePodName := feature.MakeRandomK8sName("example")

// create a pod so that PingSource delivers an event to its sink
// event body is similar to this:
// {"kind":"Pod","namespace":"test-wmbcixlv","name":"example-axvlzbvc","apiVersion":"v1"}
f.Requirement("install example pod",
pod.Install(examplePodName, exampleImage,
pod.WithLabels(map[string]string{"e2e": "testing"})),
)

f.Stable("PingSource as event source").
Must("delivers events",
assert.OnStore(sinkName).MatchEvent(test.HasType("dev.knative.sources.ping")).AtLeast(1),
)
eventassert.MatchEvent(
test.HasType("dev.knative.ping.ref.add"),
test.DataContains(`"kind":"Pod"`),
test.DataContains(fmt.Sprintf(`"name":"%s"`, examplePodName)),
)

return f

}
17 changes: 17 additions & 0 deletions test/rekt/pingsource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package rekt

import (
"testing"
"time"

"knative.dev/pkg/system"
"knative.dev/reconciler-test/pkg/environment"
Expand Down Expand Up @@ -103,3 +104,19 @@ func TestPingSourceWithSecondsInSchedule(t *testing.T) {

env.Test(ctx, t, pingsource.SendsEventsWithSecondsInSchedule())
}

func TestPingSourceDataPlane_BrokerAsSinkTLS(t *testing.T) {
t.Parallel()

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.Managed(t),
eventshub.WithTLS(t),
environment.WithPollTimings(5*time.Second, 2*time.Minute),
)

env.Test(ctx, t, pingsource.SendsEventsWithBrokerAsSinkTLS())
}

0 comments on commit 3c67516

Please sign in to comment.