Skip to content

Commit

Permalink
Merge pull request #8 from bee-org/feat/messageId
Browse files Browse the repository at this point in the history
feat: context add MsgId
  • Loading branch information
fanjindong authored Mar 10, 2023
2 parents 025f12a + c6f9832 commit 8229461
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 22 deletions.
12 changes: 8 additions & 4 deletions broker/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,11 @@ func (b *Broker) watch(ctx context.Context) {
<-seat
wg.Add(1)
go func() {
_ = b.process(ctx, data.Body)
_ = data.Ack(false)
if err := b.process(ctx, data); err != nil {
_ = data.Nack(false, true)
} else {
_ = data.Ack(false)
}
wg.Done()
seat <- struct{}{}
}()
Expand All @@ -182,12 +185,13 @@ func (b *Broker) watch(ctx context.Context) {
}()
}

func (b *Broker) process(ctx context.Context, data []byte) error {
msg, err := b.config.Codec.Decode(data)
func (b *Broker) process(ctx context.Context, message amqp.Delivery) error {
msg, err := b.config.Codec.Decode(message.Body)
if err != nil {
b.config.Logger.Errorf("process unknown data: %s", err)
return err
}
msg.SetMsgId(message.MessageId)
handler, ok := b.Router(msg.GetName())
if !ok {
b.config.Logger.Warningf("process unknown name: %s", msg.GetName())
Expand Down
9 changes: 6 additions & 3 deletions broker/pulsar/pulsar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pulsar

import (
"context"
"fmt"
"github.com/apache/pulsar-client-go/pulsar"
plog "github.com/apache/pulsar-client-go/pulsar/log"
"github.com/bee-org/bee"
Expand Down Expand Up @@ -213,7 +214,7 @@ func (b *Broker) watch() {
defer wg.Done()
defer func() { seat <- struct{}{} }()
msg := data.Message
if err := b.handler(b.Ctx(), msg.Payload()); err != nil {
if err := b.handler(b.Ctx(), msg); err != nil {
b.consumer.NackID(msg.ID())
return
}
Expand All @@ -224,12 +225,14 @@ func (b *Broker) watch() {
}()
}

func (b *Broker) handler(ctx context.Context, data []byte) error {
msg, err := b.config.Codec.Decode(data)
func (b *Broker) handler(ctx context.Context, message pulsar.Message) error {
msg, err := b.config.Codec.Decode(message.Payload())
if err != nil {
b.config.Logger.Errorf("process unknown data: %s", err)
return err
}
msgId := message.ID()
msg.SetMsgId(fmt.Sprintf("%d:%d:%d", msgId.LedgerID(), msgId.EntryID(), msgId.PartitionIdx()))
handler, ok := b.Router(msg.GetName())
if !ok {
b.config.Logger.Warningf("process unknown name: %s", msg.GetName())
Expand Down
26 changes: 15 additions & 11 deletions broker/pulsar/pulsar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var (
err error
)

func TestMain(m *testing.M) {
func initBroker() {
b, err = NewBroker(Config{
URL: os.Getenv("PULSAR_URL"),
Topic: "persistent://ddmc/algo/bee",
Expand Down Expand Up @@ -46,6 +46,9 @@ func TestMain(m *testing.M) {
if err = b.Worker(); err != nil {
panic(err)
}
}
func TestMain(m *testing.M) {
initBroker()
os.Exit(m.Run())
}

Expand Down Expand Up @@ -164,8 +167,8 @@ func TestPulSarBroker_SendDelay(t *testing.T) {
t.Errorf("SendDelay() error = %v, wantErr %v", err, tt.wantErr)
}
got := <-example.DelayResult
if got.Sub(want).Seconds() > 1 {
t.Errorf("SendDelay() got delay = %v, want %v", got.Second(), want.Second())
if int(got.Sub(want).Seconds()) > 1 {
t.Errorf("SendDelay() got delay = %v, want %v", got, want)
}
})
}
Expand Down Expand Up @@ -193,16 +196,17 @@ func TestBroker_Close(t *testing.T) {
}
}

func TestBroker_ReConnect(t *testing.T) {
for i := 0; i < 100; i++ {
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
_ = b.Send(ctx, "print", strconv.Itoa(i))
time.Sleep(1 * time.Second)
cancel()
}
}
//func TestBroker_ReConnect(t *testing.T) {
// for i := 0; i < 100; i++ {
// ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
// _ = b.Send(ctx, "print", strconv.Itoa(i))
// time.Sleep(1 * time.Second)
// cancel()
// }
//}

func TestBroker_ErrorBlockSeat(t *testing.T) {
initBroker()
_ = b.Send(ctx, "error", "a")
_ = b.Send(ctx, "error", "b")
_ = b.Send(ctx, "error", "c")
Expand Down
7 changes: 4 additions & 3 deletions broker/rocketmq/rocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,13 @@ func (b *Broker) Close() error {
return b.consumer.Shutdown()
}

func (b *Broker) handler(ctx context.Context, data []byte) error {
msg, err := b.config.Codec.Decode(data)
func (b *Broker) handler(ctx context.Context, message *primitive.MessageExt) error {
msg, err := b.config.Codec.Decode(message.Body)
if err != nil {
b.config.Logger.Errorf("process unknown data: %s", err)
return err
}
msg.SetMsgId(message.MsgId)
handler, ok := b.Router(msg.GetName())
if !ok {
b.config.Logger.Warningf("process unknown name: %s", msg.GetName())
Expand All @@ -148,7 +149,7 @@ func (b *Broker) handler(ctx context.Context, data []byte) error {
func newConsumerHandler(b *Broker) func(context.Context, ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
return func(ctx context.Context, mes ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for _, me := range mes {
if err := b.handler(b.Ctx(), me.Body); err != nil {
if err := b.handler(b.Ctx(), me); err != nil {
return consumer.ConsumeRetryLater, err
}
}
Expand Down
2 changes: 1 addition & 1 deletion example/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ var PrintResult = map[string]struct{}{}
func PrintHandler(c *bee.Context) error {
var result string
err := c.Parse(&result)
fmt.Println("printHandler", result, err)
fmt.Println("printHandler", c.Message().GetMsgId(), result, err)
PrintResult[result] = struct{}{}
return nil
}
Expand Down
5 changes: 5 additions & 0 deletions message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@ type Message interface {
SetVersion(v uint8) Message
SetRetryCount(rc uint8) Message
IncrRetryCount() uint8
GetMsgId() string
SetMsgId(id string) Message
}

//Msg Concurrency is unsafe
type Msg struct {
id string
version uint8
retryCount uint8
name string
Expand All @@ -35,3 +38,5 @@ func (m *Msg) SetVersion(v uint8) Message { m.version = v; return m }
func (m *Msg) SetRetryCount(rc uint8) Message { m.retryCount = rc; return m }
func (m *Msg) SetBody(body []byte) Message { m.body = body; return m }
func (m *Msg) IncrRetryCount() uint8 { m.retryCount++; return m.retryCount }
func (m *Msg) GetMsgId() string { return m.id }
func (m *Msg) SetMsgId(id string) Message { m.id = id; return m }

0 comments on commit 8229461

Please sign in to comment.