Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscription priming/ordinals #1168

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 36 additions & 4 deletions lib/absinthe.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,20 @@ defmodule Absinthe do
%{message: String.t()}
| %{message: String.t(), locations: [%{line: pos_integer, column: integer}]}

@type continuations_t :: nil | [Absinthe.Blueprint.Continuation.t()]

@type ordinal_fun :: (term() -> term())

@type ordinal_compare_fun :: (term(), term() -> {boolean(), term()})

@type result_t ::
%{data: nil | result_selection_t}
| %{data: nil | result_selection_t, errors: [result_error_t]}
%{
required(:data) => nil | result_selection_t,
optional(:ordinal_fun) => ordinal_fun(),
optional(:ordinal_compare_fun) => ordinal_compare_fun(),
optional(:continuations) => continuations_t,
optional(:errors) => [result_error_t]
}
| %{errors: [result_error_t]}

@type pipeline_modifier_fun :: (Absinthe.Pipeline.t(), Keyword.t() -> Absinthe.Pipeline.t())
Expand Down Expand Up @@ -98,7 +109,8 @@ defmodule Absinthe do
pipeline_modifier: pipeline_modifier_fun()
]

@type run_result :: {:ok, result_t} | {:error, String.t()}
@type run_result :: {:ok, result_t} | {:more, result_t} | {:error, String.t()}
@type continue_result :: run_result | :no_more_results

@spec run(
binary | Absinthe.Language.Source.t() | Absinthe.Language.Document.t(),
Expand All @@ -113,7 +125,26 @@ defmodule Absinthe do
|> Absinthe.Pipeline.for_document(options)
|> pipeline_modifier.(options)

case Absinthe.Pipeline.run(document, pipeline) do
document
|> Absinthe.Pipeline.run(pipeline)
|> build_result()
end

@spec continue([Absinthe.Blueprint.Continuation.t()]) :: continue_result
def continue(continuations) do
continuations
|> Absinthe.Pipeline.continue()
|> build_result()
end

defp build_result(output) do
case output do
{:ok, %{result: :no_more_results}, _phases} ->
:no_more_results

{:ok, %{result: %{continuations: c} = result}, _phases} when c != [] ->
{:more, result}

{:ok, %{result: result}, _phases} ->
{:ok, result}

Expand All @@ -137,6 +168,7 @@ defmodule Absinthe do
def run!(input, schema, options \\ []) do
case run(input, schema, options) do
{:ok, result} -> result
{:more, result} -> result
{:error, err} -> raise ExecutionError, message: err
end
end
Expand Down
18 changes: 18 additions & 0 deletions lib/absinthe/blueprint/continuation.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
defmodule Absinthe.Blueprint.Continuation do
@moduledoc false

# Continuations allow further resolutions after the initial result is
# returned

alias Absinthe.Pipeline

defstruct [
:phase_input,
:pipeline
]

@type t :: %__MODULE__{
phase_input: Pipeline.data_t(),
pipeline: Pipeline.t()
}
end
6 changes: 4 additions & 2 deletions lib/absinthe/blueprint/result/list.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@ defmodule Absinthe.Blueprint.Result.List do
:values,
errors: [],
flags: %{},
extensions: %{}
extensions: %{},
continuations: []
]

@type t :: %__MODULE__{
emitter: Blueprint.Document.Field.t(),
values: [Blueprint.Execution.node_t()],
errors: [Phase.Error.t()],
flags: Blueprint.flags_t(),
extensions: %{any => any}
extensions: %{any => any},
continuations: [Blueprint.Continuation.t()]
}
end
6 changes: 4 additions & 2 deletions lib/absinthe/blueprint/result/object.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@ defmodule Absinthe.Blueprint.Result.Object do
:fields,
errors: [],
flags: %{},
extensions: %{}
extensions: %{},
continuations: []
]

@type t :: %__MODULE__{
emitter: Blueprint.Document.Field.t(),
fields: [Blueprint.Execution.node_t()],
errors: [Phase.Error.t()],
flags: Blueprint.flags_t(),
extensions: %{any => any}
extensions: %{any => any},
continuations: [Blueprint.Continuation.t()]
}
end
9 changes: 8 additions & 1 deletion lib/absinthe/phase/document/result.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ defmodule Absinthe.Phase.Document.Result do
{:validation_failed, errors}
end

format_result(result, opts)
result
|> format_result(opts)
|> maybe_add_continuations(blueprint.execution.result)
end

defp format_result({:ok, {data, []}}, _) do
Expand Down Expand Up @@ -156,4 +158,9 @@ defmodule Absinthe.Phase.Document.Result do
end

defp format_location(_), do: []

defp maybe_add_continuations(result, %{continuations: continuations}) when continuations != [],
do: Map.put(result, :continuations, continuations)

