Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

"Add 'otel' feature for OpenTelemetry support #97

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
329 changes: 283 additions & 46 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions databroker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ tracing-subscriber = { version = "0.3.11", default-features = false, features =
"env-filter",
"ansi",
] }

clap = { workspace = true, features = [
"std",
"env",
Expand All @@ -66,6 +67,12 @@ futures = { version = "0.3.28", optional = true }
chrono = { version = "0.4.31", optional = true, features = ["std"] }
uuid = { version = "1.4.1", optional = true, features = ["v4"] }

# OTEL
opentelemetry = { version = "0.19.0", optional = true, features = ["rt-tokio", "trace"] }
opentelemetry-otlp = { version="0.12.0", optional = true, features = ["tonic", "metrics"] }
opentelemetry-semantic-conventions = { version="0.11.0", optional = true }
tracing-opentelemetry = { version="0.19.0", optional = true }

# systemd related dependency, only relevant on linux systems
[target.'cfg(target_os = "linux")'.dependencies]
sd-notify = "0.4.1"
Expand All @@ -76,6 +83,7 @@ tls = ["tonic/tls"]
jemalloc = ["dep:jemallocator"]
viss = ["dep:axum", "dep:chrono", "dep:futures", "dep:uuid"]
libtest = []
otel = ["dep:chrono", "dep:opentelemetry", "dep:opentelemetry-otlp", "dep:opentelemetry-semantic-conventions", "dep:tracing-opentelemetry"]

[build-dependencies]
anyhow = "1.0"
Expand Down
98 changes: 96 additions & 2 deletions databroker/src/broker.rs

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions databroker/src/glob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl Matcher {
}
}

#[cfg_attr(feature="otel", tracing::instrument(name="glob_to_regex_string", skip(glob), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn to_regex_string(glob: &str) -> String {
// Construct regular expression

Expand Down Expand Up @@ -121,6 +122,7 @@ pub fn to_regex_string(glob: &str) -> String {
re
}

#[cfg_attr(feature="otel", tracing::instrument(name="glob_to_regex", skip(glob), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn to_regex(glob: &str) -> Result<Regex, Error> {
let re = to_regex_string(glob);
Regex::new(&re).map_err(|_err| Error::RegexError)
Expand Down Expand Up @@ -160,6 +162,7 @@ lazy_static! {
.expect("regex compilation (of static pattern) should always succeed");
}

#[cfg_attr(feature="otel", tracing::instrument(name="glob_is_valid_pattern", skip(input), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn is_valid_pattern(input: &str) -> bool {
REGEX_VALID_PATTERN.is_match(input)
}
Expand Down
3 changes: 3 additions & 0 deletions databroker/src/grpc/kuksa_val_v1/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ impl From<broker::DataValue> for Option<proto::Datapoint> {
}

