Skip to content

Commit

Permalink
fix: retry DLQ
Browse files Browse the repository at this point in the history
Signed-off-by: Eray Ates <[email protected]>
  • Loading branch information
rytsh committed Jan 16, 2025
1 parent 68592cf commit 8dde48b
Show file tree
Hide file tree
Showing 18 changed files with 362 additions and 157 deletions.
45 changes: 27 additions & 18 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ type Client struct {
consumerMutex sync.RWMutex
logger Logger

dlqRecord *kgo.Record
dlqCheckTrigger func()
dlqRetryAt time.Time
hook *hooker
cancel context.CancelFunc
trigger []func(context.Context)
dlqRecord DLQRecord
dlqRetryTrigger func(opts []OptionDLQTriggerFn)
dlqMutex sync.RWMutex

hook *hooker
cancel context.CancelFunc
trigger []func(context.Context)

topicsCheck []string
appName string
Expand Down Expand Up @@ -388,8 +389,8 @@ func (c *Client) Skip(ctx context.Context, modify func(SkipMap) SkipMap) {
c.modifySkip(modify)

c.callTrigger(ctx)
if c.dlqCheckTrigger != nil {
c.dlqCheckTrigger()
if c.dlqRetryTrigger != nil {
c.dlqRetryTrigger(nil)
}

c.logger.Info("wkafka skip modified", "skip", c.consumerConfig.Skip)
Expand All @@ -409,23 +410,31 @@ func (c *Client) ClientID() []byte {

// setDLQRecord to set stucked DLQRecord.
// - Using in DLQ iteration.
func (c *Client) setDLQRecord(r *kgo.Record, t time.Time) {
c.dlqRecord = r
c.dlqRetryAt = t
}
func (c *Client) setDLQRecord(r *kgo.Record, t time.Time, err error) {
c.dlqMutex.Lock()
defer c.dlqMutex.Unlock()

// DLQRetryAt returns stucked DLQRecord's retry time.
// - Using in DLQ iteration only if DLQRecord is not nil.
func (c *Client) DLQRetryAt() time.Time {
return c.dlqRetryAt
c.dlqRecord.Err = err
c.dlqRecord.Record = r
c.dlqRecord.RetryAt = t
}

// DLQRecord returns stucked DLQRecord if exists.
// - Warning: return pointer and not modify it.
func (c *Client) DLQRecord() *kgo.Record {
// - Warning: return pointer record and not modify it.
func (c *Client) DLQRecord() DLQRecord {
c.dlqMutex.RLock()
defer c.dlqMutex.RUnlock()

return c.dlqRecord
}

// DLQRetry to trigger DLQ retry and not wait sleep.
func (c *Client) DLQRetry(opts ...OptionDLQTriggerFn) {
if c.dlqRetryTrigger != nil {
c.dlqRetryTrigger(opts)
}
}

func (c *Client) callTrigger(ctx context.Context) {
go func() {
for _, t := range c.trigger {
Expand Down
4 changes: 2 additions & 2 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func WithCallbackBatch[T any](fn func(ctx context.Context, msg []T) error) CallB
DLQProcess: processDLQ,
}

o.Client.dlqCheckTrigger = processDLQ.Trigger
o.Client.dlqRetryTrigger = processDLQ.Trigger

return nil
}
Expand Down Expand Up @@ -228,7 +228,7 @@ func WithCallback[T any](fn func(ctx context.Context, msg T) error) CallBackFunc
DLQProcess: processDLQ,
}

o.Client.dlqCheckTrigger = processDLQ.Trigger
o.Client.dlqRetryTrigger = processDLQ.Trigger

return nil
}
Expand Down
75 changes: 66 additions & 9 deletions dlq.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,25 @@ import (
type DLQRecord struct {
Record *kgo.Record
RetryAt time.Time
Err error
}

type dlqProcess[T any] struct {
customer *customer[T]

isRevokedRecord func(r *kgo.Record) bool
setDLQRecord func(r *kgo.Record, t time.Time)
setDLQRecord func(r *kgo.Record, t time.Time, err error)
callTrigger func(ctx context.Context)
processDLQ func(ctx context.Context, msg T) error

checkFunc func()
checkFunc func(opts []OptionDLQTriggerFn)
checkFuncMutex sync.Mutex
}

func newDLQProcess[T any](
c *customer[T],
isRevokedRecord func(r *kgo.Record) bool,
dlqRecord func(r *kgo.Record, t time.Time),
dlqRecord func(r *kgo.Record, t time.Time, err error),
callTrigger func(ctx context.Context),
processDLQ func(ctx context.Context, msg T) error,
) *dlqProcess[T] {
Expand Down Expand Up @@ -86,7 +87,7 @@ func (d *dlqProcess[T]) Iteration(ctx context.Context, r *kgo.Record) error {

firstIteration := true
defer func() {
d.setDLQRecord(nil, time.Time{})
d.setDLQRecord(nil, time.Time{}, nil)
d.setCheckFunc(nil)
d.callTrigger(ctx)
}()
Expand All @@ -105,18 +106,40 @@ func (d *dlqProcess[T]) Iteration(ctx context.Context, r *kgo.Record) error {
if err := d.iterationRecordDLQ(ctx, r); err != nil {
errOrg, ok := IsDQLError(err)
var errWrapped error
var errOrgDefault error
if ok {
errOrgDefault = errOrg.Err
errWrapped = wrapErr(r, errOrg.Err, true)
} else {
errOrgDefault = err
errWrapped = wrapErr(r, err, true)
}

d.customer.Logger.Error("DLQ process failed", "error", errWrapped, "retry_interval", wait.CurrentInterval().Truncate(time.Second).String())

d.setDLQRecord(r, time.Now().Add(wait.CurrentInterval()))
d.setDLQRecord(r, time.Now().Add(wait.CurrentInterval()), errOrgDefault)

if firstIteration {
d.setCheckFunc(func() {
d.setCheckFunc(func(opts []OptionDLQTriggerFn) {
o := &OptionDLQTrigger{}
for _, opt := range opts {
opt(o)
}

if o.Force {
wait.Trigger()

return
}

if o.Specs != nil {
if r.Topic == o.Specs.Topic && r.Partition == o.Specs.Partition && r.Offset == o.Specs.Offset {
wait.Trigger()

return
}
}

if d.customer.Skip(d.customer.Cfg, r) {
wait.Trigger()
}
Expand All @@ -140,18 +163,52 @@ func (d *dlqProcess[T]) Iteration(ctx context.Context, r *kgo.Record) error {
return nil
}

func (d *dlqProcess[T]) Trigger() {
func (d *dlqProcess[T]) Trigger(opts []OptionDLQTriggerFn) {
d.checkFuncMutex.Lock()
defer d.checkFuncMutex.Unlock()

if d.checkFunc != nil {
d.checkFunc()
d.checkFunc(opts)
}
}

func (d *dlqProcess[T]) setCheckFunc(fn func()) {
func (d *dlqProcess[T]) setCheckFunc(fn func(opts []OptionDLQTriggerFn)) {
d.checkFuncMutex.Lock()
defer d.checkFuncMutex.Unlock()

d.checkFunc = fn
}

// ////////////////////////////////////////////////////////////////////////////

type DLQTriggerSpecs struct {
Topic string `cfg:"topic" json:"topic"`
Partition int32 `cfg:"partition" json:"partition"`
Offset int64 `cfg:"offset" json:"offset"`
}

type OptionDLQTrigger struct {
Force bool `cfg:"force" json:"force"`
Specs *DLQTriggerSpecs `cfg:"specs" json:"specs"`
}

func (o *OptionDLQTrigger) ToOption() OptionDLQTriggerFn {
return func(opt *OptionDLQTrigger) {
opt.Force = o.Force
opt.Specs = o.Specs
}
}

type OptionDLQTriggerFn func(*OptionDLQTrigger)

func WithForceDLQTrigger() OptionDLQTriggerFn {
return func(o *OptionDLQTrigger) {
o.Force = true
}
}

func WithDLQTriggerSpecs(specs *DLQTriggerSpecs) OptionDLQTriggerFn {
return func(o *OptionDLQTrigger) {
o.Specs = specs
}
}

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions plugins/handler/_ui/dist/assets/index-COF3pdLZ.css

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion plugins/handler/_ui/dist/assets/index-Cd4AzzrC.css

This file was deleted.

2 changes: 1 addition & 1 deletion plugins/handler/_ui/dist/index.html
Original file line number Diff line number Diff line change
@@ -1 +1 @@
<!doctype html><html lang="en"><head><meta charset="utf-8"/><meta name="viewport" content="width=device-width,initial-scale=1"/><link rel="icon" type="image/svg+xml" href="./logo.svg"/><title>wkafka</title><script type="module" crossorigin src="./assets/index-CnR-J8Dz.js"></script><link rel="stylesheet" crossorigin href="./assets/index-Cd4AzzrC.css"></head><body></body></html>
<!doctype html><html lang="en"><head><meta charset="utf-8"/><meta name="viewport" content="width=device-width,initial-scale=1"/><link rel="icon" type="image/svg+xml" href="./logo.svg"/><title>wkafka</title><script type="module" crossorigin src="./assets/index-Bv-H8zf2.js"></script><link rel="stylesheet" crossorigin href="./assets/index-COF3pdLZ.css"></head><body></body></html>
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
<script lang="ts">
import { skip } from "@/helper/api";
import { retry, skip } from "@/helper/api";
const skipFunc = () => {
skip(topic, partition, offset);
};
const retryFunc = () => {
retry(topic, partition, offset);
};
export let topic: string;
export let partition: number;
export let offset: number;
Expand All @@ -16,3 +20,10 @@
>
Skip
</button>

<button
class="bg-gray-600 hover:bg-yellow-500 text-white font-bold px-10 border-t border-b border-black"
on:click|preventDefault|stopPropagation={retryFunc}
>
Retry
</button>
2 changes: 2 additions & 0 deletions plugins/handler/_ui/src/components/Counter.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
});
const startTimer = (dt: string) => {
clearInterval(timer);
let end = Date.parse(dt);
timer = countDownTimer(end, (v: string) => {
Expand Down
8 changes: 5 additions & 3 deletions plugins/handler/_ui/src/components/Info.svelte
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<script lang="ts">
import { storeInfo } from "@/store/store";
import TreeView from "svelte-tree-view";
import Skip from "./Skip.svelte";
import Action from "./Action.svelte";
import { getField, getFieldWithDecode } from "@/helper/codec";
import View from "./View.svelte";
import type { Info } from "@/store/model";
Expand Down Expand Up @@ -66,13 +66,15 @@
<View value64={value.dlq_record.value} title="Record" />
<View
value64={getField("error", value.dlq_record.headers)}
title="Error"
title="Record Error"
/>
<hr class="my-2" />
<View value={value.error} title="Process Error" />
</div>
</div>
<div class="flex flex-row justify-between items-baseline">
<div class="mt-2">
<Skip
<Action
topic={value.dlq_record.topic}
partition={value.dlq_record.partition}
offset={value.dlq_record.offset}
Expand Down
11 changes: 0 additions & 11 deletions plugins/handler/_ui/src/components/Record.svelte

This file was deleted.

7 changes: 5 additions & 2 deletions plugins/handler/_ui/src/components/View.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@
export let title = "";
export let value64 = "";
export let value = "";
let toggle = false;
let value = "";
let jsonValue: unknown;
let className = "";
export { className as class };
onMount(() => {
value = base64ToStr(value64);
if (value64 != "") {
value = base64ToStr(value64);
}
jsonValue = parseJSON(value);
if (!!jsonValue) {
Expand Down
23 changes: 23 additions & 0 deletions plugins/handler/_ui/src/helper/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import axios from 'axios';
const endpoints = {
info: '../v1/info',
skip: '../v1/skip',
retryDLQ: '../v1/retry-dlq',
event: '../v1/event',
}

Expand Down Expand Up @@ -59,3 +60,25 @@ export const skip = async (topic: string, partition: number, offset: number) =>
addToast('skip request failed', 'alert');
}
}


export const retry = async (topic: string, partition: number, offset: number) => {
try {
await axios.post(endpoints.retryDLQ, {
"specs": {
"topic": topic,
"partition": partition,
"offset": offset
}
}, {
headers: {
'Content-Type': 'application/json'
}
});

addToast('retry request sent');
} catch (error) {
console.error(error);
addToast('retry request failed', 'alert');
}
}
1 change: 1 addition & 0 deletions plugins/handler/_ui/src/store/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export interface Info {
skip?: Map<string, Map<number, OffsetConfig>>;
dlq_record?: DlqRecord;
retry_at?: string;
error?: string;

updated_at: number;
}
Expand Down
Loading

0 comments on commit 8dde48b

Please sign in to comment.