Skip to content

Commit

Permalink
Merge pull request #425 from rabbitmq/add-reply-mode-to-command-meta-…
Browse files Browse the repository at this point in the history
…data

Add reply_mode to command meta data
  • Loading branch information
kjnilsson authored Mar 8, 2024
2 parents b90fc3d + c3e1d01 commit 57da222
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 9 deletions.
3 changes: 2 additions & 1 deletion src/ra_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@
index := ra_index(),
term := ra_term(),
machine_version => version(),
from => from()}.
from => from(),
reply_mode => ra_server:command_reply_mode()}.
%% extensible command meta data map


Expand Down
19 changes: 12 additions & 7 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2492,29 +2492,29 @@ apply_with(_Cmd,
when MacVer < Effective ->
%% we cannot apply any further entries
{Mod, LastAppliedIdx, State, MacSt, Effects, Notifys, LastTs};
apply_with({Idx, Term, {'$usr', CmdMeta, Cmd, ReplyType}},
apply_with({Idx, Term, {'$usr', CmdMeta, Cmd, ReplyMode}},
{Module, _LastAppliedIdx,
State = #{cfg := #cfg{effective_machine_version = MacVer}},
MacSt, Effects0, Notifys0, LastTs}) ->
%% augment the meta data structure
Meta = augment_command_meta(Idx, Term, MacVer, CmdMeta),
Meta = augment_command_meta(Idx, Term, MacVer, ReplyMode, CmdMeta),
Ts = maps:get(ts, CmdMeta, LastTs),
case ra_machine:apply(Module, Meta, Cmd, MacSt) of
{NextMacSt, Reply, AppEffs} ->
{Effects, Notifys} = add_reply(CmdMeta, Reply, ReplyType,
{Effects, Notifys} = add_reply(CmdMeta, Reply, ReplyMode,
append_app_effects(AppEffs, Effects0),
Notifys0),
{Module, Idx, State, NextMacSt,
Effects, Notifys, Ts};
{NextMacSt, Reply} ->
{Effects, Notifys} = add_reply(CmdMeta, Reply, ReplyType,
{Effects, Notifys} = add_reply(CmdMeta, Reply, ReplyMode,
Effects0, Notifys0),
{Module, Idx, State, NextMacSt,
Effects, Notifys, Ts}
end;
apply_with({Idx, Term, {'$ra_cluster_change', CmdMeta, NewCluster, ReplyType}},
apply_with({Idx, Term, {'$ra_cluster_change', CmdMeta, NewCluster, ReplyMode}},
{Mod, _, State0, MacSt, Effects0, Notifys0, LastTs}) ->
{Effects, Notifys} = add_reply(CmdMeta, ok, ReplyType,
{Effects, Notifys} = add_reply(CmdMeta, ok, ReplyMode,
Effects0, Notifys0),
State = case State0 of
#{cluster_index_term := {CI, CT}}
Expand Down Expand Up @@ -2569,7 +2569,7 @@ apply_with({Idx, Term, {noop, CmdMeta, NextMacVer}},
},
State = State0#{cfg => Cfg,
cluster_change_permitted => ClusterChangePerm},
Meta = augment_command_meta(Idx, Term, MacVer, CmdMeta),
Meta = augment_command_meta(Idx, Term, MacVer, undefined, CmdMeta),
?DEBUG("~ts: applying new machine version ~b current ~b",
[LogId, NextMacVer, OldMacVer]),
apply_with({Idx, Term,
Expand Down Expand Up @@ -2611,6 +2611,11 @@ apply_with({Idx, _, _} = Cmd, Acc) ->
[log_id(element(2, Acc)), Cmd, 10]),
setelement(2, Acc, Idx).

augment_command_meta(Idx, Term, MacVer, undefined, CmdMeta) ->
augment_command_meta(Idx, Term, MacVer, CmdMeta);
augment_command_meta(Idx, Term, MacVer, ReplyMode, CmdMeta) ->
augment_command_meta(Idx, Term, MacVer, CmdMeta#{reply_mode => ReplyMode}).

augment_command_meta(Idx, Term, MacVer, CmdMeta) ->
maps:fold(fun (ts, V, Acc) ->
%% rename from compact key name
Expand Down
32 changes: 31 additions & 1 deletion test/ra_machine_int_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ all_tests() ->
deleted_cluster_emits_eol_effect,
machine_state_enter_effects,
meta_data,
meta_data_2,
append_effect,
append_effect_with_notify,
append_effect_follower,
Expand Down Expand Up @@ -388,7 +389,8 @@ meta_data(Config) ->
meck:expect(Mod, init, fun (_) -> the_state end),
meck:expect(Mod, apply, fun (#{index := Idx,
term := Term,
system_time := Ts}, _, State) ->
system_time := Ts,
reply_mode := await_consensus}, _, State) ->
{State, {metadata, Idx, Term, Ts}}
end),
ClusterName = ?config(cluster_name, Config),
Expand All @@ -403,6 +405,34 @@ meta_data(Config) ->
?assert(Term > 0),
ok.

meta_data_2(Config) ->
Mod = ?config(modname, Config),
Self = self(),
meck:new(Mod, [non_strict]),
meck:expect(Mod, init, fun (_) -> the_state end),
meck:expect(Mod, apply, fun (#{index := Idx,
term := Term,
system_time := Ts,
reply_mode := {notify, 42, Pid}}, _, State)
when Pid == Self ->
{State, {metadata, Idx, Term, Ts}}
end),
ClusterName = ?config(cluster_name, Config),
ServerId = ?config(server_id, Config),
T = erlang:system_time(millisecond),
timer:sleep(1),
ok = start_cluster(ClusterName, {module, Mod, #{}}, [ServerId]),
ok = ra:pipeline_command(ServerId, any_command, 42, normal),
receive
{ra_event, _, {applied, [{42, {metadata, _, _, Ts}}]}}
when Ts > T ->
ok
after 5000 ->
flush(),
ct:fail("applied not received")
end.


append_effect(Config) ->
Mod = ?config(modname, Config),
Self = self(),
Expand Down

0 comments on commit 57da222

Please sign in to comment.