Skip to content

Commit

Permalink
feat: add http proxy support
Browse files Browse the repository at this point in the history
  • Loading branch information
thalesmg committed Sep 20, 2024
1 parent 71ed6ec commit a788945
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 16 deletions.
17 changes: 17 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,22 @@
# ehttpc changes

## 0.4.15

- Added support for using HTTP proxy (HTTP 1.1 only).
To use it, pass `proxy` in the pool opts.

Ex:

```erlang
%% Point to the proxy host and port
ProxyOpts = #{host => "127.0.0.1", port => 8888}.
ehttpc_sup:start_pool(<<"pool">>, [{host, "target.host.com"}, {port, 80}, {proxy, ProxyOpts}]).

%% To use username and password
ProxyOpts = #{host => "127.0.0.1", port => 8888, username => "proxyuser", password => "secret"}.
ehttpc_sup:start_pool(<<"pool">>, [{host, "target.host.com"}, {port, 80}, {proxy, ProxyOpts}]).
```

## 0.4.14

- Forcefully recreate `gproc_pool`s during `ehttpc_pool:init/1` to prevent reusing pools in an inconsistent state.
Expand Down
154 changes: 143 additions & 11 deletions src/ehttpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@
enable_pipelining :: boolean() | non_neg_integer(),
gun_opts :: gun:opts(),
gun_state :: down | up,
requests :: map()
requests :: map(),
proxy :: undefined | map()
}).

-type pool_name() :: any().
Expand Down Expand Up @@ -195,9 +196,10 @@ name(Pool) -> {?MODULE, Pool}.
%% gen_server callbacks
%%--------------------------------------------------------------------

init([Pool, Id, Opts]) ->
init([Pool, Id, Opts0]) ->
process_flag(trap_exit, true),
PrioLatest = proplists:get_bool(prioritise_latest, Opts),
PrioLatest = proplists:get_bool(prioritise_latest, Opts0),
#{opts := Opts, proxy := Proxy} = parse_proxy_opts(Opts0),
State = #state{
pool = Pool,
id = Id,
Expand All @@ -213,7 +215,8 @@ init([Pool, Id, Opts]) ->
pending_count => 0,
sent => #{},
prioritise_latest => PrioLatest
}
},
proxy = Proxy
},
true = gproc_pool:connect_worker(ehttpc:name(Pool), {Pool, Id}),
{ok, State}.
Expand Down Expand Up @@ -356,10 +359,37 @@ code_change({down, _Vsn}, State, [no_enable_pipelining]) ->
} = State,
OldRequests = downgrade_requests(Requests),
{ok, {state, Pool, ID, Client, MRef, Host, Port, GunOpts, GunState, OldRequests}};
code_change({down, _Vsn}, #state{requests = Requests} = State, [downgrade_requests]) ->
code_change({down, _Vsn}, State, [downgrade_requests]) ->
%% downgrade to a version which had old format 'requests'
#state{
pool = Pool,
id = ID,
client = Client,
mref = MRef,
host = Host,
port = Port,
enable_pipelining = Pipelining,
gun_opts = GunOpts,
gun_state = GunState,
requests = Requests
} = State,
OldRequests = downgrade_requests(Requests),
{ok, State#state{requests = OldRequests}};
{ok, {state, Pool, ID, Client, MRef, Host, Port, Pipelining, GunOpts, GunState, OldRequests}};
code_change({down, _Vsn}, State, [no_proxy]) ->
%% downgrade to a version before `proxy' was added
#state{
pool = Pool,
id = ID,
client = Client,
mref = MRef,
host = Host,
port = Port,
enable_pipelining = Pipelining,
gun_opts = GunOpts,
gun_state = GunState,
requests = Requests
} = State,
{ok, {state, Pool, ID, Client, MRef, Host, Port, Pipelining, GunOpts, GunState, Requests}};
%% below are upgrade instructions
code_change(_Vsn, {state, Pool, ID, Client, MRef, Host, Port, GunOpts, GunState}, _Extra) ->
%% upgrade from a version before 'requests' field was added
Expand All @@ -373,10 +403,11 @@ code_change(_Vsn, {state, Pool, ID, Client, MRef, Host, Port, GunOpts, GunState}
enable_pipelining = true,
gun_opts = GunOpts,
gun_state = GunState,
requests = upgrade_requests(#{})
requests = upgrade_requests(#{}),
proxy = undefined
}};
code_change(_Vsn, {state, Pool, ID, Client, MRef, Host, Port, GunOpts, GunState, Requests}, _) ->
%% upgrade from a version before 'enable_pipelining' filed was added
%% upgrade from a version before 'enable_pipelining' field was added
{ok, #state{
pool = Pool,
id = ID,
Expand All @@ -389,8 +420,25 @@ code_change(_Vsn, {state, Pool, ID, Client, MRef, Host, Port, GunOpts, GunState,
gun_state = GunState,
requests = upgrade_requests(Requests)
}};
code_change(
_Vsn, {state, Pool, ID, Client, MRef, Host, Port, Pipelining, GunOpts, GunState, Requests}, _
) ->
%% upgrade from a version before `proxy' field was added
{ok, #state{
pool = Pool,
id = ID,
client = Client,
mref = MRef,
host = Host,
port = Port,
enable_pipelining = Pipelining,
gun_opts = GunOpts,
gun_state = GunState,
requests = upgrade_requests(Requests),
proxy = undefined
}};
code_change(_Vsn, State, _) ->
%% upgrade from a version ahving old format 'requests' field
%% upgrade from a version having old format 'requests' field
{ok, upgrade_requests(State)}.

