From 4c5309192be8c07da8325d67f76b9864cc9255bb Mon Sep 17 00:00:00 2001 From: Eray Ates Date: Tue, 21 Jan 2025 17:05:26 +0100 Subject: [PATCH] fix: error check Signed-off-by: Eray Ates --- error.go | 16 ++++++++++++++++ error_test.go | 15 +++++++++++++++ producerdlq.go | 6 +++--- 3 files changed, 34 insertions(+), 3 deletions(-) diff --git a/error.go b/error.go index 719c0d2..830f5c3 100644 --- a/error.go +++ b/error.go @@ -41,6 +41,22 @@ func (e *DLQError) Error() string { return "DLQ indexed error" } +func (e *DLQError) Unwrap() error { + if e.Err == nil { + return ErrDLQ + } + + if errors.Is(e.Err, ErrDLQ) { + return e.Err + } + + return errors.Join(e.Err, ErrDLQ) +} + +func (e *DLQError) IsZero() bool { + return e.Err == nil && len(e.Indexes) == 0 +} + func WrapErrDLQ(err error) *DLQError { return &DLQError{Err: err} } diff --git a/error_test.go b/error_test.go index 930fe9c..fa76201 100644 --- a/error_test.go +++ b/error_test.go @@ -1,6 +1,7 @@ package wkafka import ( + "errors" "fmt" "testing" @@ -135,3 +136,17 @@ func Test_isDQLError(t *testing.T) { }) } } + +func Test_isErr(t *testing.T) { + err := errors.New("testing") + + checkErr := WrapErrDLQ(fmt.Errorf("some error: %w", err)) + + if !errors.Is(checkErr, err) { + t.Errorf("isErr() = %v, want %v", false, true) + } + + if !errors.Is(checkErr, ErrDLQ) { + t.Errorf("isErr() = %v, want %v", false, true) + } +} diff --git a/producerdlq.go b/producerdlq.go index e4f593f..81cc510 100644 --- a/producerdlq.go +++ b/producerdlq.go @@ -15,7 +15,7 @@ func producerDLQ(topic string, clientID []byte, fn func(ctx context.Context, rec recordsSend := make([]*kgo.Record, 0, len(records)) for i, r := range records { - errOrg := err.Err + errStr := err.Error() if len(err.Indexes) > 0 { errIndex, ok := err.Indexes[i] if !ok { @@ -23,7 +23,7 @@ func producerDLQ(topic string, clientID []byte, fn func(ctx context.Context, rec } if errIndex != nil { - errOrg = errIndex + errStr = errIndex.Error() } } @@ -34,7 +34,7 @@ func producerDLQ(topic string, clientID []byte, fn func(ctx context.Context, rec Headers: append( r.Headers, kgo.RecordHeader{Key: "process", Value: clientID}, - kgo.RecordHeader{Key: "error", Value: []byte(errOrg.Error())}, + kgo.RecordHeader{Key: "error", Value: []byte(errStr)}, kgo.RecordHeader{Key: "offset", Value: []byte(strconv.FormatInt(r.Offset, 10))}, kgo.RecordHeader{Key: "partition", Value: []byte(strconv.FormatInt(int64(r.Partition), 10))}, kgo.RecordHeader{Key: "topic", Value: []byte(r.Topic)},