Skip to content

Commit

Permalink
[release-1.11] Delivery failure (#1188)
Browse files Browse the repository at this point in the history
* Add knativeerrordest to the event extension when delivery failure

* Check if the extension is set, else set it. If not, act like nothing happened

* Cleanup and requeue variable

* Set headers in msg and requeue

* Lint

* Wrong place to set headers

* Forgot the status code

* Nack if there's a header

* Send event directly to RabbitMQ

* Handle error and send an event for it

* Add functions to handle DLQ in the new way

* Moved things around

* Added missing arg

* We should always Ack in dispatch to DLQ

* Confirm mode. Added missing DLX argument

* Fix for DLX

* Missed variable

* Initial testing work

* Improved tests

* Quick push

* Now the failed messages are routed directly to the DLQ exchange defined in the trigger; if it is not defined in the trigger, then to the one defined in the broker; and if neither is defined, they are not rerouted

* Update test. Still failing

* Fixed tests and removed the event-nil case when binding.ToEvent fails, because the event is always going to be nil

* Added two simple tests for the DLQ function

* fixed unit tests

* This is just a smoke test for the ConsumeFromQueue function, to see that it stops when a context cancellation arrives

* Added deferred confirmation to connection mock

* Extended tests

* improving coverage

---------

Co-authored-by: Thomas Johansen <[email protected]>
Co-authored-by: Gabriel Freites <[email protected]>
  • Loading branch information
3 people authored Jul 31, 2023
1 parent cc5ce6a commit 7031a4e
Show file tree
Hide file tree
Showing 13 changed files with 461 additions and 57 deletions.
19 changes: 18 additions & 1 deletion cmd/dispatcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type envConfig struct {
BrokerIngressURL string `envconfig:"BROKER_INGRESS_URL" required:"true"`
SubscriberURL string `envconfig:"SUBSCRIBER" required:"true"`
SubscriberCACerts string `envconfig:"SUBSCRIBER_CACERTS" required:"false"`
DLX bool `envconfig:"DLX" required:"false"`
DLXName string `envconfig:"DLX_NAME" required:"false"`

// Number of concurrent messages in flight
Parallelism int `envconfig:"PARALLELISM" default:"1" required:"false"`
Expand Down Expand Up @@ -97,11 +99,26 @@ func main() {
BackoffPolicy: backoffPolicy,
WorkerCount: env.Parallelism,
Reporter: reporter,
DLX: env.DLX,
DLXName: env.DLXName,
}

var err error
rmqHelper := rabbit.NewRabbitMQConnectionHandler(1000, logger)
rmqHelper.Setup(ctx, rabbit.VHostHandler(env.RabbitURL, env.RabbitMQVhost), rabbit.ChannelQoS, rabbit.DialWrapper)
rmqHelper.Setup(ctx, rabbit.VHostHandler(
env.RabbitURL,
env.RabbitMQVhost),
func(c rabbit.RabbitMQConnectionInterface, ch rabbit.RabbitMQChannelInterface) error {
if err := rabbit.ChannelQoS(c, ch); err != nil {
return err
}

if err := rabbit.ChannelConfirm(c, ch); err != nil {
return err
}
return nil
},
rabbit.DialWrapper)
for {
if err = d.ConsumeFromQueue(ctx, rmqHelper.GetConnection(), rmqHelper.GetChannel(), env.QueueName); err != nil {
if errors.Is(err, context.Canceled) {
Expand Down
170 changes: 146 additions & 24 deletions pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package dispatcher

import (
"context"
"fmt"
"net/http"
"sync"
"time"
Expand Down Expand Up @@ -55,6 +56,8 @@ type Dispatcher struct {
BackoffPolicy eventingduckv1.BackoffPolicyType
WorkerCount int
Reporter dispatcher.StatsReporter
DLX bool
DLXName string
}

// ConsumeFromQueue consumes messages from the given message channel and queue.
Expand Down Expand Up @@ -91,7 +94,7 @@ func (d *Dispatcher) ConsumeFromQueue(ctx context.Context, conn rabbit.RabbitMQC
}

logging.FromContext(ctx).Info("rabbitmq receiver started, exit with CTRL+C")
logging.FromContext(ctx).Infow("Starting to process messages", zap.String("queue", queueName), zap.Int("workers", d.WorkerCount))
logging.FromContext(ctx).Infow("starting to process messages", zap.String("queue", queueName), zap.Int("workers", d.WorkerCount))

wg := &sync.WaitGroup{}
wg.Add(d.WorkerCount)
Expand All @@ -101,7 +104,11 @@ func (d *Dispatcher) ConsumeFromQueue(ctx context.Context, conn rabbit.RabbitMQC
go func() {
defer wg.Done()
for msg := range workerQueue {
d.dispatch(ctx, msg, ceClient)
if d.DLX {
_ = d.dispatchDLQ(ctx, msg, ceClient)
} else {
_ = d.dispatch(ctx, msg, ceClient, channel)
}
}
}()
}
Expand Down Expand Up @@ -142,25 +149,117 @@ func getStatus(ctx context.Context, result protocol.Result) (int, bool) {
retry, _ := kncloudevents.SelectiveRetry(ctx, &http.Response{StatusCode: httpResult.StatusCode}, nil)
return httpResult.StatusCode, !retry
}
logging.FromContext(ctx).Warnf("Invalid result type, not HTTP Result: %v", retriesResult.Result)
logging.FromContext(ctx).Warnf("invalid result type, not HTTP Result: %v", retriesResult.Result)
return -1, false
}

logging.FromContext(ctx).Warnf("Invalid result type, not RetriesResult")
logging.FromContext(ctx).Warnf("invalid result type, not RetriesResult")
return -1, false
}

func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client) {
func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client, channel rabbit.RabbitMQChannelInterface) error {
start := time.Now()
dlqExchange := d.DLXName

ctx, span := readSpan(ctx, msg)
defer span.End()

msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", &msg)
event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding)
if err != nil {
logging.FromContext(ctx).Warn("failed creating event from delivery, err (NACK-ing and not re-queueing): ", err)
err = msg.Nack(false, false)
if err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
logging.FromContext(ctx).Warn("failed parsing event: ", err)
if err = msg.Ack(false); err != nil {
logging.FromContext(ctx).Warn("failed to Ack event: ", err)
}

return fmt.Errorf("failed parsing event: %s", err)
}

if span.IsRecordingEvents() {
span.AddAttributes(client.EventTraceAttributes(event)...)
}

ctx = cloudevents.ContextWithTarget(ctx, d.SubscriberURL)
if d.BackoffPolicy == eventingduckv1.BackoffPolicyLinear {
ctx = cloudevents.ContextWithRetriesLinearBackoff(ctx, d.BackoffDelay, d.MaxRetries)
} else {
ctx = cloudevents.ContextWithRetriesExponentialBackoff(ctx, d.BackoffDelay, d.MaxRetries)
}

response, result := ceClient.Request(ctx, *event)
statusCode, isSuccess := getStatus(ctx, result)
if statusCode != -1 {
args := &dispatcher.ReportArgs{EventType: event.Type()}
if err = d.Reporter.ReportEventCount(args, statusCode); err != nil {
logging.FromContext(ctx).Errorf("something happened: %v", err)
}
return
}

if !isSuccess {
logging.FromContext(ctx).Warnf("failed to deliver to %q", d.SubscriberURL)

// We need to ack the original message.
if err = msg.Ack(false); err != nil {
logging.FromContext(ctx).Warn("failed to Ack event: ", err)
}

// Add headers as described here: https://knative.dev/docs/eventing/event-delivery/#configuring-channel-event-delivery
event.SetExtension("knativeerrordest", d.SubscriberURL)
event.SetExtension("knativeerrorcode", statusCode)

// Queue the event into DLQ with the correct headers.
if err = sendToRabbitMQ(channel, dlqExchange, span, event); err != nil {
logging.FromContext(ctx).Warn("failed to send event: ", err)
}
return fmt.Errorf("failed to deliver to %q", d.SubscriberURL)
} else if response != nil {
logging.FromContext(ctx).Infof("sending an event: %+v", response)
ctx = cloudevents.ContextWithTarget(ctx, d.BrokerIngressURL)
result = ceClient.Send(ctx, *response)
_, isSuccess = getStatus(ctx, result)
if !isSuccess {
logging.FromContext(ctx).Warnf("failed to deliver to %q", d.BrokerIngressURL)

// We need to ack the original message.
if err = msg.Ack(false); err != nil {
logging.FromContext(ctx).Warn("failed to Ack event: ", err)
}

// Add headers as described here: https://knative.dev/docs/eventing/event-delivery/#configuring-channel-event-delivery
event.SetExtension("knativeerrordest", d.SubscriberURL)
event.SetExtension("knativeerrorcode", statusCode)
event.SetExtension("knativeerrordata", result)

// Queue the event into DLQ with the correct headers.
if err = sendToRabbitMQ(channel, dlqExchange, span, event); err != nil {
logging.FromContext(ctx).Warn("failed to send event: ", err)
}
return fmt.Errorf("failed to deliver to %q", d.BrokerIngressURL)
}
}

if err = msg.Ack(false); err != nil {
logging.FromContext(ctx).Warn("failed to Ack event: ", err)
}
if statusCode != -1 {
args := &dispatcher.ReportArgs{EventType: event.Type()}
dispatchTime := time.Since(start)
_ = d.Reporter.ReportEventDispatchTime(args, statusCode, dispatchTime)
}
return nil
}

// Defaulting to Ack as this always hits the DLQ.
func (d *Dispatcher) dispatchDLQ(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client) error {
start := time.Now()
msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", &msg)
event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding)
if err != nil {
logging.FromContext(ctx).Warn("failed creating event from delivery, err: ", err)
if err = msg.Ack(false); err != nil {
logging.FromContext(ctx).Warn("failed to Ack event: ", err)
}
return fmt.Errorf("failed creating event from delivery, err: %s", err)
}

ctx, span := readSpan(ctx, msg)
Expand All @@ -180,41 +279,64 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c
statusCode, isSuccess := getStatus(ctx, result)
if statusCode != -1 {
args := &dispatcher.ReportArgs{EventType: event.Type()}
if err := d.Reporter.ReportEventCount(args, statusCode); err != nil {
logging.FromContext(ctx).Errorf("Something happened: %v", err)
if err = d.Reporter.ReportEventCount(args, statusCode); err != nil {
logging.FromContext(ctx).Errorf("something happened: %v", err)
}
}

if !isSuccess {
logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.SubscriberURL)
if err := msg.Nack(false, false); err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
logging.FromContext(ctx).Warnf("failed to deliver to %q %s", d.SubscriberURL, msg)
if err = msg.Ack(false); err != nil {
logging.FromContext(ctx).Warn("failed to Ack event: ", err)
}
return
return fmt.Errorf("failed to deliver to %q", d.SubscriberURL)
} else if response != nil {
logging.FromContext(ctx).Infof("Sending an event: %+v", response)
ctx = cloudevents.ContextWithTarget(ctx, d.BrokerIngressURL)
result := ceClient.Send(ctx, *response)
_, isSuccess := getStatus(ctx, result)
result = ceClient.Send(ctx, *response)
_, isSuccess = getStatus(ctx, result)
if !isSuccess {
logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.BrokerIngressURL)
err = msg.Nack(false, false) // not multiple
if err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
logging.FromContext(ctx).Warnf("failed to deliver to %q", d.BrokerIngressURL)
if err = msg.Ack(false); err != nil {
logging.FromContext(ctx).Warn("failed to Ack event: ", err)
}
return
return fmt.Errorf("failed to deliver to %q", d.BrokerIngressURL)
}
}

err = msg.Ack(false)
if err != nil {
logging.FromContext(ctx).Warn("failed to ACK event: ", err)
logging.FromContext(ctx).Warn("failed to Ack event: ", err)
}
if statusCode != -1 {
args := &dispatcher.ReportArgs{EventType: event.Type()}
dispatchTime := time.Since(start)
_ = d.Reporter.ReportEventDispatchTime(args, statusCode, dispatchTime)
}
return nil
}

