
feat: Add support for flushing the event buffer #92

Merged Nov 21, 2023 (8 commits)
6 changes: 5 additions & 1 deletion devtools/src/aggregator.rs
@@ -80,6 +80,10 @@ impl Aggregator {
loop {
let should_publish = tokio::select! {
_ = interval.tick() => true,
_ = self.shared.flush.notified() => {
tracing::debug!("event buffer approaching capacity, flushing...");
false

Reviewer: shouldn't this return true instead so we publish the events?

Contributor (author): No, the flush notification is about draining the internal event queue (from the Layer to the Aggregator), not the buffer inside the aggregator itself; that one is fine.
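For readers skimming the thread, a minimal self-contained sketch of that loop shape (assumptions: String stands in for the event type, publish is a placeholder, and the 100 ms interval is made up; the drain placement is inferred from the reply above, not copied from the crate):

```rust
use std::{sync::Arc, time::Duration};
use tokio::sync::{mpsc, Notify};

// The flush arm only wakes the loop so the drain below runs; actual
// publishing still happens on the regular interval.
async fn run_aggregator(mut events: mpsc::Receiver<String>, flush: Arc<Notify>) {
    let mut interval = tokio::time::interval(Duration::from_millis(100));
    let mut buffer: Vec<String> = Vec::new();

    loop {
        let should_publish = tokio::select! {
            _ = interval.tick() => true,
            // Waking up is enough: draining relieves the channel pressure.
            _ = flush.notified() => false,
        };

        // Drain everything the Layer has queued so far into the local buffer.
        while let Ok(event) = events.try_recv() {
            buffer.push(event);
        }

        if should_publish {
            publish(&mut buffer);
        }
    }
}

fn publish(buffer: &mut Vec<String>) {
    // Send buffered events to connected clients (elided in this sketch).
    buffer.clear();
}
```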

},
cmd = self.cmds.recv() => {
if let Some(Command::Instrument(watcher)) = cmd {
self.attach_watcher(watcher).await;
@@ -307,7 +311,7 @@ impl<T, const CAP: usize> EventBuf<T, CAP> {
// TODO does it really make sense to track the dropped events here?

Reviewer: are we still tracking dropped events here? 😂

pub fn push_overwrite(&mut self, item: T) {
if self.inner.push_overwrite(item).is_some() {
-            self.sent -= 1;
+            self.sent = self.sent.saturating_sub(1);
}
}
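To make the saturating_sub fix concrete, here is a toy stand-in for EventBuf under assumed semantics (sent counts how many items currently in the buffer were already published; the real type wraps an internal ring buffer rather than a VecDeque):

```rust
use std::collections::VecDeque;

struct EventBuf<T, const CAP: usize> {
    inner: VecDeque<T>,
    /// How many items currently in `inner` were already published.
    sent: usize,
}

impl<T, const CAP: usize> EventBuf<T, CAP> {
    fn new() -> Self {
        Self { inner: VecDeque::with_capacity(CAP), sent: 0 }
    }

    fn push_overwrite(&mut self, item: T) {
        // At capacity, evict the oldest item to make room.
        let evicted = if self.inner.len() == CAP {
            self.inner.pop_front()
        } else {
            None
        };
        self.inner.push_back(item);
        if evicted.is_some() {
            // The evicted item may never have been sent (sent == 0), so a
            // plain `sent -= 1` could underflow; saturate instead.
            self.sent = self.sent.saturating_sub(1);
        }
    }
}
```

With CAP = 2 and three pushes before anything is published, the third push evicts an unsent item while sent is still 0; the old sent -= 1 would underflow (a panic in debug builds), whereas saturating_sub leaves it at 0.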

7 changes: 6 additions & 1 deletion devtools/src/layer.rs
@@ -1,5 +1,5 @@
use crate::visitors::{EventVisitor, FieldVisitor};
-use crate::{Event, Shared};
+use crate::{Event, Shared, EVENT_BUFFER_CAPACITY};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;
@@ -51,6 +51,11 @@ impl Layer {
dropped.fetch_add(1, Ordering::Release);
}
}

let capacity = self.tx.capacity();

Reviewer: are you sure we need to check the capacity and not the len?

Contributor (author): Not really; the idea is to trigger a flush when the remaining capacity in the buffer drops below half. Also, len doesn't exist on tokio::sync::mpsc::Sender 😉
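Spelled out, the check leans on tokio's bounded-channel semantics; a small sketch (the should_flush helper is invented for illustration):

```rust
use tokio::sync::mpsc;

const EVENT_BUFFER_CAPACITY: usize = 512;

// Sender::capacity() returns the number of permits still available, i.e.
// EVENT_BUFFER_CAPACITY minus the messages currently queued. Once half the
// slots are occupied, the aggregator should be woken to drain the queue.
fn should_flush<T>(tx: &mpsc::Sender<T>) -> bool {
    tx.capacity() <= EVENT_BUFFER_CAPACITY / 2
}
```

Since Notify::notify_one stores at most one permit, repeated half-full detections between aggregator wake-ups coalesce into a single flush notification rather than piling up.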

if capacity <= EVENT_BUFFER_CAPACITY / 2 {
self.shared.flush.notify_one();
}
}
}

5 changes: 4 additions & 1 deletion devtools/src/lib.rs
@@ -40,7 +40,9 @@ use std::sync::atomic::AtomicUsize;
use std::time::Instant;
use tauri::Runtime;
use tauri_devtools_wire_format::{instrument, Field};
-use tokio::sync::mpsc;
+use tokio::sync::{mpsc, Notify};

const EVENT_BUFFER_CAPACITY: usize = 512;

pub(crate) type Result<T> = std::result::Result<T, Error>;

@@ -73,6 +75,7 @@ pub fn try_init<R: Runtime>() -> Result<tauri::plugin::TauriPlugin<R>> {
pub(crate) struct Shared {
dropped_log_events: AtomicUsize,
dropped_span_events: AtomicUsize,
flush: Notify,
}

/// Data sent from the `Layer` to the `Aggregator`
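Putting the three Rust files together, a hedged wiring sketch (the construction site is not part of this diff, so pairing the channel with EVENT_BUFFER_CAPACITY is an assumption; u64 stands in for the crate's Event type and the other Shared fields are omitted):

```rust
use std::sync::Arc;
use tokio::sync::{mpsc, Notify};

const EVENT_BUFFER_CAPACITY: usize = 512;

struct Shared {
    flush: Notify,
}

// Both halves hold the same `Shared`, so the Layer's `notify_one` wakes the
// Aggregator's pending `notified()` await.
fn wire_up() -> (mpsc::Sender<u64>, mpsc::Receiver<u64>, Arc<Shared>) {
    // Presumably the channel is created with the same capacity the Layer
    // compares against in its half-full check.
    let (tx, rx) = mpsc::channel(EVENT_BUFFER_CAPACITY);
    let shared = Arc::new(Shared { flush: Notify::new() });
    (tx, rx, shared)
}
```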
11 changes: 11 additions & 0 deletions web-client/src/views/dashboard/layout.tsx
@@ -104,6 +104,17 @@ export default function Layout() {
setMonitorData("metadata", (prev) => updateSpanMetadata(prev, update));
}

if (update.logsUpdate && update.spansUpdate) {
console.assert(
update.logsUpdate.droppedEvents == 0n,
"Dropped log events because the internal event buffer was at capacity. This is a bug, please report!"
);
console.assert(
update.spansUpdate.droppedEvents == 0n,
"Dropped span events because the internal event buffer was at capacity. This is a bug, please report!"
);
}

const logsUpdate = update.logsUpdate;
if (logsUpdate && logsUpdate.logEvents.length > 0) {
setMonitorData("logs", (prev) => [...prev, ...logsUpdate.logEvents]);