Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

otel feature with tracing functionality only #119

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
347 changes: 292 additions & 55 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions databroker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ axum = { version = "0.6.20", optional = true, features = ["ws"] }
chrono = { version = "0.4.31", optional = true, features = ["std"] }
uuid = { version = "1.4.1", optional = true, features = ["v4"] }

# OTEL
opentelemetry = { version = "0.19.0", optional = true, features = ["rt-tokio", "trace"] }
opentelemetry-otlp = { version="0.12.0", optional = true, features = ["tonic", "metrics"] }
opentelemetry-semantic-conventions = { version="0.11.0", optional = true }
tracing-opentelemetry = { version="0.19.0", optional = true }

# systemd related dependency, only relevant on linux systems
[target.'cfg(target_os = "linux")'.dependencies]
sd-notify = "0.4.1"
Expand All @@ -78,6 +84,7 @@ tls = ["tonic/tls"]
jemalloc = ["dep:jemallocator"]
viss = ["dep:axum", "dep:chrono", "dep:uuid"]
libtest = []
otel = ["dep:chrono", "dep:opentelemetry", "dep:opentelemetry-otlp", "dep:opentelemetry-semantic-conventions", "dep:tracing-opentelemetry"]

[build-dependencies]
anyhow = "1.0"
Expand Down
37 changes: 37 additions & 0 deletions databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ pub struct EntryUpdate {
}

