Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscription prime #93

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
4 changes: 2 additions & 2 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
elixir 1.10.0-otp-22
erlang 22.0.7
elixir 1.13.4
erlang 24.2.1
152 changes: 140 additions & 12 deletions lib/absinthe/phoenix/channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ defmodule Absinthe.Phoenix.Channel do
Map.put(absinthe_config, :pipeline, pipeline || {__MODULE__, :default_pipeline})

socket = socket |> assign(:absinthe, absinthe_config)

{:ok, socket}
end

Expand Down Expand Up @@ -87,22 +88,70 @@ defmodule Absinthe.Phoenix.Channel do
{:reply, {:ok, %{subscriptionId: doc_id}}, socket}
end

def handle_info(
%Phoenix.Socket.Broadcast{
payload: %{result: %{ordinal: ordinal, ordinal_compare_fun: ordinal_compare_fun}}
} = msg,
socket
)
when not is_nil(ordinal) do
bernardd marked this conversation as resolved.
Show resolved Hide resolved
absinthe_assigns = Map.get(socket.assigns, :absinthe, %{})
last_ordinal = absinthe_assigns[:subscription_ordinals][msg.topic]

{should_send, new_ordinal} = ordinal_compare_fun.(last_ordinal, ordinal)

if should_send do
send_msg(msg, socket)
end

socket = update_ordinal(socket, msg.topic, new_ordinal)
{:noreply, socket}
end

def handle_info(msg, socket) do
send_msg(msg, socket)
{:noreply, socket}
end

defp send_msg(msg, socket) do
{_ordinal, msg} = pop_in(msg.payload.result[:ordinal])
{_ordinal_compare_fun, msg} = pop_in(msg.payload.result[:ordinal_compare_fun])
encoded_msg = socket.serializer.fastlane!(msg)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we care to try/catch this and return a normalized error?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't, but maybe there's a good reason to do it here?

send(socket.transport_pid, encoded_msg)
end

defp update_ordinal(socket, topic, ordinal) do
absinthe_assigns = Map.get(socket.assigns, :absinthe, %{})

ordinals =
absinthe_assigns
|> Map.get(:subscription_ordinals, %{})
|> Map.put(topic, ordinal)

Phoenix.Socket.assign(
socket,
:absinthe,
Map.put(absinthe_assigns, :subscription_ordinals, ordinals)
)
end

defp run_doc(socket, query, config, opts) do
case run(query, config[:schema], config[:pipeline], opts) do
{:ok, %{"subscribed" => topic}, context} ->
%{transport_pid: transport_pid, serializer: serializer, pubsub_server: pubsub_server} =
socket
pubsub_subscribe(topic, socket)
socket = Absinthe.Phoenix.Socket.put_options(socket, context: context)

{{:ok, %{subscriptionId: topic}}, socket}

:ok =
Phoenix.PubSub.subscribe(
pubsub_server,
topic,
metadata: {:fastlane, transport_pid, serializer, []},
link: true
)
{:more, %{"subscribed" => topic}, continuations, context} ->
reply(socket_ref(socket), {:ok, %{subscriptionId: topic}})

pubsub_subscribe(topic, socket)
socket = Absinthe.Phoenix.Socket.put_options(socket, context: context)
{{:ok, %{subscriptionId: topic}}, socket}

handle_subscription_continuation(continuations, topic, socket)

{:noreply, socket}

{:ok, %{data: _} = reply, context} ->
socket = Absinthe.Phoenix.Socket.put_options(socket, context: context)
Expand All @@ -112,6 +161,16 @@ defmodule Absinthe.Phoenix.Channel do
socket = Absinthe.Phoenix.Socket.put_options(socket, context: context)
{{:error, reply}, socket}

{:more, %{data: _} = reply, continuations, context} ->
id = new_query_id()

socket =
socket
|> Absinthe.Phoenix.Socket.put_options(context: context)
|> handle_continuation(continuations, id)

