Skip to content

Commit

Permalink
Merge pull request #50 from emqx/fix-infinity-expire-timeout
Browse files Browse the repository at this point in the history
fix: handle `infinity` timeout / expiry
  • Loading branch information
thalesmg authored May 24, 2023
2 parents 8796dad + 63b8b08 commit 90fd629
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 14 deletions.
4 changes: 4 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# ehttpc changes

## 0.4.10

- Fixed `ehttpc:request` and `ehttpc:request_async` to handle `Timeout = infinity` option.

## 0.4.9

- Expanded the fix from 0.4.8 to also account for `{error, {shutdown, normal}}` return values in requests, and added a similar retry when the health check also fails with those two reasons.
Expand Down
2 changes: 1 addition & 1 deletion src/ehttpc.app.src
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, ehttpc,
[{description, "HTTP Client for Erlang/OTP"},
{vsn, "0.4.9"},
{vsn, "0.4.10"},
{registered, []},
{applications, [kernel,
stdlib,
Expand Down
8 changes: 7 additions & 1 deletion src/ehttpc.appup.src
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
%% -*- mode: erlang -*-
{"0.4.9",
{"0.4.10",
[
{"0.4.9", [
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{"0.4.8", [
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
Expand Down Expand Up @@ -57,6 +60,9 @@
]}
],
[
{"0.4.9", [
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{"0.4.8", [
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
Expand Down
26 changes: 20 additions & 6 deletions src/ehttpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,13 @@ request(Pool, Method, Request, Timeout, Retry) when ?IS_POOL(Pool) ->
request({Pool, N}, Method, Request, Timeout, Retry) when ?IS_POOL(Pool) ->
request(ehttpc_pool:pick_worker(Pool, N), Method, Request, Timeout, Retry);
request(Worker, Method, Request, Timeout, Retry) when is_pid(Worker) ->
ExpireAt = now_() + Timeout,
try gen_server:call(Worker, ?REQ(Method, Request, ExpireAt), Timeout + 500) of
ExpireAt = fresh_expire_at(Timeout),
CallTimeout =
case Timeout of
infinity -> infinity;
T -> T + 500
end,
try gen_server:call(Worker, ?REQ(Method, Request, ExpireAt), CallTimeout) of
%% gun will reply {gun_down, _Client, _, normal, _KilledStreams, _} message
%% when connection closed by keepalive

Expand Down Expand Up @@ -176,7 +181,7 @@ request(Worker, Method, Request, Timeout, Retry) when is_pid(Worker) ->
%% @doc Send an async request. The callback is evaluated when an error happens or http response is received.
-spec request_async(pid(), method(), request(), timeout(), callback()) -> ok.
request_async(Worker, Method, Request, Timeout, ResultCallback) when is_pid(Worker) ->
ExpireAt = now_() + Timeout,
ExpireAt = fresh_expire_at(Timeout),
_ = erlang:send(Worker, ?ASYNC_REQ(Method, Request, ExpireAt, ResultCallback)),
ok.

Expand Down Expand Up @@ -219,7 +224,7 @@ handle_call({health_check, Timeout}, _From, State = #state{client = ?undef, gun_
{ok, NewState} ->
do_after_gun_up(
NewState,
now_() + Timeout,
fresh_expire_at(Timeout),
fun(State1) ->
{reply, ok, State1}
end
Expand All @@ -233,7 +238,7 @@ handle_call({health_check, Timeout}, _From, State = #state{client = Client, gun_
?tp(health_check_when_gun_client_not_ready, #{client => Client}),
do_after_gun_up(
State,
now_() + Timeout,
fresh_expire_at(Timeout),
fun(State1) ->
{reply, ok, State1}
end
Expand Down Expand Up @@ -559,6 +564,8 @@ cancel_stream(nofin, Client, StreamRef) ->
_ = gun:cancel(Client, StreamRef),
ok.

timeout(infinity = _ExpireAt) ->
infinity;
timeout(ExpireAt) ->
max(ExpireAt - now_(), 0).

Expand Down Expand Up @@ -602,6 +609,8 @@ take_sent_req(StreamRef, #{sent := Sent} = Requests) ->
end
end.

is_sent_req_expired(?SENT_REQ(_From, infinity = _ExpireAt, _), _Now) ->
false;
is_sent_req_expired(?SENT_REQ({Pid, _Ref}, ExpireAt, _), Now) when is_pid(Pid) ->
%% for gen_server:call, it is aborted after timeout, there is no need to send
%% reply to the caller
Expand Down Expand Up @@ -649,7 +658,7 @@ drop_expired(#{pending_count := 0} = Requests, _Now) ->
drop_expired(#{pending := Pending, pending_count := PC} = Requests, Now) ->
{PeekFun, OutFun} = peek_oldest_fn(Requests),
{value, ?PEND_REQ(ReplyTo, ?REQ(_, _, ExpireAt))} = PeekFun(Pending),
case Now > ExpireAt of
case is_integer(ExpireAt) andalso Now > ExpireAt of
true ->
{_, NewPendings} = OutFun(Pending),
NewRequests = Requests#{pending => NewPendings, pending_count => PC - 1},
Expand Down Expand Up @@ -858,6 +867,11 @@ enqueue_latest_fn(#{prioritise_latest := true}) ->
enqueue_latest_fn(_) ->
fun queue:in/2.

fresh_expire_at(infinity = _Timeout) ->
infinity;
fresh_expire_at(Timeout) when is_integer(Timeout) ->
now_() + Timeout.

-ifdef(TEST).

prioritise_latest_test() ->
Expand Down
58 changes: 52 additions & 6 deletions test/ehttpc_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,13 @@ send_1000_async_no_pipeline_test_() ->
{timeout, TestTimeout + 1, F(Opts4)}
].

requst_timeout_test_() ->
request_timeout_test_() ->
[
{"pipeline", fun() -> requst_timeout_test(true) end},
{"no pipeline", fun() -> requst_timeout_test(false) end}
{"pipeline", fun() -> request_timeout_test(true) end},
{"no pipeline", fun() -> request_timeout_test(false) end}
].

requst_timeout_test(Pipeline) ->
request_timeout_test(Pipeline) ->
Port = ?PORT,
with_server(
Port,
Expand All @@ -179,7 +179,7 @@ requst_timeout_test(Pipeline) ->
end
).

requst_expire_test() ->
request_expire_test() ->
Port = ?PORT,
with_server(
Port,
Expand All @@ -198,6 +198,38 @@ requst_expire_test() ->
end
).

request_infinity_expire_sync_test() ->
ct:timetrap({seconds, 10}),
Port = ?PORT,
with_server(
Port,
?FUNCTION_NAME,
1_000,
fun() ->
with_pool(
pool_opts(Port, true),
fun() ->
{ok, 200, _, _} = req_sync_1(infinity)
end
)
end
).

request_infinity_expire_async_test() ->
ct:timetrap({seconds, 10}),
Port = ?PORT,
with_server(
Port,
?FUNCTION_NAME,
1_000,
fun() ->
with_pool(
pool_opts(Port, true),
fun() -> req_async1(_Timeout = infinity) end
)
end
).

health_check_test_() ->
Port = ?PORT,
%% the normal case
Expand Down Expand Up @@ -544,7 +576,7 @@ connect_timeout_test() ->
?assertEqual({error, connect_timeout}, req_sync_1(_Timeout = 200))
).

request_expire_test() ->
request_expire_2_test() ->
Port = ?PORT,
ServerDelay = timer:seconds(1),
% the thrid request's timeout
Expand Down Expand Up @@ -654,6 +686,20 @@ req_sync(N, Timeout, Retry) ->
error({N, Reason})
end.

req_async1(Timeout) ->
Ref = make_ref(),
TestPid = self(),
ResultCallback = {fun(Reply) -> TestPid ! {{Ref, reply}, Reply} end, []},
Method = get,
Request = req(),
ok = ehttpc:request_async(
ehttpc_pool:pick_worker(?POOL), Method, Request, Timeout, ResultCallback
),
receive
{{Ref, reply}, Reply} ->
{ok, 200, _, _} = Reply
end.

req_async(N) ->
req_async(N, 5_000).

Expand Down

0 comments on commit 90fd629

Please sign in to comment.