Skip to content

Commit

Permalink
Telemetry/use configuration based strategy for batch telemetry (#9)
Browse files Browse the repository at this point in the history
* Create skeleton module for Pacer.Config

* Setup first base case unit test for Pacer.Config.batch_telemetry_options/1

* Add unit test case showing batch_telemetry_options passed through at the module level

* Add unit case demonstrating behavior of generated Module.__config__/1 behavior when no config options are passed

* Allow batch_telemetry_options to be passed at the workflow level

* Add implementation for Config.batch_telemetry_options/1

* Add test case for when only global batch_telemetry_options are provided

* Add test case for module-level batch_telemetry_config

* Add unit test demonstrating behavior of module-level config overriding global config; clean up cached terms in persistent term between test runs

* Bump version one minor version up -- Backwards incompatible change coming in how telemetry is metadata is handled in batched resolvers

* Fix typo in moduledoc for Pacer.Config

* Change assertions on config_test functions to expect keyword lists instead of maps

* Add test case for mfa support on batch_telemetry_options global config

* Add unit test case capturing support for mfa style batch_telemetry_options on workflow modules directly

* Add unit test demonstrating expected behavior when batch_telemetry_options are set with mfa style args in both global and workflow-level config

* Rework batch_telemetry_options configuration to allow for mfa-style args

* Add test cases for fetch_batch_telemetry_options/1

* Add docs to batch telemetry option-related configuration functions

* Add unit test asserting that batched resolvers inject user-provided telemetery config into event metadata when config is a keyword list

* Clean up persistent term on all tests and add unit test case for batch telemetry metadata injection from MFA style config

* Inject user-provided telemetry metadata into [:pacer, :execute_vertex, ...] events fired inside of batched resolvers

* Add docs on Telemetry events and detail how users can provide their own metadata to inject into batched resolver telemetry events

* Set version to 0.1.2

* Move the setup/cleanup to run for entire test suite on workflow config
  • Loading branch information
zkayser authored Oct 21, 2023
1 parent 7b84135 commit d78926a
Show file tree
Hide file tree
Showing 5 changed files with 377 additions and 7 deletions.
68 changes: 68 additions & 0 deletions lib/config/config.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
defmodule Pacer.Config do
@moduledoc """
The #{inspect(__MODULE__)} module provides functions for
extracting user-provided configuration values specific
to Pacer.
"""

@doc """
Fetches configuration options for extending the metadata provided in the
`[:pacer, :execute_vertex, :start | :stop | :exception]` events for batched resolvers.
The configuration must be set under the key `:batch_telemetry_options` at the application
level (i.e., `Application.get_env(:pacer, :batch_telemetry_options)`) or when defining the
workflow itself (`use Pacer.Workflow, batch_telemetry_options: <opts>`).
The batch_telemetry_options defined by the user must be either:
- a keyword list, or
- a {module, function, args} mfa tuple; when invoked, this function must return a keyword list
The keyword list of values returned by the mfa-style config, or the hardcoded keyword list, is
fetched and converted into a map that gets merged into the telemetry event metadata for batched resolvers.
"""
@spec batch_telemetry_options(module()) :: keyword() | {module(), atom(), list()}
def batch_telemetry_options(workflow_module) do
case :persistent_term.get({__MODULE__, workflow_module, :batch_telemetry_options}, :unset) do
:unset -> fetch_and_write({workflow_module, :batch_telemetry_options})
config -> config
end
end

@doc """
Takes the batch_telemetry_options configuration, invoking mfa-style config if available,
and converts the batch_telemetry_options keyword list into a map that gets merged into
the metadata for the `[:pacer, :execute_vertex, :start | :stop | :exception]` events for
batched resolvers.
"""
@spec fetch_batch_telemetry_options(module()) :: map()
def fetch_batch_telemetry_options(workflow_module) do
case batch_telemetry_options(workflow_module) do
{mod, fun, args} -> Map.new(apply(mod, fun, args))
opts -> Map.new(opts)
end
end

@spec fetch_and_write({workflow_module, :batch_telemetry_options}) ::
keyword() | {module(), atom(), list()}
when workflow_module: module()
defp fetch_and_write({workflow_module, :batch_telemetry_options = key}) do
global_config = Application.get_env(:pacer, key) || []
module_config = workflow_module.__config__(key) || []

cond do
is_list(global_config) && is_list(module_config) ->
global_config
|> Keyword.merge(module_config)
|> tap(&:persistent_term.put({__MODULE__, workflow_module, key}, &1))

match?({_m, _f, _a}, module_config) || is_list(module_config) ->
tap(module_config, &:persistent_term.put({__MODULE__, workflow_module, key}, &1))

match?({_m, _f, _a}, global_config) || is_list(global_config) ->
tap(global_config, &:persistent_term.put({__MODULE__, workflow_module, key}, &1))

true ->
tap([], &:persistent_term.put({__MODULE__, workflow_module, key}, &1))
end
end
end
110 changes: 104 additions & 6 deletions lib/workflow.ex
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,93 @@ defmodule Pacer.Workflow do
The order fields are defined in within a `graph` definition does not matter. For example, if you have a field `:request_one` that depends
on another field `:request_two`, the fields can be declared in any order.
## Telemetry
Pacer provides two levels of granularity for workflow telemetry: one at the entire workflow level, and one at the resolver level.
For workflow execution, Pacer will trigger the following telemetry events:
- `[:pacer, :workflow, :start]`
- Measurements include: `%{system_time: integer(), monotonic_time: integer()}`
- Metadata provided: `%{telemetry_span_context: term(), workflow: module()}`, where the `workflow` key contains the module name for the workflow being executed
- `[:pacer, :workflow, :stop]`
- Measurements include: `%{duration: integer(), monotonic_time: integer()}`
- Metadata provided: `%{telemetry_span_context: term(), workflow: module()}`, where the `workflow` key contains the module name for the workflow being executed
- `[:pacer, :workflow, :exception]`
- Measurements include: `%{duration: integer(), monotonic_time: integer()}`
- Metadata provided: %{kind: :throw | :error | :exit, reason: term(), stacktrace: list(), telemetry_span_context: term(), workflow: module()}, where the `workflow` key contains the module name for the workflow being executed
At the resolver level, Pacer will trigger the following telemetry events:
- `[:pacer, :execute_vertex, :start]`
- Measurements and metadata similar to `:workflow` start event, with the addition of the `%{field: atom()}` value passed in metadata. The `field` is the name of the field for which the resolver is being executed.
- `[:pacer, :execute_vertex, :stop]`
- Measurements and metadata similar to `:workflow` stop event, with the addition of the `%{field: atom()}` value passed in metadata. The `field` is the name of the field for which the resolver is being executed.
- `[:pacer, :execute_vertex, :exception]`
- Measurements and metadata similar to `:workflow` exception event, with the addition of the `%{field: atom()}` value passed in metadata. The `field` is the name of the field for which the resolver is being executed.
Additionally, for `[:pacer, :execute_vertex]` events fired on batched resolvers (which will run in parallel processes), users can provide their own metadata through configuration.
Users may provide either a keyword list of options which will be merged into the `:execute_vertex` event metadata, or an MFA `{mod, fun, args}` tuple that points to a function which
returns a keyword list that will be merged into the `:execute_vertex` event metadata.
There are two routes for configuring these telemetry options for batched resolvers: in the application environment using the `:pacer, :batch_telemetry_options` config key, or
on the individual workflow modules themselves by passing `:batch_telemetry_options` when invoking `use Pacer.Workflow`.
Configuration defined at the workflow module will override configuration defined in the application environment.
Here are a couple of examples:
### User-Provided Telemetry Metadata for Batched Resolvers in Applicaton Config
```elixir
# In config.exs (or whatever env config file you want to target):
config :pacer, :batch_telemetry_options, application_name: MyApp
## When you invoke a workflow with batched resolvers now, you will get `%{application_name: MyApp}` merged into your
## event metadata in the `[:pacer, :execute_vertex, :start | :stop | :exception]` events.
```
### User-Provided Telemetry Metadata for Batched Resolvers at the Workflow Level
```elixir
defmodule MyWorkflow do
use Pacer.Workflow, batch_telemetry_options: [extra_context: "some context from my application"]
graph do
field(:a)
batch :long_running_requests do
field(:b, dependencies: [:a], resolver: &Requests.trigger_b/1, default: nil)
field(:c, dependencies: [:a], resolver: &Requests.trigger_c/1, default: nil)
end
end
end
## Now when you invoke `Pacer.execute(MyWorkflow)`, you will get `%{extra_context: "some context from my application"}`
## merged into the metadata for the `[:pacer, :execute_vertex, :start | :stop | :exception]` events for fields `:b` and `:c`
```
Note that you can also provide an MFA tuple that points to a module/function that returns a keyword list of options to be
injected into the metadata on `:execute_vertex` telemetry events for batched resolvers. This allows users to execute code at runtime
to inject dynamic values into the metadata. Users may use this to inject things like span_context from the top-level workflow process
into the parallel processes that run the batch resolvers. This lets you propagate context from, i.e., a process dictionary at the top-level
into the sub-processes:
```elixir
defmodule MyApp.BatchOptions do
def inject_context do
[span_context: MyTracingLibrary.Tracer.current_context()]
end
end
## Use this function to inject span context by configuring it at the workflow level or in the application environment
## In config.exs:
config :pacer, :batch_telemetry_options, {MyApp.BatchOptions, :inject_context, []}
```
"""
alias Pacer.Config
alias Pacer.Workflow.Error
alias Pacer.Workflow.FieldNotSet
alias Pacer.Workflow.Options
Expand Down Expand Up @@ -317,6 +403,10 @@ defmodule Pacer.Workflow do
generate_docs?
)

batch_telemetry_options = Keyword.get(unquote(opts), :batch_telemetry_options, %{})

Module.put_attribute(__MODULE__, :pacer_batch_telemetry_options, batch_telemetry_options)

Module.register_attribute(__MODULE__, :pacer_docs, accumulate: true)
Module.register_attribute(__MODULE__, :pacer_graph_vertices, accumulate: true)
Module.register_attribute(__MODULE__, :pacer_field_to_batch_mapping, accumulate: false)
Expand Down Expand Up @@ -551,6 +641,9 @@ defmodule Pacer.Workflow do

defstruct Enum.reverse(@pacer_struct_fields)

def __config__(:batch_telemetry_options), do: @pacer_batch_telemetry_options
def __config__(_), do: nil

def __graph__(:fields), do: Enum.reverse(@pacer_fields)
def __graph__(:dependencies), do: Enum.reverse(@pacer_dependencies)
def __graph__(:batch_dependencies), do: Enum.reverse(@pacer_batch_dependencies)
Expand Down Expand Up @@ -940,6 +1033,8 @@ defmodule Pacer.Workflow do

parent_pid = self()

user_provided_metadata = Config.fetch_batch_telemetry_options(module)

resolvers
|> filter_guarded_resolvers(workflow)
|> Enum.map(fn {field, resolver} ->
Expand All @@ -951,18 +1046,21 @@ defmodule Pacer.Workflow do
end)
|> Task.async_stream(
fn {field, partial_workflow, resolver} ->
metadata = %{
field: field,
workflow: module,
parent_pid: parent_pid
}
metadata =
%{
field: field,
workflow: module,
parent_pid: parent_pid
}
|> Map.merge(user_provided_metadata)

try do
:telemetry.span(
[:pacer, :execute_vertex],
metadata,
fn ->
{{field, resolver.(partial_workflow)}, %{parent_pid: parent_pid}}
{{field, resolver.(partial_workflow)},
Map.merge(%{parent_pid: parent_pid}, user_provided_metadata)}
end
)
rescue
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Pacer.MixProject do
use Mix.Project

@name "Pacer"
@version "0.1.0"
@version "0.1.2"
@source_url "https://github.com/carsdotcom/pacer"

def project do
Expand Down
115 changes: 115 additions & 0 deletions test/config/config_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
defmodule Pacer.ConfigTest do
use ExUnit.Case, async: false

alias Pacer.ConfigTest.NoOptions
alias Pacer.Config

setup do
default = Application.get_env(:pacer, :batch_telemetry_options)

on_exit(fn ->
:persistent_term.erase({Config, NoOptions, :batch_telemetry_options})
:persistent_term.erase({Config, TestBatchConfig, :batch_telemetry_options})
Application.put_env(:pacer, :batch_telemetry_options, default)
end)

:ok
end

describe "batch_telemetry_options/1" do
defmodule NoOptions do
use Pacer.Workflow

graph do
field(:foo)
end
end

test "returns an empty map if no user-provided options are available" do
assert Config.batch_telemetry_options(NoOptions) == []
end

test "returns global batch_telemetry_options if no module-level options are provided" do
Application.put_env(:pacer, :batch_telemetry_options, foo: "bar")
assert Config.batch_telemetry_options(NoOptions) == [foo: "bar"]
end

test "accepts {module, function, args} tuples for batch_telemetry_options from global config" do
Application.put_env(:pacer, :batch_telemetry_options, {MyTelemetryOptions, :run, []})
assert Config.batch_telemetry_options(NoOptions) == {MyTelemetryOptions, :run, []}
end

defmodule TestBatchConfig do
use Pacer.Workflow, batch_telemetry_options: [batched: "config"]

graph do
field(:foo)
end
end

test "returns module-level options when provided" do
assert Config.batch_telemetry_options(TestBatchConfig) == [batched: "config"]
end

test "module-level batch_telemetry_options overrides global batch_telemetry_options" do
Application.put_env(:pacer, :batch_telemetry_options,
batched: "this value should be overridden"
)

assert Config.batch_telemetry_options(TestBatchConfig) == [batched: "config"]
end

defmodule TestConfigWithMFA do
use Pacer.Workflow, batch_telemetry_options: {__MODULE__, :batch_telemetry_opts, []}

graph do
field(:foo)
end
end

test "returns {module, function, args} options stored in module config" do
assert Config.batch_telemetry_options(TestConfigWithMFA) ==
{TestConfigWithMFA, :batch_telemetry_opts, []}
end

test "module config overrides global config when both are present and use {module, function, args} style config" do
Application.put_env(:pacer, :batch_telemetry_options, {PacerGlobal, :default_options, []})

assert Config.batch_telemetry_options(TestConfigWithMFA) ==
{TestConfigWithMFA, :batch_telemetry_opts, []}
end
end

describe "fetch_batch_telemetry_options/1" do
defmodule MyWorkflowExample do
use Pacer.Workflow

graph do
field(:foo)
end

def default_options do
[
foo: "bar",
baz: "quux"
]
end
end

test "invokes {module, fun, args} style config when present and converts the keyword list returned into a map" do
Application.put_env(
:pacer,
:batch_telemetry_options,
{MyWorkflowExample, :default_options, []}
)

assert Config.fetch_batch_telemetry_options(MyWorkflowExample) == %{foo: "bar", baz: "quux"}
end

test "converts keyword list style configuration into a map" do
Application.put_env(:pacer, :batch_telemetry_options, foo: "bar", baz: "quux")

assert Config.fetch_batch_telemetry_options(MyWorkflowExample) == %{foo: "bar", baz: "quux"}
end
end
end
Loading

0 comments on commit d78926a

Please sign in to comment.