Skip to content

Commit

Permalink
refactor: skip system messages from ctx.Unhandled method (#275)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Mar 30, 2024
1 parent a89c3b2 commit 704d465
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 7 deletions.
62 changes: 62 additions & 0 deletions actors/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1291,6 +1291,68 @@ func TestReceiveContext(t *testing.T) {
context.Shutdown()
})
})

t.Run("With Unhandled with system messages", func(t *testing.T) {
ctx := context.TODO()
// create the deadletter stream
eventsStream := eventstream.New()

// create a consumer
consumer := eventsStream.AddSubscriber()
eventsStream.Subscribe(consumer, eventsTopic)

// create a Ping actor
opts := []pidOption{
withInitMaxRetries(1),
withCustomLogger(log.DiscardLogger),
withEventsStream(eventsStream),
}

// create actor1
actor1 := &exchanger{}
actorPath1 := NewPath("Exchange1", NewAddress("sys", "host", 1))
pid1, err := newPID(ctx, actorPath1, actor1, opts...)

require.NoError(t, err)
require.NotNil(t, pid1)

// create an instance of receive context
context := &receiveContext{
ctx: ctx,
message: new(goaktpb.PostStart),
sender: NoSender,
recipient: pid1,
mu: sync.Mutex{},
isAsyncMessage: true,
}

// calling unhandled will push the current message to deadletters
context.Unhandled()

// wait for messages to be published
time.Sleep(time.Second)

var items []*goaktpb.Deadletter
for message := range consumer.Iterator() {
payload := message.Payload()
assert.Equal(t, eventsTopic, message.Topic())
deadletter, ok := payload.(*goaktpb.Deadletter)
if ok {
items = append(items, deadletter)
}
}

require.Empty(t, items)

assert.EqualValues(t, 1, len(consumer.Topics()))

t.Cleanup(func() {
// shutdown the consumer
consumer.Shutdown()
context.Shutdown()
})
})

