Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stream_response option to hackney adapter #498

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions lib/tesla.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ defmodule Tesla.Env do
@type param :: binary | [{binary | atom, param}]
@type query :: [{binary | atom, param}]
@type headers :: [{binary, binary}]
@type response :: :default | :stream

@type body :: any
@type status :: integer | nil
Expand All @@ -48,9 +49,11 @@ defmodule Tesla.Env do
headers: headers,
body: body,
status: status,
response: response,
opts: opts,
__module__: atom,
__client__: client
__client__: client,
__pid__: pid()
}

defstruct method: nil,
Expand All @@ -59,9 +62,11 @@ defmodule Tesla.Env do
headers: [],
body: nil,
status: nil,
response: :default,
opts: [],
__module__: nil,
__client__: nil
__client__: nil,
__pid__: nil
end

defmodule Tesla.Middleware do
Expand Down Expand Up @@ -322,7 +327,7 @@ defmodule Tesla do
end

defp prepare(module, %{pre: pre, post: post} = client, options) do
env = struct(Env, options ++ [__module__: module, __client__: client])
env = struct(Env, options ++ [__module__: module, __client__: client, __pid__: self()])
stack = pre ++ module.__middleware__ ++ post ++ [effective_adapter(module, client)]
{env, stack}
end
Expand Down
81 changes: 68 additions & 13 deletions lib/tesla/adapter/hackney.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,40 +42,60 @@ if Code.ensure_loaded?(:hackney) do
end

defp format_body(data) when is_list(data), do: IO.iodata_to_binary(data)
defp format_body(data) when is_binary(data) or is_reference(data), do: data

defp format_body(data)
when is_binary(data) or is_reference(data) or is_function(data),
do: data

defp request(env, opts) do
request(
env.method,
Tesla.build_url(env.url, env.query),
env.headers,
env.body,
env.response,
Tesla.Adapter.opts(env, opts)
|> Keyword.put_new(:stream_owner, env.__pid__)
)
end

defp request(method, url, headers, %Stream{} = body, opts),
do: request_stream(method, url, headers, body, opts)
defp request(method, url, headers, %Stream{} = body, response, opts),
do: request_stream(method, url, headers, body, response, opts)

defp request(method, url, headers, body, opts) when is_function(body),
do: request_stream(method, url, headers, body, opts)
defp request(method, url, headers, body, response, opts) when is_function(body),
do: request_stream(method, url, headers, body, response, opts)

defp request(method, url, headers, %Multipart{} = mp, opts) do
defp request(method, url, headers, %Multipart{} = mp, response, opts) do
headers = headers ++ Multipart.headers(mp)
body = Multipart.body(mp)

request(method, url, headers, body, opts)
request(method, url, headers, body, response, opts)
end

defp request(method, url, headers, body, :stream, opts) do
response = :hackney.request(method, url, headers, body || '', opts)
handle_stream(response, Keyword.get(opts, :stream_owner))
end

defp request(method, url, headers, body, opts) do
handle(:hackney.request(method, url, headers, body || '', opts))
defp request(method, url, headers, body, _, opts) do
response = :hackney.request(method, url, headers, body || '', opts)
handle(response)
end

defp request_stream(method, url, headers, body, opts) do
defp request_stream(method, url, headers, body, type, opts) do
with {:ok, ref} <- :hackney.request(method, url, headers, :stream, opts) do
case send_stream(ref, body) do
:ok -> handle(:hackney.start_response(ref))
error -> handle(error)
case {send_stream(ref, body), type} do
{:ok, :stream} ->
handle_stream(
:hackney.start_response(ref),
Keyword.get(opts, :stream_owner)
)

{:ok, _} ->
handle(:hackney.start_response(ref))

{error, _} ->
handle(error)
end
else
e -> handle(e)
Expand Down Expand Up @@ -106,6 +126,41 @@ if Code.ensure_loaded?(:hackney) do

defp handle({:ok, status, headers, body}), do: {:ok, status, headers, body}

defp handle_stream({:ok, status, headers, ref}, pid)
when is_reference(ref) and is_pid(pid) do
:hackney.controlling_process(ref, pid)

body =
Stream.resource(
fn -> nil end,
fn _ ->
case :hackney.stream_body(ref) do
:done ->
{:halt, nil}

{:ok, data} ->
{[data], nil}

{:error, reason} ->
raise inspect(reason)
end
end,
fn _ -> :hackney.close(ref) end
)

{:ok, status, headers, body}
end

defp handle_stream(response, pid) do
case handle(response) do
{:ok, _status, _headers, ref} = response when is_reference(ref) and is_pid(pid) ->
handle_stream(response, pid)

response ->
response
end
end

defp handle_async_response({ref, %{headers: headers, status: status}})
when not (is_nil(headers) or is_nil(status)) do
{:ok, status, headers, ref}
Expand Down
29 changes: 29 additions & 0 deletions test/tesla/adapter/hackney_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,33 @@ defmodule Tesla.Adapter.HackneyTest do

assert {:error, :fake_error} = call(request)
end

test "get with `response: :stream`" do
request = %Env{
method: :get,
url: "#{@http}/ip",
response: :stream,
__pid__: self()
}

assert {:ok, %Env{} = response} = call(request)

assert response.status == 200
assert is_function(response.body)
assert is_bitstring(Enum.join(response.body))
end

test "get with `response: stream` and bad pid" do
request = %Env{
method: :get,
url: "#{@http}/ip",
response: :stream,
__pid__: nil
}

assert {:ok, %Env{} = response} = call(request)

assert response.status == 200
assert not is_function(response.body)
end
end
14 changes: 12 additions & 2 deletions test/tesla/global_mock_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ defmodule Tesla.GlobalMockTest do

setup_all do
Tesla.Mock.mock_global(fn
%{method: :get, url: "/list"} -> %Tesla.Env{status: 200, body: "hello"}
%{method: :post, url: "/create"} -> {201, %{}, %{id: 42}}
%{method: :get, url: "/list", __pid__: pid} ->
%Tesla.Env{status: 200, body: "hello", __pid__: pid}

%{method: :post, url: "/create"} ->
{201, %{}, %{id: 42}}
end)

:ok
Expand All @@ -16,4 +19,11 @@ defmodule Tesla.GlobalMockTest do

assert_receive {:ok, %Tesla.Env{status: 200, body: "hello"}}
end

test "__pid__ is passed correctly" do
pid = self()
child_pid = spawn(fn -> send(pid, MockClient.get("/list")) end)

assert_receive {:ok, %Tesla.Env{status: 200, body: "hello", __pid__: child_pid}}
end
end