Skip to content

Commit

Permalink
feat(init) BREAKING: separate start from init
Browse files Browse the repository at this point in the history
Signed-off-by: Sean Pianka <[email protected]>
  • Loading branch information
seanpianka committed Jul 3, 2024
1 parent f5cdba5 commit 26f685e
Show file tree
Hide file tree
Showing 18 changed files with 134 additions and 38 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,13 @@ async fn main() {
let subscriber = Registry::default().with(slack_layer);
tracing::subscriber::set_global_default(subscriber).unwrap();

// Start the workers and spawn the background async tasks on the current executor.
discord_worker.start();
slack_worker.start();

network_io(123).await;

// Shutdown the workers and ensure their message cache is flushed.
slack_worker.shutdown().await;
discord_worker.shutdown().await;
}
Expand Down
4 changes: 2 additions & 2 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tracing-layer-core"
version = "0.2.1"
version = "0.3.0"
edition = "2018"
license = "Apache-2.0"
description = "Send filtered tracing events to a webhook endpoint"
Expand Down Expand Up @@ -29,7 +29,7 @@ lambda-extension = { version = "0.10", optional = true}

debug_print = "1"
regex = "1"
reqwest = { version = "0.12.3", default-features = false, features = ["http2", "charset"] }
reqwest = { version = "0.12", default-features = false, features = ["http2", "charset"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1", default-features = false, features = ["test-util", "sync", "macros", "rt-multi-thread"] }
Expand Down
36 changes: 20 additions & 16 deletions core/src/layer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::collections::HashMap;
use std::error::Error;
use std::future::Future;
use std::process::Output;
use std::str::FromStr;
use std::sync::Arc;

Expand Down Expand Up @@ -40,7 +43,7 @@ pub struct WebhookLayer<C: Config, F: WebhookMessageFactory> {
/// - Negative: Exclude the event if its key does NOT MATCH a given regex.
event_by_field_filters: Option<EventFilters>,

/// Filter fields of events from being sent to Discord.
/// Filter fields of events from being sent to the webhook.
///
/// Filter type semantics:
/// - Positive: Exclude event fields if the field's key MATCHES any provided regular expressions.
Expand All @@ -62,13 +65,13 @@ pub struct WebhookLayer<C: Config, F: WebhookMessageFactory> {
}

impl<C: Config, F: WebhookMessageFactory> WebhookLayer<C, F> {
/// Create a new layer for forwarding messages to Discord, using a specified
/// configuration. This method spawns a task onto the tokio runtime to begin sending tracing
/// events to Discord.
/// Create a new layer for forwarding messages to the webhook, using a specified
/// configuration. The background worker must be started in order to spawn spawns
/// a task onto the tokio runtime to begin sending tracing events to the webhook.
///
/// Returns the tracing_subscriber::Layer impl to add to a registry, an unbounded-mpsc sender
/// used to shutdown the background worker, and a future to spawn as a task on a tokio runtime
/// to initialize the worker's processing and sending of HTTP requests to the Discord API.
/// to initialize the worker's processing and sending of HTTP requests to the webhook.
pub(crate) fn new(
app_name: String,
target_filters: EventFilters,
Expand All @@ -90,25 +93,26 @@ impl<C: Config, F: WebhookMessageFactory> WebhookLayer<C, F> {
factory: Default::default(),
sender: tx.clone(),
};
let worker = BackgroundWorker {
let background_worker = BackgroundWorker {
sender: tx,
handle: Arc::new(Mutex::new(Some(tokio::spawn(worker(rx))))),
handle: Arc::new(Mutex::new(None)),
rx: Arc::new(Mutex::new(rx)),
};
(layer, worker)
(layer, background_worker)
}

/// Create a new builder for DiscordLayer.
/// Create a new builder for the webhook layer.
pub fn builder(app_name: String, target_filters: EventFilters) -> WebhookLayerBuilder<C, F> {
WebhookLayerBuilder::new(app_name, target_filters)
}
}

/// A builder for creating a webhook layer.
///
/// The layer requires a regex for selecting events to be sent to Discord by their target. Specifying
/// The layer requires a regex for selecting events to be sent to webhook by their target. Specifying
/// no filter (e.g. ".*") will cause an explosion in the number of messages observed by the layer.
///
/// Several methods expose initialization of optional filtering mechanisms, along with Discord
/// Several methods expose initialization of optional filtering mechanisms, along with webhook
/// configuration that defaults to searching in the local environment variables.
pub struct WebhookLayerBuilder<C: Config, F: WebhookMessageFactory> {
factory: std::marker::PhantomData<F>,
Expand Down Expand Up @@ -155,7 +159,7 @@ impl<C: Config, F: WebhookMessageFactory> WebhookLayerBuilder<C, F> {
self
}

/// Filter fields of events from being sent to Discord.
/// Filter fields of events from being sent to the webhook.
///
/// Filter type semantics:
/// - Positive: Exclude event fields if the field's key MATCHES any provided regular expressions.
Expand All @@ -164,19 +168,19 @@ impl<C: Config, F: WebhookMessageFactory> WebhookLayerBuilder<C, F> {
self
}

/// Configure the layer's connection to the Discord Webhook API.
/// Configure the layer's connection to the webhook.
pub fn config(mut self, config: C) -> Self {
self.config = Some(config);
self
}

/// Configure which levels of events to send to Discord.
/// Configure which levels of events to send to the webhook.
pub fn level_filters(mut self, level_filters: String) -> Self {
self.level_filters = Some(level_filters);
self
}

/// Create a DiscordLayer and its corresponding background worker to (async) send the messages.
/// Create a webhook layer and its corresponding background worker to (async) send the messages.
pub fn build(self) -> (WebhookLayer<C, F>, BackgroundWorker) {
WebhookLayer::new(
self.app_name,
Expand Down Expand Up @@ -287,7 +291,7 @@ where
let result: Result<_, FilterError> = format();
if let Ok(formatted) = result {
if let Err(e) = self.sender.send(WorkerMessage::Data(Box::new(formatted))) {
println!("failed to send discord payload to given channel, err = {}", e)
println!("failed to send webhook payload to given channel, err = {}", e)
};
}
}
Expand Down
49 changes: 39 additions & 10 deletions core/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,55 @@ use crate::{ChannelReceiver, ChannelSender, WebhookMessage};
const MAX_RETRIES: usize = 10;

/// This worker manages a background async task that schedules the network requests to send traces
/// to the Discord on the running tokio runtime.
/// to the webhook on the running tokio runtime.
///
/// Ensure to invoke `.startup()` before, and `.teardown()` after, your application code runs. This
/// Ensure to invoke `.start()` before, and `.teardown()` after, your application code runs. This
/// is required to ensure proper initialization and shutdown.
///
/// `tracing-layer-discord` synchronously generates payloads to send to the Discord API using the
/// `tracing-layer-core` synchronously generates payloads to send to the webhook using the
/// tracing events from the global subscriber. However, all network requests are offloaded onto
/// an unbuffered channel and processed by a provided future acting as an asynchronous worker.
#[derive(Clone)]
pub struct BackgroundWorker {
/// The sender used to send messages to the worker task.
///
/// This sender is used to send `WorkerMessage` instances to the worker for processing.
pub(crate) sender: ChannelSender,

/// A handle to the spawned worker task.
///
/// This handle is used to await the completion of the worker task when shutting down.
/// The handle is stored in a `tokio::sync::Mutex` to ensure safe access across asynchronous contexts.
pub(crate) handle: Arc<Mutex<Option<JoinHandle<()>>>>,

/// The receiver for messages to be processed by the worker task.
///
/// This receiver is wrapped in an `Arc<Mutex<>>` to allow shared mutable access
/// between the `start` function and the worker task.
pub(crate) rx: Arc<Mutex<ChannelReceiver>>,
}

impl BackgroundWorker {
/// Initiate the worker's shutdown sequence.
/// Starts the background worker.
///
/// This function should only be called once. Attempting to call `start` more than once
/// will lead to a deadlock, as the function internally locks the receiver mutex and
/// spawns a task to process messages.
pub async fn start(&self) {
let rx = self.rx.clone();
let future = async move {
let mut rx = rx.lock().await;
worker(&mut *rx).await;
};
let handle = tokio::spawn(future);
let mut guard = self.handle.lock().await;
*guard = Some(handle);
}

/// Initiates the shutdown of the background worker.
///
/// Without invoking`.teardown()`, your application may exit before all Discord messages can be
/// sent.
/// Sends a shutdown message to the worker and waits for the worker task to complete.
/// If the worker task handle has already been dropped, an error message will be printed.
pub async fn shutdown(self) {
match self.sender.send(WorkerMessage::Shutdown) {
Ok(..) => {
Expand All @@ -56,16 +86,15 @@ pub enum WorkerMessage {
Shutdown,
}

/// Provides a background worker task that sends the messages generated by the
/// layer.
pub(crate) async fn worker(mut rx: ChannelReceiver) {
/// Provides a background worker task that sends the messages generated by the layer.
pub(crate) async fn worker(rx: &mut ChannelReceiver) {
let client = reqwest::Client::new();
while let Some(message) = rx.recv().await {
match message {
WorkerMessage::Data(payload) => {
let webhook_url = payload.webhook_url();
let payload_json = payload.serialize();
println!("sending discord message: {}", &payload_json);
println!("sending webhook message: {}", &payload_json);

let mut retries = 0;
while retries < MAX_RETRIES {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ async fn main() {
.build();
let subscriber = Registry::default().with(discord_layer);
tracing::subscriber::set_global_default(subscriber).unwrap();
background_worker.start().await;
handler().await;
background_worker.shutdown().await;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ async fn main() {
.build();
let subscriber = Registry::default().with(discord_layer);
tracing::subscriber::set_global_default(subscriber).unwrap();
background_worker.start().await;
handler().await;
background_worker.shutdown().await;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ async fn main() {
.build();
let subscriber = Registry::default().with(discord_layer);
tracing::subscriber::set_global_default(subscriber).unwrap();
background_worker.start().await;
handler().await;
background_worker.shutdown().await;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ async fn main() {
.build();
let subscriber = Registry::default().with(discord_layer);
tracing::subscriber::set_global_default(subscriber).unwrap();
background_worker.start().await;
handler().await;
background_worker.shutdown().await;
}
14 changes: 10 additions & 4 deletions examples/discord/examples/discord_simple.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use tracing::{info, instrument, warn};
use tracing::{info, instrument, warn, warn_span};
use tracing_subscriber::{layer::SubscriberExt, Registry};

use tracing_layer_discord::DiscordLayer;
Expand All @@ -9,11 +9,12 @@ pub async fn create_user(id: u64) {
info!(param = id, "A user was created");
}

#[instrument]
#[instrument(fields(electric_utilityaccount_id))]
pub async fn app_users_webhook(id: u64) {
tracing::Span::current().record("electric_utilityaccount_id", id);
warn!(
met = r#"
Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Aenean commodo ligula eget dolor. Aenean massa. Cum sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Donec quam felis, ultricies nec, pellentesque eu, pretium quis, sem. Nulla consequat massa quis enim. Donec pede justo, fringilla vel, aliquet nec, vulputate eget, arcu. In enim justo, rhoncus ut, imperdiet a, venenatis vitae, justo. Nullam dictum felis eu pede mollis pretium. Integer tincidunt. Cras dapibus. Vivamus elementum semper nisi. Aenean vulputate eleifend tellus. Aenean leo ligula, porttitor eu, consequat vitae, eleifend ac, enim. Aliquam lorem ante, dapibus in, viverra quis, feugiat a, tellus. Phasellus viverra nulla ut metus varius laoreet. Quisque rutrum. Aenean imperdiet. Etiam ultricies nisi vel augue. Curabitur ullamcorper ultricies nisi. Nam eget dui. Etiam rhoncus. Maecenas tempus, tellus eget condimentum rhoncus, sem quam semper libero, sit amet adipiscing sem neque sed ipsum. Nam quam nunc, blandit vel, luctus pulvinar, hendrerit id, lorem. Maecenas nec odio et ante tincidunt tempus. Donec vitae sapien ut libero venenatis faucibus. Nullam quis ante. Etiam sit amet orci eget eros faucibus tincidunt. Duis leo. Sed fringilla mauris sit amet nibh. Donec sodales sagittis magna. Sed consequat, leo eget bibendum sodales, augue velit cursus nunc, quis gravida magna mi a libero. Fusce vulputate eleifend sapien. Vestibulum purus quam, scelerisque ut, mollis sed, nonummy id, metus. Nullam accumsan lorem in dui. Cras ultricies mi eu turpis hendrerit fringilla. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia Curae; In ac dui quis mi consectetuer lacinia. Nam pretium turpis et arcu. Duis arcu tortor, suscipit eget, imperdiet nec, imperdiet iaculis, ipsum. Sed aliquam ultrices mauris. Integer ante arcu, accumsan a, consectetuer eget, posuere ut, mauris. Praesent adipiscing. Phasellus ullamcorper ipsum rutrum nunc. Nunc nonummy metus. Vestibulum volutpat pretium libero. Cras id dui. Aenea
John Baker
"#,
r#"error parsing user event by webhook handler: failed to parse event metadata: none found"#
);
Expand All @@ -28,9 +29,14 @@ pub async fn controller() {

#[tokio::main]
async fn main() {
let formatting_layer = tracing_bunyan_formatter::BunyanFormattingLayer::new("tracing_demo".into(), std::io::stdout);
let (discord_layer, background_worker) = DiscordLayer::builder("test-app".to_string(), Default::default()).build();
let subscriber = Registry::default().with(discord_layer);
let subscriber = Registry::default()
.with(discord_layer)
.with(tracing_bunyan_formatter::JsonStorageLayer)
.with(formatting_layer);
tracing::subscriber::set_global_default(subscriber).unwrap();
background_worker.start().await;
controller().await;
background_worker.shutdown().await;
}
44 changes: 44 additions & 0 deletions examples/discord/examples/discord_sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use tracing::{info, instrument, warn, warn_span};
use tracing_subscriber::{layer::SubscriberExt, Registry};

use tracing_layer_discord::DiscordLayer;

#[instrument]
pub async fn create_user(id: u64) {
app_users_webhook(id).await;
info!(param = id, "A user was created");
}

#[instrument(fields(electric_utilityaccount_id))]
pub async fn app_users_webhook(id: u64) {
tracing::Span::current().record("electric_utilityaccount_id", id);
warn!(
met = r#"
John Baker
"#,
r#"error parsing user event by webhook handler: failed to parse event metadata: none found"#
);
}

#[instrument]
pub async fn controller() {
info!("Orphan event without a parent span");
app_users_webhook(2).await;
// tokio::join!(create_user(2), create_user(4), create_user(6));
}

fn main() {
let formatting_layer = tracing_bunyan_formatter::BunyanFormattingLayer::new("tracing_demo".into(), std::io::stdout);
let (discord_layer, background_worker) = DiscordLayer::builder("test-app".to_string(), Default::default()).build();
let subscriber = Registry::default()
.with(discord_layer)
.with(tracing_bunyan_formatter::JsonStorageLayer)
.with(formatting_layer);
tracing::subscriber::set_global_default(subscriber).unwrap();

tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async move {
background_worker.start().await;
controller().await;
background_worker.shutdown().await;
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ async fn main() {
.build();
let subscriber = Registry::default().with(slack_layer);
tracing::subscriber::set_global_default(subscriber).unwrap();
background_worker.start().await;
handler().await;
background_worker.shutdown().await;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ async fn main() {
.build();
let subscriber = Registry::default().with(slack_layer);
tracing::subscriber::set_global_default(subscriber).unwrap();
background_worker.start().await;
handler().await;
background_worker.shutdown().await;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ async fn main() {
.build();
let subscriber = Registry::default().with(slack_layer);
tracing::subscriber::set_global_default(subscriber).unwrap();
background_worker.start().await;
handler().await;
background_worker.shutdown().await;
}
1 change: 1 addition & 0 deletions examples/slack/examples/slack_filter_records_by_fields.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ async fn main() {
.build();
let subscriber = Registry::default().with(slack_layer);
tracing::subscriber::set_global_default(subscriber).unwrap();
background_worker.start().await;
handler().await;
background_worker.shutdown().await;
}
2 changes: 1 addition & 1 deletion examples/slack/examples/slack_simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async fn main() {
let (slack_layer, background_worker) = SlackLayer::builder("test-app".to_string(), target_to_filter).build();
let subscriber = Registry::default().with(slack_layer);
tracing::subscriber::set_global_default(subscriber).unwrap();

background_worker.start().await;
controller().await;
background_worker.shutdown().await;
}
4 changes: 2 additions & 2 deletions layers/discord/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tracing-layer-discord"
version = "0.2.2"
version = "0.3.0"
edition = "2018"
license = "Apache-2.0"
description = "Send filtered tracing events to Discord"
Expand All @@ -23,7 +23,7 @@ native-tls = [ "tracing-layer-core/native-tls" ]
rustls = [ "tracing-layer-core/rustls" ]

[dependencies]
tracing-layer-core = { path = "../../core", version = "0.2.0" }
tracing-layer-core = { path = "../../core", version = "0.3.0" }

serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
Expand Down
Loading

0 comments on commit 26f685e

Please sign in to comment.