Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Better span events serialization when supported #4279

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion .github/forced-tests-list.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
{

"DEFAULT":
[
"tests/test_span_events.py::Test_SpanEvents_WithAgentSupport::setup_v04_v07_default_format"
],
"AGENT_NOT_SUPPORTING_SPAN_EVENTS":
[
"tests/test_span_events.py"
],
"PARAMETRIC":
[
"tests/parametric/test_span_events.py"
]
}
8 changes: 7 additions & 1 deletion lib/datadog/core/configuration/components.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
require_relative '../../di/component'
require_relative '../crashtracking/component'

require_relative '../environment/agent_info'

module Datadog
module Core
module Configuration
Expand Down Expand Up @@ -85,7 +87,8 @@ def build_crashtracker(settings, agent_settings, logger:)
:tracer,
:crashtracker,
:dynamic_instrumentation,
:appsec
:appsec,
:agent_info

def initialize(settings)
@logger = self.class.build_logger(settings)
Expand All @@ -96,6 +99,9 @@ def initialize(settings)
# the Core resolver from within your product/component's namespace.
agent_settings = AgentSettingsResolver.call(settings, logger: @logger)

# Exposes agent capability information for detection by any components
@agent_info = Core::Environment::AgentInfo.new(agent_settings)

@telemetry = self.class.build_telemetry(settings, agent_settings, @logger)

@remote = Remote::Component.build(settings, agent_settings, telemetry: telemetry)
Expand Down
14 changes: 14 additions & 0 deletions lib/datadog/core/encoding.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ def join(encoded_elements)
def encode(_)
raise NotImplementedError
end

# Deserializes a value serialized with {#encode}.
# This method is used for debugging purposes.
def decode(_)
raise NotImplementedError
end
end

# Encoder for the JSON format
Expand All @@ -41,6 +47,10 @@ def encode(obj)
JSON.dump(obj)
end

def decode(obj)
JSON.parse(obj)
end

def join(encoded_data)
"[#{encoded_data.join(',')}]"
end
Expand All @@ -62,6 +72,10 @@ def encode(obj)
MessagePack.pack(obj)
end

def decode(obj)
MessagePack.unpack(obj)
end

def join(encoded_data)
packer = MessagePack::Packer.new
packer.write_array_header(encoded_data.size)
Expand Down
77 changes: 77 additions & 0 deletions lib/datadog/core/environment/agent_info.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# frozen_string_literal: true

module Datadog
module Core
module Environment
# Retrieves the agent's `/info` endpoint data.
# This data can be used to determine the capabilities of the local Datadog agent.
#
# @example Example response payload
# {
# "version" : "7.57.2",
# "git_commit" : "38ba0c7",
# "endpoints" : [ "/v0.4/traces", "/v0.4/services", "/v0.7/traces", "/v0.7/config" ],
# "client_drop_p0s" : true,
# "span_meta_structs" : true,
# "long_running_spans" : true,
# "evp_proxy_allowed_headers" : [ "Content-Type", "Accept-Encoding", "Content-Encoding", "User-Agent" ],
# "config" : {
# "default_env" : "none",
# "target_tps" : 10,
# "max_eps" : 200,
# "receiver_port" : 8126,
# "receiver_socket" : "/var/run/datadog/apm.socket",
# "connection_limit" : 0,
# "receiver_timeout" : 0,
# "max_request_bytes" : 26214400,
# "statsd_port" : 8125,
# "analyzed_spans_by_service" : { },
# "obfuscation" : {
# "elastic_search" : true,
# "mongo" : true,
# "sql_exec_plan" : false,
# "sql_exec_plan_normalize" : false,
# "http" : {
# "remove_query_string" : false,
# "remove_path_digits" : false
# },
# "remove_stack_traces" : false,
# "redis" : {
# "Enabled" : true,
# "RemoveAllArgs" : false
# },
# "memcached" : {
# "Enabled" : true,
# "KeepCommand" : false
# }
# }
# },
# "peer_tags" : null
# }
#
# @see https://github.com/DataDog/datadog-agent/blob/f07df0a3c1fca0c83b5a15f553bd994091b0c8ac/pkg/trace/api/info.go#L20
class AgentInfo
attr_reader :agent_settings

def initialize(agent_settings)
@agent_settings = agent_settings
@client = Remote::Transport::HTTP.root(agent_settings: agent_settings)
end

# Fetches the information from the agent.
# @return [Datadog::Core::Remote::Transport::HTTP::Negotiation::Response] the response from the agent
# @return [nil] if an error occurred while fetching the information
def fetch
res = @client.send_info
return unless res.ok?

res
end