defp maybe_add_continuations(result, _), do: result
end
54 changes: 54 additions & 0 deletions lib/absinthe/phase/subscription/get_ordinal.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
defmodule Absinthe.Phase.Subscription.GetOrdinal do
use Absinthe.Phase

alias Absinthe.Phase.Subscription.SubscribeSelf

@moduledoc false

alias Absinthe.Blueprint

@spec run(any, Keyword.t()) :: {:ok, Blueprint.t()}
def run(blueprint, _options \\ []) do
with %{type: :subscription, selections: [field]} <- Blueprint.current_operation(blueprint),
{:ok, config} = SubscribeSelf.get_config(field, blueprint.execution.context, blueprint),
{_, ordinal_fun} when is_function(ordinal_fun, 1) <- {:ordinal_fun, config[:ordinal]},
{_, ordinal_compare_fun} when is_function(ordinal_compare_fun, 2) <-
{:ordinal_compare_fun,
Keyword.get(config, :ordinal_compare, &default_ordinal_compare/2)} do
ordinal = ordinal_fun.(blueprint.execution.root_value)

result =
blueprint.result
|> Map.put(:ordinal, ordinal)
|> Map.put(:ordinal_compare_fun, ordinal_compare_fun)

{:ok, %{blueprint | result: result}}
else
{:ordinal_fun, f} when is_function(f) ->
IO.write(
:stderr,
"Ordinal function must be 1-arity"
)

{:ok, blueprint}

{:ordinal_compare_fun, f} when is_function(f) ->
IO.write(
:stderr,
"Ordinal compare function must be 2-arity"
)

{:ok, blueprint}

_ ->
{:ok, blueprint}
end
end

defp default_ordinal_compare(nil, new_ordinal), do: {true, new_ordinal}

defp default_ordinal_compare(old_ordinal, new_ordinal) when old_ordinal < new_ordinal,
do: {true, new_ordinal}

defp default_ordinal_compare(old_ordinal, _new_ordinal), do: {false, old_ordinal}
end
46 changes: 46 additions & 0 deletions lib/absinthe/phase/subscription/prime.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
defmodule Absinthe.Phase.Subscription.Prime do
@moduledoc false

alias Absinthe.Blueprint.Continuation
alias Absinthe.Phase

@spec run(any(), Keyword.t()) :: Absinthe.Phase.result_t()
def run(blueprint, prime_result: prime_result) do
{:ok, put_in(blueprint.execution.root_value, prime_result)}
end

def run(blueprint, prime_fun: prime_fun, resolution_options: options) do
{:ok, prime_results} = prime_fun.(blueprint.execution)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I take it the purpose of the {:ok, } tuple wrapping for now is to leave room for other return tuples in the future? Is it conceivable that prime would return some non :ok value?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I think we should have the prime function return the tuple, similar to resolutions. Otherwise you could conceivably have a {:ok, {:error, :my_error_state}} returned here or something equally wonky.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My thinking was mostly just to keep it consistent with return types in the other resolution callback functions. You're right that it doesn't serve much purpose at the moment, but I think at the time I was thinking we may want to allow error conditions to be propagated up like in resolutions. I'm not quite sure how that would be presented at a graphQL level though.


case prime_results do
[first | rest] ->
blueprint = put_in(blueprint.execution.root_value, first)
blueprint = maybe_add_continuations(blueprint, rest, options)
{:ok, blueprint}

[] ->
blueprint = put_in(blueprint.result, :no_more_results)
{:replace, blueprint, []}
end
end

defp maybe_add_continuations(blueprint, [], _options), do: blueprint

defp maybe_add_continuations(blueprint, remaining_results, options) do
continuations =
Enum.map(
remaining_results,
&%Continuation{
phase_input: blueprint,
pipeline: [
{__MODULE__, [prime_result: &1]},
{Phase.Document.Execution.Resolution, options},
Phase.Subscription.GetOrdinal,
Phase.Document.Result
]
}
)

put_in(blueprint.result, %{continuations: continuations})
end
end
34 changes: 32 additions & 2 deletions lib/absinthe/phase/subscription/result.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,40 @@ defmodule Absinthe.Phase.Subscription.Result do
# subscription

alias Absinthe.Blueprint
alias Absinthe.Blueprint.Continuation
alias Absinthe.Phase

@spec run(any, Keyword.t()) :: {:ok, Blueprint.t()}
def run(blueprint, topic: topic) do
result = %{"subscribed" => topic}
def run(blueprint, options) do
bernardd marked this conversation as resolved.
Show resolved Hide resolved
topic = Keyword.fetch!(options, :topic)
prime = Keyword.get(options, :prime)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thoughts on a more descriptive name like after_connect or initial_response or initial or something similar?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, I'm fine with prime, just there may be something more apt/intuitive here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prime was suggested by @benwilson512 and I kind of like it since it's both terse and descriptive. after_connect doesn't really fit because the "connection" has nothing to do with this. It's after a successful subscription that it's sent. The other two options I don't really have any problem with but nor do I think they really have anything to recommend them over prime.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. after_subscribe also viable. But either way, if it LGTBen, then it LGTM. LOL.


