From c896989b5f6db937575c11adde85be3caca54598 Mon Sep 17 00:00:00 2001 From: Tim Smart Date: Fri, 29 Oct 2021 14:43:35 +1300 Subject: [PATCH 1/7] Add stream_response option to hackney adapter --- lib/tesla/adapter/hackney.ex | 47 +++++++++++++++++++++++++++-- test/tesla/adapter/hackney_test.exs | 12 ++++++++ 2 files changed, 57 insertions(+), 2 deletions(-) diff --git a/lib/tesla/adapter/hackney.ex b/lib/tesla/adapter/hackney.ex index 6a1a4ed7f..eec9c3f17 100644 --- a/lib/tesla/adapter/hackney.ex +++ b/lib/tesla/adapter/hackney.ex @@ -42,7 +42,10 @@ if Code.ensure_loaded?(:hackney) do end defp format_body(data) when is_list(data), do: IO.iodata_to_binary(data) - defp format_body(data) when is_binary(data) or is_reference(data), do: data + + defp format_body(data) + when is_binary(data) or is_reference(data) or is_function(data), + do: data defp request(env, opts) do request( @@ -68,7 +71,12 @@ if Code.ensure_loaded?(:hackney) do end defp request(method, url, headers, body, opts) do - handle(:hackney.request(method, url, headers, body || '', opts)) + response = :hackney.request(method, url, headers, body || '', opts) + + case Keyword.get(opts, :stream_response, false) do + true -> handle_stream(response) + false -> handle(response) + end end defp request_stream(method, url, headers, body, opts) do @@ -106,6 +114,41 @@ if Code.ensure_loaded?(:hackney) do defp handle({:ok, status, headers, body}), do: {:ok, status, headers, body} + defp handle_stream({:ok, status, headers, ref}) when is_reference(ref) do + state = :hackney_manager.get_state(ref) + + body = + Stream.resource( + fn -> state end, + fn + {:done, state} -> + {:halt, state} + + {:ok, data, state} -> + {[data], state} + + {:error, reason} -> + raise inspect(reason) + + state -> + {[], :hackney_response.stream_body(state)} + end, + &:hackney_response.close/1 + ) + + {:ok, status, headers, body} + end + + defp handle_stream(response) do + case handle(response) do + {:ok, _status, _headers, ref} = response when is_reference(ref) -> + handle_stream(response) + + response -> + response + end + end + defp handle_async_response({ref, %{headers: headers, status: status}}) when not (is_nil(headers) or is_nil(status)) do {:ok, status, headers, ref} diff --git a/test/tesla/adapter/hackney_test.exs b/test/tesla/adapter/hackney_test.exs index 3ae948626..0e477c6bd 100644 --- a/test/tesla/adapter/hackney_test.exs +++ b/test/tesla/adapter/hackney_test.exs @@ -62,4 +62,16 @@ defmodule Tesla.Adapter.HackneyTest do assert {:error, :fake_error} = call(request) end + + test "get with `stream_response: true` option" do + request = %Env{ + method: :get, + url: "#{@http}/ip" + } + + assert {:ok, %Env{} = response} = call(request, stream_response: true) + + assert response.status == 200 + assert(is_function(response.body)) + end end From 4353e9dce8c85fbde099ddd004b2b64246b5390e Mon Sep 17 00:00:00 2001 From: Tim Smart Date: Fri, 29 Oct 2021 15:43:53 +1300 Subject: [PATCH 2/7] Change stream_response to stream_to_pid Hackney only allows the controller process to access the ref --- lib/tesla/adapter/hackney.ex | 41 ++++++++++++++--------------- test/tesla/adapter/hackney_test.exs | 7 ++--- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/lib/tesla/adapter/hackney.ex b/lib/tesla/adapter/hackney.ex index eec9c3f17..88cd58139 100644 --- a/lib/tesla/adapter/hackney.ex +++ b/lib/tesla/adapter/hackney.ex @@ -73,9 +73,9 @@ if Code.ensure_loaded?(:hackney) do defp request(method, url, headers, body, opts) do response = :hackney.request(method, url, headers, body || '', opts) - case Keyword.get(opts, :stream_response, false) do - true -> handle_stream(response) - false -> handle(response) + case Keyword.get(opts, :stream_to_pid) do + pid when is_pid(pid) -> handle_stream(response, pid) + _ -> handle(response) end end @@ -114,35 +114,34 @@ if Code.ensure_loaded?(:hackney) do defp handle({:ok, status, headers, body}), do: {:ok, status, headers, body} - defp handle_stream({:ok, status, headers, ref}) when is_reference(ref) do - state = :hackney_manager.get_state(ref) + defp handle_stream({:ok, status, headers, ref}, pid) when is_reference(ref) do + :hackney.controlling_process(ref, pid) body = Stream.resource( - fn -> state end, - fn - {:done, state} -> - {:halt, state} - - {:ok, data, state} -> - {[data], state} - - {:error, reason} -> - raise inspect(reason) - - state -> - {[], :hackney_response.stream_body(state)} + fn -> nil end, + fn _ -> + case :hackney.stream_body(ref) do + :done -> + {:halt, nil} + + {:ok, data} -> + {[data], nil} + + {:error, reason} -> + raise inspect(reason) + end end, - &:hackney_response.close/1 + fn _ -> :hackney.close(ref) end ) {:ok, status, headers, body} end - defp handle_stream(response) do + defp handle_stream(response, pid) do case handle(response) do {:ok, _status, _headers, ref} = response when is_reference(ref) -> - handle_stream(response) + handle_stream(response, pid) response -> response diff --git a/test/tesla/adapter/hackney_test.exs b/test/tesla/adapter/hackney_test.exs index 0e477c6bd..82c54c029 100644 --- a/test/tesla/adapter/hackney_test.exs +++ b/test/tesla/adapter/hackney_test.exs @@ -63,15 +63,16 @@ defmodule Tesla.Adapter.HackneyTest do assert {:error, :fake_error} = call(request) end - test "get with `stream_response: true` option" do + test "get with `stream_to_pid: pid` option" do request = %Env{ method: :get, url: "#{@http}/ip" } - assert {:ok, %Env{} = response} = call(request, stream_response: true) + assert {:ok, %Env{} = response} = call(request, stream_to_pid: self()) assert response.status == 200 - assert(is_function(response.body)) + assert is_function(response.body) + assert is_bitstring(Enum.join(response.body)) end end From fcafbecfc5dbbcb3d79094f10872ef962d8f648e Mon Sep 17 00:00:00 2001 From: Tim Smart Date: Thu, 13 Oct 2022 12:41:36 +1300 Subject: [PATCH 3/7] refactor: improve API surface for response streaming * Rename `stream_to_pid` to `stream` * Add `__pid__` to `Env`, for capturing the process that initiated the request * hackney: add `stream_owner` option, which defaults to `env.__pid__` --- lib/tesla.ex | 8 +++++--- lib/tesla/adapter/hackney.ex | 5 +++-- test/tesla/adapter/hackney_test.exs | 7 ++++--- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/lib/tesla.ex b/lib/tesla.ex index 351109b8f..7f18207d8 100644 --- a/lib/tesla.ex +++ b/lib/tesla.ex @@ -50,7 +50,8 @@ defmodule Tesla.Env do status: status, opts: opts, __module__: atom, - __client__: client + __client__: client, + __pid__: pid() } defstruct method: nil, @@ -61,7 +62,8 @@ defmodule Tesla.Env do status: nil, opts: [], __module__: nil, - __client__: nil + __client__: nil, + __pid__: nil end defmodule Tesla.Middleware do @@ -322,7 +324,7 @@ defmodule Tesla do end defp prepare(module, %{pre: pre, post: post} = client, options) do - env = struct(Env, options ++ [__module__: module, __client__: client]) + env = struct(Env, options ++ [__module__: module, __client__: client, __pid__: self()]) stack = pre ++ module.__middleware__ ++ post ++ [effective_adapter(module, client)] {env, stack} end diff --git a/lib/tesla/adapter/hackney.ex b/lib/tesla/adapter/hackney.ex index 88cd58139..802641d16 100644 --- a/lib/tesla/adapter/hackney.ex +++ b/lib/tesla/adapter/hackney.ex @@ -54,6 +54,7 @@ if Code.ensure_loaded?(:hackney) do env.headers, env.body, Tesla.Adapter.opts(env, opts) + |> Keyword.put_new(:stream_owner, env.__pid__) ) end @@ -73,8 +74,8 @@ if Code.ensure_loaded?(:hackney) do defp request(method, url, headers, body, opts) do response = :hackney.request(method, url, headers, body || '', opts) - case Keyword.get(opts, :stream_to_pid) do - pid when is_pid(pid) -> handle_stream(response, pid) + case {Keyword.get(opts, :stream), Keyword.get(opts, :stream_owner)} do + {true, pid} when is_pid(pid) -> handle_stream(response, pid) _ -> handle(response) end end diff --git a/test/tesla/adapter/hackney_test.exs b/test/tesla/adapter/hackney_test.exs index 82c54c029..dd627428e 100644 --- a/test/tesla/adapter/hackney_test.exs +++ b/test/tesla/adapter/hackney_test.exs @@ -63,13 +63,14 @@ defmodule Tesla.Adapter.HackneyTest do assert {:error, :fake_error} = call(request) end - test "get with `stream_to_pid: pid` option" do + test "get with `stream: true` option" do request = %Env{ method: :get, - url: "#{@http}/ip" + url: "#{@http}/ip", + __pid__: self() } - assert {:ok, %Env{} = response} = call(request, stream_to_pid: self()) + assert {:ok, %Env{} = response} = call(request, stream: true) assert response.status == 200 assert is_function(response.body) From 20b8f5112440cb4d1af4841c720d6b2534a0b018 Mon Sep 17 00:00:00 2001 From: Tim Smart Date: Thu, 13 Oct 2022 12:56:04 +1300 Subject: [PATCH 4/7] chore: add test to check if __pid__ is captured correctly --- test/tesla/global_mock_test.exs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/test/tesla/global_mock_test.exs b/test/tesla/global_mock_test.exs index f846c19b5..f68d72eda 100644 --- a/test/tesla/global_mock_test.exs +++ b/test/tesla/global_mock_test.exs @@ -3,8 +3,11 @@ defmodule Tesla.GlobalMockTest do setup_all do Tesla.Mock.mock_global(fn - %{method: :get, url: "/list"} -> %Tesla.Env{status: 200, body: "hello"} - %{method: :post, url: "/create"} -> {201, %{}, %{id: 42}} + %{method: :get, url: "/list", __pid__: pid} -> + %Tesla.Env{status: 200, body: "hello", __pid__: pid} + + %{method: :post, url: "/create"} -> + {201, %{}, %{id: 42}} end) :ok @@ -16,4 +19,11 @@ defmodule Tesla.GlobalMockTest do assert_receive {:ok, %Tesla.Env{status: 200, body: "hello"}} end + + test "__pid__ is passed correctly" do + pid = self() + child_pid = spawn(fn -> send(pid, MockClient.get("/list")) end) + + assert_receive {:ok, %Tesla.Env{status: 200, body: "hello", __pid__: child_pid}} + end end From 4d7ee42ae17d061160ae0f63199452186fea899c Mon Sep 17 00:00:00 2001 From: Tim Smart Date: Thu, 13 Oct 2022 13:31:52 +1300 Subject: [PATCH 5/7] refactor: Add `response` to env * Defaults to `:default` * Has `:stream` option, which attempts to stream the response if the adapter supports it --- lib/tesla.ex | 3 ++ lib/tesla/adapter/hackney.ex | 58 ++++++++++++++--------------- test/tesla/adapter/hackney_test.exs | 3 +- 3 files changed, 32 insertions(+), 32 deletions(-) diff --git a/lib/tesla.ex b/lib/tesla.ex index 7f18207d8..d19eccb04 100644 --- a/lib/tesla.ex +++ b/lib/tesla.ex @@ -32,6 +32,7 @@ defmodule Tesla.Env do @type param :: binary | [{binary | atom, param}] @type query :: [{binary | atom, param}] @type headers :: [{binary, binary}] + @type response :: :default | :stream @type body :: any @type status :: integer | nil @@ -48,6 +49,7 @@ defmodule Tesla.Env do headers: headers, body: body, status: status, + response: response, opts: opts, __module__: atom, __client__: client, @@ -60,6 +62,7 @@ defmodule Tesla.Env do headers: [], body: nil, status: nil, + response: :default, opts: [], __module__: nil, __client__: nil, diff --git a/lib/tesla/adapter/hackney.ex b/lib/tesla/adapter/hackney.ex index 802641d16..cb0c6ebd8 100644 --- a/lib/tesla/adapter/hackney.ex +++ b/lib/tesla/adapter/hackney.ex @@ -53,41 +53,42 @@ if Code.ensure_loaded?(:hackney) do Tesla.build_url(env.url, env.query), env.headers, env.body, + env.response, Tesla.Adapter.opts(env, opts) |> Keyword.put_new(:stream_owner, env.__pid__) ) end - defp request(method, url, headers, %Stream{} = body, opts), - do: request_stream(method, url, headers, body, opts) + defp request(method, url, headers, %Stream{} = body, response, opts), + do: request_stream(method, url, headers, body, response, opts) - defp request(method, url, headers, body, opts) when is_function(body), - do: request_stream(method, url, headers, body, opts) + defp request(method, url, headers, body, response, opts) when is_function(body), + do: request_stream(method, url, headers, body, response, opts) - defp request(method, url, headers, %Multipart{} = mp, opts) do + defp request(method, url, headers, %Multipart{} = mp, response, opts) do headers = headers ++ Multipart.headers(mp) body = Multipart.body(mp) - request(method, url, headers, body, opts) + request(method, url, headers, body, response, opts) end - defp request(method, url, headers, body, opts) do + defp request(method, url, headers, body, type, opts) do response = :hackney.request(method, url, headers, body || '', opts) + handle(response, type, Keyword.get(opts, :stream_owner)) + end - case {Keyword.get(opts, :stream), Keyword.get(opts, :stream_owner)} do - {true, pid} when is_pid(pid) -> handle_stream(response, pid) - _ -> handle(response) + defp request_stream(method, url, headers, body, type, opts) do + handle_wrap = fn result -> + handle(result, type, Keyword.get(opts, :stream_owner)) end - end - defp request_stream(method, url, headers, body, opts) do with {:ok, ref} <- :hackney.request(method, url, headers, :stream, opts) do case send_stream(ref, body) do - :ok -> handle(:hackney.start_response(ref)) - error -> handle(error) + :ok -> handle_wrap.(:hackney.start_response(ref)) + error -> handle_wrap.(error) end else - e -> handle(e) + e -> handle_wrap.(e) end end @@ -100,22 +101,25 @@ if Code.ensure_loaded?(:hackney) do end) end - defp handle({:error, _} = error), do: error - defp handle({:ok, status, headers}), do: {:ok, status, headers, []} + defp handle({:error, _} = error, _, _), do: error + defp handle({:ok, status, headers}, _, _), do: {:ok, status, headers, []} + + defp handle({:ok, ref}, :default, _) when is_reference(ref) do + handle_async_response({ref, %{status: nil, headers: nil}}) + end - defp handle({:ok, ref}) when is_reference(ref) do + defp handle({:ok, ref}, :stream, pid) when is_reference(ref) do handle_async_response({ref, %{status: nil, headers: nil}}) + |> handle(:stream, pid) end - defp handle({:ok, status, headers, ref}) when is_reference(ref) do + defp handle({:ok, status, headers, ref}, :default, _) when is_reference(ref) do with {:ok, body} <- :hackney.body(ref) do {:ok, status, headers, body} end end - defp handle({:ok, status, headers, body}), do: {:ok, status, headers, body} - - defp handle_stream({:ok, status, headers, ref}, pid) when is_reference(ref) do + defp handle({:ok, status, headers, ref}, :stream, pid) when is_reference(ref) do :hackney.controlling_process(ref, pid) body = @@ -139,15 +143,7 @@ if Code.ensure_loaded?(:hackney) do {:ok, status, headers, body} end - defp handle_stream(response, pid) do - case handle(response) do - {:ok, _status, _headers, ref} = response when is_reference(ref) -> - handle_stream(response, pid) - - response -> - response - end - end + defp handle({:ok, status, headers, body}, _, _), do: {:ok, status, headers, body} defp handle_async_response({ref, %{headers: headers, status: status}}) when not (is_nil(headers) or is_nil(status)) do diff --git a/test/tesla/adapter/hackney_test.exs b/test/tesla/adapter/hackney_test.exs index dd627428e..8da955035 100644 --- a/test/tesla/adapter/hackney_test.exs +++ b/test/tesla/adapter/hackney_test.exs @@ -67,10 +67,11 @@ defmodule Tesla.Adapter.HackneyTest do request = %Env{ method: :get, url: "#{@http}/ip", + response: :stream, __pid__: self() } - assert {:ok, %Env{} = response} = call(request, stream: true) + assert {:ok, %Env{} = response} = call(request) assert response.status == 200 assert is_function(response.body) From 1d6908385594d58eedcf89bfc6f0dafa178c82b2 Mon Sep 17 00:00:00 2001 From: Tim Smart Date: Thu, 13 Oct 2022 13:57:36 +1300 Subject: [PATCH 6/7] refactor: add is_pid guard to hackney stream handling --- lib/tesla/adapter/hackney.ex | 15 ++++++++------- test/tesla/adapter/hackney_test.exs | 16 +++++++++++++++- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/lib/tesla/adapter/hackney.ex b/lib/tesla/adapter/hackney.ex index cb0c6ebd8..9c05af997 100644 --- a/lib/tesla/adapter/hackney.ex +++ b/lib/tesla/adapter/hackney.ex @@ -113,13 +113,8 @@ if Code.ensure_loaded?(:hackney) do |> handle(:stream, pid) end - defp handle({:ok, status, headers, ref}, :default, _) when is_reference(ref) do - with {:ok, body} <- :hackney.body(ref) do - {:ok, status, headers, body} - end - end - - defp handle({:ok, status, headers, ref}, :stream, pid) when is_reference(ref) do + defp handle({:ok, status, headers, ref}, :stream, pid) + when is_reference(ref) and is_pid(pid) do :hackney.controlling_process(ref, pid) body = @@ -143,6 +138,12 @@ if Code.ensure_loaded?(:hackney) do {:ok, status, headers, body} end + defp handle({:ok, status, headers, ref}, _, _) when is_reference(ref) do + with {:ok, body} <- :hackney.body(ref) do + {:ok, status, headers, body} + end + end + defp handle({:ok, status, headers, body}, _, _), do: {:ok, status, headers, body} defp handle_async_response({ref, %{headers: headers, status: status}}) diff --git a/test/tesla/adapter/hackney_test.exs b/test/tesla/adapter/hackney_test.exs index 8da955035..3c84610e3 100644 --- a/test/tesla/adapter/hackney_test.exs +++ b/test/tesla/adapter/hackney_test.exs @@ -63,7 +63,7 @@ defmodule Tesla.Adapter.HackneyTest do assert {:error, :fake_error} = call(request) end - test "get with `stream: true` option" do + test "get with `response: :stream`" do request = %Env{ method: :get, url: "#{@http}/ip", @@ -77,4 +77,18 @@ defmodule Tesla.Adapter.HackneyTest do assert is_function(response.body) assert is_bitstring(Enum.join(response.body)) end + + test "get with `response: stream` and bad pid" do + request = %Env{ + method: :get, + url: "#{@http}/ip", + response: :stream, + __pid__: nil + } + + assert {:ok, %Env{} = response} = call(request) + + assert response.status == 200 + assert not is_function(response.body) + end end From f2c8809c69c56e89fc63667fc5634388ea0407c9 Mon Sep 17 00:00:00 2001 From: Tim Smart Date: Sat, 15 Oct 2022 13:25:53 +1300 Subject: [PATCH 7/7] refactor: seperate stream response handling from default --- lib/tesla/adapter/hackney.ex | 59 ++++++++++++++++++++++-------------- 1 file changed, 37 insertions(+), 22 deletions(-) diff --git a/lib/tesla/adapter/hackney.ex b/lib/tesla/adapter/hackney.ex index 9c05af997..7ffdc649f 100644 --- a/lib/tesla/adapter/hackney.ex +++ b/lib/tesla/adapter/hackney.ex @@ -72,23 +72,33 @@ if Code.ensure_loaded?(:hackney) do request(method, url, headers, body, response, opts) end - defp request(method, url, headers, body, type, opts) do + defp request(method, url, headers, body, :stream, opts) do response = :hackney.request(method, url, headers, body || '', opts) - handle(response, type, Keyword.get(opts, :stream_owner)) + handle_stream(response, Keyword.get(opts, :stream_owner)) end - defp request_stream(method, url, headers, body, type, opts) do - handle_wrap = fn result -> - handle(result, type, Keyword.get(opts, :stream_owner)) - end + defp request(method, url, headers, body, _, opts) do + response = :hackney.request(method, url, headers, body || '', opts) + handle(response) + end + defp request_stream(method, url, headers, body, type, opts) do with {:ok, ref} <- :hackney.request(method, url, headers, :stream, opts) do - case send_stream(ref, body) do - :ok -> handle_wrap.(:hackney.start_response(ref)) - error -> handle_wrap.(error) + case {send_stream(ref, body), type} do + {:ok, :stream} -> + handle_stream( + :hackney.start_response(ref), + Keyword.get(opts, :stream_owner) + ) + + {:ok, _} -> + handle(:hackney.start_response(ref)) + + {error, _} -> + handle(error) end else - e -> handle_wrap.(e) + e -> handle(e) end end @@ -101,19 +111,22 @@ if Code.ensure_loaded?(:hackney) do end) end - defp handle({:error, _} = error, _, _), do: error - defp handle({:ok, status, headers}, _, _), do: {:ok, status, headers, []} + defp handle({:error, _} = error), do: error + defp handle({:ok, status, headers}), do: {:ok, status, headers, []} - defp handle({:ok, ref}, :default, _) when is_reference(ref) do + defp handle({:ok, ref}) when is_reference(ref) do handle_async_response({ref, %{status: nil, headers: nil}}) end - defp handle({:ok, ref}, :stream, pid) when is_reference(ref) do - handle_async_response({ref, %{status: nil, headers: nil}}) - |> handle(:stream, pid) + defp handle({:ok, status, headers, ref}) when is_reference(ref) do + with {:ok, body} <- :hackney.body(ref) do + {:ok, status, headers, body} + end end - defp handle({:ok, status, headers, ref}, :stream, pid) + defp handle({:ok, status, headers, body}), do: {:ok, status, headers, body} + + defp handle_stream({:ok, status, headers, ref}, pid) when is_reference(ref) and is_pid(pid) do :hackney.controlling_process(ref, pid) @@ -138,14 +151,16 @@ if Code.ensure_loaded?(:hackney) do {:ok, status, headers, body} end - defp handle({:ok, status, headers, ref}, _, _) when is_reference(ref) do - with {:ok, body} <- :hackney.body(ref) do - {:ok, status, headers, body} + defp handle_stream(response, pid) do + case handle(response) do + {:ok, _status, _headers, ref} = response when is_reference(ref) and is_pid(pid) -> + handle_stream(response, pid) + + response -> + response end end - defp handle({:ok, status, headers, body}, _, _), do: {:ok, status, headers, body} - defp handle_async_response({ref, %{headers: headers, status: status}}) when not (is_nil(headers) or is_nil(status)) do {:ok, status, headers, ref}