Skip to content

Commit

Permalink
fix: error check
Browse files Browse the repository at this point in the history
Signed-off-by: Eray Ates <[email protected]>
  • Loading branch information
rytsh committed Jan 21, 2025
1 parent d3ddd7c commit 4c53091
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 3 deletions.
16 changes: 16 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,22 @@ func (e *DLQError) Error() string {
return "DLQ indexed error"
}

func (e *DLQError) Unwrap() error {
if e.Err == nil {
return ErrDLQ
}

if errors.Is(e.Err, ErrDLQ) {
return e.Err
}

return errors.Join(e.Err, ErrDLQ)
}

func (e *DLQError) IsZero() bool {
return e.Err == nil && len(e.Indexes) == 0
}

func WrapErrDLQ(err error) *DLQError {
return &DLQError{Err: err}
}
Expand Down
15 changes: 15 additions & 0 deletions error_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package wkafka

import (
"errors"
"fmt"
"testing"

Expand Down Expand Up @@ -135,3 +136,17 @@ func Test_isDQLError(t *testing.T) {
})
}
}

func Test_isErr(t *testing.T) {
err := errors.New("testing")

checkErr := WrapErrDLQ(fmt.Errorf("some error: %w", err))

if !errors.Is(checkErr, err) {
t.Errorf("isErr() = %v, want %v", false, true)
}

if !errors.Is(checkErr, ErrDLQ) {
t.Errorf("isErr() = %v, want %v", false, true)
}
}
6 changes: 3 additions & 3 deletions producerdlq.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ func producerDLQ(topic string, clientID []byte, fn func(ctx context.Context, rec
recordsSend := make([]*kgo.Record, 0, len(records))

for i, r := range records {
errOrg := err.Err
errStr := err.Error()
if len(err.Indexes) > 0 {
errIndex, ok := err.Indexes[i]
if !ok {
continue
}

if errIndex != nil {
errOrg = errIndex
errStr = errIndex.Error()
}
}

Expand All @@ -34,7 +34,7 @@ func producerDLQ(topic string, clientID []byte, fn func(ctx context.Context, rec
Headers: append(
r.Headers,
kgo.RecordHeader{Key: "process", Value: clientID},
kgo.RecordHeader{Key: "error", Value: []byte(errOrg.Error())},
kgo.RecordHeader{Key: "error", Value: []byte(errStr)},
kgo.RecordHeader{Key: "offset", Value: []byte(strconv.FormatInt(r.Offset, 10))},
kgo.RecordHeader{Key: "partition", Value: []byte(strconv.FormatInt(int64(r.Partition), 10))},
kgo.RecordHeader{Key: "topic", Value: []byte(r.Topic)},
Expand Down

0 comments on commit 4c53091

Please sign in to comment.