diff --git a/lib/absinthe.ex b/lib/absinthe.ex index 5ed484cdc2..228f929acd 100644 --- a/lib/absinthe.ex +++ b/lib/absinthe.ex @@ -44,9 +44,20 @@ defmodule Absinthe do %{message: String.t()} | %{message: String.t(), locations: [%{line: pos_integer, column: integer}]} + @type continuations_t :: nil | [Absinthe.Blueprint.Continuation.t()] + + @type ordinal_fun :: (term() -> term()) + + @type ordinal_compare_fun :: (term(), term() -> {boolean(), term()}) + @type result_t :: - %{data: nil | result_selection_t} - | %{data: nil | result_selection_t, errors: [result_error_t]} + %{ + required(:data) => nil | result_selection_t, + optional(:ordinal_fun) => ordinal_fun(), + optional(:ordinal_compare_fun) => ordinal_compare_fun(), + optional(:continuations) => continuations_t, + optional(:errors) => [result_error_t] + } | %{errors: [result_error_t]} @type pipeline_modifier_fun :: (Absinthe.Pipeline.t(), Keyword.t() -> Absinthe.Pipeline.t()) @@ -98,7 +109,8 @@ defmodule Absinthe do pipeline_modifier: pipeline_modifier_fun() ] - @type run_result :: {:ok, result_t} | {:error, String.t()} + @type run_result :: {:ok, result_t} | {:more, result_t} | {:error, String.t()} + @type continue_result :: run_result | :no_more_results @spec run( binary | Absinthe.Language.Source.t() | Absinthe.Language.Document.t(), @@ -113,7 +125,26 @@ defmodule Absinthe do |> Absinthe.Pipeline.for_document(options) |> pipeline_modifier.(options) - case Absinthe.Pipeline.run(document, pipeline) do + document + |> Absinthe.Pipeline.run(pipeline) + |> build_result() + end + + @spec continue([Absinthe.Blueprint.Continuation.t()]) :: continue_result + def continue(continuations) do + continuations + |> Absinthe.Pipeline.continue() + |> build_result() + end + + defp build_result(output) do + case output do + {:ok, %{result: :no_more_results}, _phases} -> + :no_more_results + + {:ok, %{result: %{continuations: c} = result}, _phases} when c != [] -> + {:more, result} + {:ok, %{result: result}, _phases} -> {:ok, result} @@ -137,6 +168,7 @@ defmodule Absinthe do def run!(input, schema, options \\ []) do case run(input, schema, options) do {:ok, result} -> result + {:more, result} -> result {:error, err} -> raise ExecutionError, message: err end end diff --git a/lib/absinthe/blueprint/continuation.ex b/lib/absinthe/blueprint/continuation.ex new file mode 100644 index 0000000000..9c5e1198c9 --- /dev/null +++ b/lib/absinthe/blueprint/continuation.ex @@ -0,0 +1,18 @@ +defmodule Absinthe.Blueprint.Continuation do + @moduledoc false + + # Continuations allow further resolutions after the initial result is + # returned + + alias Absinthe.Pipeline + + defstruct [ + :phase_input, + :pipeline + ] + + @type t :: %__MODULE__{ + phase_input: Pipeline.data_t(), + pipeline: Pipeline.t() + } +end diff --git a/lib/absinthe/blueprint/result/list.ex b/lib/absinthe/blueprint/result/list.ex index 40d41896e9..62df401a5c 100644 --- a/lib/absinthe/blueprint/result/list.ex +++ b/lib/absinthe/blueprint/result/list.ex @@ -9,7 +9,8 @@ defmodule Absinthe.Blueprint.Result.List do :values, errors: [], flags: %{}, - extensions: %{} + extensions: %{}, + continuations: [] ] @type t :: %__MODULE__{ @@ -17,6 +18,7 @@ defmodule Absinthe.Blueprint.Result.List do values: [Blueprint.Execution.node_t()], errors: [Phase.Error.t()], flags: Blueprint.flags_t(), - extensions: %{any => any} + extensions: %{any => any}, + continuations: [Blueprint.Continuation.t()] } end diff --git a/lib/absinthe/blueprint/result/object.ex b/lib/absinthe/blueprint/result/object.ex index 745dc16062..3e1ab3f506 100644 --- a/lib/absinthe/blueprint/result/object.ex +++ b/lib/absinthe/blueprint/result/object.ex @@ -10,7 +10,8 @@ defmodule Absinthe.Blueprint.Result.Object do :fields, errors: [], flags: %{}, - extensions: %{} + extensions: %{}, + continuations: [] ] @type t :: %__MODULE__{ @@ -18,6 +19,7 @@ defmodule Absinthe.Blueprint.Result.Object do fields: [Blueprint.Execution.node_t()], errors: [Phase.Error.t()], flags: Blueprint.flags_t(), - extensions: %{any => any} + extensions: %{any => any}, + continuations: [Blueprint.Continuation.t()] } end diff --git a/lib/absinthe/phase/document/result.ex b/lib/absinthe/phase/document/result.ex index 651555b78d..690312090a 100644 --- a/lib/absinthe/phase/document/result.ex +++ b/lib/absinthe/phase/document/result.ex @@ -25,7 +25,9 @@ defmodule Absinthe.Phase.Document.Result do {:validation_failed, errors} end - format_result(result, opts) + result + |> format_result(opts) + |> maybe_add_continuations(blueprint.execution.result) end defp format_result({:ok, {data, []}}, _) do @@ -156,4 +158,9 @@ defmodule Absinthe.Phase.Document.Result do end defp format_location(_), do: [] + + defp maybe_add_continuations(result, %{continuations: continuations}) when continuations != [], + do: Map.put(result, :continuations, continuations) + + defp maybe_add_continuations(result, _), do: result end diff --git a/lib/absinthe/phase/subscription/get_ordinal.ex b/lib/absinthe/phase/subscription/get_ordinal.ex new file mode 100644 index 0000000000..885e5c8de6 --- /dev/null +++ b/lib/absinthe/phase/subscription/get_ordinal.ex @@ -0,0 +1,54 @@ +defmodule Absinthe.Phase.Subscription.GetOrdinal do + use Absinthe.Phase + + alias Absinthe.Phase.Subscription.SubscribeSelf + + @moduledoc false + + alias Absinthe.Blueprint + + @spec run(any, Keyword.t()) :: {:ok, Blueprint.t()} + def run(blueprint, _options \\ []) do + with %{type: :subscription, selections: [field]} <- Blueprint.current_operation(blueprint), + {:ok, config} = SubscribeSelf.get_config(field, blueprint.execution.context, blueprint), + {_, ordinal_fun} when is_function(ordinal_fun, 1) <- {:ordinal_fun, config[:ordinal]}, + {_, ordinal_compare_fun} when is_function(ordinal_compare_fun, 2) <- + {:ordinal_compare_fun, + Keyword.get(config, :ordinal_compare, &default_ordinal_compare/2)} do + ordinal = ordinal_fun.(blueprint.execution.root_value) + + result = + blueprint.result + |> Map.put(:ordinal, ordinal) + |> Map.put(:ordinal_compare_fun, ordinal_compare_fun) + + {:ok, %{blueprint | result: result}} + else + {:ordinal_fun, f} when is_function(f) -> + IO.write( + :stderr, + "Ordinal function must be 1-arity" + ) + + {:ok, blueprint} + + {:ordinal_compare_fun, f} when is_function(f) -> + IO.write( + :stderr, + "Ordinal compare function must be 2-arity" + ) + + {:ok, blueprint} + + _ -> + {:ok, blueprint} + end + end + + defp default_ordinal_compare(nil, new_ordinal), do: {true, new_ordinal} + + defp default_ordinal_compare(old_ordinal, new_ordinal) when old_ordinal < new_ordinal, + do: {true, new_ordinal} + + defp default_ordinal_compare(old_ordinal, _new_ordinal), do: {false, old_ordinal} +end diff --git a/lib/absinthe/phase/subscription/prime.ex b/lib/absinthe/phase/subscription/prime.ex new file mode 100644 index 0000000000..ac979a9984 --- /dev/null +++ b/lib/absinthe/phase/subscription/prime.ex @@ -0,0 +1,46 @@ +defmodule Absinthe.Phase.Subscription.Prime do + @moduledoc false + + alias Absinthe.Blueprint.Continuation + alias Absinthe.Phase + + @spec run(any(), Keyword.t()) :: Absinthe.Phase.result_t() + def run(blueprint, prime_result: prime_result) do + {:ok, put_in(blueprint.execution.root_value, prime_result)} + end + + def run(blueprint, prime_fun: prime_fun, resolution_options: options) do + {:ok, prime_results} = prime_fun.(blueprint.execution) + + case prime_results do + [first | rest] -> + blueprint = put_in(blueprint.execution.root_value, first) + blueprint = maybe_add_continuations(blueprint, rest, options) + {:ok, blueprint} + + [] -> + blueprint = put_in(blueprint.result, :no_more_results) + {:replace, blueprint, []} + end + end + + defp maybe_add_continuations(blueprint, [], _options), do: blueprint + + defp maybe_add_continuations(blueprint, remaining_results, options) do + continuations = + Enum.map( + remaining_results, + &%Continuation{ + phase_input: blueprint, + pipeline: [ + {__MODULE__, [prime_result: &1]}, + {Phase.Document.Execution.Resolution, options}, + Phase.Subscription.GetOrdinal, + Phase.Document.Result + ] + } + ) + + put_in(blueprint.result, %{continuations: continuations}) + end +end diff --git a/lib/absinthe/phase/subscription/result.ex b/lib/absinthe/phase/subscription/result.ex index 7dbe61eb8c..3cd4403a97 100644 --- a/lib/absinthe/phase/subscription/result.ex +++ b/lib/absinthe/phase/subscription/result.ex @@ -5,10 +5,40 @@ defmodule Absinthe.Phase.Subscription.Result do # subscription alias Absinthe.Blueprint + alias Absinthe.Blueprint.Continuation + alias Absinthe.Phase @spec run(any, Keyword.t()) :: {:ok, Blueprint.t()} - def run(blueprint, topic: topic) do - result = %{"subscribed" => topic} + def run(blueprint, options) do + topic = Keyword.fetch!(options, :topic) + prime = Keyword.get(options, :prime) + + result = maybe_add_prime(%{"subscribed" => topic}, prime, blueprint, options) + {:ok, put_in(blueprint.result, result)} end + + def maybe_add_prime(result, nil, _blueprint, _options), do: result + + def maybe_add_prime(result, prime_fun, blueprint, options) when is_function(prime_fun, 1) do + continuation = %Continuation{ + phase_input: blueprint, + pipeline: [ + {Phase.Subscription.Prime, [prime_fun: prime_fun, resolution_options: options]}, + {Phase.Document.Execution.Resolution, options}, + Phase.Subscription.GetOrdinal, + Phase.Document.Result + ] + } + + Map.put(result, :continuations, [continuation]) + end + + def maybe_add_prime(_result, prime_fun, _blueprint, _options) do + raise """ + Invalid prime function. Must be a function of arity 1. + + #{inspect(prime_fun)} + """ + end end diff --git a/lib/absinthe/phase/subscription/subscribe_self.ex b/lib/absinthe/phase/subscription/subscribe_self.ex index 662ee69365..d2124578ea 100644 --- a/lib/absinthe/phase/subscription/subscribe_self.ex +++ b/lib/absinthe/phase/subscription/subscribe_self.ex @@ -28,7 +28,7 @@ defmodule Absinthe.Phase.Subscription.SubscribeSelf do Absinthe.Subscription.subscribe(pubsub, field_keys, subscription_id, blueprint) pipeline = [ - {Phase.Subscription.Result, topic: subscription_id}, + {Phase.Subscription.Result, topic: subscription_id, prime: config[:prime]}, {Phase.Telemetry, Keyword.put(options, :event, [:execute, :operation, :stop])} ] @@ -46,11 +46,11 @@ defmodule Absinthe.Phase.Subscription.SubscribeSelf do end end - defp get_config( - %{schema_node: schema_node, argument_data: argument_data} = field, - context, - blueprint - ) do + def get_config( + %{schema_node: schema_node, argument_data: argument_data} = field, + context, + blueprint + ) do name = schema_node.identifier config = diff --git a/lib/absinthe/pipeline.ex b/lib/absinthe/pipeline.ex index 16c7d1c2ed..a5dab4b7af 100644 --- a/lib/absinthe/pipeline.ex +++ b/lib/absinthe/pipeline.ex @@ -12,23 +12,44 @@ defmodule Absinthe.Pipeline do * See `Absinthe.Schema` on adjusting the schema pipeline for schema manipulation. """ + alias Absinthe.Blueprint.Continuation alias Absinthe.Phase @type data_t :: any + @type run_result_t :: + {:ok, data_t, [Phase.t()]} + | {:error, String.t() | {:http_method, String.t()}, [Phase.t()]} + @type phase_config_t :: Phase.t() | {Phase.t(), Keyword.t()} @type t :: [phase_config_t | [phase_config_t]] - @spec run(data_t, t) :: - {:ok, data_t, [Phase.t()]} - | {:error, String.t() | {:http_method, String.t()}, [Phase.t()]} + @spec run(data_t, t) :: run_result_t def run(input, pipeline) do pipeline |> List.flatten() |> run_phase(input) end + @spec continue([Continuation.t()]) :: run_result_t + def continue([continuation | rest]) do + result = run_phase(continuation.pipeline, continuation.phase_input) + + case result do + {:ok, blueprint, phases} when rest == [] -> + {:ok, blueprint, phases} + + {:ok, blueprint, phases} -> + bp_result = Map.put(blueprint.result, :continuations, rest) + blueprint = Map.put(blueprint, :result, bp_result) + {:ok, blueprint, phases} + + error -> + error + end + end + @defaults [ adapter: Absinthe.Adapter.LanguageConventions, operation_name: nil, @@ -116,6 +137,7 @@ defmodule Absinthe.Pipeline do # Execution {Phase.Subscription.SubscribeSelf, options}, {Phase.Document.Execution.Resolution, options}, + Phase.Subscription.GetOrdinal, # Format Result Phase.Document.Result, {Phase.Telemetry, Keyword.put(options, :event, [:execute, :operation, :stop])} @@ -393,9 +415,7 @@ defmodule Absinthe.Pipeline do end) end - @spec run_phase(t, data_t, [Phase.t()]) :: - {:ok, data_t, [Phase.t()]} - | {:error, String.t() | {:http_method, String.t()}, [Phase.t()]} + @spec run_phase(t, data_t, [Phase.t()]) :: run_result_t def run_phase(pipeline, input, done \\ []) def run_phase([], input, done) do diff --git a/lib/absinthe/subscription.ex b/lib/absinthe/subscription.ex index 694ba75bfe..05eab77f82 100644 --- a/lib/absinthe/subscription.ex +++ b/lib/absinthe/subscription.ex @@ -82,6 +82,8 @@ defmodule Absinthe.Subscription do @type subscription_field_spec :: {atom, term | (term -> term)} + @type prime_fun :: (Absinthe.Resolution.t() -> {:ok, [map()]}) + @doc """ Publish a mutation diff --git a/lib/absinthe/subscription/local.ex b/lib/absinthe/subscription/local.ex index 31b9b456f7..1881165871 100644 --- a/lib/absinthe/subscription/local.ex +++ b/lib/absinthe/subscription/local.ex @@ -69,7 +69,10 @@ defmodule Absinthe.Subscription.Local do |> Pipeline.without(Phase.Subscription.SubscribeSelf) |> Pipeline.insert_before( Phase.Document.Execution.Resolution, - {Phase.Document.OverrideRoot, root_value: mutation_result} + [ + {Phase.Document.OverrideRoot, root_value: mutation_result}, + Phase.Subscription.GetOrdinal + ] ) |> Pipeline.upto(Phase.Document.Execution.Resolution) diff --git a/test/absinthe/execution/subscription_test.exs b/test/absinthe/execution/subscription_test.exs index ddf1e5c002..fe32c30e17 100644 --- a/test/absinthe/execution/subscription_test.exs +++ b/test/absinthe/execution/subscription_test.exs @@ -130,6 +130,7 @@ defmodule Absinthe.Execution.SubscriptionTest do object :user do field :id, :id field :name, :string + field :version, :integer field :group, :group do resolve fn user, _, %{context: %{test_pid: pid}} -> @@ -226,6 +227,65 @@ defmodule Absinthe.Execution.SubscriptionTest do {:error, %{message: "failed", extensions: %{code: "FAILED"}}} end end + + field :prime, :user do + arg :client_id, non_null(:id) + arg :prime_data, list_of(:string) + + config fn args, _ -> + { + :ok, + topic: args.client_id, + prime: fn %{context: %{prime_id: prime_id}} -> + {:ok, Enum.map(args.prime_data, &%{id: prime_id, name: &1})} + end + } + end + end + + field :ordinal, :user do + arg :client_id, non_null(:id) + + config fn args, _ -> + { + :ok, + topic: args.client_id, ordinal: fn %{version: version} -> version end + } + end + end + + field :prime_ordinal, :user do + arg :client_id, non_null(:id) + arg :prime_data, list_of(:string) + + config fn args, _ -> + { + :ok, + topic: args.client_id, + prime: fn _ -> + {:ok, [%{name: "first_user", version: 1}, %{name: "second_user", version: 2}]} + end, + ordinal: fn %{version: version} -> version end + } + end + end + + field :prime_ordinal_with_compare, :user do + arg :client_id, non_null(:id) + arg :prime_data, list_of(:string) + + config fn args, _ -> + { + :ok, + topic: args.client_id, + prime: fn _ -> + {:ok, [%{name: "first_user", version: 1}, %{name: "second_user", version: 2}]} + end, + ordinal: fn %{version: version} -> version end, + ordinal_compare: &custom_ordinal_compare/2 + } + end + end end mutation do @@ -237,6 +297,10 @@ defmodule Absinthe.Execution.SubscriptionTest do end end end + + def custom_ordinal_compare(a, b) do + {a > b, b + 0.5} + end end setup_all do @@ -900,6 +964,149 @@ defmodule Absinthe.Execution.SubscriptionTest do } == msg end + @query """ + subscription ($clientId: ID!, $primeData: [String]) { + prime(clientId: $clientId, primeData: $primeData) { + id + name + } + } + """ + test "subscription with priming" do + client_id = "abc" + prime_data = ["name1", "name2"] + + assert {:more, %{"subscribed" => _topic, continuations: continuations}} = + run_subscription( + @query, + Schema, + variables: %{ + "primeData" => prime_data, + "clientId" => client_id + }, + context: %{prime_id: "test_prime_id"} + ) + + assert {:more, + %{ + data: %{"prime" => %{"id" => "test_prime_id", "name" => "name1"}}, + continuations: continuations + }} = Absinthe.continue(continuations) + + assert {:ok, %{data: %{"prime" => %{"id" => "test_prime_id", "name" => "name2"}}}} = + Absinthe.continue(continuations) + end + + test "continuation with no extra data" do + client_id = "abc" + + assert {:more, %{"subscribed" => _topic, continuations: continuations}} = + run_subscription( + @query, + Schema, + variables: %{ + "primeData" => [], + "clientId" => client_id + }, + context: %{prime_id: "test_prime_id"} + ) + + assert :no_more_results == Absinthe.continue(continuations) + end + + @query """ + subscription ($clientId: ID!) { + ordinal(clientId: $clientId) { + name + } + } + """ + test "subscription with ordinals" do + client_id = "abc" + + assert {:ok, %{"subscribed" => _topic}} = + run_subscription( + @query, + Schema, + variables: %{"clientId" => client_id}, + context: %{pubsub: PubSub} + ) + + userv1 = %{id: "1", name: "Alicia", group: %{name: "Elixir Users"}, version: 1} + userv2 = %{id: "1", name: "Alicia", group: %{name: "Elixir Users"}, version: 2} + + Absinthe.Subscription.publish(PubSub, userv1, ordinal: client_id) + Absinthe.Subscription.publish(PubSub, userv2, ordinal: client_id) + + assert_receive({:broadcast, msg}) + assert msg.result.ordinal == 1 + assert_receive({:broadcast, msg}) + assert msg.result.ordinal == 2 + end + + @query """ + subscription ($clientId: ID!) { + primeOrdinal(clientId: $clientId) { + name + } + } + """ + test "subscription with both priming and ordinals" do + client_id = "abc" + + assert {:more, %{"subscribed" => _topic, continuations: continuations}} = + run_subscription( + @query, + Schema, + variables: %{ + "clientId" => client_id + } + ) + + assert {:more, + %{ + data: %{"primeOrdinal" => %{"name" => "first_user"}}, + ordinal: 1, + continuations: continuations + }} = Absinthe.continue(continuations) + + assert {:ok, %{data: %{"primeOrdinal" => %{"name" => "second_user"}}, ordinal: 2}} = + Absinthe.continue(continuations) + end + + @query """ + subscription ($clientId: ID!) { + primeOrdinalWithCompare(clientId: $clientId) { + name + } + } + """ + test "subscription with priming, ordinals, and custom ordinal compare function" do + client_id = "abc" + + assert {:more, %{"subscribed" => _topic, continuations: continuations}} = + run_subscription( + @query, + Schema, + variables: %{ + "clientId" => client_id + } + ) + + assert {:more, + %{ + data: %{"primeOrdinalWithCompare" => %{"name" => "first_user"}}, + ordinal: 1, + ordinal_compare_fun: custom_ordinal_compare, + continuations: continuations + }} = Absinthe.continue(continuations) + + assert custom_ordinal_compare.(1, 2) == {false, 2.5} + + assert {:ok, %{data: %{"primeOrdinalWithCompare" => %{"name" => "second_user"}}, ordinal: 2}} = + Absinthe.continue(continuations) + end + def run_subscription(query, schema, opts \\ []) do opts = Keyword.update( @@ -910,7 +1117,7 @@ defmodule Absinthe.Execution.SubscriptionTest do ) case run(query, schema, opts) do - {:ok, %{"subscribed" => topic}} = val -> + {response, %{"subscribed" => topic}} = val when response == :ok or response == :more -> opts[:context][:pubsub].subscribe(topic) val