From f5a3ae8aae333447e3a983eddb7cade3ac52d386 Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Sun, 5 Nov 2023 15:01:22 -0800 Subject: [PATCH 01/13] feat: EventStream for SelectObjectContentRequest --- lib/ex_aws/event_stream.ex | 12 +++++ lib/ex_aws/event_stream/header.ex | 38 +++++++++++++++ lib/ex_aws/event_stream/message.ex | 58 ++++++++++++++++++++++ lib/ex_aws/event_stream/prelude.ex | 77 ++++++++++++++++++++++++++++++ lib/ex_aws/operation/s3.ex | 28 +++++++++-- lib/ex_aws/request.ex | 23 +++++++++ lib/ex_aws/request/hackney.ex | 36 ++++++++++++++ lib/ex_aws/request/http_client.ex | 8 ++++ 8 files changed, 275 insertions(+), 5 deletions(-) create mode 100644 lib/ex_aws/event_stream.ex create mode 100644 lib/ex_aws/event_stream/header.ex create mode 100644 lib/ex_aws/event_stream/message.ex create mode 100644 lib/ex_aws/event_stream/prelude.ex diff --git a/lib/ex_aws/event_stream.ex b/lib/ex_aws/event_stream.ex new file mode 100644 index 00000000..67182f5b --- /dev/null +++ b/lib/ex_aws/event_stream.ex @@ -0,0 +1,12 @@ +defmodule ExAws.EventStream do + # https://docs.aws.amazon.com/AmazonS3/latest/API/API_SelectObjectContent.html + # https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html + + alias ExAws.EventStream.Message + + def parse_message(chunk) do + with {:ok, message} <- Message.parse(chunk) do + message + end + end +end diff --git a/lib/ex_aws/event_stream/header.ex b/lib/ex_aws/event_stream/header.ex new file mode 100644 index 00000000..7cec6845 --- /dev/null +++ b/lib/ex_aws/event_stream/header.ex @@ -0,0 +1,38 @@ +defmodule ExAws.EventStream.Header do + alias ExAws.EventStream.Prelude + + defp extract_header_bytes(prelude, payload_bytes) do + binary_part( + payload_bytes, + Prelude.prelude_length(), + prelude.headers_end - Prelude.prelude_length() + ) + end + + def extract_headers(<<>>, headers) do + Map.new(headers) + end + + def extract_headers(<>, headers) do + header_name = binary_part(rest, 0, header_name_size) + # https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html + # +1 and -1 for ignoring first byte here (Always = 0x07) for S3 + # TODO: Support other header data types + rest = + binary_part(rest, header_name_size + 1, byte_size(rest) - header_name_size - 1) + + value_size = rest |> binary_part(0, 2) |> :binary.decode_unsigned(:big) + + value = + binary_part(rest, 2, value_size) + + rest + |> binary_part(2 + value_size, byte_size(rest) - 2 - value_size) + |> extract_headers([{header_name, value} | headers]) + end + + def parse(prelude, payload_bytes) do + headers = prelude |> extract_header_bytes(payload_bytes) |> extract_headers([]) + {:ok, headers} + end +end diff --git a/lib/ex_aws/event_stream/message.ex b/lib/ex_aws/event_stream/message.ex new file mode 100644 index 00000000..ac042f7d --- /dev/null +++ b/lib/ex_aws/event_stream/message.ex @@ -0,0 +1,58 @@ +defmodule ExAws.EventStream.Message do + alias ExAws.EventStream.Prelude + alias ExAws.EventStream.Header + import Bitwise + + defstruct prelude: nil, + headers: nil, + payload: nil + + def validate_checksum(message_bytes, message_crc, prelude_crc) do + if :erlang.crc32(prelude_crc, message_bytes) |> band(0xFFFFFFFF) == message_crc do + :ok + else + {:error, :message_checksum_mismatch} + end + end + + def verify_message_crc(prelude, payload_bytes) do + message_crc = + payload_bytes + |> binary_part(prelude.payload_end, 4) + |> :binary.decode_unsigned(:big) + + message_length = prelude.payload_end - (Prelude.prelude_length() - 4) + + message_bytes = + binary_part( + payload_bytes, + Prelude.prelude_length() - 4, + message_length + ) + + validate_checksum(message_bytes, message_crc, prelude.crc) + end + + def parse_payload(prelude, payload_bytes) do + {:ok, binary_part(payload_bytes, prelude.headers_end, prelude.payload_length)} + end + + def is_record?(%__MODULE__{headers: headers}) do + Map.get(headers, ":event-type") == "Records" + end + + def get_payload(%__MODULE__{payload: payload}) do + payload + end + + def parse(chunk) do + dbg(chunk) + with {:ok, prelude, payload_bytes} <- + Prelude.parse(chunk), + :ok <- verify_message_crc(prelude, payload_bytes), + {:ok, headers} <- Header.parse(prelude, payload_bytes), + {:ok, payload} <- parse_payload(prelude, payload_bytes) do + {:ok, %__MODULE__{prelude: prelude, payload: payload, headers: headers}} |> dbg() + end + end +end diff --git a/lib/ex_aws/event_stream/prelude.ex b/lib/ex_aws/event_stream/prelude.ex new file mode 100644 index 00000000..37fc1c1c --- /dev/null +++ b/lib/ex_aws/event_stream/prelude.ex @@ -0,0 +1,77 @@ +defmodule ExAws.EventStream.Prelude do + defstruct total_length: nil, + headers_length: nil, + crc: nil, + payload_length: nil, + payload_end: nil, + headers_end: nil, + prelude_bytes: nil + + @prelude_length 12 + # 128 Kb + @max_header_length 128 * 1024 + # 16 Mb + @max_payload_length 16 * 1024 * 1024 + + def prelude_length(), do: @prelude_length + + defp unpack_prelude( + << + total_length::binary-size(4), + headers_length::binary-size(4), + crc::binary-size(4) + >> = prelude_bytes + ) do + total_length = :binary.decode_unsigned(total_length, :big) + headers_length = :binary.decode_unsigned(headers_length, :big) + crc = :binary.decode_unsigned(crc, :big) + + # The extra minus 4 bytes is for the message CRC. + payload_length = total_length - @prelude_length - headers_length - 4 + payload_end = total_length - 4 + headers_end = @prelude_length + headers_length + + {:ok, + %__MODULE__{ + total_length: total_length, + headers_length: headers_length, + crc: crc, + payload_length: payload_length, + payload_end: payload_end, + headers_end: headers_end, + prelude_bytes: prelude_bytes + }} + end + + def validate_prelude(prelude) do + cond do + prelude.headers_length > @max_header_length -> + {:error, :invalid_headers_length} + + prelude.payload_length > @max_payload_length -> + {:error, :invalid_payload_length} + + true -> + {:ok, prelude} + end + end + + def validate_checksum(%__MODULE__{ + prelude_bytes: <>, + crc: checksum + }) do + if :erlang.crc32(prelude_bytes) == checksum do + :ok + else + {:error, :prelude_checksum_mismatch} + end + end + + def parse(<> = payload) do + with {:ok, unpacked_prelude} <- unpack_prelude(prelude_bytes), + {:ok, prelude} <- validate_prelude(unpacked_prelude), + :ok <- validate_checksum(prelude) do + {:ok, prelude, payload} + end + end +end diff --git a/lib/ex_aws/operation/s3.ex b/lib/ex_aws/operation/s3.ex index cf2cfba4..4496dc4f 100644 --- a/lib/ex_aws/operation/s3.ex +++ b/lib/ex_aws/operation/s3.ex @@ -2,6 +2,7 @@ defmodule ExAws.Operation.S3 do @moduledoc """ Holds data necessary for an operation on the S3 service. """ + alias ExAws.EventStream defstruct stream_builder: nil, parser: &Function.identity/1, @@ -18,6 +19,27 @@ defmodule ExAws.Operation.S3 do defimpl ExAws.Operation do def perform(operation, config) do + {operation, config, url, body, headers, http_method} = + build_request_params(operation, config) + + ExAws.Request.request(http_method, url, body, headers, config, operation.service) + |> ExAws.Request.default_aws_error() + |> operation.parser.() + end + + def stream!(%{stream_builder: :event_stream} = operation, config) do + {operation, config, url, body, headers, http_method} = + build_request_params(operation, config) + + ExAws.Request.request_stream(http_method, url, body, headers, config, operation.service) + |> Stream.map(&EventStream.parse_message/1) + |> Stream.filter(&EventStream.Message.is_record?/1) + |> Stream.map(&EventStream.Message.get_payload/1) + end + + def stream!(%{stream_builder: fun}, config), do: fun.(config) + + def build_request_params(operation, config) do body = operation.body headers = operation.headers http_method = operation.http_method @@ -37,13 +59,9 @@ defmodule ExAws.Operation.S3 do |> put_content_length_header(body, http_method) |> Map.to_list() - ExAws.Request.request(http_method, url, body, headers, config, operation.service) - |> ExAws.Request.default_aws_error() - |> operation.parser.() + {operation, config, url, body, headers, http_method} end - def stream!(%{stream_builder: fun}, config), do: fun.(config) - defp put_content_length_header(headers, "", :get), do: headers defp put_content_length_header(headers, body, _) do diff --git a/lib/ex_aws/request.ex b/lib/ex_aws/request.ex index db6c93f8..a330033c 100644 --- a/lib/ex_aws/request.ex +++ b/lib/ex_aws/request.ex @@ -9,6 +9,29 @@ defmodule ExAws.Request do @type error_t :: {:error, {:http_error, http_status, binary}} @type response_t :: success_t | error_t + def request_stream(method, url, data, headers, config, service) do + req_body = + case data do + [] -> "{}" + d when is_binary(d) -> d + _ -> config[:json_codec].encode!(data) + end + + safe_url = ExAws.Request.Url.sanitize(url, service) + full_headers = ExAws.Auth.headers(method, url, service, config, headers, req_body) + + {:ok, _status, _headers, stream} = + config[:http_client].request_stream( + method, + safe_url, + req_body, + full_headers, + Map.get(config, :http_opts, []) + ) + + stream + end + def request(http_method, url, data, headers, config, service) do body = case data do diff --git a/lib/ex_aws/request/hackney.ex b/lib/ex_aws/request/hackney.ex index 1a9c7584..a73e027d 100644 --- a/lib/ex_aws/request/hackney.ex +++ b/lib/ex_aws/request/hackney.ex @@ -29,4 +29,40 @@ defmodule ExAws.Request.Hackney do {:error, %{reason: reason}} end end + + def request_stream(method, url, body \\ "", headers \\ [], http_opts \\ []) do + {:ok, headers} = headers + + {:ok, status, headers, client} = begin_download(method, url, body, headers, http_opts) + + stream = + Stream.resource( + fn -> client end, + &continue_download/1, + &finish_download/1 + ) + + {:ok, status, headers, stream} + end + + defp begin_download(method, url, body, headers, http_opts) do + :hackney.request(method, url, headers, body, http_opts) + end + + defp continue_download(client) do + case :hackney.stream_body(client) do + {:ok, data} -> + {[data], client} + + :done -> + {:halt, client} + + {:error, reason} -> + raise reason + end + end + + defp finish_download(client) do + :hackney.close(client) + end end diff --git a/lib/ex_aws/request/http_client.ex b/lib/ex_aws/request/http_client.ex index ca19cafb..9029b56a 100644 --- a/lib/ex_aws/request/http_client.ex +++ b/lib/ex_aws/request/http_client.ex @@ -60,4 +60,12 @@ defmodule ExAws.Request.HttpClient do {:ok, %{status_code: pos_integer, headers: any}} | {:ok, %{status_code: pos_integer, headers: any, body: binary}} | {:error, %{reason: any}} + + @callback request_stream( + method :: http_method, + url :: binary, + req_body :: binary, + headers :: [{binary, binary}, ...], + http_opts :: term + ) :: Enumerable.t() end From edbe06eed7a6e1eacad459b2c12d70786052d4c5 Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Sun, 5 Nov 2023 22:59:38 -0800 Subject: [PATCH 02/13] Refactored extraction logic for Event Stream headers --- lib/ex_aws/event_stream/header.ex | 54 +++++++++++++----------- lib/ex_aws/event_stream/message.ex | 3 +- test/ex_aws/event_stream/header_test.exs | 45 ++++++++++++++++++++ 3 files changed, 76 insertions(+), 26 deletions(-) create mode 100644 test/ex_aws/event_stream/header_test.exs diff --git a/lib/ex_aws/event_stream/header.ex b/lib/ex_aws/event_stream/header.ex index 7cec6845..1cf478d9 100644 --- a/lib/ex_aws/event_stream/header.ex +++ b/lib/ex_aws/event_stream/header.ex @@ -1,38 +1,44 @@ defmodule ExAws.EventStream.Header do + @moduledoc """ + Parses EventStream headers. + + AWS encodes EventStream headers as follows: + + [header-name-size][header-name][header-data-type][header-value-size][header-value-data] + |<-- 1 byte -->|<-variable->|<-- 1 byte -->|<-- 2 bytes -->|<-- variable -->| + + This module parses this information and returns a map of header names - values. + header-data-type is always 0x07(String) for S3. + """ alias ExAws.EventStream.Prelude - defp extract_header_bytes(prelude, payload_bytes) do - binary_part( - payload_bytes, - Prelude.prelude_length(), - prelude.headers_end - Prelude.prelude_length() - ) + def extract_headers(header_bytes) do + do_extract_headers(header_bytes, []) end - def extract_headers(<<>>, headers) do - Map.new(headers) - end + defp do_extract_headers(<<>>, headers), do: Map.new(headers) - def extract_headers(<>, headers) do - header_name = binary_part(rest, 0, header_name_size) - # https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html - # +1 and -1 for ignoring first byte here (Always = 0x07) for S3 - # TODO: Support other header data types - rest = - binary_part(rest, header_name_size + 1, byte_size(rest) - header_name_size - 1) + defp do_extract_headers(<>, headers) do + <> = rest + <> = rest + + value_size = :binary.decode_unsigned(value_size_binary, :big) + <> = rest + + do_extract_headers(rest, [{header_name, value} | headers]) + end - value_size = rest |> binary_part(0, 2) |> :binary.decode_unsigned(:big) + defp extract_header_bytes(headers_end, payload_bytes) do + prelude_length = Prelude.prelude_length() - value = - binary_part(rest, 2, value_size) + <<_prelude::binary-size(prelude_length), headers_bytes::binary-size(headers_end), + _payload::binary>> = payload_bytes - rest - |> binary_part(2 + value_size, byte_size(rest) - 2 - value_size) - |> extract_headers([{header_name, value} | headers]) + headers_bytes end - def parse(prelude, payload_bytes) do - headers = prelude |> extract_header_bytes(payload_bytes) |> extract_headers([]) + def parse(%Prelude{headers_end: headers_end, headers_length: headers_length}, payload_bytes) do + headers = headers_length |> extract_header_bytes(payload_bytes) |> extract_headers() {:ok, headers} end end diff --git a/lib/ex_aws/event_stream/message.ex b/lib/ex_aws/event_stream/message.ex index ac042f7d..d1bf5a08 100644 --- a/lib/ex_aws/event_stream/message.ex +++ b/lib/ex_aws/event_stream/message.ex @@ -46,13 +46,12 @@ defmodule ExAws.EventStream.Message do end def parse(chunk) do - dbg(chunk) with {:ok, prelude, payload_bytes} <- Prelude.parse(chunk), :ok <- verify_message_crc(prelude, payload_bytes), {:ok, headers} <- Header.parse(prelude, payload_bytes), {:ok, payload} <- parse_payload(prelude, payload_bytes) do - {:ok, %__MODULE__{prelude: prelude, payload: payload, headers: headers}} |> dbg() + {:ok, %__MODULE__{prelude: prelude, payload: payload, headers: headers}} end end end diff --git a/test/ex_aws/event_stream/header_test.exs b/test/ex_aws/event_stream/header_test.exs new file mode 100644 index 00000000..e75efa09 --- /dev/null +++ b/test/ex_aws/event_stream/header_test.exs @@ -0,0 +1,45 @@ +defmodule ExAws.AuthTest do + use ExUnit.Case, async: true + + alias ExAws.EventStream.Header + + describe "Header.extract_headers/1" do + test "can extract Record type event header" do + assert Header.extract_headers( + <<13, 58, 109, 101, 115, 115, 97, 103, 101, 45, 116, 121, 112, 101, 7, 0, 5, 101, + 118, 101, 110, 116, 13, 58, 99, 111, 110, 116, 101, 110, 116, 45, 116, 121, 112, + 101, 7, 0, 24, 97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 47, 111, 99, + 116, 101, 116, 45, 115, 116, 114, 101, 97, 109, 11, 58, 101, 118, 101, 110, 116, + 45, 116, 121, 112, 101, 7, 0, 7, 82, 101, 99, 111, 114, 100, 115>> + ) == %{ + ":message-type" => "event", + ":content-type" => "application/octet-stream", + ":event-type" => "Records" + } + end + + test "can extract Stats type event header" do + assert Header.extract_headers( + <<13, 58, 109, 101, 115, 115, 97, 103, 101, 45, 116, 121, 112, 101, 7, 0, 5, 101, + 118, 101, 110, 116, 13, 58, 99, 111, 110, 116, 101, 110, 116, 45, 116, 121, 112, + 101, 7, 0, 8, 116, 101, 120, 116, 47, 120, 109, 108, 11, 58, 101, 118, 101, 110, + 116, 45, 116, 121, 112, 101, 7, 0, 5, 83, 116, 97, 116, 115>> + ) == %{ + ":message-type" => "event", + ":content-type" => "text/xml", + ":event-type" => "Stats" + } + end + + test "can extract End type event header" do + assert Header.extract_headers( + <<13, 58, 109, 101, 115, 115, 97, 103, 101, 45, 116, 121, 112, 101, 7, 0, 5, 101, + 118, 101, 110, 116, 11, 58, 101, 118, 101, 110, 116, 45, 116, 121, 112, 101, 7, + 0, 3, 69, 110, 100>> + ) == %{ + ":message-type" => "event", + ":event-type" => "End" + } + end + end +end From dbe713b3c743b3b2095050c3d10b81280f7612ee Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Mon, 6 Nov 2023 09:36:53 -0800 Subject: [PATCH 03/13] Refactored parse function in Header module --- lib/ex_aws/event_stream/header.ex | 2 +- lib/ex_aws/event_stream/message.ex | 20 +++++++------------- lib/ex_aws/event_stream/prelude.ex | 11 ++++++++--- 3 files changed, 16 insertions(+), 17 deletions(-) diff --git a/lib/ex_aws/event_stream/header.ex b/lib/ex_aws/event_stream/header.ex index 1cf478d9..48582a38 100644 --- a/lib/ex_aws/event_stream/header.ex +++ b/lib/ex_aws/event_stream/header.ex @@ -37,7 +37,7 @@ defmodule ExAws.EventStream.Header do headers_bytes end - def parse(%Prelude{headers_end: headers_end, headers_length: headers_length}, payload_bytes) do + def parse(%Prelude{headers_length: headers_length}, payload_bytes) do headers = headers_length |> extract_header_bytes(payload_bytes) |> extract_headers() {:ok, headers} end diff --git a/lib/ex_aws/event_stream/message.ex b/lib/ex_aws/event_stream/message.ex index d1bf5a08..69076dc7 100644 --- a/lib/ex_aws/event_stream/message.ex +++ b/lib/ex_aws/event_stream/message.ex @@ -16,19 +16,13 @@ defmodule ExAws.EventStream.Message do end def verify_message_crc(prelude, payload_bytes) do - message_crc = - payload_bytes - |> binary_part(prelude.payload_end, 4) - |> :binary.decode_unsigned(:big) - - message_length = prelude.payload_end - (Prelude.prelude_length() - 4) - - message_bytes = - binary_part( - payload_bytes, - Prelude.prelude_length() - 4, - message_length - ) + prelude_length = Prelude.prelude_length() + message_length = prelude.payload_end - (prelude_length - 4) + + <<_::binary-size(8), message_bytes::binary-size(message_length), + message_crc_bytes::binary-size(4)>> = payload_bytes + + message_crc = :binary.decode_unsigned(message_crc_bytes, :big) validate_checksum(message_bytes, message_crc, prelude.crc) end diff --git a/lib/ex_aws/event_stream/prelude.ex b/lib/ex_aws/event_stream/prelude.ex index 37fc1c1c..7eff19be 100644 --- a/lib/ex_aws/event_stream/prelude.ex +++ b/lib/ex_aws/event_stream/prelude.ex @@ -43,12 +43,17 @@ defmodule ExAws.EventStream.Prelude do }} end - def validate_prelude(prelude) do + def validate_prelude( + %__MODULE__{ + headers_length: headers_length, + payload_length: payload_length + } = prelude + ) do cond do - prelude.headers_length > @max_header_length -> + headers_length > @max_header_length -> {:error, :invalid_headers_length} - prelude.payload_length > @max_payload_length -> + payload_length > @max_payload_length -> {:error, :invalid_payload_length} true -> From 4c18dcdc9dff8d3fb6caf4683e034245a0a644f4 Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Mon, 6 Nov 2023 16:44:28 -0800 Subject: [PATCH 04/13] Refactored the extract_headers and extract_header_bytes methods to improve readability --- lib/ex_aws/event_stream/header.ex | 21 ++++++++++------ lib/ex_aws/event_stream/message.ex | 40 +++++++++++++++++++----------- lib/ex_aws/event_stream/prelude.ex | 35 ++++++++++---------------- 3 files changed, 52 insertions(+), 44 deletions(-) diff --git a/lib/ex_aws/event_stream/header.ex b/lib/ex_aws/event_stream/header.ex index 48582a38..e26d2f11 100644 --- a/lib/ex_aws/event_stream/header.ex +++ b/lib/ex_aws/event_stream/header.ex @@ -19,7 +19,7 @@ defmodule ExAws.EventStream.Header do defp do_extract_headers(<<>>, headers), do: Map.new(headers) defp do_extract_headers(<>, headers) do - <> = rest + <>, rest::binary>> = rest <> = rest value_size = :binary.decode_unsigned(value_size_binary, :big) @@ -28,17 +28,24 @@ defmodule ExAws.EventStream.Header do do_extract_headers(rest, [{header_name, value} | headers]) end - defp extract_header_bytes(headers_end, payload_bytes) do - prelude_length = Prelude.prelude_length() - - <<_prelude::binary-size(prelude_length), headers_bytes::binary-size(headers_end), + defp extract_header_bytes( + %Prelude{ + prelude_length: prelude_length, + headers_length: headers_length + }, + payload_bytes + ) do + <<_prelude::binary-size(prelude_length), headers_bytes::binary-size(headers_length), _payload::binary>> = payload_bytes headers_bytes end - def parse(%Prelude{headers_length: headers_length}, payload_bytes) do - headers = headers_length |> extract_header_bytes(payload_bytes) |> extract_headers() + def parse( + %Prelude{} = prelude, + payload_bytes + ) do + headers = prelude |> extract_header_bytes(payload_bytes) |> extract_headers() {:ok, headers} end end diff --git a/lib/ex_aws/event_stream/message.ex b/lib/ex_aws/event_stream/message.ex index 69076dc7..6252e2a0 100644 --- a/lib/ex_aws/event_stream/message.ex +++ b/lib/ex_aws/event_stream/message.ex @@ -7,7 +7,19 @@ defmodule ExAws.EventStream.Message do headers: nil, payload: nil - def validate_checksum(message_bytes, message_crc, prelude_crc) do + def verify_message_crc( + %Prelude{ + crc: prelude_crc, + payload_length: payload_length, + headers_length: headers_length + }, + payload_bytes + ) do + <<_::binary-size(8), message_bytes::binary-size(payload_length + headers_length + 4), + message_crc_bytes::binary-size(4)>> = payload_bytes + + message_crc = :binary.decode_unsigned(message_crc_bytes, :big) + if :erlang.crc32(prelude_crc, message_bytes) |> band(0xFFFFFFFF) == message_crc do :ok else @@ -15,20 +27,18 @@ defmodule ExAws.EventStream.Message do end end - def verify_message_crc(prelude, payload_bytes) do - prelude_length = Prelude.prelude_length() - message_length = prelude.payload_end - (prelude_length - 4) - - <<_::binary-size(8), message_bytes::binary-size(message_length), - message_crc_bytes::binary-size(4)>> = payload_bytes - - message_crc = :binary.decode_unsigned(message_crc_bytes, :big) - - validate_checksum(message_bytes, message_crc, prelude.crc) - end - - def parse_payload(prelude, payload_bytes) do - {:ok, binary_part(payload_bytes, prelude.headers_end, prelude.payload_length)} + def parse_payload( + %Prelude{ + prelude_length: prelude_length, + headers_length: headers_length, + payload_length: payload_length + }, + payload_bytes + ) do + <<_prelude_headers::binary-size(prelude_length + headers_length), + message_bytes::binary-size(payload_length), _::binary>> = payload_bytes + + {:ok, message_bytes} end def is_record?(%__MODULE__{headers: headers}) do diff --git a/lib/ex_aws/event_stream/prelude.ex b/lib/ex_aws/event_stream/prelude.ex index 7eff19be..0a8ee479 100644 --- a/lib/ex_aws/event_stream/prelude.ex +++ b/lib/ex_aws/event_stream/prelude.ex @@ -1,10 +1,9 @@ defmodule ExAws.EventStream.Prelude do defstruct total_length: nil, headers_length: nil, + prelude_length: nil, crc: nil, payload_length: nil, - payload_end: nil, - headers_end: nil, prelude_bytes: nil @prelude_length 12 @@ -13,32 +12,24 @@ defmodule ExAws.EventStream.Prelude do # 16 Mb @max_payload_length 16 * 1024 * 1024 - def prelude_length(), do: @prelude_length - defp unpack_prelude( << - total_length::binary-size(4), - headers_length::binary-size(4), + total_length_bytes::binary-size(4), + headers_length_bytes::binary-size(4), crc::binary-size(4) >> = prelude_bytes ) do - total_length = :binary.decode_unsigned(total_length, :big) - headers_length = :binary.decode_unsigned(headers_length, :big) + total_length = :binary.decode_unsigned(total_length_bytes, :big) + headers_length = :binary.decode_unsigned(headers_length_bytes, :big) crc = :binary.decode_unsigned(crc, :big) - # The extra minus 4 bytes is for the message CRC. - payload_length = total_length - @prelude_length - headers_length - 4 - payload_end = total_length - 4 - headers_end = @prelude_length + headers_length - {:ok, %__MODULE__{ total_length: total_length, headers_length: headers_length, + prelude_length: @prelude_length, + payload_length: total_length - @prelude_length - headers_length - 4, crc: crc, - payload_length: payload_length, - payload_end: payload_end, - headers_end: headers_end, prelude_bytes: prelude_bytes }} end @@ -61,11 +52,11 @@ defmodule ExAws.EventStream.Prelude do end end - def validate_checksum(%__MODULE__{ - prelude_bytes: <>, - crc: checksum - }) do - if :erlang.crc32(prelude_bytes) == checksum do + def validate_checksum( + <>, + prelude_checksum + ) do + if :erlang.crc32(prelude_bytes_without_crc) == prelude_checksum do :ok else {:error, :prelude_checksum_mismatch} @@ -75,7 +66,7 @@ defmodule ExAws.EventStream.Prelude do def parse(<> = payload) do with {:ok, unpacked_prelude} <- unpack_prelude(prelude_bytes), {:ok, prelude} <- validate_prelude(unpacked_prelude), - :ok <- validate_checksum(prelude) do + :ok <- validate_checksum(prelude_bytes, unpacked_prelude.crc) do {:ok, prelude, payload} end end From 97ae9b7b68f246001cce20fd37a1694c248ecfdc Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Mon, 6 Nov 2023 16:53:46 -0800 Subject: [PATCH 05/13] Added Bitwise import and updated prelude checksum calculation --- lib/ex_aws/event_stream/prelude.ex | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/ex_aws/event_stream/prelude.ex b/lib/ex_aws/event_stream/prelude.ex index 0a8ee479..bc6be804 100644 --- a/lib/ex_aws/event_stream/prelude.ex +++ b/lib/ex_aws/event_stream/prelude.ex @@ -1,4 +1,6 @@ defmodule ExAws.EventStream.Prelude do + import Bitwise + defstruct total_length: nil, headers_length: nil, prelude_length: nil, @@ -56,7 +58,9 @@ defmodule ExAws.EventStream.Prelude do <>, prelude_checksum ) do - if :erlang.crc32(prelude_bytes_without_crc) == prelude_checksum do + computed_checksum = prelude_bytes_without_crc |> :erlang.crc32() |> band(0xFFFFFFFF) + + if computed_checksum == prelude_checksum do :ok else {:error, :prelude_checksum_mismatch} From 2f9a92b17948c3e36979170e73717914e38ebd77 Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Mon, 6 Nov 2023 17:05:38 -0800 Subject: [PATCH 06/13] Added detailed module documentation for EventStream --- lib/ex_aws/event_stream.ex | 21 +++++++++++++++++++-- lib/ex_aws/event_stream/message.ex | 6 +++++- lib/ex_aws/event_stream/prelude.ex | 10 ++++++++++ 3 files changed, 34 insertions(+), 3 deletions(-) diff --git a/lib/ex_aws/event_stream.ex b/lib/ex_aws/event_stream.ex index 67182f5b..6acbf23f 100644 --- a/lib/ex_aws/event_stream.ex +++ b/lib/ex_aws/event_stream.ex @@ -1,6 +1,23 @@ defmodule ExAws.EventStream do - # https://docs.aws.amazon.com/AmazonS3/latest/API/API_SelectObjectContent.html - # https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html + @moduledoc """ + Parses EventStream messages. + + AWS encodes EventStream messages in binary as follows: + [ prelude ][ headers ][ payload ][ message-crc ] + |<-- 12 bytes -->|<-- variable -->|<-- variable -->|<-- 4 bytes -->| + + This module parses this information and returns a struct with the prelude, headers and payload. + + The prelude contains the total length of the message, the length of the headers, the length of the prelude, the CRC of the message, and the length of the payload. + + The headers are a map of header names to values. + + The payload is the actual message data. + + The message-crc is a CRC32 checksum of the message (excluding the message-crc itself). + + Refer to https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html for more information. + """ alias ExAws.EventStream.Message diff --git a/lib/ex_aws/event_stream/message.ex b/lib/ex_aws/event_stream/message.ex index 6252e2a0..e5bd36fe 100644 --- a/lib/ex_aws/event_stream/message.ex +++ b/lib/ex_aws/event_stream/message.ex @@ -1,4 +1,7 @@ defmodule ExAws.EventStream.Message do + @moduledoc """ + Parses EventStream messages. This module parses this information and returns a struct with the prelude, headers and payload. Also verifies the message CRC. + """ alias ExAws.EventStream.Prelude alias ExAws.EventStream.Header import Bitwise @@ -19,8 +22,9 @@ defmodule ExAws.EventStream.Message do message_crc_bytes::binary-size(4)>> = payload_bytes message_crc = :binary.decode_unsigned(message_crc_bytes, :big) + computed_crc = prelude_crc |> :erlang.crc32(message_bytes) |> band(0xFFFFFFFF) - if :erlang.crc32(prelude_crc, message_bytes) |> band(0xFFFFFFFF) == message_crc do + if computed_crc == message_crc do :ok else {:error, :message_checksum_mismatch} diff --git a/lib/ex_aws/event_stream/prelude.ex b/lib/ex_aws/event_stream/prelude.ex index bc6be804..ac17e451 100644 --- a/lib/ex_aws/event_stream/prelude.ex +++ b/lib/ex_aws/event_stream/prelude.ex @@ -1,4 +1,14 @@ defmodule ExAws.EventStream.Prelude do + @moduledoc """ + Parses EventStream preludes. + + AWS encodes EventStream preludes in binary as follows: + + [ total-length ][headers-length][ prelude crc ] + |<-- 4 bytes -->|<-- 4 bytes -->|<-- 4 bytes -->| + + This module parses this information and returns a struct with the total length of the message, the length of the headers, the length of the prelude, the CRC of the message, and the length of the payload. + """ import Bitwise defstruct total_length: nil, From ca5732eccdf514ec2dc329a52d9fcee6641ce7e8 Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Mon, 6 Nov 2023 17:42:58 -0800 Subject: [PATCH 07/13] Refactored Hackney request stream methods --- lib/ex_aws/request/hackney.ex | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/lib/ex_aws/request/hackney.ex b/lib/ex_aws/request/hackney.ex index a73e027d..4f1d62a8 100644 --- a/lib/ex_aws/request/hackney.ex +++ b/lib/ex_aws/request/hackney.ex @@ -33,23 +33,19 @@ defmodule ExAws.Request.Hackney do def request_stream(method, url, body \\ "", headers \\ [], http_opts \\ []) do {:ok, headers} = headers - {:ok, status, headers, client} = begin_download(method, url, body, headers, http_opts) + {:ok, status, headers, client} = :hackney.request(method, url, headers, body, http_opts) stream = Stream.resource( fn -> client end, - &continue_download/1, - &finish_download/1 + &continue_stream/1, + &finish_stream/1 ) {:ok, status, headers, stream} end - defp begin_download(method, url, body, headers, http_opts) do - :hackney.request(method, url, headers, body, http_opts) - end - - defp continue_download(client) do + defp continue_stream(client) do case :hackney.stream_body(client) do {:ok, data} -> {[data], client} @@ -62,7 +58,7 @@ defmodule ExAws.Request.Hackney do end end - defp finish_download(client) do + defp finish_stream(client) do :hackney.close(client) end end From d24456a4d4d3eb83e0b6ac705a9c1746cf696d4b Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Tue, 7 Nov 2023 13:47:18 -0800 Subject: [PATCH 08/13] Refactored ExAws stream function --- lib/ex_aws.ex | 15 +++- lib/ex_aws/event_stream.ex | 29 -------- lib/ex_aws/event_stream/header.ex | 51 -------------- lib/ex_aws/event_stream/message.ex | 65 ------------------ lib/ex_aws/event_stream/prelude.ex | 87 ------------------------ lib/ex_aws/operation/s3.ex | 7 +- lib/ex_aws/request.ex | 65 +++++++++--------- lib/ex_aws/request/hackney.ex | 37 +++++----- lib/ex_aws/request/http_client.ex | 9 +-- test/ex_aws/event_stream/header_test.exs | 45 ------------ 10 files changed, 71 insertions(+), 339 deletions(-) delete mode 100644 lib/ex_aws/event_stream.ex delete mode 100644 lib/ex_aws/event_stream/header.ex delete mode 100644 lib/ex_aws/event_stream/message.ex delete mode 100644 lib/ex_aws/event_stream/prelude.ex delete mode 100644 test/ex_aws/event_stream/header_test.exs diff --git a/lib/ex_aws.ex b/lib/ex_aws.ex index ea4fbfea..8c96212a 100644 --- a/lib/ex_aws.ex +++ b/lib/ex_aws.ex @@ -106,7 +106,20 @@ defmodule ExAws do @impl ExAws.Behaviour @spec stream!(ExAws.Operation.t(), keyword) :: Enumerable.t() def stream!(op, config_overrides \\ []) do - ExAws.Operation.stream!(op, ExAws.Config.new(op.service, config_overrides)) + case ExAws.Operation.stream!(op, ExAws.Config.new(op.service, config_overrides)) do + {:ok, result} -> + result + + %Stream{} = result -> + result + + error -> + raise ExAws.Error, """ + ExAws Stream Request Error! + + #{inspect(error)} + """ + end end @doc false diff --git a/lib/ex_aws/event_stream.ex b/lib/ex_aws/event_stream.ex deleted file mode 100644 index 6acbf23f..00000000 --- a/lib/ex_aws/event_stream.ex +++ /dev/null @@ -1,29 +0,0 @@ -defmodule ExAws.EventStream do - @moduledoc """ - Parses EventStream messages. - - AWS encodes EventStream messages in binary as follows: - [ prelude ][ headers ][ payload ][ message-crc ] - |<-- 12 bytes -->|<-- variable -->|<-- variable -->|<-- 4 bytes -->| - - This module parses this information and returns a struct with the prelude, headers and payload. - - The prelude contains the total length of the message, the length of the headers, the length of the prelude, the CRC of the message, and the length of the payload. - - The headers are a map of header names to values. - - The payload is the actual message data. - - The message-crc is a CRC32 checksum of the message (excluding the message-crc itself). - - Refer to https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html for more information. - """ - - alias ExAws.EventStream.Message - - def parse_message(chunk) do - with {:ok, message} <- Message.parse(chunk) do - message - end - end -end diff --git a/lib/ex_aws/event_stream/header.ex b/lib/ex_aws/event_stream/header.ex deleted file mode 100644 index e26d2f11..00000000 --- a/lib/ex_aws/event_stream/header.ex +++ /dev/null @@ -1,51 +0,0 @@ -defmodule ExAws.EventStream.Header do - @moduledoc """ - Parses EventStream headers. - - AWS encodes EventStream headers as follows: - - [header-name-size][header-name][header-data-type][header-value-size][header-value-data] - |<-- 1 byte -->|<-variable->|<-- 1 byte -->|<-- 2 bytes -->|<-- variable -->| - - This module parses this information and returns a map of header names - values. - header-data-type is always 0x07(String) for S3. - """ - alias ExAws.EventStream.Prelude - - def extract_headers(header_bytes) do - do_extract_headers(header_bytes, []) - end - - defp do_extract_headers(<<>>, headers), do: Map.new(headers) - - defp do_extract_headers(<>, headers) do - <>, rest::binary>> = rest - <> = rest - - value_size = :binary.decode_unsigned(value_size_binary, :big) - <> = rest - - do_extract_headers(rest, [{header_name, value} | headers]) - end - - defp extract_header_bytes( - %Prelude{ - prelude_length: prelude_length, - headers_length: headers_length - }, - payload_bytes - ) do - <<_prelude::binary-size(prelude_length), headers_bytes::binary-size(headers_length), - _payload::binary>> = payload_bytes - - headers_bytes - end - - def parse( - %Prelude{} = prelude, - payload_bytes - ) do - headers = prelude |> extract_header_bytes(payload_bytes) |> extract_headers() - {:ok, headers} - end -end diff --git a/lib/ex_aws/event_stream/message.ex b/lib/ex_aws/event_stream/message.ex deleted file mode 100644 index e5bd36fe..00000000 --- a/lib/ex_aws/event_stream/message.ex +++ /dev/null @@ -1,65 +0,0 @@ -defmodule ExAws.EventStream.Message do - @moduledoc """ - Parses EventStream messages. This module parses this information and returns a struct with the prelude, headers and payload. Also verifies the message CRC. - """ - alias ExAws.EventStream.Prelude - alias ExAws.EventStream.Header - import Bitwise - - defstruct prelude: nil, - headers: nil, - payload: nil - - def verify_message_crc( - %Prelude{ - crc: prelude_crc, - payload_length: payload_length, - headers_length: headers_length - }, - payload_bytes - ) do - <<_::binary-size(8), message_bytes::binary-size(payload_length + headers_length + 4), - message_crc_bytes::binary-size(4)>> = payload_bytes - - message_crc = :binary.decode_unsigned(message_crc_bytes, :big) - computed_crc = prelude_crc |> :erlang.crc32(message_bytes) |> band(0xFFFFFFFF) - - if computed_crc == message_crc do - :ok - else - {:error, :message_checksum_mismatch} - end - end - - def parse_payload( - %Prelude{ - prelude_length: prelude_length, - headers_length: headers_length, - payload_length: payload_length - }, - payload_bytes - ) do - <<_prelude_headers::binary-size(prelude_length + headers_length), - message_bytes::binary-size(payload_length), _::binary>> = payload_bytes - - {:ok, message_bytes} - end - - def is_record?(%__MODULE__{headers: headers}) do - Map.get(headers, ":event-type") == "Records" - end - - def get_payload(%__MODULE__{payload: payload}) do - payload - end - - def parse(chunk) do - with {:ok, prelude, payload_bytes} <- - Prelude.parse(chunk), - :ok <- verify_message_crc(prelude, payload_bytes), - {:ok, headers} <- Header.parse(prelude, payload_bytes), - {:ok, payload} <- parse_payload(prelude, payload_bytes) do - {:ok, %__MODULE__{prelude: prelude, payload: payload, headers: headers}} - end - end -end diff --git a/lib/ex_aws/event_stream/prelude.ex b/lib/ex_aws/event_stream/prelude.ex deleted file mode 100644 index ac17e451..00000000 --- a/lib/ex_aws/event_stream/prelude.ex +++ /dev/null @@ -1,87 +0,0 @@ -defmodule ExAws.EventStream.Prelude do - @moduledoc """ - Parses EventStream preludes. - - AWS encodes EventStream preludes in binary as follows: - - [ total-length ][headers-length][ prelude crc ] - |<-- 4 bytes -->|<-- 4 bytes -->|<-- 4 bytes -->| - - This module parses this information and returns a struct with the total length of the message, the length of the headers, the length of the prelude, the CRC of the message, and the length of the payload. - """ - import Bitwise - - defstruct total_length: nil, - headers_length: nil, - prelude_length: nil, - crc: nil, - payload_length: nil, - prelude_bytes: nil - - @prelude_length 12 - # 128 Kb - @max_header_length 128 * 1024 - # 16 Mb - @max_payload_length 16 * 1024 * 1024 - - defp unpack_prelude( - << - total_length_bytes::binary-size(4), - headers_length_bytes::binary-size(4), - crc::binary-size(4) - >> = prelude_bytes - ) do - total_length = :binary.decode_unsigned(total_length_bytes, :big) - headers_length = :binary.decode_unsigned(headers_length_bytes, :big) - crc = :binary.decode_unsigned(crc, :big) - - {:ok, - %__MODULE__{ - total_length: total_length, - headers_length: headers_length, - prelude_length: @prelude_length, - payload_length: total_length - @prelude_length - headers_length - 4, - crc: crc, - prelude_bytes: prelude_bytes - }} - end - - def validate_prelude( - %__MODULE__{ - headers_length: headers_length, - payload_length: payload_length - } = prelude - ) do - cond do - headers_length > @max_header_length -> - {:error, :invalid_headers_length} - - payload_length > @max_payload_length -> - {:error, :invalid_payload_length} - - true -> - {:ok, prelude} - end - end - - def validate_checksum( - <>, - prelude_checksum - ) do - computed_checksum = prelude_bytes_without_crc |> :erlang.crc32() |> band(0xFFFFFFFF) - - if computed_checksum == prelude_checksum do - :ok - else - {:error, :prelude_checksum_mismatch} - end - end - - def parse(<> = payload) do - with {:ok, unpacked_prelude} <- unpack_prelude(prelude_bytes), - {:ok, prelude} <- validate_prelude(unpacked_prelude), - :ok <- validate_checksum(prelude_bytes, unpacked_prelude.crc) do - {:ok, prelude, payload} - end - end -end diff --git a/lib/ex_aws/operation/s3.ex b/lib/ex_aws/operation/s3.ex index 4496dc4f..7a76c234 100644 --- a/lib/ex_aws/operation/s3.ex +++ b/lib/ex_aws/operation/s3.ex @@ -31,10 +31,9 @@ defmodule ExAws.Operation.S3 do {operation, config, url, body, headers, http_method} = build_request_params(operation, config) - ExAws.Request.request_stream(http_method, url, body, headers, config, operation.service) - |> Stream.map(&EventStream.parse_message/1) - |> Stream.filter(&EventStream.Message.is_record?/1) - |> Stream.map(&EventStream.Message.get_payload/1) + ExAws.Request.request(http_method, url, body, headers, config, operation.service, true) + |> ExAws.Request.default_aws_error() + |> operation.parser.() end def stream!(%{stream_builder: fun}, config), do: fun.(config) diff --git a/lib/ex_aws/request.ex b/lib/ex_aws/request.ex index a330033c..c3c56c64 100644 --- a/lib/ex_aws/request.ex +++ b/lib/ex_aws/request.ex @@ -9,30 +9,7 @@ defmodule ExAws.Request do @type error_t :: {:error, {:http_error, http_status, binary}} @type response_t :: success_t | error_t - def request_stream(method, url, data, headers, config, service) do - req_body = - case data do - [] -> "{}" - d when is_binary(d) -> d - _ -> config[:json_codec].encode!(data) - end - - safe_url = ExAws.Request.Url.sanitize(url, service) - full_headers = ExAws.Auth.headers(method, url, service, config, headers, req_body) - - {:ok, _status, _headers, stream} = - config[:http_client].request_stream( - method, - safe_url, - req_body, - full_headers, - Map.get(config, :http_opts, []) - ) - - stream - end - - def request(http_method, url, data, headers, config, service) do + def request(http_method, url, data, headers, config, service, stream \\ false) do body = case data do [] -> "{}" @@ -40,13 +17,31 @@ defmodule ExAws.Request do _ -> config[:json_codec].encode!(data) end - request_and_retry(http_method, url, service, config, headers, body, {:attempt, 1}) + request_and_retry(http_method, url, service, config, headers, body, stream, {:attempt, 1}) end - def request_and_retry(_method, _url, _service, _config, _headers, _req_body, {:error, reason}), - do: {:error, reason} + def request_and_retry( + _method, + _url, + _service, + _config, + _headers, + _req_body, + _stream, + {:error, reason} + ), + do: {:error, reason} - def request_and_retry(method, url, service, config, headers, req_body, {:attempt, attempt}) do + def request_and_retry( + method, + url, + service, + config, + headers, + req_body, + stream, + {:attempt, attempt} + ) do full_headers = ExAws.Auth.headers(method, url, service, config, headers, req_body) with {:ok, full_headers} <- full_headers do @@ -58,7 +53,7 @@ defmodule ExAws.Request do ) end - case do_request(config, method, safe_url, req_body, full_headers, attempt, service) do + case do_request(config, method, safe_url, req_body, full_headers, attempt, service, stream) do {:ok, %{status_code: status} = resp} when status in 200..299 or status == 304 -> {:ok, resp} @@ -76,6 +71,7 @@ defmodule ExAws.Request do config, headers, req_body, + stream, attempt_again?(attempt, reason, config) ) @@ -94,6 +90,7 @@ defmodule ExAws.Request do config, headers, req_body, + stream, attempt_again?(attempt, reason, config) ) @@ -109,13 +106,14 @@ defmodule ExAws.Request do config, headers, req_body, + stream, attempt_again?(attempt, reason, config) ) end end end - defp do_request(config, method, safe_url, req_body, full_headers, attempt, service) do + defp do_request(config, method, safe_url, req_body, full_headers, attempt, service, stream) do telemetry_event = Map.get(config, :telemetry_event, [:ex_aws, :request]) telemetry_options = Map.get(config, :telemetry_options, []) @@ -134,7 +132,8 @@ defmodule ExAws.Request do safe_url, req_body, full_headers, - Map.get(config, :http_opts, []) + Map.get(config, :http_opts, []), + stream ) |> maybe_transform_response() @@ -234,6 +233,10 @@ defmodule ExAws.Request do |> :timer.sleep() end + def maybe_transform_response({:ok, %{status: status, stream: stream, headers: headers}}) do + {:ok, %{status_code: status, stream: stream, headers: headers}} + end + def maybe_transform_response({:ok, %{status: status, body: body, headers: headers}}) do # Req and Finch use status (rather than status_code) as a key. {:ok, %{status_code: status, body: body, headers: headers}} diff --git a/lib/ex_aws/request/hackney.ex b/lib/ex_aws/request/hackney.ex index 4f1d62a8..15ab826b 100644 --- a/lib/ex_aws/request/hackney.ex +++ b/lib/ex_aws/request/hackney.ex @@ -14,14 +14,30 @@ defmodule ExAws.Request.Hackney do @default_opts [recv_timeout: 30_000] - def request(method, url, body \\ "", headers \\ [], http_opts \\ []) do - opts = Application.get_env(:ex_aws, :hackney_opts, @default_opts) - opts = http_opts ++ [:with_body | opts] + def request(method, url, body \\ "", headers \\ [], http_opts \\ [], stream \\ false) do + opts = Application.get_env(:ex_aws, :hackney_opts, @default_opts) ++ http_opts + + opts = + if stream do + opts + else + [:with_body | opts] + end case :hackney.request(method, url, headers, body, opts) do {:ok, status, headers} -> {:ok, %{status_code: status, headers: headers}} + {:ok, status, headers, client} when is_reference(client) -> + stream = + Stream.resource( + fn -> client end, + &continue_stream/1, + &finish_stream/1 + ) + + {:ok, %{status_code: status, headers: headers, stream: stream}} + {:ok, status, headers, body} -> {:ok, %{status_code: status, headers: headers, body: body}} @@ -30,21 +46,6 @@ defmodule ExAws.Request.Hackney do end end - def request_stream(method, url, body \\ "", headers \\ [], http_opts \\ []) do - {:ok, headers} = headers - - {:ok, status, headers, client} = :hackney.request(method, url, headers, body, http_opts) - - stream = - Stream.resource( - fn -> client end, - &continue_stream/1, - &finish_stream/1 - ) - - {:ok, status, headers, stream} - end - defp continue_stream(client) do case :hackney.stream_body(client) do {:ok, data} -> diff --git a/lib/ex_aws/request/http_client.ex b/lib/ex_aws/request/http_client.ex index 9029b56a..dfd714cc 100644 --- a/lib/ex_aws/request/http_client.ex +++ b/lib/ex_aws/request/http_client.ex @@ -59,13 +59,6 @@ defmodule ExAws.Request.HttpClient do ) :: {:ok, %{status_code: pos_integer, headers: any}} | {:ok, %{status_code: pos_integer, headers: any, body: binary}} + | {:ok, %{status_code: pos_integer, headers: any, stream: Enumerable.t()}} | {:error, %{reason: any}} - - @callback request_stream( - method :: http_method, - url :: binary, - req_body :: binary, - headers :: [{binary, binary}, ...], - http_opts :: term - ) :: Enumerable.t() end diff --git a/test/ex_aws/event_stream/header_test.exs b/test/ex_aws/event_stream/header_test.exs deleted file mode 100644 index e75efa09..00000000 --- a/test/ex_aws/event_stream/header_test.exs +++ /dev/null @@ -1,45 +0,0 @@ -defmodule ExAws.AuthTest do - use ExUnit.Case, async: true - - alias ExAws.EventStream.Header - - describe "Header.extract_headers/1" do - test "can extract Record type event header" do - assert Header.extract_headers( - <<13, 58, 109, 101, 115, 115, 97, 103, 101, 45, 116, 121, 112, 101, 7, 0, 5, 101, - 118, 101, 110, 116, 13, 58, 99, 111, 110, 116, 101, 110, 116, 45, 116, 121, 112, - 101, 7, 0, 24, 97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 47, 111, 99, - 116, 101, 116, 45, 115, 116, 114, 101, 97, 109, 11, 58, 101, 118, 101, 110, 116, - 45, 116, 121, 112, 101, 7, 0, 7, 82, 101, 99, 111, 114, 100, 115>> - ) == %{ - ":message-type" => "event", - ":content-type" => "application/octet-stream", - ":event-type" => "Records" - } - end - - test "can extract Stats type event header" do - assert Header.extract_headers( - <<13, 58, 109, 101, 115, 115, 97, 103, 101, 45, 116, 121, 112, 101, 7, 0, 5, 101, - 118, 101, 110, 116, 13, 58, 99, 111, 110, 116, 101, 110, 116, 45, 116, 121, 112, - 101, 7, 0, 8, 116, 101, 120, 116, 47, 120, 109, 108, 11, 58, 101, 118, 101, 110, - 116, 45, 116, 121, 112, 101, 7, 0, 5, 83, 116, 97, 116, 115>> - ) == %{ - ":message-type" => "event", - ":content-type" => "text/xml", - ":event-type" => "Stats" - } - end - - test "can extract End type event header" do - assert Header.extract_headers( - <<13, 58, 109, 101, 115, 115, 97, 103, 101, 45, 116, 121, 112, 101, 7, 0, 5, 101, - 118, 101, 110, 116, 11, 58, 101, 118, 101, 110, 116, 45, 116, 121, 112, 101, 7, - 0, 3, 69, 110, 100>> - ) == %{ - ":message-type" => "event", - ":event-type" => "End" - } - end - end -end From a357742e76a137b2fc923b8b48b4796e83ba98ed Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Tue, 7 Nov 2023 14:06:30 -0800 Subject: [PATCH 09/13] Removed unused alias in S3 operation module --- lib/ex_aws/operation/s3.ex | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/ex_aws/operation/s3.ex b/lib/ex_aws/operation/s3.ex index 7a76c234..3279ec1b 100644 --- a/lib/ex_aws/operation/s3.ex +++ b/lib/ex_aws/operation/s3.ex @@ -2,7 +2,6 @@ defmodule ExAws.Operation.S3 do @moduledoc """ Holds data necessary for an operation on the S3 service. """ - alias ExAws.EventStream defstruct stream_builder: nil, parser: &Function.identity/1, From e44570ef8f8a036cea62f7063b74f42a510889a1 Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Tue, 7 Nov 2023 14:07:46 -0800 Subject: [PATCH 10/13] Changed stream_builder from event_stream to octet_stream --- lib/ex_aws/operation/s3.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/ex_aws/operation/s3.ex b/lib/ex_aws/operation/s3.ex index 3279ec1b..ec4290dc 100644 --- a/lib/ex_aws/operation/s3.ex +++ b/lib/ex_aws/operation/s3.ex @@ -26,7 +26,7 @@ defmodule ExAws.Operation.S3 do |> operation.parser.() end - def stream!(%{stream_builder: :event_stream} = operation, config) do + def stream!(%{stream_builder: :octet_stream} = operation, config) do {operation, config, url, body, headers, http_method} = build_request_params(operation, config) From 54243816156ab67f55634d009597355ea401fb4e Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Tue, 7 Nov 2023 14:13:05 -0800 Subject: [PATCH 11/13] Updated response handling in ExAws module --- lib/ex_aws.ex | 1 + lib/ex_aws/behaviour.ex | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/ex_aws.ex b/lib/ex_aws.ex index 8c96212a..12a48340 100644 --- a/lib/ex_aws.ex +++ b/lib/ex_aws.ex @@ -84,6 +84,7 @@ defmodule ExAws do def request!(op, config_overrides \\ []) do case request(op, config_overrides) do {:ok, result} -> + result error -> diff --git a/lib/ex_aws/behaviour.ex b/lib/ex_aws/behaviour.ex index c585f01d..28b98464 100644 --- a/lib/ex_aws/behaviour.ex +++ b/lib/ex_aws/behaviour.ex @@ -18,8 +18,8 @@ defmodule ExAws.Behaviour do @callback request!(ExAws.Operation.t(), Keyword.t()) :: term | no_return @doc "See `ExAws.stream!/2`." - @callback stream!(ExAws.Operation.t()) :: Enumerable.t() + @callback stream!(ExAws.Operation.t()) :: Enumerable.t() | {:ok, term} | {:error, term} @doc "See `ExAws.stream!/2`." - @callback stream!(ExAws.Operation.t(), Keyword.t()) :: Enumerable.t() + @callback stream!(ExAws.Operation.t(), Keyword.t()) :: Enumerable.t() | {:ok, term} | {:error, term} end From 0ba30d7f59bcf54ae817ffa003dbee492dac77c2 Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Tue, 7 Nov 2023 14:14:01 -0800 Subject: [PATCH 12/13] Removed unnecessary blank line in ExAws module --- lib/ex_aws.ex | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/ex_aws.ex b/lib/ex_aws.ex index 12a48340..8c96212a 100644 --- a/lib/ex_aws.ex +++ b/lib/ex_aws.ex @@ -84,7 +84,6 @@ defmodule ExAws do def request!(op, config_overrides \\ []) do case request(op, config_overrides) do {:ok, result} -> - result error -> From bc078affdd24360c50b846011aa11f663b150d89 Mon Sep 17 00:00:00 2001 From: Atul Vinayak Date: Mon, 13 Nov 2023 23:41:42 -0800 Subject: [PATCH 13/13] Simplified result processing in ExAws module --- lib/ex_aws.ex | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/lib/ex_aws.ex b/lib/ex_aws.ex index 8c96212a..dc9fb11b 100644 --- a/lib/ex_aws.ex +++ b/lib/ex_aws.ex @@ -110,15 +110,8 @@ defmodule ExAws do {:ok, result} -> result - %Stream{} = result -> + result -> result - - error -> - raise ExAws.Error, """ - ExAws Stream Request Error! - - #{inspect(error)} - """ end end