def ==(other)
other.is_a?(self.class) && other.agent_settings == agent_settings
end
end
end
end
end
6 changes: 3 additions & 3 deletions lib/datadog/core/remote/component.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def initialize(settings, capabilities, agent_settings)
transport_options = {}
transport_options[:agent_settings] = agent_settings if agent_settings

negotiation = Negotiation.new(settings, agent_settings)
@negotiation = Negotiation.new(settings, agent_settings)
transport_v7 = Datadog::Core::Remote::Transport::HTTP.v7(**transport_options.dup)

@barrier = Barrier.new(settings.remote.boot_timeout_seconds)
Expand All @@ -29,7 +29,7 @@ def initialize(settings, capabilities, agent_settings)
Datadog.logger.debug { "new remote configuration client: #{@client.id}" }

@worker = Worker.new(interval: settings.remote.poll_interval_seconds) do
unless @healthy || negotiation.endpoint?('/v0.7/config')
unless @healthy || @negotiation.endpoint?('/v0.7/config')
@barrier.lift

next
Expand All @@ -47,7 +47,7 @@ def initialize(settings, capabilities, agent_settings)
# In case of unexpected errors, reset the negotiation object
# given external conditions have changed and the negotiation
# negotiation object stores error logging state that should be reset.
negotiation = Negotiation.new(settings, agent_settings)
@negotiation = Negotiation.new(settings, agent_settings)

# Transient errors due to network or agent. Logged the error but not via telemetry
Datadog.logger.error do
Expand Down
2 changes: 2 additions & 0 deletions lib/datadog/core/remote/negotiation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ module Core
module Remote
# Endpoint negotiation
class Negotiation
attr_reader :transport_root

def initialize(_settings, agent_settings, suppress_logging: {})
transport_options = {}
transport_options[:agent_settings] = agent_settings if agent_settings
Expand Down
1 change: 1 addition & 0 deletions lib/datadog/core/remote/transport/http/negotiation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def initialize(http_response, options = {})
@version = options[:version]
@endpoints = options[:endpoints]
@config = options[:config]
@span_events = options[:span_events]
end
end

Expand Down
14 changes: 13 additions & 1 deletion lib/datadog/core/remote/transport/negotiation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,19 @@ class Request < Datadog::Core::Transport::Request

# Negotiation response
module Response
attr_reader :version, :endpoints, :config
# @!attribute [r] span_events
# Whether the agent supports the top-level span events field in flushed spans.
# @return [Boolean,nil]
# @!attribute [r] version
# The version of the agent.
# @return [String]
# @!attribute [r] endpoints
# The HTTP endpoints the agent supports.
# @return [Array<String>]
# @!attribute [r] config
# The agent configuration. These are configured by the user when starting the agent, as well as any defaults.
# @return [Hash]
attr_reader :version, :endpoints, :config, :span_events
end

# Negotiation transport
Expand Down
2 changes: 1 addition & 1 deletion lib/datadog/tracing/transport/serializable_trace.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class SerializableTrace

# @param trace [Datadog::Trace] the trace to serialize
# @param native_events_supported [Boolean] whether the agent supports span events as a top-level field
def initialize(trace, native_events_supported = false)
def initialize(trace, native_events_supported)
@trace = trace
@native_events_supported = native_events_supported
end
Expand Down
26 changes: 18 additions & 8 deletions lib/datadog/tracing/transport/traces.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ class Chunker
#
# @param encoder [Datadog::Core::Encoding::Encoder]
# @param max_size [String] maximum acceptable payload size
def initialize(encoder, max_size: DEFAULT_MAX_PAYLOAD_SIZE)
def initialize(encoder, native_events_supported, max_size: DEFAULT_MAX_PAYLOAD_SIZE)
@encoder = encoder
@native_events_supported = native_events_supported
@max_size = max_size
end

Expand All @@ -77,7 +78,7 @@ def encode_in_chunks(traces)
private

def encode_one(trace)
encoded = Encoder.encode_trace(encoder, trace)
encoded = Encoder.encode_trace(encoder, trace, @native_events_supported)

if encoded.size > max_size
# This single trace is too large, we can't flush it
Expand All @@ -95,17 +96,18 @@ def encode_one(trace)
module Encoder
module_function

def encode_trace(encoder, trace)
def encode_trace(encoder, trace, native_events_supported)
# Format the trace for transport
TraceFormatter.format!(trace)

# Make the trace serializable
serializable_trace = SerializableTrace.new(trace)

Datadog.logger.debug { "Flushing trace: #{JSON.dump(serializable_trace)}" }
serializable_trace = SerializableTrace.new(trace, native_events_supported)

