diff --git a/lib/absinthe/subscription/local.ex b/lib/absinthe/subscription/local.ex index b128bc4cf1..31b9b456f7 100644 --- a/lib/absinthe/subscription/local.ex +++ b/lib/absinthe/subscription/local.ex @@ -27,7 +27,10 @@ defmodule Absinthe.Subscription.Local do {topic, key_strategy, doc} end - run_docset(pubsub, docs_and_topics, mutation_result) + run_docset_fn = + if function_exported?(pubsub, :run_docset, 3), do: &pubsub.run_docset/3, else: &run_docset/3 + + run_docset_fn.(pubsub, docs_and_topics, mutation_result) :ok end @@ -37,26 +40,7 @@ defmodule Absinthe.Subscription.Local do defp run_docset(pubsub, docs_and_topics, mutation_result) do for {topic, key_strategy, doc} <- docs_and_topics do try do - pipeline = - doc.initial_phases - |> Pipeline.replace( - Phase.Telemetry, - {Phase.Telemetry, event: [:subscription, :publish, :start]} - ) - |> Pipeline.without(Phase.Subscription.SubscribeSelf) - |> Pipeline.insert_before( - Phase.Document.Execution.Resolution, - {Phase.Document.OverrideRoot, root_value: mutation_result} - ) - |> Pipeline.upto(Phase.Document.Execution.Resolution) - - pipeline = [ - pipeline, - [ - result_phase(doc), - {Absinthe.Phase.Telemetry, event: [:subscription, :publish, :stop]} - ] - ] + pipeline = pipeline(doc, mutation_result) {:ok, %{result: data}, _} = Absinthe.Pipeline.run(doc.source, pipeline) @@ -75,6 +59,31 @@ defmodule Absinthe.Subscription.Local do end end + def pipeline(doc, mutation_result) do + pipeline = + doc.initial_phases + |> Pipeline.replace( + Phase.Telemetry, + {Phase.Telemetry, event: [:subscription, :publish, :start]} + ) + |> Pipeline.without(Phase.Subscription.SubscribeSelf) + |> Pipeline.insert_before( + Phase.Document.Execution.Resolution, + {Phase.Document.OverrideRoot, root_value: mutation_result} + ) + |> Pipeline.upto(Phase.Document.Execution.Resolution) + + pipeline = [ + pipeline, + [ + result_phase(doc), + {Absinthe.Phase.Telemetry, event: [:subscription, :publish, :stop]} + ] + ] + + pipeline + end + defp get_docs(pubsub, field, mutation_result, topic: topic_fun) when is_function(topic_fun, 1) do do_get_docs(pubsub, field, topic_fun.(mutation_result)) diff --git a/lib/absinthe/subscription/pubsub.ex b/lib/absinthe/subscription/pubsub.ex index a701c68a3d..db8cfa1566 100644 --- a/lib/absinthe/subscription/pubsub.ex +++ b/lib/absinthe/subscription/pubsub.ex @@ -73,4 +73,12 @@ defmodule Absinthe.Subscription.Pubsub do only. """ @callback publish_subscription(topic :: binary, data :: map) :: term + + # @doc """ + # This function is called by publish_mutation and is responsible for resolving the documents + # and publishing the results to the appropriate topics. + # """ + @callback run_docset(pubsub :: t, docs_and_topics :: list, mutation_result :: term) :: term + + @optional_callbacks run_docset: 3 end diff --git a/test/absinthe/execution/subscription_test.exs b/test/absinthe/execution/subscription_test.exs index 40e9479b6a..875d03cb1c 100644 --- a/test/absinthe/execution/subscription_test.exs +++ b/test/absinthe/execution/subscription_test.exs @@ -79,6 +79,47 @@ defmodule Absinthe.Execution.SubscriptionTest do end end + defmodule PubSubWithDocsetRunner do + @behaviour Absinthe.Subscription.Pubsub + + def start_link() do + Registry.start_link(keys: :duplicate, name: __MODULE__) + end + + def node_name() do + node() + end + + def subscribe(topic) do + Registry.register(__MODULE__, topic, []) + :ok + end + + def publish_subscription(topic, data) do + message = %{ + topic: topic, + event: "subscription:data", + result: data + } + + Registry.dispatch(__MODULE__, topic, fn entries -> + for {pid, _} <- entries, do: send(pid, {:broadcast, message}) + end) + end + + def publish_mutation(_proxy_topic, _mutation_result, _subscribed_fields) do + # this pubsub is local and doesn't support clusters + :ok + end + + def run_docset(pubsub, docs_and_topics, _mutation_result) do + for {topic, _key_strategy, _doc} <- docs_and_topics do + # publish mutation results to topic + pubsub.publish_subscription(topic, %{data: %{runner: "calls the custom docset runner"}}) + end + end + end + defmodule Schema do use Absinthe.Schema @@ -189,6 +230,9 @@ defmodule Absinthe.Execution.SubscriptionTest do setup_all do {:ok, _} = PubSub.start_link() {:ok, _} = Absinthe.Subscription.start_link(PubSub) + + {:ok, _} = PubSubWithDocsetRunner.start_link() + {:ok, _} = Absinthe.Subscription.start_link(PubSubWithDocsetRunner) :ok end @@ -719,12 +763,55 @@ defmodule Absinthe.Execution.SubscriptionTest do refute_receive({:broadcast, _}) end - defp run_subscription(query, schema, opts \\ []) do - opts = Keyword.update(opts, :context, %{pubsub: PubSub}, &Map.put(&1, :pubsub, PubSub)) + @query """ + subscription ($userId: ID!) { + user(id: $userId) { id name } + } + """ + test "calls the optional run_docset callback if supplied" do + id = "1" + + assert {:ok, %{"subscribed" => topic}} = + run_subscription( + @query, + Schema, + variables: %{"userId" => id}, + context: %{pubsub: PubSubWithDocsetRunner} + ) + + mutation = """ + mutation ($userId: ID!) { + updateUser(id: $userId) { id name } + } + """ + + assert {:ok, %{data: _}} = + run_subscription(mutation, Schema, + variables: %{"userId" => id}, + context: %{pubsub: PubSubWithDocsetRunner} + ) + + assert_receive({:broadcast, msg}) + + assert %{ + event: "subscription:data", + result: %{data: %{runner: "calls the custom docset runner"}}, + topic: topic + } == msg + end + + def run_subscription(query, schema, opts \\ []) do + opts = + Keyword.update( + opts, + :context, + %{pubsub: PubSub}, + &Map.put(&1, :pubsub, opts[:context][:pubsub] || PubSub) + ) case run(query, schema, opts) do {:ok, %{"subscribed" => topic}} = val -> - PubSub.subscribe(topic) + opts[:context][:pubsub].subscribe(topic) val val -> diff --git a/test/absinthe/middleware/batch_test.exs b/test/absinthe/middleware/batch_test.exs index 8513c05976..d196bdb41e 100644 --- a/test/absinthe/middleware/batch_test.exs +++ b/test/absinthe/middleware/batch_test.exs @@ -182,7 +182,8 @@ defmodule Absinthe.Middleware.BatchTest do end) wait_for_process_to_exit(pid) - end) =~ "fn: {Absinthe.Middleware.BatchTest.TimeoutModule, :arbitrary_fn_name, %{arbitrary: :data}}" + end) =~ + "fn: {Absinthe.Middleware.BatchTest.TimeoutModule, :arbitrary_fn_name, %{arbitrary: :data}}" end defp wait_for_process_to_exit(pid) do