{{:ok, add_query_id(reply, id)}, socket}

{:error, reply} ->
{reply, socket}
end
Expand All @@ -121,6 +180,9 @@ defmodule Absinthe.Phoenix.Channel do
{module, fun} = pipeline

case Absinthe.Pipeline.run(document, apply(module, fun, [schema, options])) do
{:ok, %{result: %{continuations: continuations} = result, execution: res}, _phases} ->
{:more, Map.delete(result, :continuations), continuations, res.context}

{:ok, %{result: result, execution: res}, _phases} ->
{:ok, result, res.context}

Expand All @@ -129,6 +191,19 @@ defmodule Absinthe.Phoenix.Channel do
end
end

defp pubsub_subscribe(
topic,
%{transport_pid: transport_pid, serializer: serializer, pubsub_server: pubsub_server}
) do
:ok =
Phoenix.PubSub.subscribe(
pubsub_server,
topic,
metadata: {:fastlane, transport_pid, serializer, ["subscription:data"]},
link: true
)
end

defp extract_variables(payload) do
case Map.get(payload, "variables", %{}) do
nil -> %{}
Expand All @@ -142,7 +217,60 @@ defmodule Absinthe.Phoenix.Channel do
|> Absinthe.Pipeline.for_document(options)
end

def handle_info(_, state) do
{:noreply, state}
defp handle_continuation(socket, continuations, id) do
case Absinthe.Pipeline.continue(continuations) do
{:ok, %{result: %{continuation: next_continuations} = result}, _phases} ->
result =
result
|> Map.delete(:continuations)
|> add_query_id(id)

push(socket, "doc", result)
handle_continuation(socket, next_continuations, id)

{:ok, %{result: result}, _phases} ->
push(socket, "doc", add_query_id(result, id))

{:ok, %{errors: errors}, _phases} ->
push(socket, "doc", add_query_id(%{errors: errors}, id))

{:error, msg, _phases} ->
push(socket, "doc", add_query_id(msg, id))

{:ok, %{result: :no_more_results}, _phases} ->
socket
end
end

defp new_query_id,
do: "absinthe_query:" <> to_string(:erlang.unique_integer([:positive]))

defp add_query_id(result, id), do: Map.put(result, :queryId, id)

defp handle_subscription_continuation(continuations, topic, socket) do
case Absinthe.Pipeline.continue(continuations) do
{:ok, %{result: :no_more_results}, _phases} ->
:ok

{:ok, %{result: result}, _phases} ->
socket = push_subscription_item(result.data, topic, socket)

case result[:continuations] do
nil -> :ok
c -> handle_subscription_continuation(c, topic, socket)
end
end
end

defp push_subscription_item(data, topic, socket) do
msg = %Phoenix.Socket.Broadcast{
topic: topic,
event: "subscription:data",
payload: %{result: %{data: data}, subscriptionId: topic}
}

{:noreply, socket} = handle_info(msg, socket)