impl Entry {
#[cfg_attr(feature="otel",tracing::instrument(name="broker_diff", skip(self, update), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn diff(&self, mut update: EntryUpdate) -> EntryUpdate {
if let Some(datapoint) = &update.datapoint {
if self.metadata.change_type != ChangeType::Continuous {
Expand All @@ -259,6 +260,7 @@ impl Entry {
Ok(())
}

#[cfg_attr(feature="otel", tracing::instrument(name="broker_validate", skip(self, update), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn validate(&self, update: &EntryUpdate) -> Result<(), UpdateError> {
if let Some(datapoint) = &update.datapoint {
self.validate_value(&datapoint.value)?;
Expand All @@ -280,6 +282,7 @@ impl Entry {
* DataType is VSS type, where we have also smaller type based on 8/16 bits
* That we do not have for DataValue
*/
#[cfg_attr(feature="otel", tracing::instrument(name="broker_validate_allowed_type", skip(self, allowed), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn validate_allowed_type(&self, allowed: &Option<DataValue>) -> Result<(), UpdateError> {
if let Some(allowed_values) = allowed {
match (allowed_values, &self.metadata.data_type) {
Expand Down Expand Up @@ -319,6 +322,7 @@ impl Entry {
}
}

#[cfg_attr(feature="otel", tracing::instrument(name="broker_validate_allowed", skip(self, value), fields(timestamp=chrono::Utc::now().to_string())))]
fn validate_allowed(&self, value: &DataValue) -> Result<(), UpdateError> {
// check if allowed value
if let Some(allowed_values) = &self.metadata.allowed {
Expand Down Expand Up @@ -471,6 +475,7 @@ impl Entry {
Ok(())
}

#[cfg_attr(feature="otel", tracing::instrument(name="broker_validate_value", skip(self, value), fields(timestamp=chrono::Utc::now().to_string())))]
fn validate_value(&self, value: &DataValue) -> Result<(), UpdateError> {
// Not available is always valid
if value == &DataValue::NotAvailable {
Expand Down Expand Up @@ -706,6 +711,7 @@ impl Entry {
}
}

#[cfg_attr(feature="otel", tracing::instrument(name="apply_lag_after_execute", skip(self), fields(timestamp=chrono::Utc::now().to_string())))]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add broker_ prefix

Copy link
Author

@LikhithST LikhithST Jan 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback,

Do I have to follow this pattern name="class_name_+method_name" strictly, because previously I have used this pattern only if more than one class have same method name.

Example:
for the method validate of class Entry, name="broker_validate", I have followed name=file_name+method_name.

Whereas for method notify of class ChangeSubscription, name=broker_change_subscription_notify here I have followed name=file_name+class_name+method_name

similarly for method notify of class QuerySubscription, name="broker_query_subscription_notify"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The best approach from my point of view is to use the pattern name="class_name_+method_name".

That way we can keep it short and concise so we can identify to which class belongs the method.

Also, find it too long the pattern:
name=file_name+class_name+method_name

pub fn apply_lag_after_execute(&mut self) {
self.lag_datapoint = self.datapoint.clone();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also add otel to the apply method https://github.com/eclipse-kuksa/kuksa-databroker/blob/main/databroker/src/broker.rs#L713
It would be also revelant to know how long does it take to initialize the hashset in the heap.

Expand Down Expand Up @@ -749,10 +755,15 @@ impl Subscriptions {
self.query_subscriptions.push(subscription)
}

#[cfg_attr(feature="otel", tracing::instrument(name = "broker_add_change_subscription",skip(self, subscription), fields(timestamp=chrono::Utc::now().to_string())))]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove blank space between name and its value.

pub fn add_change_subscription(&mut self, subscription: ChangeSubscription) {
self.change_subscriptions.push(subscription)
}

#[cfg_attr(
feature = "otel",
tracing::instrument(name = "broker_Subscriptions_notify", skip(self, changed, db))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use lower case pattern and remove blank space between name and it value.

)]
pub async fn notify(
&self,
changed: Option<&HashMap<i32, HashSet<Field>>>,
Expand Down Expand Up @@ -799,6 +810,7 @@ impl Subscriptions {
self.change_subscriptions.clear();
}

#[cfg_attr(feature="otel", tracing::instrument(name="broker_cleanup", skip(self), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn cleanup(&mut self) {
self.query_subscriptions.retain(|sub| {
if sub.sender.is_closed() {
Expand Down Expand Up @@ -835,6 +847,10 @@ impl Subscriptions {
}

impl ChangeSubscription {
#[cfg_attr(
feature = "otel",
tracing::instrument(name = "broker_ChangeSubscription_notify", skip(self, changed, db))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use lower case pattern and remove blank space between name and its value.

)]
async fn notify(
&self,
changed: Option<&HashMap<i32, HashSet<Field>>>,
Expand Down Expand Up @@ -959,6 +975,7 @@ impl ChangeSubscription {
}

impl QuerySubscription {
#[cfg_attr(feature="otel", tracing::instrument(name="broker_find_in_db_and_add", skip(self, name, db, input), fields(timestamp=chrono::Utc::now().to_string())))]
fn find_in_db_and_add(
&self,
name: &String,
Expand Down Expand Up @@ -987,6 +1004,7 @@ impl QuerySubscription {
}
}
}
#[cfg_attr(feature="otel", tracing::instrument(name="broker_check_if_changes_match", skip(query, changed_origin, db), fields(timestamp=chrono::Utc::now().to_string())))]
fn check_if_changes_match(
query: &CompiledQuery,
changed_origin: Option<&HashMap<i32, HashSet<Field>>>,
Expand Down Expand Up @@ -1022,6 +1040,7 @@ impl QuerySubscription {
}
false
}
#[cfg_attr(feature="otel", tracing::instrument(name="broker_generate_input_list", skip(self, query, db, input), fields(timestamp=chrono::Utc::now().to_string())))]
fn generate_input_list(
&self,
query: &CompiledQuery,
Expand All @@ -1037,6 +1056,7 @@ impl QuerySubscription {
}
}
}
#[cfg_attr(feature="otel", tracing::instrument(name="broker_generate_input", skip(self, changed, db), fields(timestamp=chrono::Utc::now().to_string())))]
fn generate_input(
&self,
changed: Option<&HashMap<i32, HashSet<Field>>>,
Expand All @@ -1053,6 +1073,7 @@ impl QuerySubscription {
}
}

#[cfg_attr(feature="otel", tracing::instrument(name="broker_query_subscription_notify", skip(self, changed, db), fields(timestamp=chrono::Utc::now().to_string())))]
async fn notify(
&self,
changed: Option<&HashMap<i32, HashSet<Field>>>,
Expand Down Expand Up @@ -1112,6 +1133,7 @@ pub enum EntryReadAccess<'a> {
}

impl EntryReadAccess<'_> {
#[cfg_attr(feature="otel", tracing::instrument(name="broker_datapoint", skip(self), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn datapoint(&self) -> Result<&Datapoint, ReadError> {
match self {
Self::Entry(entry) => Ok(&entry.datapoint),
Expand All @@ -1126,6 +1148,7 @@ impl EntryReadAccess<'_> {
}
}

#[cfg_attr(feature="otel", tracing::instrument(name="broker_metadata", skip(self), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn metadata(&self) -> &Metadata {
match self {
Self::Entry(entry) => &entry.metadata,
Expand Down Expand Up @@ -1168,6 +1191,7 @@ impl<'a> Iterator for EntryReadIterator<'a, '_> {
}

impl DatabaseReadAccess<'_, '_> {
#[cfg_attr(feature="otel", tracing::instrument(name="get_entry_by_id", skip(self, id), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn get_entry_by_id(&self, id: i32) -> Result<&Entry, ReadError> {
match self.db.entries.get(&id) {
Some(entry) => match self.permissions.can_read(&entry.metadata.path) {
Expand All @@ -1186,15 +1210,18 @@ impl DatabaseReadAccess<'_, '_> {
}
}

#[cfg_attr(feature="otel", tracing::instrument(name="broker_get_metadata_by_id", skip(self, id), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn get_metadata_by_id(&self, id: i32) -> Option<&Metadata> {
self.db.entries.get(&id).map(|entry| &entry.metadata)
}

#[cfg_attr(feature="otel", tracing::instrument(name="broker_get_metadata_by_path", skip(self, path), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn get_metadata_by_path(&self, path: &str) -> Option<&Metadata> {
let id = self.db.path_to_id.get(path)?;
self.get_metadata_by_id(*id)
}

#[cfg_attr(feature="otel", tracing::instrument(name="broker_iter_entries", skip(self), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn iter_entries(&self) -> EntryReadIterator {
EntryReadIterator {
inner: self.db.entries.values(),
Expand All @@ -1215,6 +1242,7 @@ impl DatabaseWriteAccess<'_, '_> {
}
}

#[cfg_attr(feature="otel", tracing::instrument(name="broker_update_entry_lag_to_be_equal", skip(self, path), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn update_entry_lag_to_be_equal(&mut self, path: &str) -> Result<(), UpdateError> {
match self.db.path_to_id.get(path) {
Some(id) => match self.db.entries.get_mut(id) {
Expand All @@ -1228,6 +1256,7 @@ impl DatabaseWriteAccess<'_, '_> {
}
}

#[cfg_attr(feature="otel", tracing::instrument(name="broker_update", skip(self, id, update), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn update(&mut self, id: i32, update: EntryUpdate) -> Result<HashSet<Field>, UpdateError> {
match self.db.entries.get_mut(&id) {
Some(entry) => {
Expand Down Expand Up @@ -1376,6 +1405,7 @@ impl Database {
}
}

#[cfg_attr(feature="otel", tracing::instrument(name="broker_authorized_read_access", skip(self, permissions), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn authorized_read_access<'a, 'b>(
&'a self,
permissions: &'b Permissions,
Expand All @@ -1386,6 +1416,7 @@ impl Database {
}
}

#[cfg_attr(feature="otel", tracing::instrument(name="broker_authorized_write_access", skip(self, permissions), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn authorized_write_access<'a, 'b>(
&'a mut self,
permissions: &'b Permissions,
Expand Down Expand Up @@ -1453,6 +1484,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
.authorized_read_access(self.permissions))
}

#[cfg_attr(feature="otel", tracing::instrument(name = "broker_get_id_by_path", skip(self, name) fields(timestamp=chrono::Utc::now().to_string())))]
pub async fn get_id_by_path(&self, name: &str) -> Option<i32> {
self.broker
.database
Expand Down Expand Up @@ -1483,6 +1515,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
.map(|entry| entry.datapoint.clone())
}

#[cfg_attr(feature="otel", tracing::instrument(name="get_metadata", skip(self, id), fields(timestamp=chrono::Utc::now().to_string())))]
pub async fn get_metadata(&self, id: i32) -> Option<Metadata> {
self.broker
.database
Expand Down Expand Up @@ -1523,6 +1556,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
.cloned()
}

#[cfg_attr(feature="otel", tracing::instrument(name="broker_for_each_entry", skip(self, f), fields(timestamp=chrono::Utc::now().to_string())))]
pub async fn for_each_entry(&self, f: impl FnMut(EntryReadAccess)) {
self.broker
.database
Expand Down Expand Up @@ -1558,6 +1592,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
.collect()
}

#[cfg_attr(feature="otel", tracing::instrument(name = "broker_update_entries",skip(self, updates), fields(timestamp=chrono::Utc::now().to_string())))]
pub async fn update_entries(
&self,
updates: impl IntoIterator<Item = (i32, EntryUpdate)>,
Expand Down Expand Up @@ -1629,6 +1664,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
}
}

#[cfg_attr(feature="otel", tracing::instrument(name = "broker_subscribe", skip(self, valid_entries), fields(timestamp=chrono::Utc::now().to_string())))]
pub async fn subscribe(
&self,
valid_entries: HashMap<i32, HashSet<Field>>,
Expand Down Expand Up @@ -2023,6 +2059,7 @@ impl DataBroker {
}
}

#[cfg_attr(feature="otel", tracing::instrument(name = "broker_authorized_access",skip(self, permissions), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn authorized_access<'a, 'b>(
&'a self,
permissions: &'b Permissions,
Expand Down
3 changes: 3 additions & 0 deletions databroker/src/glob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl Matcher {
}
}

#[cfg_attr(feature="otel", tracing::instrument(name="glob_to_regex_string", skip(glob), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn to_regex_string(glob: &str) -> String {
// Construct regular expression

Expand Down Expand Up @@ -121,6 +122,7 @@ pub fn to_regex_string(glob: &str) -> String {
re
}

#[cfg_attr(feature="otel", tracing::instrument(name="glob_to_regex", skip(glob), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn to_regex(glob: &str) -> Result<Regex, Error> {
let re = to_regex_string(glob);
Regex::new(&re).map_err(|_err| Error::RegexError)
Expand Down Expand Up @@ -160,6 +162,7 @@ lazy_static! {
.expect("regex compilation (of static pattern) should always succeed");
}

#[cfg_attr(feature="otel", tracing::instrument(name="glob_is_valid_pattern", skip(input), fields(timestamp=chrono::Utc::now().to_string())))]
pub fn is_valid_pattern(input: &str) -> bool {
REGEX_VALID_PATTERN.is_match(input)
}
Expand Down
2 changes: 2 additions & 0 deletions databroker/src/grpc/kuksa_val_v1/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ impl From<broker::DataValue> for Option<proto::Datapoint> {
}

impl From<Option<proto::datapoint::Value>> for broker::DataValue {
#[cfg_attr(feature="otel", tracing::instrument(name="conversion_From<Option<proto::datapoint::Value>>", skip(from), fields(timestamp=chrono::Utc::now().to_string())))]
fn from(from: Option<proto::datapoint::Value>) -> Self {
match from {
Some(value) => match value {
Expand Down Expand Up @@ -316,6 +317,7 @@ impl From<proto::Datapoint> for broker::Datapoint {
}

impl From<broker::EntryUpdate> for proto::DataEntry {
#[cfg_attr(feature="otel", tracing::instrument(name="conversion_From<broker::EntryUpdate>", skip(from), fields(timestamp=chrono::Utc::now().to_string())))]
fn from(from: broker::EntryUpdate) -> Self {
Self {
path: from.path.unwrap_or_default(),
Expand Down
37 changes: 37 additions & 0 deletions databroker/src/grpc/kuksa_val_v1/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,20 @@ impl proto::val_server::Val for broker::DataBroker {
}
}

#[cfg_attr(feature="otel",tracing::instrument(name="val_set",skip(self, request), fields(trace_id, timestamp= chrono::Utc::now().to_string())))]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add kuksa_val_v1 as prefix to the name so it is clear to which val file belongs? Kuksa.val.v2 also contain val.rs file

async fn set(
&self,
request: tonic::Request<proto::SetRequest>,
) -> Result<tonic::Response<proto::SetResponse>, tonic::Status> {
debug!(?request);

#[cfg(feature = "otel")]
let request = (|| {
let (trace_id, request) = read_incoming_trace_id(request);
tracing::Span::current().record("trace_id", &trace_id);
request
})();

let permissions = match request.extensions().get::<Permissions>() {
Some(permissions) => {
debug!(?permissions);
Expand Down Expand Up @@ -472,6 +481,7 @@ impl proto::val_server::Val for broker::DataBroker {
>,
>;

#[cfg_attr(feature="otel", tracing::instrument(name="subscribe", skip(self, request), fields(trace_id, timestamp=chrono::Utc::now().to_string())))]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where do you get trace_id here from?
Please also add kuksa_val_v1_val_ as prefix.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trace_id is not being used here, I will remove it.

async fn subscribe(
&self,
request: tonic::Request<proto::SubscribeRequest>,
Expand Down Expand Up @@ -666,6 +676,7 @@ async fn validate_entry_update(
Ok((id, update))
}

#[cfg_attr(feature="otel", tracing::instrument(name="val_convert_to_data_entry_error", skip(path, error), fields(timestamp=chrono::Utc::now().to_string())))]
fn convert_to_data_entry_error(path: &String, error: &broker::UpdateError) -> DataEntryError {
match error {
broker::UpdateError::NotFound => DataEntryError {
Expand Down Expand Up @@ -735,6 +746,7 @@ fn convert_to_data_entry_error(path: &String, error: &broker::UpdateError) -> Da
}
}

#[cfg_attr(feature="otel", tracing::instrument(name = "val_convert_to_proto_stream", skip(input), fields(timestamp=chrono::Utc::now().to_string())))]
fn convert_to_proto_stream(
input: impl Stream<Item = broker::EntryUpdates>,
) -> impl Stream<Item = Result<proto::SubscribeResponse, tonic::Status>> {
Expand Down Expand Up @@ -1051,7 +1063,32 @@ fn combine_view_and_fields(
combined
}

#[cfg(feature = "otel")]
#[cfg_attr(feature="otel", tracing::instrument(name="val_read_incoming_trace_id", skip(request), fields(timestamp=chrono::Utc::now().to_string())))]
fn read_incoming_trace_id(
request: tonic::Request<proto::SetRequest>,
) -> (String, tonic::Request<proto::SetRequest>) {
let mut trace_id: String = String::from("");
let request_copy = tonic::Request::new(request.get_ref().clone());
for request in request_copy.into_inner().updates {
match &request.entry {
Some(entry) => match &entry.metadata {
Some(metadata) => match &metadata.description {
Some(description) => {
trace_id = String::from(description);
Copy link
Contributor

@rafaeling rafaeling Jan 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it used here description as trace_id instead of path?

Copy link
Author

@LikhithST LikhithST Jan 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies for using description to handle trace_id, could comment in proto file be used for this purpose?

trace_id is introduced as an effort to identify the request with its corresponding otel tracing result. could I use comment that is a member of message Metadata.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about cloning the path from the entry request?

}
None => trace_id = String::from(""),
},
None => trace_id = String::from(""),
},
None => trace_id = String::from(""),
}
}
return (trace_id, request);
}

impl broker::EntryUpdate {
#[cfg_attr(feature="otel", tracing::instrument(name = "val_from_proto_entry_and_fields",skip(entry,fields), fields(timestamp=chrono::Utc::now().to_string())))]
fn from_proto_entry_and_fields(
entry: &proto::DataEntry,
fields: HashSet<proto::Field>,
Expand Down
Loading
Loading