diff --git a/src/ra_log_reader.erl b/src/ra_log_reader.erl index daa60147..9ec04c3b 100644 --- a/src/ra_log_reader.erl +++ b/src/ra_log_reader.erl @@ -235,9 +235,18 @@ exec_read_plan(Dir, Plan, Open0, TransformFun, Options, Acc0) {ok, _, Acc} -> {Acc, Open2}; {error, modified} -> + %% if the segment has been modified since it was opened + %% it is not safe to attempt the read as the read plan + %% may refer to indexes that weren't in the segment at + %% that time. In this case we evict all segments and + %% re-open what we need. {_, Open3} = ra_flru:evict(BaseName, Open2), {SegNew, Open} = get_segment_ext(Dir, Open3, BaseName, Options), - {ok, _, Acc} = ra_log_segment:read_sparse(SegNew, Idxs, Fun, Acc1), + %% at this point we can read without checking for modification + %% as the read plan would have been created before we + %% read the index from the segment + {ok, _, Acc} = ra_log_segment:read_sparse_no_checks( + SegNew, Idxs, Fun, Acc1), {Acc, Open} end end, {Acc0, Open0}, Plan). diff --git a/src/ra_log_segment.erl b/src/ra_log_segment.erl index 269da3fc..39174c5f 100644 --- a/src/ra_log_segment.erl +++ b/src/ra_log_segment.erl @@ -14,6 +14,7 @@ fold/6, is_modified/1, read_sparse/4, + read_sparse_no_checks/4, term_query/2, close/1, range/1, @@ -299,7 +300,8 @@ is_modified(#state{cfg = #cfg{fd = Fd}, false; false -> %% get info and compare to data_offset - {ok, #file_info{size = Size}} = prim_file:read_handle_info(Fd), + {ok, #file_info{size = Size}} = + prim_file:read_handle_info(Fd, [posix]), Size > DataOffset end. @@ -308,17 +310,25 @@ is_modified(#state{cfg = #cfg{fd = Fd}, Acc) -> {ok, NumRead :: non_neg_integer(), Acc} | {error, modified} when Acc :: term(). -read_sparse(#state{index = Index, - cfg = #cfg{fd = Fd}} = State, - Indexes, AccFun, Acc) -> +read_sparse(#state{} = State, Indexes, AccFun, Acc) -> case is_modified(State) of true -> {error, modified}; false -> - Cache0 = prepare_cache(Fd, Indexes, Index), - read_sparse0(Fd, Indexes, Index, Cache0, Acc, AccFun, 0) + read_sparse_no_checks(State, Indexes, AccFun, Acc) end. +-spec read_sparse_no_checks(state(), [ra_index()], + fun((ra:index(), ra_term(), binary(), Acc) -> Acc), + Acc) -> + {ok, NumRead :: non_neg_integer(), Acc} + when Acc :: term(). +read_sparse_no_checks(#state{index = Index, + cfg = #cfg{fd = Fd}}, + Indexes, AccFun, Acc) -> + Cache0 = prepare_cache(Fd, Indexes, Index), + read_sparse0(Fd, Indexes, Index, Cache0, Acc, AccFun, 0). + read_sparse0(_Fd, [], _Index, _Cache, Acc, _AccFun, Num) -> {ok, Num, Acc}; read_sparse0(Fd, [NextIdx | Rem] = Indexes, Index, Cache0, Acc, AccFun, Num) diff --git a/test/ra_log_2_SUITE.erl b/test/ra_log_2_SUITE.erl index 4a4daf7f..6c5816c2 100644 --- a/test/ra_log_2_SUITE.erl +++ b/test/ra_log_2_SUITE.erl @@ -489,11 +489,13 @@ read_plan_modified(Config) -> Plan = ra_log:partial_read([1], Log2, fun (_, _, Cmd) -> Cmd end), {#{1 := _}, Flru} = ra_log_read_plan:execute(Plan, undefined), - Log = deliver_all_log_events(write_and_roll(2, 3, 1, Log2, 50), 100), - Plan2 = ra_log:partial_read([1,2], Log, fun (_, _, Cmd) -> Cmd end), + Log3 = deliver_all_log_events(write_and_roll(2, 3, 1, Log2, 50), 100), + Plan2 = ra_log:partial_read([1,2], Log3, fun (_, _, Cmd) -> Cmd end), %% assert we can read the newly appended item with the cached %% segment - {#{1 := _, 2 := _}, _} = ra_log_read_plan:execute(Plan2, Flru), + {#{1 := _, 2 := _}, Flru2} = ra_log_read_plan:execute(Plan2, Flru), + Log = deliver_all_log_events(write_and_roll(3, 4, 1, Log3, 50), 100), + {#{1 := _, 2 := _}, _} = ra_log_read_plan:execute(Plan2, Flru2), ra_log:close(Log), ok. diff --git a/test/ra_log_segment_SUITE.erl b/test/ra_log_segment_SUITE.erl index b2980218..c62cbd55 100644 --- a/test/ra_log_segment_SUITE.erl +++ b/test/ra_log_segment_SUITE.erl @@ -220,6 +220,8 @@ full_file(Config) -> {ok, Seg} = ra_log_segment:append(Seg1, 2, 2, Data), {error, full} = ra_log_segment:append(Seg, 3, 2, Data), ?assertNot(ra_log_segment:is_modified(Seg)), + {ok, R} = ra_log_segment:open(Fn, #{mode => read}), + ?assertNot(ra_log_segment:is_modified(R)), {1,2} = ra_log_segment:range(Seg), ok = ra_log_segment:close(Seg), ok.