diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index 3941a050..5aec33c6 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -309,11 +309,11 @@ do_init(#{id := Id, MinBinVheapSize = maps:get(server_min_bin_vheap_size, SysConf, ?MIN_BIN_VHEAP_SIZE), MinHeapSize = maps:get(server_min_heap_size, SysConf, ?MIN_BIN_VHEAP_SIZE), - AsyncDist = maps:get(async_dist, SysConf, true), + % AsyncDist = maps:get(async_dist, SysConf, true), process_flag(message_queue_data, MsgQData), process_flag(min_bin_vheap_size, MinBinVheapSize), process_flag(min_heap_size, MinHeapSize), - process_flag(async_dist, AsyncDist), + % process_flag(async_dist, AsyncDist), %% wait for wal for a bit before initialising the server state and log #{cluster := Cluster} = ServerState = ra_server:init(Config), LogId = ra_server:log_id(ServerState), @@ -1321,10 +1321,8 @@ handle_effect(_, {send_rpc, To, Rpc}, _, #state{conf = Conf} = State0, Actions) -> % fully qualified use only so that we can mock it for testing % TODO: review / refactor to remove the mod call here - process_flag(async_dist, false), case ?MODULE:send_rpc(To, Rpc, State0) of ok -> - process_flag(async_dist, true), {State0, Actions}; nosuspend -> %% update peer status to suspended and spawn a process @@ -1339,10 +1337,8 @@ handle_effect(_, {send_rpc, To, Rpc}, _, incr_counter(Conf, ?C_RA_SRV_MSGS_SENT, 1), Self ! {update_peer, To, #{status => normal}} end), - process_flag(async_dist, true), {update_peer(To, #{status => suspended}, State0), Actions}; noconnect -> - process_flag(async_dist, true), %% for noconnects just allow it to pipeline and catch up later {State0, Actions} end; @@ -1362,13 +1358,29 @@ handle_effect(RaftState, {send_msg, To, _Msg, Options} = Eff, true -> case can_execute_locally(RaftState, ToNode, State) of true -> - send_msg(Eff, State); + case ToNode == node() of + true -> + send_msg(Eff, State); + false -> + %% use async_dist for remote sends + process_flag(async_dist, true), + send_msg(Eff, State), + process_flag(async_dist, false) + end; false -> ok end; false when RaftState == leader -> %% the effect got here so we can execute - send_msg(Eff, State); + case ToNode == node() of + true -> + send_msg(Eff, State); + false -> + %% use async_dist for remote sends + process_flag(async_dist, true), + send_msg(Eff, State), + process_flag(async_dist, false) + end; false -> ok end, @@ -1942,10 +1954,10 @@ handle_tick_metrics(State) -> can_execute_locally(RaftState, TargetNode, #state{server_state = ServerState} = State) -> - Membership = ra_server:get_membership(ServerState), case RaftState of - follower when Membership == voter -> - TargetNode == node(); + follower -> + TargetNode == node() andalso + voter == ra_server:get_membership(ServerState); leader when TargetNode =/= node() -> %% We need to evaluate whether to send the message. %% Only send if there isn't a local node for the target pid. diff --git a/test/ra_SUITE.erl b/test/ra_SUITE.erl index 19264d8c..9901832d 100644 --- a/test/ra_SUITE.erl +++ b/test/ra_SUITE.erl @@ -118,13 +118,13 @@ end_per_testcase(_TestCase, Config) -> single_server_processes_command(Config) -> Name = ?config(test_name, Config), - {RaName, _} = N1 = nth_server_name(Config, 1), + {_RaName, _} = N1 = nth_server_name(Config, 1), ok = ra:start_server(default, Name, N1, add_machine(), []), ok = ra:trigger_election(N1), % index is 2 as leaders commit a no-op entry on becoming leaders monitor(process, element(1, N1)), - ?assertMatch([{async_dist, true}], - process_info(whereis(RaName), [async_dist])), + % ?assertMatch([{async_dist, true}], + % process_info(whereis(RaName), [async_dist])), {ok, 5, _} = ra:process_command(N1, 5, 2000), {ok, 10, _} = ra:process_command(N1, 5, 2000), terminate_cluster([N1]),