Skip to content

Commit

Permalink
Merge pull request #58 from emqx/1004-refactor-delete-dead-code
Browse files Browse the repository at this point in the history
Refactor: delete dead code
  • Loading branch information
zmstone authored Oct 7, 2024
2 parents 2fb8a56 + 90ee836 commit e92689d
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 95 deletions.
67 changes: 12 additions & 55 deletions src/ehttpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@
%% for test
-export([
get_state/1,
get_state/2,
upgrade_requests/1,
downgrade_requests/1
get_state/2
]).

-export_type([
Expand Down Expand Up @@ -83,8 +81,6 @@
pool :: term(),
id :: pos_integer(),
client :: pid() | ?undef,
%% no longer in use since 0.4.12 (changed to gun:start_link)
mref :: reference() | ?undef,
host :: inet:hostname() | inet:ip_address(),
port :: inet:port_number(),
enable_pipelining :: boolean() | non_neg_integer(),
Expand Down Expand Up @@ -207,7 +203,6 @@ init([Pool, Id, Opts0]) ->
pool = Pool,
id = Id,
client = ?undef,
mref = ?undef,
host = proplists:get_value(host, Opts),
port = proplists:get_value(port, Opts),
enable_pipelining = proplists:get_value(enable_pipelining, Opts, false),
Expand Down Expand Up @@ -253,20 +248,20 @@ handle_call({health_check, Timeout}, _From, State = #state{client = Client, gun_
end
);
handle_call(?REQ(_Method, _Request, _ExpireAt) = Req, From, State0) ->
State1 = enqueue_req(From, Req, upgrade_requests(State0)),
State1 = enqueue_req(From, Req, State0),
State = maybe_shoot(State1),
{noreply, State};
handle_call(Call, _From, State0) ->
State = maybe_shoot(upgrade_requests(State0)),
State = maybe_shoot(State0),
{reply, {error, {unexpected_call, Call}}, State}.

handle_cast(_Msg, State0) ->
State = maybe_shoot(upgrade_requests(State0)),
State = maybe_shoot(State0),
{noreply, State}.

handle_info(?ASYNC_REQ(Method, Request, ExpireAt, ResultCallback), State0) ->
Req = ?REQ(Method, Request, ExpireAt),
State1 = enqueue_req(ResultCallback, Req, upgrade_requests(State0)),
State1 = enqueue_req(ResultCallback, Req, State0),
State = maybe_shoot(State1),
{noreply, State};
handle_info({suspend, Time}, State) ->
Expand All @@ -277,7 +272,7 @@ handle_info(check_inactive, State0) ->
State = maybe_shoot(State0),
{noreply, start_check_inactive_timer(State)};
handle_info(Info, State0) ->
State1 = do_handle_info(Info, upgrade_requests(State0)),
State1 = do_handle_info(Info, State0),
State = maybe_shoot(State1),
{noreply, State}.

Expand Down Expand Up @@ -324,12 +319,6 @@ do_handle_info(
log(warning, #{msg => "http_connection_down", reason => Reason}, State),
NewState = handle_gun_down(State, KilledStreams, Reason),
NewState;
do_handle_info(
{'DOWN', MRef, process, Client, Reason},
State = #state{mref = MRef, client = Client}
) ->
%% stale code for appup since 0.4.12
handle_client_down(State, Reason);
do_handle_info({'EXIT', Client, Reason}, State = #state{client = Client}) ->
handle_client_down(State, Reason);
do_handle_info(Info, State) ->
Expand Down Expand Up @@ -380,7 +369,6 @@ handle_client_down(#state{requests = Requests0} = State, Reason) ->
Requests = reply_error_for_sent_reqs(Requests0, Reason),
State#state{
requests = Requests,
mref = ?undef,
client = ?undef,
gun_state = down
}.
Expand Down Expand Up @@ -538,24 +526,6 @@ now_() ->
%% sent requests
%% =================================================================================

%% downgrade will cause all the pending calls to timeout
downgrade_requests(#{pending := _PendingCalls, sent := Sent}) -> Sent;
downgrade_requests(Already) -> Already.

%% upgrade from old format before 0.1.16
upgrade_requests(#state{requests = Requests} = State) ->
State#state{requests = upgrade_requests(Requests)};
upgrade_requests(#{pending := _, sent := _} = Already) ->
Already;
upgrade_requests(Map) when is_map(Map) ->
#{
pending => queue:new(),
pending_count => 0,
sent => Map,
prioritise_latest => false,
max_sent_expire => 0
}.

put_sent_req(
StreamRef,
Req,
Expand Down Expand Up @@ -825,10 +795,10 @@ shoot(
Req = ?SENT_REQ(ReplyTo, ExpireAt, ?undef),
{noreply, State#state{requests = put_sent_req(StreamRef, Req, Requests)}}.

do_after_gun_up(State0 = #state{client = Client, mref = MRef}, ExpireAt, Fun) ->
do_after_gun_up(State0 = #state{client = Client}, ExpireAt, Fun) ->
Timeout = timeout(ExpireAt),
%% wait for the http client to be ready
{Res, State} = gun_await_up(Client, ExpireAt, Timeout, MRef, State0),
{Res, State} = gun_await_up(Client, ExpireAt, Timeout, State0),
case Res of
{ok, _} ->
Fun(State);
Expand All @@ -841,20 +811,13 @@ do_after_gun_up(State0 = #state{client = Client, mref = MRef}, ExpireAt, Fun) ->
%% of the error response.
{reply, {error, Error}, State};
{error, Reason} ->
case is_reference(MRef) of
true ->
%% stale code for appup since 0.4.12
erlang:demonitor(MRef, [flush]);
false ->
ok
end,
{reply, {error, Reason}, State#state{client = ?undef, mref = ?undef}}
{reply, {error, Reason}, State#state{client = ?undef}}
end.

%% This is a copy of gun:wait_up/3
%% with the '$gen_call' clause added so the calls in the mail box
%% are collected into the queue in time
gun_await_up(Pid, ExpireAt, Timeout, MRef, State0) ->
gun_await_up(Pid, ExpireAt, Timeout, State0) ->
receive
{gun_up, Pid, Protocol} ->
case State0#state.proxy of
Expand All @@ -864,12 +827,6 @@ gun_await_up(Pid, ExpireAt, Timeout, MRef, State0) ->
#{} = ProxyOpts ->
gun_connect_proxy(Pid, ExpireAt, Timeout, Protocol, ProxyOpts, State0)
end;
{'DOWN', MRef, process, Pid, {shutdown, Reason}} ->
%% stale code for appup since 0.4.12
{{error, Reason}, State0};
{'DOWN', MRef, process, Pid, Reason} ->
%% stale code for appup since 0.4.12
{{error, Reason}, State0};
{'EXIT', Pid, {shutdown, Reason}} ->
{{error, Reason}, State0};
{'EXIT', Pid, Reason} ->
Expand All @@ -879,12 +836,12 @@ gun_await_up(Pid, ExpireAt, Timeout, MRef, State0) ->
State = enqueue_req(ResultCallback, Req, State0),
%% keep waiting
NewTimeout = timeout(ExpireAt),
gun_await_up(Pid, ExpireAt, NewTimeout, MRef, State);
gun_await_up(Pid, ExpireAt, NewTimeout, State);
?GEN_CALL_REQ(From, Call) ->
State = enqueue_req(From, Call, State0),
%% keep waiting
NewTimeout = timeout(ExpireAt),
gun_await_up(Pid, ExpireAt, NewTimeout, MRef, State)
gun_await_up(Pid, ExpireAt, NewTimeout, State)
after Timeout ->
{{error, connect_timeout}, State0}
end.
Expand Down
40 changes: 0 additions & 40 deletions test/ehttpc_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -416,46 +416,6 @@ server_outage_test_() ->
end}
].

%% c:l(ehttpc) should work for version 0.1.8 -> 0.1.15
upgrade_state_on_the_fly_test() ->
Port = ?PORT,
ServerOpts = #{
port => Port,
name => ?FUNCTION_NAME,
%% no response during this test
delay => 300_000,
oneoff => false
},
PoolOpts = pool_opts("127.0.0.1", Port, _Pipelining = true, _PrioritiseLatest = false),
?WITH(
ServerOpts,
PoolOpts,
begin
spawn_link(fun() -> ehttpc:request(?POOL, post, {<<"/">>, [], <<"test-post">>}) end),
{ok, _} = ?block_until(#{?snk_kind := shot}, 2000, infinity),
Pid = ehttpc_pool:pick_worker(?POOL),
GetState = fun() -> sys:get_state(Pid) end,
State = GetState(),
RequestsIdx = 11,
Requests = element(RequestsIdx, State),
#{sent := Sent} = Requests,
?assertEqual(1, maps:size(Sent)),
OldState = setelement(RequestsIdx, State, Sent),
%% put old format to the process state
sys:replace_state(Pid, fun(_) -> OldState end),
%% verify it's in the old format
?assertEqual(Sent, element(RequestsIdx, GetState())),
%% send a message to trigger upgrade
Pid ! dummy,
{error, _} = gen_server:call(Pid, dummy),
ok = gen_server:cast(Pid, dummy),
%% now it should be upgraded to the new version
?assertMatch(#{sent := Sent}, element(RequestsIdx, GetState())),
_ = sys:get_status(Pid),
ok
end
).

cool_down_after_5_reqs_test() ->
Port = ?PORT,
ServerOpts = #{
Expand Down

0 comments on commit e92689d

Please sign in to comment.