format_status(Status = #{state := State}) ->
Expand Down Expand Up @@ -776,11 +824,15 @@ do_after_gun_up(State0 = #state{client = Client, mref = MRef}, ExpireAt, Fun) ->
{Res, State} = gun_await_up(Client, ExpireAt, Timeout, MRef, State0),
case Res of
{ok, _} ->
Fun(State#state{gun_state = up});
Fun(State);
{error, connect_timeout} ->
%% the caller can not wait logger
%% but the connection is likely to be useful
{reply, {error, connect_timeout}, State};
{error, {proxy_error, _} = Error} ->
%% We keep the client around because the proxy might still send data as part
%% of the error response.
{reply, {error, Error}, State};
{error, Reason} ->
case is_reference(MRef) of
true ->
Expand All @@ -798,7 +850,13 @@ do_after_gun_up(State0 = #state{client = Client, mref = MRef}, ExpireAt, Fun) ->
gun_await_up(Pid, ExpireAt, Timeout, MRef, State0) ->
receive
{gun_up, Pid, Protocol} ->
{{ok, Protocol}, State0};
case State0#state.proxy of
undefined ->
State = State0#state{gun_state = up},
{{ok, Protocol}, State};
#{} = ProxyOpts ->
gun_connect_proxy(Pid, ExpireAt, Timeout, Protocol, ProxyOpts, State0)
end;
{'DOWN', MRef, process, Pid, {shutdown, Reason}} ->
%% stale code for appup since 0.4.12
{{error, Reason}, State0};
Expand All @@ -824,6 +882,34 @@ gun_await_up(Pid, ExpireAt, Timeout, MRef, State0) ->
{{error, connect_timeout}, State0}
end.

gun_connect_proxy(Pid, ExpireAt, Timeout, Protocol, ProxyOpts, State0) ->
StreamRef = gun:connect(Pid, ProxyOpts),
gun_await_connect_proxy(Pid, StreamRef, ExpireAt, Timeout, Protocol, ProxyOpts, State0).

gun_await_connect_proxy(Pid, StreamRef, ExpireAt, Timeout, Protocol, ProxyOpts, State0) ->
receive
{gun_response, Pid, StreamRef, fin, 200, Headers} ->
State = State0#state{gun_state = up},
{{ok, {Protocol, Headers}}, State};
{gun_response, Pid, StreamRef, _Fin, 407, _Headers} ->
{{error, {proxy_error, unauthorized}}, State0};
{gun_response, Pid, StreamRef, _Fin, StatusCode, Headers} ->
{{error, {proxy_error, {StatusCode, Headers}}}, State0};
?ASYNC_REQ(Method, Request, ExpireAt1, ResultCallback) ->
Req = ?REQ(Method, Request, ExpireAt1),
State = enqueue_req(ResultCallback, Req, State0),
%% keep waiting
NewTimeout = timeout(ExpireAt),
gun_await_connect_proxy(Pid, StreamRef, ExpireAt, NewTimeout, Protocol, ProxyOpts, State);
?GEN_CALL_REQ(From, Call) ->
State = enqueue_req(From, Call, State0),
%% keep waiting
NewTimeout = timeout(ExpireAt),
gun_await_connect_proxy(Pid, StreamRef, ExpireAt, NewTimeout, Protocol, ProxyOpts, State)
after Timeout ->
{{error, connect_timeout}, State0}
end.

%% normal handling of gun_response and gun_data reply
handle_gun_reply(State, Client, StreamRef, IsFin, StatusCode, Headers, Data) ->
#state{requests = Requests} = State,
Expand Down Expand Up @@ -891,6 +977,52 @@ fresh_expire_at(infinity = _Timeout) ->
fresh_expire_at(Timeout) when is_integer(Timeout) ->
now_() + Timeout.

parse_proxy_opts(Opts) ->
%% Target host and port
case proplists:get_value(proxy, Opts, undefined) of
undefined ->
#{opts => Opts, proxy => undefined};
#{host := _, port := _} = ProxyOpts0 ->
%% We open connection to proxy, then issue `gun:connect' to target host.
{ProxyOpts, NewOpts} =
lists:foldl(
fun(Key, {ProxyAcc, GunAcc}) ->
swap(Key, ProxyAcc, GunAcc)
end,
{ProxyOpts0, proplists:delete(proxy, Opts)},
[host, port, transport, {tls_opts, transport_opts}]
),
#{opts => NewOpts, proxy => ProxyOpts}
end.

swap(Key, Map, Proplist) when is_atom(Key) ->
swap({Key, Key}, Map, Proplist);
swap({KeyM, KeyP}, Map0, Proplist0) when is_map_key(KeyM, Map0) ->
ValueFromMap = maps:get(KeyM, Map0),
Map = maps:remove(KeyM, Map0),
case take_proplist(KeyP, Proplist0) of
{ValueFromProplist, Proplist} ->
{Map#{KeyM => ValueFromProplist}, [{KeyP, ValueFromMap} | Proplist]};
error ->
{Map, [{KeyP, ValueFromMap} | Proplist0]}
end;
swap({KeyM, KeyP}, Map0, Proplist0) ->
case take_proplist(KeyP, Proplist0) of
{ValueFromProplist, Proplist} ->
{Map0#{KeyM => ValueFromProplist}, Proplist};
error ->
{Map0, Proplist0}
end.

take_proplist(Key, Proplist0) ->
Proplist1 = lists:keydelete(Key, 1, Proplist0),
case lists:keyfind(Key, 1, Proplist0) of
false ->
error;
{Key, ValueFromProplist} ->
{ValueFromProplist, Proplist1}
end.

-ifdef(TEST).

prioritise_latest_test() ->
Expand Down
11 changes: 6 additions & 5 deletions test/ehttpc_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -434,22 +434,23 @@ upgrade_state_on_the_fly_test() ->
spawn_link(fun() -> ehttpc:request(?POOL, post, {<<"/">>, [], <<"test-post">>}) end),
{ok, _} = ?block_until(#{?snk_kind := shot}, 2000, infinity),
Pid = ehttpc_pool:pick_worker(?POOL),
GetState = fun() -> lists:reverse(tuple_to_list(sys:get_state(Pid))) end,
GetState = fun() -> sys:get_state(Pid) end,
State = GetState(),
Requests = hd(State),
RequestsIdx = 11,
Requests = element(RequestsIdx, State),
#{sent := Sent} = Requests,
?assertEqual(1, maps:size(Sent)),
OldState = list_to_tuple(lists:reverse([Sent | tl(State)])),
OldState = setelement(RequestsIdx, State, Sent),
%% put old format to the process state
sys:replace_state(Pid, fun(_) -> OldState end),
%% verify it's in the old format
?assertEqual(Sent, hd(GetState())),
?assertEqual(Sent, element(RequestsIdx, GetState())),
%% send a message to trigger upgrade
Pid ! dummy,
{error, _} = gen_server:call(Pid, dummy),
ok = gen_server:cast(Pid, dummy),
%% now it should be upgraded to the new version
?assertMatch(#{sent := Sent}, hd(GetState())),
?assertMatch(#{sent := Sent}, element(RequestsIdx, GetState())),
_ = sys:get_status(Pid),
ok
end
Expand Down

0 comments on commit a788945

Please sign in to comment.