Skip to content

Commit

Permalink
feat: start-link gun process
Browse files Browse the repository at this point in the history
  • Loading branch information
zmstone committed Dec 15, 2023
1 parent e8d4f0d commit ad87094
Show file tree
Hide file tree
Showing 13 changed files with 239 additions and 205 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/erlang.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ jobs:
fail-fast: false
matrix:
otp:
- 24.3
- 25.1
- 26.1
- 25.3

steps:
- uses: actions/checkout@v3
Expand Down
8 changes: 8 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# ehttpc changes

## 0.4.12

- Upgrade to gun 1.3.10 (OTP 26)
- Changed from `gun:open` to `gun:start_link` to start the gun process.
This makes the gun process linked to `ehttpc` process (instead of `gun_sup`).
Prior to this change, some early errors causing gun process to crash might not be able to be caught by ehttpc due to the slow monitoring.
e.g. when some SSL option is invalid, the gun process will crash immediately and `ehttpc` can only get a `noproc` error reason from the delayed process monitor.

## 0.4.11

- Added support for `PATCH` method requests.
Expand Down
3 changes: 2 additions & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
{minimum_otp_vsn, "21.0"}.

{deps, [
{gun, {git, "https://github.com/emqx/gun", {tag, "1.3.7"}}},
{gun, {git, "https://github.com/emqx/gun", {tag, "1.3.10"}}},
{gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
{snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.7"}}}
]}.
Expand Down Expand Up @@ -34,3 +34,4 @@
{cover_export_enabled, true}.

{plugins, [rebar3_proper, erlfmt]}.
{dialyzer, [{plt_extra_apps, [ssl, public_key]}]}.
31 changes: 16 additions & 15 deletions src/ehttpc.app.src
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
{application, ehttpc,
[{description, "HTTP Client for Erlang/OTP"},
{vsn, "0.4.11"},
{registered, []},
{applications, [kernel,
stdlib,
gproc,
gun
]},
{mod, {ehttpc_app, []}},
{env, []},
{licenses,["Apache-2.0"]},
{maintainers, ["EMQX Team <[email protected]>"]},
{links,[{"Github", "https://github.com/emqx/ehttpc"}]}
]}.
{application, ehttpc, [
{description, "HTTP Client for Erlang/OTP"},
{vsn, "0.4.12"},
{registered, []},
{applications, [
kernel,
stdlib,
gproc,
gun
]},
{mod, {ehttpc_app, []}},
{env, []},
{licenses, ["Apache-2.0"]},
{maintainers, ["EMQX Team <[email protected]>"]},
{links, [{"Github", "https://github.com/emqx/ehttpc"}]}
]}.
22 changes: 21 additions & 1 deletion src/ehttpc.appup.src
Original file line number Diff line number Diff line change
@@ -1,22 +1,32 @@
%% -*- mode: erlang -*-
{"0.4.11",
{"0.4.12",
[
{"0.4.11", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{"0.4.10", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{"0.4.9", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{"0.4.8", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{"0.4.7", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{"0.4.6", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{"0.4.5", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{<<"0\\.4\\.[0-4]">>, [
Expand Down Expand Up @@ -63,22 +73,32 @@
]}
],
[
{"0.4.11", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{"0.4.10", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{"0.4.9", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{"0.4.8", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{"0.4.7", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{"0.4.6", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{"0.4.5", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{<<"0\\.4\\.[0-4]">>, [
Expand Down
25 changes: 20 additions & 5 deletions src/ehttpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
pool :: term(),
id :: pos_integer(),
client :: pid() | ?undef,
%% no longer in use since 0.4.12 (changed to gun:start_link)
mref :: reference() | ?undef,
host :: inet:hostname() | inet:ip_address(),
port :: inet:port_number(),
Expand Down Expand Up @@ -312,6 +313,9 @@ do_handle_info(
{'DOWN', MRef, process, Client, Reason},
State = #state{mref = MRef, client = Client}
) ->
%% stale code for appup since 0.4.12
handle_client_down(State, Reason);
do_handle_info({'EXIT', Client, Reason}, State = #state{client = Client}) ->
handle_client_down(State, Reason);
do_handle_info(Info, State) ->
?LOG(warning, "~p unexpected_info: ~p, client: ~p", [?MODULE, Info, State#state.client]),
Expand Down Expand Up @@ -447,10 +451,9 @@ handle_gun_down(#state{requests = Requests} = State, KilledStreams, Reason) ->
State#state{requests = NRequests, gun_state = down}.

open(State = #state{host = Host, port = Port, gun_opts = GunOpts}) ->
case gun:open(Host, Port, GunOpts) of
case gun:start_link(self(), Host, Port, GunOpts) of
{ok, ConnPid} when is_pid(ConnPid) ->
MRef = erlang:monitor(process, ConnPid),
{ok, State#state{mref = MRef, client = ConnPid}};
{ok, State#state{client = ConnPid}};
{error, Reason} ->
{error, Reason}
end.
Expand Down Expand Up @@ -704,7 +707,7 @@ maybe_shoot(#state{enable_pipelining = EP, requests = Requests0, client = Client
case ClientDown orelse should_cool_down(EP, maps:size(Sent)) of
true ->
%% Then we should cool down, and let the gun responses
%% or 'DOWN' message to trigger the flow again
%% or 'EXIT' message to trigger the flow again
?tp(cool_down, #{enable_pipelining => EP}),
State;
false ->
Expand Down Expand Up @@ -779,7 +782,13 @@ do_after_gun_up(State0 = #state{client = Client, mref = MRef}, ExpireAt, Fun) ->
%% but the connection is likely to be useful
{reply, {error, connect_timeout}, State};
{error, Reason} ->
erlang:demonitor(MRef, [flush]),
case is_reference(MRef) of
true ->
%% stale code for appup since 0.4.12
erlang:demonitor(MRef, [flush]);
false ->
ok
end,
{reply, {error, Reason}, State#state{client = ?undef, mref = ?undef}}
end.

Expand All @@ -791,8 +800,14 @@ gun_await_up(Pid, ExpireAt, Timeout, MRef, State0) ->
{gun_up, Pid, Protocol} ->
{{ok, Protocol}, State0};
{'DOWN', MRef, process, Pid, {shutdown, Reason}} ->
%% stale code for appup since 0.4.12
{{error, Reason}, State0};
{'DOWN', MRef, process, Pid, Reason} ->
%% stale code for appup since 0.4.12
{{error, Reason}, State0};
{'EXIT', Pid, {shutdown, Reason}} ->
{{error, Reason}, State0};
{'EXIT', Pid, Reason} ->
{{error, Reason}, State0};
?ASYNC_REQ(Method, Request, ExpireAt1, ResultCallback) ->
Req = ?REQ(Method, Request, ExpireAt1),
Expand Down
1 change: 0 additions & 1 deletion src/ehttpc_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,3 @@ start(_StartType, _StartArgs) ->

stop(_State) ->
ok.

64 changes: 34 additions & 30 deletions src/ehttpc_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,23 @@
%% API Function Exports
-export([start_link/2]).

-export([ info/1
, start_pool/2
, stop_pool/1
, pick_worker/1
, pick_worker/2]).
-export([
info/1,
start_pool/2,
stop_pool/1,
pick_worker/1,
pick_worker/2
]).

%% gen_server Function Exports
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).

-import(proplists, [get_value/3]).

Expand All @@ -44,12 +47,12 @@
%% API
%%--------------------------------------------------------------------

-spec(start_link(ehttpc:pool_name(), list(ehttpc:option()))
-> {ok, pid()} | {error, term()}).
-spec start_link(ehttpc:pool_name(), list(ehttpc:option())) ->
{ok, pid()} | {error, term()}.
start_link(Pool, Opts) ->
gen_server:start_link(?MODULE, [Pool, Opts], []).

-spec(info(pid()) -> list()).
-spec info(pid()) -> list().
info(Pid) ->
gen_server:call(Pid, info).

Expand All @@ -76,19 +79,22 @@ init([Pool, Opts]) ->
PoolType = get_value(pool_type, Opts, random),
ok = ensure_pool(ehttpc:name(Pool), PoolType, [{size, PoolSize}]),
ok = lists:foreach(
fun(I) ->
ensure_pool_worker(ehttpc:name(Pool), {Pool, I}, I)
end, lists:seq(1, PoolSize)),
fun(I) ->
ensure_pool_worker(ehttpc:name(Pool), {Pool, I}, I)
end,
lists:seq(1, PoolSize)
),
{ok, #state{name = Pool, size = PoolSize, type = PoolType}}.

handle_call(info, _From, State = #state{name = Pool, size = Size, type = Type}) ->
Workers = ehttpc:workers(Pool),
Info = [{pool_name, Pool},
{pool_size, Size},
{pool_type, Type},
{workers, Workers}],
Info = [
{pool_name, Pool},
{pool_size, Size},
{pool_type, Type},
{workers, Workers}
],
{reply, Info, State};

handle_call(Req, _From, State) ->
logger:error("[Pool] unexpected request: ~p", [Req]),
{reply, ignored, State}.
Expand All @@ -101,11 +107,7 @@ handle_info(Info, State) ->
logger:error("[Pool] unexpected info: ~p", [Info]),
{noreply, State}.

terminate(_Reason, #state{name = Pool, size = Size}) ->
lists:foreach(
fun(I) ->
gproc_pool:remove_worker(ehttpc:name(Pool), {Pool, I})
end, lists:seq(1, Size)),
terminate(_Reason, #state{name = Pool}) ->
gproc_pool:force_delete(ehttpc:name(Pool)).

code_change(_OldVsn, State, _Extra) ->
Expand All @@ -116,13 +118,15 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------

ensure_pool(Pool, Type, Opts) ->
try gproc_pool:new(Pool, Type, Opts)
try
gproc_pool:new(Pool, Type, Opts)
catch
error:exists -> ok
end.

ensure_pool_worker(Pool, Name, Slot) ->
try gproc_pool:add_worker(Pool, Name, Slot)
try
gproc_pool:add_worker(Pool, Name, Slot)
catch
error:exists -> ok
end.
30 changes: 18 additions & 12 deletions src/ehttpc_pool_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,22 @@ start_link(Pool, Opts) ->
supervisor:start_link(?MODULE, [Pool, Opts]).

init([Pool, Opts]) ->
Specs = [#{id => pool,
start => {ehttpc_pool, start_link, [Pool, Opts]},
restart => transient,
shutdown => 5000,
type => worker,
modules => [ehttpc_pool]},
#{id => worker_sup,
start => {ehttpc_worker_sup, start_link, [Pool, Opts]},
restart => transient,
shutdown => 5000,
type => supervisor,
modules => [ehttpc_worker_sup]}],
Specs = [
#{
id => pool,
start => {ehttpc_pool, start_link, [Pool, Opts]},
restart => transient,
shutdown => 5000,
type => worker,
modules => [ehttpc_pool]
},
#{
id => worker_sup,
start => {ehttpc_worker_sup, start_link, [Pool, Opts]},
restart => transient,
shutdown => 5000,
type => supervisor,
modules => [ehttpc_worker_sup]
}
],
{ok, {{one_for_all, 10, 100}, Specs}}.
Loading

0 comments on commit ad87094

Please sign in to comment.