Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: start-link gun process #52

Merged
merged 1 commit into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/erlang.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ jobs:
fail-fast: false
matrix:
otp:
- 24.3
- 25.1
- 26.1
- 25.3

steps:
- uses: actions/checkout@v3
Expand Down
8 changes: 8 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# ehttpc changes

## 0.4.12

- Upgrade to gun 1.3.10 (OTP 26)
- Changed from `gun:open` to `gun:start_link` to start the gun process.
This makes the gun process linked to `ehttpc` process (instead of `gun_sup`).
Prior to this change, some early errors causing gun process to crash might not be able to be caught by ehttpc due to the slow monitoring.
e.g. when some SSL option is invalid, the gun process will crash immediately and `ehttpc` can only get a `noproc` error reason from the delayed process monitor.

## 0.4.11

- Added support for `PATCH` method requests.
Expand Down
3 changes: 2 additions & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
{minimum_otp_vsn, "21.0"}.

{deps, [
{gun, {git, "https://github.com/emqx/gun", {tag, "1.3.7"}}},
{gun, {git, "https://github.com/emqx/gun", {tag, "1.3.10"}}},
{gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
{snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.7"}}}
]}.
Expand Down Expand Up @@ -34,3 +34,4 @@
{cover_export_enabled, true}.

{plugins, [rebar3_proper, erlfmt]}.
{dialyzer, [{plt_extra_apps, [ssl, public_key]}]}.
31 changes: 16 additions & 15 deletions src/ehttpc.app.src
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
{application, ehttpc,
[{description, "HTTP Client for Erlang/OTP"},
{vsn, "0.4.11"},
{registered, []},
{applications, [kernel,
stdlib,
gproc,
gun
]},
{mod, {ehttpc_app, []}},
{env, []},
{licenses,["Apache-2.0"]},
{maintainers, ["EMQX Team <[email protected]>"]},
{links,[{"Github", "https://github.com/emqx/ehttpc"}]}
]}.
{application, ehttpc, [
{description, "HTTP Client for Erlang/OTP"},
{vsn, "0.4.12"},
{registered, []},
{applications, [
kernel,
stdlib,
gproc,
gun
]},
{mod, {ehttpc_app, []}},
{env, []},
{licenses, ["Apache-2.0"]},
{maintainers, ["EMQX Team <[email protected]>"]},
{links, [{"Github", "https://github.com/emqx/ehttpc"}]}
]}.
22 changes: 21 additions & 1 deletion src/ehttpc.appup.src
Original file line number Diff line number Diff line change
@@ -1,22 +1,32 @@
%% -*- mode: erlang -*-
{"0.4.11",
{"0.4.12",
[
{"0.4.11", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{"0.4.10", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{"0.4.9", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{"0.4.8", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{"0.4.7", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{"0.4.6", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{"0.4.5", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{<<"0\\.4\\.[0-4]">>, [
Expand Down Expand Up @@ -63,22 +73,32 @@
]}
],
[
{"0.4.11", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{"0.4.10", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{"0.4.9", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{"0.4.8", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{"0.4.7", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{"0.4.6", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{"0.4.5", [
{load_module, ehttpc_pool, brutal_purge, soft_purge, []},
{load_module, ehttpc, brutal_purge, soft_purge, []}
]},
{<<"0\\.4\\.[0-4]">>, [
Expand Down
25 changes: 20 additions & 5 deletions src/ehttpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
pool :: term(),
id :: pos_integer(),
client :: pid() | ?undef,
%% no longer in use since 0.4.12 (changed to gun:start_link)
mref :: reference() | ?undef,
host :: inet:hostname() | inet:ip_address(),
port :: inet:port_number(),
Expand Down Expand Up @@ -312,6 +313,9 @@ do_handle_info(
{'DOWN', MRef, process, Client, Reason},
State = #state{mref = MRef, client = Client}
) ->
%% stale code for appup since 0.4.12
handle_client_down(State, Reason);
do_handle_info({'EXIT', Client, Reason}, State = #state{client = Client}) ->
handle_client_down(State, Reason);
do_handle_info(Info, State) ->
?LOG(warning, "~p unexpected_info: ~p, client: ~p", [?MODULE, Info, State#state.client]),
Expand Down Expand Up @@ -447,10 +451,9 @@ handle_gun_down(#state{requests = Requests} = State, KilledStreams, Reason) ->
State#state{requests = NRequests, gun_state = down}.

open(State = #state{host = Host, port = Port, gun_opts = GunOpts}) ->
case gun:open(Host, Port, GunOpts) of
case gun:start_link(self(), Host, Port, GunOpts) of
{ok, ConnPid} when is_pid(ConnPid) ->
MRef = erlang:monitor(process, ConnPid),
{ok, State#state{mref = MRef, client = ConnPid}};
{ok, State#state{client = ConnPid}};
{error, Reason} ->
{error, Reason}
end.
Expand Down Expand Up @@ -704,7 +707,7 @@ maybe_shoot(#state{enable_pipelining = EP, requests = Requests0, client = Client
case ClientDown orelse should_cool_down(EP, maps:size(Sent)) of
true ->
%% Then we should cool down, and let the gun responses
%% or 'DOWN' message to trigger the flow again
%% or 'EXIT' message to trigger the flow again
?tp(cool_down, #{enable_pipelining => EP}),
State;
false ->
Expand Down Expand Up @@ -779,7 +782,13 @@ do_after_gun_up(State0 = #state{client = Client, mref = MRef}, ExpireAt, Fun) ->
%% but the connection is likely to be useful
{reply, {error, connect_timeout}, State};
{error, Reason} ->
erlang:demonitor(MRef, [flush]),
case is_reference(MRef) of
true ->
%% stale code for appup since 0.4.12
erlang:demonitor(MRef, [flush]);
false ->
ok
end,
{reply, {error, Reason}, State#state{client = ?undef, mref = ?undef}}
end.

Expand All @@ -791,8 +800,14 @@ gun_await_up(Pid, ExpireAt, Timeout, MRef, State0) ->
{gun_up, Pid, Protocol} ->
{{ok, Protocol}, State0};
{'DOWN', MRef, process, Pid, {shutdown, Reason}} ->
%% stale code for appup since 0.4.12
{{error, Reason}, State0};
{'DOWN', MRef, process, Pid, Reason} ->
%% stale code for appup since 0.4.12
{{error, Reason}, State0};
{'EXIT', Pid, {shutdown, Reason}} ->
{{error, Reason}, State0};
{'EXIT', Pid, Reason} ->
{{error, Reason}, State0};
?ASYNC_REQ(Method, Request, ExpireAt1, ResultCallback) ->
Req = ?REQ(Method, Request, ExpireAt1),
Expand Down
1 change: 0 additions & 1 deletion src/ehttpc_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,3 @@ start(_StartType, _StartArgs) ->

stop(_State) ->
ok.

64 changes: 34 additions & 30 deletions src/ehttpc_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,23 @@
%% API Function Exports
-export([start_link/2]).

-export([ info/1
, start_pool/2
, stop_pool/1
, pick_worker/1
, pick_worker/2]).
-export([
info/1,
start_pool/2,
stop_pool/1,
pick_worker/1,
pick_worker/2
]).

%% gen_server Function Exports
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).

-import(proplists, [get_value/3]).

Expand All @@ -44,12 +47,12 @@
%% API
%%--------------------------------------------------------------------

-spec(start_link(ehttpc:pool_name(), list(ehttpc:option()))
-> {ok, pid()} | {error, term()}).
-spec start_link(ehttpc:pool_name(), list(ehttpc:option())) ->
{ok, pid()} | {error, term()}.
start_link(Pool, Opts) ->
gen_server:start_link(?MODULE, [Pool, Opts], []).

-spec(info(pid()) -> list()).
-spec info(pid()) -> list().
info(Pid) ->
gen_server:call(Pid, info).

Expand All @@ -76,19 +79,22 @@ init([Pool, Opts]) ->
PoolType = get_value(pool_type, Opts, random),
ok = ensure_pool(ehttpc:name(Pool), PoolType, [{size, PoolSize}]),
ok = lists:foreach(
fun(I) ->
ensure_pool_worker(ehttpc:name(Pool), {Pool, I}, I)
end, lists:seq(1, PoolSize)),
fun(I) ->
ensure_pool_worker(ehttpc:name(Pool), {Pool, I}, I)
end,
lists:seq(1, PoolSize)
),
{ok, #state{name = Pool, size = PoolSize, type = PoolType}}.

handle_call(info, _From, State = #state{name = Pool, size = Size, type = Type}) ->
Workers = ehttpc:workers(Pool),
Info = [{pool_name, Pool},
{pool_size, Size},
{pool_type, Type},
{workers, Workers}],
Info = [
{pool_name, Pool},
{pool_size, Size},
{pool_type, Type},
{workers, Workers}
],
{reply, Info, State};

handle_call(Req, _From, State) ->
logger:error("[Pool] unexpected request: ~p", [Req]),
{reply, ignored, State}.
Expand All @@ -101,11 +107,7 @@ handle_info(Info, State) ->
logger:error("[Pool] unexpected info: ~p", [Info]),
{noreply, State}.

terminate(_Reason, #state{name = Pool, size = Size}) ->
lists:foreach(
fun(I) ->
gproc_pool:remove_worker(ehttpc:name(Pool), {Pool, I})
end, lists:seq(1, Size)),
terminate(_Reason, #state{name = Pool}) ->
gproc_pool:force_delete(ehttpc:name(Pool)).

code_change(_OldVsn, State, _Extra) ->
Expand All @@ -116,13 +118,15 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------

ensure_pool(Pool, Type, Opts) ->
try gproc_pool:new(Pool, Type, Opts)
try
gproc_pool:new(Pool, Type, Opts)
catch
error:exists -> ok
end.

ensure_pool_worker(Pool, Name, Slot) ->
try gproc_pool:add_worker(Pool, Name, Slot)
try
gproc_pool:add_worker(Pool, Name, Slot)
catch
error:exists -> ok
end.
30 changes: 18 additions & 12 deletions src/ehttpc_pool_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,22 @@ start_link(Pool, Opts) ->
supervisor:start_link(?MODULE, [Pool, Opts]).

init([Pool, Opts]) ->
Specs = [#{id => pool,
start => {ehttpc_pool, start_link, [Pool, Opts]},
restart => transient,
shutdown => 5000,
type => worker,
modules => [ehttpc_pool]},
#{id => worker_sup,
start => {ehttpc_worker_sup, start_link, [Pool, Opts]},
restart => transient,
shutdown => 5000,
type => supervisor,
modules => [ehttpc_worker_sup]}],
Specs = [
#{
id => pool,
start => {ehttpc_pool, start_link, [Pool, Opts]},
restart => transient,
shutdown => 5000,
type => worker,
modules => [ehttpc_pool]
},
#{
id => worker_sup,
start => {ehttpc_worker_sup, start_link, [Pool, Opts]},
restart => transient,
shutdown => 5000,
type => supervisor,
modules => [ehttpc_worker_sup]
}
],
{ok, {{one_for_all, 10, 100}, Specs}}.
Loading