Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't use async handler #7415

Merged
merged 4 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/channel/fanout/fanout_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type Subscription struct {
// Config for a fanout.EventHandler.
type Config struct {
Subscriptions []Subscription `json:"subscriptions"`
// AsyncHandler controls whether the Subscriptions are called synchronous or asynchronously.
// Deprecated: AsyncHandler controls whether the Subscriptions are called synchronous or asynchronously.
// It is expected to be false when used as a sidecar.
AsyncHandler bool `json:"asyncHandler,omitempty"`
}
Expand Down Expand Up @@ -240,6 +240,7 @@ func createEventReceiverFunction(f *FanoutEventHandler) func(context.Context, ch
reportArgs := channel.ReportArgs{}
reportArgs.EventType = event.Type()
reportArgs.Ns = ref.Namespace
additionalHeaders.Set(apis.KnNamespaceHeader, ref.Namespace)
dispatchResultForFanout := f.dispatch(ctx, subs, event, additionalHeaders)
return ParseDispatchResultAndReportMetrics(dispatchResultForFanout, f.reporter, reportArgs)
}
Expand Down Expand Up @@ -302,7 +303,6 @@ func (f *FanoutEventHandler) dispatch(ctx context.Context, subs []Subscription,
if dispatchResult.err != nil {
f.logger.Error("Fanout had an error", zap.Error(dispatchResult.err))
dispatchResultForFanout.err = dispatchResult.err
return dispatchResultForFanout
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pierDipi I think this early return may cause a race condition in the synchronous case, as it may lead to the context being cancelled (request completes) before all of the dead letter sink requests have been made if there is more than one error.

However, this means that we won't aggregate the errors and are only showing the final error. Should we add some kind of error aggregation here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error aggregation sounds good, we could track it into a separate issue

}
case <-time.After(f.timeout):
f.logger.Error("Fanout timed out")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func newConfigForInMemoryChannel(imc *v1.InMemoryChannel) (*multichannelfanout.C
HostName: imc.Status.Address.URL.Host,
Path: fmt.Sprintf("%s/%s", imc.Namespace, imc.Name),
FanoutConfig: fanout.Config{
AsyncHandler: true,
AsyncHandler: false,
Subscriptions: subs,
},
}, nil
Expand Down
2 changes: 1 addition & 1 deletion test/rekt/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ func TestBrokerConformance(t *testing.T) {

// Install and wait for a Ready Broker.
env.Prerequisite(ctx, t, broker.GoesReady("default", b.WithEnvConfig()...))
env.TestSet(ctx, t, broker.ControlPlaneConformance("default", b.WithEnvConfig()...))
env.TestSet(ctx, t, broker.DataPlaneConformance("default"))
env.TestSet(ctx, t, broker.ControlPlaneConformance("default", b.WithEnvConfig()...))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pierDipi changing the order of these tests caused the DataPlaneConformance tests to pass 😕

}

func TestBrokerDefaultDelivery(t *testing.T) {
Expand Down