diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index 3f89ff0d..3941a050 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -309,9 +309,12 @@ do_init(#{id := Id, MinBinVheapSize = maps:get(server_min_bin_vheap_size, SysConf, ?MIN_BIN_VHEAP_SIZE), MinHeapSize = maps:get(server_min_heap_size, SysConf, ?MIN_BIN_VHEAP_SIZE), + AsyncDist = maps:get(async_dist, SysConf, true), process_flag(message_queue_data, MsgQData), process_flag(min_bin_vheap_size, MinBinVheapSize), process_flag(min_heap_size, MinHeapSize), + process_flag(async_dist, AsyncDist), + %% wait for wal for a bit before initialising the server state and log #{cluster := Cluster} = ServerState = ra_server:init(Config), LogId = ra_server:log_id(ServerState), UId = ra_server:uid(ServerState), @@ -1318,8 +1321,10 @@ handle_effect(_, {send_rpc, To, Rpc}, _, #state{conf = Conf} = State0, Actions) -> % fully qualified use only so that we can mock it for testing % TODO: review / refactor to remove the mod call here + process_flag(async_dist, false), case ?MODULE:send_rpc(To, Rpc, State0) of ok -> + process_flag(async_dist, true), {State0, Actions}; nosuspend -> %% update peer status to suspended and spawn a process @@ -1334,8 +1339,10 @@ handle_effect(_, {send_rpc, To, Rpc}, _, incr_counter(Conf, ?C_RA_SRV_MSGS_SENT, 1), Self ! {update_peer, To, #{status => normal}} end), + process_flag(async_dist, true), {update_peer(To, #{status => suspended}, State0), Actions}; noconnect -> + process_flag(async_dist, true), %% for noconnects just allow it to pipeline and catch up later {State0, Actions} end; diff --git a/src/ra_system.erl b/src/ra_system.erl index 54b9722c..c6a4e11f 100644 --- a/src/ra_system.erl +++ b/src/ra_system.erl @@ -59,7 +59,8 @@ low_priority_commands_in_memory_size => non_neg_integer(), server_recovery_strategy => undefined | registered | - {module(), atom(), list()} + {module(), atom(), list()}, + async_dist => boolean() }. -export_type([ @@ -143,6 +144,7 @@ default_config() -> receive_snapshot_timeout => ReceiveSnapshotTimeout, low_priority_commands_flush_size => LowPriorityCommandsFlushSize, low_priority_commands_in_memory_size => LowPriorityInMemSize, + async_dist => true, names => #{wal => ra_log_wal, wal_sup => ra_log_wal_sup, log_sup => ra_log_sup, diff --git a/test/ra_SUITE.erl b/test/ra_SUITE.erl index 457cc91f..19264d8c 100644 --- a/test/ra_SUITE.erl +++ b/test/ra_SUITE.erl @@ -117,15 +117,18 @@ end_per_testcase(_TestCase, Config) -> Config. single_server_processes_command(Config) -> - % ok = logger:set_primary_config(level, all), Name = ?config(test_name, Config), - N1 = nth_server_name(Config, 1), + {RaName, _} = N1 = nth_server_name(Config, 1), ok = ra:start_server(default, Name, N1, add_machine(), []), ok = ra:trigger_election(N1), % index is 2 as leaders commit a no-op entry on becoming leaders + monitor(process, element(1, N1)), + ?assertMatch([{async_dist, true}], + process_info(whereis(RaName), [async_dist])), {ok, 5, _} = ra:process_command(N1, 5, 2000), {ok, 10, _} = ra:process_command(N1, 5, 2000), - terminate_cluster([N1]). + terminate_cluster([N1]), + ok. pipeline_commands(Config) -> Name = ?config(test_name, Config),