Skip to content

Commit

Permalink
Enable async_dist by default for Ra server processes.
Browse files Browse the repository at this point in the history
And add a system config option for `async_dist`.

Async dist should reduce the number of dropped control messages and
send_msg effects sent to remote nodes.

In order to make use of distribution buffer flow control, the async_dist
flag will be unset before each pipelined rpc send and set again afterwards.
  • Loading branch information
kjnilsson committed Jan 21, 2025
1 parent 7b69b45 commit 2a93496
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 4 deletions.
7 changes: 7 additions & 0 deletions src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -309,9 +309,12 @@ do_init(#{id := Id,
MinBinVheapSize = maps:get(server_min_bin_vheap_size, SysConf,
?MIN_BIN_VHEAP_SIZE),
MinHeapSize = maps:get(server_min_heap_size, SysConf, ?MIN_BIN_VHEAP_SIZE),
AsyncDist = maps:get(async_dist, SysConf, true),
process_flag(message_queue_data, MsgQData),
process_flag(min_bin_vheap_size, MinBinVheapSize),
process_flag(min_heap_size, MinHeapSize),
process_flag(async_dist, AsyncDist),
%% wait for wal for a bit before initialising the server state and log
#{cluster := Cluster} = ServerState = ra_server:init(Config),
LogId = ra_server:log_id(ServerState),
UId = ra_server:uid(ServerState),
Expand Down Expand Up @@ -1318,8 +1321,10 @@ handle_effect(_, {send_rpc, To, Rpc}, _,
#state{conf = Conf} = State0, Actions) ->
% fully qualified use only so that we can mock it for testing
% TODO: review / refactor to remove the mod call here
process_flag(async_dist, false),
case ?MODULE:send_rpc(To, Rpc, State0) of
ok ->
process_flag(async_dist, true),
{State0, Actions};
nosuspend ->
%% update peer status to suspended and spawn a process
Expand All @@ -1334,8 +1339,10 @@ handle_effect(_, {send_rpc, To, Rpc}, _,
incr_counter(Conf, ?C_RA_SRV_MSGS_SENT, 1),
Self ! {update_peer, To, #{status => normal}}
end),
process_flag(async_dist, true),
{update_peer(To, #{status => suspended}, State0), Actions};
noconnect ->
process_flag(async_dist, true),
%% for noconnects just allow it to pipeline and catch up later
{State0, Actions}
end;
Expand Down
4 changes: 3 additions & 1 deletion src/ra_system.erl
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@
low_priority_commands_in_memory_size => non_neg_integer(),
server_recovery_strategy => undefined |
registered |
{module(), atom(), list()}
{module(), atom(), list()},
async_dist => boolean()
}.

-export_type([
Expand Down Expand Up @@ -143,6 +144,7 @@ default_config() ->
receive_snapshot_timeout => ReceiveSnapshotTimeout,
low_priority_commands_flush_size => LowPriorityCommandsFlushSize,
low_priority_commands_in_memory_size => LowPriorityInMemSize,
async_dist => true,
names => #{wal => ra_log_wal,
wal_sup => ra_log_wal_sup,
log_sup => ra_log_sup,
Expand Down
9 changes: 6 additions & 3 deletions test/ra_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,18 @@ end_per_testcase(_TestCase, Config) ->
Config.

single_server_processes_command(Config) ->
% ok = logger:set_primary_config(level, all),
Name = ?config(test_name, Config),
N1 = nth_server_name(Config, 1),
{RaName, _} = N1 = nth_server_name(Config, 1),
ok = ra:start_server(default, Name, N1, add_machine(), []),
ok = ra:trigger_election(N1),
% index is 2 as leaders commit a no-op entry on becoming leaders
monitor(process, element(1, N1)),
?assertMatch([{async_dist, true}],
process_info(whereis(RaName), [async_dist])),
{ok, 5, _} = ra:process_command(N1, 5, 2000),
{ok, 10, _} = ra:process_command(N1, 5, 2000),
terminate_cluster([N1]).
terminate_cluster([N1]),
ok.

pipeline_commands(Config) ->
Name = ?config(test_name, Config),
Expand Down

0 comments on commit 2a93496

Please sign in to comment.