Skip to content

Commit

Permalink
Merge pull request #369 from rabbitmq/gh-368
Browse files Browse the repository at this point in the history
Fix snapshot installation CRC failure
  • Loading branch information
michaelklishin authored May 10, 2023
2 parents 2120b01 + 7d2cd01 commit 87b04c3
Show file tree
Hide file tree
Showing 11 changed files with 314 additions and 111 deletions.
5 changes: 4 additions & 1 deletion src/ra.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@
%% after node restart). Pids are not stable in this sense.
-type ra_server_id() :: {Name :: atom(), Node :: node()}.

-type ra_peer_status() :: normal | {sending_snapshot, pid()} | suspended.
-type ra_peer_status() :: normal |
{sending_snapshot, pid()} |
suspended |
disconnected.

-type ra_peer_state() :: #{next_index := non_neg_integer(),
match_index := non_neg_integer(),
Expand Down
66 changes: 44 additions & 22 deletions src/ra_log_snapshot.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
begin_accept/2,
accept_chunk/2,
complete_accept/2,
begin_read/1,
begin_read/2,
read_chunk/3,
recover/1,
validate/1,
read_meta/1
read_meta/1,
context/0
]).

-define(MAGIC, "RASN").
Expand Down Expand Up @@ -68,34 +69,51 @@ begin_accept(SnapDir, Meta) ->
Data]),
{ok, {PartialCrc, Fd}}.

accept_chunk(<<?MAGIC, ?VERSION:8/unsigned, Crc:32/integer,
Rest/binary>> = Chunk, {_PartialCrc, Fd}) ->
% ensure we overwrite the existing header when we are receiving the
% full file
PartialCrc = erlang:crc32(Rest),
{ok, 0} = file:position(Fd, 0),
ok = file:write(Fd, Chunk),
{ok, {PartialCrc, Crc, Fd}};
accept_chunk(Chunk, {PartialCrc, Fd}) ->
<<Crc:32/integer, Rest/binary>> = Chunk,
accept_chunk(Rest, {PartialCrc, Crc, Fd});
%% compatibility clause where we did not receive the full file
%% do not validate Crc due to OTP 26 map key ordering changes
<<_Crc:32/integer, Rest/binary>> = Chunk,
accept_chunk(Rest, {PartialCrc, undefined, Fd});
accept_chunk(Chunk, {PartialCrc0, Crc, Fd}) ->
ok = file:write(Fd, Chunk),
PartialCrc = erlang:crc32(PartialCrc0, Chunk),
{ok, {PartialCrc, Crc, Fd}}.

complete_accept(Chunk, {PartialCrc, Fd}) ->
<<Crc:32/integer, Rest/binary>> = Chunk,
complete_accept(Rest, {PartialCrc, Crc, Fd});
complete_accept(Chunk, {PartialCrc0, Crc, Fd}) ->
ok = file:write(Fd, Chunk),
ok = file:pwrite(Fd, 5, <<Crc:32/integer>>),
Crc = erlang:crc32(PartialCrc0, Chunk),
complete_accept(Chunk, St0) ->
{ok, {CalculatedCrc, Crc, Fd}} = accept_chunk(Chunk, St0),
CrcToWrite = case Crc of
undefined ->
CalculatedCrc;
_ ->
Crc
end,
ok = file:pwrite(Fd, 5, <<CrcToWrite:32/integer>>),
ok = file:sync(Fd),
ok = file:close(Fd),
CalculatedCrc = CrcToWrite,
ok.

begin_read(Dir) ->
begin_read(Dir, Context) ->
File = filename(Dir),
case file:open(File, [read, binary, raw]) of
{ok, Fd} ->
case read_meta_internal(Fd) of
{ok, Meta, _Crc}
when map_get(can_accept_full_file, Context) ->
{ok, Eof} = file:position(Fd, eof),
{ok, Meta, {0, Eof, Fd}};
{ok, Meta, Crc} ->
{ok, DataStart} = file:position(Fd, cur),
{ok, Cur} = file:position(Fd, cur),
{ok, Eof} = file:position(Fd, eof),
{ok, Meta, {Crc, {DataStart, Eof, Fd}}};
{ok, Meta, {Crc, {Cur, Eof, Fd}}};
{error, _} = Err ->
_ = file:close(Fd),
Err
Expand All @@ -105,6 +123,7 @@ begin_read(Dir) ->
end.

read_chunk({Crc, ReadState}, Size, Dir) when is_integer(Crc) ->
%% this the compatibility read mode for old snapshot receivers
case read_chunk(ReadState, Size - 4, Dir) of
{ok, Data, ReadState1} ->
{ok, <<Crc:32/integer, Data/binary>>, ReadState1};
Expand Down Expand Up @@ -156,25 +175,28 @@ validate(Dir) ->
%% entire binary body. NB: this does not do checksum validation.
-spec read_meta(file:filename()) ->
{ok, meta()} | {error, invalid_format |
{invalid_version, integer()} |
checksum_error |
file_err()}.
{invalid_version, integer()} |
checksum_error |
file_err()}.
read_meta(Dir) ->
File = filename(Dir),
case file:open(File, [read, binary, raw]) of
{ok, Fd} ->
case read_meta_internal(Fd) of
{ok, Meta, _} ->
{ok, Meta, _Crc} ->
_ = file:close(Fd),
{ok, Meta};
{error, _} = Err ->
Err ->
_ = file:close(Fd),
Err
end;
Err ->
Err
end
end.

