Skip to content

Commit

Permalink
Add an integration test for checkpoints and promotion
Browse files Browse the repository at this point in the history
  • Loading branch information
the-mikedavis committed Feb 13, 2024
1 parent 336bded commit d9890cc
Showing 1 changed file with 92 additions and 4 deletions.
96 changes: 92 additions & 4 deletions test/coordination_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ all_tests() ->
leaderboard,
bench,
disconnected_node_catches_up,
key_metrics
key_metrics,
recover_from_checkpoint
].

groups() ->
Expand Down Expand Up @@ -708,6 +709,86 @@ bench(Config) ->
ra_lib:recursive_delete(PrivDir),
ok.

recover_from_checkpoint(Config) ->
PrivDir = ?config(data_dir, Config),
ClusterName = ?config(cluster_name, Config),
ServerNames = [s1, s2, s3],
ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- ServerNames],
Configs = [begin
UId = atom_to_binary(Name, utf8),
#{cluster_name => ClusterName,
id => NodeId,
uid => UId,
initial_members => ServerIds,
machine => {module, ?MODULE, #{}},
log_init_args => #{uid => UId,
min_checkpoint_interval => 3,
snapshot_interval => 5}}
end || {Name, _Node} = NodeId <- ServerIds],
{ok, Started, []} = ra:start_cluster(?SYS, Configs),
{ok, _, Leader} = ra:members(hd(Started)),
[Follower1, Follower2] = ServerIds -- [Leader],

%% Send five commands to trigger a snapshot.
[ok = ra:pipeline_command(Leader, N, no_correlation, normal)
|| N <- lists:seq(1, 6)],
await_condition(
fun () ->
{ok, #{log := #{snapshot_index := LeaderIdx}}, _} =
ra:member_overview(Leader),
{ok, #{log := #{snapshot_index := Follower1Idx}}, _} =
ra:member_overview(Follower1),
{ok, #{log := #{snapshot_index := Follower2Idx}}, _} =
ra:member_overview(Follower2),
LeaderIdx =:= 6 andalso Follower1Idx =:= 6 andalso
Follower2Idx =:= 6
end, 20),

%% Trigger a checkpoint.
{ok, _, _} = ra:process_command(Leader, checkpoint),
await_condition(
fun () ->
{ok, #{log := #{latest_checkpoint_index := LeaderIdx}}, _} =
ra:member_overview(Leader),
{ok, #{log := #{latest_checkpoint_index := Follower1Idx}}, _} =
ra:member_overview(Follower1),
{ok, #{log := #{latest_checkpoint_index := Follower2Idx}}, _} =
ra:member_overview(Follower2),
LeaderIdx =:= 8 andalso Follower1Idx =:= 8 andalso
Follower2Idx =:= 8
end, 20),

%% Restart the servers
[ok = ra:stop_server(?SYS, ServerId) || ServerId <- ServerIds],
[ok = ra:restart_server(?SYS, ServerId) || ServerId <- ServerIds],

%% All servers should have recovered from their checkpoints since the
%% checkpoint has a higher index than the snapshot.
[{ok, {_CurrentIdx, _CheckpointIdx = 8}, _Leader} =
ra:local_query(ServerId, fun(State) ->
maps:get(checkpoint_index, State,
undefined)
end) || ServerId <- ServerIds],

%% Promote the checkpoint into a snapshot.
{ok, _, _} = ra:process_command(Leader, promote_checkpoint),
await_condition(
fun () ->
{ok, #{log := #{snapshot_index := LeaderIdx}}, _} =
ra:member_overview(Leader),
{ok, #{log := #{snapshot_index := Follower1Idx}}, _} =
ra:member_overview(Follower1),
{ok, #{log := #{snapshot_index := Follower2Idx}}, _} =
ra:member_overview(Follower2),
LeaderIdx =:= 8 andalso Follower1Idx =:= 8 andalso
Follower2Idx =:= 8
end, 20),

[ok = slave:stop(S) || {_, S} <- ServerIds],
ok.

%% Utility

test_local_msg(Leader, ReceiverNode, ExpectedSenderNode, CmdTag, Opts0) ->
Opts = case Opts0 of
local -> [local];
Expand Down Expand Up @@ -761,8 +842,6 @@ test_local_msg(Leader, ReceiverNode, ExpectedSenderNode, CmdTag, Opts0) ->
flush(),
ok.

%% Utility

get_current_host() ->
NodeStr = atom_to_list(node()),
Host = re:replace(NodeStr, "^[^@]+@", "", [{return, list}]),
Expand Down Expand Up @@ -802,7 +881,7 @@ flush() ->
%% ra_machine impl

init(_) ->
{#{}, []}.
#{}.

apply(_Meta, {send_local_msg, Pid, Opts}, State) ->
{State, ok, [{send_msg, Pid, {local_msg, node()}, Opts}]};
Expand All @@ -815,6 +894,15 @@ apply(#{index := Idx}, {do_local_log, SenderPid, Opts}, State) ->
{State, ok, [Eff]};
apply(#{index := _Idx}, {data, _}, State) ->
{State, ok, []};
apply(#{index := Idx}, checkpoint, State) ->
%% Generally machines should save their state without any modifications
%% but we slightly modify the machine state we save in the checkpoint here
%% so that we can tell when we've recovered from a checkpoint rather than
%% a snapshot.
CheckpointState = maps:put(checkpoint_index, Idx, State),
{State, ok, [{checkpoint, Idx, CheckpointState}]};
apply(#{index := Idx}, promote_checkpoint, State) ->
{State, ok, [{release_cursor, Idx}]};
apply(#{index := Idx}, _Cmd, State) ->
{State, ok, [{release_cursor, Idx, State}]}.

Expand Down

0 comments on commit d9890cc

Please sign in to comment.