Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix segment read bug when segment has been appended to #488

Merged
merged 1 commit into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions src/ra_log_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,16 @@ exec_read_plan(Dir, Plan, Open0, TransformFun, Acc0)
end,
lists:foldl(
fun ({Idxs, BaseName}, {Acc1, Open1}) ->
{Seg, Open} = get_segment_ext(Dir, Open1, BaseName),
{_, Acc} = ra_log_segment:read_sparse(Seg, Idxs, Fun, Acc1),
{Acc, Open}
{Seg, Open2} = get_segment_ext(Dir, Open1, BaseName),
case ra_log_segment:read_sparse(Seg, Idxs, Fun, Acc1) of
{ok, _, Acc} ->
{Acc, Open2};
{error, modified} ->
{_, Open3} = ra_flru:evict(BaseName, Open2),
{SegNew, Open} = get_segment_ext(Dir, Open3, BaseName),
{ok, _, Acc} = ra_log_segment:read_sparse(SegNew, Idxs, Fun, Acc1),
{Acc, Open}
end
end, {Acc0, Open0}, Plan).

-spec fetch_term(ra_index(), state()) -> {option(ra_index()), state()}.
Expand Down Expand Up @@ -335,7 +342,7 @@ segment_sparse_read(#?STATE{segment_refs = SegRefs,
lists:foldl(
fun ({Idxs, Fn}, {Open0, C, En0}) ->
{Seg, Open} = get_segment(Cfg, Open0, Fn),
{ReadSparseCount, Entries} =
{ok, ReadSparseCount, Entries} =
ra_log_segment:read_sparse(Seg, Idxs,
fun (I, T, B, Acc) ->
[{I, T, binary_to_term(B)} | Acc]
Expand Down
74 changes: 50 additions & 24 deletions src/ra_log_segment.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
append/4,
sync/1,
fold/6,
is_modified/1,
read_sparse/4,
term_query/2,
close/1,
Expand All @@ -27,6 +28,8 @@

-include("ra.hrl").

-include_lib("kernel/include/file.hrl").

-define(VERSION, 2).
-define(MAGIC, "RASG").
-define(HEADER_SIZE, 4 + (16 div 8) + (16 div 8)).
Expand Down Expand Up @@ -112,6 +115,7 @@ open(Filename, Options) ->
end.

process_file(true, Mode, Filename, Fd, Options) ->
AccessPattern = maps:get(access_pattern, Options, random),
case read_header(Fd) of
{ok, Version, MaxCount} ->
MaxPending = maps:get(max_pending, Options, ?SEGMENT_MAX_PENDING),
Expand All @@ -120,7 +124,6 @@ process_file(true, Mode, Filename, Fd, Options) ->
{NumIndexRecords, DataOffset, Range, Index} =
recover_index(Fd, Version, MaxCount),
IndexOffset = ?HEADER_SIZE + NumIndexRecords * IndexRecordSize,
AccessPattern = maps:get(access_pattern, Options, random),
Mode = maps:get(mode, Options, append),
ComputeChecksums = maps:get(compute_checksums, Options, true),
{ok, #state{cfg = #cfg{version = Version,
Expand Down Expand Up @@ -184,16 +187,15 @@ append(#state{cfg = #cfg{max_pending = PendingCount},
append(#state{cfg = #cfg{version = Version,
mode = append} = Cfg,
index_offset = IndexOffset,
data_start = DataStart,
data_offset = DataOffset,
range = Range0,
pending_count = PendCnt,
pending_index = IdxPend0,
pending_data = DataPend0} = State,
Index, Term, {Length, Data}) ->
% check if file is full
case IndexOffset < DataStart of
true ->

case is_full(State) of
false ->
% TODO: check length is less than #FFFFFFFF ??
Checksum = compute_checksum(Cfg, Data),
OSize = offset_size(Version),
Expand All @@ -209,7 +211,7 @@ append(#state{cfg = #cfg{version = Version,
pending_data = [DataPend0, Data],
pending_count = PendCnt + 1}
};
false ->
true ->
{error, full}
end;
append(State, Index, Term, Data)
Expand Down Expand Up @@ -271,38 +273,58 @@ fold(#state{cfg = #cfg{mode = read} = Cfg,
FromIdx, ToIdx, Fun, AccFun, Acc) ->
fold0(Cfg, Cache, FromIdx, ToIdx, Index, Fun, AccFun, Acc).

%% @doc Returns true when the on-disk segment file has been appended to
%% since this (read-mode) segment state was opened, i.e. the current file
%% size exceeds the data_offset captured at open time. Used by
%% read_sparse/4 to detect that a cached handle's in-memory index is stale.
%% NOTE(review): a failure from prim_file:read_handle_info/1 crashes with
%% badmatch — presumably deliberate let-it-crash; confirm callers expect it.
-spec is_modified(state()) -> boolean().
is_modified(#state{cfg = #cfg{fd = Fd},
data_offset = DataOffset} = State) ->
case is_full(State) of
true ->
%% a full segment cannot be appended to.
false;
false ->
%% get info and compare to data_offset
{ok, #file_info{size = Size}} = prim_file:read_handle_info(Fd),
Size > DataOffset
end.

-spec read_sparse(state(), [ra_index()],
fun((ra:index(), ra_term(), binary(), Acc) -> Acc),
Acc) ->
{NumRead :: non_neg_integer(), Acc}
{ok, NumRead :: non_neg_integer(), Acc} | {error, modified}
when Acc :: term().
read_sparse(#state{index = Index,
cfg = Cfg}, Indexes, AccFun, Acc) ->
Cache0 = prepare_cache(Cfg, Indexes, Index),
read_sparse0(Cfg, Indexes, Index, Cache0, Acc, AccFun, 0).
cfg = #cfg{fd = Fd}} = State,
Indexes, AccFun, Acc) ->
case is_modified(State) of
true ->
{error, modified};
false ->
Cache0 = prepare_cache(Fd, Indexes, Index),
read_sparse0(Fd, Indexes, Index, Cache0, Acc, AccFun, 0)
end.

