diff --git a/.tool-versions b/.tool-versions index a3865e3..54976d6 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,2 +1,2 @@ -elixir 1.10.0-otp-22 -erlang 22.0.7 +elixir 1.13.4 +erlang 24.2.1 diff --git a/lib/absinthe/phoenix/channel.ex b/lib/absinthe/phoenix/channel.ex index 5e32a8b..afa1e48 100644 --- a/lib/absinthe/phoenix/channel.ex +++ b/lib/absinthe/phoenix/channel.ex @@ -35,6 +35,7 @@ defmodule Absinthe.Phoenix.Channel do Map.put(absinthe_config, :pipeline, pipeline || {__MODULE__, :default_pipeline}) socket = socket |> assign(:absinthe, absinthe_config) + {:ok, socket} end @@ -87,22 +88,70 @@ defmodule Absinthe.Phoenix.Channel do {:reply, {:ok, %{subscriptionId: doc_id}}, socket} end + def handle_info( + %Phoenix.Socket.Broadcast{ + payload: %{result: %{ordinal: ordinal, ordinal_compare_fun: ordinal_compare_fun}} + } = msg, + socket + ) + when not is_nil(ordinal) do + absinthe_assigns = Map.get(socket.assigns, :absinthe, %{}) + last_ordinal = absinthe_assigns[:subscription_ordinals][msg.topic] + + {should_send, new_ordinal} = ordinal_compare_fun.(last_ordinal, ordinal) + + if should_send do + send_msg(msg, socket) + end + + socket = update_ordinal(socket, msg.topic, new_ordinal) + {:noreply, socket} + end + + def handle_info(msg, socket) do + send_msg(msg, socket) + {:noreply, socket} + end + + defp send_msg(msg, socket) do + {_ordinal, msg} = pop_in(msg.payload.result[:ordinal]) + {_ordinal_compare_fun, msg} = pop_in(msg.payload.result[:ordinal_compare_fun]) + encoded_msg = socket.serializer.fastlane!(msg) + send(socket.transport_pid, encoded_msg) + end + + defp update_ordinal(socket, topic, ordinal) do + absinthe_assigns = Map.get(socket.assigns, :absinthe, %{}) + + ordinals = + absinthe_assigns + |> Map.get(:subscription_ordinals, %{}) + |> Map.put(topic, ordinal) + + Phoenix.Socket.assign( + socket, + :absinthe, + Map.put(absinthe_assigns, :subscription_ordinals, ordinals) + ) + end + defp run_doc(socket, query, config, opts) do case run(query, config[:schema], config[:pipeline], opts) do {:ok, %{"subscribed" => topic}, context} -> - %{transport_pid: transport_pid, serializer: serializer, pubsub_server: pubsub_server} = - socket + pubsub_subscribe(topic, socket) + socket = Absinthe.Phoenix.Socket.put_options(socket, context: context) + + {{:ok, %{subscriptionId: topic}}, socket} - :ok = - Phoenix.PubSub.subscribe( - pubsub_server, - topic, - metadata: {:fastlane, transport_pid, serializer, []}, - link: true - ) + {:more, %{"subscribed" => topic}, continuations, context} -> + reply(socket_ref(socket), {:ok, %{subscriptionId: topic}}) + pubsub_subscribe(topic, socket) socket = Absinthe.Phoenix.Socket.put_options(socket, context: context) - {{:ok, %{subscriptionId: topic}}, socket} + + handle_subscription_continuation(continuations, topic, socket) + + {:noreply, socket} {:ok, %{data: _} = reply, context} -> socket = Absinthe.Phoenix.Socket.put_options(socket, context: context) @@ -112,6 +161,16 @@ defmodule Absinthe.Phoenix.Channel do socket = Absinthe.Phoenix.Socket.put_options(socket, context: context) {{:error, reply}, socket} + {:more, %{data: _} = reply, continuations, context} -> + id = new_query_id() + + socket = + socket + |> Absinthe.Phoenix.Socket.put_options(context: context) + |> handle_continuation(continuations, id) + + {{:ok, add_query_id(reply, id)}, socket} + {:error, reply} -> {reply, socket} end @@ -121,6 +180,9 @@ defmodule Absinthe.Phoenix.Channel do {module, fun} = pipeline case Absinthe.Pipeline.run(document, apply(module, fun, [schema, options])) do + {:ok, %{result: %{continuations: continuations} = result, execution: res}, _phases} -> + {:more, Map.delete(result, :continuations), continuations, res.context} + {:ok, %{result: result, execution: res}, _phases} -> {:ok, result, res.context} @@ -129,6 +191,19 @@ defmodule Absinthe.Phoenix.Channel do end end + defp pubsub_subscribe( + topic, + %{transport_pid: transport_pid, serializer: serializer, pubsub_server: pubsub_server} + ) do + :ok = + Phoenix.PubSub.subscribe( + pubsub_server, + topic, + metadata: {:fastlane, transport_pid, serializer, ["subscription:data"]}, + link: true + ) + end + defp extract_variables(payload) do case Map.get(payload, "variables", %{}) do nil -> %{} @@ -142,7 +217,60 @@ defmodule Absinthe.Phoenix.Channel do |> Absinthe.Pipeline.for_document(options) end - def handle_info(_, state) do - {:noreply, state} + defp handle_continuation(socket, continuations, id) do + case Absinthe.Pipeline.continue(continuations) do + {:ok, %{result: %{continuation: next_continuations} = result}, _phases} -> + result = + result + |> Map.delete(:continuations) + |> add_query_id(id) + + push(socket, "doc", result) + handle_continuation(socket, next_continuations, id) + + {:ok, %{result: result}, _phases} -> + push(socket, "doc", add_query_id(result, id)) + + {:ok, %{errors: errors}, _phases} -> + push(socket, "doc", add_query_id(%{errors: errors}, id)) + + {:error, msg, _phases} -> + push(socket, "doc", add_query_id(msg, id)) + + {:ok, %{result: :no_more_results}, _phases} -> + socket + end + end + + defp new_query_id, + do: "absinthe_query:" <> to_string(:erlang.unique_integer([:positive])) + + defp add_query_id(result, id), do: Map.put(result, :queryId, id) + + defp handle_subscription_continuation(continuations, topic, socket) do + case Absinthe.Pipeline.continue(continuations) do + {:ok, %{result: :no_more_results}, _phases} -> + :ok + + {:ok, %{result: result}, _phases} -> + socket = push_subscription_item(result.data, topic, socket) + + case result[:continuations] do + nil -> :ok + c -> handle_subscription_continuation(c, topic, socket) + end + end + end + + defp push_subscription_item(data, topic, socket) do + msg = %Phoenix.Socket.Broadcast{ + topic: topic, + event: "subscription:data", + payload: %{result: %{data: data}, subscriptionId: topic} + } + + {:noreply, socket} = handle_info(msg, socket) + + socket end end diff --git a/mix.exs b/mix.exs index 52ccb7b..c6f00cb 100644 --- a/mix.exs +++ b/mix.exs @@ -52,7 +52,9 @@ defmodule Absinthe.Phoenix.Mixfile do defp deps do [ {:absinthe_plug, "~> 1.5"}, - {:absinthe, "~> 1.5"}, + # {:absinthe, "~> 1.5"}, + {:absinthe, + github: "circles-learning-labs/absinthe", branch: "subscription-prime", override: true}, {:decimal, "~> 1.0 or ~> 2.0"}, {:phoenix, "~> 1.5"}, {:phoenix_pubsub, "~> 2.0"}, diff --git a/mix.lock b/mix.lock index 85b4526..4462f54 100644 --- a/mix.lock +++ b/mix.lock @@ -1,20 +1,21 @@ %{ - "absinthe": {:hex, :absinthe, "1.5.0", "911844d3a4aa51a795c84ba6e04c21e1bd573bb8d9331f972027136a94611eef", [:mix], [{:dataloader, "~> 1.0.0", [hex: :dataloader, repo: "hexpm", optional: true]}, {:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:nimble_parsec, "~> 0.5", [hex: :nimble_parsec, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "50b1672ccd1143f92b672e8b20255cf84c0f9cee314a3a1645a1a9f7aa524a84"}, + "absinthe": {:git, "https://github.com/circles-learning-labs/absinthe.git", "55129781c95994c73751f53d2c52d2eed78e551b", [branch: "subscription-prime"]}, "absinthe_plug": {:hex, :absinthe_plug, "1.5.0", "018ef544cf577339018d1f482404b4bed762e1b530c78be9de4bbb88a6f3a805", [:mix], [{:absinthe, "~> 1.5.0", [hex: :absinthe, repo: "hexpm", optional: false]}, {:plug, "~> 1.3.2 or ~> 1.4", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "4c160f4ce9a1233a4219a42de946e4e05d0e8733537cd5d8d20e7d4ef8d4b7c7"}, - "decimal": {:hex, :decimal, "1.8.1", "a4ef3f5f3428bdbc0d35374029ffcf4ede8533536fa79896dd450168d9acdf3c", [:mix], [], "hexpm", "3cb154b00225ac687f6cbd4acc4b7960027c757a5152b369923ead9ddbca7aec"}, + "decimal": {:hex, :decimal, "2.0.0", "a78296e617b0f5dd4c6caf57c714431347912ffb1d0842e998e9792b5642d697", [:mix], [], "hexpm", "34666e9c55dea81013e77d9d87370fe6cb6291d1ef32f46a1600230b1d44f577"}, "earmark": {:hex, :earmark, "1.4.4", "4821b8d05cda507189d51f2caeef370cf1e18ca5d7dfb7d31e9cafe6688106a4", [:mix], [], "hexpm", "1f93aba7340574847c0f609da787f0d79efcab51b044bb6e242cae5aca9d264d"}, "earmark_parser": {:hex, :earmark_parser, "1.4.12", "b245e875ec0a311a342320da0551da407d9d2b65d98f7a9597ae078615af3449", [:mix], [], "hexpm", "711e2cc4d64abb7d566d43f54b78f7dc129308a63bc103fbd88550d2174b3160"}, "ex_doc": {:hex, :ex_doc, "0.24.1", "15673de99154f93ca7f05900e4e4155ced1ee0cd34e0caeee567900a616871a4", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "07972f17bdf7dc7b5bd76ec97b556b26178ed3f056e7ec9288eb7cea7f91cce2"}, - "jason": {:hex, :jason, "1.2.1", "12b22825e22f468c02eb3e4b9985f3d0cb8dc40b9bd704730efa11abd2708c44", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "b659b8571deedf60f79c5a608e15414085fa141344e2716fbd6988a084b5f993"}, + "jason": {:hex, :jason, "1.3.0", "fa6b82a934feb176263ad2df0dbd91bf633d4a46ebfdffea0c8ae82953714946", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "53fc1f51255390e0ec7e50f9cb41e751c260d065dcba2bf0d08dc51a4002c2ac"}, "makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"}, "makeup_elixir": {:hex, :makeup_elixir, "0.14.1", "4f0e96847c63c17841d42c08107405a005a2680eb9c7ccadfd757bd31dabccfb", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f2438b1a80eaec9ede832b5c41cd4f373b38fd7aa33e3b22d9db79e640cbde11"}, "makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"}, - "mime": {:hex, :mime, "1.3.1", "30ce04ab3175b6ad0bdce0035cba77bba68b813d523d1aac73d9781b4d193cf8", [:mix], [], "hexpm", "6cbe761d6a0ca5a31a0931bf4c63204bceb64538e664a8ecf784a9a6f3b875f1"}, - "nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm", "5c040b8469c1ff1b10093d3186e2e10dbe483cd73d79ec017993fb3985b8a9b3"}, + "mime": {:hex, :mime, "2.0.2", "0b9e1a4c840eafb68d820b0e2158ef5c49385d17fb36855ac6e7e087d4b1dcc5", [:mix], [], "hexpm", "e6a3f76b4c277739e36c2e21a2c640778ba4c3846189d5ab19f97f126df5f9b7"}, + "nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"}, "phoenix": {:hex, :phoenix, "1.5.1", "95156589879dc69201d5fc0ebdbfdfc7901a09a3616ea611ec297f81340275a2", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_html, "~> 2.13", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.0", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:plug, "~> 1.10", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 1.0 or ~> 2.2", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.1.2 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "fc272b38e79d2881790fccae6f67a9fbe9b790103d6878175ea03d23003152eb"}, - "phoenix_html": {:hex, :phoenix_html, "2.14.2", "b8a3899a72050f3f48a36430da507dd99caf0ac2d06c77529b1646964f3d563e", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "58061c8dfd25da5df1ea0ca47c972f161beb6c875cd293917045b92ffe1bf617"}, - "phoenix_pubsub": {:hex, :phoenix_pubsub, "2.0.0", "a1ae76717bb168cdeb10ec9d92d1480fec99e3080f011402c0a2d68d47395ffb", [:mix], [], "hexpm", "c52d948c4f261577b9c6fa804be91884b381a7f8f18450c5045975435350f771"}, - "plug": {:hex, :plug, "1.10.0", "6508295cbeb4c654860845fb95260737e4a8838d34d115ad76cd487584e2fc4d", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "422a9727e667be1bf5ab1de03be6fa0ad67b775b2d84ed908f3264415ef29d4a"}, - "plug_crypto": {:hex, :plug_crypto, "1.1.2", "bdd187572cc26dbd95b87136290425f2b580a116d3fb1f564216918c9730d227", [:mix], [], "hexpm", "6b8b608f895b6ffcfad49c37c7883e8df98ae19c6a28113b02aa1e9c5b22d6b5"}, - "telemetry": {:hex, :telemetry, "0.4.1", "ae2718484892448a24470e6aa341bc847c3277bfb8d4e9289f7474d752c09c7f", [:rebar3], [], "hexpm", "4738382e36a0a9a2b6e25d67c960e40e1a2c95560b9f936d8e29de8cd858480f"}, + "phoenix_html": {:hex, :phoenix_html, "2.14.3", "51f720d0d543e4e157ff06b65de38e13303d5778a7919bcc696599e5934271b8", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "efd697a7fff35a13eeeb6b43db884705cba353a1a41d127d118fda5f90c8e80f"}, + "phoenix_pubsub": {:hex, :phoenix_pubsub, "2.1.1", "ba04e489ef03763bf28a17eb2eaddc2c20c6d217e2150a61e3298b0f4c2012b5", [:mix], [], "hexpm", "81367c6d1eea5878ad726be80808eb5a787a23dee699f96e72b1109c57cdd8d9"}, + "phoenix_view": {:hex, :phoenix_view, "1.1.2", "1b82764a065fb41051637872c7bd07ed2fdb6f5c3bd89684d4dca6e10115c95a", [:mix], [{:phoenix_html, "~> 2.14.2 or ~> 3.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}], "hexpm", "7ae90ad27b09091266f6adbb61e1d2516a7c3d7062c6789d46a7554ec40f3a56"}, + "plug": {:hex, :plug, "1.13.6", "187beb6b67c6cec50503e940f0434ea4692b19384d47e5fdfd701e93cadb4cc2", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "02b9c6b9955bce92c829f31d6284bf53c591ca63c4fb9ff81dfd0418667a34ff"}, + "plug_crypto": {:hex, :plug_crypto, "1.2.2", "05654514ac717ff3a1843204b424477d9e60c143406aa94daf2274fdd280794d", [:mix], [], "hexpm", "87631c7ad914a5a445f0a3809f99b079113ae4ed4b867348dd9eec288cecb6db"}, + "telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"}, } diff --git a/test/absinthe/phoenix_test.exs b/test/absinthe/phoenix_test.exs index 8662951..8e7b366 100644 --- a/test/absinthe/phoenix_test.exs +++ b/test/absinthe/phoenix_test.exs @@ -4,13 +4,16 @@ defmodule Absinthe.PhoenixTest do import ExUnit.CaptureLog + alias Absinthe.Subscription + alias Absinthe.Phoenix.TestEndpoint + setup_all do Absinthe.Test.prime(Schema) children = [ {Phoenix.PubSub, [name: Absinthe.Phoenix.PubSub, adapter: Phoenix.PubSub.PG2]}, Absinthe.Phoenix.TestEndpoint, - {Absinthe.Subscription, Absinthe.Phoenix.TestEndpoint} + {Subscription, TestEndpoint} ] {:ok, _} = Supervisor.start_link(children, strategy: :one_for_one) @@ -130,6 +133,79 @@ defmodule Absinthe.PhoenixTest do assert String.contains?(log, "boom") end + test "subcription with prime", %{socket: socket} do + ref = + push(socket, "doc", %{ + "query" => "subscription { prime }" + }) + + assert_reply(ref, :ok, %{subscriptionId: subscription_ref}) + + assert_push("subscription:data", push) + + expected = %{ + result: %{data: %{"prime" => "prime1"}}, + subscriptionId: subscription_ref + } + + assert expected == push + + assert_push("subscription:data", push) + + expected = %{ + result: %{data: %{"prime" => "prime2"}}, + subscriptionId: subscription_ref + } + + assert expected == push + end + + test "subscription with ordinal", %{socket: socket} do + ref = push(socket, "doc", %{"query" => "subscription { ordinal }"}) + + assert_reply(ref, :ok, %{subscriptionId: subscription_ref}) + + Subscription.publish(TestEndpoint, 1, ordinal: "ordinal_topic") + + assert_push("subscription:data", push) + expected = %{result: %{data: %{"ordinal" => 1}}, subscriptionId: subscription_ref} + assert expected == push + + Subscription.publish(TestEndpoint, 0, ordinal: "ordinal_topic") + refute_push("subscription:data", _) + # This message should not generate a notification because it has a lower ordinal + + Subscription.publish(TestEndpoint, 2, ordinal: "ordinal_topic") + + assert_push("subscription:data", push) + expected = %{result: %{data: %{"ordinal" => 2}}, subscriptionId: subscription_ref} + assert expected == push + end + + test "subscription with ordinal and compare fun", %{socket: socket} do + ref = push(socket, "doc", %{"query" => "subscription { ordinalWithCompare }"}) + + assert_reply(ref, :ok, %{subscriptionId: subscription_ref}) + + Subscription.publish(TestEndpoint, 1, ordinal_with_compare: "ordinal_with_compare_topic") + + assert_push("subscription:data", push) + expected = %{result: %{data: %{"ordinalWithCompare" => 1}}, subscriptionId: subscription_ref} + assert expected == push + + Subscription.publish(TestEndpoint, 2, ordinal_with_compare: "ordinal_with_compare_topic") + refute_push("subscription:data", _) + # This message should not generate a notification because the compare function is old > new + + Subscription.publish(TestEndpoint, 2, ordinal_with_compare: "ordinal_with_compare_topic") + # This message should generate a notification because the compare function result + # adds 1 to the ordinal + + assert_push("subscription:data", push) + expected = %{result: %{data: %{"ordinalWithCompare" => 2}}, subscriptionId: subscription_ref} + assert expected == push + end + test "context changes are persisted across documents", %{socket: socket} do ref = push(socket, "doc", %{ diff --git a/test/support/schema.ex b/test/support/schema.ex index 582a012..ac55e98 100644 --- a/test/support/schema.ex +++ b/test/support/schema.ex @@ -97,5 +97,30 @@ defmodule Schema do {:error, "unauthorized"} end end + + field :prime, :string do + config fn _, _ -> + {:ok, + topic: "prime_topic", + prime: fn _ -> + {:ok, ["prime1", "prime2"]} + end} + end + end + + field :ordinal, :integer do + config fn _, _ -> + {:ok, topic: "ordinal_topic", ordinal: fn value -> value end} + end + end + + field :ordinal_with_compare, :integer do + config fn _, _ -> + {:ok, + topic: "ordinal_with_compare_topic", + ordinal: fn value -> value end, + ordinal_compare: fn old, new -> {is_nil(old) || old > new, new + 1} end} + end + end end end