t.Run("With successful BatchTell", func(t *testing.T) {
ctx := context.TODO()
// create a Ping actor
Expand Down
9 changes: 9 additions & 0 deletions actors/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -1464,6 +1464,15 @@ func (p *pid) handleError(receiveCtx ReceiveContext, err error) {
if p.eventsStream == nil {
return
}

// skip system messages
switch receiveCtx.Message().(type) {
case *goaktpb.PreStart, *goaktpb.PostStart, *goaktpb.PostStop:
return
default:
// pass through
}

msg, _ := anypb.New(receiveCtx.Message())
var senderAddr *goaktpb.Address
if receiveCtx.Sender() != nil || receiveCtx.Sender() != NoSender {
Expand Down
4 changes: 3 additions & 1 deletion examples/actor-cluster/dnssd/actors/actors.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

goakt "github.com/tochemey/goakt/actors"
samplepb "github.com/tochemey/goakt/examples/protos/samplepb"
"github.com/tochemey/goakt/goaktpb"
"github.com/tochemey/goakt/log"
)

Expand Down Expand Up @@ -62,6 +63,7 @@ func (p *AccountEntity) PreStart(ctx context.Context) error {

func (p *AccountEntity) Receive(ctx goakt.ReceiveContext) {
switch msg := ctx.Message().(type) {
case *goaktpb.PostStart:
case *samplepb.CreateAccount:
p.logger.Info("creating account by setting the balance...")
// check whether the create operation has been done already
Expand Down Expand Up @@ -105,7 +107,7 @@ func (p *AccountEntity) Receive(ctx goakt.ReceiveContext) {
})

default:
p.logger.Panic(goakt.ErrUnhandled)
ctx.Unhandled()
}
}

Expand Down
4 changes: 3 additions & 1 deletion examples/actor-cluster/k8s/actors/actors.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

goakt "github.com/tochemey/goakt/actors"
samplepb "github.com/tochemey/goakt/examples/protos/samplepb"
"github.com/tochemey/goakt/goaktpb"
"github.com/tochemey/goakt/log"
)

Expand Down Expand Up @@ -62,6 +63,7 @@ func (p *AccountEntity) PreStart(ctx context.Context) error {

func (p *AccountEntity) Receive(ctx goakt.ReceiveContext) {
switch msg := ctx.Message().(type) {
case *goaktpb.PostStart:
case *samplepb.CreateAccount:
p.logger.Info("creating account by setting the balance...")
// check whether the create operation has been done already
Expand Down Expand Up @@ -105,7 +107,7 @@ func (p *AccountEntity) Receive(ctx goakt.ReceiveContext) {
})

default:
p.logger.Panic(goakt.ErrUnhandled)
ctx.Unhandled()
}
}

Expand Down
3 changes: 3 additions & 0 deletions examples/actor-observability/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (

goakt "github.com/tochemey/goakt/actors"
samplepb "github.com/tochemey/goakt/examples/protos/samplepb"
"github.com/tochemey/goakt/goaktpb"
"github.com/tochemey/goakt/log"
)

Expand Down Expand Up @@ -153,6 +154,7 @@ func (p *PingActor) PreStart(ctx context.Context) error {

func (p *PingActor) Receive(ctx goakt.ReceiveContext) {
switch ctx.Message().(type) {
case *goaktpb.PostStart:
case *samplepb.Pong:
p.count.Inc()
// reply the sender in case there is a sender
Expand Down Expand Up @@ -186,6 +188,7 @@ func (p *PongActor) PreStart(ctx context.Context) error {

func (p *PongActor) Receive(ctx goakt.ReceiveContext) {
switch ctx.Message().(type) {
case *goaktpb.PostStart:
case *samplepb.Ping:
p.count.Inc()
// reply the sender in case there is a sender
Expand Down
2 changes: 1 addition & 1 deletion examples/actor-to-actor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (p *PongActor) Receive(ctx goakt.ReceiveContext) {
_ = ctx.Self().Tell(ctx.Context(), ctx.Sender(), new(samplepb.Pong))
p.count.Add(1)
default:
p.logger.Panic(goakt.ErrUnhandled)
ctx.Unhandled()
}
}

Expand Down
5 changes: 3 additions & 2 deletions examples/remoting/ping/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func (p *PingActor) PreStart(ctx context.Context) error {

func (p *PingActor) Receive(ctx goakt.ReceiveContext) {
switch ctx.Message().(type) {
case *goaktpb.PostStart:
case *samplepb.Pong:
// reply the sender in case there is a sender
if ctx.RemoteSender() != goakt.RemoteNoSender {
Expand All @@ -116,12 +117,12 @@ func (p *PingActor) Receive(ctx goakt.ReceiveContext) {
}
p.count.Add(1)
default:
p.logger.Panic(goakt.ErrUnhandled)
ctx.Unhandled()
}
}

func (p *PingActor) PostStop(ctx context.Context) error {
p.logger.Info("About to stop")
p.logger.Infof("Processed=%d address", p.count.Load())
p.logger.Infof("Processed=%d messages", p.count.Load())
return nil
}
6 changes: 4 additions & 2 deletions examples/remoting/pong/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

goakt "github.com/tochemey/goakt/actors"
samplepb "github.com/tochemey/goakt/examples/protos/samplepb"
"github.com/tochemey/goakt/goaktpb"
"github.com/tochemey/goakt/log"
)

Expand Down Expand Up @@ -92,6 +93,7 @@ func (p *PongActor) PreStart(ctx context.Context) error {

func (p *PongActor) Receive(ctx goakt.ReceiveContext) {
switch ctx.Message().(type) {
case *goaktpb.PostStart:
case *samplepb.Ping:
// reply the sender in case there is a sender
if ctx.RemoteSender() != goakt.RemoteNoSender {
Expand All @@ -103,12 +105,12 @@ func (p *PongActor) Receive(ctx goakt.ReceiveContext) {
}
p.count.Add(1)
default:
p.logger.Panic(goakt.ErrUnhandled)
ctx.Unhandled()
}
}

func (p *PongActor) PostStop(ctx context.Context) error {
p.logger.Info("About to stop")
p.logger.Infof("Processed=%d public", p.count.Load())
p.logger.Infof("Processed=%d messages", p.count.Load())
return nil
}

0 comments on commit 704d465

Please sign in to comment.