diff --git a/test/rekt/features/broker_deleted_recreated.go b/test/rekt/features/broker_deleted_recreated.go index 07d070b076..d6465f7396 100644 --- a/test/rekt/features/broker_deleted_recreated.go +++ b/test/rekt/features/broker_deleted_recreated.go @@ -17,10 +17,20 @@ package features import ( - "k8s.io/apimachinery/pkg/types" + "context" + "math" + "strconv" + + eventingduck "knative.dev/eventing/pkg/apis/duck/v1" + "knative.dev/reconciler-test/pkg/environment" + cetest "github.com/cloudevents/sdk-go/v2/test" + "github.com/google/uuid" + "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" "knative.dev/eventing-kafka-broker/test/e2e_new/bogus_config" "knative.dev/eventing-kafka-broker/test/rekt/resources/kafkatopic" + "knative.dev/reconciler-test/pkg/eventshub/assert" "knative.dev/pkg/system" @@ -36,15 +46,88 @@ import ( brokerconfigmap "knative.dev/eventing-kafka-broker/test/rekt/resources/configmap/broker" ) +func compose(steps ...feature.StepFn) feature.StepFn { + return func(ctx context.Context, t feature.T) { + for _, s := range steps { + s(ctx, t) + } + } +} + +// BrokerDeletedRecreated tests that when a broker and trigger is deleted and re-created, the original sink will eventually stop receiving events func BrokerDeletedRecreated() *feature.Feature { f := feature.NewFeatureNamed("broker deleted and recreated") brokerName := feature.MakeRandomK8sName("broker") triggerName := feature.MakeRandomK8sName("trigger") - f.Setup("test broker", featuressteps.BrokerSmokeTest(brokerName, triggerName)) + sink1 := feature.MakeRandomK8sName("asink") + sink2 := feature.MakeRandomK8sName("bsink") + + event := cetest.FullEvent() + event.SetID(uuid.New().String()) + + eventMatchers := []cetest.EventMatcher{ + cetest.HasId(event.ID()), + cetest.HasSource(event.Source()), + cetest.HasType(event.Type()), + cetest.HasSubject(event.Subject()), + } + + backoffPolicy := eventingduck.BackoffPolicyLinear + + f.Setup("test broker", compose( + eventshub.Install(sink1, eventshub.StartReceiver), + broker.Install(brokerName, broker.WithEnvConfig()...), + broker.IsReady(brokerName), + trigger.Install( + triggerName, + trigger.WithBrokerName(brokerName), + trigger.WithRetry(3, &backoffPolicy, ptr.To("PT1S")), + trigger.WithSubscriber(service.AsKReference(sink1), ""), + ), + trigger.IsReady(triggerName), + eventshub.Install( + feature.MakeRandomK8sName("source"), + eventshub.StartSenderToResource(broker.GVR(), brokerName), + eventshub.AddSequence, + eventshub.InputEvent(event), + // We want to send 1 event/s until the timeout + func(ctx context.Context, envs map[string]string) error { + _, timeout := environment.PollTimingsFromContext(ctx) + envs["PERIOD"] = "1" // in seconds + envs["MAX_MESSAGES"] = strconv.Itoa(int(math.Ceil(timeout.Seconds()))) + return nil + }, + ), + assert.OnStore(sink1).MatchEvent(eventMatchers...).AtLeast(1), + )) + f.Requirement("delete broker", featuressteps.DeleteBroker(brokerName)) - f.Assert("test broker after deletion", featuressteps.BrokerSmokeTest(brokerName, triggerName)) + f.Assert("test broker after deletion", compose( + eventshub.Install(sink2, eventshub.StartReceiver), + broker.Install(brokerName, broker.WithEnvConfig()...), + broker.IsReady(brokerName), + trigger.Install( + triggerName, + trigger.WithBrokerName(brokerName), + trigger.WithRetry(3, &backoffPolicy, ptr.To("PT1S")), + trigger.WithSubscriber(service.AsKReference(sink2), ""), + ), + trigger.IsReady(triggerName), + // We need to check both that + // 1. sink1 eventually stops receiving new events + // 2. sink2 eventually starts receiving all events + // therefore, we check that eventually, the last few events sent (16 for no particular reason) are all received by the sink2 only + // and contain an uninterrupted (without any missing sequence numbers) source sequence as sent by the source with eventshub.AddSequence + EventSequenceOnStores(sink1, sink2). + MatchingReceived(eventMatchers...). // ... when ... + OrderedBySourceSequence(). // ..., and taken the ... + LastN(16). // ... events, the sequence... + ContainsOnlyEventsObservedBy(sink2). // ...and... + IsAnUninterruptedSourceSequence(). + Eventually(), + )) return f } diff --git a/test/rekt/features/sequence_assertions.go b/test/rekt/features/sequence_assertions.go new file mode 100644 index 0000000000..6862afb5e8 --- /dev/null +++ b/test/rekt/features/sequence_assertions.go @@ -0,0 +1,251 @@ +/* + * Copyright 2024 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package features + +import ( + "cmp" + "context" + "fmt" + + "knative.dev/reconciler-test/pkg/eventshub/assert" + + "slices" + + cetest "github.com/cloudevents/sdk-go/v2/test" + types2 "github.com/cloudevents/sdk-go/v2/types" + "k8s.io/apimachinery/pkg/util/wait" + "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/eventshub" + "knative.dev/reconciler-test/pkg/feature" +) + +type sequenceTransformationOrAssertion func(ctx context.Context, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) + +type SequenceAssertionBuilder struct { + storeNames []string + matchers []eventshub.EventInfoMatcherCtx + transformsOrAssertions []sequenceTransformationOrAssertion +} + +func getEventInfoSourceSequenceNumber(eventInfo eventshub.EventInfo) (int32, error) { + sequenceExtension, ok := eventInfo.Event.Extensions()["sequence"] + if !ok { + return 0, fmt.Errorf("event does not contain a sequence extension: %s", eventInfo.String()) + } + + sequenceNumber, err := types2.ToInteger(sequenceExtension) + if err != nil { + return 0, fmt.Errorf("event \"sequence\" extension value %q is not a number: %s", sequenceExtension, eventInfo.String()) + } + + return sequenceNumber, nil +} + +func orderBySourceSequence(ctx context.Context, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) { + sorted := slices.Clone(events) + + var conversionError error + + slices.SortFunc(sorted, func(a, b eventshub.EventInfo) int { + var err error + var an, bn int32 + an, err = getEventInfoSourceSequenceNumber(a) + if err != nil { + conversionError = err + return 0 + } + + bn, err = getEventInfoSourceSequenceNumber(b) + if err != nil { + conversionError = err + return 0 + } + + return cmp.Compare(an, bn) + }) + + if conversionError != nil { + return nil, conversionError + } + + return sorted, nil +} + +func reverse(ctx context.Context, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) { + reversed := slices.Clone(events) + slices.Reverse(reversed) + return reversed, nil +} + +func firstN(n int, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) { + if len(events) < n { + return nil, fmt.Errorf("expected at least %d events, got %d", n, len(events)) + } + + return events[:n], nil +} + +func lastN(n int, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) { + if len(events) < n { + return nil, fmt.Errorf("expected at least %d events, got %d", n, len(events)) + } + + return events[len(events)-n:], nil +} + +// EventSequenceOnStores starts an assertion about sequence of events received by the given named stores +// The assertions are specially designed for checking sequences as generated by sources with eventshub.AddSequence +func EventSequenceOnStores(names ...string) SequenceAssertionBuilder { + return SequenceAssertionBuilder{ + storeNames: names, + } +} + +func (b SequenceAssertionBuilder) MatchingReceived(matchers ...cetest.EventMatcher) SequenceAssertionBuilder { + b.matchers = append(b.matchers, assert.MatchKind(eventshub.EventReceived).WithContext()) + b.matchers = append(b.matchers, assert.MatchEvent(matchers...).WithContext()) + return b +} + +func (b SequenceAssertionBuilder) Reversed() SequenceAssertionBuilder { + b.transformsOrAssertions = append(b.transformsOrAssertions, reverse) + return b +} + +func (b SequenceAssertionBuilder) OrderedBySourceSequence() SequenceAssertionBuilder { + b.transformsOrAssertions = append(b.transformsOrAssertions, orderBySourceSequence) + return b +} + +func (b SequenceAssertionBuilder) FirstN(n int) SequenceAssertionBuilder { + b.transformsOrAssertions = append(b.transformsOrAssertions, func(ctx context.Context, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) { + return firstN(n, events) + }) + return b +} + +func (b SequenceAssertionBuilder) LastN(n int) SequenceAssertionBuilder { + b.transformsOrAssertions = append(b.transformsOrAssertions, func(ctx context.Context, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) { + return lastN(n, events) + }) + return b +} + +func (b SequenceAssertionBuilder) ContainsOnly(matcher eventshub.EventInfoMatcher) SequenceAssertionBuilder { + b.transformsOrAssertions = append(b.transformsOrAssertions, func(ctx context.Context, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) { + for _, event := range events { + err := matcher(event) + if err != nil { + return nil, err + } + } + + return events, nil + }) + return b +} + +func (b SequenceAssertionBuilder) ContainsOnlyEventsObservedBy(observerName string) SequenceAssertionBuilder { + return b.ContainsOnly(func(info eventshub.EventInfo) error { + if info.Observer != observerName { + return fmt.Errorf("expected observer to be %s, got %s", observerName, info.Observer) + } + return nil + }) +} + +func (b SequenceAssertionBuilder) IsAnUninterruptedSourceSequence() SequenceAssertionBuilder { + b.transformsOrAssertions = append(b.transformsOrAssertions, func(ctx context.Context, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) { + if len(events) == 0 { + return nil, fmt.Errorf("no events received") + } + + firstSequenceNumber, err := getEventInfoSourceSequenceNumber(events[0]) + if err != nil { + return nil, err + } + + expectedSequenceNumber := firstSequenceNumber - 1 + for _, event := range events { + expectedSequenceNumber++ + + sequenceNumber, err := getEventInfoSourceSequenceNumber(event) + if err != nil { + return nil, err + } + + if sequenceNumber != expectedSequenceNumber { + return nil, fmt.Errorf("expected sequence number %d, got %d", expectedSequenceNumber, sequenceNumber) + } + } + + return events, nil + }) + return b +} + +func (b SequenceAssertionBuilder) Eventually() feature.StepFn { + return func(ctx context.Context, t feature.T) { + retryInterval, retryTimeout := environment.PollTimingsFromContext(ctx) + + var internalErr error + + err := wait.PollUntilContextTimeout(ctx, retryInterval, retryTimeout, true, func(ctx context.Context) (bool, error) { + events := make([]eventshub.EventInfo, 0) + for _, storeName := range b.storeNames { + store := eventshub.StoreFromContext(ctx, storeName) + + storeEvents, _, _, err := store.Find(func(info eventshub.EventInfo) error { + for _, matcher := range b.matchers { + err := matcher.WithContext(ctx)(info) + if err != nil { + return err + } + } + return nil + }) + + if err != nil { + internalErr = err + return false, nil + } + + events = append(events, storeEvents...) + } + + for _, transformOrAssertion := range b.transformsOrAssertions { + var err error + events, err = transformOrAssertion(ctx, events) + if err != nil { + internalErr = err + return false, nil + } + } + + internalErr = nil + return true, nil + }) + + if internalErr != nil { + t.Fatal(internalErr) + } + + if err != nil { + t.Fatal(err) + } + } +}