socket
end
end
4 changes: 3 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ defmodule Absinthe.Phoenix.Mixfile do
defp deps do
[
{:absinthe_plug, "~> 1.5"},
{:absinthe, "~> 1.5"},
# {:absinthe, "~> 1.5"},
{:absinthe,
github: "circles-learning-labs/absinthe", branch: "subscription-prime", override: true},
{:decimal, "~> 1.0 or ~> 2.0"},
{:phoenix, "~> 1.5"},
{:phoenix_pubsub, "~> 2.0"},
Expand Down
21 changes: 11 additions & 10 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
%{
"absinthe": {:hex, :absinthe, "1.5.0", "911844d3a4aa51a795c84ba6e04c21e1bd573bb8d9331f972027136a94611eef", [:mix], [{:dataloader, "~> 1.0.0", [hex: :dataloader, repo: "hexpm", optional: true]}, {:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:nimble_parsec, "~> 0.5", [hex: :nimble_parsec, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "50b1672ccd1143f92b672e8b20255cf84c0f9cee314a3a1645a1a9f7aa524a84"},
"absinthe": {:git, "https://github.com/circles-learning-labs/absinthe.git", "55129781c95994c73751f53d2c52d2eed78e551b", [branch: "subscription-prime"]},
"absinthe_plug": {:hex, :absinthe_plug, "1.5.0", "018ef544cf577339018d1f482404b4bed762e1b530c78be9de4bbb88a6f3a805", [:mix], [{:absinthe, "~> 1.5.0", [hex: :absinthe, repo: "hexpm", optional: false]}, {:plug, "~> 1.3.2 or ~> 1.4", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "4c160f4ce9a1233a4219a42de946e4e05d0e8733537cd5d8d20e7d4ef8d4b7c7"},
"decimal": {:hex, :decimal, "1.8.1", "a4ef3f5f3428bdbc0d35374029ffcf4ede8533536fa79896dd450168d9acdf3c", [:mix], [], "hexpm", "3cb154b00225ac687f6cbd4acc4b7960027c757a5152b369923ead9ddbca7aec"},
"decimal": {:hex, :decimal, "2.0.0", "a78296e617b0f5dd4c6caf57c714431347912ffb1d0842e998e9792b5642d697", [:mix], [], "hexpm", "34666e9c55dea81013e77d9d87370fe6cb6291d1ef32f46a1600230b1d44f577"},
"earmark": {:hex, :earmark, "1.4.4", "4821b8d05cda507189d51f2caeef370cf1e18ca5d7dfb7d31e9cafe6688106a4", [:mix], [], "hexpm", "1f93aba7340574847c0f609da787f0d79efcab51b044bb6e242cae5aca9d264d"},
"earmark_parser": {:hex, :earmark_parser, "1.4.12", "b245e875ec0a311a342320da0551da407d9d2b65d98f7a9597ae078615af3449", [:mix], [], "hexpm", "711e2cc4d64abb7d566d43f54b78f7dc129308a63bc103fbd88550d2174b3160"},
"ex_doc": {:hex, :ex_doc, "0.24.1", "15673de99154f93ca7f05900e4e4155ced1ee0cd34e0caeee567900a616871a4", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "07972f17bdf7dc7b5bd76ec97b556b26178ed3f056e7ec9288eb7cea7f91cce2"},
"jason": {:hex, :jason, "1.2.1", "12b22825e22f468c02eb3e4b9985f3d0cb8dc40b9bd704730efa11abd2708c44", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "b659b8571deedf60f79c5a608e15414085fa141344e2716fbd6988a084b5f993"},
"jason": {:hex, :jason, "1.3.0", "fa6b82a934feb176263ad2df0dbd91bf633d4a46ebfdffea0c8ae82953714946", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "53fc1f51255390e0ec7e50f9cb41e751c260d065dcba2bf0d08dc51a4002c2ac"},
"makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"},
"makeup_elixir": {:hex, :makeup_elixir, "0.14.1", "4f0e96847c63c17841d42c08107405a005a2680eb9c7ccadfd757bd31dabccfb", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f2438b1a80eaec9ede832b5c41cd4f373b38fd7aa33e3b22d9db79e640cbde11"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
"mime": {:hex, :mime, "1.3.1", "30ce04ab3175b6ad0bdce0035cba77bba68b813d523d1aac73d9781b4d193cf8", [:mix], [], "hexpm", "6cbe761d6a0ca5a31a0931bf4c63204bceb64538e664a8ecf784a9a6f3b875f1"},
"nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm", "5c040b8469c1ff1b10093d3186e2e10dbe483cd73d79ec017993fb3985b8a9b3"},
"mime": {:hex, :mime, "2.0.2", "0b9e1a4c840eafb68d820b0e2158ef5c49385d17fb36855ac6e7e087d4b1dcc5", [:mix], [], "hexpm", "e6a3f76b4c277739e36c2e21a2c640778ba4c3846189d5ab19f97f126df5f9b7"},
"nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"},
"phoenix": {:hex, :phoenix, "1.5.1", "95156589879dc69201d5fc0ebdbfdfc7901a09a3616ea611ec297f81340275a2", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_html, "~> 2.13", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.0", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:plug, "~> 1.10", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 1.0 or ~> 2.2", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.1.2 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "fc272b38e79d2881790fccae6f67a9fbe9b790103d6878175ea03d23003152eb"},
"phoenix_html": {:hex, :phoenix_html, "2.14.2", "b8a3899a72050f3f48a36430da507dd99caf0ac2d06c77529b1646964f3d563e", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "58061c8dfd25da5df1ea0ca47c972f161beb6c875cd293917045b92ffe1bf617"},
"phoenix_pubsub": {:hex, :phoenix_pubsub, "2.0.0", "a1ae76717bb168cdeb10ec9d92d1480fec99e3080f011402c0a2d68d47395ffb", [:mix], [], "hexpm", "c52d948c4f261577b9c6fa804be91884b381a7f8f18450c5045975435350f771"},
"plug": {:hex, :plug, "1.10.0", "6508295cbeb4c654860845fb95260737e4a8838d34d115ad76cd487584e2fc4d", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "422a9727e667be1bf5ab1de03be6fa0ad67b775b2d84ed908f3264415ef29d4a"},
"plug_crypto": {:hex, :plug_crypto, "1.1.2", "bdd187572cc26dbd95b87136290425f2b580a116d3fb1f564216918c9730d227", [:mix], [], "hexpm", "6b8b608f895b6ffcfad49c37c7883e8df98ae19c6a28113b02aa1e9c5b22d6b5"},
"telemetry": {:hex, :telemetry, "0.4.1", "ae2718484892448a24470e6aa341bc847c3277bfb8d4e9289f7474d752c09c7f", [:rebar3], [], "hexpm", "4738382e36a0a9a2b6e25d67c960e40e1a2c95560b9f936d8e29de8cd858480f"},
"phoenix_html": {:hex, :phoenix_html, "2.14.3", "51f720d0d543e4e157ff06b65de38e13303d5778a7919bcc696599e5934271b8", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "efd697a7fff35a13eeeb6b43db884705cba353a1a41d127d118fda5f90c8e80f"},
"phoenix_pubsub": {:hex, :phoenix_pubsub, "2.1.1", "ba04e489ef03763bf28a17eb2eaddc2c20c6d217e2150a61e3298b0f4c2012b5", [:mix], [], "hexpm", "81367c6d1eea5878ad726be80808eb5a787a23dee699f96e72b1109c57cdd8d9"},
"phoenix_view": {:hex, :phoenix_view, "1.1.2", "1b82764a065fb41051637872c7bd07ed2fdb6f5c3bd89684d4dca6e10115c95a", [:mix], [{:phoenix_html, "~> 2.14.2 or ~> 3.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}], "hexpm", "7ae90ad27b09091266f6adbb61e1d2516a7c3d7062c6789d46a7554ec40f3a56"},
"plug": {:hex, :plug, "1.13.6", "187beb6b67c6cec50503e940f0434ea4692b19384d47e5fdfd701e93cadb4cc2", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "02b9c6b9955bce92c829f31d6284bf53c591ca63c4fb9ff81dfd0418667a34ff"},
"plug_crypto": {:hex, :plug_crypto, "1.2.2", "05654514ac717ff3a1843204b424477d9e60c143406aa94daf2274fdd280794d", [:mix], [], "hexpm", "87631c7ad914a5a445f0a3809f99b079113ae4ed4b867348dd9eec288cecb6db"},
"telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"},
}
78 changes: 77 additions & 1 deletion test/absinthe/phoenix_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ defmodule Absinthe.PhoenixTest do

import ExUnit.CaptureLog

alias Absinthe.Subscription
alias Absinthe.Phoenix.TestEndpoint

setup_all do
Absinthe.Test.prime(Schema)

children = [
{Phoenix.PubSub, [name: Absinthe.Phoenix.PubSub, adapter: Phoenix.PubSub.PG2]},
Absinthe.Phoenix.TestEndpoint,
{Absinthe.Subscription, Absinthe.Phoenix.TestEndpoint}
{Subscription, TestEndpoint}
]

{:ok, _} = Supervisor.start_link(children, strategy: :one_for_one)
Expand Down Expand Up @@ -130,6 +133,79 @@ defmodule Absinthe.PhoenixTest do
assert String.contains?(log, "boom")
end

test "subcription with prime", %{socket: socket} do
ref =
push(socket, "doc", %{
"query" => "subscription { prime }"
})

assert_reply(ref, :ok, %{subscriptionId: subscription_ref})

assert_push("subscription:data", push)

expected = %{
result: %{data: %{"prime" => "prime1"}},
subscriptionId: subscription_ref
}

assert expected == push

assert_push("subscription:data", push)

expected = %{
result: %{data: %{"prime" => "prime2"}},
subscriptionId: subscription_ref
}

assert expected == push
end

test "subscription with ordinal", %{socket: socket} do
ref = push(socket, "doc", %{"query" => "subscription { ordinal }"})

assert_reply(ref, :ok, %{subscriptionId: subscription_ref})

Subscription.publish(TestEndpoint, 1, ordinal: "ordinal_topic")

assert_push("subscription:data", push)
expected = %{result: %{data: %{"ordinal" => 1}}, subscriptionId: subscription_ref}
assert expected == push

Subscription.publish(TestEndpoint, 0, ordinal: "ordinal_topic")
refute_push("subscription:data", _)
# This message should not generate a notification because it has a lower ordinal

Subscription.publish(TestEndpoint, 2, ordinal: "ordinal_topic")

assert_push("subscription:data", push)
expected = %{result: %{data: %{"ordinal" => 2}}, subscriptionId: subscription_ref}
assert expected == push
end

test "subscription with ordinal and compare fun", %{socket: socket} do
ref = push(socket, "doc", %{"query" => "subscription { ordinalWithCompare }"})

assert_reply(ref, :ok, %{subscriptionId: subscription_ref})

Subscription.publish(TestEndpoint, 1, ordinal_with_compare: "ordinal_with_compare_topic")

assert_push("subscription:data", push)
expected = %{result: %{data: %{"ordinalWithCompare" => 1}}, subscriptionId: subscription_ref}
assert expected == push

Subscription.publish(TestEndpoint, 2, ordinal_with_compare: "ordinal_with_compare_topic")
refute_push("subscription:data", _)
# This message should not generate a notification because the compare function is old > new

Subscription.publish(TestEndpoint, 2, ordinal_with_compare: "ordinal_with_compare_topic")
# This message should generate a notification because the compare function result
# adds 1 to the ordinal

assert_push("subscription:data", push)
expected = %{result: %{data: %{"ordinalWithCompare" => 2}}, subscriptionId: subscription_ref}
assert expected == push
end

test "context changes are persisted across documents", %{socket: socket} do
ref =
push(socket, "doc", %{
Expand Down
25 changes: 25 additions & 0 deletions test/support/schema.ex
Original file line number Diff line number Diff line change
Expand Up @@ -97,5 +97,30 @@ defmodule Schema do
{:error, "unauthorized"}
end
end

field :prime, :string do
config fn _, _ ->
{:ok,
topic: "prime_topic",
prime: fn _ ->
{:ok, ["prime1", "prime2"]}
end}
end
end

field :ordinal, :integer do
config fn _, _ ->
{:ok, topic: "ordinal_topic", ordinal: fn value -> value end}
end
end

field :ordinal_with_compare, :integer do
config fn _, _ ->
{:ok,
topic: "ordinal_with_compare_topic",
ordinal: fn value -> value end,
ordinal_compare: fn old, new -> {is_nil(old) || old > new, new + 1} end}
end
end
end
end