// sendToRabbitMQ publishes the event to the named exchange in confirm mode,
// carrying the span's trace context along as message metadata. An empty
// exchange name means no DLQ is configured (neither on the trigger nor on
// the broker), which is treated as a successful no-op.
func sendToRabbitMQ(channel rabbit.RabbitMQChannelInterface, exchangeName string, span *trace.Span, event *cloudevents.Event) error {
	if exchangeName == "" {
		// No DLQ defined in the trigger nor the broker; nothing to publish.
		return nil
	}

	traceparent, tracestate := (&tracecontext.HTTPFormat{}).SpanContextToHeaders(span.SpanContext())
	confirmation, err := channel.PublishWithDeferredConfirm(
		exchangeName,
		"",    // routing key
		false, // mandatory
		false, // immediate
		*rabbit.CloudEventToRabbitMQMessage(event, traceparent, tracestate))
	if err != nil {
		return fmt.Errorf("failed to publish message: %w", err)
	}

	// In confirm mode the broker must positively acknowledge the publish;
	// a nack means the message was not safely routed to the DLQ.
	if !confirmation.Wait() {
		return errors.New("failed to publish message: nacked")
	}
	return nil
}

func readSpan(ctx context.Context, msg amqp.Delivery) (context.Context, *trace.Span) {
Expand Down
Loading

0 comments on commit 7031a4e

Please sign in to comment.