-spec context() -> map().
context() ->
#{can_accept_full_file => true}.


%% Internal

read_meta_internal(Fd) ->
Expand Down
30 changes: 19 additions & 11 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
persist_last_applied/1,
update_peer/3,
register_external_log_reader/2,
update_disconnected_peers/3,
handle_down/5,
handle_node_status/6,
terminate/2,
Expand Down Expand Up @@ -1648,20 +1649,14 @@ make_pipelined_rpc_effects(#{cfg := #cfg{id = Id,
%% TODO: refactor this please, why does make_rpc_effect need to take the
%% full state
maps:fold(
fun (I, _, Acc) when I =:= Id ->
%% oneself
Acc;
(_, #{status := suspended}, Acc) ->
Acc;
(_, #{status := {sending_snapshot, _}}, Acc) ->
%% if a peer is currently receiving a snapshot
%% do not send any append entries rpcs
Acc;
(PeerId, #{next_index := NextIdx,
fun (PeerId, #{next_index := NextIdx,
status := normal,
commit_index_sent := CI,
match_index := MatchIdx} = Peer0,
{S0, More0, Effs} = Acc)
when NextIdx < NextLogIdx orelse CI < CommitIndex ->
when PeerId =/= Id andalso
(NextIdx < NextLogIdx orelse CI < CommitIndex) ->
% the status is normal and
% there are unsent items or a new commit index
% check if the match index isn't too far behind the
% next index
Expand Down Expand Up @@ -1840,6 +1835,19 @@ register_external_log_reader(Pid, #{log := Log0} = State) ->
{Log, Effs} = ra_log:register_reader(Pid, Log0),
{State#{log => Log}, Effs}.

-spec update_disconnected_peers(node(), nodeup | nodedown, ra_server_state()) ->
ra_server_state().
update_disconnected_peers(Node, nodeup, #{cluster := Peers} = State) ->
State#{cluster => maps:map(
fun ({_, PeerNode}, #{status := disconnected} = Peer)
when PeerNode == Node ->
Peer#{status => normal};
(_, Peer) ->
Peer
end, Peers)};
update_disconnected_peers(_Node, _Status, State) ->
State.

peer_snapshot_process_exited(SnapshotPid, #{cluster := Peers} = State) ->
PeerKv =
maps:to_list(
Expand Down
76 changes: 44 additions & 32 deletions src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1235,40 +1235,49 @@ handle_effect(_, {reply, Reply}, {call, From}, State, Actions) ->
{State, Actions};
handle_effect(_, {reply, Reply}, EvtType, _, _) ->
exit({undefined_reply, Reply, EvtType});
handle_effect(leader, {send_snapshot, To, {SnapState, Id, Term}}, _,
handle_effect(leader, {send_snapshot, {_, ToNode} = To, {SnapState, Id, Term}}, _,
#state{server_state = SS0,
monitors = Monitors,
conf = #conf{snapshot_chunk_size = ChunkSize,
install_snap_rpc_timeout = InstallSnapTimeout} = Conf} = State0,
Actions) ->
ok = incr_counter(Conf, ?C_RA_SRV_SNAPSHOTS_SENT, 1),
%% leader effect only
Self = self(),
Machine = ra_server:machine(SS0),
Pid = spawn(fun () ->
try send_snapshots(Self, Id, Term, To,
ChunkSize, InstallSnapTimeout,
SnapState, Machine) of
_ -> ok
catch
C:timeout:S ->
%% timeout is ok as we've already blocked
%% for a while
erlang:raise(C, timeout, S);
C:E:S ->
%% insert an arbitrary pause here as a primitive
%% throttling operation as certain errors
%% happen quickly
ok = timer:sleep(5000),
erlang:raise(C, E, S)
end
end),
%% update the peer state so that no pipelined entries are sent during
%% the snapshot sending phase
SS = ra_server:update_peer(To, #{status => {sending_snapshot, Pid}}, SS0),
{State0#state{server_state = SS,
monitors = ra_monitors:add(Pid, snapshot_sender, Monitors)},
Actions};
case lists:member(ToNode, [node() | nodes()]) of
true ->
%% node is connected
%% leader effect only
Self = self(),
Machine = ra_server:machine(SS0),
Pid = spawn(fun () ->
try send_snapshots(Self, Id, Term, To,
ChunkSize, InstallSnapTimeout,
SnapState, Machine) of
_ -> ok
catch
C:timeout:S ->
%% timeout is ok as we've already blocked
%% for a while
erlang:raise(C, timeout, S);
C:E:S ->
%% insert an arbitrary pause here as a primitive
%% throttling operation as certain errors
%% happen quickly
ok = timer:sleep(5000),
erlang:raise(C, E, S)
end
end),
ok = incr_counter(Conf, ?C_RA_SRV_SNAPSHOTS_SENT, 1),
%% update the peer state so that no pipelined entries are sent during
%% the snapshot sending phase
SS = ra_server:update_peer(To, #{status => {sending_snapshot, Pid}}, SS0),
{State0#state{server_state = SS,
monitors = ra_monitors:add(Pid, snapshot_sender, Monitors)},
Actions};
false ->
?DEBUG("~s: send_snapshot node ~s disconnected",
[log_id(State0), ToNode]),
SS = ra_server:update_peer(To, #{status => disconnected}, SS0),
{State0#state{server_state = SS}, Actions}
end;
handle_effect(_, {delete_snapshot, Dir, SnapshotRef}, _, State0, Actions) ->
%% delete snapshots in separate process
_ = spawn(fun() ->
Expand Down Expand Up @@ -1571,11 +1580,12 @@ fold_log(From, Fun, Term, State) ->

send_snapshots(Me, Id, Term, {_, ToNode} = To, ChunkSize,
InstallTimeout, SnapState, Machine) ->
Context = ra_snapshot:context(SnapState, ToNode),
{ok, #{machine_version := SnapMacVer} = Meta, ReadState} =
ra_snapshot:begin_read(SnapState),
ra_snapshot:begin_read(SnapState, Context),

%% only send the snapshot if the target server can accept it
TheirMacVer = rpc:call(ToNode, ra_machine, version, [Machine]),
TheirMacVer = erpc:call(ToNode, ra_machine, version, [Machine]),

case SnapMacVer > TheirMacVer of
true ->
Expand Down Expand Up @@ -1726,14 +1736,16 @@ handle_node_status_change(Node, Status, InfoList, RaftState,
#state{monitors = Monitors0,
server_state = ServerState0} = State0) ->
{Comps, Monitors} = ra_monitors:handle_down(Node, Monitors0),
{_, ServerState, Effects} =
{_, ServerState1, Effects} =
lists:foldl(
fun (Comp, {R, S0, E0}) ->
{R, S, E} = ra_server:handle_node_status(R, Comp, Node,
Status, InfoList,
S0),
{R, S, E0 ++ E}
end, {RaftState, ServerState0, []}, Comps),
ServerState = ra_server:update_disconnected_peers(Node, Status,
ServerState1),
{State, Actions} = handle_effects(RaftState, Effects, cast,
State0#state{server_state = ServerState,
monitors = Monitors}),
Expand Down
31 changes: 26 additions & 5 deletions src/ra_snapshot.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
-export([
recover/1,
read_meta/2,
begin_read/1,
begin_read/2,
read_chunk/3,
delete/2,

Expand All @@ -36,6 +36,8 @@
accept_chunk/4,
abort_accept/1,

context/2,

handle_down/3,
current_snapshot_dir/1
]).
Expand Down Expand Up @@ -68,6 +70,8 @@

-export_type([state/0]).

-optional_callbacks([context/0]).

%% Side effect function
%% Turn the current state into immutable reference.
-callback prepare(Index :: ra_index(),
Expand All @@ -87,7 +91,9 @@

%% Read the snapshot metadata and initialise a read state used in read_chunk/1
%% The read state should contain all the information required to read a chunk
-callback begin_read(Location :: file:filename()) ->
%% The Context is the map returned by the context/0 callback
%% This can be used to inform the sender of receive capabilities.
-callback begin_read(Location :: file:filename(), Context :: map()) ->
{ok, Meta :: meta(), ReadState :: term()}
| {error, term()}.

Expand Down Expand Up @@ -131,6 +137,8 @@
file_err() |
term()}.

-callback context() -> map().

-spec init(ra_uid(), module(), file:filename()) ->
state().
init(UId, Mod, File) ->
Expand Down Expand Up @@ -310,6 +318,18 @@ abort_accept(#?MODULE{accepting = #accept{idxterm = {Idx, Term}},
ok = delete(Dir, {Idx, Term}),
State#?MODULE{accepting = undefined}.

%% get the snapshot capabilities context of a remote node
-spec context(state(), node()) -> map().
context(#?MODULE{module = Mod}, Node) ->
try erpc:call(Node, Mod, ?FUNCTION_NAME, []) of
Result ->
Result
catch
error:{exception, undef, _} ->
#{}
end.



-spec handle_down(pid(), Info :: term(), state()) ->
state().
Expand All @@ -335,14 +355,15 @@ delete(Dir, {Idx, Term}) ->
ok = ra_lib:recursive_delete(SnapDir),
ok.

-spec begin_read(State :: state()) ->
-spec begin_read(State :: state(), Context :: map()) ->
{ok, Meta :: meta(), ReadState} |
{error, term()} when ReadState :: term().
begin_read(#?MODULE{module = Mod,
directory = Dir,
current = {Idx, Term}}) ->
current = {Idx, Term}},
Context) when is_map(Context) ->
Location = make_snapshot_dir(Dir, Idx, Term),
Mod:begin_read(Location).
Mod:begin_read(Location, Context).


-spec read_chunk(ReadState, ChunkSizeBytes :: non_neg_integer(),
Expand Down
Loading

0 comments on commit 87b04c3

Please sign in to comment.