Skip to content

Commit

Permalink
feat: expose an API for custom mailboxes (#479)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Sep 28, 2024
1 parent 9343db4 commit 6d8ec14
Show file tree
Hide file tree
Showing 29 changed files with 861 additions and 349 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Also, check reference section at the end of the post for more material regarding
- [Actor System](#actor-system)
- [Behaviors](#behaviors)
- [Router](#router)
- [Mailbox](#mailbox)
- [Events Stream](#events-stream)
- [Supported events](#supported-events)
- [Messaging](#messaging)
Expand Down Expand Up @@ -209,6 +210,14 @@ Go-Akt comes shipped with the following routing strategies:
A router a just like any other actor that can be spawned. To spawn router just call the [ActorSystem](./actors/actor_system.go) `SpawnRouter` method.
Router as well as their routees are not passivated.

### Mailbox

Once can implement a custom mailbox. See [Mailbox](./actors/mailbox.go).
Go-Akt comes with the following mailboxes built-in:

- [`UnboundedMailbox`](./actors/unbounded_mailbox.go): this is the default mailbox. It is implemented using the lock-free Multi-Producer-Single-Consumer Queue.
- [`BoundedMailbox`](./actors/bounded_mailbox.go): this is a thread-safe mailbox implemented using the Ring-Buffer Queue. When the mailbox is full any new message is sent to the deadletter queue. Setting a reasonable capacity for the queue can enhance throughput.

### Events Stream

To receive some system events and act on them for some particular business cases, you just need to call the actor system `Subscribe`.
Expand Down
19 changes: 13 additions & 6 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type ActorSystem interface {
// Stop stops the actor system
Stop(ctx context.Context) error
// Spawn creates an actor in the system and starts it
Spawn(ctx context.Context, name string, actor Actor) (*PID, error)
Spawn(ctx context.Context, name string, actor Actor, opts ...SpawnOption) (*PID, error)
// SpawnFromFunc creates an actor with the given receive function. One can set the PreStart and PostStop lifecycle hooks
// in the given optional options
SpawnFromFunc(ctx context.Context, receiveFunc ReceiveFunc, opts ...FuncOption) (*PID, error)
Expand Down Expand Up @@ -378,7 +378,7 @@ func (x *actorSystem) NumActors() uint64 {
}

// Spawn creates or returns the instance of a given actor in the system
func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor) (*PID, error) {
func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor, opts ...SpawnOption) (*PID, error) {
if !x.started.Load() {
return nil, ErrActorSystemNotStarted
}
Expand All @@ -393,7 +393,7 @@ func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor) (*PID
}
}

pid, err := x.configPID(ctx, name, actor)
pid, err := x.configPID(ctx, name, actor, opts...)
if err != nil {
return nil, err
}
Expand All @@ -410,8 +410,9 @@ func (x *actorSystem) SpawnNamedFromFunc(ctx context.Context, name string, recei
return nil, ErrActorSystemNotStarted
}

actor := newFuncActor(name, receiveFunc, opts...)
pid, err := x.configPID(ctx, name, actor)
config := newFuncConfig(opts...)
actor := newFuncActor(name, receiveFunc, config)
pid, err := x.configPID(ctx, name, actor, WithMailbox(config.mailbox))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1308,7 +1309,7 @@ func (x *actorSystem) processPeerState(ctx context.Context, peer *cluster.Peer)

// configPID constructs a PID provided the actor name and the actor kind
// this is a utility function used when spawning actors
func (x *actorSystem) configPID(ctx context.Context, name string, actor Actor) (*PID, error) {
func (x *actorSystem) configPID(ctx context.Context, name string, actor Actor, opts ...SpawnOption) (*PID, error) {
addr := x.actorAddress(name)
if err := addr.Validate(); err != nil {
return nil, err
Expand All @@ -1326,6 +1327,12 @@ func (x *actorSystem) configPID(ctx context.Context, name string, actor Actor) (
withInitTimeout(x.actorInitTimeout),
}

spawnConfig := newSpawnConfig(opts...)
// set the mailbox option
if spawnConfig.mailbox != nil {
pidOpts = append(pidOpts, withMailbox(spawnConfig.mailbox))
}

// enable stash
if x.stashEnabled {
pidOpts = append(pidOpts, withStash())
Expand Down
39 changes: 39 additions & 0 deletions actors/actor_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1418,4 +1418,43 @@ func TestActorSystem(t *testing.T) {
assert.NoError(t, err)
})
})
t.Run("With Spawn with custom mailbox", func(t *testing.T) {
ctx := context.TODO()
actorSystem, _ := NewActorSystem("testSys", WithLogger(log.DiscardLogger))

// start the actor system
err := actorSystem.Start(ctx)
assert.NoError(t, err)

// wait for complete start
lib.Pause(time.Second)

// create the black hole actor
actor := newTestActor()
pid, err := actorSystem.Spawn(ctx, "test", actor, WithMailbox(NewBoundedMailbox(10)))
assert.NoError(t, err)
assert.NotNil(t, pid)

// wait a while
lib.Pause(time.Second)
assert.EqualValues(t, 1, pid.ProcessedCount())
require.True(t, pid.IsRunning())

// every message sent to the actor will result in deadletters
counter := 0
for i := 1; i <= 5; i++ {
require.NoError(t, Tell(ctx, pid, new(testpb.TestSend)))
counter = counter + 1
}

lib.Pause(time.Second)

assert.EqualValues(t, counter, pid.ProcessedCount()-1)
require.NoError(t, err)

t.Cleanup(func() {
err = actorSystem.Stop(ctx)
assert.NoError(t, err)
})
})
}
4 changes: 2 additions & 2 deletions actors/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ func TestTell(t *testing.T) {
require.NoError(t, err)
// wait for processing to be done
lib.Pause(500 * time.Millisecond)
require.EqualValues(t, 2, actor.counter.Load())
require.EqualValues(t, 2, actorRef.ProcessedCount()-1)

err = sys.Stop(ctx)
assert.NoError(t, err)
Expand Down Expand Up @@ -761,7 +761,7 @@ func TestRemoteTell(t *testing.T) {

// wait for processing to complete on the actor side
lib.Pause(500 * time.Millisecond)
require.EqualValues(t, 10, actor.counter.Load())
require.EqualValues(t, 10, actorRef.ProcessedCount()-1)

// stop the actor after some time
lib.Pause(time.Second)
Expand Down
109 changes: 109 additions & 0 deletions actors/bounded_mailbox.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* MIT License
*
* Copyright (c) 2022-2024 Tochemey
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package actors

import (
"sync"
)

// BoundedMailbox defines a bounded mailbox using ring buffer queue
// This mailbox is thread-safe
type BoundedMailbox struct {
buffer []*ReceiveContext
head, tail int
len, cap int
full bool
lock *sync.Mutex
}

// enforce compilation error
var _ Mailbox = (*BoundedMailbox)(nil)

// NewBoundedMailbox creates a new instance BoundedMailbox
func NewBoundedMailbox(cap int) *BoundedMailbox {
return &BoundedMailbox{
buffer: make([]*ReceiveContext, cap),
head: 0,
tail: 0,
len: 0,
cap: cap,
lock: new(sync.Mutex),
}
}

// Enqueue places the given value in the mailbox
// This will return an error when the mailbox is full
func (mailbox *BoundedMailbox) Enqueue(msg *ReceiveContext) error {
if mailbox.isFull() {
return ErrFullMailbox
}

mailbox.lock.Lock()
mailbox.buffer[mailbox.tail] = msg
mailbox.tail = (mailbox.tail + 1) % mailbox.cap
mailbox.len++
mailbox.full = mailbox.head == mailbox.tail
mailbox.lock.Unlock()
return nil
}

// Dequeue takes the mail from the mailbox
// It returns nil when the mailbox is empty
func (mailbox *BoundedMailbox) Dequeue() (msg *ReceiveContext) {
if mailbox.IsEmpty() {
return nil
}

mailbox.lock.Lock()
item := mailbox.buffer[mailbox.head]
mailbox.full = false
mailbox.head = (mailbox.head + 1) % mailbox.cap
mailbox.len--
mailbox.lock.Unlock()
return item
}

// IsEmpty returns true when the mailbox is empty
func (mailbox *BoundedMailbox) IsEmpty() bool {
mailbox.lock.Lock()
empty := mailbox.head == mailbox.tail && !mailbox.full
mailbox.lock.Unlock()
return empty
}

// Len returns queue length
func (mailbox *BoundedMailbox) Len() int64 {
mailbox.lock.Lock()
length := mailbox.len
mailbox.lock.Unlock()
return int64(length)
}

func (mailbox *BoundedMailbox) isFull() bool {
mailbox.lock.Lock()
full := mailbox.full
mailbox.lock.Unlock()
return full
}
52 changes: 52 additions & 0 deletions actors/bounded_mailbox_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* MIT License
*
* Copyright (c) 2022-2024 Tochemey
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package actors

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestBoundedMailbox(t *testing.T) {
mailbox := NewBoundedMailbox(20)
for i := 0; i < 20; i++ {
require.NoError(t, mailbox.Enqueue(&ReceiveContext{}))
}
assert.False(t, mailbox.IsEmpty())
err := mailbox.Enqueue(&ReceiveContext{})
require.Error(t, err)
assert.EqualError(t, err, ErrFullMailbox.Error())
dequeue := mailbox.Dequeue()
require.NotNil(t, dequeue)
assert.EqualValues(t, 19, mailbox.Len())
counter := 19
for counter > 0 {
mailbox.Dequeue()
counter--
}
assert.True(t, mailbox.IsEmpty())
}
2 changes: 2 additions & 0 deletions actors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ var (
ErrUndefinedTask = errors.New("task is not defined")
// ErrInvalidHost is returned when a request is sent to an invalid host
ErrInvalidHost = errors.New("invalid host")
// ErrFullMailbox is returned when the mailbox is full
ErrFullMailbox = errors.New("mailbox is full")
)

// eof returns true if the given error is an EOF error
Expand Down
Loading

0 comments on commit 6d8ec14

Please sign in to comment.