result = maybe_add_prime(%{"subscribed" => topic}, prime, blueprint, options)

{:ok, put_in(blueprint.result, result)}
end

def maybe_add_prime(result, nil, _blueprint, _options), do: result

def maybe_add_prime(result, prime_fun, blueprint, options) when is_function(prime_fun, 1) do
continuation = %Continuation{
phase_input: blueprint,
pipeline: [
{Phase.Subscription.Prime, [prime_fun: prime_fun, resolution_options: options]},
{Phase.Document.Execution.Resolution, options},
Phase.Subscription.GetOrdinal,
Phase.Document.Result
]
}

Map.put(result, :continuations, [continuation])
end

def maybe_add_prime(_result, prime_fun, _blueprint, _options) do
raise """
Invalid prime function. Must be a function of arity 1.

#{inspect(prime_fun)}
"""
end
end
12 changes: 6 additions & 6 deletions lib/absinthe/phase/subscription/subscribe_self.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ defmodule Absinthe.Phase.Subscription.SubscribeSelf do
Absinthe.Subscription.subscribe(pubsub, field_keys, subscription_id, blueprint)

pipeline = [
{Phase.Subscription.Result, topic: subscription_id},
{Phase.Subscription.Result, topic: subscription_id, prime: config[:prime]},
{Phase.Telemetry, Keyword.put(options, :event, [:execute, :operation, :stop])}
]

Expand All @@ -46,11 +46,11 @@ defmodule Absinthe.Phase.Subscription.SubscribeSelf do
end
end

defp get_config(
%{schema_node: schema_node, argument_data: argument_data} = field,
context,
blueprint
) do
def get_config(
%{schema_node: schema_node, argument_data: argument_data} = field,
context,
blueprint
) do
name = schema_node.identifier

config =
Expand Down
32 changes: 26 additions & 6 deletions lib/absinthe/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,44 @@ defmodule Absinthe.Pipeline do
* See `Absinthe.Schema` on adjusting the schema pipeline for schema manipulation.
"""

alias Absinthe.Blueprint.Continuation
alias Absinthe.Phase

@type data_t :: any

@type run_result_t ::
{:ok, data_t, [Phase.t()]}
| {:error, String.t() | {:http_method, String.t()}, [Phase.t()]}

@type phase_config_t :: Phase.t() | {Phase.t(), Keyword.t()}

@type t :: [phase_config_t | [phase_config_t]]

@spec run(data_t, t) ::
{:ok, data_t, [Phase.t()]}
| {:error, String.t() | {:http_method, String.t()}, [Phase.t()]}
@spec run(data_t, t) :: run_result_t
def run(input, pipeline) do
pipeline
|> List.flatten()
|> run_phase(input)
end

@spec continue([Continuation.t()]) :: run_result_t
def continue([continuation | rest]) do
result = run_phase(continuation.pipeline, continuation.phase_input)

case result do
{:ok, blueprint, phases} when rest == [] ->
{:ok, blueprint, phases}

{:ok, blueprint, phases} ->
bp_result = Map.put(blueprint.result, :continuations, rest)
blueprint = Map.put(blueprint, :result, bp_result)
{:ok, blueprint, phases}

error ->
error
end
end

@defaults [
adapter: Absinthe.Adapter.LanguageConventions,
operation_name: nil,
Expand Down Expand Up @@ -116,6 +137,7 @@ defmodule Absinthe.Pipeline do
# Execution
{Phase.Subscription.SubscribeSelf, options},
{Phase.Document.Execution.Resolution, options},
Phase.Subscription.GetOrdinal,
# Format Result
Phase.Document.Result,
{Phase.Telemetry, Keyword.put(options, :event, [:execute, :operation, :stop])}
Expand Down Expand Up @@ -393,9 +415,7 @@ defmodule Absinthe.Pipeline do
end)
end

@spec run_phase(t, data_t, [Phase.t()]) ::
{:ok, data_t, [Phase.t()]}
| {:error, String.t() | {:http_method, String.t()}, [Phase.t()]}
@spec run_phase(t, data_t, [Phase.t()]) :: run_result_t
def run_phase(pipeline, input, done \\ [])

def run_phase([], input, done) do
Expand Down
2 changes: 2 additions & 0 deletions lib/absinthe/subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ defmodule Absinthe.Subscription do

@type subscription_field_spec :: {atom, term | (term -> term)}

@type prime_fun :: (Absinthe.Resolution.t() -> {:ok, [map()]})

@doc """
Publish a mutation

Expand Down
Loading
Loading