From d78926a199d3f2ccfd71d03aa332a983a6c53504 Mon Sep 17 00:00:00 2001 From: Zack Kayser Date: Fri, 20 Oct 2023 21:20:46 -0400 Subject: [PATCH] Telemetry/use configuration based strategy for batch telemetry (#9) * Create skeleton module for Pacer.Config * Setup first base case unit test for Pacer.Config.batch_telemetry_options/1 * Add unit test case showing batch_telemetry_options passed through at the module level * Add unit case demonstrating behavior of generated Module.__config__/1 behavior when no config options are passed * Allow batch_telemetry_options to be passed at the workflow level * Add implementation for Config.batch_telemetry_options/1 * Add test case for when only global batch_telemetry_options are provided * Add test case for module-level batch_telemetry_config * Add unit test demonstrating behavior of module-level config overriding global config; clean up cached terms in persistent term between test runs * Bump version one minor version up -- Backwards incompatible change coming in how telemetry is metadata is handled in batched resolvers * Fix typo in moduledoc for Pacer.Config * Change assertions on config_test functions to expect keyword lists instead of maps * Add test case for mfa support on batch_telemetry_options global config * Add unit test case capturing support for mfa style batch_telemetry_options on workflow modules directly * Add unit test demonstrating expected behavior when batch_telemetry_options are set with mfa style args in both global and workflow-level config * Rework batch_telemetry_options configuration to allow for mfa-style args * Add test cases for fetch_batch_telemetry_options/1 * Add docs to batch telemetry option-related configuration functions * Add unit test asserting that batched resolvers inject user-provided telemetery config into event metadata when config is a keyword list * Clean up persistent term on all tests and add unit test case for batch telemetry metadata injection from MFA style config * Inject user-provided telemetry metadata into [:pacer, :execute_vertex, ...] events fired inside of batched resolvers * Add docs on Telemetry events and detail how users can provide their own metadata to inject into batched resolver telemetry events * Set version to 0.1.2 * Move the setup/cleanup to run for entire test suite on workflow config --- lib/config/config.ex | 68 +++++++++++++++++++++ lib/workflow.ex | 110 ++++++++++++++++++++++++++++++++-- mix.exs | 2 +- test/config/config_test.exs | 115 ++++++++++++++++++++++++++++++++++++ test/workflow_test.exs | 89 ++++++++++++++++++++++++++++ 5 files changed, 377 insertions(+), 7 deletions(-) create mode 100644 lib/config/config.ex create mode 100644 test/config/config_test.exs diff --git a/lib/config/config.ex b/lib/config/config.ex new file mode 100644 index 0000000..1fa90e1 --- /dev/null +++ b/lib/config/config.ex @@ -0,0 +1,68 @@ +defmodule Pacer.Config do + @moduledoc """ + The #{inspect(__MODULE__)} module provides functions for + extracting user-provided configuration values specific + to Pacer. + """ + + @doc """ + Fetches configuration options for extending the metadata provided in the + `[:pacer, :execute_vertex, :start | :stop | :exception]` events for batched resolvers. + + The configuration must be set under the key `:batch_telemetry_options` at the application + level (i.e., `Application.get_env(:pacer, :batch_telemetry_options)`) or when defining the + workflow itself (`use Pacer.Workflow, batch_telemetry_options: `). + + The batch_telemetry_options defined by the user must be either: + - a keyword list, or + - a {module, function, args} mfa tuple; when invoked, this function must return a keyword list + + The keyword list of values returned by the mfa-style config, or the hardcoded keyword list, is + fetched and converted into a map that gets merged into the telemetry event metadata for batched resolvers. + """ + @spec batch_telemetry_options(module()) :: keyword() | {module(), atom(), list()} + def batch_telemetry_options(workflow_module) do + case :persistent_term.get({__MODULE__, workflow_module, :batch_telemetry_options}, :unset) do + :unset -> fetch_and_write({workflow_module, :batch_telemetry_options}) + config -> config + end + end + + @doc """ + Takes the batch_telemetry_options configuration, invoking mfa-style config if available, + and converts the batch_telemetry_options keyword list into a map that gets merged into + the metadata for the `[:pacer, :execute_vertex, :start | :stop | :exception]` events for + batched resolvers. + """ + @spec fetch_batch_telemetry_options(module()) :: map() + def fetch_batch_telemetry_options(workflow_module) do + case batch_telemetry_options(workflow_module) do + {mod, fun, args} -> Map.new(apply(mod, fun, args)) + opts -> Map.new(opts) + end + end + + @spec fetch_and_write({workflow_module, :batch_telemetry_options}) :: + keyword() | {module(), atom(), list()} + when workflow_module: module() + defp fetch_and_write({workflow_module, :batch_telemetry_options = key}) do + global_config = Application.get_env(:pacer, key) || [] + module_config = workflow_module.__config__(key) || [] + + cond do + is_list(global_config) && is_list(module_config) -> + global_config + |> Keyword.merge(module_config) + |> tap(&:persistent_term.put({__MODULE__, workflow_module, key}, &1)) + + match?({_m, _f, _a}, module_config) || is_list(module_config) -> + tap(module_config, &:persistent_term.put({__MODULE__, workflow_module, key}, &1)) + + match?({_m, _f, _a}, global_config) || is_list(global_config) -> + tap(global_config, &:persistent_term.put({__MODULE__, workflow_module, key}, &1)) + + true -> + tap([], &:persistent_term.put({__MODULE__, workflow_module, key}, &1)) + end + end +end diff --git a/lib/workflow.ex b/lib/workflow.ex index e7fc42f..5ef861a 100644 --- a/lib/workflow.ex +++ b/lib/workflow.ex @@ -260,7 +260,93 @@ defmodule Pacer.Workflow do The order fields are defined in within a `graph` definition does not matter. For example, if you have a field `:request_one` that depends on another field `:request_two`, the fields can be declared in any order. + + ## Telemetry + + Pacer provides two levels of granularity for workflow telemetry: one at the entire workflow level, and one at the resolver level. + + For workflow execution, Pacer will trigger the following telemetry events: + + - `[:pacer, :workflow, :start]` + - Measurements include: `%{system_time: integer(), monotonic_time: integer()}` + - Metadata provided: `%{telemetry_span_context: term(), workflow: module()}`, where the `workflow` key contains the module name for the workflow being executed + - `[:pacer, :workflow, :stop]` + - Measurements include: `%{duration: integer(), monotonic_time: integer()}` + - Metadata provided: `%{telemetry_span_context: term(), workflow: module()}`, where the `workflow` key contains the module name for the workflow being executed + - `[:pacer, :workflow, :exception]` + - Measurements include: `%{duration: integer(), monotonic_time: integer()}` + - Metadata provided: %{kind: :throw | :error | :exit, reason: term(), stacktrace: list(), telemetry_span_context: term(), workflow: module()}, where the `workflow` key contains the module name for the workflow being executed + + At the resolver level, Pacer will trigger the following telemetry events: + + - `[:pacer, :execute_vertex, :start]` + - Measurements and metadata similar to `:workflow` start event, with the addition of the `%{field: atom()}` value passed in metadata. The `field` is the name of the field for which the resolver is being executed. + - `[:pacer, :execute_vertex, :stop]` + - Measurements and metadata similar to `:workflow` stop event, with the addition of the `%{field: atom()}` value passed in metadata. The `field` is the name of the field for which the resolver is being executed. + - `[:pacer, :execute_vertex, :exception]` + - Measurements and metadata similar to `:workflow` exception event, with the addition of the `%{field: atom()}` value passed in metadata. The `field` is the name of the field for which the resolver is being executed. + + Additionally, for `[:pacer, :execute_vertex]` events fired on batched resolvers (which will run in parallel processes), users can provide their own metadata through configuration. + + Users may provide either a keyword list of options which will be merged into the `:execute_vertex` event metadata, or an MFA `{mod, fun, args}` tuple that points to a function which + returns a keyword list that will be merged into the `:execute_vertex` event metadata. + + There are two routes for configuring these telemetry options for batched resolvers: in the application environment using the `:pacer, :batch_telemetry_options` config key, or + on the individual workflow modules themselves by passing `:batch_telemetry_options` when invoking `use Pacer.Workflow`. + Configuration defined at the workflow module will override configuration defined in the application environment. + + Here are a couple of examples: + + ### User-Provided Telemetry Metadata for Batched Resolvers in Applicaton Config + ```elixir + # In config.exs (or whatever env config file you want to target): + + config :pacer, :batch_telemetry_options, application_name: MyApp + + ## When you invoke a workflow with batched resolvers now, you will get `%{application_name: MyApp}` merged into your + ## event metadata in the `[:pacer, :execute_vertex, :start | :stop | :exception]` events. + ``` + + ### User-Provided Telemetry Metadata for Batched Resolvers at the Workflow Level + ```elixir + defmodule MyWorkflow do + use Pacer.Workflow, batch_telemetry_options: [extra_context: "some context from my application"] + + graph do + field(:a) + + batch :long_running_requests do + field(:b, dependencies: [:a], resolver: &Requests.trigger_b/1, default: nil) + field(:c, dependencies: [:a], resolver: &Requests.trigger_c/1, default: nil) + end + end + end + + ## Now when you invoke `Pacer.execute(MyWorkflow)`, you will get `%{extra_context: "some context from my application"}` + ## merged into the metadata for the `[:pacer, :execute_vertex, :start | :stop | :exception]` events for fields `:b` and `:c` + ``` + + Note that you can also provide an MFA tuple that points to a module/function that returns a keyword list of options to be + injected into the metadata on `:execute_vertex` telemetry events for batched resolvers. This allows users to execute code at runtime + to inject dynamic values into the metadata. Users may use this to inject things like span_context from the top-level workflow process + into the parallel processes that run the batch resolvers. This lets you propagate context from, i.e., a process dictionary at the top-level + into the sub-processes: + + ```elixir + defmodule MyApp.BatchOptions do + def inject_context do + [span_context: MyTracingLibrary.Tracer.current_context()] + end + end + + ## Use this function to inject span context by configuring it at the workflow level or in the application environment + + ## In config.exs: + + config :pacer, :batch_telemetry_options, {MyApp.BatchOptions, :inject_context, []} + ``` """ + alias Pacer.Config alias Pacer.Workflow.Error alias Pacer.Workflow.FieldNotSet alias Pacer.Workflow.Options @@ -317,6 +403,10 @@ defmodule Pacer.Workflow do generate_docs? ) + batch_telemetry_options = Keyword.get(unquote(opts), :batch_telemetry_options, %{}) + + Module.put_attribute(__MODULE__, :pacer_batch_telemetry_options, batch_telemetry_options) + Module.register_attribute(__MODULE__, :pacer_docs, accumulate: true) Module.register_attribute(__MODULE__, :pacer_graph_vertices, accumulate: true) Module.register_attribute(__MODULE__, :pacer_field_to_batch_mapping, accumulate: false) @@ -551,6 +641,9 @@ defmodule Pacer.Workflow do defstruct Enum.reverse(@pacer_struct_fields) + def __config__(:batch_telemetry_options), do: @pacer_batch_telemetry_options + def __config__(_), do: nil + def __graph__(:fields), do: Enum.reverse(@pacer_fields) def __graph__(:dependencies), do: Enum.reverse(@pacer_dependencies) def __graph__(:batch_dependencies), do: Enum.reverse(@pacer_batch_dependencies) @@ -940,6 +1033,8 @@ defmodule Pacer.Workflow do parent_pid = self() + user_provided_metadata = Config.fetch_batch_telemetry_options(module) + resolvers |> filter_guarded_resolvers(workflow) |> Enum.map(fn {field, resolver} -> @@ -951,18 +1046,21 @@ defmodule Pacer.Workflow do end) |> Task.async_stream( fn {field, partial_workflow, resolver} -> - metadata = %{ - field: field, - workflow: module, - parent_pid: parent_pid - } + metadata = + %{ + field: field, + workflow: module, + parent_pid: parent_pid + } + |> Map.merge(user_provided_metadata) try do :telemetry.span( [:pacer, :execute_vertex], metadata, fn -> - {{field, resolver.(partial_workflow)}, %{parent_pid: parent_pid}} + {{field, resolver.(partial_workflow)}, + Map.merge(%{parent_pid: parent_pid}, user_provided_metadata)} end ) rescue diff --git a/mix.exs b/mix.exs index 863f4be..a7ed652 100644 --- a/mix.exs +++ b/mix.exs @@ -2,7 +2,7 @@ defmodule Pacer.MixProject do use Mix.Project @name "Pacer" - @version "0.1.0" + @version "0.1.2" @source_url "https://github.com/carsdotcom/pacer" def project do diff --git a/test/config/config_test.exs b/test/config/config_test.exs new file mode 100644 index 0000000..6b4b708 --- /dev/null +++ b/test/config/config_test.exs @@ -0,0 +1,115 @@ +defmodule Pacer.ConfigTest do + use ExUnit.Case, async: false + + alias Pacer.ConfigTest.NoOptions + alias Pacer.Config + + setup do + default = Application.get_env(:pacer, :batch_telemetry_options) + + on_exit(fn -> + :persistent_term.erase({Config, NoOptions, :batch_telemetry_options}) + :persistent_term.erase({Config, TestBatchConfig, :batch_telemetry_options}) + Application.put_env(:pacer, :batch_telemetry_options, default) + end) + + :ok + end + + describe "batch_telemetry_options/1" do + defmodule NoOptions do + use Pacer.Workflow + + graph do + field(:foo) + end + end + + test "returns an empty map if no user-provided options are available" do + assert Config.batch_telemetry_options(NoOptions) == [] + end + + test "returns global batch_telemetry_options if no module-level options are provided" do + Application.put_env(:pacer, :batch_telemetry_options, foo: "bar") + assert Config.batch_telemetry_options(NoOptions) == [foo: "bar"] + end + + test "accepts {module, function, args} tuples for batch_telemetry_options from global config" do + Application.put_env(:pacer, :batch_telemetry_options, {MyTelemetryOptions, :run, []}) + assert Config.batch_telemetry_options(NoOptions) == {MyTelemetryOptions, :run, []} + end + + defmodule TestBatchConfig do + use Pacer.Workflow, batch_telemetry_options: [batched: "config"] + + graph do + field(:foo) + end + end + + test "returns module-level options when provided" do + assert Config.batch_telemetry_options(TestBatchConfig) == [batched: "config"] + end + + test "module-level batch_telemetry_options overrides global batch_telemetry_options" do + Application.put_env(:pacer, :batch_telemetry_options, + batched: "this value should be overridden" + ) + + assert Config.batch_telemetry_options(TestBatchConfig) == [batched: "config"] + end + + defmodule TestConfigWithMFA do + use Pacer.Workflow, batch_telemetry_options: {__MODULE__, :batch_telemetry_opts, []} + + graph do + field(:foo) + end + end + + test "returns {module, function, args} options stored in module config" do + assert Config.batch_telemetry_options(TestConfigWithMFA) == + {TestConfigWithMFA, :batch_telemetry_opts, []} + end + + test "module config overrides global config when both are present and use {module, function, args} style config" do + Application.put_env(:pacer, :batch_telemetry_options, {PacerGlobal, :default_options, []}) + + assert Config.batch_telemetry_options(TestConfigWithMFA) == + {TestConfigWithMFA, :batch_telemetry_opts, []} + end + end + + describe "fetch_batch_telemetry_options/1" do + defmodule MyWorkflowExample do + use Pacer.Workflow + + graph do + field(:foo) + end + + def default_options do + [ + foo: "bar", + baz: "quux" + ] + end + end + + test "invokes {module, fun, args} style config when present and converts the keyword list returned into a map" do + Application.put_env( + :pacer, + :batch_telemetry_options, + {MyWorkflowExample, :default_options, []} + ) + + assert Config.fetch_batch_telemetry_options(MyWorkflowExample) == %{foo: "bar", baz: "quux"} + end + + test "converts keyword list style configuration into a map" do + Application.put_env(:pacer, :batch_telemetry_options, foo: "bar", baz: "quux") + + assert Config.fetch_batch_telemetry_options(MyWorkflowExample) == %{foo: "bar", baz: "quux"} + end + end +end diff --git a/test/workflow_test.exs b/test/workflow_test.exs index 43ec06f..ddcd44a 100644 --- a/test/workflow_test.exs +++ b/test/workflow_test.exs @@ -57,6 +57,14 @@ defmodule Pacer.WorkflowTest do end """ + setup do + on_exit(fn -> + :persistent_term.erase({Pacer.Config, TestGraph, :batch_telemetry_options}) + end) + + :ok + end + describe "telemetry" do test "execute/1 emits a [:pacer, :workflow, :start] and [:pacer, :workflow, :stop] event" do ref = @@ -91,6 +99,59 @@ defmodule Pacer.WorkflowTest do assert_received {[:pacer, :workflow, :exception], ^ref, _, %{workflow: RaisingWorkflow}} end + + test "batch resolvers inject user-provided telemetry config into metadata" do + starting_config = Application.get_env(:pacer, :batch_telemetry_options) + + on_exit(fn -> + Application.put_env(:pacer, :batch_telemetry_options, starting_config) + end) + + ref = + :telemetry_test.attach_event_handlers(self(), [ + [:pacer, :execute_vertex, :start], + [:pacer, :execute_vertex, :stop] + ]) + + telemetry_options = [span_context: :rand.uniform()] + Application.put_env(:pacer, :batch_telemetry_options, telemetry_options) + + Pacer.Workflow.execute(TestGraph) + + assert_receive {[:pacer, :execute_vertex, :start], ^ref, _, %{span_context: _}} + assert_receive {[:pacer, :execute_vertex, :stop], ^ref, _, %{span_context: _}} + end + + defmodule TestBatchConfigProvider do + def telemetry_options do + [span_context: :rand.uniform()] + end + end + + test "batch resolvers inject user-provided telemetry config into metadata when configured to use an MFA returning a keyword list" do + starting_config = Application.get_env(:pacer, :batch_telemetry_options) + + on_exit(fn -> + Application.put_env(:pacer, :batch_telemetry_options, starting_config) + end) + + ref = + :telemetry_test.attach_event_handlers(self(), [ + [:pacer, :execute_vertex, :start], + [:pacer, :execute_vertex, :stop] + ]) + + Application.put_env( + :pacer, + :batch_telemetry_options, + {TestBatchConfigProvider, :telemetry_options, []} + ) + + Pacer.Workflow.execute(TestGraph) + + assert_receive {[:pacer, :execute_vertex, :start], ^ref, _, %{span_context: _}} + assert_receive {[:pacer, :execute_vertex, :stop], ^ref, _, %{span_context: _}} + end end test "graph metadata" do @@ -242,6 +303,34 @@ defmodule Pacer.WorkflowTest do end end + describe "workflow config" do + defmodule WorkflowWithBatchOptions do + use Pacer.Workflow, batch_telemetry_options: %{some_options: "foo"} + + graph do + field(:bar) + end + end + + test "allows batch_telemetry_config option to be passed" do + assert WorkflowWithBatchOptions.__config__(:batch_telemetry_options) == %{ + some_options: "foo" + } + end + + defmodule WorkflowWithNoConfigOptions do + use Pacer.Workflow + + graph do + field(:foo) + end + end + + test "returns nil for missing or non-existent config values" do + assert is_nil(WorkflowWithNoConfigOptions.__config__(:foo)) + end + end + describe "graph validations" do test "options validations" do module = """