read_sparse0(_Cfg, [], _Index, _Cache, Acc, _AccFun, Num) ->
{Num, Acc};
read_sparse0(Cfg, [NextIdx | Rem] = Indexes, Index, Cache0, Acc, AccFun, Num)
read_sparse0(_Fd, [], _Index, _Cache, Acc, _AccFun, Num) ->
{ok, Num, Acc};
read_sparse0(Fd, [NextIdx | Rem] = Indexes, Index, Cache0, Acc, AccFun, Num)
when is_map_key(NextIdx, Index) ->
{Term, Offset, Length, _} = map_get(NextIdx, Index),
case cache_read(Cache0, Offset, Length) of
{Term, Pos, Length, _} = map_get(NextIdx, Index),
case cache_read(Cache0, Pos, Length) of
false ->
case prepare_cache(Cfg, Indexes, Index) of
case prepare_cache(Fd, Indexes, Index) of
undefined ->
{ok, Data, _} = pread(Cfg, undefined, Offset, Length),
read_sparse0(Cfg, Rem, Index, undefined,
%% TODO: check for partial data?
{ok, Data} = file:pread(Fd, Pos, Length),
read_sparse0(Fd, Rem, Index, undefined,
AccFun(NextIdx, Term, Data, Acc),
AccFun, Num+1);
Cache ->
read_sparse0(Cfg, Indexes, Index, Cache,
Acc, AccFun, Num+1)
read_sparse0(Fd, Indexes, Index, Cache,
Acc, AccFun, Num)
end;
Data ->
read_sparse0(Cfg, Rem, Index, Cache0,
read_sparse0(Fd, Rem, Index, Cache0,
AccFun(NextIdx, Term, Data, Acc), AccFun, Num+1)
end;
read_sparse0(_Cfg, [NextIdx | _], _Index, _Cache, _Acc, _AccFun, _Num) ->
read_sparse0(_Fd, [NextIdx | _], _Index, _Cache, _Acc, _AccFun, _Num) ->
exit({missing_key, NextIdx}).

cache_read({CPos, CLen, Bin}, Pos, Length)
Expand All @@ -313,9 +335,9 @@ cache_read({CPos, CLen, Bin}, Pos, Length)
cache_read(_, _, _) ->
false.

prepare_cache(#cfg{} = _Cfg, [_], _SegIndex) ->
prepare_cache(_Fd, [_], _SegIndex) ->
undefined;
prepare_cache(#cfg{fd = Fd} = _Cfg, [FirstIdx | Rem], SegIndex) ->
prepare_cache(Fd, [FirstIdx | Rem], SegIndex) ->
case consec_run(FirstIdx, FirstIdx, Rem) of
{Idx, Idx} ->
%% no run, no cache;
Expand Down Expand Up @@ -622,6 +644,10 @@ validate_checksum(0, _) ->
validate_checksum(Crc, Data) ->
Crc == erlang:crc32(Data).

%% Internal predicate: a segment is full when the index region has grown
%% forward to meet (or pass) the start of the data region, leaving no room
%% for further index records. Shared by append/4 (to return {error, full})
%% and is_modified/1 (a full segment can never be appended to again).
is_full(#state{index_offset = IndexOffset,
data_start = DataStart}) ->
IndexOffset >= DataStart.

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

Expand Down
2 changes: 1 addition & 1 deletion src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1728,7 +1728,7 @@ machine_query(QueryFun, #{cfg := #cfg{effective_machine_module = MacMod},
become(leader, OldRaftState, #{cluster := Cluster,
cluster_change_permitted := CCP0,
log := Log0} = State) ->
Log = ra_log:release_resources(maps:size(Cluster) + 2, random, Log0),
Log = ra_log:release_resources(maps:size(Cluster), sequential, Log0),
CCP = case OldRaftState of
await_condition ->
CCP0;
Expand Down
16 changes: 16 additions & 0 deletions test/ra_log_2_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ all_tests() ->
transient_writer_is_handled,
read_opt,
sparse_read,
read_plan_modified,
read_plan,
sparse_read_out_of_range,
sparse_read_out_of_range_2,
Expand Down Expand Up @@ -481,6 +482,21 @@ sparse_read(Config) ->
{99, _, _}], _LogO3} = ra_log:sparse_read([1000,5,99], LogO2),
ok.

%% Test: executing a read plan with a cached (FLRU) segment handle must
%% still succeed after the underlying segment has been appended to.
%% Exercises the {error, modified} -> evict/reopen recovery path added to
%% ra_log_reader:exec_read_plan/5.
read_plan_modified(Config) ->
Log0 = ra_log_init(Config),
Log1 = write_and_roll(1, 2, 1, Log0, 50),
Log2 = deliver_all_log_events(Log1, 100),
%% first read warms the segment-handle cache (Flru)
Plan = ra_log:partial_read([1], Log2, fun (_, _, Cmd) -> Cmd end),
{#{1 := _}, Flru} = ra_log_read_plan:execute(Plan, undefined),

%% append more entries so the open handle's view becomes stale
Log = deliver_all_log_events(write_and_roll(2, 3, 1, Log2, 50), 100),
Plan2 = ra_log:partial_read([1,2], Log, fun (_, _, Cmd) -> Cmd end),
%% assert we can read the newly appended item with the cached
%% segment
{#{1 := _, 2 := _}, _} = ra_log_read_plan:execute(Plan2, Flru),
ra_log:close(Log),
ok.

read_plan(Config) ->
Num = 256 * 2,
Div = 2,
Expand Down
49 changes: 43 additions & 6 deletions test/ra_log_segment_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ all_tests() ->
overwrite,
term_query,
write_many,
read_sparse_append_read,
open_invalid,
corrupted_segment,
large_segment,
Expand Down Expand Up @@ -81,6 +82,7 @@ corrupted_segment(Config) ->
%% ct:pal("DUMP PRE ~p", [ra_log_segment:dump_index(Fn)]),
%% check that the current state throws a missing key
{ok, SegR0} = ra_log_segment:open(Fn, #{mode => read}),
?assertNot(ra_log_segment:is_modified(SegR0)),
?assertExit({missing_key, 2},
read_sparse(SegR0, [1, 2])),

Expand Down Expand Up @@ -210,11 +212,13 @@ segref(Config) ->
full_file(Config) ->
Dir = ?config(data_dir, Config),
Fn = filename:join(Dir, "seg1.seg"),
Data = make_data(1024),
{ok, Seg0} = ra_log_segment:open(Fn, #{max_count => 2}),
Data = make_data(10),
{ok, Seg0} = ra_log_segment:open(Fn, #{max_count => 2,
max_pending => 1}),
{ok, Seg1} = ra_log_segment:append(Seg0, 1, 2, Data),
{ok, Seg} = ra_log_segment:append(Seg1, 2, 2, Data),
{error, full} = ra_log_segment:append(Seg, 3, 2, Data),
?assertNot(ra_log_segment:is_modified(Seg)),
{1,2} = ra_log_segment:range(Seg),
ok = ra_log_segment:close(Seg),
ok.
Expand Down Expand Up @@ -396,6 +400,39 @@ write_many(Config) ->
ct:pal("~p", [Result]),
ok.


%% Test: ra_log_segment:read_sparse/4 on a reader handle must return
%% {error, modified} (rather than stale or partial data) once the segment
%% file has been appended to by a concurrent writer; is_modified/1 must
%% flip from false to true across the append.
read_sparse_append_read(Config) ->
Dir = ?config(data_dir, Config),
Fn = filename:join(Dir, <<"0000000.segment">>),
{ok, W0} = ra_log_segment:open(Fn, #{}),
Data = <<"banana">>,
Term = 1,
%% write two entries in term 1
{ok, W1} = ra_log_segment:append(W0, 1, Term, Data),
{ok, W2} = ra_log_segment:append(W1, 2, Term, Data),
{ok, W3} = ra_log_segment:flush(W2),


%% a fresh reader sees both entries and is not yet modified
{ok, R0} = ra_log_segment:open(Fn, #{mode => read}),
{ok, 2, [_, _]} = ra_log_segment:read_sparse(R0, [1, 2],
fun (I, _, _, Acc) ->
[I | Acc]
end, []),

?assertNot(ra_log_segment:is_modified(R0)),
%% overwrite in term 2
{ok, W4} = ra_log_segment:append(W3, 2, 2, <<"apple">>),
{ok, W5} = ra_log_segment:append(W4, 3, 2, <<"apple">>),
{ok, W} = ra_log_segment:flush(W5),
%% the stale reader now reports modified and refuses sparse reads
?assert(ra_log_segment:is_modified(R0)),
{error, modified} = ra_log_segment:read_sparse(R0, [2],
fun (_I, _, B, Acc) ->
[B | Acc]
end, []),
ra_log_segment:close(W),
ra_log_segment:close(R0),
ok.

write_until_full(Idx, Term, Data, Seg0) ->
case ra_log_segment:append(Seg0, Idx, Term, Data) of
{ok, Seg} ->
Expand All @@ -410,8 +447,8 @@ make_data(Size) ->
term_to_binary(crypto:strong_rand_bytes(Size)).

read_sparse(R, Idxs) ->
{_, Entries} = ra_log_segment:read_sparse(R, Idxs,
fun (I, T, B, Acc) ->
[{I, T, B} | Acc]
end, []),
{ok, _, Entries} = ra_log_segment:read_sparse(R, Idxs,
fun (I, T, B, Acc) ->
[{I, T, B} | Acc]
end, []),
lists:reverse(Entries).
8 changes: 4 additions & 4 deletions test/ra_log_segment_writer_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -800,10 +800,10 @@ segments_for(UId, DataDir) ->
SegFiles.

read_sparse(R, Idxs) ->
{_, Entries} = ra_log_segment:read_sparse(R, Idxs,
fun(I, T, B, Acc) ->
[{I, T, B} | Acc]
end, []),
{ok, _, Entries} = ra_log_segment:read_sparse(R, Idxs,
fun(I, T, B, Acc) ->
[{I, T, B} | Acc]
end, []),
lists:reverse(Entries).

get_names(System) when is_atom(System) ->
Expand Down
Loading