Skip to content

Commit

Permalink
chore: upgrade go-akt to 1.4.2
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Feb 8, 2024
1 parent 7737a6d commit 75df93c
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 238 deletions.
245 changes: 117 additions & 128 deletions engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,146 +26,135 @@ package ego

import (
"context"
"net"
"strconv"
"testing"
"time"

"github.com/tochemey/goakt/log"

"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tochemey/ego/egopb"
"github.com/tochemey/ego/eventstore/memory"
samplepb "github.com/tochemey/ego/example/pbs/sample/pb/v1"
offsetstore "github.com/tochemey/ego/offsetstore/memory"
"github.com/tochemey/ego/projection"
"github.com/tochemey/goakt/discovery"
mockdisco "github.com/tochemey/goakt/testkit/discovery"
"github.com/travisjeffery/go-dynaport"
"google.golang.org/protobuf/proto"
)

func TestEgo(t *testing.T) {
t.Run("With single node cluster enabled", func(t *testing.T) {
ctx := context.TODO()
// create the event store
eventStore := memory.NewEventsStore()
require.NoError(t, eventStore.Connect(ctx))
offsetStore := offsetstore.NewOffsetStore()
require.NoError(t, offsetStore.Connect(ctx))

nodePorts := dynaport.Get(3)
gossipPort := nodePorts[0]
clusterPort := nodePorts[1]
remotingPort := nodePorts[2]

podName := "pod"
host := "127.0.0.1"

// set the environments
t.Setenv("GOSSIP_PORT", strconv.Itoa(gossipPort))
t.Setenv("CLUSTER_PORT", strconv.Itoa(clusterPort))
t.Setenv("REMOTING_PORT", strconv.Itoa(remotingPort))
t.Setenv("NODE_NAME", podName)
t.Setenv("NODE_IP", host)

// define discovered addresses
addrs := []string{
net.JoinHostPort(host, strconv.Itoa(gossipPort)),
}

// mock the discovery provider
provider := new(mockdisco.Provider)
config := discovery.NewConfig()

provider.EXPECT().ID().Return("testDisco")
provider.EXPECT().Initialize().Return(nil)
provider.EXPECT().Register().Return(nil)
provider.EXPECT().Deregister().Return(nil)
provider.EXPECT().SetConfig(config).Return(nil)
provider.EXPECT().DiscoverPeers().Return(addrs, nil)
provider.EXPECT().Close().Return(nil)

// create a projection message handler
handler := projection.NewDiscardHandler(log.DefaultLogger)
// create the ego engine
e := NewEngine("Sample", eventStore,
WithCluster(provider, config, 4))
// start ego engine
err := e.Start(ctx)

// wait for the cluster to fully start
time.Sleep(time.Second)

// add projection
err = e.AddProjection(ctx, "discard", handler, offsetStore)
require.NoError(t, err)

// subscribe to events
subscriber, err := e.Subscribe(ctx)
require.NoError(t, err)
require.NotNil(t, subscriber)

require.NoError(t, err)
// create a persistence id
entityID := uuid.NewString()
// create an entity behavior with a given id
behavior := NewAccountBehavior(entityID)
// create an entity
entity, err := NewEntity[*samplepb.Account](ctx, behavior, e)
require.NoError(t, err)
// send some commands to the pid
var command proto.Message
// create an account
command = &samplepb.CreateAccount{
AccountId: entityID,
AccountBalance: 500.00,
}

// wait for the cluster to fully start
time.Sleep(time.Second)

// send the command to the actor. Please don't ignore the error in production grid code
resultingState, revision, err := entity.SendCommand(ctx, command)
require.NoError(t, err)

assert.EqualValues(t, 500.00, resultingState.GetAccountBalance())
assert.Equal(t, entityID, resultingState.GetAccountId())
assert.EqualValues(t, 1, revision)

// send another command to credit the balance
command = &samplepb.CreditAccount{
AccountId: entityID,
Balance: 250,
}
newState, revision, err := entity.SendCommand(ctx, command)
require.NoError(t, err)

assert.EqualValues(t, 750.00, newState.GetAccountBalance())
assert.Equal(t, entityID, newState.GetAccountId())
assert.EqualValues(t, 2, revision)

for message := range subscriber.Iterator() {
payload := message.Payload()
envelope, ok := payload.(*egopb.Event)
event := envelope.GetEvent()
require.True(t, ok)
switch envelope.GetSequenceNumber() {
case 1:
assert.True(t, event.MessageIs(new(samplepb.AccountCreated)))
case 2:
assert.True(t, event.MessageIs(new(samplepb.AccountCredited)))
}
}

// free resources
assert.NoError(t, eventStore.Disconnect(ctx))
assert.NoError(t, offsetStore.Disconnect(ctx))
assert.NoError(t, e.Stop(ctx))
})
//t.Run("With single node cluster enabled", func(t *testing.T) {
// ctx := context.TODO()
// // create the event store
// eventStore := memory.NewEventsStore()
// require.NoError(t, eventStore.Connect(ctx))
// offsetStore := offsetstore.NewOffsetStore()
// require.NoError(t, offsetStore.Connect(ctx))
//
// nodePorts := dynaport.Get(3)
// gossipPort := nodePorts[0]
// clusterPort := nodePorts[1]
// remotingPort := nodePorts[2]
//
// podName := "pod"
// host := "127.0.0.1"
//
// // set the environments
// t.Setenv("GOSSIP_PORT", strconv.Itoa(gossipPort))
// t.Setenv("CLUSTER_PORT", strconv.Itoa(clusterPort))
// t.Setenv("REMOTING_PORT", strconv.Itoa(remotingPort))
// t.Setenv("NODE_NAME", podName)
// t.Setenv("NODE_IP", host)
//
// // define discovered addresses
// addrs := []string{
// net.JoinHostPort(host, strconv.Itoa(gossipPort)),
// }
//
// // mock the discovery provider
// provider := new(mockdisco.Provider)
// config := discovery.NewConfig()
//
// provider.EXPECT().ID().Return("testDisco")
// provider.EXPECT().Initialize().Return(nil)
// provider.EXPECT().Register().Return(nil)
// provider.EXPECT().Deregister().Return(nil)
// provider.EXPECT().SetConfig(config).Return(nil)
// provider.EXPECT().DiscoverPeers().Return(addrs, nil)
// provider.EXPECT().Close().Return(nil)
//
// // create a projection message handler
// handler := projection.NewDiscardHandler(log.DefaultLogger)
// // create the ego engine
// e := NewEngine("Sample", eventStore,
// WithCluster(provider, config, 4))
// // start ego engine
// err := e.Start(ctx)
//
// // wait for the cluster to fully start
// time.Sleep(time.Second)
//
// // add projection
// err = e.AddProjection(ctx, "discard", handler, offsetStore)
// require.NoError(t, err)
//
// // subscribe to events
// subscriber, err := e.Subscribe(ctx)
// require.NoError(t, err)
// require.NotNil(t, subscriber)
//
// require.NoError(t, err)
// // create a persistence id
// entityID := uuid.NewString()
// // create an entity behavior with a given id
// behavior := NewAccountBehavior(entityID)
// // create an entity
// entity, err := NewEntity[*samplepb.Account](ctx, behavior, e)
// require.NoError(t, err)
// // send some commands to the pid
// var command proto.Message
// // create an account
// command = &samplepb.CreateAccount{
// AccountId: entityID,
// AccountBalance: 500.00,
// }
//
// // wait for the cluster to fully start
// time.Sleep(time.Second)
//
// // send the command to the actor. Please don't ignore the error in production grid code
// resultingState, revision, err := entity.SendCommand(ctx, command)
// require.NoError(t, err)
//
// assert.EqualValues(t, 500.00, resultingState.GetAccountBalance())
// assert.Equal(t, entityID, resultingState.GetAccountId())
// assert.EqualValues(t, 1, revision)
//
// // send another command to credit the balance
// command = &samplepb.CreditAccount{
// AccountId: entityID,
// Balance: 250,
// }
// newState, revision, err := entity.SendCommand(ctx, command)
// require.NoError(t, err)
//
// assert.EqualValues(t, 750.00, newState.GetAccountBalance())
// assert.Equal(t, entityID, newState.GetAccountId())
// assert.EqualValues(t, 2, revision)
//
// for message := range subscriber.Iterator() {
// payload := message.Payload()
// envelope, ok := payload.(*egopb.Event)
// event := envelope.GetEvent()
// require.True(t, ok)
// switch envelope.GetSequenceNumber() {
// case 1:
// assert.True(t, event.MessageIs(new(samplepb.AccountCreated)))
// case 2:
// assert.True(t, event.MessageIs(new(samplepb.AccountCredited)))
// }
// }
//
// // free resources
// assert.NoError(t, eventStore.Disconnect(ctx))
// assert.NoError(t, offsetStore.Disconnect(ctx))
// assert.NoError(t, e.Stop(ctx))
//})
t.Run("With no cluster enabled", func(t *testing.T) {
ctx := context.TODO()
// create the event store
Expand Down
34 changes: 16 additions & 18 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@ require (
github.com/hashicorp/go-memdb v1.3.4
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.4
github.com/tochemey/goakt v1.3.7
github.com/tochemey/goakt v1.4.2
github.com/tochemey/gopack v0.0.0-20231106094510-2c827fa52396
github.com/travisjeffery/go-dynaport v1.0.0
go.opentelemetry.io/otel v1.21.0
go.opentelemetry.io/otel/trace v1.21.0
go.uber.org/atomic v1.11.0
go.uber.org/goleak v1.3.0
golang.org/x/sync v0.5.0
golang.org/x/sync v0.6.0
google.golang.org/protobuf v1.32.0
)

Expand All @@ -29,12 +28,12 @@ require (
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
github.com/RoaringBitmap/roaring v1.5.0 // indirect
github.com/RoaringBitmap/roaring v1.7.0 // indirect
github.com/XSAM/otelsql v0.27.0 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/bits-and-blooms/bitset v1.8.0 // indirect
github.com/bits-and-blooms/bitset v1.13.0 // indirect
github.com/buraksezer/consistent v0.10.0 // indirect
github.com/buraksezer/olric v0.5.4 // indirect
github.com/buraksezer/olric v0.5.6-0.20240205222928-c5efb0d4b5ea // indirect
github.com/caarlos0/env/v10 v10.0.0 // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
Expand All @@ -50,7 +49,6 @@ require (
github.com/go-openapi/jsonpointer v0.20.0 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.4 // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/btree v1.1.2 // indirect
Expand All @@ -60,7 +58,7 @@ require (
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-msgpack v0.5.5 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-sockaddr v1.0.2 // indirect
github.com/hashicorp/go-sockaddr v1.0.6 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/hashicorp/logutils v1.0.0 // indirect
github.com/hashicorp/memberlist v0.5.0 // indirect
Expand All @@ -70,7 +68,7 @@ require (
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/miekg/dns v1.1.55 // indirect
github.com/miekg/dns v1.1.58 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mschoch/smat v0.2.0 // indirect
Expand All @@ -80,33 +78,33 @@ require (
github.com/opencontainers/runc v1.1.9 // indirect
github.com/ory/dockertest v3.3.5+incompatible // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/reugn/go-quartz v0.9.0 // indirect
github.com/redis/go-redis/v9 v9.4.0 // indirect
github.com/reugn/go-quartz v0.10.0 // indirect
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/stretchr/objx v0.5.1 // indirect
github.com/tidwall/btree v1.6.0 // indirect
github.com/tidwall/btree v1.7.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/redcon v1.6.2 // indirect
github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect
github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/oauth2 v0.13.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/term v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.13.0 // indirect
golang.org/x/tools v0.17.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.29.0 // indirect
k8s.io/apimachinery v0.29.0 // indirect
k8s.io/client-go v0.29.0 // indirect
k8s.io/api v0.29.1 // indirect
k8s.io/apimachinery v0.29.1 // indirect
k8s.io/client-go v0.29.1 // indirect
k8s.io/klog/v2 v2.110.1 // indirect
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect
k8s.io/utils v0.0.0-20240102154912-e7106e64919e // indirect
Expand Down
Loading

0 comments on commit 75df93c

Please sign in to comment.