Skip to content

Commit

Permalink
Try using async_dist only for remote send_msg effects
Browse files Browse the repository at this point in the history
  • Loading branch information
kjnilsson committed Jan 21, 2025
1 parent 2a93496 commit 885b1a2
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 14 deletions.
34 changes: 23 additions & 11 deletions src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,11 @@ do_init(#{id := Id,
MinBinVheapSize = maps:get(server_min_bin_vheap_size, SysConf,
?MIN_BIN_VHEAP_SIZE),
MinHeapSize = maps:get(server_min_heap_size, SysConf, ?MIN_BIN_VHEAP_SIZE),
AsyncDist = maps:get(async_dist, SysConf, true),
% AsyncDist = maps:get(async_dist, SysConf, true),
process_flag(message_queue_data, MsgQData),
process_flag(min_bin_vheap_size, MinBinVheapSize),
process_flag(min_heap_size, MinHeapSize),
process_flag(async_dist, AsyncDist),
% process_flag(async_dist, AsyncDist),
%% wait for wal for a bit before initialising the server state and log
#{cluster := Cluster} = ServerState = ra_server:init(Config),
LogId = ra_server:log_id(ServerState),
Expand Down Expand Up @@ -1321,10 +1321,8 @@ handle_effect(_, {send_rpc, To, Rpc}, _,
#state{conf = Conf} = State0, Actions) ->
% fully qualified use only so that we can mock it for testing
% TODO: review / refactor to remove the mod call here
process_flag(async_dist, false),
case ?MODULE:send_rpc(To, Rpc, State0) of
ok ->
process_flag(async_dist, true),
{State0, Actions};
nosuspend ->
%% update peer status to suspended and spawn a process
Expand All @@ -1339,10 +1337,8 @@ handle_effect(_, {send_rpc, To, Rpc}, _,
incr_counter(Conf, ?C_RA_SRV_MSGS_SENT, 1),
Self ! {update_peer, To, #{status => normal}}
end),
process_flag(async_dist, true),
{update_peer(To, #{status => suspended}, State0), Actions};
noconnect ->
process_flag(async_dist, true),
%% for noconnects just allow it to pipeline and catch up later
{State0, Actions}
end;
Expand All @@ -1362,13 +1358,29 @@ handle_effect(RaftState, {send_msg, To, _Msg, Options} = Eff,
true ->
case can_execute_locally(RaftState, ToNode, State) of
true ->
send_msg(Eff, State);
case ToNode == node() of
true ->
send_msg(Eff, State);
false ->
%% use async_dist for remote sends
process_flag(async_dist, true),
send_msg(Eff, State),
process_flag(async_dist, false)
end;
false ->
ok
end;
false when RaftState == leader ->
%% the effect got here so we can execute
send_msg(Eff, State);
case ToNode == node() of
true ->
send_msg(Eff, State);
false ->
%% use async_dist for remote sends
process_flag(async_dist, true),
send_msg(Eff, State),
process_flag(async_dist, false)
end;
false ->
ok
end,
Expand Down Expand Up @@ -1942,10 +1954,10 @@ handle_tick_metrics(State) ->

can_execute_locally(RaftState, TargetNode,
#state{server_state = ServerState} = State) ->
Membership = ra_server:get_membership(ServerState),
case RaftState of
follower when Membership == voter ->
TargetNode == node();
follower ->
TargetNode == node() andalso
voter == ra_server:get_membership(ServerState);
leader when TargetNode =/= node() ->
%% We need to evaluate whether to send the message.
%% Only send if there isn't a local node for the target pid.
Expand Down
6 changes: 3 additions & 3 deletions test/ra_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,13 @@ end_per_testcase(_TestCase, Config) ->

single_server_processes_command(Config) ->
Name = ?config(test_name, Config),
{RaName, _} = N1 = nth_server_name(Config, 1),
{_RaName, _} = N1 = nth_server_name(Config, 1),
ok = ra:start_server(default, Name, N1, add_machine(), []),
ok = ra:trigger_election(N1),
% index is 2 as leaders commit a no-op entry on becoming leaders
monitor(process, element(1, N1)),
?assertMatch([{async_dist, true}],
process_info(whereis(RaName), [async_dist])),
% ?assertMatch([{async_dist, true}],
% process_info(whereis(RaName), [async_dist])),
{ok, 5, _} = ra:process_command(N1, 5, 2000),
{ok, 10, _} = ra:process_command(N1, 5, 2000),
terminate_cluster([N1]),
Expand Down

0 comments on commit 885b1a2

Please sign in to comment.