diff --git a/test/coordination_SUITE.erl b/test/coordination_SUITE.erl index 269aef8b..4a98a6ac 100644 --- a/test/coordination_SUITE.erl +++ b/test/coordination_SUITE.erl @@ -47,7 +47,8 @@ all_tests() -> leaderboard, bench, disconnected_node_catches_up, - key_metrics + key_metrics, + recover_from_checkpoint ]. groups() -> @@ -708,6 +709,86 @@ bench(Config) -> ra_lib:recursive_delete(PrivDir), ok. +recover_from_checkpoint(Config) -> + PrivDir = ?config(data_dir, Config), + ClusterName = ?config(cluster_name, Config), + ServerNames = [s1, s2, s3], + ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- ServerNames], + Configs = [begin + UId = atom_to_binary(Name, utf8), + #{cluster_name => ClusterName, + id => NodeId, + uid => UId, + initial_members => ServerIds, + machine => {module, ?MODULE, #{}}, + log_init_args => #{uid => UId, + min_checkpoint_interval => 3, + snapshot_interval => 5}} + end || {Name, _Node} = NodeId <- ServerIds], + {ok, Started, []} = ra:start_cluster(?SYS, Configs), + {ok, _, Leader} = ra:members(hd(Started)), + [Follower1, Follower2] = ServerIds -- [Leader], + + %% Send five commands to trigger a snapshot. + [ok = ra:pipeline_command(Leader, N, no_correlation, normal) + || N <- lists:seq(1, 6)], + await_condition( + fun () -> + {ok, #{log := #{snapshot_index := LeaderIdx}}, _} = + ra:member_overview(Leader), + {ok, #{log := #{snapshot_index := Follower1Idx}}, _} = + ra:member_overview(Follower1), + {ok, #{log := #{snapshot_index := Follower2Idx}}, _} = + ra:member_overview(Follower2), + LeaderIdx =:= 6 andalso Follower1Idx =:= 6 andalso + Follower2Idx =:= 6 + end, 20), + + %% Trigger a checkpoint. + {ok, _, _} = ra:process_command(Leader, checkpoint), + await_condition( + fun () -> + {ok, #{log := #{latest_checkpoint_index := LeaderIdx}}, _} = + ra:member_overview(Leader), + {ok, #{log := #{latest_checkpoint_index := Follower1Idx}}, _} = + ra:member_overview(Follower1), + {ok, #{log := #{latest_checkpoint_index := Follower2Idx}}, _} = + ra:member_overview(Follower2), + LeaderIdx =:= 8 andalso Follower1Idx =:= 8 andalso + Follower2Idx =:= 8 + end, 20), + + %% Restart the servers + [ok = ra:stop_server(?SYS, ServerId) || ServerId <- ServerIds], + [ok = ra:restart_server(?SYS, ServerId) || ServerId <- ServerIds], + + %% All servers should have recovered from their checkpoints since the + %% checkpoint has a higher index than the snapshot. + [{ok, {_CurrentIdx, _CheckpointIdx = 8}, _Leader} = + ra:local_query(ServerId, fun(State) -> + maps:get(checkpoint_index, State, + undefined) + end) || ServerId <- ServerIds], + + %% Promote the checkpoint into a snapshot. + {ok, _, _} = ra:process_command(Leader, promote_checkpoint), + await_condition( + fun () -> + {ok, #{log := #{snapshot_index := LeaderIdx}}, _} = + ra:member_overview(Leader), + {ok, #{log := #{snapshot_index := Follower1Idx}}, _} = + ra:member_overview(Follower1), + {ok, #{log := #{snapshot_index := Follower2Idx}}, _} = + ra:member_overview(Follower2), + LeaderIdx =:= 8 andalso Follower1Idx =:= 8 andalso + Follower2Idx =:= 8 + end, 20), + + [ok = slave:stop(S) || {_, S} <- ServerIds], + ok. + +%% Utility + test_local_msg(Leader, ReceiverNode, ExpectedSenderNode, CmdTag, Opts0) -> Opts = case Opts0 of local -> [local]; @@ -761,8 +842,6 @@ test_local_msg(Leader, ReceiverNode, ExpectedSenderNode, CmdTag, Opts0) -> flush(), ok. -%% Utility - get_current_host() -> NodeStr = atom_to_list(node()), Host = re:replace(NodeStr, "^[^@]+@", "", [{return, list}]), @@ -802,7 +881,7 @@ flush() -> %% ra_machine impl init(_) -> - {#{}, []}. + #{}. apply(_Meta, {send_local_msg, Pid, Opts}, State) -> {State, ok, [{send_msg, Pid, {local_msg, node()}, Opts}]}; @@ -815,6 +894,15 @@ apply(#{index := Idx}, {do_local_log, SenderPid, Opts}, State) -> {State, ok, [Eff]}; apply(#{index := _Idx}, {data, _}, State) -> {State, ok, []}; +apply(#{index := Idx}, checkpoint, State) -> + %% Generally machines should save their state without any modifications + %% but we slightly modify the machine state we save in the checkpoint here + %% so that we can tell when we've recovered from a checkpoint rather than + %% a snapshot. + CheckpointState = maps:put(checkpoint_index, Idx, State), + {State, ok, [{checkpoint, Idx, CheckpointState}]}; +apply(#{index := Idx}, promote_checkpoint, State) -> + {State, ok, [{release_cursor, Idx}]}; apply(#{index := Idx}, _Cmd, State) -> {State, ok, [{release_cursor, Idx, State}]}.