diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index 3f89ff0d..47c36923 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -72,6 +72,19 @@ -define(HANDLE_EFFECTS(Effects, EvtType, State0), handle_effects(?FUNCTION_NAME, Effects, EvtType, State0)). +-define(ASYNC_DIST(Node, Send), + case Node == node() of + true -> + Send, + ok; + false -> + %% use async_dist for remote sends + process_flag(async_dist, true), + Send, + process_flag(async_dist, false), + ok + end). + -type query_fun() :: ra:query_fun(). -type query_options() :: #{condition => ra:query_condition()}. @@ -1345,8 +1358,8 @@ handle_effect(_, {next_event, _, _} = Next, _, State, Actions) -> {State, [Next | Actions]}; handle_effect(_, {send_msg, To, Msg}, _, State, Actions) -> %% default is to send without any wrapping - %% TODO: handle send failure? how? - _ = send(To, Msg, State#state.conf), + ToNode = get_node(To), + ?ASYNC_DIST(ToNode, _ = send(To, Msg, State#state.conf)), {State, Actions}; handle_effect(RaftState, {send_msg, To, _Msg, Options} = Eff, _, State, Actions) -> @@ -1355,13 +1368,13 @@ handle_effect(RaftState, {send_msg, To, _Msg, Options} = Eff, true -> case can_execute_locally(RaftState, ToNode, State) of true -> - send_msg(Eff, State); + ?ASYNC_DIST(ToNode, send_msg(Eff, State)); false -> ok end; false when RaftState == leader -> %% the effect got here so we can execute - send_msg(Eff, State); + ?ASYNC_DIST(ToNode, send_msg(Eff, State)); false -> ok end, @@ -1935,10 +1948,10 @@ handle_tick_metrics(State) -> can_execute_locally(RaftState, TargetNode, #state{server_state = ServerState} = State) -> - Membership = ra_server:get_membership(ServerState), case RaftState of - follower when Membership == voter -> - TargetNode == node(); + follower -> + TargetNode == node() andalso + voter == ra_server:get_membership(ServerState); leader when TargetNode =/= node() -> %% We need to evaluate whether to send the message. %% Only send if there isn't a local node for the target pid. diff --git a/src/ra_system.erl b/src/ra_system.erl index 54b9722c..6023d8e8 100644 --- a/src/ra_system.erl +++ b/src/ra_system.erl @@ -58,8 +58,8 @@ low_priority_commands_flush_size => non_neg_integer(), low_priority_commands_in_memory_size => non_neg_integer(), server_recovery_strategy => undefined | - registered | - {module(), atom(), list()} + registered | + {module(), atom(), list()} }. -export_type([ diff --git a/test/ra_SUITE.erl b/test/ra_SUITE.erl index 457cc91f..83369bc6 100644 --- a/test/ra_SUITE.erl +++ b/test/ra_SUITE.erl @@ -117,15 +117,16 @@ end_per_testcase(_TestCase, Config) -> Config. single_server_processes_command(Config) -> - % ok = logger:set_primary_config(level, all), Name = ?config(test_name, Config), - N1 = nth_server_name(Config, 1), + {_RaName, _} = N1 = nth_server_name(Config, 1), ok = ra:start_server(default, Name, N1, add_machine(), []), ok = ra:trigger_election(N1), % index is 2 as leaders commit a no-op entry on becoming leaders + monitor(process, element(1, N1)), {ok, 5, _} = ra:process_command(N1, 5, 2000), {ok, 10, _} = ra:process_command(N1, 5, 2000), - terminate_cluster([N1]). + terminate_cluster([N1]), + ok. pipeline_commands(Config) -> Name = ?config(test_name, Config),