diff --git a/lib/ex_aws.ex b/lib/ex_aws.ex index ea4fbfea..dc9fb11b 100644 --- a/lib/ex_aws.ex +++ b/lib/ex_aws.ex @@ -106,7 +106,13 @@ defmodule ExAws do @impl ExAws.Behaviour @spec stream!(ExAws.Operation.t(), keyword) :: Enumerable.t() def stream!(op, config_overrides \\ []) do - ExAws.Operation.stream!(op, ExAws.Config.new(op.service, config_overrides)) + case ExAws.Operation.stream!(op, ExAws.Config.new(op.service, config_overrides)) do + {:ok, result} -> + result + + result -> + result + end end @doc false diff --git a/lib/ex_aws/behaviour.ex b/lib/ex_aws/behaviour.ex index c585f01d..28b98464 100644 --- a/lib/ex_aws/behaviour.ex +++ b/lib/ex_aws/behaviour.ex @@ -18,8 +18,8 @@ defmodule ExAws.Behaviour do @callback request!(ExAws.Operation.t(), Keyword.t()) :: term | no_return @doc "See `ExAws.stream!/2`." - @callback stream!(ExAws.Operation.t()) :: Enumerable.t() + @callback stream!(ExAws.Operation.t()) :: Enumerable.t() | {:ok, term} | {:error, term} @doc "See `ExAws.stream!/2`." - @callback stream!(ExAws.Operation.t(), Keyword.t()) :: Enumerable.t() + @callback stream!(ExAws.Operation.t(), Keyword.t()) :: Enumerable.t() | {:ok, term} | {:error, term} end diff --git a/lib/ex_aws/operation/s3.ex b/lib/ex_aws/operation/s3.ex index cf2cfba4..ec4290dc 100644 --- a/lib/ex_aws/operation/s3.ex +++ b/lib/ex_aws/operation/s3.ex @@ -18,6 +18,26 @@ defmodule ExAws.Operation.S3 do defimpl ExAws.Operation do def perform(operation, config) do + {operation, config, url, body, headers, http_method} = + build_request_params(operation, config) + + ExAws.Request.request(http_method, url, body, headers, config, operation.service) + |> ExAws.Request.default_aws_error() + |> operation.parser.() + end + + def stream!(%{stream_builder: :octet_stream} = operation, config) do + {operation, config, url, body, headers, http_method} = + build_request_params(operation, config) + + ExAws.Request.request(http_method, url, body, headers, config, operation.service, true) + |> ExAws.Request.default_aws_error() + |> operation.parser.() + end + + def stream!(%{stream_builder: fun}, config), do: fun.(config) + + def build_request_params(operation, config) do body = operation.body headers = operation.headers http_method = operation.http_method @@ -37,13 +57,9 @@ defmodule ExAws.Operation.S3 do |> put_content_length_header(body, http_method) |> Map.to_list() - ExAws.Request.request(http_method, url, body, headers, config, operation.service) - |> ExAws.Request.default_aws_error() - |> operation.parser.() + {operation, config, url, body, headers, http_method} end - def stream!(%{stream_builder: fun}, config), do: fun.(config) - defp put_content_length_header(headers, "", :get), do: headers defp put_content_length_header(headers, body, _) do diff --git a/lib/ex_aws/request.ex b/lib/ex_aws/request.ex index db6c93f8..c3c56c64 100644 --- a/lib/ex_aws/request.ex +++ b/lib/ex_aws/request.ex @@ -9,7 +9,7 @@ defmodule ExAws.Request do @type error_t :: {:error, {:http_error, http_status, binary}} @type response_t :: success_t | error_t - def request(http_method, url, data, headers, config, service) do + def request(http_method, url, data, headers, config, service, stream \\ false) do body = case data do [] -> "{}" @@ -17,13 +17,31 @@ defmodule ExAws.Request do _ -> config[:json_codec].encode!(data) end - request_and_retry(http_method, url, service, config, headers, body, {:attempt, 1}) + request_and_retry(http_method, url, service, config, headers, body, stream, {:attempt, 1}) end - def request_and_retry(_method, _url, _service, _config, _headers, _req_body, {:error, reason}), - do: {:error, reason} - - def request_and_retry(method, url, service, config, headers, req_body, {:attempt, attempt}) do + def request_and_retry( + _method, + _url, + _service, + _config, + _headers, + _req_body, + _stream, + {:error, reason} + ), + do: {:error, reason} + + def request_and_retry( + method, + url, + service, + config, + headers, + req_body, + stream, + {:attempt, attempt} + ) do full_headers = ExAws.Auth.headers(method, url, service, config, headers, req_body) with {:ok, full_headers} <- full_headers do @@ -35,7 +53,7 @@ defmodule ExAws.Request do ) end - case do_request(config, method, safe_url, req_body, full_headers, attempt, service) do + case do_request(config, method, safe_url, req_body, full_headers, attempt, service, stream) do {:ok, %{status_code: status} = resp} when status in 200..299 or status == 304 -> {:ok, resp} @@ -53,6 +71,7 @@ defmodule ExAws.Request do config, headers, req_body, + stream, attempt_again?(attempt, reason, config) ) @@ -71,6 +90,7 @@ defmodule ExAws.Request do config, headers, req_body, + stream, attempt_again?(attempt, reason, config) ) @@ -86,13 +106,14 @@ defmodule ExAws.Request do config, headers, req_body, + stream, attempt_again?(attempt, reason, config) ) end end end - defp do_request(config, method, safe_url, req_body, full_headers, attempt, service) do + defp do_request(config, method, safe_url, req_body, full_headers, attempt, service, stream) do telemetry_event = Map.get(config, :telemetry_event, [:ex_aws, :request]) telemetry_options = Map.get(config, :telemetry_options, []) @@ -111,7 +132,8 @@ defmodule ExAws.Request do safe_url, req_body, full_headers, - Map.get(config, :http_opts, []) + Map.get(config, :http_opts, []), + stream ) |> maybe_transform_response() @@ -211,6 +233,10 @@ defmodule ExAws.Request do |> :timer.sleep() end + def maybe_transform_response({:ok, %{status: status, stream: stream, headers: headers}}) do + {:ok, %{status_code: status, stream: stream, headers: headers}} + end + def maybe_transform_response({:ok, %{status: status, body: body, headers: headers}}) do # Req and Finch use status (rather than status_code) as a key. {:ok, %{status_code: status, body: body, headers: headers}} diff --git a/lib/ex_aws/request/hackney.ex b/lib/ex_aws/request/hackney.ex index 1a9c7584..15ab826b 100644 --- a/lib/ex_aws/request/hackney.ex +++ b/lib/ex_aws/request/hackney.ex @@ -14,14 +14,30 @@ defmodule ExAws.Request.Hackney do @default_opts [recv_timeout: 30_000] - def request(method, url, body \\ "", headers \\ [], http_opts \\ []) do - opts = Application.get_env(:ex_aws, :hackney_opts, @default_opts) - opts = http_opts ++ [:with_body | opts] + def request(method, url, body \\ "", headers \\ [], http_opts \\ [], stream \\ false) do + opts = Application.get_env(:ex_aws, :hackney_opts, @default_opts) ++ http_opts + + opts = + if stream do + opts + else + [:with_body | opts] + end case :hackney.request(method, url, headers, body, opts) do {:ok, status, headers} -> {:ok, %{status_code: status, headers: headers}} + {:ok, status, headers, client} when is_reference(client) -> + stream = + Stream.resource( + fn -> client end, + &continue_stream/1, + &finish_stream/1 + ) + + {:ok, %{status_code: status, headers: headers, stream: stream}} + {:ok, status, headers, body} -> {:ok, %{status_code: status, headers: headers, body: body}} @@ -29,4 +45,21 @@ defmodule ExAws.Request.Hackney do {:error, %{reason: reason}} end end + + defp continue_stream(client) do + case :hackney.stream_body(client) do + {:ok, data} -> + {[data], client} + + :done -> + {:halt, client} + + {:error, reason} -> + raise reason + end + end + + defp finish_stream(client) do + :hackney.close(client) + end end diff --git a/lib/ex_aws/request/http_client.ex b/lib/ex_aws/request/http_client.ex index ca19cafb..dfd714cc 100644 --- a/lib/ex_aws/request/http_client.ex +++ b/lib/ex_aws/request/http_client.ex @@ -59,5 +59,6 @@ defmodule ExAws.Request.HttpClient do ) :: {:ok, %{status_code: pos_integer, headers: any}} | {:ok, %{status_code: pos_integer, headers: any, body: binary}} + | {:ok, %{status_code: pos_integer, headers: any, stream: Enumerable.t()}} | {:error, %{reason: any}} end