impl From<Option<proto::datapoint::Value>> for broker::DataValue {
#[cfg_attr(feature="otel", tracing::instrument(name="conversion_From<Option<proto::datapoint::Value>>", skip(from), fields(timestamp=chrono::Utc::now().to_string())))]
fn from(from: Option<proto::datapoint::Value>) -> Self {
match from {
Some(value) => match value {
Expand Down Expand Up @@ -316,6 +317,7 @@ impl From<proto::Datapoint> for broker::Datapoint {
}

impl From<broker::EntryUpdate> for proto::DataEntry {
#[cfg_attr(feature="otel", tracing::instrument(name="conversion_From<broker::EntryUpdate>", skip(from), fields(timestamp=chrono::Utc::now().to_string())))]
fn from(from: broker::EntryUpdate) -> Self {
Self {
path: from.path.unwrap_or_default(),
Expand All @@ -331,6 +333,7 @@ impl From<broker::EntryUpdate> for proto::DataEntry {
metadata: {
let metadata = proto::Metadata {
unit: from.unit,
description: from.description,
..Default::default()
};
Some(metadata)
Expand Down
92 changes: 88 additions & 4 deletions databroker/src/grpc/kuksa_val_v1/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

use std::collections::HashMap;
use std::collections::HashSet;
use std::iter::FromIterator;
Expand All @@ -24,8 +24,18 @@ use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::Stream;
use tokio_stream::StreamExt;
use tonic::{Response, Status, Streaming};
use tracing::debug;
use tracing::info;
use tracing::{debug, info};

#[cfg(feature="otel")]
use {
tracing_opentelemetry::OpenTelemetrySpanExt,
tonic::metadata::KeyAndValueRef,
opentelemetry::global,
};





use crate::broker;
use crate::broker::ReadError;
Expand Down Expand Up @@ -255,11 +265,24 @@ impl proto::val_server::Val for broker::DataBroker {
}
}

#[cfg_attr(feature="otel",tracing::instrument(name="val_set",skip(self, request), fields(trace_id, timestamp= chrono::Utc::now().to_string())))]
async fn set(
&self,
request: tonic::Request<proto::SetRequest>,
) -> Result<tonic::Response<proto::SetResponse>, tonic::Status> {
debug!(?request);

#[cfg(feature="otel")]
let request = (||{
let (trace_id, request) = read_incoming_trace_id(request);
let metadata = request.metadata();
let cx = global::get_text_map_propagator(|propagator| {
propagator.extract(&MetadataMapExtractor(&metadata))
});
tracing::Span::current().record("trace_id", &trace_id).set_parent(cx);
request
})();

let permissions = match request.extensions().get::<Permissions>() {
Some(permissions) => {
debug!(?permissions);
Expand Down Expand Up @@ -471,6 +494,7 @@ impl proto::val_server::Val for broker::DataBroker {
>,
>;

#[cfg_attr(feature="otel", tracing::instrument(name="subscribe", skip(self, request), fields(trace_id, timestamp=chrono::Utc::now().to_string())))]
async fn subscribe(
&self,
request: tonic::Request<proto::SubscribeRequest>,
Expand Down Expand Up @@ -661,6 +685,7 @@ async fn validate_entry_update(
Ok((id, update))
}

#[cfg_attr(feature="otel", tracing::instrument(name="val_convert_to_data_entry_error", skip(path, error), fields(timestamp=chrono::Utc::now().to_string())))]
fn convert_to_data_entry_error(path: &String, error: &broker::UpdateError) -> DataEntryError {
match error {
broker::UpdateError::NotFound => DataEntryError {
Expand Down Expand Up @@ -714,6 +739,7 @@ fn convert_to_data_entry_error(path: &String, error: &broker::UpdateError) -> Da
}
}

#[cfg_attr(feature="otel", tracing::instrument(name = "val_convert_to_proto_stream", skip(input), fields(timestamp=chrono::Utc::now().to_string())))]
fn convert_to_proto_stream(
input: impl Stream<Item = broker::EntryUpdates>,
) -> impl Stream<Item = Result<proto::SubscribeResponse, tonic::Status>> {
Expand Down Expand Up @@ -955,7 +981,54 @@ fn combine_view_and_fields(
combined
}

// Metadata extractor for gRPC
#[cfg(feature="otel")]
struct MetadataMapExtractor<'a>(&'a tonic::metadata::MetadataMap);

#[cfg(feature="otel")]
impl<'a> opentelemetry::propagation::Extractor for MetadataMapExtractor<'a> {
fn get(&self, key: &str) -> Option<&str> {
self.0.get(key).and_then(|val| val.to_str().ok())
}

/// Collect all the keys from the HeaderMap.
fn keys(&self) -> Vec<&str> {
self.0.iter()
.filter_map(|kv| {
if let KeyAndValueRef::Ascii(key, _) = kv {
Some(key.as_str())
} else {
None
}
})
.collect()
}
}

#[cfg(feature="otel")]
#[cfg_attr(feature="otel", tracing::instrument(name="val_read_incoming_trace_id", skip(request), fields(timestamp=chrono::Utc::now().to_string())))]
fn read_incoming_trace_id(request: tonic::Request<proto::SetRequest>) -> (String, tonic::Request<proto::SetRequest>){
let mut trace_id: String = String::from("");
let request_copy = tonic::Request::new(request.get_ref().clone());
for request in request_copy.into_inner().updates {
match &request.entry {
Some(entry) => match &entry.metadata {
Some(metadata) => match &metadata.description{
Some(description)=> {
trace_id = String::from(description);
}
None => trace_id = String::from("")
}
None => trace_id = String::from("")
}
None => trace_id = String::from("")
}
}
return(trace_id, request);
}

impl broker::EntryUpdate {
#[cfg_attr(feature="otel", tracing::instrument(name = "val_from_proto_entry_and_fields",skip(entry,fields), fields(timestamp=chrono::Utc::now().to_string())))]
fn from_proto_entry_and_fields(
entry: &proto::DataEntry,
fields: HashSet<proto::Field>,
Expand All @@ -976,13 +1049,24 @@ impl broker::EntryUpdate {
} else {
None
};
let metadata_description = if fields.contains(&proto::Field::MetadataDescription) {
lukasmittag marked this conversation as resolved.
Show resolved Hide resolved
match &entry.metadata {
Some(metadata) => match &metadata.description {
Some(description) => Some(description),
None => None,
}
None => None,
}
} else {
None
};
Self {
path: None,
datapoint,
actuator_target,
entry_type: None,
data_type: None,
description: None,
description: metadata_description.cloned(),
lukasmittag marked this conversation as resolved.
Show resolved Hide resolved
allowed: None,
unit: None,
}
Expand Down
36 changes: 36 additions & 0 deletions databroker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ pub mod permissions;
pub mod query;
pub mod types;
pub mod vss;
pub mod open_telemetry;


#[cfg(feature = "viss")]
pub mod viss;
Expand All @@ -28,6 +30,15 @@ use std::fmt::Write;
use tracing::info;
use tracing_subscriber::filter::EnvFilter;

#[cfg(feature="otel")]
use {
tracing_subscriber::layer::SubscriberExt,
open_telemetry::init_trace,
opentelemetry::global,
opentelemetry::sdk::propagation::TraceContextPropagator,
};

#[cfg(not(feature="otel"))]
pub fn init_logging() {
let mut output = String::from("Init logging from RUST_LOG");
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|err| {
Expand All @@ -42,3 +53,28 @@ pub fn init_logging() {

info!("{}", output);
}

#[cfg(feature="otel")]
pub fn init_logging() {
let output = String::from("Init logging from RUST_LOG");

// Set OpenTelemetry trace propagator
global::set_text_map_propagator(TraceContextPropagator::new());

// Initialize OpenTelemetry tracer
let tracer = init_trace().expect("Failed to initialize tracer");

// telemetry layer
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);

let subscriber = tracing_subscriber::fmt::Subscriber::builder()
.with_max_level(tracing::Level::INFO) // adjust this log level as needed
.finish()
.with(telemetry); // Add telemetry layer

// Set the subscriber as the global default for tracing
tracing::subscriber::set_global_default(subscriber)
.expect("Unable to install global logging subscriber");

info!("{}", output);
}
26 changes: 26 additions & 0 deletions databroker/src/open_telemetry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#[cfg(feature="otel")]
use {
opentelemetry::{KeyValue, runtime},
opentelemetry::sdk::{Resource, trace},
opentelemetry::trace::TraceError,
opentelemetry_otlp::WithExportConfig,
std::env
};

#[cfg(feature="otel")]
pub fn init_trace() -> Result<trace::Tracer, TraceError> {
opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(env::var("OTEL_ENDPOINT").unwrap_or_else(|_| "http://localhost:4317".to_string())),
).with_batch_config(trace::BatchConfig::default()) // to change default of max_queue_size use .with_max_queue_size(8192) or set env OTEL_BSP_MAX_QUEUE_SIZE, by default it is set to 2_048
.with_trace_config(
trace::config().with_resource(Resource::new(vec![KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
"kuksa-rust-app",
)])),
)
.install_batch(runtime::Tokio)
}
4 changes: 4 additions & 0 deletions databroker/src/permissions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ impl Permissions {
Err(PermissionError::Denied)
}

#[cfg_attr(feature="otel", tracing::instrument(name="permissions_can_write_actuator_target", skip(self, path), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn can_write_actuator_target(&self, path: &str) -> Result<(), PermissionError> {
self.expired()?;

Expand All @@ -195,6 +196,7 @@ impl Permissions {
Err(PermissionError::Denied)
}

#[cfg_attr(feature="otel", tracing::instrument(name="permissions_can_write_datapoint", skip(self, path), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn can_write_datapoint(&self, path: &str) -> Result<(), PermissionError> {
self.expired()?;

Expand All @@ -213,6 +215,7 @@ impl Permissions {
Err(PermissionError::Denied)
}

#[cfg_attr(feature="otel", tracing::instrument(name="permissions_expired", skip(self), fields(timestamp=chrono::Utc::now().to_string())))]
#[inline]
pub fn expired(&self) -> Result<(), PermissionError> {
if let Some(expires_at) = self.expires_at {
Expand All @@ -225,6 +228,7 @@ impl Permissions {
}

impl PathMatcher {
#[cfg_attr(feature="otel", tracing::instrument(name="permissions_is_match", skip(self, path), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn is_match(&self, path: &str) -> bool {
match self {
PathMatcher::Nothing => false,
Expand Down
5 changes: 5 additions & 0 deletions databroker/src/query/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub trait ExecutionInput {
}

impl CompiledQuery {
#[cfg_attr(feature="otel", tracing::instrument(name="executor_execute_internal", skip(query, input), fields(timestamp=chrono::Utc::now().to_string())))]
fn execute_internal(
query: &CompiledQuery,
input: &impl ExecutionInput,
Expand Down Expand Up @@ -157,6 +158,8 @@ impl CompiledQuery {
Ok(None)
}
}

#[cfg_attr(feature="otel", tracing::instrument(name="executor_execute", skip(self, input), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn execute(
&self,
input: &impl ExecutionInput,
Expand All @@ -166,6 +169,7 @@ impl CompiledQuery {
}

impl Expr {
#[cfg_attr(feature="otel", tracing::instrument(name="execute", skip(self, input), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn execute(&self, input: &impl ExecutionInput) -> Result<DataValue, ExecutionError> {
match &self {
Expr::Datapoint {
Expand Down Expand Up @@ -396,6 +400,7 @@ impl ExecutionInput for ExecutionInputImpl {
}
}

#[cfg_attr(feature="otel", tracing::instrument(name="executor_get_fields", skip(self), fields(timestamp=chrono::Utc::now().to_string())))]
fn get_fields(&self) -> &HashMap<String, ExecutionInputImplData> {
&self.fields
}
Expand Down