From 8d6a73b0c8caa6c39c0e414a25409e116d6243a7 Mon Sep 17 00:00:00 2001 From: VovaGula Date: Fri, 5 Aug 2022 18:57:50 +0300 Subject: [PATCH] feat: ability to run custom callback on timeout for Absinthe.Middleware.Async middleware Async middleware sends an exit signal that kills the request and causes a 500 on timeout, as it uses Task.await/2 under the hood. This PR adds the ability to pass on_timeout/1 callback function to the middleware and trigger it on timeout. By default, it stops the process current process with exit/1, as before. ```elixir field :time_consuming, :thing do resolve fn _, _, _ -> async( fn -> {:ok, long_time_consuming_function()} end, timeout: 7_000, on_timeout: fn _timeout -> {:error, "Failed to get a result"} end ) end end ``` --- lib/absinthe/middleware/async.ex | 55 +++++++++++++++++++++++++------- 1 file changed, 44 insertions(+), 11 deletions(-) diff --git a/lib/absinthe/middleware/async.ex b/lib/absinthe/middleware/async.ex index b56c95d57c..472a853051 100644 --- a/lib/absinthe/middleware/async.ex +++ b/lib/absinthe/middleware/async.ex @@ -17,6 +17,21 @@ defmodule Absinthe.Middleware.Async do end ``` + With timeout options: + ```elixir + field :time_consuming, :thing do + resolve fn _, _, _ -> + async( + fn -> {:ok, long_time_consuming_function()} end, + timeout: 7_000, + on_timeout: fn _timeout -> + {:error, "Failed to get a result"} + end + ) + end + end + ``` + Using the bare plugin API ```elixir field :time_consuming, :thing do @@ -50,30 +65,34 @@ defmodule Absinthe.Middleware.Async do # This function inserts additional middleware into the remaining middleware # stack for this field. On the next resolution pass, we need to `Task.await` the # task so we have actual data. Thus, we prepend this module to the middleware stack. - def call(%{state: :unresolved} = res, {fun, opts}) when is_function(fun) do + def call(%{state: :unresolved} = resolution, {fun, opts}) when is_function(fun) do task = Task.async(fn -> :telemetry.span([:absinthe, :middleware, :async, :task], %{}, fn -> {fun.(), %{}} end) end) - call(res, {task, opts}) + call(resolution, {task, opts}) end - def call(%{state: :unresolved} = res, {task, opts}) do + def call(%{state: :unresolved} = resolution, {task, opts}) do task_data = {task, opts} %{ - res + resolution | state: :suspended, - acc: Map.put(res.acc, __MODULE__, true), - middleware: [{__MODULE__, task_data} | res.middleware] + acc: Map.put(resolution.acc, __MODULE__, true), + middleware: [{__MODULE__, task_data} | resolution.middleware] } end - def call(%{state: :unresolved} = res, %Task{} = task), do: call(res, {task, []}) + def call(%{state: :unresolved} = resolution, %Task{} = task) do + call(resolution, {task, []}) + end # This is the clause that gets called on the second pass. There's very little # to do here. We just need to await the task started in the previous pass. + # It's also possible to pass on_timeout/1 callback function to handle the case, + # when time runs out before a message from the task is received. # # Finally, we apply the result to the resolution using a helper function that ensures # we handle the different tuple results. @@ -82,11 +101,25 @@ defmodule Absinthe.Middleware.Async do # If the result is an `{:ok, value} | {:error, reason}` tuple it will set # the state to `:resolved`, and if it is another middleware tuple it will # set the state to unresolved. - def call(%{state: :suspended} = res, {task, opts}) do - result = Task.await(task, opts[:timeout] || 30_000) + @default_timeout 30_000 + def call(%{state: :suspended} = resolution, {task, opts}) do + timeout = opts[:timeout] || @default_timeout + + result = + case Task.yield(task, timeout) || Task.shutdown(task) do + {:ok, result} -> + result + + nil -> + on_timeout = + Keyword.get(opts, :on_timeout, fn _timeout -> + exit({:timeout, {__MODULE__, :call, [task, timeout]}}) + end) + + on_timeout.(timeout) + end - res - |> Absinthe.Resolution.put_result(result) + Absinthe.Resolution.put_result(resolution, result) end # We must set the flag to false because if a previous resolution iteration