Skip to content

Commit

Permalink
Enable async_dist when processing send_msg effects
Browse files Browse the repository at this point in the history
for remote nodes.

This will allow users to simplify their Ra client implementations
not to have to handle lost messages as could occur
when the distribution buffer fills.
  • Loading branch information
kjnilsson committed Jan 22, 2025
1 parent 7b69b45 commit 5e6e757
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 12 deletions.
39 changes: 32 additions & 7 deletions src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ do_init(#{id := Id,
process_flag(message_queue_data, MsgQData),
process_flag(min_bin_vheap_size, MinBinVheapSize),
process_flag(min_heap_size, MinHeapSize),
%% wait for wal for a bit before initialising the server state and log
#{cluster := Cluster} = ServerState = ra_server:init(Config),
LogId = ra_server:log_id(ServerState),
UId = ra_server:uid(ServerState),
Expand Down Expand Up @@ -1345,8 +1346,16 @@ handle_effect(_, {next_event, _, _} = Next, _, State, Actions) ->
{State, [Next | Actions]};
handle_effect(_, {send_msg, To, Msg}, _, State, Actions) ->
%% default is to send without any wrapping
%% TODO: handle send failure? how?
_ = send(To, Msg, State#state.conf),
ToNode = get_node(To),
case ToNode == node() of
true ->
_ = send(To, Msg, State#state.conf);
false ->
%% use async_dist for remote sends
process_flag(async_dist, true),
_ = send(To, Msg, State#state.conf),
process_flag(async_dist, false)
end,
{State, Actions};
handle_effect(RaftState, {send_msg, To, _Msg, Options} = Eff,
_, State, Actions) ->
Expand All @@ -1355,13 +1364,29 @@ handle_effect(RaftState, {send_msg, To, _Msg, Options} = Eff,
true ->
case can_execute_locally(RaftState, ToNode, State) of
true ->
send_msg(Eff, State);
case ToNode == node() of
true ->
send_msg(Eff, State);
false ->
%% use async_dist for remote sends
process_flag(async_dist, true),
send_msg(Eff, State),
process_flag(async_dist, false)
end;
false ->
ok
end;
false when RaftState == leader ->
%% the effect got here so we can execute
send_msg(Eff, State);
case ToNode == node() of
true ->
send_msg(Eff, State);
false ->
%% use async_dist for remote sends
process_flag(async_dist, true),
send_msg(Eff, State),
process_flag(async_dist, false)
end;
false ->
ok
end,
Expand Down Expand Up @@ -1935,10 +1960,10 @@ handle_tick_metrics(State) ->

can_execute_locally(RaftState, TargetNode,
#state{server_state = ServerState} = State) ->
Membership = ra_server:get_membership(ServerState),
case RaftState of
follower when Membership == voter ->
TargetNode == node();
follower ->
TargetNode == node() andalso
voter == ra_server:get_membership(ServerState);
leader when TargetNode =/= node() ->
%% We need to evaluate whether to send the message.
%% Only send if there isn't a local node for the target pid.
Expand Down
4 changes: 2 additions & 2 deletions src/ra_system.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@
low_priority_commands_flush_size => non_neg_integer(),
low_priority_commands_in_memory_size => non_neg_integer(),
server_recovery_strategy => undefined |
registered |
{module(), atom(), list()}
registered |
{module(), atom(), list()}
}.

-export_type([
Expand Down
7 changes: 4 additions & 3 deletions test/ra_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,16 @@ end_per_testcase(_TestCase, Config) ->
Config.

single_server_processes_command(Config) ->
% ok = logger:set_primary_config(level, all),
Name = ?config(test_name, Config),
N1 = nth_server_name(Config, 1),
{_RaName, _} = N1 = nth_server_name(Config, 1),
ok = ra:start_server(default, Name, N1, add_machine(), []),
ok = ra:trigger_election(N1),
% index is 2 as leaders commit a no-op entry on becoming leaders
monitor(process, element(1, N1)),
{ok, 5, _} = ra:process_command(N1, 5, 2000),
{ok, 10, _} = ra:process_command(N1, 5, 2000),
terminate_cluster([N1]).
terminate_cluster([N1]),
ok.

pipeline_commands(Config) ->
Name = ?config(test_name, Config),
Expand Down

0 comments on commit 5e6e757

Please sign in to comment.