Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-v1.15] backport upstream broker_deleted_recreated rewrite #1398

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 86 additions & 3 deletions test/rekt/features/broker_deleted_recreated.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,20 @@
package features

import (
"k8s.io/apimachinery/pkg/types"
"context"
"math"
"strconv"

eventingduck "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/reconciler-test/pkg/environment"

cetest "github.com/cloudevents/sdk-go/v2/test"
"github.com/google/uuid"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"knative.dev/eventing-kafka-broker/test/e2e_new/bogus_config"
"knative.dev/eventing-kafka-broker/test/rekt/resources/kafkatopic"
"knative.dev/reconciler-test/pkg/eventshub/assert"

"knative.dev/pkg/system"

Expand All @@ -36,15 +46,88 @@ import (
brokerconfigmap "knative.dev/eventing-kafka-broker/test/rekt/resources/configmap/broker"
)

func compose(steps ...feature.StepFn) feature.StepFn {
return func(ctx context.Context, t feature.T) {
for _, s := range steps {
s(ctx, t)
}
}
}

// BrokerDeletedRecreated tests that when a broker and trigger is deleted and re-created, the original sink will eventually stop receiving events
func BrokerDeletedRecreated() *feature.Feature {
f := feature.NewFeatureNamed("broker deleted and recreated")

brokerName := feature.MakeRandomK8sName("broker")
triggerName := feature.MakeRandomK8sName("trigger")

f.Setup("test broker", featuressteps.BrokerSmokeTest(brokerName, triggerName))
sink1 := feature.MakeRandomK8sName("asink")
sink2 := feature.MakeRandomK8sName("bsink")

event := cetest.FullEvent()
event.SetID(uuid.New().String())

eventMatchers := []cetest.EventMatcher{
cetest.HasId(event.ID()),
cetest.HasSource(event.Source()),
cetest.HasType(event.Type()),
cetest.HasSubject(event.Subject()),
}

backoffPolicy := eventingduck.BackoffPolicyLinear

f.Setup("test broker", compose(
eventshub.Install(sink1, eventshub.StartReceiver),
broker.Install(brokerName, broker.WithEnvConfig()...),
broker.IsReady(brokerName),
trigger.Install(
triggerName,
trigger.WithBrokerName(brokerName),
trigger.WithRetry(3, &backoffPolicy, ptr.To("PT1S")),
trigger.WithSubscriber(service.AsKReference(sink1), ""),
),
trigger.IsReady(triggerName),
eventshub.Install(
feature.MakeRandomK8sName("source"),
eventshub.StartSenderToResource(broker.GVR(), brokerName),
eventshub.AddSequence,
eventshub.InputEvent(event),
// We want to send 1 event/s until the timeout
func(ctx context.Context, envs map[string]string) error {
_, timeout := environment.PollTimingsFromContext(ctx)
envs["PERIOD"] = "1" // in seconds
envs["MAX_MESSAGES"] = strconv.Itoa(int(math.Ceil(timeout.Seconds())))
return nil
},
),
assert.OnStore(sink1).MatchEvent(eventMatchers...).AtLeast(1),
))

f.Requirement("delete broker", featuressteps.DeleteBroker(brokerName))
f.Assert("test broker after deletion", featuressteps.BrokerSmokeTest(brokerName, triggerName))
f.Assert("test broker after deletion", compose(
eventshub.Install(sink2, eventshub.StartReceiver),
broker.Install(brokerName, broker.WithEnvConfig()...),
broker.IsReady(brokerName),
trigger.Install(
triggerName,
trigger.WithBrokerName(brokerName),
trigger.WithRetry(3, &backoffPolicy, ptr.To("PT1S")),
trigger.WithSubscriber(service.AsKReference(sink2), ""),
),
trigger.IsReady(triggerName),
// We need to check both that
// 1. sink1 eventually stops receiving new events
// 2. sink2 eventually starts receiving all events
// therefore, we check that eventually, the last few events sent (16 for no particular reason) are all received by the sink2 only
// and contain an uninterrupted (without any missing sequence numbers) source sequence as sent by the source with eventshub.AddSequence
EventSequenceOnStores(sink1, sink2).
MatchingReceived(eventMatchers...). // ... when ...
OrderedBySourceSequence(). // ..., and taken the ...
LastN(16). // ... events, the sequence...
ContainsOnlyEventsObservedBy(sink2). // ...and...
IsAnUninterruptedSourceSequence().
Eventually(),
))

return f
}
Expand Down
251 changes: 251 additions & 0 deletions test/rekt/features/sequence_assertions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
/*
* Copyright 2024 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package features

import (
"cmp"
"context"
"fmt"

"knative.dev/reconciler-test/pkg/eventshub/assert"

"slices"

cetest "github.com/cloudevents/sdk-go/v2/test"
types2 "github.com/cloudevents/sdk-go/v2/types"
"k8s.io/apimachinery/pkg/util/wait"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/eventshub"
"knative.dev/reconciler-test/pkg/feature"
)

type sequenceTransformationOrAssertion func(ctx context.Context, events []eventshub.EventInfo) ([]eventshub.EventInfo, error)

type SequenceAssertionBuilder struct {
storeNames []string
matchers []eventshub.EventInfoMatcherCtx
transformsOrAssertions []sequenceTransformationOrAssertion
}

func getEventInfoSourceSequenceNumber(eventInfo eventshub.EventInfo) (int32, error) {
sequenceExtension, ok := eventInfo.Event.Extensions()["sequence"]
if !ok {
return 0, fmt.Errorf("event does not contain a sequence extension: %s", eventInfo.String())
}

sequenceNumber, err := types2.ToInteger(sequenceExtension)
if err != nil {
return 0, fmt.Errorf("event \"sequence\" extension value %q is not a number: %s", sequenceExtension, eventInfo.String())
}

return sequenceNumber, nil
}

func orderBySourceSequence(ctx context.Context, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) {
sorted := slices.Clone(events)

var conversionError error

slices.SortFunc(sorted, func(a, b eventshub.EventInfo) int {
var err error
var an, bn int32
an, err = getEventInfoSourceSequenceNumber(a)
if err != nil {
conversionError = err
return 0
}

bn, err = getEventInfoSourceSequenceNumber(b)
if err != nil {
conversionError = err
return 0
}

return cmp.Compare(an, bn)
})

if conversionError != nil {
return nil, conversionError
}

return sorted, nil
}

func reverse(ctx context.Context, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) {
reversed := slices.Clone(events)
slices.Reverse(reversed)
return reversed, nil
}

func firstN(n int, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) {
if len(events) < n {
return nil, fmt.Errorf("expected at least %d events, got %d", n, len(events))
}

return events[:n], nil
}

func lastN(n int, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) {
if len(events) < n {
return nil, fmt.Errorf("expected at least %d events, got %d", n, len(events))
}

return events[len(events)-n:], nil
}

// EventSequenceOnStores starts an assertion about sequence of events received by the given named stores
// The assertions are specially designed for checking sequences as generated by sources with eventshub.AddSequence
func EventSequenceOnStores(names ...string) SequenceAssertionBuilder {
return SequenceAssertionBuilder{
storeNames: names,
}
}

func (b SequenceAssertionBuilder) MatchingReceived(matchers ...cetest.EventMatcher) SequenceAssertionBuilder {
b.matchers = append(b.matchers, assert.MatchKind(eventshub.EventReceived).WithContext())
b.matchers = append(b.matchers, assert.MatchEvent(matchers...).WithContext())
return b
}

func (b SequenceAssertionBuilder) Reversed() SequenceAssertionBuilder {
b.transformsOrAssertions = append(b.transformsOrAssertions, reverse)
return b
}

func (b SequenceAssertionBuilder) OrderedBySourceSequence() SequenceAssertionBuilder {
b.transformsOrAssertions = append(b.transformsOrAssertions, orderBySourceSequence)
return b
}

func (b SequenceAssertionBuilder) FirstN(n int) SequenceAssertionBuilder {
b.transformsOrAssertions = append(b.transformsOrAssertions, func(ctx context.Context, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) {
return firstN(n, events)
})
return b
}

func (b SequenceAssertionBuilder) LastN(n int) SequenceAssertionBuilder {
b.transformsOrAssertions = append(b.transformsOrAssertions, func(ctx context.Context, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) {
return lastN(n, events)
})
return b
}

func (b SequenceAssertionBuilder) ContainsOnly(matcher eventshub.EventInfoMatcher) SequenceAssertionBuilder {
b.transformsOrAssertions = append(b.transformsOrAssertions, func(ctx context.Context, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) {
for _, event := range events {
err := matcher(event)
if err != nil {
return nil, err
}
}

return events, nil
})
return b
}

func (b SequenceAssertionBuilder) ContainsOnlyEventsObservedBy(observerName string) SequenceAssertionBuilder {
return b.ContainsOnly(func(info eventshub.EventInfo) error {
if info.Observer != observerName {
return fmt.Errorf("expected observer to be %s, got %s", observerName, info.Observer)
}
return nil
})
}

func (b SequenceAssertionBuilder) IsAnUninterruptedSourceSequence() SequenceAssertionBuilder {
b.transformsOrAssertions = append(b.transformsOrAssertions, func(ctx context.Context, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) {
if len(events) == 0 {
return nil, fmt.Errorf("no events received")
}

firstSequenceNumber, err := getEventInfoSourceSequenceNumber(events[0])
if err != nil {
return nil, err
}

expectedSequenceNumber := firstSequenceNumber - 1
for _, event := range events {
expectedSequenceNumber++

sequenceNumber, err := getEventInfoSourceSequenceNumber(event)
if err != nil {
return nil, err
}

if sequenceNumber != expectedSequenceNumber {
return nil, fmt.Errorf("expected sequence number %d, got %d", expectedSequenceNumber, sequenceNumber)
}
}

return events, nil
})
return b
}

func (b SequenceAssertionBuilder) Eventually() feature.StepFn {
return func(ctx context.Context, t feature.T) {
retryInterval, retryTimeout := environment.PollTimingsFromContext(ctx)

var internalErr error

err := wait.PollUntilContextTimeout(ctx, retryInterval, retryTimeout, true, func(ctx context.Context) (bool, error) {
events := make([]eventshub.EventInfo, 0)
for _, storeName := range b.storeNames {
store := eventshub.StoreFromContext(ctx, storeName)

storeEvents, _, _, err := store.Find(func(info eventshub.EventInfo) error {
for _, matcher := range b.matchers {
err := matcher.WithContext(ctx)(info)
if err != nil {
return err
}
}
return nil
})

if err != nil {
internalErr = err
return false, nil
}

events = append(events, storeEvents...)
}

for _, transformOrAssertion := range b.transformsOrAssertions {
var err error
events, err = transformOrAssertion(ctx, events)
if err != nil {
internalErr = err
return false, nil
}
}

internalErr = nil
return true, nil
})

if internalErr != nil {
t.Fatal(internalErr)
}

if err != nil {
t.Fatal(err)
}
}
}