Merge pull request #57 from emqx/1004-add-max-inactive-check

Zombie detector

zmstone authored Oct 4, 2024
2 parents da825ef + c2db8ec commit 2fb8a56
Showing 3 changed files with 246 additions and 31 deletions.
8 changes: 8 additions & 0 deletions changelog.md
@@ -1,5 +1,13 @@
# ehttpc changes

## 0.6.0

- Changed the log format to be more structured; `host` and `port` are now included in the log data fields.
- Added the `max_inactive` duration option (default: `10_000` milliseconds).
This helps detect zombie connections, especially when pipelining is set greater than 1.
With `{max_inactive, 10_000}` added to the `start_pool` options,
the worker will try to reconnect to the HTTP server once it detects that the last sent request expired more than 10 seconds ago.
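
A minimal sketch of passing the option when starting a pool. Everything except `{max_inactive, 10_000}` is illustrative: the pool name, host, port, and the `ehttpc_sup:start_pool/2` call shape are assumptions, not part of this change.

```erlang
%% Hypothetical pool setup; only {max_inactive, 10_000} is the new option,
%% everything else (names, host, port, sizes) is illustrative.
Opts = [
    {host, "example.com"},
    {port, 80},
    {pool_size, 4},
    {enable_pipelining, 100},
    %% reconnect if the last sent request has been expired for
    %% more than 10 seconds without any sign of life from the server
    {max_inactive, 10_000}
],
ehttpc_sup:start_pool(example_pool, Opts).
```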

## 0.5.0

- Dropped hot-upgrade support.
181 changes: 152 additions & 29 deletions src/ehttpc.erl
@@ -15,6 +15,7 @@
%%--------------------------------------------------------------------

-module(ehttpc).
-feature(maybe_expr, enable).

-behaviour(gen_server).

@@ -67,7 +68,6 @@
-include_lib("eunit/include/eunit.hrl").
-endif.

-define(LOG(Level, Format, Args), logger:Level("ehttpc: " ++ Format, Args)).
-define(REQ(Method, Req, ExpireAt), {Method, Req, ExpireAt}).
-define(PEND_REQ(ReplyTo, Req), {ReplyTo, Req}).
-define(SENT_REQ(ReplyTo, ExpireAt, Acc), {ReplyTo, ExpireAt, Acc}).
@@ -77,6 +77,7 @@
-define(GEN_CALL_REQ(From, Call), {'$gen_call', From, ?REQ(_, _, _) = Call}).
-define(undef, undefined).
-define(IS_POOL(Pool), (not is_tuple(Pool) andalso not is_pid(Pool))).
-define(DEFAULT_MAX_INACTIVE, 10_000).

-record(state, {
pool :: term(),
@@ -90,7 +91,9 @@
gun_opts :: gun:opts(),
gun_state :: down | up,
requests :: map(),
proxy :: undefined | map()
proxy :: undefined | map(),
max_inactive :: pos_integer(),
inactive_check_tref :: reference() | ?undef
}).

-type pool_name() :: any().
@@ -199,6 +202,7 @@ init([Pool, Id, Opts0]) ->
process_flag(trap_exit, true),
PrioLatest = proplists:get_bool(prioritise_latest, Opts0),
#{opts := Opts, proxy := Proxy} = parse_proxy_opts(Opts0),
MaxInactive = proplists:get_value(max_inactive, Opts, ?DEFAULT_MAX_INACTIVE),
State = #state{
pool = Pool,
id = Id,
@@ -213,12 +217,14 @@
pending => queue:new(),
pending_count => 0,
sent => #{},
max_sent_expire => 0,
prioritise_latest => PrioLatest
},
proxy = Proxy
proxy = Proxy,
max_inactive = MaxInactive
},
true = gproc_pool:connect_worker(ehttpc:name(Pool), {Pool, Id}),
{ok, State}.
{ok, start_check_inactive_timer(State)}.

handle_call({health_check, _}, _From, State = #state{gun_state = up}) ->
{reply, ok, State};
@@ -267,11 +273,18 @@ handle_info({suspend, Time}, State) ->
%% only for testing
timer:sleep(Time),
{noreply, State};
handle_info(check_inactive, State0) ->
State = maybe_shoot(State0),
{noreply, start_check_inactive_timer(State)};
handle_info(Info, State0) ->
State1 = do_handle_info(Info, upgrade_requests(State0)),
State = maybe_shoot(State1),
{noreply, State}.

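%% (Re)arm the periodic inactivity check; cancel any previously scheduled
%% timer first so that at most one check_inactive message is in flight.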
start_check_inactive_timer(#state{inactive_check_tref = Tref, max_inactive = T} = State) ->
is_reference(Tref) andalso erlang:cancel_timer(Tref),
State#state{inactive_check_tref = erlang:send_after(T, self(), check_inactive)}.

do_handle_info(
{gun_response, Client, StreamRef, IsFin, StatusCode, Headers},
#state{client = Client} = State
@@ -308,7 +321,7 @@ do_handle_info(
State = #state{client = Client}
) ->
Reason =/= normal andalso Reason =/= closed andalso
?LOG(warning, "Received 'gun_down' message with reason: ~p", [Reason]),
log(warning, #{msg => "http_connection_down", reason => Reason}, State),
NewState = handle_gun_down(State, KilledStreams, Reason),
NewState;
do_handle_info(
@@ -320,7 +333,14 @@
do_handle_info({'EXIT', Client, Reason}, State = #state{client = Client}) ->
handle_client_down(State, Reason);
do_handle_info(Info, State) ->
?LOG(warning, "~p unexpected_info: ~p, client: ~p", [?MODULE, Info, State#state.client]),
log(
warning,
#{
msg => "ehttpc_unexpected_info",
info => Info
},
State
),
State.

terminate(_Reason, #state{pool = Pool, id = Id, client = Client}) ->
@@ -532,22 +552,49 @@ upgrade_requests(Map) when is_map(Map) ->
pending => queue:new(),
pending_count => 0,
sent => Map,
prioritise_latest => false
prioritise_latest => false,
max_sent_expire => 0
}.

put_sent_req(StreamRef, Req, #{sent := Sent} = Requests) ->
Requests#{sent := maps:put(StreamRef, Req, Sent)}.
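%% Besides storing the request, also keep track of the latest (max) expire
%% timestamp; the inactivity check uses it to detect a jammed connection.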
put_sent_req(
StreamRef,
Req,
#{
sent := Sent,
max_sent_expire := T
} = Requests
) ->
?SENT_REQ(_, Expire, _) = Req,
Requests#{
sent := maps:put(StreamRef, Req, Sent),
max_sent_expire := max_expire(T, Expire)
}.

%% if a request has infinity timeout, ignore it
max_expire(T, infinity) -> T;
max_expire(T1, T2) when is_integer(T2) -> max(T1, T2).

take_sent_req(StreamRef, #{sent := Sent} = Requests) ->
take_sent_req(StreamRef, #{sent := Sent, max_sent_expire := T} = Requests) ->
case maps:take(StreamRef, Sent) of
error ->
error;
{Req, NewSent} ->
%% We assume all calls use the same timeout value,
%% so there is no need to scan the map to find a new max.
%% Even if calls use different timeouts,
%% the impact of a stale max is minimal: delayed detection of a zombie connection.
NewT =
case map_size(NewSent) of
0 ->
0;
_ ->
T
end,
case is_sent_req_expired(Req, now_()) of
true ->
{expired, Requests#{sent := NewSent}};
{expired, Requests#{sent := NewSent, max_sent_expire := NewT}};
false ->
{Req, Requests#{sent := NewSent}}
{Req, Requests#{sent := NewSent, max_sent_expire := NewT}}
end
end.

@@ -579,14 +626,7 @@ reply_error_for_sent_reqs(#{sent := Sent} = R, Reason) ->
end,
maps:to_list(Sent)
),
R#{sent := #{}}.

%% allow 100 async requests maximum when enable_pipelining is 'true'
%% allow only 1 async request when enable_pipelining is 'false'
%% otherwise stop shooting at the number limited by enable_pipelining
should_cool_down(true, Sent) -> Sent >= 100;
should_cool_down(false, Sent) -> Sent > 0;
should_cool_down(N, Sent) when is_integer(N) -> Sent >= N.
R#{sent => #{}, max_sent_expire => 0}.

%% Keep dropping expired requests, to avoid state RAM usage
%% explosion if the HTTP client cannot keep up.
@@ -633,20 +673,100 @@ enqueue_req(ReplyTo, Req, #state{requests = Requests0} = State) ->
State#state{requests = drop_expired(Requests)}.

%% call gun to shoot the request out
maybe_shoot(#state{enable_pipelining = EP, requests = Requests0, client = Client} = State0) ->
#{sent := Sent} = Requests0,
maybe_shoot(
#state{
requests =
#{
sent := Sent,
max_sent_expire := MaxExpire
} = Requests0,
client = Client,
max_inactive = MaxInactive,
enable_pipelining = PipelineLimit
} = State0
) ->
State = State0#state{requests = drop_expired(Requests0)},
%% If the gun http client is down
ClientDown = is_pid(Client) andalso (not is_process_alive(Client)),
%% Or when too many has been sent already
case ClientDown orelse should_cool_down(EP, maps:size(Sent)) of
true ->
SentCount = map_size(Sent),
case check_gun(Client, PipelineLimit, SentCount, MaxExpire, MaxInactive) of
continue ->
do_shoot(State);
pause ->
%% Then we should cool down, and let the gun responses
%% or the 'EXIT' message trigger the flow again
?tp(cool_down, #{enable_pipelining => EP}),
?tp(cool_down, #{enable_pipelining => State#state.enable_pipelining}),
State;
reconnect ->
%% assert
true = (MaxExpire > 0),
%% the connection has been inactive for too long
log(
error,
#{
msg => "force_reconnecting_zombie_http_connection",
last_request_expire => calendar:system_time_to_rfc3339(MaxExpire, [
{unit, millisecond}
]),
inactive_duration_threshold => MaxInactive,
inflight_requests => SentCount,
connection_pid => Client
},
State
),
?tp(reconnect, #{sent => SentCount}),
_ = exit(Client, kill),
State
end.

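%% Decide whether the worker may send the next request:
%% 'continue' - ok to shoot, 'pause' - wait for in-flight replies (or an 'EXIT'),
%% 'reconnect' - the connection looks like a zombie and should be killed.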
check_gun(ClientPid, PipelineLimit, SentCount, MaxExpireTs, MaxInactiveDuration) ->
maybe
ok ?= check_gun_pid(ClientPid),
ok ?= check_gun_jammed(SentCount, MaxExpireTs, MaxInactiveDuration),
check_gun_limit(PipelineLimit, SentCount)
end.

check_gun_pid(Pid) when not is_pid(Pid) ->
%% no client yet; go straight to initializing one
continue;
check_gun_pid(Pid) ->
case is_process_alive(Pid) of
true ->
%% ok to send
ok;
false ->
do_shoot(State)
%% once initialized but now restarting:
%% do not send, wait for the 'EXIT' message to trigger
%% a reconnect
pause
end.

%% If there are sent requests and the most recent request expiry is older than
%% max_inactive, the connection is considered to be in a zombie state, hence requires a reconnect.
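%% For example, with {max_inactive, 10_000}: if the most recently sent request
%% expired at timestamp T and now_() - T > 10_000 ms, 'reconnect' is returned
%% and the caller kills the gun process to force a fresh connection.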
check_gun_jammed(_SentCount, 0, _MaxInactiveDuration) ->
%% there was no expire time recorded before
ok;
check_gun_jammed(_SentCount, MaxExpireTs, MaxInactiveDuration) ->
case (now_() - MaxExpireTs) > MaxInactiveDuration of
true ->
reconnect;
false ->
ok
end.

%% allow 100 async requests maximum when enable_pipelining is 'true'
%% allow only 1 async request when enable_pipelining is 'false'
%% otherwise stop shooting at the number limited by enable_pipelining
check_gun_limit(_EnablePipeline = true, SentCount) ->
%% backward compatible
check_gun_limit(100, SentCount);
check_gun_limit(_EnablePipeline = false, SentCount) ->
%% backward compatible
check_gun_limit(1, SentCount);
check_gun_limit(PipelineLimit, SentCount) ->
case SentCount < PipelineLimit of
true ->
continue;
false ->
pause
end.

do_shoot(#state{requests = #{pending_count := 0}} = State) ->
@@ -914,6 +1034,9 @@ take_proplist(Key, Proplist0) ->
{ValueFromProplist, Proplist1}
end.

log(Level, Data, #state{host = Host, port = Port}) ->
logger:log(Level, Data#{host => Host, port => Port}).

-ifdef(TEST).

prioritise_latest_test() ->