Skip to content

Commit

Permalink
Merge pull request #501 from rabbitmq/async-dist
Browse files Browse the repository at this point in the history
Enable async_dist by default for Ra server processes.
  • Loading branch information
kjnilsson authored Jan 22, 2025
2 parents cc54084 + 3923612 commit 79e6531
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 16 deletions.
30 changes: 20 additions & 10 deletions src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,19 @@
-define(HANDLE_EFFECTS(Effects, EvtType, State0),
handle_effects(?FUNCTION_NAME, Effects, EvtType, State0)).

-define(ASYNC_DIST(Node, Send),
case Node == node() of
true ->
Send,
ok;
false ->
%% use async_dist for remote sends
process_flag(async_dist, true),
Send,
process_flag(async_dist, false),
ok
end).

-type query_fun() :: ra:query_fun().
-type query_options() :: #{condition => ra:query_condition()}.

Expand Down Expand Up @@ -1349,8 +1362,8 @@ handle_effect(_, {next_event, _, _} = Next, _, State, Actions) ->
{State, [Next | Actions]};
handle_effect(leader, {send_msg, To, Msg}, _, State, Actions) ->
%% default is to send without any wrapping
%% TODO: handle send failure? how?
_ = send(To, Msg, State#state.conf),
ToNode = get_node(To),
?ASYNC_DIST(ToNode, _ = send(To, Msg, State#state.conf)),
{State, Actions};
handle_effect(RaftState, {send_msg, To, _Msg, Options} = Eff,
_, State, Actions) ->
Expand All @@ -1359,13 +1372,13 @@ handle_effect(RaftState, {send_msg, To, _Msg, Options} = Eff,
true ->
case can_execute_locally(RaftState, ToNode, State) of
true ->
send_msg(Eff, State);
?ASYNC_DIST(ToNode, send_msg(Eff, State));
false ->
ok
end;
false when RaftState == leader ->
%% the effect got here so we can execute
send_msg(Eff, State);
?ASYNC_DIST(ToNode, send_msg(Eff, State));
false ->
ok
end,
Expand Down Expand Up @@ -1978,20 +1991,17 @@ handle_tick_metrics(State) ->

can_execute_locally(RaftState, TargetNode,
#state{server_state = ServerState} = State) ->
Membership = ra_server:get_membership(ServerState),
case RaftState of
_ when RaftState =/= leader andalso
Membership == voter ->
TargetNode == node();
leader when TargetNode =/= node() ->
%% We need to evaluate whether to send the message.
%% Only send if there isn't a local node for the target pid.
Members = do_state_query(voters, State),
not lists:any(fun ({_, N}) -> N == TargetNode end, Members);
leader ->
true;
_ ->
false
_ when RaftState =/= leader ->
TargetNode == node() andalso
voter == ra_server:get_membership(ServerState)
end.

can_execute_on_member(_RaftState, Member,
Expand Down
9 changes: 3 additions & 6 deletions test/ra_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -128,19 +128,16 @@ end_per_testcase(_TestCase, Config) ->
Config.

single_server_processes_command(Config) ->
% ok = logger:set_primary_config(level, all),
Name = ?config(test_name, Config),
N1 = nth_server_name(Config, 1),
{_RaName, _} = N1 = nth_server_name(Config, 1),
ok = ra:start_server(default, Name, N1, add_machine(), []),
ok = ra:trigger_election(N1),
monitor(process, element(1, N1)),
% index is 2 as leaders commit a no-op entry on becoming leaders
% debugger:start(),
% int:i(ra_server_proc),
% int:break(ra_server_proc, 440),
{ok, 5, _} = ra:process_command(N1, 5, 2000),
{ok, 10, _} = ra:process_command(N1, 5, 2000),
terminate_cluster([N1]).
terminate_cluster([N1]),
ok.

pipeline_commands(Config) ->
Name = ?config(test_name, Config),
Expand Down

0 comments on commit 79e6531

Please sign in to comment.