Skip to content

Commit

Permalink
Merge pull request #502 from rabbitmq/async-dist-v2.15.x
Browse files Browse the repository at this point in the history
Enable async_dist by default for Ra server processes.
  • Loading branch information
kjnilsson authored Jan 22, 2025
2 parents 7b69b45 + e957465 commit 554a974
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 12 deletions.
27 changes: 20 additions & 7 deletions src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,19 @@
-define(HANDLE_EFFECTS(Effects, EvtType, State0),
handle_effects(?FUNCTION_NAME, Effects, EvtType, State0)).

-define(ASYNC_DIST(Node, Send),
case Node == node() of
true ->
Send,
ok;
false ->
%% use async_dist for remote sends
process_flag(async_dist, true),
Send,
process_flag(async_dist, false),
ok
end).

-type query_fun() :: ra:query_fun().
-type query_options() :: #{condition => ra:query_condition()}.

Expand Down Expand Up @@ -1345,8 +1358,8 @@ handle_effect(_, {next_event, _, _} = Next, _, State, Actions) ->
{State, [Next | Actions]};
handle_effect(_, {send_msg, To, Msg}, _, State, Actions) ->
%% default is to send without any wrapping
%% TODO: handle send failure? how?
_ = send(To, Msg, State#state.conf),
ToNode = get_node(To),
?ASYNC_DIST(ToNode, _ = send(To, Msg, State#state.conf)),
{State, Actions};
handle_effect(RaftState, {send_msg, To, _Msg, Options} = Eff,
_, State, Actions) ->
Expand All @@ -1355,13 +1368,13 @@ handle_effect(RaftState, {send_msg, To, _Msg, Options} = Eff,
true ->
case can_execute_locally(RaftState, ToNode, State) of
true ->
send_msg(Eff, State);
?ASYNC_DIST(ToNode, send_msg(Eff, State));
false ->
ok
end;
false when RaftState == leader ->
%% the effect got here so we can execute
send_msg(Eff, State);
?ASYNC_DIST(ToNode, send_msg(Eff, State));
false ->
ok
end,
Expand Down Expand Up @@ -1935,10 +1948,10 @@ handle_tick_metrics(State) ->

can_execute_locally(RaftState, TargetNode,
#state{server_state = ServerState} = State) ->
Membership = ra_server:get_membership(ServerState),
case RaftState of
follower when Membership == voter ->
TargetNode == node();
follower ->
TargetNode == node() andalso
voter == ra_server:get_membership(ServerState);
leader when TargetNode =/= node() ->
%% We need to evaluate whether to send the message.
%% Only send if there isn't a local node for the target pid.
Expand Down
4 changes: 2 additions & 2 deletions src/ra_system.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@
low_priority_commands_flush_size => non_neg_integer(),
low_priority_commands_in_memory_size => non_neg_integer(),
server_recovery_strategy => undefined |
registered |
{module(), atom(), list()}
registered |
{module(), atom(), list()}
}.

-export_type([
Expand Down
7 changes: 4 additions & 3 deletions test/ra_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,16 @@ end_per_testcase(_TestCase, Config) ->
Config.

single_server_processes_command(Config) ->
% ok = logger:set_primary_config(level, all),
Name = ?config(test_name, Config),
N1 = nth_server_name(Config, 1),
{_RaName, _} = N1 = nth_server_name(Config, 1),
ok = ra:start_server(default, Name, N1, add_machine(), []),
ok = ra:trigger_election(N1),
% index is 2 as leaders commit a no-op entry on becoming leaders
monitor(process, element(1, N1)),
{ok, 5, _} = ra:process_command(N1, 5, 2000),
{ok, 10, _} = ra:process_command(N1, 5, 2000),
terminate_cluster([N1]).
terminate_cluster([N1]),
ok.

pipeline_commands(Config) ->
Name = ?config(test_name, Config),
Expand Down

0 comments on commit 554a974

Please sign in to comment.