Skip to content

Commit

Permalink
Merge pull request #503 from rabbitmq/read-plan-modified-fix
Browse files Browse the repository at this point in the history
Fix issue with sparse reading a segment that is being modified.
  • Loading branch information
kjnilsson authored Jan 22, 2025
2 parents 79e6531 + fd711b7 commit 0123418
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 10 deletions.
11 changes: 10 additions & 1 deletion src/ra_log_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,18 @@ exec_read_plan(Dir, Plan, Open0, TransformFun, Options, Acc0)
{ok, _, Acc} ->
{Acc, Open2};
{error, modified} ->
%% if the segment has been modified since it was opened
%% it is not safe to attempt the read as the read plan
%% may refer to indexes that weren't in the segment at
%% that time. In this case we evict all segments and
%% re-open what we need.
{_, Open3} = ra_flru:evict(BaseName, Open2),
{SegNew, Open} = get_segment_ext(Dir, Open3, BaseName, Options),
{ok, _, Acc} = ra_log_segment:read_sparse(SegNew, Idxs, Fun, Acc1),
%% at this point we can read without checking for modification
%% as the read plan would have been created before we
%% read the index from the segment
{ok, _, Acc} = ra_log_segment:read_sparse_no_checks(
SegNew, Idxs, Fun, Acc1),
{Acc, Open}
end
end, {Acc0, Open0}, Plan).
Expand Down
22 changes: 16 additions & 6 deletions src/ra_log_segment.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
fold/6,
is_modified/1,
read_sparse/4,
read_sparse_no_checks/4,
term_query/2,
close/1,
range/1,
Expand Down Expand Up @@ -299,7 +300,8 @@ is_modified(#state{cfg = #cfg{fd = Fd},
false;
false ->
%% get info and compare to data_offset
{ok, #file_info{size = Size}} = prim_file:read_handle_info(Fd),
{ok, #file_info{size = Size}} =
prim_file:read_handle_info(Fd, [posix]),
Size > DataOffset
end.

Expand All @@ -308,17 +310,25 @@ is_modified(#state{cfg = #cfg{fd = Fd},
Acc) ->
{ok, NumRead :: non_neg_integer(), Acc} | {error, modified}
when Acc :: term().
read_sparse(#state{index = Index,
cfg = #cfg{fd = Fd}} = State,
Indexes, AccFun, Acc) ->
read_sparse(#state{} = State, Indexes, AccFun, Acc) ->
case is_modified(State) of
true ->
{error, modified};
false ->
Cache0 = prepare_cache(Fd, Indexes, Index),
read_sparse0(Fd, Indexes, Index, Cache0, Acc, AccFun, 0)
read_sparse_no_checks(State, Indexes, AccFun, Acc)
end.

-spec read_sparse_no_checks(state(), [ra_index()],
fun((ra:index(), ra_term(), binary(), Acc) -> Acc),
Acc) ->
{ok, NumRead :: non_neg_integer(), Acc}
when Acc :: term().
read_sparse_no_checks(#state{index = Index,
cfg = #cfg{fd = Fd}},
Indexes, AccFun, Acc) ->
Cache0 = prepare_cache(Fd, Indexes, Index),
read_sparse0(Fd, Indexes, Index, Cache0, Acc, AccFun, 0).

read_sparse0(_Fd, [], _Index, _Cache, Acc, _AccFun, Num) ->
{ok, Num, Acc};
read_sparse0(Fd, [NextIdx | Rem] = Indexes, Index, Cache0, Acc, AccFun, Num)
Expand Down
8 changes: 5 additions & 3 deletions test/ra_log_2_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -489,11 +489,13 @@ read_plan_modified(Config) ->
Plan = ra_log:partial_read([1], Log2, fun (_, _, Cmd) -> Cmd end),
{#{1 := _}, Flru} = ra_log_read_plan:execute(Plan, undefined),

Log = deliver_all_log_events(write_and_roll(2, 3, 1, Log2, 50), 100),
Plan2 = ra_log:partial_read([1,2], Log, fun (_, _, Cmd) -> Cmd end),
Log3 = deliver_all_log_events(write_and_roll(2, 3, 1, Log2, 50), 100),
Plan2 = ra_log:partial_read([1,2], Log3, fun (_, _, Cmd) -> Cmd end),
%% assert we can read the newly appended item with the cached
%% segment
{#{1 := _, 2 := _}, _} = ra_log_read_plan:execute(Plan2, Flru),
{#{1 := _, 2 := _}, Flru2} = ra_log_read_plan:execute(Plan2, Flru),
Log = deliver_all_log_events(write_and_roll(3, 4, 1, Log3, 50), 100),
{#{1 := _, 2 := _}, _} = ra_log_read_plan:execute(Plan2, Flru2),
ra_log:close(Log),
ok.

Expand Down
2 changes: 2 additions & 0 deletions test/ra_log_segment_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ full_file(Config) ->
{ok, Seg} = ra_log_segment:append(Seg1, 2, 2, Data),
{error, full} = ra_log_segment:append(Seg, 3, 2, Data),
?assertNot(ra_log_segment:is_modified(Seg)),
{ok, R} = ra_log_segment:open(Fn, #{mode => read}),
?assertNot(ra_log_segment:is_modified(R)),
{1,2} = ra_log_segment:range(Seg),
ok = ra_log_segment:close(Seg),
ok.
Expand Down

0 comments on commit 0123418

Please sign in to comment.