From 9760b0316f24d9bbb2bb9b84b1e6cac1f41bdd46 Mon Sep 17 00:00:00 2001 From: Arsene Date: Mon, 11 Nov 2024 20:25:38 +0000 Subject: [PATCH] refactor: remove system ask timeout and pass timeout to asks (#515) --- actors/actor_system.go | 2 - actors/actor_system_test.go | 17 +---- actors/api_test.go | 3 - actors/errors.go | 2 + actors/helper_test.go | 1 - actors/option.go | 10 --- actors/option_test.go | 5 -- actors/pid.go | 36 +++++----- actors/pid_option.go | 8 --- actors/pid_option_test.go | 5 -- actors/pid_test.go | 117 ++++++++++++--------------------- actors/receive_context.go | 21 +++--- actors/receive_context_test.go | 36 ++++------ actors/remoting.go | 3 +- actors/remoting_test.go | 8 +-- actors/router_test.go | 15 ++--- actors/scheduler_test.go | 1 - actors/types.go | 2 - bench/benchmark_test.go | 20 ++---- client/client_test.go | 15 ++--- testkit/testkit.go | 3 +- 21 files changed, 115 insertions(+), 215 deletions(-) diff --git a/actors/actor_system.go b/actors/actor_system.go index 29dad368..de69822c 100644 --- a/actors/actor_system.go +++ b/actors/actor_system.go @@ -245,7 +245,6 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) { name: name, logger: log.New(log.ErrorLevel, os.Stderr), expireActorAfter: DefaultPassivationTimeout, - askTimeout: DefaultAskTimeout, actorInitMaxRetries: DefaultInitMaxRetries, supervisorDirective: DefaultSupervisoryStrategy, locker: sync.Mutex{}, @@ -1358,7 +1357,6 @@ func (x *actorSystem) configPID(ctx context.Context, name string, actor Actor, o // pid inherit the actor system settings defined during instantiation pidOpts := []pidOption{ withInitMaxRetries(x.actorInitMaxRetries), - withAskTimeout(x.askTimeout), withCustomLogger(x.logger), withActorSystem(x), withSupervisorDirective(x.supervisorDirective), diff --git a/actors/actor_system_test.go b/actors/actor_system_test.go index b7f0d1a6..50857339 100644 --- a/actors/actor_system_test.go +++ b/actors/actor_system_test.go @@ -196,7 +196,6 @@ func TestActorSystem(t *testing.T) { "test", WithPassivationDisabled(), WithLogger(logger), - WithReplyTimeout(time.Minute), WithRemoting(host, int32(remotingPort)), WithClustering(provider, 9, 1, gossipPort, clusterPort, new(testActor)), ) @@ -239,7 +238,7 @@ func TestActorSystem(t *testing.T) { require.True(t, proto.Equal(remoteAddr, addr)) remoting := NewRemoting() - reply, err := remoting.RemoteAsk(ctx, addr, new(testpb.TestReply), DefaultAskTimeout) + reply, err := remoting.RemoteAsk(ctx, addr, new(testpb.TestReply), 20*time.Second) require.NoError(t, err) require.NotNil(t, reply) @@ -285,7 +284,6 @@ func TestActorSystem(t *testing.T) { "test", WithPassivationDisabled(), WithLogger(logger), - WithReplyTimeout(time.Minute), WithRemoting(host, int32(remotingPort)), ) require.NoError(t, err) @@ -384,7 +382,6 @@ func TestActorSystem(t *testing.T) { "test", WithPassivationDisabled(), WithLogger(logger), - WithReplyTimeout(time.Minute), WithRemoting(host, int32(remotingPort)), ) require.NoError(t, err) @@ -561,7 +558,6 @@ func TestActorSystem(t *testing.T) { "test", WithPassivationDisabled(), WithLogger(logger), - WithReplyTimeout(time.Minute), WithRemoting(host, int32(remotingPort)), ) require.NoError(t, err) @@ -641,7 +637,6 @@ func TestActorSystem(t *testing.T) { "test", WithPassivationDisabled(), WithLogger(logger), - WithReplyTimeout(time.Minute), WithRemoting(host, int32(remotingPort)), ) require.NoError(t, err) @@ -667,7 +662,7 @@ func TestActorSystem(t *testing.T) { Id: "", }, ) - reply, err := remoting.RemoteAsk(ctx, addr, new(testpb.TestReply), DefaultAskTimeout) + reply, err := remoting.RemoteAsk(ctx, addr, new(testpb.TestReply), 20*time.Second) require.Error(t, err) require.Nil(t, reply) @@ -695,7 +690,6 @@ func TestActorSystem(t *testing.T) { "test", WithPassivationDisabled(), WithLogger(logger), - WithReplyTimeout(time.Minute), WithRemoting(host, int32(remotingPort)), ) require.NoError(t, err) @@ -719,7 +713,6 @@ func TestActorSystem(t *testing.T) { "test", WithPassivationDisabled(), WithLogger(logger), - WithReplyTimeout(time.Minute), WithRemoting(host, int32(remotingPort)), ) require.NoError(t, err) @@ -741,7 +734,6 @@ func TestActorSystem(t *testing.T) { "test", WithPassivationDisabled(), WithLogger(logger), - WithReplyTimeout(time.Minute), WithRemoting(host, int32(remotingPort)), ) require.NoError(t, err) @@ -1074,7 +1066,6 @@ func TestActorSystem(t *testing.T) { "test", WithExpireActorAfter(passivateAfter), WithLogger(logger), - WithReplyTimeout(time.Minute), WithRemoting(host, int32(remotingPort)), WithClustering(provider, 9, 1, gossipPort, clusterPort, new(testActor)), ) @@ -1249,7 +1240,6 @@ func TestActorSystem(t *testing.T) { "test", WithPassivationDisabled(), WithLogger(logger), - WithReplyTimeout(time.Minute), WithRemoting(host, int32(remotingPort)), WithClustering(provider, 9, 1, gossipPort, clusterPort, new(testActor)), ) @@ -1545,7 +1535,6 @@ func TestActorSystem(t *testing.T) { "test", WithPassivationDisabled(), WithLogger(logger), - WithReplyTimeout(time.Minute), WithRemoting(host, int32(remotingPort)), WithClustering(provider, 9, 1, gossipPort, clusterPort, new(exchanger)), ) @@ -1583,7 +1572,7 @@ func TestActorSystem(t *testing.T) { require.NotNil(t, addr) // send the message to exchanger actor one using remote messaging - reply, err := remoting.RemoteAsk(ctx, addr, new(testpb.TestReply), DefaultAskTimeout) + reply, err := remoting.RemoteAsk(ctx, addr, new(testpb.TestReply), 20*time.Second) require.NoError(t, err) require.NotNil(t, reply) diff --git a/actors/api_test.go b/actors/api_test.go index bc069c4b..507b5161 100644 --- a/actors/api_test.go +++ b/actors/api_test.go @@ -136,7 +136,6 @@ func TestAsk(t *testing.T) { sys, err := NewActorSystem( "test", WithLogger(logger), - WithReplyTimeout(replyTimeout), WithPassivationDisabled(), ) // assert there are no error @@ -265,7 +264,6 @@ func TestAsk(t *testing.T) { sys, err := NewActorSystem( "test", WithLogger(logger), - WithReplyTimeout(replyTimeout), WithPassivationDisabled(), ) // assert there are no error @@ -309,7 +307,6 @@ func TestAsk(t *testing.T) { sys, err := NewActorSystem( "test", WithLogger(logger), - WithReplyTimeout(replyTimeout), WithPassivationDisabled(), ) // assert there are no error diff --git a/actors/errors.go b/actors/errors.go index 7ccdbd24..8a086b3c 100644 --- a/actors/errors.go +++ b/actors/errors.go @@ -87,6 +87,8 @@ var ( ErrSchedulerNotStarted = errors.New("scheduler has not started") // ErrInvalidMessage is returned when an invalid remote message is sent ErrInvalidMessage = func(err error) error { return fmt.Errorf("invalid remote message: %w", err) } + // ErrInvalidTimeout is returned when a given timeout is negative or zero + ErrInvalidTimeout = errors.New("invalid timeout") ) // eof returns true if the given error is an EOF error diff --git a/actors/helper_test.go b/actors/helper_test.go index 4936ba5e..42055e37 100644 --- a/actors/helper_test.go +++ b/actors/helper_test.go @@ -450,7 +450,6 @@ func startClusterSystem(t *testing.T, nodeName, serverAddr string) (ActorSystem, actorSystemName, WithPassivationDisabled(), WithLogger(logger), - WithReplyTimeout(time.Minute), WithRemoting(host, int32(remotingPort)), WithPeerStateLoopInterval(500*time.Millisecond), WithCluster( diff --git a/actors/option.go b/actors/option.go index 59a5fa25..cf488075 100644 --- a/actors/option.go +++ b/actors/option.go @@ -67,16 +67,6 @@ func WithLogger(logger log.Logger) Option { ) } -// WithReplyTimeout sets how long in seconds an actor should reply a command -// in a receive-reply pattern -func WithReplyTimeout(timeout time.Duration) Option { - return OptionFunc( - func(a *actorSystem) { - a.askTimeout = timeout - }, - ) -} - // WithActorInitMaxRetries sets the number of times to retry an actor init process func WithActorInitMaxRetries(max int) Option { return OptionFunc( diff --git a/actors/option_test.go b/actors/option_test.go index cdf7dad6..47d92316 100644 --- a/actors/option_test.go +++ b/actors/option_test.go @@ -52,11 +52,6 @@ func TestOption(t *testing.T) { option: WithExpireActorAfter(2 * time.Second), expected: actorSystem{expireActorAfter: 2. * time.Second}, }, - { - name: "WithReplyTimeout", - option: WithReplyTimeout(2 * time.Second), - expected: actorSystem{askTimeout: 2. * time.Second}, - }, { name: "WithActorInitMaxRetries", option: WithActorInitMaxRetries(2), diff --git a/actors/pid.go b/actors/pid.go index 2ed8f4ee..2f067c78 100644 --- a/actors/pid.go +++ b/actors/pid.go @@ -38,6 +38,7 @@ import ( "go.uber.org/atomic" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" "github.com/tochemey/goakt/v2/address" @@ -97,10 +98,6 @@ type PID struct { // any further resources like memory and cpu. The default value is 120 seconds passivateAfter atomic.Duration - // specifies how long the sender of a mail should wait to receiveLoop a reply - // when using Ask. The default value is 5s - askTimeout atomic.Duration - // specifies the maximum of retries to attempt when the actor // initialization fails. The default value is 5 initMaxRetries atomic.Int32 @@ -214,7 +211,6 @@ func newPID(ctx context.Context, address *address.Address, actor Actor, opts ... pid.latestReceiveDuration.Store(0) pid.running.Store(false) pid.passivateAfter.Store(DefaultPassivationTimeout) - pid.askTimeout.Store(DefaultAskTimeout) pid.initTimeout.Store(DefaultInitTimeout) for _, opt := range opts { @@ -461,7 +457,6 @@ func (pid *PID) SpawnChild(ctx context.Context, name string, actor Actor, opts . pidOptions := []pidOption{ withInitMaxRetries(int(pid.initMaxRetries.Load())), withPassivationAfter(pid.passivateAfter.Load()), - withAskTimeout(pid.askTimeout.Load()), withCustomLogger(pid.logger), withActorSystem(pid.system), withSupervisorDirective(pid.supervisorDirective), @@ -546,16 +541,18 @@ func (pid *PID) PipeTo(ctx context.Context, to *PID, task future.Task) error { // Ask sends a synchronous message to another actor and expect a response. // This block until a response is received or timed out. -func (pid *PID) Ask(ctx context.Context, to *PID, message proto.Message) (response proto.Message, err error) { +func (pid *PID) Ask(ctx context.Context, to *PID, message proto.Message, timeout time.Duration) (response proto.Message, err error) { if !to.IsRunning() { return nil, ErrDead } + if timeout <= 0 { + return nil, ErrInvalidTimeout + } + receiveContext := contextFromPool() receiveContext.build(ctx, pid, to, message, false) - to.doReceive(receiveContext) - timeout := pid.askTimeout.Load() select { case result := <-receiveContext.response: @@ -602,7 +599,7 @@ func (pid *PID) SendAsync(ctx context.Context, actorName string, message proto.M // SendSync sends a synchronous message to another actor and expect a response. // The location of the given actor is transparent to the caller. // This block until a response is received or timed out. -func (pid *PID) SendSync(ctx context.Context, actorName string, message proto.Message) (response proto.Message, err error) { +func (pid *PID) SendSync(ctx context.Context, actorName string, message proto.Message, timeout time.Duration) (response proto.Message, err error) { if !pid.IsRunning() { return nil, ErrDead } @@ -613,10 +610,10 @@ func (pid *PID) SendSync(ctx context.Context, actorName string, message proto.Me } if cid != nil { - return pid.Ask(ctx, cid, message) + return pid.Ask(ctx, cid, message, timeout) } - reply, err := pid.RemoteAsk(ctx, addr, message) + reply, err := pid.RemoteAsk(ctx, addr, message, timeout) if err != nil { return nil, err } @@ -639,12 +636,12 @@ func (pid *PID) BatchTell(ctx context.Context, to *PID, messages ...proto.Messag // BatchAsk sends a synchronous bunch of messages to the given PID and expect responses in the same order as the messages. // The messages will be processed one after the other in the order they are sent. // This is a design choice to follow the simple principle of one message at a time processing by actors. -func (pid *PID) BatchAsk(ctx context.Context, to *PID, messages ...proto.Message) (responses chan proto.Message, err error) { +func (pid *PID) BatchAsk(ctx context.Context, to *PID, messages []proto.Message, timeout time.Duration) (responses chan proto.Message, err error) { responses = make(chan proto.Message, len(messages)) defer close(responses) for i := 0; i < len(messages); i++ { - response, err := pid.Ask(ctx, to, messages[i]) + response, err := pid.Ask(ctx, to, messages[i], timeout) if err != nil { return nil, err } @@ -733,11 +730,15 @@ func (pid *PID) RemoteTell(ctx context.Context, to *address.Address, message pro } // RemoteAsk sends a synchronous message to another actor remotely and expect a response. -func (pid *PID) RemoteAsk(ctx context.Context, to *address.Address, message proto.Message) (response *anypb.Any, err error) { +func (pid *PID) RemoteAsk(ctx context.Context, to *address.Address, message proto.Message, timeout time.Duration) (response *anypb.Any, err error) { if pid.remoting == nil { return nil, ErrRemotingDisabled } + if timeout <= 0 { + return nil, ErrInvalidTimeout + } + marshaled, err := anypb.New(message) if err != nil { return nil, err @@ -759,6 +760,7 @@ func (pid *PID) RemoteAsk(ctx context.Context, to *address.Address, message prot Receiver: to.Address, Message: marshaled, }, + Timeout: durationpb.New(timeout), } stream := remoteService.RemoteAsk(ctx) @@ -860,7 +862,7 @@ func (pid *PID) RemoteBatchTell(ctx context.Context, to *address.Address, messag // RemoteBatchAsk sends a synchronous bunch of messages to a remote actor and expect responses in the same order as the messages. // Messages are processed one after the other in the order they are sent. // This can hinder performance if it is not properly used. -func (pid *PID) RemoteBatchAsk(ctx context.Context, to *address.Address, messages []proto.Message) (responses []*anypb.Any, err error) { +func (pid *PID) RemoteBatchAsk(ctx context.Context, to *address.Address, messages []proto.Message, timeout time.Duration) (responses []*anypb.Any, err error) { if pid.remoting == nil { return nil, ErrRemotingDisabled } @@ -886,6 +888,7 @@ func (pid *PID) RemoteBatchAsk(ctx context.Context, to *address.Address, message Receiver: to.Address, Message: packed, }, + Timeout: durationpb.New(timeout), }, ) } @@ -1210,7 +1213,6 @@ func (pid *PID) init(ctx context.Context) error { func (pid *PID) reset() { pid.latestReceiveTime.Store(time.Time{}) pid.passivateAfter.Store(DefaultPassivationTimeout) - pid.askTimeout.Store(DefaultAskTimeout) pid.shutdownTimeout.Store(DefaultShutdownTimeout) pid.initMaxRetries.Store(DefaultInitMaxRetries) pid.latestReceiveDuration.Store(0) diff --git a/actors/pid_option.go b/actors/pid_option.go index d99b5306..15b98eca 100644 --- a/actors/pid_option.go +++ b/actors/pid_option.go @@ -41,14 +41,6 @@ func withPassivationAfter(duration time.Duration) pidOption { } } -// withAskTimeout sets how long in seconds an actor should reply a command -// in a receive-reply pattern -func withAskTimeout(timeout time.Duration) pidOption { - return func(pid *PID) { - pid.askTimeout.Store(timeout) - } -} - // withInitMaxRetries sets the number of times to retry an actor init process func withInitMaxRetries(max int) pidOption { return func(pid *PID) { diff --git a/actors/pid_option_test.go b/actors/pid_option_test.go index 918b9ab3..02f6d2b4 100644 --- a/actors/pid_option_test.go +++ b/actors/pid_option_test.go @@ -62,11 +62,6 @@ func TestPIDOptions(t *testing.T) { option: withPassivationAfter(time.Second), expected: &PID{passivateAfter: atomicDuration}, }, - { - name: "WithAskTimeout", - option: withAskTimeout(time.Second), - expected: &PID{askTimeout: atomicDuration}, - }, { name: "WithInitMaxRetries", option: withInitMaxRetries(5), diff --git a/actors/pid_test.go b/actors/pid_test.go index 3adf9125..e676817e 100644 --- a/actors/pid_test.go +++ b/actors/pid_test.go @@ -65,8 +65,7 @@ func TestReceive(t *testing.T) { actorPath, newTestActor(), withInitMaxRetries(1), - withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout)) + withCustomLogger(log.DiscardLogger)) require.NoError(t, err) assert.NotNil(t, pid) @@ -99,7 +98,6 @@ func TestPassivation(t *testing.T) { opts := []pidOption{ withInitMaxRetries(1), withPassivationAfter(passivateAfter), - withAskTimeout(replyTimeout), } ports := dynaport.Get(1) @@ -130,7 +128,6 @@ func TestPassivation(t *testing.T) { opts := []pidOption{ withInitMaxRetries(1), withPassivationAfter(passivateAfter), - withAskTimeout(replyTimeout), } ports := dynaport.Get(1) @@ -252,8 +249,7 @@ func TestRestart(t *testing.T) { pid, err := newPID(ctx, actorPath, actor, withInitMaxRetries(1), withPassivationAfter(10*time.Second), - withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout)) + withCustomLogger(log.DiscardLogger)) require.NoError(t, err) assert.NotNil(t, pid) @@ -298,8 +294,7 @@ func TestRestart(t *testing.T) { // create the actor ref pid, err := newPID(ctx, actorPath, actor, withInitMaxRetries(1), - withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout)) + withCustomLogger(log.DiscardLogger)) require.NoError(t, err) assert.NotNil(t, pid) @@ -341,8 +336,7 @@ func TestRestart(t *testing.T) { pid, err := newPID(ctx, actorPath, actor, withInitMaxRetries(1), withCustomLogger(log.DiscardLogger), - withPassivationAfter(time.Minute), - withAskTimeout(replyTimeout)) + withPassivationAfter(time.Minute)) require.NoError(t, err) assert.NotNil(t, pid) @@ -382,8 +376,7 @@ func TestRestart(t *testing.T) { pid, err := newPID(ctx, actorPath, actor, withInitMaxRetries(1), withPassivationAfter(passivateAfter), - withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout)) + withCustomLogger(log.DiscardLogger)) require.NoError(t, err) assert.NotNil(t, pid) @@ -414,8 +407,7 @@ func TestSupervisorStrategy(t *testing.T) { newTestSupervisor(), withInitMaxRetries(1), withCustomLogger(log.DiscardLogger), - withSupervisorDirective(NewStopDirective()), - withAskTimeout(replyTimeout)) + withSupervisorDirective(NewStopDirective())) require.NoError(t, err) assert.NotNil(t, parent) @@ -452,8 +444,7 @@ func TestSupervisorStrategy(t *testing.T) { newTestSupervisor(), withInitMaxRetries(1), withCustomLogger(log.DiscardLogger), - withPassivationDisabled(), - withAskTimeout(replyTimeout)) + withPassivationDisabled()) require.NoError(t, err) assert.NotNil(t, parent) @@ -496,7 +487,7 @@ func TestSupervisorStrategy(t *testing.T) { withCustomLogger(log.DiscardLogger), withPassivationDisabled(), withSupervisorDirective(new(unhandledSupervisorDirective)), // only for test to handle default case - withAskTimeout(replyTimeout)) + ) require.NoError(t, err) assert.NotNil(t, parent) @@ -539,7 +530,7 @@ func TestSupervisorStrategy(t *testing.T) { withCustomLogger(log.DiscardLogger), withSupervisorDirective(DefaultSupervisoryStrategy), withPassivationDisabled(), - withAskTimeout(replyTimeout)) + ) require.NoError(t, err) assert.NotNil(t, parent) @@ -583,7 +574,7 @@ func TestSupervisorStrategy(t *testing.T) { withCustomLogger(logger), withPassivationDisabled(), withSupervisorDirective(NewRestartDirective()), - withAskTimeout(replyTimeout)) + ) require.NoError(t, err) require.NotNil(t, parent) @@ -627,7 +618,7 @@ func TestSupervisorStrategy(t *testing.T) { withInitMaxRetries(1), withCustomLogger(log.DiscardLogger), withPassivationDisabled(), - withAskTimeout(replyTimeout)) + ) require.NoError(t, err) assert.NotNil(t, parent) @@ -668,7 +659,7 @@ func TestSupervisorStrategy(t *testing.T) { withCustomLogger(logger), withPassivationDisabled(), withSupervisorDirective(NewResumeDirective()), - withAskTimeout(replyTimeout)) + ) require.NoError(t, err) assert.NotNil(t, parent) @@ -720,7 +711,7 @@ func TestSupervisorStrategy(t *testing.T) { withCustomLogger(logger), withPassivationDisabled(), withSupervisorDirective(restart), - withAskTimeout(replyTimeout)) + ) require.NoError(t, err) require.NotNil(t, parent) @@ -775,7 +766,7 @@ func TestMessaging(t *testing.T) { require.NoError(t, err) // send an ask - reply, err := pid1.Ask(ctx, pid2, new(testpb.TestReply)) + reply, err := pid1.Ask(ctx, pid2, new(testpb.TestReply), 20*time.Second) require.NoError(t, err) require.NotNil(t, reply) expected := new(testpb.Reply) @@ -822,7 +813,7 @@ func TestMessaging(t *testing.T) { assert.NoError(t, pid2.Shutdown(ctx)) // send an ask - reply, err := pid1.Ask(ctx, pid2, new(testpb.TestReply)) + reply, err := pid1.Ask(ctx, pid2, new(testpb.TestReply), 20*time.Second) require.Error(t, err) require.EqualError(t, err, ErrDead.Error()) require.Nil(t, reply) @@ -876,7 +867,6 @@ func TestMessaging(t *testing.T) { opts := []pidOption{ withInitMaxRetries(1), withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout), } ports := dynaport.Get(1) @@ -898,7 +888,7 @@ func TestMessaging(t *testing.T) { require.NoError(t, err) // send an ask - reply, err := pid1.Ask(ctx, pid2, new(testpb.TestTimeout)) + reply, err := pid1.Ask(ctx, pid2, new(testpb.TestTimeout), replyTimeout) require.Error(t, err) require.EqualError(t, err, ErrRequestTimeout.Error()) require.Nil(t, reply) @@ -959,7 +949,7 @@ func TestRemoting(t *testing.T) { require.NoError(t, err) // send the message to exchanger actor one using remote messaging - reply, err := actorRef2.RemoteAsk(ctx, address.From(addr1), new(testpb.TestReply)) + reply, err := actorRef2.RemoteAsk(ctx, address.From(addr1), new(testpb.TestReply), replyTimeout) // perform some assertions require.NoError(t, err) require.NotNil(t, reply) @@ -1025,7 +1015,7 @@ func TestRemoting(t *testing.T) { actorRef2.remoting = nil // send the message to exchanger actor one using remote messaging - reply, err := actorRef2.RemoteAsk(ctx, address.From(addr1), new(testpb.TestReply)) + reply, err := actorRef2.RemoteAsk(ctx, address.From(addr1), new(testpb.TestReply), replyTimeout) // perform some assertions require.Error(t, err) require.Nil(t, reply) @@ -1057,8 +1047,7 @@ func TestActorHandle(t *testing.T) { actorPath, &exchanger{}, withInitMaxRetries(1), - withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout)) + withCustomLogger(log.DiscardLogger)) require.NoError(t, err) assert.NotNil(t, pid) @@ -1083,8 +1072,8 @@ func TestPIDActorSystem(t *testing.T) { actorPath, &exchanger{}, withInitMaxRetries(1), - withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout)) + withCustomLogger(log.DiscardLogger)) + require.NoError(t, err) assert.NotNil(t, pid) sys := pid.ActorSystem() @@ -1105,8 +1094,7 @@ func TestSpawnChild(t *testing.T) { parent, err := newPID(ctx, actorPath, newTestSupervisor(), withInitMaxRetries(1), - withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout)) + withCustomLogger(log.DiscardLogger)) require.NoError(t, err) assert.NotNil(t, parent) @@ -1145,8 +1133,7 @@ func TestSpawnChild(t *testing.T) { parent, err := newPID(ctx, actorPath, newTestSupervisor(), withInitMaxRetries(1), - withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout)) + withCustomLogger(log.DiscardLogger)) require.NoError(t, err) assert.NotNil(t, parent) @@ -1175,8 +1162,7 @@ func TestSpawnChild(t *testing.T) { parent, err := newPID(ctx, actorPath, newTestSupervisor(), withInitMaxRetries(1), - withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout)) + withCustomLogger(log.DiscardLogger)) require.NoError(t, err) assert.NotNil(t, parent) @@ -1212,8 +1198,7 @@ func TestSpawnChild(t *testing.T) { parent, err := newPID(ctx, actorPath, newTestSupervisor(), withInitMaxRetries(1), - withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout)) + withCustomLogger(log.DiscardLogger)) require.NoError(t, err) assert.NotNil(t, parent) @@ -1240,8 +1225,7 @@ func TestSpawnChild(t *testing.T) { parent, err := newPID(ctx, actorPath, newTestSupervisor(), withInitMaxRetries(1), - withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout)) + withCustomLogger(log.DiscardLogger)) require.NoError(t, err) assert.NotNil(t, parent) @@ -1274,8 +1258,7 @@ func TestSpawnChild(t *testing.T) { newTestSupervisor(), withInitMaxRetries(1), withCustomLogger(log.DiscardLogger), - withEventsStream(eventsStream), - withAskTimeout(replyTimeout)) + withEventsStream(eventsStream)) require.NoError(t, err) assert.NotNil(t, parent) @@ -1323,8 +1306,7 @@ func TestPoisonPill(t *testing.T) { actorPath, newTestActor(), withInitMaxRetries(1), - withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout)) + withCustomLogger(log.DiscardLogger)) require.NoError(t, err) assert.NotNil(t, pid) @@ -1503,8 +1485,7 @@ func TestFailedPostStop(t *testing.T) { actorPath, &testPostStop{}, withInitMaxRetries(1), - withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout)) + withCustomLogger(log.DiscardLogger)) require.NoError(t, err) assert.NotNil(t, pid) @@ -1523,8 +1504,7 @@ func TestShutdown(t *testing.T) { parent, err := newPID(ctx, actorPath, newTestSupervisor(), withInitMaxRetries(1), - withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout)) + withCustomLogger(log.DiscardLogger)) require.NoError(t, err) assert.NotNil(t, parent) @@ -1634,7 +1614,7 @@ func TestBatchAsk(t *testing.T) { require.NotNil(t, pid) // batch ask - responses, err := pid.BatchAsk(ctx, pid, new(testpb.TestReply), new(testpb.TestReply)) + responses, err := pid.BatchAsk(ctx, pid, []proto.Message{new(testpb.TestReply), new(testpb.TestReply)}, replyTimeout) require.NoError(t, err) for reply := range responses { require.NoError(t, err) @@ -1667,7 +1647,7 @@ func TestBatchAsk(t *testing.T) { assert.NoError(t, pid.Shutdown(ctx)) // batch ask - responses, err := pid.BatchAsk(ctx, pid, new(testpb.TestReply), new(testpb.TestReply)) + responses, err := pid.BatchAsk(ctx, pid, []proto.Message{new(testpb.TestReply), new(testpb.TestReply)}, replyTimeout) require.Error(t, err) require.Nil(t, responses) }) @@ -1677,7 +1657,6 @@ func TestBatchAsk(t *testing.T) { opts := []pidOption{ withInitMaxRetries(1), withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout), } // create the actor path @@ -1689,7 +1668,7 @@ func TestBatchAsk(t *testing.T) { require.NotNil(t, pid) // batch ask - responses, err := pid.BatchAsk(ctx, pid, new(testpb.TestTimeout), new(testpb.TestReply)) + responses, err := pid.BatchAsk(ctx, pid, []proto.Message{new(testpb.TestTimeout), new(testpb.TestReply)}, replyTimeout) require.Error(t, err) require.Empty(t, responses) @@ -1987,8 +1966,8 @@ func TestID(t *testing.T) { actorPath, &exchanger{}, withInitMaxRetries(1), - withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout)) + withCustomLogger(log.DiscardLogger)) + require.NoError(t, err) assert.NotNil(t, pid) sys := pid.ActorSystem() @@ -2079,7 +2058,7 @@ func TestRemoteSpawn(t *testing.T) { require.NotNil(t, addr) // send the message to exchanger actor one using remote messaging - reply, err := pid.RemoteAsk(ctx, address.From(addr), new(testpb.TestReply)) + reply, err := pid.RemoteAsk(ctx, address.From(addr), new(testpb.TestReply), replyTimeout) require.NoError(t, err) require.NotNil(t, reply) @@ -2250,8 +2229,7 @@ func TestName(t *testing.T) { actorPath, &exchanger{}, withInitMaxRetries(1), - withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout)) + withCustomLogger(log.DiscardLogger)) require.NoError(t, err) assert.NotNil(t, pid) sys := pid.ActorSystem() @@ -2273,7 +2251,6 @@ func TestPipeTo(t *testing.T) { opts := []pidOption{ withInitMaxRetries(1), - withAskTimeout(askTimeout), withPassivationDisabled(), withCustomLogger(log.DiscardLogger), } @@ -2328,12 +2305,10 @@ func TestPipeTo(t *testing.T) { assert.NoError(t, pid2.Shutdown(ctx)) }) t.Run("With is a dead actor: case 1", func(t *testing.T) { - askTimeout := time.Minute ctx := context.TODO() opts := []pidOption{ withInitMaxRetries(1), - withAskTimeout(askTimeout), withPassivationDisabled(), withCustomLogger(log.DiscardLogger), } @@ -2371,7 +2346,6 @@ func TestPipeTo(t *testing.T) { opts := []pidOption{ withInitMaxRetries(1), - withAskTimeout(askTimeout), withPassivationDisabled(), withCustomLogger(log.DiscardLogger), } @@ -2428,12 +2402,10 @@ func TestPipeTo(t *testing.T) { assert.NoError(t, pid1.Shutdown(ctx)) }) t.Run("With undefined task", func(t *testing.T) { - askTimeout := time.Minute ctx := context.TODO() opts := []pidOption{ withInitMaxRetries(1), - withAskTimeout(askTimeout), withPassivationDisabled(), withCustomLogger(log.DiscardLogger), } @@ -2473,7 +2445,6 @@ func TestPipeTo(t *testing.T) { opts := []pidOption{ withInitMaxRetries(1), - withAskTimeout(askTimeout), withPassivationDisabled(), withCustomLogger(log.DiscardLogger), } @@ -2693,7 +2664,7 @@ func TestSendSync(t *testing.T) { lib.Pause(time.Second) - response, err := sender.SendSync(ctx, receiver.Name(), new(testpb.TestReply)) + response, err := sender.SendSync(ctx, receiver.Name(), new(testpb.TestReply), replyTimeout) require.NoError(t, err) require.NotNil(t, response) expected := &testpb.Reply{Content: "received message"} @@ -2727,7 +2698,7 @@ func TestSendSync(t *testing.T) { err = actorSystem.Kill(ctx, sender.Name()) require.NoError(t, err) - response, err := sender.SendSync(ctx, receiver.Name(), new(testpb.TestReply)) + response, err := sender.SendSync(ctx, receiver.Name(), new(testpb.TestReply), replyTimeout) require.Error(t, err) require.Nil(t, response) assert.EqualError(t, err, ErrDead.Error()) @@ -2764,7 +2735,7 @@ func TestSendSync(t *testing.T) { lib.Pause(time.Second) - response, err := sender.SendSync(ctx, receiver.Name(), new(testpb.TestReply)) + response, err := sender.SendSync(ctx, receiver.Name(), new(testpb.TestReply), replyTimeout) require.NoError(t, err) require.NotNil(t, response) expected := &testpb.Reply{Content: "received message"} @@ -2800,7 +2771,7 @@ func TestSendSync(t *testing.T) { lib.Pause(time.Second) - response, err := sender.SendSync(ctx, "receiver", new(testpb.TestReply)) + response, err := sender.SendSync(ctx, "receiver", new(testpb.TestReply), time.Minute) require.Nil(t, response) require.Error(t, err) assert.EqualError(t, err, ErrActorNotFound("receiver").Error()) @@ -2826,8 +2797,7 @@ func TestStopChild(t *testing.T) { parent, err := newPID(ctx, actorPath, newTestSupervisor(), withInitMaxRetries(1), - withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout)) + withCustomLogger(log.DiscardLogger)) require.NoError(t, err) require.NotNil(t, parent) @@ -2853,8 +2823,7 @@ func TestNewPID(t *testing.T) { nil, newTestActor(), withInitMaxRetries(1), - withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout)) + withCustomLogger(log.DiscardLogger)) require.Error(t, err) assert.Nil(t, pid) diff --git a/actors/receive_context.go b/actors/receive_context.go index b4bf6865..82ce1eb5 100644 --- a/actors/receive_context.go +++ b/actors/receive_context.go @@ -27,6 +27,7 @@ package actors import ( "context" "sync" + "time" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" @@ -216,10 +217,10 @@ func (c *ReceiveContext) BatchTell(to *PID, messages ...proto.Message) { // Ask sends a synchronous message to another actor and expect a response. This method is good when interacting with a child actor. // Ask has a timeout which can cause the sender to set the context error. When ask times out, the receiving actor does not know and may still process the message. // It is recommended to set a good timeout to quickly receive response and try to avoid false positives -func (c *ReceiveContext) Ask(to *PID, message proto.Message) (response proto.Message) { +func (c *ReceiveContext) Ask(to *PID, message proto.Message, timeout time.Duration) (response proto.Message) { self := c.self ctx := context.WithoutCancel(c.ctx) - reply, err := self.Ask(ctx, to, message) + reply, err := self.Ask(ctx, to, message, timeout) if err != nil { c.Err(err) } @@ -239,10 +240,10 @@ func (c *ReceiveContext) SendAsync(actorName string, message proto.Message) { // SendSync sends a synchronous message to another actor and expect a response. // The location of the given actor is transparent to the caller. // This block until a response is received or timed out. -func (c *ReceiveContext) SendSync(actorName string, message proto.Message) (response proto.Message) { +func (c *ReceiveContext) SendSync(actorName string, message proto.Message, timeout time.Duration) (response proto.Message) { self := c.self ctx := context.WithoutCancel(c.ctx) - reply, err := self.SendSync(ctx, actorName, message) + reply, err := self.SendSync(ctx, actorName, message, timeout) if err != nil { c.Err(err) } @@ -252,10 +253,10 @@ func (c *ReceiveContext) SendSync(actorName string, message proto.Message) (resp // BatchAsk sends a synchronous bunch of messages to the given PID and expect responses in the same order as the messages. // The messages will be processed one after the other in the order they are sent // This is a design choice to follow the simple principle of one message at a time processing by actors. -func (c *ReceiveContext) BatchAsk(to *PID, messages ...proto.Message) (responses chan proto.Message) { +func (c *ReceiveContext) BatchAsk(to *PID, messages []proto.Message, timeout time.Duration) (responses chan proto.Message) { recipient := c.self ctx := context.WithoutCancel(c.ctx) - reply, err := recipient.BatchAsk(ctx, to, messages...) + reply, err := recipient.BatchAsk(ctx, to, messages, timeout) if err != nil { c.Err(err) } @@ -273,10 +274,10 @@ func (c *ReceiveContext) RemoteTell(to *address.Address, message proto.Message) // RemoteAsk is used to send a message to an actor remotely and expect a response // immediately. -func (c *ReceiveContext) RemoteAsk(to *address.Address, message proto.Message) (response *anypb.Any) { +func (c *ReceiveContext) RemoteAsk(to *address.Address, message proto.Message, timeout time.Duration) (response *anypb.Any) { recipient := c.self ctx := context.WithoutCancel(c.ctx) - reply, err := recipient.RemoteAsk(ctx, to, message) + reply, err := recipient.RemoteAsk(ctx, to, message, timeout) if err != nil { c.Err(err) } @@ -296,10 +297,10 @@ func (c *ReceiveContext) RemoteBatchTell(to *address.Address, messages []proto.M // RemoteBatchAsk sends a synchronous bunch of messages to a remote actor and expect responses in the same order as the messages. // Messages are processed one after the other in the order they are sent. // This can hinder performance if it is not properly used. -func (c *ReceiveContext) RemoteBatchAsk(to *address.Address, messages []proto.Message) (responses []*anypb.Any) { +func (c *ReceiveContext) RemoteBatchAsk(to *address.Address, messages []proto.Message, timeout time.Duration) (responses []*anypb.Any) { recipient := c.self ctx := context.WithoutCancel(c.ctx) - replies, err := recipient.RemoteBatchAsk(ctx, to, messages) + replies, err := recipient.RemoteBatchAsk(ctx, to, messages, timeout) if err != nil { c.Err(err) } diff --git a/actors/receive_context_test.go b/actors/receive_context_test.go index 2703346a..e3982463 100644 --- a/actors/receive_context_test.go +++ b/actors/receive_context_test.go @@ -214,7 +214,7 @@ func TestReceiveContext(t *testing.T) { require.NoError(t, err) require.NotNil(t, pid2) - reply := context.Ask(pid2, new(testpb.TestReply)) + reply := context.Ask(pid2, new(testpb.TestReply), time.Minute) require.NotNil(t, reply) expected := new(testpb.Reply) assert.True(t, proto.Equal(expected, reply)) @@ -257,7 +257,7 @@ func TestReceiveContext(t *testing.T) { lib.Pause(time.Second) assert.NoError(t, pid2.Shutdown(ctx)) - context.Ask(pid2, new(testpb.TestReply)) + context.Ask(pid2, new(testpb.TestReply), time.Minute) require.Error(t, context.getError()) lib.Pause(time.Second) @@ -318,7 +318,7 @@ func TestReceiveContext(t *testing.T) { // get the address of the exchanger actor one addr1 := context.RemoteLookup(host, remotingPort, actorName2) // send the message to t exchanger actor one using remote messaging - reply := context.RemoteAsk(address.From(addr1), new(testpb.TestReply)) + reply := context.RemoteAsk(address.From(addr1), new(testpb.TestReply), time.Minute) // perform some assertions require.NotNil(t, reply) require.True(t, reply.MessageIs(new(testpb.Reply))) @@ -389,7 +389,7 @@ func TestReceiveContext(t *testing.T) { Name: actorName2, Id: "", }, - ), new(testpb.TestReply)) + ), new(testpb.TestReply), time.Minute) require.Error(t, context.getError()) lib.Pause(time.Second) @@ -696,7 +696,6 @@ func TestReceiveContext(t *testing.T) { newTestSupervisor(), withInitMaxRetries(1), withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout), ) require.NoError(t, err) @@ -737,7 +736,6 @@ func TestReceiveContext(t *testing.T) { newTestSupervisor(), withInitMaxRetries(1), withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout), ) require.NoError(t, err) @@ -769,7 +767,6 @@ func TestReceiveContext(t *testing.T) { newTestSupervisor(), withInitMaxRetries(1), withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout), ) require.NoError(t, err) @@ -811,7 +808,6 @@ func TestReceiveContext(t *testing.T) { newTestSupervisor(), withInitMaxRetries(1), withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout), ) require.NoError(t, err) @@ -853,7 +849,6 @@ func TestReceiveContext(t *testing.T) { newTestSupervisor(), withInitMaxRetries(1), withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout), ) require.NoError(t, err) @@ -895,7 +890,6 @@ func TestReceiveContext(t *testing.T) { newTestSupervisor(), withInitMaxRetries(1), withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout), ) require.NoError(t, err) @@ -941,7 +935,6 @@ func TestReceiveContext(t *testing.T) { newTestSupervisor(), withInitMaxRetries(1), withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout), ) require.NoError(t, err) @@ -977,7 +970,6 @@ func TestReceiveContext(t *testing.T) { newTestSupervisor(), withInitMaxRetries(1), withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout), ) require.NoError(t, err) @@ -1017,7 +1009,6 @@ func TestReceiveContext(t *testing.T) { newTestSupervisor(), withInitMaxRetries(1), withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout), ) require.NoError(t, err) @@ -1038,7 +1029,6 @@ func TestReceiveContext(t *testing.T) { newTestSupervisor(), withInitMaxRetries(1), withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout), ) require.NoError(t, err) @@ -1064,7 +1054,6 @@ func TestReceiveContext(t *testing.T) { newTestSupervisor(), withInitMaxRetries(1), withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout), ) require.NoError(t, err) @@ -1532,7 +1521,7 @@ func TestReceiveContext(t *testing.T) { require.NoError(t, err) require.NotNil(t, pid2) - replies := context.BatchAsk(pid2, new(testpb.TestReply), new(testpb.TestReply)) + replies := context.BatchAsk(pid2, []proto.Message{new(testpb.TestReply), new(testpb.TestReply)}, time.Minute) require.NotNil(t, replies) require.Len(t, replies, 2) for reply := range replies { @@ -1580,7 +1569,7 @@ func TestReceiveContext(t *testing.T) { lib.Pause(time.Second) assert.NoError(t, pid2.Shutdown(ctx)) - context.BatchAsk(pid2, new(testpb.TestReply), new(testpb.TestReply)) + context.BatchAsk(pid2, []proto.Message{new(testpb.TestReply), new(testpb.TestReply)}, time.Minute) require.Error(t, context.getError()) lib.Pause(time.Second) @@ -1685,7 +1674,7 @@ func TestReceiveContext(t *testing.T) { testerAddr := context.RemoteLookup(host, remotingPort, tester) // send the message to t exchanger actor one using remote messaging messages := []proto.Message{new(testpb.TestReply), new(testpb.TestReply), new(testpb.TestReply)} - replies := context.RemoteBatchAsk(address.From(testerAddr), messages) + replies := context.RemoteBatchAsk(address.From(testerAddr), messages, time.Minute) require.NoError(t, context.getError()) require.Len(t, replies, 3) lib.Pause(time.Second) @@ -1740,7 +1729,7 @@ func TestReceiveContext(t *testing.T) { testerAddr := context.RemoteLookup(host, remotingPort, tester) // send the message to t exchanger actor one using remote messaging messages := []proto.Message{new(testpb.TestReply), new(testpb.TestReply), new(testpb.TestReply)} - replies := context.RemoteBatchAsk(address.From(testerAddr), messages) + replies := context.RemoteBatchAsk(address.From(testerAddr), messages, time.Minute) err = context.getError() require.Error(t, err) require.Empty(t, replies) @@ -1938,7 +1927,7 @@ func TestReceiveContext(t *testing.T) { Name: actorName2, Id: "", }, - ), []proto.Message{new(testpb.TestReply)}) + ), []proto.Message{new(testpb.TestReply)}, time.Minute) require.Error(t, context.getError()) lib.Pause(time.Second) @@ -2076,7 +2065,6 @@ func TestReceiveContext(t *testing.T) { opts := []pidOption{ withInitMaxRetries(1), - withAskTimeout(askTimeout), withPassivationDisabled(), withCustomLogger(log.DiscardLogger), } @@ -2137,12 +2125,10 @@ func TestReceiveContext(t *testing.T) { assert.NoError(t, pid2.Shutdown(ctx)) }) t.Run("With failed PipeTo", func(t *testing.T) { - askTimeout := time.Minute ctx := context.TODO() opts := []pidOption{ withInitMaxRetries(1), - withAskTimeout(askTimeout), withPassivationDisabled(), withCustomLogger(log.DiscardLogger), } @@ -2293,7 +2279,7 @@ func TestReceiveContext(t *testing.T) { require.NoError(t, err) require.NotNil(t, pid2) - reply := context.SendSync(pid2.Name(), new(testpb.TestReply)) + reply := context.SendSync(pid2.Name(), new(testpb.TestReply), time.Minute) require.NotNil(t, reply) expected := new(testpb.Reply) assert.True(t, proto.Equal(expected, reply)) @@ -2338,7 +2324,7 @@ func TestReceiveContext(t *testing.T) { lib.Pause(time.Second) assert.NoError(t, pid2.Shutdown(ctx)) - context.SendSync(pid2.Name(), new(testpb.TestReply)) + context.SendSync(pid2.Name(), new(testpb.TestReply), time.Minute) require.Error(t, context.getError()) t.Cleanup( diff --git a/actors/remoting.go b/actors/remoting.go index d8a9980c..980915f6 100644 --- a/actors/remoting.go +++ b/actors/remoting.go @@ -217,7 +217,7 @@ func (r *Remoting) RemoteBatchTell(ctx context.Context, to *address.Address, mes } // RemoteBatchAsk sends bulk messages to an actor with responses expected -func (r *Remoting) RemoteBatchAsk(ctx context.Context, to *address.Address, messages []proto.Message) (responses []*anypb.Any, err error) { +func (r *Remoting) RemoteBatchAsk(ctx context.Context, to *address.Address, messages []proto.Message, timeout time.Duration) (responses []*anypb.Any, err error) { var requests []*internalpb.RemoteAskRequest for _, message := range messages { packed, err := anypb.New(message) @@ -232,6 +232,7 @@ func (r *Remoting) RemoteBatchAsk(ctx context.Context, to *address.Address, mess Receiver: to.Address, Message: packed, }, + Timeout: durationpb.New(timeout), }, ) } diff --git a/actors/remoting_test.go b/actors/remoting_test.go index 8877efc4..51d7d422 100644 --- a/actors/remoting_test.go +++ b/actors/remoting_test.go @@ -838,7 +838,7 @@ func TestRemoteAsk(t *testing.T) { // create a message to send to the test actor message := new(testpb.TestReply) // send the message to the actor - replies, err := remoting.RemoteBatchAsk(ctx, addr, []proto.Message{message}) + replies, err := remoting.RemoteBatchAsk(ctx, addr, []proto.Message{message}, time.Minute) // perform some assertions require.NoError(t, err) require.Len(t, replies, 1) @@ -906,7 +906,7 @@ func TestRemoteAsk(t *testing.T) { // create a message to send to the test actor message := new(testpb.TestReply) // send the message to the actor - reply, err := remoting.RemoteBatchAsk(ctx, address.From(addr), []proto.Message{message}) + reply, err := remoting.RemoteBatchAsk(ctx, address.From(addr), []proto.Message{message}, time.Minute) // perform some assertions require.Error(t, err) require.Nil(t, reply) @@ -967,7 +967,7 @@ func TestRemoteAsk(t *testing.T) { // create a message to send to the test actor message := new(testpb.TestReply) // send the message to the actor - reply, err := remoting.RemoteBatchAsk(ctx, addr, []proto.Message{message}) + reply, err := remoting.RemoteBatchAsk(ctx, addr, []proto.Message{message}, time.Minute) // perform some assertions require.Error(t, err) require.EqualError(t, err, "failed_precondition: remoting is not enabled") @@ -1092,7 +1092,7 @@ func TestRemoteAsk(t *testing.T) { // create a message to send to the test actor message := new(testpb.TestReply) // send the message to the actor - reply, err := remoting.RemoteBatchAsk(ctx, addr, []proto.Message{message}) + reply, err := remoting.RemoteBatchAsk(ctx, addr, []proto.Message{message}, time.Minute) // perform some assertions require.Error(t, err) require.Contains(t, err.Error(), "not found") diff --git a/actors/router_test.go b/actors/router_test.go index fad359db..704fa525 100644 --- a/actors/router_test.go +++ b/actors/router_test.go @@ -48,8 +48,7 @@ func TestRouter(t *testing.T) { system, err := NewActorSystem( "testSystem", WithPassivationDisabled(), - WithLogger(logger), - WithReplyTimeout(time.Minute)) + WithLogger(logger)) require.NoError(t, err) require.NotNil(t, system) @@ -108,8 +107,7 @@ func TestRouter(t *testing.T) { system, err := NewActorSystem( "testSystem", WithPassivationDisabled(), - WithLogger(logger), - WithReplyTimeout(time.Minute)) + WithLogger(logger)) require.NoError(t, err) require.NotNil(t, system) @@ -156,8 +154,7 @@ func TestRouter(t *testing.T) { system, err := NewActorSystem( "testSystem", WithPassivationDisabled(), - WithLogger(logger), - WithReplyTimeout(time.Minute)) + WithLogger(logger)) require.NoError(t, err) require.NotNil(t, system) @@ -207,8 +204,7 @@ func TestRouter(t *testing.T) { system, err := NewActorSystem( "testSystem", WithPassivationDisabled(), - WithLogger(logger), - WithReplyTimeout(time.Minute)) + WithLogger(logger)) require.NoError(t, err) require.NotNil(t, system) @@ -269,8 +265,7 @@ func TestRouter(t *testing.T) { system, err := NewActorSystem( "testSystem", WithPassivationDisabled(), - WithLogger(logger), - WithReplyTimeout(time.Minute)) + WithLogger(logger)) require.NoError(t, err) require.NotNil(t, system) diff --git a/actors/scheduler_test.go b/actors/scheduler_test.go index e1950ed7..c86f3341 100644 --- a/actors/scheduler_test.go +++ b/actors/scheduler_test.go @@ -147,7 +147,6 @@ func TestScheduler(t *testing.T) { newTestActor(), withInitMaxRetries(1), withCustomLogger(log.DiscardLogger), - withAskTimeout(replyTimeout), ) require.NoError(t, err) diff --git a/actors/types.go b/actors/types.go index 8220ab73..7271b1ce 100644 --- a/actors/types.go +++ b/actors/types.go @@ -31,8 +31,6 @@ import ( const ( // DefaultPassivationTimeout defines the default passivation timeout DefaultPassivationTimeout = 2 * time.Minute - // DefaultAskTimeout defines the default Ask timeout - DefaultAskTimeout = 20 * time.Second // DefaultInitMaxRetries defines the default value for retrying actor initialization DefaultInitMaxRetries = 5 // DefaultShutdownTimeout defines the default shutdown timeout diff --git a/bench/benchmark_test.go b/bench/benchmark_test.go index 74022179..ff2ec0d7 100644 --- a/bench/benchmark_test.go +++ b/bench/benchmark_test.go @@ -46,8 +46,7 @@ func BenchmarkActor(b *testing.B) { actorSystem, _ := actors.NewActorSystem("bench", actors.WithLogger(log.DiscardLogger), actors.WithActorInitMaxRetries(1), - actors.WithSupervisorDirective(actors.NewStopDirective()), - actors.WithReplyTimeout(receivingTimeout)) + actors.WithSupervisorDirective(actors.NewStopDirective())) // start the actor system _ = actorSystem.Start(ctx) @@ -91,8 +90,7 @@ func BenchmarkActor(b *testing.B) { actorSystem, _ := actors.NewActorSystem("bench", actors.WithLogger(log.DiscardLogger), actors.WithActorInitMaxRetries(1), - actors.WithSupervisorDirective(actors.NewStopDirective()), - actors.WithReplyTimeout(receivingTimeout)) + actors.WithSupervisorDirective(actors.NewStopDirective())) // start the actor system _ = actorSystem.Start(ctx) @@ -130,8 +128,7 @@ func BenchmarkActor(b *testing.B) { actorSystem, _ := actors.NewActorSystem("bench", actors.WithLogger(log.DiscardLogger), actors.WithActorInitMaxRetries(1), - actors.WithSupervisorDirective(actors.NewStopDirective()), - actors.WithReplyTimeout(receivingTimeout)) + actors.WithSupervisorDirective(actors.NewStopDirective())) // start the actor system _ = actorSystem.Start(ctx) @@ -169,8 +166,7 @@ func BenchmarkActor(b *testing.B) { actorSystem, _ := actors.NewActorSystem("bench", actors.WithLogger(log.DiscardLogger), actors.WithActorInitMaxRetries(1), - actors.WithSupervisorDirective(actors.NewStopDirective()), - actors.WithReplyTimeout(receivingTimeout)) + actors.WithSupervisorDirective(actors.NewStopDirective())) // start the actor system _ = actorSystem.Start(ctx) @@ -208,8 +204,7 @@ func BenchmarkActor(b *testing.B) { actorSystem, _ := actors.NewActorSystem("bench", actors.WithLogger(log.DiscardLogger), actors.WithActorInitMaxRetries(1), - actors.WithExpireActorAfter(5*time.Second), - actors.WithReplyTimeout(receivingTimeout)) + actors.WithExpireActorAfter(5*time.Second)) // start the actor system _ = actorSystem.Start(ctx) @@ -251,8 +246,7 @@ func BenchmarkActor(b *testing.B) { actorSystem, _ := actors.NewActorSystem("bench", actors.WithLogger(log.DiscardLogger), actors.WithActorInitMaxRetries(1), - actors.WithSupervisorDirective(actors.NewStopDirective()), - actors.WithReplyTimeout(receivingTimeout)) + actors.WithSupervisorDirective(actors.NewStopDirective())) // start the actor system _ = actorSystem.Start(ctx) @@ -272,7 +266,7 @@ func BenchmarkActor(b *testing.B) { b.ReportAllocs() b.RunParallel(func(pb *testing.PB) { for pb.Next() { - if _, err := sender.Ask(ctx, receiver, new(benchmarkpb.BenchRequest)); err != nil { + if _, err := sender.Ask(ctx, receiver, new(benchmarkpb.BenchRequest), receivingTimeout); err != nil { b.Fatal(err) } atomic.AddInt64(&counter, 1) diff --git a/client/client_test.go b/client/client_test.go index 485aac11..94ac68ec 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -98,7 +98,7 @@ func TestClient(t *testing.T) { lib.Pause(time.Second) // send a message - reply, err := client.Ask(ctx, actor, new(testspb.TestReply), actors.DefaultAskTimeout) + reply, err := client.Ask(ctx, actor, new(testspb.TestReply), time.Minute) require.NoError(t, err) require.NotNil(t, reply) expectedReply := &testpb.Reply{Content: "received message"} @@ -182,7 +182,7 @@ func TestClient(t *testing.T) { lib.Pause(time.Second) // send a message - reply, err := client.Ask(ctx, actor, new(testspb.TestReply), actors.DefaultAskTimeout) + reply, err := client.Ask(ctx, actor, new(testspb.TestReply), time.Minute) require.NoError(t, err) require.NotNil(t, reply) expectedReply := &testpb.Reply{Content: "received message"} @@ -268,7 +268,7 @@ func TestClient(t *testing.T) { lib.Pause(time.Second) // send a message - reply, err := client.Ask(ctx, actor, new(testspb.TestReply), actors.DefaultAskTimeout) + reply, err := client.Ask(ctx, actor, new(testspb.TestReply), time.Minute) require.NoError(t, err) require.NotNil(t, reply) expectedReply := &testpb.Reply{Content: "received message"} @@ -350,7 +350,7 @@ func TestClient(t *testing.T) { lib.Pause(time.Second) // send a message - reply, err := client.Ask(ctx, actor, new(testspb.TestReply), actors.DefaultAskTimeout) + reply, err := client.Ask(ctx, actor, new(testspb.TestReply), time.Minute) require.NoError(t, err) require.NotNil(t, reply) expectedReply := &testpb.Reply{Content: "received message"} @@ -433,7 +433,7 @@ func TestClient(t *testing.T) { lib.Pause(time.Second) // send a message - reply, err := client.Ask(ctx, actor, new(testspb.TestReply), actors.DefaultAskTimeout) + reply, err := client.Ask(ctx, actor, new(testspb.TestReply), time.Minute) require.NoError(t, err) require.NotNil(t, reply) expectedReply := &testpb.Reply{Content: "received message"} @@ -515,7 +515,7 @@ func TestClient(t *testing.T) { lib.Pause(time.Second) // send a message - reply, err := client.Ask(ctx, actor, new(testspb.TestReply), actors.DefaultAskTimeout) + reply, err := client.Ask(ctx, actor, new(testspb.TestReply), time.Minute) require.NoError(t, err) require.NotNil(t, reply) expectedReply := &testpb.Reply{Content: "received message"} @@ -602,7 +602,7 @@ func TestClient(t *testing.T) { lib.Pause(time.Second) // send a message - reply, err := client.Ask(ctx, actor, new(testspb.TestReply), actors.DefaultAskTimeout) + reply, err := client.Ask(ctx, actor, new(testspb.TestReply), time.Minute) require.NoError(t, err) require.NotNil(t, reply) expectedReply := &testpb.Reply{Content: "received message"} @@ -788,7 +788,6 @@ func startNode(t *testing.T, logger log.Logger, nodeName, serverAddr string) (sy actorSystemName, actors.WithPassivationDisabled(), actors.WithLogger(logger), - actors.WithReplyTimeout(time.Minute), actors.WithRemoting(host, int32(remotePort)), actors.WithPeerStateLoopInterval(100*time.Millisecond), actors.WithCluster(clusterConfig), diff --git a/testkit/testkit.go b/testkit/testkit.go index 3e8a7888..dbdeb133 100644 --- a/testkit/testkit.go +++ b/testkit/testkit.go @@ -57,8 +57,7 @@ func New(ctx context.Context, t *testing.T, opts ...Option) *TestKit { actors.WithPassivationDisabled(), actors.WithLogger(testkit.logger), actors.WithActorInitTimeout(time.Second), - actors.WithActorInitMaxRetries(5), - actors.WithReplyTimeout(time.Minute)) + actors.WithActorInitMaxRetries(5)) if err != nil { t.Fatal(err.Error()) }