From 2499f30faebce0211d4a3d8de1210454369c8291 Mon Sep 17 00:00:00 2001 From: fanjindong <765912710@qq.com> Date: Fri, 10 Mar 2023 10:58:23 +0800 Subject: [PATCH 1/2] feat: context add MsgId --- broker/amqp/amqp.go | 7 ++++--- broker/pulsar/pulsar.go | 9 ++++++--- broker/pulsar/pulsar_test.go | 26 +++++++++++++++----------- broker/rocketmq/rocket.go | 7 ++++--- example/handler.go | 2 +- message/message.go | 5 +++++ 6 files changed, 35 insertions(+), 21 deletions(-) diff --git a/broker/amqp/amqp.go b/broker/amqp/amqp.go index 9ce77f3..5ac3edc 100644 --- a/broker/amqp/amqp.go +++ b/broker/amqp/amqp.go @@ -172,7 +172,7 @@ func (b *Broker) watch(ctx context.Context) { <-seat wg.Add(1) go func() { - _ = b.process(ctx, data.Body) + _ = b.process(ctx, data) _ = data.Ack(false) wg.Done() seat <- struct{}{} @@ -182,12 +182,13 @@ func (b *Broker) watch(ctx context.Context) { }() } -func (b *Broker) process(ctx context.Context, data []byte) error { - msg, err := b.config.Codec.Decode(data) +func (b *Broker) process(ctx context.Context, message amqp.Delivery) error { + msg, err := b.config.Codec.Decode(message.Body) if err != nil { b.config.Logger.Errorf("process unknown data: %s", err) return err } + msg.SetMsgId(message.MessageId) handler, ok := b.Router(msg.GetName()) if !ok { b.config.Logger.Warningf("process unknown name: %s", msg.GetName()) diff --git a/broker/pulsar/pulsar.go b/broker/pulsar/pulsar.go index 9045dc1..01d039c 100644 --- a/broker/pulsar/pulsar.go +++ b/broker/pulsar/pulsar.go @@ -2,6 +2,7 @@ package pulsar import ( "context" + "fmt" "github.com/apache/pulsar-client-go/pulsar" plog "github.com/apache/pulsar-client-go/pulsar/log" "github.com/bee-org/bee" @@ -213,7 +214,7 @@ func (b *Broker) watch() { defer wg.Done() defer func() { seat <- struct{}{} }() msg := data.Message - if err := b.handler(b.Ctx(), msg.Payload()); err != nil { + if err := b.handler(b.Ctx(), msg); err != nil { b.consumer.NackID(msg.ID()) return } @@ -224,12 +225,14 @@ func (b *Broker) watch() { }() } -func (b *Broker) handler(ctx context.Context, data []byte) error { - msg, err := b.config.Codec.Decode(data) +func (b *Broker) handler(ctx context.Context, message pulsar.Message) error { + msg, err := b.config.Codec.Decode(message.Payload()) if err != nil { b.config.Logger.Errorf("process unknown data: %s", err) return err } + msgId := message.ID() + msg.SetMsgId(fmt.Sprintf("%d:%d:%d", msgId.LedgerID(), msgId.EntryID(), msgId.PartitionIdx())) handler, ok := b.Router(msg.GetName()) if !ok { b.config.Logger.Warningf("process unknown name: %s", msg.GetName()) diff --git a/broker/pulsar/pulsar_test.go b/broker/pulsar/pulsar_test.go index 741f593..8ec5ade 100644 --- a/broker/pulsar/pulsar_test.go +++ b/broker/pulsar/pulsar_test.go @@ -18,7 +18,7 @@ var ( err error ) -func TestMain(m *testing.M) { +func initBroker() { b, err = NewBroker(Config{ URL: os.Getenv("PULSAR_URL"), Topic: "persistent://ddmc/algo/bee", @@ -46,6 +46,9 @@ func TestMain(m *testing.M) { if err = b.Worker(); err != nil { panic(err) } +} +func TestMain(m *testing.M) { + initBroker() os.Exit(m.Run()) } @@ -164,8 +167,8 @@ func TestPulSarBroker_SendDelay(t *testing.T) { t.Errorf("SendDelay() error = %v, wantErr %v", err, tt.wantErr) } got := <-example.DelayResult - if got.Sub(want).Seconds() > 1 { - t.Errorf("SendDelay() got delay = %v, want %v", got.Second(), want.Second()) + if int(got.Sub(want).Seconds()) > 1 { + t.Errorf("SendDelay() got delay = %v, want %v", got, want) } }) } @@ -193,16 +196,17 @@ func TestBroker_Close(t *testing.T) { } } -func TestBroker_ReConnect(t *testing.T) { - for i := 0; i < 100; i++ { - ctx, cancel := context.WithTimeout(ctx, 1*time.Second) - _ = b.Send(ctx, "print", strconv.Itoa(i)) - time.Sleep(1 * time.Second) - cancel() - } -} +//func TestBroker_ReConnect(t *testing.T) { +// for i := 0; i < 100; i++ { +// ctx, cancel := context.WithTimeout(ctx, 1*time.Second) +// _ = b.Send(ctx, "print", strconv.Itoa(i)) +// time.Sleep(1 * time.Second) +// cancel() +// } +//} func TestBroker_ErrorBlockSeat(t *testing.T) { + initBroker() _ = b.Send(ctx, "error", "a") _ = b.Send(ctx, "error", "b") _ = b.Send(ctx, "error", "c") diff --git a/broker/rocketmq/rocket.go b/broker/rocketmq/rocket.go index c4e7660..ae2a77c 100644 --- a/broker/rocketmq/rocket.go +++ b/broker/rocketmq/rocket.go @@ -128,12 +128,13 @@ func (b *Broker) Close() error { return b.consumer.Shutdown() } -func (b *Broker) handler(ctx context.Context, data []byte) error { - msg, err := b.config.Codec.Decode(data) +func (b *Broker) handler(ctx context.Context, message *primitive.MessageExt) error { + msg, err := b.config.Codec.Decode(message.Body) if err != nil { b.config.Logger.Errorf("process unknown data: %s", err) return err } + msg.SetMsgId(message.MsgId) handler, ok := b.Router(msg.GetName()) if !ok { b.config.Logger.Warningf("process unknown name: %s", msg.GetName()) @@ -148,7 +149,7 @@ func (b *Broker) handler(ctx context.Context, data []byte) error { func newConsumerHandler(b *Broker) func(context.Context, ...*primitive.MessageExt) (consumer.ConsumeResult, error) { return func(ctx context.Context, mes ...*primitive.MessageExt) (consumer.ConsumeResult, error) { for _, me := range mes { - if err := b.handler(b.Ctx(), me.Body); err != nil { + if err := b.handler(b.Ctx(), me); err != nil { return consumer.ConsumeRetryLater, err } } diff --git a/example/handler.go b/example/handler.go index c59afde..30931f2 100644 --- a/example/handler.go +++ b/example/handler.go @@ -12,7 +12,7 @@ var PrintResult = map[string]struct{}{} func PrintHandler(c *bee.Context) error { var result string err := c.Parse(&result) - fmt.Println("printHandler", result, err) + fmt.Println("printHandler", c.Message().GetMsgId(), result, err) PrintResult[result] = struct{}{} return nil } diff --git a/message/message.go b/message/message.go index 1eaabb6..16add2d 100644 --- a/message/message.go +++ b/message/message.go @@ -11,10 +11,13 @@ type Message interface { SetVersion(v uint8) Message SetRetryCount(rc uint8) Message IncrRetryCount() uint8 + GetMsgId() string + SetMsgId(id string) Message } //Msg Concurrency is unsafe type Msg struct { + id string version uint8 retryCount uint8 name string @@ -35,3 +38,5 @@ func (m *Msg) SetVersion(v uint8) Message { m.version = v; return m } func (m *Msg) SetRetryCount(rc uint8) Message { m.retryCount = rc; return m } func (m *Msg) SetBody(body []byte) Message { m.body = body; return m } func (m *Msg) IncrRetryCount() uint8 { m.retryCount++; return m.retryCount } +func (m *Msg) GetMsgId() string { return m.id } +func (m *Msg) SetMsgId(id string) Message { m.id = id; return m } From c6f98327621a1cf47b26788ffb016dcbb832609c Mon Sep 17 00:00:00 2001 From: fanjindong <765912710@qq.com> Date: Fri, 10 Mar 2023 11:06:08 +0800 Subject: [PATCH 2/2] fix: --- broker/amqp/amqp.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/broker/amqp/amqp.go b/broker/amqp/amqp.go index 5ac3edc..f29ad51 100644 --- a/broker/amqp/amqp.go +++ b/broker/amqp/amqp.go @@ -172,8 +172,11 @@ func (b *Broker) watch(ctx context.Context) { <-seat wg.Add(1) go func() { - _ = b.process(ctx, data) - _ = data.Ack(false) + if err := b.process(ctx, data); err != nil { + _ = data.Nack(false, true) + } else { + _ = data.Ack(false) + } wg.Done() seat <- struct{}{} }()