Skip to content

Commit

Permalink
fix: application should exit when cluster engine fails to start (#302)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Apr 19, 2024
1 parent 408564c commit 4a57506
Show file tree
Hide file tree
Showing 10 changed files with 751 additions and 14 deletions.
1 change: 0 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,3 @@ jobs:
files: ./coverage.out
fail_ci_if_error: false
verbose: true
exclude: goaktpb/
1 change: 0 additions & 1 deletion .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,3 @@ jobs:
files: ./coverage.out
fail_ci_if_error: false
verbose: true
exclude: goaktpb/
4 changes: 3 additions & 1 deletion Earthfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ FROM tochemey/docker-go:1.21.0-1.0.0
RUN go install github.com/fullstorydev/grpcurl/cmd/grpcurl@latest
RUN go install connectrpc.com/connect/cmd/protoc-gen-connect-go@latest
RUN apk --no-cache add git ca-certificates gcc musl-dev libc-dev binutils-gold
RUN go install github.com/ory/go-acc@latest


test:
Expand Down Expand Up @@ -38,6 +39,7 @@ mock:
# generate the mocks
RUN mockery --all --dir hash --keeptree --exported=true --with-expecter=true --inpackage=true --output ./mocks/hash --case snake
RUN mockery --all --dir discovery --keeptree --exported=true --with-expecter=true --inpackage=true --output ./mocks/discovery --case snake
RUN mockery --all --dir internal/cluster --keeptree --exported=true --with-expecter=true --inpackage=true --output ./mocks/cluster --case snake

SAVE ARTIFACT ./mocks mocks AS LOCAL mocks

Expand All @@ -54,7 +56,7 @@ lint:
local-test:
FROM +vendor

RUN go test -mod=vendor `go list ./... | grep -v ./goaktpb | grep -v ./examples | grep -v ./mocks` -race -v -coverprofile=coverage.out -covermode=atomic -coverpkg=./...
RUN go-acc ./... -o coverage.out --ignore goaktpb,examples,mocks,internal/internalpb -- -mod=vendor -race -v

SAVE ARTIFACT coverage.out AS LOCAL coverage.out

Expand Down
13 changes: 9 additions & 4 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,9 @@ func (x *actorSystem) Start(ctx context.Context) error {
x.started.Store(true)

if x.clusterEnabled.Load() {
x.enableClustering(spanCtx)
if err := x.enableClustering(spanCtx); err != nil {
return err
}
}

if x.remotingEnabled.Load() {
Expand Down Expand Up @@ -1047,7 +1049,7 @@ func (x *actorSystem) handleRemoteTell(ctx context.Context, to PID, message prot

// enableClustering enables clustering. When clustering is enabled, remoting is also enabled to facilitate remote
// communication. It returns an error when the cluster engine cannot be initialized or started.
func (x *actorSystem) enableClustering(ctx context.Context) {
func (x *actorSystem) enableClustering(ctx context.Context) error {
x.logger.Info("enabling clustering...")

cluster, err := cluster.NewNode(x.Name(),
Expand All @@ -1057,12 +1059,14 @@ func (x *actorSystem) enableClustering(ctx context.Context) {
cluster.WithHasher(x.partitionHasher),
)
if err != nil {
x.logger.Panic(errors.Wrap(err, "failed to initialize cluster engine"))
x.logger.Error(errors.Wrap(err, "failed to initialize cluster engine"))
return err
}

x.logger.Info("starting cluster engine...")
if err := cluster.Start(ctx); err != nil {
x.logger.Panic(errors.Wrap(err, "failed to start cluster engine"))
x.logger.Error(errors.Wrap(err, "failed to start cluster engine"))
return err
}

bootstrapChan := make(chan struct{}, 1)
Expand All @@ -1085,6 +1089,7 @@ func (x *actorSystem) enableClustering(ctx context.Context) {
go x.broadcast(ctx)

x.logger.Info("clustering enabled...:)")
return nil
}

// enableRemoting enables the remoting service to handle remote messaging
Expand Down
36 changes: 32 additions & 4 deletions actors/actor_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"os"
"sort"
"strconv"
"sync"
"testing"
"time"

Expand All @@ -40,11 +41,14 @@ import (
"github.com/travisjeffery/go-dynaport"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/trace/noop"
"go.uber.org/atomic"
"google.golang.org/protobuf/proto"

"github.com/tochemey/goakt/discovery"
"github.com/tochemey/goakt/goaktpb"
"github.com/tochemey/goakt/log"
clustermocks "github.com/tochemey/goakt/mocks/cluster"
testkit "github.com/tochemey/goakt/mocks/discovery"
"github.com/tochemey/goakt/telemetry"
testpb "github.com/tochemey/goakt/test/data/testpb"
Expand Down Expand Up @@ -1351,7 +1355,6 @@ func TestActorSystem(t *testing.T) {
assert.EqualError(t, err, ErrActorSystemNotStarted.Error())
assert.Nil(t, actorRef)
})

t.Run("With happy path Register", func(t *testing.T) {
ctx := context.TODO()
logger := log.New(log.DebugLevel, os.Stdout)
Expand All @@ -1376,7 +1379,6 @@ func TestActorSystem(t *testing.T) {
assert.NoError(t, err)
})
})

t.Run("With Register when actor system not started", func(t *testing.T) {
ctx := context.TODO()
logger := log.New(log.DebugLevel, os.Stdout)
Expand All @@ -1398,7 +1400,6 @@ func TestActorSystem(t *testing.T) {
assert.Error(t, err)
})
})

t.Run("With happy path Deregister", func(t *testing.T) {
ctx := context.TODO()
logger := log.New(log.DebugLevel, os.Stdout)
Expand Down Expand Up @@ -1426,7 +1427,6 @@ func TestActorSystem(t *testing.T) {
assert.NoError(t, err)
})
})

t.Run("With Deregister when actor system not started", func(t *testing.T) {
ctx := context.TODO()
logger := log.New(log.DebugLevel, os.Stdout)
Expand All @@ -1447,4 +1447,32 @@ func TestActorSystem(t *testing.T) {
assert.Error(t, err)
})
})

t.Run("With cluster start failure", func(t *testing.T) {
ctx := context.TODO()
logger := log.DiscardLogger
mockedCluster := new(clustermocks.Interface)
mockedErr := errors.New("failed to start")
mockedCluster.EXPECT().Start(ctx).Return(mockedErr)

// mock the discovery provider
provider := new(testkit.Provider)
config := discovery.NewConfig()
sd := discovery.NewServiceDiscovery(provider, config)

system := &actorSystem{
name: "testSystem",
logger: logger,
cluster: mockedCluster,
clusterEnabled: *atomic.NewBool(true),
telemetry: telemetry.New(),
mutex: sync.Mutex{},
tracer: noop.NewTracerProvider().Tracer("testSystem"),
scheduler: newScheduler(logger, time.Second, withSchedulerCluster(mockedCluster)),
serviceDiscovery: sd,
}

err := system.Start(ctx)
require.Error(t, err)
})
}
2 changes: 1 addition & 1 deletion codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ ignore:
- "examples"
- "internal/internalpb"
- "testkit"
- "**/mocks"
- "mocks"
4 changes: 2 additions & 2 deletions internal/cluster/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ func (n *Node) Start(ctx context.Context) error {
logger.Infof("GoAkt cluster Node=(%s) successfully started. 🎉", n.name)
}

// let us create an instance of the Node engine
eng, err := olric.New(conf)
if err != nil {
logger.Error(errors.Wrapf(err, "failed to start the cluster Node=(%s).💥", n.name))
Expand All @@ -216,10 +215,11 @@ func (n *Node) Start(ctx context.Context) error {
n.server = eng
go func() {
if err = n.server.Start(); err != nil {
logger.Error(errors.Wrapf(err, "failed to start the cluster Node=(%s).💥", n.name))
if e := n.server.Shutdown(ctx); e != nil {
logger.Panic(e)
}
// the expectation is to exit the application
logger.Fatal(errors.Wrapf(err, "failed to start the cluster Node=(%s).💥", n.name))
}
}()

Expand Down
Loading

0 comments on commit 4a57506

Please sign in to comment.