# Encode the trace
encoder.encode(serializable_trace)
encoder.encode(serializable_trace).tap do |encoded|
# Print the actual serialized trace, since the encoder can change make non-trivial changes
Datadog.logger.debug { "Flushing trace: #{encoder.decode(encoded)}" }
end
end
end

Expand All @@ -126,7 +128,7 @@ def initialize(apis, default_api)

def send_traces(traces)
encoder = current_api.encoder
chunker = Datadog::Tracing::Transport::Traces::Chunker.new(encoder)
chunker = Datadog::Tracing::Transport::Traces::Chunker.new(encoder, native_events_supported?)

responses = chunker.encode_in_chunks(traces.lazy).map do |encoded_traces, trace_count|
request = Request.new(EncodedParcel.new(encoded_traces, trace_count))
Expand Down Expand Up @@ -188,6 +190,14 @@ def change_api!(api_id)
@client = HTTP::Client.new(current_api)
end

# Queries the agent for native span events serialization support.
# This changes how the serialization of span events performed.
def native_events_supported?
return @native_events_supported if defined?(@native_events_supported)

@native_events_supported = Datadog.send(:components).agent_info.fetch&.span_events == true
end

# Raised when configured with an unknown API version
class UnknownApiVersionError < StandardError
attr_reader :version
Expand Down
2 changes: 2 additions & 0 deletions sig/datadog/core/configuration/components.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ module Datadog

attr_reader remote: Datadog::Core::Remote::Component

attr_reader agent_info: Datadog::Core::Environment::AgentInfo

def initialize: (untyped settings) -> untyped

def startup!: (untyped settings) -> untyped
Expand Down
30 changes: 17 additions & 13 deletions sig/datadog/core/encoding.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -5,39 +5,43 @@ module Datadog
# Encoder interface that provides the logic to encode traces and service
# @abstract
module Encoder
def content_type: () -> untyped
def content_type: () -> String

# Concatenates a list of elements previously encoded by +#encode+.
def join: (untyped encoded_elements) -> untyped
def encode: (untyped obj) -> String

# Serializes a single trace into a String suitable for network transmission.
def encode: (untyped _) -> untyped
def join: (Array[untyped] encoded_data) -> String

def decode: (String obj)-> untyped
end

# Encoder for the JSON format
module JSONEncoder
extend Encoder

CONTENT_TYPE: "application/json"
CONTENT_TYPE: String

def self?.content_type: () -> String

def self?.content_type: () -> untyped
def self?.encode: (untyped obj) -> String

def self?.encode: (untyped obj) -> untyped
def self?.join: (Array[untyped] encoded_data) -> String

def self?.join: (untyped encoded_data) -> ::String
def self?.decode: (String obj)-> untyped
end

# Encoder for the Msgpack format
module MsgpackEncoder
extend Encoder

CONTENT_TYPE: "application/msgpack"
CONTENT_TYPE: String

def self?.content_type: () -> String

def self?.content_type: () -> untyped
def self?.encode: (untyped obj) -> String

def self?.encode: (untyped obj) -> untyped
def self?.join: (Array[untyped] encoded_data) -> String

def self?.join: (untyped encoded_data) -> untyped
def self?.decode: (String obj)-> untyped
end
end
end
Expand Down
13 changes: 13 additions & 0 deletions sig/datadog/core/environment/agent_info.rbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
module Datadog
module Core
module Environment
class AgentInfo
attr_reader agent_settings: Configuration::AgentSettingsResolver::AgentSettings

def initialize: (Configuration::AgentSettingsResolver::AgentSettings agent_settings) -> void

def fetch: -> Remote::Transport::HTTP::Negotiation::Response?
end
end
end
end
2 changes: 2 additions & 0 deletions sig/datadog/core/remote/component.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ module Datadog
class Component
BARRIER_TIMEOUT: Barrier::timeout_s

@negotiation: Negotiation

attr_reader client: Datadog::Core::Remote::Client
attr_reader worker: Datadog::Core::Remote::Worker
attr_reader healthy: bool
Expand Down
2 changes: 1 addition & 1 deletion sig/datadog/core/remote/negotiation.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module Datadog
module Core
module Remote
class Negotiation
@transport_root: Datadog::Core::Remote::Transport::Negotiation::Transport
attr_reader transport_root: Datadog::Core::Remote::Transport::Negotiation::Transport
@logged: ::Hash[::Symbol, bool]

def initialize: (Datadog::Core::Configuration::Settings _settings, Datadog::Core::Configuration::AgentSettingsResolver::AgentSettings agent_settings, ?suppress_logging: ::Hash[::Symbol, bool]) -> void
Expand Down
Loading
Loading