diff --git a/lib/tesla.ex b/lib/tesla.ex index 351109b8..d19eccb0 100644 --- a/lib/tesla.ex +++ b/lib/tesla.ex @@ -32,6 +32,7 @@ defmodule Tesla.Env do @type param :: binary | [{binary | atom, param}] @type query :: [{binary | atom, param}] @type headers :: [{binary, binary}] + @type response :: :default | :stream @type body :: any @type status :: integer | nil @@ -48,9 +49,11 @@ defmodule Tesla.Env do headers: headers, body: body, status: status, + response: response, opts: opts, __module__: atom, - __client__: client + __client__: client, + __pid__: pid() } defstruct method: nil, @@ -59,9 +62,11 @@ defmodule Tesla.Env do headers: [], body: nil, status: nil, + response: :default, opts: [], __module__: nil, - __client__: nil + __client__: nil, + __pid__: nil end defmodule Tesla.Middleware do @@ -322,7 +327,7 @@ defmodule Tesla do end defp prepare(module, %{pre: pre, post: post} = client, options) do - env = struct(Env, options ++ [__module__: module, __client__: client]) + env = struct(Env, options ++ [__module__: module, __client__: client, __pid__: self()]) stack = pre ++ module.__middleware__ ++ post ++ [effective_adapter(module, client)] {env, stack} end diff --git a/lib/tesla/adapter/hackney.ex b/lib/tesla/adapter/hackney.ex index 6a1a4ed7..7ffdc649 100644 --- a/lib/tesla/adapter/hackney.ex +++ b/lib/tesla/adapter/hackney.ex @@ -42,7 +42,10 @@ if Code.ensure_loaded?(:hackney) do end defp format_body(data) when is_list(data), do: IO.iodata_to_binary(data) - defp format_body(data) when is_binary(data) or is_reference(data), do: data + + defp format_body(data) + when is_binary(data) or is_reference(data) or is_function(data), + do: data defp request(env, opts) do request( @@ -50,32 +53,49 @@ if Code.ensure_loaded?(:hackney) do Tesla.build_url(env.url, env.query), env.headers, env.body, + env.response, Tesla.Adapter.opts(env, opts) + |> Keyword.put_new(:stream_owner, env.__pid__) ) end - defp request(method, url, headers, %Stream{} = body, opts), - do: request_stream(method, url, headers, body, opts) + defp request(method, url, headers, %Stream{} = body, response, opts), + do: request_stream(method, url, headers, body, response, opts) - defp request(method, url, headers, body, opts) when is_function(body), - do: request_stream(method, url, headers, body, opts) + defp request(method, url, headers, body, response, opts) when is_function(body), + do: request_stream(method, url, headers, body, response, opts) - defp request(method, url, headers, %Multipart{} = mp, opts) do + defp request(method, url, headers, %Multipart{} = mp, response, opts) do headers = headers ++ Multipart.headers(mp) body = Multipart.body(mp) - request(method, url, headers, body, opts) + request(method, url, headers, body, response, opts) + end + + defp request(method, url, headers, body, :stream, opts) do + response = :hackney.request(method, url, headers, body || '', opts) + handle_stream(response, Keyword.get(opts, :stream_owner)) end - defp request(method, url, headers, body, opts) do - handle(:hackney.request(method, url, headers, body || '', opts)) + defp request(method, url, headers, body, _, opts) do + response = :hackney.request(method, url, headers, body || '', opts) + handle(response) end - defp request_stream(method, url, headers, body, opts) do + defp request_stream(method, url, headers, body, type, opts) do with {:ok, ref} <- :hackney.request(method, url, headers, :stream, opts) do - case send_stream(ref, body) do - :ok -> handle(:hackney.start_response(ref)) - error -> handle(error) + case {send_stream(ref, body), type} do + {:ok, :stream} -> + handle_stream( + :hackney.start_response(ref), + Keyword.get(opts, :stream_owner) + ) + + {:ok, _} -> + handle(:hackney.start_response(ref)) + + {error, _} -> + handle(error) end else e -> handle(e) @@ -106,6 +126,41 @@ if Code.ensure_loaded?(:hackney) do defp handle({:ok, status, headers, body}), do: {:ok, status, headers, body} + defp handle_stream({:ok, status, headers, ref}, pid) + when is_reference(ref) and is_pid(pid) do + :hackney.controlling_process(ref, pid) + + body = + Stream.resource( + fn -> nil end, + fn _ -> + case :hackney.stream_body(ref) do + :done -> + {:halt, nil} + + {:ok, data} -> + {[data], nil} + + {:error, reason} -> + raise inspect(reason) + end + end, + fn _ -> :hackney.close(ref) end + ) + + {:ok, status, headers, body} + end + + defp handle_stream(response, pid) do + case handle(response) do + {:ok, _status, _headers, ref} = response when is_reference(ref) and is_pid(pid) -> + handle_stream(response, pid) + + response -> + response + end + end + defp handle_async_response({ref, %{headers: headers, status: status}}) when not (is_nil(headers) or is_nil(status)) do {:ok, status, headers, ref} diff --git a/test/tesla/adapter/hackney_test.exs b/test/tesla/adapter/hackney_test.exs index 3ae94862..3c84610e 100644 --- a/test/tesla/adapter/hackney_test.exs +++ b/test/tesla/adapter/hackney_test.exs @@ -62,4 +62,33 @@ defmodule Tesla.Adapter.HackneyTest do assert {:error, :fake_error} = call(request) end + + test "get with `response: :stream`" do + request = %Env{ + method: :get, + url: "#{@http}/ip", + response: :stream, + __pid__: self() + } + + assert {:ok, %Env{} = response} = call(request) + + assert response.status == 200 + assert is_function(response.body) + assert is_bitstring(Enum.join(response.body)) + end + + test "get with `response: stream` and bad pid" do + request = %Env{ + method: :get, + url: "#{@http}/ip", + response: :stream, + __pid__: nil + } + + assert {:ok, %Env{} = response} = call(request) + + assert response.status == 200 + assert not is_function(response.body) + end end diff --git a/test/tesla/global_mock_test.exs b/test/tesla/global_mock_test.exs index f846c19b..f68d72ed 100644 --- a/test/tesla/global_mock_test.exs +++ b/test/tesla/global_mock_test.exs @@ -3,8 +3,11 @@ defmodule Tesla.GlobalMockTest do setup_all do Tesla.Mock.mock_global(fn - %{method: :get, url: "/list"} -> %Tesla.Env{status: 200, body: "hello"} - %{method: :post, url: "/create"} -> {201, %{}, %{id: 42}} + %{method: :get, url: "/list", __pid__: pid} -> + %Tesla.Env{status: 200, body: "hello", __pid__: pid} + + %{method: :post, url: "/create"} -> + {201, %{}, %{id: 42}} end) :ok @@ -16,4 +19,11 @@ defmodule Tesla.GlobalMockTest do assert_receive {:ok, %Tesla.Env{status: 200, body: "hello"}} end + + test "__pid__ is passed correctly" do + pid = self() + child_pid = spawn(fn -> send(pid, MockClient.get("/list")) end) + + assert_receive {:ok, %Tesla.Env{status: 200, body: "hello", __pid__: child_pid}} + end end