From 214f10592430efbc1acfbed788776bcd12a5aa66 Mon Sep 17 00:00:00 2001 From: Bernard Duggan Date: Wed, 6 Jun 2018 10:53:36 +1000 Subject: [PATCH 1/9] Subscription prime support --- lib/absinthe.ex | 28 +++++++++-- lib/absinthe/blueprint/continuation.ex | 19 ++++++++ lib/absinthe/blueprint/result/list.ex | 6 ++- lib/absinthe/blueprint/result/object.ex | 6 ++- lib/absinthe/phase/document/result.ex | 9 +++- lib/absinthe/phase/subscription/prime.ex | 8 ++++ lib/absinthe/phase/subscription/result.ex | 41 +++++++++++++++- .../phase/subscription/subscribe_self.ex | 17 ++++--- lib/absinthe/pipeline.ex | 26 +++++++--- lib/absinthe/subscription.ex | 2 + test/absinthe/execution/subscription_test.exs | 48 ++++++++++++++++++- 11 files changed, 186 insertions(+), 24 deletions(-) create mode 100644 lib/absinthe/blueprint/continuation.ex create mode 100644 lib/absinthe/phase/subscription/prime.ex diff --git a/lib/absinthe.ex b/lib/absinthe.ex index 5ed484cdc2..dc95fd5d54 100644 --- a/lib/absinthe.ex +++ b/lib/absinthe.ex @@ -44,9 +44,12 @@ defmodule Absinthe do %{message: String.t()} | %{message: String.t(), locations: [%{line: pos_integer, column: integer}]} + @type continuation_t :: nil | [Continuation.t()] + @type result_t :: - %{data: nil | result_selection_t} - | %{data: nil | result_selection_t, errors: [result_error_t]} + %{required(:data) => nil | result_selection_t, + optional(:continuation) => continuation_t, + optional(:errors) => [result_error_t]} | %{errors: [result_error_t]} @type pipeline_modifier_fun :: (Absinthe.Pipeline.t(), Keyword.t() -> Absinthe.Pipeline.t()) @@ -98,7 +101,7 @@ defmodule Absinthe do pipeline_modifier: pipeline_modifier_fun() ] - @type run_result :: {:ok, result_t} | {:error, String.t()} + @type run_result :: {:ok, result_t} | {:more, result_t} | {:error, String.t()} @spec run( binary | Absinthe.Language.Source.t() | Absinthe.Language.Document.t(), @@ -113,7 +116,23 @@ defmodule Absinthe do |> Absinthe.Pipeline.for_document(options) |> pipeline_modifier.(options) - case Absinthe.Pipeline.run(document, pipeline) do + document + |> Absinthe.Pipeline.run(pipeline) + |> build_result() + end + + @spec continue([Continuation.t()]) :: run_result() + def continue(continuation) do + continuation + |> Absinthe.Pipeline.continue() + |> build_result() + end + + defp build_result(output) do + case output do + {:ok, %{result: %{continuation: c} = result}, _phases} when c != [] -> + {:more, result} + {:ok, %{result: result}, _phases} -> {:ok, result} @@ -137,6 +156,7 @@ defmodule Absinthe do def run!(input, schema, options \\ []) do case run(input, schema, options) do {:ok, result} -> result + {:more, result} -> result {:error, err} -> raise ExecutionError, message: err end end diff --git a/lib/absinthe/blueprint/continuation.ex b/lib/absinthe/blueprint/continuation.ex new file mode 100644 index 0000000000..d581c88bad --- /dev/null +++ b/lib/absinthe/blueprint/continuation.ex @@ -0,0 +1,19 @@ +defmodule Absinthe.Blueprint.Continuation do + @moduledoc false + + # Continuations allow further resolutions after the initial result is + # returned + + alias Absinthe.Pipeline + + defstruct [ + :phase_input, + :pipeline + ] + + @type t :: %__MODULE__{ + phase_input: Pipeline.data_t, + pipeline: Pipeline.t() + } + +end diff --git a/lib/absinthe/blueprint/result/list.ex b/lib/absinthe/blueprint/result/list.ex index 40d41896e9..25e50e4e7d 100644 --- a/lib/absinthe/blueprint/result/list.ex +++ b/lib/absinthe/blueprint/result/list.ex @@ -9,7 +9,8 @@ defmodule Absinthe.Blueprint.Result.List do :values, errors: [], flags: %{}, - extensions: %{} + extensions: %{}, + continuations: [] ] @type t :: %__MODULE__{ @@ -17,6 +18,7 @@ defmodule Absinthe.Blueprint.Result.List do values: [Blueprint.Execution.node_t()], errors: [Phase.Error.t()], flags: Blueprint.flags_t(), - extensions: %{any => any} + extensions: %{any => any}, + continuations: [Continuation.t()] } end diff --git a/lib/absinthe/blueprint/result/object.ex b/lib/absinthe/blueprint/result/object.ex index 745dc16062..c9acf60b11 100644 --- a/lib/absinthe/blueprint/result/object.ex +++ b/lib/absinthe/blueprint/result/object.ex @@ -10,7 +10,8 @@ defmodule Absinthe.Blueprint.Result.Object do :fields, errors: [], flags: %{}, - extensions: %{} + extensions: %{}, + continuations: [] ] @type t :: %__MODULE__{ @@ -18,6 +19,7 @@ defmodule Absinthe.Blueprint.Result.Object do fields: [Blueprint.Execution.node_t()], errors: [Phase.Error.t()], flags: Blueprint.flags_t(), - extensions: %{any => any} + extensions: %{any => any}, + continuations: [Continuation.t()] } end diff --git a/lib/absinthe/phase/document/result.ex b/lib/absinthe/phase/document/result.ex index 651555b78d..3598005633 100644 --- a/lib/absinthe/phase/document/result.ex +++ b/lib/absinthe/phase/document/result.ex @@ -25,7 +25,9 @@ defmodule Absinthe.Phase.Document.Result do {:validation_failed, errors} end - format_result(result, opts) + result + |> format_result(opts) + |> maybe_add_continuations(blueprint.execution.result) end defp format_result({:ok, {data, []}}, _) do @@ -156,4 +158,9 @@ defmodule Absinthe.Phase.Document.Result do end defp format_location(_), do: [] + + defp maybe_add_continuations(result, %{continuations: continuations}) when continuations != [], + do: Map.put(result, :continuation, continuations) + + defp maybe_add_continuations(result, _), do: result end diff --git a/lib/absinthe/phase/subscription/prime.ex b/lib/absinthe/phase/subscription/prime.ex new file mode 100644 index 0000000000..4b0835a2ad --- /dev/null +++ b/lib/absinthe/phase/subscription/prime.ex @@ -0,0 +1,8 @@ +defmodule Absinthe.Phase.Subscription.Prime do + @moduledoc false + + @spec run(any(), Keyword.t()) :: Phase.result_t() + def run(blueprint, [prime_result: cr]) do + {:ok, put_in(blueprint.execution.root_value, cr)} + end +end diff --git a/lib/absinthe/phase/subscription/result.ex b/lib/absinthe/phase/subscription/result.ex index 7dbe61eb8c..465c98fe36 100644 --- a/lib/absinthe/phase/subscription/result.ex +++ b/lib/absinthe/phase/subscription/result.ex @@ -5,10 +5,47 @@ defmodule Absinthe.Phase.Subscription.Result do # subscription alias Absinthe.Blueprint + alias Absinthe.Blueprint.Continuation @spec run(any, Keyword.t()) :: {:ok, Blueprint.t()} - def run(blueprint, topic: topic) do + def run(blueprint, options) do + topic = Keyword.get(options, :topic) + prime = Keyword.get(options, :prime) result = %{"subscribed" => topic} - {:ok, put_in(blueprint.result, result)} + case prime do + nil -> + {:ok, put_in(blueprint.result, result)} + + prime_fun when is_function(prime_fun, 0) -> + {:ok, prime_results} = prime_fun.() + + result = + if prime_results != [] do + continuations = + Enum.map(prime_results, fn cr -> + %Continuation{ + phase_input: blueprint, + pipeline: [ + {Absinthe.Phase.Subscription.Prime, [prime_result: cr]}, + {Absinthe.Phase.Document.Execution.Resolution, options}, + Absinthe.Phase.Document.Result + ] + } + end) + + Map.put(result, :continuation, continuations) + else + result + end + + {:ok, put_in(blueprint.result, result)} + + val -> + raise """ + Invalid prime function. Must be a function of arity 0. + + #{inspect(val)} + """ + end end end diff --git a/lib/absinthe/phase/subscription/subscribe_self.ex b/lib/absinthe/phase/subscription/subscribe_self.ex index 662ee69365..4b79f48e9f 100644 --- a/lib/absinthe/phase/subscription/subscribe_self.ex +++ b/lib/absinthe/phase/subscription/subscribe_self.ex @@ -22,13 +22,13 @@ defmodule Absinthe.Phase.Subscription.SubscribeSelf do %{selections: [field]} = op with {:ok, config} <- get_config(field, context, blueprint) do - field_keys = get_field_keys(field, config) + {field_keys, prime} = get_field_keys(field, config) subscription_id = get_subscription_id(config, blueprint, options) Absinthe.Subscription.subscribe(pubsub, field_keys, subscription_id, blueprint) pipeline = [ - {Phase.Subscription.Result, topic: subscription_id}, + {Phase.Subscription.Result, topic: subscription_id, prime: prime}, {Phase.Telemetry, Keyword.put(options, :event, [:execute, :operation, :stop])} ] @@ -97,8 +97,9 @@ defmodule Absinthe.Phase.Subscription.SubscribeSelf do defp get_field_keys(%{schema_node: schema_node} = _field, config) do name = schema_node.identifier - find_field_keys!(config) - |> Enum.map(fn key -> {name, key} end) + {keys, prime} = find_field_keys!(config) + field_keys = Enum.map(keys, fn key -> {name, key} end) + {field_keys, prime} end defp ensure_pubsub!(context) do @@ -133,8 +134,12 @@ defmodule Absinthe.Phase.Subscription.SubscribeSelf do """ val -> - List.wrap(val) - |> Enum.map(&to_string/1) + topics = List.wrap(val) + |> Enum.map(&to_string/1) + + prime = config[:prime] || nil + + {topics, prime} end end diff --git a/lib/absinthe/pipeline.ex b/lib/absinthe/pipeline.ex index 16c7d1c2ed..3698a964a1 100644 --- a/lib/absinthe/pipeline.ex +++ b/lib/absinthe/pipeline.ex @@ -12,23 +12,39 @@ defmodule Absinthe.Pipeline do * See `Absinthe.Schema` on adjusting the schema pipeline for schema manipulation. """ + alias Absinthe.Blueprint.Continuation alias Absinthe.Phase @type data_t :: any + @type run_result_t :: {:ok, data_t, [Phase.t()]} | {:error, String.t() | {:http_method, String.t()}, [Phase.t()]} + @type phase_config_t :: Phase.t() | {Phase.t(), Keyword.t()} @type t :: [phase_config_t | [phase_config_t]] - @spec run(data_t, t) :: - {:ok, data_t, [Phase.t()]} - | {:error, String.t() | {:http_method, String.t()}, [Phase.t()]} + @spec run(data_t, t) :: run_result_t def run(input, pipeline) do pipeline |> List.flatten() |> run_phase(input) end + @spec continue([Continuation.t()]) :: run_result_t + def continue([continuation | rest]) do + result = run_phase(continuation.pipeline, continuation.phase_input) + + case result do + {:ok, blueprint, phases} when rest == [] -> + {:ok, blueprint, phases} + {:ok, blueprint, phases} -> + bp_result = Map.put(blueprint.result, :continuation, rest) + blueprint = Map.put(blueprint, :result, bp_result) + {:ok, blueprint, phases} + error -> error + end + end + @defaults [ adapter: Absinthe.Adapter.LanguageConventions, operation_name: nil, @@ -393,9 +409,7 @@ defmodule Absinthe.Pipeline do end) end - @spec run_phase(t, data_t, [Phase.t()]) :: - {:ok, data_t, [Phase.t()]} - | {:error, String.t() | {:http_method, String.t()}, [Phase.t()]} + @spec run_phase(t, data_t, [Phase.t()]) :: run_result_t def run_phase(pipeline, input, done \\ []) def run_phase([], input, done) do diff --git a/lib/absinthe/subscription.ex b/lib/absinthe/subscription.ex index 694ba75bfe..79683c1f76 100644 --- a/lib/absinthe/subscription.ex +++ b/lib/absinthe/subscription.ex @@ -82,6 +82,8 @@ defmodule Absinthe.Subscription do @type subscription_field_spec :: {atom, term | (term -> term)} + @type prime_fun :: (-> {:ok, [map()]}) + @doc """ Publish a mutation diff --git a/test/absinthe/execution/subscription_test.exs b/test/absinthe/execution/subscription_test.exs index ddf1e5c002..3583acd247 100644 --- a/test/absinthe/execution/subscription_test.exs +++ b/test/absinthe/execution/subscription_test.exs @@ -196,6 +196,7 @@ defmodule Absinthe.Execution.SubscriptionTest do end end + field :other_user, :user do arg :id, :id @@ -226,6 +227,21 @@ defmodule Absinthe.Execution.SubscriptionTest do {:error, %{message: "failed", extensions: %{code: "FAILED"}}} end end + + field :prime, :user do + arg :client_id, non_null(:id) + arg :prime_data, list_of(:string) + + config fn args, _ -> + { + :ok, + topic: args.client_id, + prime: fn -> + {:ok, Enum.map(args.prime_data, &(%{id: "some_id", name: &1}))} + end + } + end + end end mutation do @@ -900,6 +916,36 @@ defmodule Absinthe.Execution.SubscriptionTest do } == msg end + @query """ + subscription ($clientId: ID!, $primeData: [String]) { + prime(clientId: $clientId, primeData: $primeData) { + name + } + } + """ + test "subscription with priming" do + client_id = "xyz" + prime_data = ["name1", "name2"] + + assert {:more, %{"subscribed" => _topic, continuation: continuation}} = + run_subscription( + @query, + Schema, + variables: %{ + "primeData" => prime_data, + "clientId" => client_id + } + ) + + assert {:more, %{ + data: %{"prime" => %{"name" => "name1"}}, + continuation: continuation}} + = Absinthe.continue(continuation) + + assert {:ok, %{data: %{"prime" => %{"name" => "name2"}}}} + = Absinthe.continue(continuation) + end + def run_subscription(query, schema, opts \\ []) do opts = Keyword.update( @@ -910,7 +956,7 @@ defmodule Absinthe.Execution.SubscriptionTest do ) case run(query, schema, opts) do - {:ok, %{"subscribed" => topic}} = val -> + {response, %{"subscribed" => topic}} = val when response == :ok or response == :more -> opts[:context][:pubsub].subscribe(topic) val From 400df7a992bd9ba1df02d4c0d08309bed6abebf8 Mon Sep 17 00:00:00 2001 From: Bernard Duggan Date: Mon, 2 May 2022 13:59:26 +1000 Subject: [PATCH 2/9] Add subscription ordinal --- lib/absinthe.ex | 11 +- lib/absinthe/blueprint/continuation.ex | 7 +- lib/absinthe/blueprint/result/list.ex | 2 +- lib/absinthe/blueprint/result/object.ex | 2 +- lib/absinthe/phase/document/result.ex | 2 +- .../phase/subscription/get_ordinal.ex | 42 ++++++ lib/absinthe/phase/subscription/prime.ex | 4 +- lib/absinthe/phase/subscription/result.ex | 57 ++++---- .../phase/subscription/subscribe_self.ex | 27 ++-- lib/absinthe/pipeline.ex | 6 +- lib/absinthe/subscription.ex | 2 +- lib/absinthe/subscription/local.ex | 5 +- test/absinthe/execution/subscription_test.exs | 122 +++++++++++++++--- 13 files changed, 216 insertions(+), 73 deletions(-) create mode 100644 lib/absinthe/phase/subscription/get_ordinal.ex diff --git a/lib/absinthe.ex b/lib/absinthe.ex index dc95fd5d54..ecb55a9939 100644 --- a/lib/absinthe.ex +++ b/lib/absinthe.ex @@ -44,12 +44,15 @@ defmodule Absinthe do %{message: String.t()} | %{message: String.t(), locations: [%{line: pos_integer, column: integer}]} - @type continuation_t :: nil | [Continuation.t()] + @type continuation_t :: nil | [Absinthe.Blueprint.Continuation.t()] @type result_t :: - %{required(:data) => nil | result_selection_t, + %{ + required(:data) => nil | result_selection_t, + optional(:ordinal) => term(), optional(:continuation) => continuation_t, - optional(:errors) => [result_error_t]} + optional(:errors) => [result_error_t] + } | %{errors: [result_error_t]} @type pipeline_modifier_fun :: (Absinthe.Pipeline.t(), Keyword.t() -> Absinthe.Pipeline.t()) @@ -121,7 +124,7 @@ defmodule Absinthe do |> build_result() end - @spec continue([Continuation.t()]) :: run_result() + @spec continue([Absinthe.Blueprint.Continuation.t()]) :: run_result() def continue(continuation) do continuation |> Absinthe.Pipeline.continue() diff --git a/lib/absinthe/blueprint/continuation.ex b/lib/absinthe/blueprint/continuation.ex index d581c88bad..9c5e1198c9 100644 --- a/lib/absinthe/blueprint/continuation.ex +++ b/lib/absinthe/blueprint/continuation.ex @@ -12,8 +12,7 @@ defmodule Absinthe.Blueprint.Continuation do ] @type t :: %__MODULE__{ - phase_input: Pipeline.data_t, - pipeline: Pipeline.t() - } - + phase_input: Pipeline.data_t(), + pipeline: Pipeline.t() + } end diff --git a/lib/absinthe/blueprint/result/list.ex b/lib/absinthe/blueprint/result/list.ex index 25e50e4e7d..62df401a5c 100644 --- a/lib/absinthe/blueprint/result/list.ex +++ b/lib/absinthe/blueprint/result/list.ex @@ -19,6 +19,6 @@ defmodule Absinthe.Blueprint.Result.List do errors: [Phase.Error.t()], flags: Blueprint.flags_t(), extensions: %{any => any}, - continuations: [Continuation.t()] + continuations: [Blueprint.Continuation.t()] } end diff --git a/lib/absinthe/blueprint/result/object.ex b/lib/absinthe/blueprint/result/object.ex index c9acf60b11..3e1ab3f506 100644 --- a/lib/absinthe/blueprint/result/object.ex +++ b/lib/absinthe/blueprint/result/object.ex @@ -20,6 +20,6 @@ defmodule Absinthe.Blueprint.Result.Object do errors: [Phase.Error.t()], flags: Blueprint.flags_t(), extensions: %{any => any}, - continuations: [Continuation.t()] + continuations: [Blueprint.Continuation.t()] } end diff --git a/lib/absinthe/phase/document/result.ex b/lib/absinthe/phase/document/result.ex index 3598005633..b71cdedb86 100644 --- a/lib/absinthe/phase/document/result.ex +++ b/lib/absinthe/phase/document/result.ex @@ -160,7 +160,7 @@ defmodule Absinthe.Phase.Document.Result do defp format_location(_), do: [] defp maybe_add_continuations(result, %{continuations: continuations}) when continuations != [], - do: Map.put(result, :continuation, continuations) + do: Map.put(result, :continuation, continuations) defp maybe_add_continuations(result, _), do: result end diff --git a/lib/absinthe/phase/subscription/get_ordinal.ex b/lib/absinthe/phase/subscription/get_ordinal.ex new file mode 100644 index 0000000000..5dc3eb30eb --- /dev/null +++ b/lib/absinthe/phase/subscription/get_ordinal.ex @@ -0,0 +1,42 @@ +defmodule Absinthe.Phase.Subscription.GetOrdinal do + use Absinthe.Phase + + alias Absinthe.Phase.Subscription.SubscribeSelf + + @moduledoc false + + alias Absinthe.Blueprint + + @spec run(any, Keyword.t()) :: {:ok, Blueprint.t()} + def run(blueprint, _options \\ []) do + op = Blueprint.current_operation(blueprint) + + if op.type == :subscription do + {:ok, + %{blueprint | result: Map.put(blueprint.result, :ordinal, get_ordinal(op, blueprint))}} + else + {:ok, blueprint} + end + end + + defp get_ordinal(op, blueprint) do + %{selections: [field]} = op + {:ok, config} = SubscribeSelf.get_config(field, blueprint.execution.context, blueprint) + + case config[:ordinal] do + nil -> + nil + + fun when is_function(fun, 1) -> + fun.(blueprint.execution.root_value) + + _fun -> + IO.write( + :stderr, + "Ordinal function must be 1-arity" + ) + + nil + end + end +end diff --git a/lib/absinthe/phase/subscription/prime.ex b/lib/absinthe/phase/subscription/prime.ex index 4b0835a2ad..c3e08252d8 100644 --- a/lib/absinthe/phase/subscription/prime.ex +++ b/lib/absinthe/phase/subscription/prime.ex @@ -1,8 +1,8 @@ defmodule Absinthe.Phase.Subscription.Prime do @moduledoc false - @spec run(any(), Keyword.t()) :: Phase.result_t() - def run(blueprint, [prime_result: cr]) do + @spec run(any(), Keyword.t()) :: Absinthe.Phase.result_t() + def run(blueprint, prime_result: cr) do {:ok, put_in(blueprint.execution.root_value, cr)} end end diff --git a/lib/absinthe/phase/subscription/result.ex b/lib/absinthe/phase/subscription/result.ex index 465c98fe36..fca8d153c9 100644 --- a/lib/absinthe/phase/subscription/result.ex +++ b/lib/absinthe/phase/subscription/result.ex @@ -6,46 +6,53 @@ defmodule Absinthe.Phase.Subscription.Result do alias Absinthe.Blueprint alias Absinthe.Blueprint.Continuation + alias Absinthe.Phase @spec run(any, Keyword.t()) :: {:ok, Blueprint.t()} def run(blueprint, options) do - topic = Keyword.get(options, :topic) + topic = Keyword.fetch!(options, :topic) prime = Keyword.get(options, :prime) result = %{"subscribed" => topic} + case prime do nil -> {:ok, put_in(blueprint.result, result)} - prime_fun when is_function(prime_fun, 0) -> - {:ok, prime_results} = prime_fun.() - - result = - if prime_results != [] do - continuations = - Enum.map(prime_results, fn cr -> - %Continuation{ - phase_input: blueprint, - pipeline: [ - {Absinthe.Phase.Subscription.Prime, [prime_result: cr]}, - {Absinthe.Phase.Document.Execution.Resolution, options}, - Absinthe.Phase.Document.Result - ] - } - end) - - Map.put(result, :continuation, continuations) - else - result - end - - {:ok, put_in(blueprint.result, result)} + prime_fun when is_function(prime_fun, 1) -> + do_prime(prime_fun, result, blueprint, options) val -> raise """ - Invalid prime function. Must be a function of arity 0. + Invalid prime function. Must be a function of arity 1. #{inspect(val)} """ end end + + def do_prime(prime_fun, base_result, blueprint, options) do + {:ok, prime_results} = prime_fun.(blueprint.execution) + + result = + if prime_results != [] do + continuations = + Enum.map(prime_results, fn cr -> + %Continuation{ + phase_input: blueprint, + pipeline: [ + {Phase.Subscription.Prime, [prime_result: cr]}, + {Phase.Document.Execution.Resolution, options}, + Phase.Subscription.GetOrdinal, + Phase.Document.Result + ] + } + end) + + Map.put(base_result, :continuation, continuations) + else + base_result + end + + {:ok, put_in(blueprint.result, result)} + end end diff --git a/lib/absinthe/phase/subscription/subscribe_self.ex b/lib/absinthe/phase/subscription/subscribe_self.ex index 4b79f48e9f..d2124578ea 100644 --- a/lib/absinthe/phase/subscription/subscribe_self.ex +++ b/lib/absinthe/phase/subscription/subscribe_self.ex @@ -22,13 +22,13 @@ defmodule Absinthe.Phase.Subscription.SubscribeSelf do %{selections: [field]} = op with {:ok, config} <- get_config(field, context, blueprint) do - {field_keys, prime} = get_field_keys(field, config) + field_keys = get_field_keys(field, config) subscription_id = get_subscription_id(config, blueprint, options) Absinthe.Subscription.subscribe(pubsub, field_keys, subscription_id, blueprint) pipeline = [ - {Phase.Subscription.Result, topic: subscription_id, prime: prime}, + {Phase.Subscription.Result, topic: subscription_id, prime: config[:prime]}, {Phase.Telemetry, Keyword.put(options, :event, [:execute, :operation, :stop])} ] @@ -46,11 +46,11 @@ defmodule Absinthe.Phase.Subscription.SubscribeSelf do end end - defp get_config( - %{schema_node: schema_node, argument_data: argument_data} = field, - context, - blueprint - ) do + def get_config( + %{schema_node: schema_node, argument_data: argument_data} = field, + context, + blueprint + ) do name = schema_node.identifier config = @@ -97,9 +97,8 @@ defmodule Absinthe.Phase.Subscription.SubscribeSelf do defp get_field_keys(%{schema_node: schema_node} = _field, config) do name = schema_node.identifier - {keys, prime} = find_field_keys!(config) - field_keys = Enum.map(keys, fn key -> {name, key} end) - {field_keys, prime} + find_field_keys!(config) + |> Enum.map(fn key -> {name, key} end) end defp ensure_pubsub!(context) do @@ -134,12 +133,8 @@ defmodule Absinthe.Phase.Subscription.SubscribeSelf do """ val -> - topics = List.wrap(val) - |> Enum.map(&to_string/1) - - prime = config[:prime] || nil - - {topics, prime} + List.wrap(val) + |> Enum.map(&to_string/1) end end diff --git a/lib/absinthe/pipeline.ex b/lib/absinthe/pipeline.ex index 3698a964a1..a64ab24c7b 100644 --- a/lib/absinthe/pipeline.ex +++ b/lib/absinthe/pipeline.ex @@ -37,11 +37,14 @@ defmodule Absinthe.Pipeline do case result do {:ok, blueprint, phases} when rest == [] -> {:ok, blueprint, phases} + {:ok, blueprint, phases} -> bp_result = Map.put(blueprint.result, :continuation, rest) blueprint = Map.put(blueprint, :result, bp_result) {:ok, blueprint, phases} - error -> error + + error -> + error end end @@ -132,6 +135,7 @@ defmodule Absinthe.Pipeline do # Execution {Phase.Subscription.SubscribeSelf, options}, {Phase.Document.Execution.Resolution, options}, + Phase.Subscription.GetOrdinal, # Format Result Phase.Document.Result, {Phase.Telemetry, Keyword.put(options, :event, [:execute, :operation, :stop])} diff --git a/lib/absinthe/subscription.ex b/lib/absinthe/subscription.ex index 79683c1f76..05eab77f82 100644 --- a/lib/absinthe/subscription.ex +++ b/lib/absinthe/subscription.ex @@ -82,7 +82,7 @@ defmodule Absinthe.Subscription do @type subscription_field_spec :: {atom, term | (term -> term)} - @type prime_fun :: (-> {:ok, [map()]}) + @type prime_fun :: (Absinthe.Resolution.t() -> {:ok, [map()]}) @doc """ Publish a mutation diff --git a/lib/absinthe/subscription/local.ex b/lib/absinthe/subscription/local.ex index 31b9b456f7..1881165871 100644 --- a/lib/absinthe/subscription/local.ex +++ b/lib/absinthe/subscription/local.ex @@ -69,7 +69,10 @@ defmodule Absinthe.Subscription.Local do |> Pipeline.without(Phase.Subscription.SubscribeSelf) |> Pipeline.insert_before( Phase.Document.Execution.Resolution, - {Phase.Document.OverrideRoot, root_value: mutation_result} + [ + {Phase.Document.OverrideRoot, root_value: mutation_result}, + Phase.Subscription.GetOrdinal + ] ) |> Pipeline.upto(Phase.Document.Execution.Resolution) diff --git a/test/absinthe/execution/subscription_test.exs b/test/absinthe/execution/subscription_test.exs index 3583acd247..5991535a92 100644 --- a/test/absinthe/execution/subscription_test.exs +++ b/test/absinthe/execution/subscription_test.exs @@ -130,6 +130,7 @@ defmodule Absinthe.Execution.SubscriptionTest do object :user do field :id, :id field :name, :string + field :version, :integer field :group, :group do resolve fn user, _, %{context: %{test_pid: pid}} -> @@ -196,7 +197,6 @@ defmodule Absinthe.Execution.SubscriptionTest do end end - field :other_user, :user do arg :id, :id @@ -236,12 +236,39 @@ defmodule Absinthe.Execution.SubscriptionTest do { :ok, topic: args.client_id, - prime: fn -> - {:ok, Enum.map(args.prime_data, &(%{id: "some_id", name: &1}))} + prime: fn %{context: %{prime_id: prime_id}} -> + {:ok, Enum.map(args.prime_data, &%{id: prime_id, name: &1})} end } end end + + field :ordinal, :user do + arg :client_id, non_null(:id) + + config fn args, _ -> + { + :ok, + topic: args.client_id, ordinal: fn %{version: version} -> version end + } + end + end + + field :prime_ordinal, :user do + arg :client_id, non_null(:id) + arg :prime_data, list_of(:string) + + config fn args, _ -> + { + :ok, + topic: args.client_id, + prime: fn _ -> + {:ok, [%{name: "first_user", version: 1}, %{name: "second_user", version: 2}]} + end, + ordinal: fn %{version: version} -> version end + } + end + end end mutation do @@ -314,7 +341,7 @@ defmodule Absinthe.Execution.SubscriptionTest do assert %{ event: "subscription:data", - result: %{data: %{"thing" => "foo"}}, + result: %{data: %{"thing" => "foo"}, ordinal: nil}, topic: topic } == msg end @@ -391,7 +418,7 @@ defmodule Absinthe.Execution.SubscriptionTest do msg = %{ event: "subscription:data", - result: %{data: %{"multipleTopics" => "foo"}}, + result: %{data: %{"multipleTopics" => "foo"}, ordinal: nil}, topic: topic } @@ -493,7 +520,7 @@ defmodule Absinthe.Execution.SubscriptionTest do assert %{ event: "subscription:data", - result: %{data: %{"user" => %{"id" => "1", "name" => "foo"}}}, + result: %{data: %{"user" => %{"id" => "1", "name" => "foo"}}, ordinal: nil}, topic: topic } == msg end @@ -566,7 +593,7 @@ defmodule Absinthe.Execution.SubscriptionTest do assert %{ event: "subscription:data", - result: %{data: %{"thing" => "foo"}}, + result: %{data: %{"thing" => "foo"}, ordinal: nil}, topic: topic } == msg end @@ -585,7 +612,7 @@ defmodule Absinthe.Execution.SubscriptionTest do assert %{ event: "subscription:data", - result: %{data: %{"thing" => "foo"}}, + result: %{data: %{"thing" => "foo"}, ordinal: nil}, topic: topic } == msg end) @@ -828,7 +855,7 @@ defmodule Absinthe.Execution.SubscriptionTest do assert %{ event: "subscription:data", - result: %{data: %{"thing" => "foo"}}, + result: %{data: %{"thing" => "foo"}, ordinal: nil}, topic: topic } == msg @@ -919,12 +946,13 @@ defmodule Absinthe.Execution.SubscriptionTest do @query """ subscription ($clientId: ID!, $primeData: [String]) { prime(clientId: $clientId, primeData: $primeData) { + id name } } """ test "subscription with priming" do - client_id = "xyz" + client_id = "abc" prime_data = ["name1", "name2"] assert {:more, %{"subscribed" => _topic, continuation: continuation}} = @@ -934,16 +962,78 @@ defmodule Absinthe.Execution.SubscriptionTest do variables: %{ "primeData" => prime_data, "clientId" => client_id + }, + context: %{prime_id: "test_prime_id"} + ) + + assert {:more, + %{ + data: %{"prime" => %{"id" => "test_prime_id", "name" => "name1"}}, + continuation: continuation + }} = Absinthe.continue(continuation) + + assert {:ok, %{data: %{"prime" => %{"id" => "test_prime_id", "name" => "name2"}}}} = + Absinthe.continue(continuation) + end + + @query """ + subscription ($clientId: ID!) { + ordinal(clientId: $clientId) { + name + } + } + """ + test "subscription with ordinals" do + client_id = "abc" + + assert {:ok, %{"subscribed" => _topic}} = + run_subscription( + @query, + Schema, + variables: %{"clientId" => client_id}, + context: %{pubsub: PubSub} + ) + + userv1 = %{id: "1", name: "Alicia", group: %{name: "Elixir Users"}, version: 1} + userv2 = %{id: "1", name: "Alicia", group: %{name: "Elixir Users"}, version: 2} + + Absinthe.Subscription.publish(PubSub, userv1, ordinal: client_id) + Absinthe.Subscription.publish(PubSub, userv2, ordinal: client_id) + + assert_receive({:broadcast, msg}) + assert msg.result.ordinal == 1 + assert_receive({:broadcast, msg}) + assert msg.result.ordinal == 2 + end + + @query """ + subscription ($clientId: ID!) { + primeOrdinal(clientId: $clientId) { + name + } + } + """ + test "subscription with both priming and ordinals" do + client_id = "abc" + + assert {:more, %{"subscribed" => _topic, continuation: continuation}} = + run_subscription( + @query, + Schema, + variables: %{ + "clientId" => client_id } ) - assert {:more, %{ - data: %{"prime" => %{"name" => "name1"}}, - continuation: continuation}} - = Absinthe.continue(continuation) + assert {:more, + %{ + data: %{"primeOrdinal" => %{"name" => "first_user"}}, + ordinal: 1, + continuation: continuation + }} = Absinthe.continue(continuation) - assert {:ok, %{data: %{"prime" => %{"name" => "name2"}}}} - = Absinthe.continue(continuation) + assert {:ok, %{data: %{"primeOrdinal" => %{"name" => "second_user"}}, ordinal: 2}} = + Absinthe.continue(continuation) end def run_subscription(query, schema, opts \\ []) do From 98ef5a6d40f256af17e4976b0a206181a7c19e64 Mon Sep 17 00:00:00 2001 From: Bernard Duggan Date: Wed, 22 Jun 2022 17:09:55 +1000 Subject: [PATCH 3/9] Defer call to prime function until continuation This allows the caller to complete their pubsub call before the prime function, avoiding data races. --- lib/absinthe.ex | 6 ++- lib/absinthe/phase/subscription/prime.ex | 42 ++++++++++++++++++- lib/absinthe/phase/subscription/result.ex | 36 ++++++---------- lib/absinthe/pipeline.ex | 6 ++- test/absinthe/execution/subscription_test.exs | 17 ++++++++ 5 files changed, 79 insertions(+), 28 deletions(-) diff --git a/lib/absinthe.ex b/lib/absinthe.ex index ecb55a9939..d8fa9845be 100644 --- a/lib/absinthe.ex +++ b/lib/absinthe.ex @@ -105,6 +105,7 @@ defmodule Absinthe do ] @type run_result :: {:ok, result_t} | {:more, result_t} | {:error, String.t()} + @type continue_result :: run_result | :no_more_results @spec run( binary | Absinthe.Language.Source.t() | Absinthe.Language.Document.t(), @@ -124,7 +125,7 @@ defmodule Absinthe do |> build_result() end - @spec continue([Absinthe.Blueprint.Continuation.t()]) :: run_result() + @spec continue([Absinthe.Blueprint.Continuation.t()]) :: continue_result def continue(continuation) do continuation |> Absinthe.Pipeline.continue() @@ -133,6 +134,9 @@ defmodule Absinthe do defp build_result(output) do case output do + {:ok, %{result: :no_more_results}, _phases} -> + :no_more_results + {:ok, %{result: %{continuation: c} = result}, _phases} when c != [] -> {:more, result} diff --git a/lib/absinthe/phase/subscription/prime.ex b/lib/absinthe/phase/subscription/prime.ex index c3e08252d8..ad44527705 100644 --- a/lib/absinthe/phase/subscription/prime.ex +++ b/lib/absinthe/phase/subscription/prime.ex @@ -1,8 +1,46 @@ defmodule Absinthe.Phase.Subscription.Prime do @moduledoc false + alias Absinthe.Blueprint.Continuation + alias Absinthe.Phase + @spec run(any(), Keyword.t()) :: Absinthe.Phase.result_t() - def run(blueprint, prime_result: cr) do - {:ok, put_in(blueprint.execution.root_value, cr)} + def run(blueprint, prime_result: prime_result) do + {:ok, put_in(blueprint.execution.root_value, prime_result)} + end + + def run(blueprint, prime_fun: prime_fun, resolution_options: options) do + {:ok, prime_results} = prime_fun.(blueprint.execution) + + case prime_results do + [first | rest] -> + blueprint = put_in(blueprint.execution.root_value, first) + blueprint = maybe_add_continuations(blueprint, rest, options) + {:ok, blueprint} + + [] -> + blueprint = put_in(blueprint.result, :no_more_results) + {:replace, blueprint, []} + end + end + + defp maybe_add_continuations(blueprint, [], _options), do: blueprint + + defp maybe_add_continuations(blueprint, remaining_results, options) do + continuations = + Enum.map( + remaining_results, + &%Continuation{ + phase_input: blueprint, + pipeline: [ + {__MODULE__, [prime_result: &1]}, + {Phase.Document.Execution.Resolution, options}, + Phase.Subscription.GetOrdinal, + Phase.Document.Result + ] + } + ) + + put_in(blueprint.result, %{continuation: continuations}) end end diff --git a/lib/absinthe/phase/subscription/result.ex b/lib/absinthe/phase/subscription/result.ex index fca8d153c9..8022130858 100644 --- a/lib/absinthe/phase/subscription/result.ex +++ b/lib/absinthe/phase/subscription/result.ex @@ -19,7 +19,7 @@ defmodule Absinthe.Phase.Subscription.Result do {:ok, put_in(blueprint.result, result)} prime_fun when is_function(prime_fun, 1) -> - do_prime(prime_fun, result, blueprint, options) + stash_prime(prime_fun, result, blueprint, options) val -> raise """ @@ -30,28 +30,18 @@ defmodule Absinthe.Phase.Subscription.Result do end end - def do_prime(prime_fun, base_result, blueprint, options) do - {:ok, prime_results} = prime_fun.(blueprint.execution) - - result = - if prime_results != [] do - continuations = - Enum.map(prime_results, fn cr -> - %Continuation{ - phase_input: blueprint, - pipeline: [ - {Phase.Subscription.Prime, [prime_result: cr]}, - {Phase.Document.Execution.Resolution, options}, - Phase.Subscription.GetOrdinal, - Phase.Document.Result - ] - } - end) - - Map.put(base_result, :continuation, continuations) - else - base_result - end + def stash_prime(prime_fun, base_result, blueprint, options) do + continuation = %Continuation{ + phase_input: blueprint, + pipeline: [ + {Phase.Subscription.Prime, [prime_fun: prime_fun, resolution_options: options]}, + {Phase.Document.Execution.Resolution, options}, + Phase.Subscription.GetOrdinal, + Phase.Document.Result + ] + } + + result = Map.put(base_result, :continuation, [continuation]) {:ok, put_in(blueprint.result, result)} end diff --git a/lib/absinthe/pipeline.ex b/lib/absinthe/pipeline.ex index a64ab24c7b..6f3dcd2b2d 100644 --- a/lib/absinthe/pipeline.ex +++ b/lib/absinthe/pipeline.ex @@ -19,6 +19,8 @@ defmodule Absinthe.Pipeline do @type run_result_t :: {:ok, data_t, [Phase.t()]} | {:error, String.t() | {:http_method, String.t()}, [Phase.t()]} + @type continue_result_t :: run_result_t | :no_more_results + @type phase_config_t :: Phase.t() | {Phase.t(), Keyword.t()} @type t :: [phase_config_t | [phase_config_t]] @@ -30,7 +32,7 @@ defmodule Absinthe.Pipeline do |> run_phase(input) end - @spec continue([Continuation.t()]) :: run_result_t + @spec continue([Continuation.t()]) :: continue_result_t def continue([continuation | rest]) do result = run_phase(continuation.pipeline, continuation.phase_input) @@ -41,7 +43,7 @@ defmodule Absinthe.Pipeline do {:ok, blueprint, phases} -> bp_result = Map.put(blueprint.result, :continuation, rest) blueprint = Map.put(blueprint, :result, bp_result) - {:ok, blueprint, phases} + {:more, blueprint, phases} error -> error diff --git a/test/absinthe/execution/subscription_test.exs b/test/absinthe/execution/subscription_test.exs index 5991535a92..459c28b964 100644 --- a/test/absinthe/execution/subscription_test.exs +++ b/test/absinthe/execution/subscription_test.exs @@ -976,6 +976,23 @@ defmodule Absinthe.Execution.SubscriptionTest do Absinthe.continue(continuation) end + test "continuation with no extra data" do + client_id = "abc" + + assert {:more, %{"subscribed" => _topic, continuation: continuation}} = + run_subscription( + @query, + Schema, + variables: %{ + "primeData" => [], + "clientId" => client_id + }, + context: %{prime_id: "test_prime_id"} + ) + + assert :no_more_results == Absinthe.continue(continuation) + end + @query """ subscription ($clientId: ID!) { ordinal(clientId: $clientId) { From 3ea1038eb38145d8f3b98d530d604364207d1d14 Mon Sep 17 00:00:00 2001 From: Sam Bobroff Date: Thu, 28 Jul 2022 14:35:37 +1000 Subject: [PATCH 4/9] Fix crash when subscription priming 3 or more items --- lib/absinthe/pipeline.ex | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/lib/absinthe/pipeline.ex b/lib/absinthe/pipeline.ex index 6f3dcd2b2d..a64ab24c7b 100644 --- a/lib/absinthe/pipeline.ex +++ b/lib/absinthe/pipeline.ex @@ -19,8 +19,6 @@ defmodule Absinthe.Pipeline do @type run_result_t :: {:ok, data_t, [Phase.t()]} | {:error, String.t() | {:http_method, String.t()}, [Phase.t()]} - @type continue_result_t :: run_result_t | :no_more_results - @type phase_config_t :: Phase.t() | {Phase.t(), Keyword.t()} @type t :: [phase_config_t | [phase_config_t]] @@ -32,7 +30,7 @@ defmodule Absinthe.Pipeline do |> run_phase(input) end - @spec continue([Continuation.t()]) :: continue_result_t + @spec continue([Continuation.t()]) :: run_result_t def continue([continuation | rest]) do result = run_phase(continuation.pipeline, continuation.phase_input) @@ -43,7 +41,7 @@ defmodule Absinthe.Pipeline do {:ok, blueprint, phases} -> bp_result = Map.put(blueprint.result, :continuation, rest) blueprint = Map.put(blueprint, :result, bp_result) - {:more, blueprint, phases} + {:ok, blueprint, phases} error -> error From 8efcc4c4d6438b6fc1317f8e23024a2053180b3e Mon Sep 17 00:00:00 2001 From: Bernard Duggan Date: Wed, 17 Aug 2022 09:39:35 +1000 Subject: [PATCH 5/9] Consistant pluralisation of 'continuations' --- lib/absinthe.ex | 10 +++++----- lib/absinthe/phase/document/result.ex | 2 +- lib/absinthe/phase/subscription/prime.ex | 2 +- lib/absinthe/phase/subscription/result.ex | 2 +- lib/absinthe/pipeline.ex | 2 +- test/absinthe/execution/subscription_test.exs | 20 +++++++++---------- 6 files changed, 19 insertions(+), 19 deletions(-) diff --git a/lib/absinthe.ex b/lib/absinthe.ex index d8fa9845be..261d76f8a0 100644 --- a/lib/absinthe.ex +++ b/lib/absinthe.ex @@ -44,13 +44,13 @@ defmodule Absinthe do %{message: String.t()} | %{message: String.t(), locations: [%{line: pos_integer, column: integer}]} - @type continuation_t :: nil | [Absinthe.Blueprint.Continuation.t()] + @type continuations_t :: nil | [Absinthe.Blueprint.Continuation.t()] @type result_t :: %{ required(:data) => nil | result_selection_t, optional(:ordinal) => term(), - optional(:continuation) => continuation_t, + optional(:continuation) => continuations_t, optional(:errors) => [result_error_t] } | %{errors: [result_error_t]} @@ -126,8 +126,8 @@ defmodule Absinthe do end @spec continue([Absinthe.Blueprint.Continuation.t()]) :: continue_result - def continue(continuation) do - continuation + def continue(continuations) do + continuations |> Absinthe.Pipeline.continue() |> build_result() end @@ -137,7 +137,7 @@ defmodule Absinthe do {:ok, %{result: :no_more_results}, _phases} -> :no_more_results - {:ok, %{result: %{continuation: c} = result}, _phases} when c != [] -> + {:ok, %{result: %{continuations: c} = result}, _phases} when c != [] -> {:more, result} {:ok, %{result: result}, _phases} -> diff --git a/lib/absinthe/phase/document/result.ex b/lib/absinthe/phase/document/result.ex index b71cdedb86..690312090a 100644 --- a/lib/absinthe/phase/document/result.ex +++ b/lib/absinthe/phase/document/result.ex @@ -160,7 +160,7 @@ defmodule Absinthe.Phase.Document.Result do defp format_location(_), do: [] defp maybe_add_continuations(result, %{continuations: continuations}) when continuations != [], - do: Map.put(result, :continuation, continuations) + do: Map.put(result, :continuations, continuations) defp maybe_add_continuations(result, _), do: result end diff --git a/lib/absinthe/phase/subscription/prime.ex b/lib/absinthe/phase/subscription/prime.ex index ad44527705..ac979a9984 100644 --- a/lib/absinthe/phase/subscription/prime.ex +++ b/lib/absinthe/phase/subscription/prime.ex @@ -41,6 +41,6 @@ defmodule Absinthe.Phase.Subscription.Prime do } ) - put_in(blueprint.result, %{continuation: continuations}) + put_in(blueprint.result, %{continuations: continuations}) end end diff --git a/lib/absinthe/phase/subscription/result.ex b/lib/absinthe/phase/subscription/result.ex index 8022130858..0acf3575ca 100644 --- a/lib/absinthe/phase/subscription/result.ex +++ b/lib/absinthe/phase/subscription/result.ex @@ -41,7 +41,7 @@ defmodule Absinthe.Phase.Subscription.Result do ] } - result = Map.put(base_result, :continuation, [continuation]) + result = Map.put(base_result, :continuations, [continuation]) {:ok, put_in(blueprint.result, result)} end diff --git a/lib/absinthe/pipeline.ex b/lib/absinthe/pipeline.ex index a64ab24c7b..8076d62d43 100644 --- a/lib/absinthe/pipeline.ex +++ b/lib/absinthe/pipeline.ex @@ -39,7 +39,7 @@ defmodule Absinthe.Pipeline do {:ok, blueprint, phases} {:ok, blueprint, phases} -> - bp_result = Map.put(blueprint.result, :continuation, rest) + bp_result = Map.put(blueprint.result, :continuations, rest) blueprint = Map.put(blueprint, :result, bp_result) {:ok, blueprint, phases} diff --git a/test/absinthe/execution/subscription_test.exs b/test/absinthe/execution/subscription_test.exs index 459c28b964..745e9603fb 100644 --- a/test/absinthe/execution/subscription_test.exs +++ b/test/absinthe/execution/subscription_test.exs @@ -955,7 +955,7 @@ defmodule Absinthe.Execution.SubscriptionTest do client_id = "abc" prime_data = ["name1", "name2"] - assert {:more, %{"subscribed" => _topic, continuation: continuation}} = + assert {:more, %{"subscribed" => _topic, continuations: continuations}} = run_subscription( @query, Schema, @@ -969,17 +969,17 @@ defmodule Absinthe.Execution.SubscriptionTest do assert {:more, %{ data: %{"prime" => %{"id" => "test_prime_id", "name" => "name1"}}, - continuation: continuation - }} = Absinthe.continue(continuation) + continuations: continuations + }} = Absinthe.continue(continuations) assert {:ok, %{data: %{"prime" => %{"id" => "test_prime_id", "name" => "name2"}}}} = - Absinthe.continue(continuation) + Absinthe.continue(continuations) end test "continuation with no extra data" do client_id = "abc" - assert {:more, %{"subscribed" => _topic, continuation: continuation}} = + assert {:more, %{"subscribed" => _topic, continuations: continuations}} = run_subscription( @query, Schema, @@ -990,7 +990,7 @@ defmodule Absinthe.Execution.SubscriptionTest do context: %{prime_id: "test_prime_id"} ) - assert :no_more_results == Absinthe.continue(continuation) + assert :no_more_results == Absinthe.continue(continuations) end @query """ @@ -1033,7 +1033,7 @@ defmodule Absinthe.Execution.SubscriptionTest do test "subscription with both priming and ordinals" do client_id = "abc" - assert {:more, %{"subscribed" => _topic, continuation: continuation}} = + assert {:more, %{"subscribed" => _topic, continuations: continuations}} = run_subscription( @query, Schema, @@ -1046,11 +1046,11 @@ defmodule Absinthe.Execution.SubscriptionTest do %{ data: %{"primeOrdinal" => %{"name" => "first_user"}}, ordinal: 1, - continuation: continuation - }} = Absinthe.continue(continuation) + continuations: continuations + }} = Absinthe.continue(continuations) assert {:ok, %{data: %{"primeOrdinal" => %{"name" => "second_user"}}, ordinal: 2}} = - Absinthe.continue(continuation) + Absinthe.continue(continuations) end def run_subscription(query, schema, opts \\ []) do From a255cb9db91606aebf33f0aaaf4aa051ab89eba5 Mon Sep 17 00:00:00 2001 From: Bernard Duggan Date: Wed, 17 Aug 2022 10:05:30 +1000 Subject: [PATCH 6/9] Do not include ordinal in result when no function is set --- .../phase/subscription/get_ordinal.ex | 32 ++++++------------- test/absinthe/execution/subscription_test.exs | 12 +++---- 2 files changed, 16 insertions(+), 28 deletions(-) diff --git a/lib/absinthe/phase/subscription/get_ordinal.ex b/lib/absinthe/phase/subscription/get_ordinal.ex index 5dc3eb30eb..ac71a7879a 100644 --- a/lib/absinthe/phase/subscription/get_ordinal.ex +++ b/lib/absinthe/phase/subscription/get_ordinal.ex @@ -9,34 +9,22 @@ defmodule Absinthe.Phase.Subscription.GetOrdinal do @spec run(any, Keyword.t()) :: {:ok, Blueprint.t()} def run(blueprint, _options \\ []) do - op = Blueprint.current_operation(blueprint) - - if op.type == :subscription do - {:ok, - %{blueprint | result: Map.put(blueprint.result, :ordinal, get_ordinal(op, blueprint))}} + with %{type: :subscription, selections: [field]} <- Blueprint.current_operation(blueprint), + {:ok, config} = SubscribeSelf.get_config(field, blueprint.execution.context, blueprint), + ordinal_fun when is_function(ordinal_fun, 1) <- config[:ordinal] do + result = ordinal_fun.(blueprint.execution.root_value) + {:ok, %{blueprint | result: Map.put(blueprint.result, :ordinal, result)}} else - {:ok, blueprint} - end - end - - defp get_ordinal(op, blueprint) do - %{selections: [field]} = op - {:ok, config} = SubscribeSelf.get_config(field, blueprint.execution.context, blueprint) - - case config[:ordinal] do - nil -> - nil - - fun when is_function(fun, 1) -> - fun.(blueprint.execution.root_value) - - _fun -> + f when is_function(f) -> IO.write( :stderr, "Ordinal function must be 1-arity" ) - nil + {:ok, blueprint} + + _ -> + {:ok, blueprint} end end end diff --git a/test/absinthe/execution/subscription_test.exs b/test/absinthe/execution/subscription_test.exs index 745e9603fb..802150a012 100644 --- a/test/absinthe/execution/subscription_test.exs +++ b/test/absinthe/execution/subscription_test.exs @@ -341,7 +341,7 @@ defmodule Absinthe.Execution.SubscriptionTest do assert %{ event: "subscription:data", - result: %{data: %{"thing" => "foo"}, ordinal: nil}, + result: %{data: %{"thing" => "foo"}}, topic: topic } == msg end @@ -418,7 +418,7 @@ defmodule Absinthe.Execution.SubscriptionTest do msg = %{ event: "subscription:data", - result: %{data: %{"multipleTopics" => "foo"}, ordinal: nil}, + result: %{data: %{"multipleTopics" => "foo"}}, topic: topic } @@ -520,7 +520,7 @@ defmodule Absinthe.Execution.SubscriptionTest do assert %{ event: "subscription:data", - result: %{data: %{"user" => %{"id" => "1", "name" => "foo"}}, ordinal: nil}, + result: %{data: %{"user" => %{"id" => "1", "name" => "foo"}}}, topic: topic } == msg end @@ -593,7 +593,7 @@ defmodule Absinthe.Execution.SubscriptionTest do assert %{ event: "subscription:data", - result: %{data: %{"thing" => "foo"}, ordinal: nil}, + result: %{data: %{"thing" => "foo"}}, topic: topic } == msg end @@ -612,7 +612,7 @@ defmodule Absinthe.Execution.SubscriptionTest do assert %{ event: "subscription:data", - result: %{data: %{"thing" => "foo"}, ordinal: nil}, + result: %{data: %{"thing" => "foo"}}, topic: topic } == msg end) @@ -855,7 +855,7 @@ defmodule Absinthe.Execution.SubscriptionTest do assert %{ event: "subscription:data", - result: %{data: %{"thing" => "foo"}, ordinal: nil}, + result: %{data: %{"thing" => "foo"}}, topic: topic } == msg From cc7e48ea14035e48bec7ecb259d3674c995eccdd Mon Sep 17 00:00:00 2001 From: Bernard Duggan Date: Tue, 9 Jul 2024 17:47:24 +1000 Subject: [PATCH 7/9] Add optional ordinal comparison fun --- lib/absinthe.ex | 7 ++- .../phase/subscription/get_ordinal.ex | 32 +++++++++-- lib/absinthe/phase/subscription/result.ex | 30 +++++------ test/absinthe/execution/subscription_test.exs | 54 +++++++++++++++++++ 4 files changed, 101 insertions(+), 22 deletions(-) diff --git a/lib/absinthe.ex b/lib/absinthe.ex index 261d76f8a0..88b7d5833b 100644 --- a/lib/absinthe.ex +++ b/lib/absinthe.ex @@ -46,10 +46,15 @@ defmodule Absinthe do @type continuations_t :: nil | [Absinthe.Blueprint.Continuation.t()] + @type ordinal_fun :: (term() -> term()) + + @type ordinal_compare_fun :: (term(), term() -> {boolean(), term()}) + @type result_t :: %{ required(:data) => nil | result_selection_t, - optional(:ordinal) => term(), + optional(:ordinal_fun) => ordinal_fun(), + optional(:ordinal_compare_fun) => ordinal_compare_fun(), optional(:continuation) => continuations_t, optional(:errors) => [result_error_t] } diff --git a/lib/absinthe/phase/subscription/get_ordinal.ex b/lib/absinthe/phase/subscription/get_ordinal.ex index ac71a7879a..885e5c8de6 100644 --- a/lib/absinthe/phase/subscription/get_ordinal.ex +++ b/lib/absinthe/phase/subscription/get_ordinal.ex @@ -11,11 +11,20 @@ defmodule Absinthe.Phase.Subscription.GetOrdinal do def run(blueprint, _options \\ []) do with %{type: :subscription, selections: [field]} <- Blueprint.current_operation(blueprint), {:ok, config} = SubscribeSelf.get_config(field, blueprint.execution.context, blueprint), - ordinal_fun when is_function(ordinal_fun, 1) <- config[:ordinal] do - result = ordinal_fun.(blueprint.execution.root_value) - {:ok, %{blueprint | result: Map.put(blueprint.result, :ordinal, result)}} + {_, ordinal_fun} when is_function(ordinal_fun, 1) <- {:ordinal_fun, config[:ordinal]}, + {_, ordinal_compare_fun} when is_function(ordinal_compare_fun, 2) <- + {:ordinal_compare_fun, + Keyword.get(config, :ordinal_compare, &default_ordinal_compare/2)} do + ordinal = ordinal_fun.(blueprint.execution.root_value) + + result = + blueprint.result + |> Map.put(:ordinal, ordinal) + |> Map.put(:ordinal_compare_fun, ordinal_compare_fun) + + {:ok, %{blueprint | result: result}} else - f when is_function(f) -> + {:ordinal_fun, f} when is_function(f) -> IO.write( :stderr, "Ordinal function must be 1-arity" @@ -23,8 +32,23 @@ defmodule Absinthe.Phase.Subscription.GetOrdinal do {:ok, blueprint} + {:ordinal_compare_fun, f} when is_function(f) -> + IO.write( + :stderr, + "Ordinal compare function must be 2-arity" + ) + + {:ok, blueprint} + _ -> {:ok, blueprint} end end + + defp default_ordinal_compare(nil, new_ordinal), do: {true, new_ordinal} + + defp default_ordinal_compare(old_ordinal, new_ordinal) when old_ordinal < new_ordinal, + do: {true, new_ordinal} + + defp default_ordinal_compare(old_ordinal, _new_ordinal), do: {false, old_ordinal} end diff --git a/lib/absinthe/phase/subscription/result.ex b/lib/absinthe/phase/subscription/result.ex index 0acf3575ca..3cd4403a97 100644 --- a/lib/absinthe/phase/subscription/result.ex +++ b/lib/absinthe/phase/subscription/result.ex @@ -12,25 +12,15 @@ defmodule Absinthe.Phase.Subscription.Result do def run(blueprint, options) do topic = Keyword.fetch!(options, :topic) prime = Keyword.get(options, :prime) - result = %{"subscribed" => topic} - case prime do - nil -> - {:ok, put_in(blueprint.result, result)} + result = maybe_add_prime(%{"subscribed" => topic}, prime, blueprint, options) - prime_fun when is_function(prime_fun, 1) -> - stash_prime(prime_fun, result, blueprint, options) - - val -> - raise """ - Invalid prime function. Must be a function of arity 1. - - #{inspect(val)} - """ - end + {:ok, put_in(blueprint.result, result)} end - def stash_prime(prime_fun, base_result, blueprint, options) do + def maybe_add_prime(result, nil, _blueprint, _options), do: result + + def maybe_add_prime(result, prime_fun, blueprint, options) when is_function(prime_fun, 1) do continuation = %Continuation{ phase_input: blueprint, pipeline: [ @@ -41,8 +31,14 @@ defmodule Absinthe.Phase.Subscription.Result do ] } - result = Map.put(base_result, :continuations, [continuation]) + Map.put(result, :continuations, [continuation]) + end - {:ok, put_in(blueprint.result, result)} + def maybe_add_prime(_result, prime_fun, _blueprint, _options) do + raise """ + Invalid prime function. Must be a function of arity 1. + + #{inspect(prime_fun)} + """ end end diff --git a/test/absinthe/execution/subscription_test.exs b/test/absinthe/execution/subscription_test.exs index 802150a012..fe32c30e17 100644 --- a/test/absinthe/execution/subscription_test.exs +++ b/test/absinthe/execution/subscription_test.exs @@ -269,6 +269,23 @@ defmodule Absinthe.Execution.SubscriptionTest do } end end + + field :prime_ordinal_with_compare, :user do + arg :client_id, non_null(:id) + arg :prime_data, list_of(:string) + + config fn args, _ -> + { + :ok, + topic: args.client_id, + prime: fn _ -> + {:ok, [%{name: "first_user", version: 1}, %{name: "second_user", version: 2}]} + end, + ordinal: fn %{version: version} -> version end, + ordinal_compare: &custom_ordinal_compare/2 + } + end + end end mutation do @@ -280,6 +297,10 @@ defmodule Absinthe.Execution.SubscriptionTest do end end end + + def custom_ordinal_compare(a, b) do + {a > b, b + 0.5} + end end setup_all do @@ -1053,6 +1074,39 @@ defmodule Absinthe.Execution.SubscriptionTest do Absinthe.continue(continuations) end + @query """ + subscription ($clientId: ID!) { + primeOrdinalWithCompare(clientId: $clientId) { + name + } + } + """ + test "subscription with priming, ordinals, and custom ordinal compare function" do + client_id = "abc" + + assert {:more, %{"subscribed" => _topic, continuations: continuations}} = + run_subscription( + @query, + Schema, + variables: %{ + "clientId" => client_id + } + ) + + assert {:more, + %{ + data: %{"primeOrdinalWithCompare" => %{"name" => "first_user"}}, + ordinal: 1, + ordinal_compare_fun: custom_ordinal_compare, + continuations: continuations + }} = Absinthe.continue(continuations) + + assert custom_ordinal_compare.(1, 2) == {false, 2.5} + + assert {:ok, %{data: %{"primeOrdinalWithCompare" => %{"name" => "second_user"}}, ordinal: 2}} = + Absinthe.continue(continuations) + end + def run_subscription(query, schema, opts \\ []) do opts = Keyword.update( From dc0bf1c3aad6ddfba1d264cbbe74cbe84775d1f0 Mon Sep 17 00:00:00 2001 From: Bernard Duggan Date: Wed, 23 Oct 2024 15:39:16 +1100 Subject: [PATCH 8/9] Fix formatting --- lib/absinthe/pipeline.ex | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/absinthe/pipeline.ex b/lib/absinthe/pipeline.ex index 8076d62d43..a5dab4b7af 100644 --- a/lib/absinthe/pipeline.ex +++ b/lib/absinthe/pipeline.ex @@ -17,7 +17,9 @@ defmodule Absinthe.Pipeline do @type data_t :: any - @type run_result_t :: {:ok, data_t, [Phase.t()]} | {:error, String.t() | {:http_method, String.t()}, [Phase.t()]} + @type run_result_t :: + {:ok, data_t, [Phase.t()]} + | {:error, String.t() | {:http_method, String.t()}, [Phase.t()]} @type phase_config_t :: Phase.t() | {Phase.t(), Keyword.t()} From f7a0f0976696de8d5fa74a5efca4e1c453bcf480 Mon Sep 17 00:00:00 2001 From: Bernard Duggan Date: Wed, 4 Dec 2024 15:43:25 +1100 Subject: [PATCH 9/9] Fix type --- lib/absinthe.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/absinthe.ex b/lib/absinthe.ex index 88b7d5833b..228f929acd 100644 --- a/lib/absinthe.ex +++ b/lib/absinthe.ex @@ -55,7 +55,7 @@ defmodule Absinthe do required(:data) => nil | result_selection_t, optional(:ordinal_fun) => ordinal_fun(), optional(:ordinal_compare_fun) => ordinal_compare_fun(), - optional(:continuation) => continuations_t, + optional(:continuations) => continuations_t, optional(:errors) => [result_error_t] } | %{errors: [result_error_t]}