
feat: Add support for flushing the event buffer #92

Merged — 8 commits merged into main from jonas/flush on Nov 21, 2023

Conversation

@CrabNejonas (Contributor) commented Nov 20, 2023

This adds a way for the Layer to wake up the Aggregator early when there is high pressure on the internal event queue. This helps reduce the number of dropped events (in my testing it actually brought the number of dropped events under heavy load down to zero, which is awesome).

This also adds a debug assertion for when we dropped some events (it only prints to the console, but that should be good enough for now).

resolves DR-591
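A minimal sketch of the shared state this revolves around, pieced together from the diff hunks quoted below (the `flush` notify and the `dropped` counter both appear there); the struct name and exact layout are assumptions, not the actual implementation:

```rust
use std::sync::atomic::AtomicUsize;
use tokio::sync::Notify;

/// State shared between the tracing `Layer` and the `Aggregator` (sketch).
struct Shared {
    /// Signalled by the `Layer` when the internal event queue is filling up,
    /// so the `Aggregator` drains it before the next 200ms tick instead of
    /// dropping events.
    flush: Notify,
    /// Events that had to be thrown away because the queue was full; for now
    /// only surfaced as a console debug message.
    dropped: AtomicUsize,
}
```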


linear bot commented Nov 20, 2023

DR-591 fix: event buffer flushing and backpressure

Right now the event buffers are emptied every 200ms. Events obviously aren't uniformly distributed across time, though; they often come in bursts. Because of that we currently miss a lot of data.

We should introduce some way for the aggregator to flush its buffers when they get too full.

@tejas-crabnebula (Contributor) left a comment


TypeScript side looks good!

@@ -310,7 +314,7 @@ impl<T, const CAP: usize> EventBuf<T, CAP> {
// TODO does it really make sense to track the dropped events here?


are we still tracking dropped events here? 😂

@@ -51,6 +51,11 @@ impl Layer {
dropped.fetch_add(1, Ordering::Release);
}
}

let capacity = self.tx.capacity();


are you sure we need to check the capacity and not the len?

@CrabNejonas (Contributor, Author) replied


Not really, the idea is to trigger a flush when the remaining capacity in the buffer drops below half. Also, len doesn't exist on tokio::sync::mpsc::Sender 😉
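To make that concrete, here is a rough, self-contained sketch of the Layer-side check; the `send_event` method name, the `String` event type, and the exact half-capacity threshold are illustrative assumptions rather than the real code:

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};
use tokio::sync::{mpsc, Notify};

struct Shared {
    flush: Notify,
    dropped: AtomicUsize,
}

struct Layer {
    // Event type simplified to `String` for the sketch.
    tx: mpsc::Sender<String>,
    shared: Arc<Shared>,
}

impl Layer {
    fn send_event(&self, event: String) {
        // If the channel is full, count the event as dropped instead of blocking.
        if self.tx.try_send(event).is_err() {
            self.shared.dropped.fetch_add(1, Ordering::Release);
        }

        // `Sender::capacity()` is the *remaining* room in the channel (there is
        // no `len()`), so once less than half of `max_capacity()` is left, wake
        // the Aggregator early instead of waiting for the next 200ms tick.
        if self.tx.capacity() < self.tx.max_capacity() / 2 {
            self.shared.flush.notify_one();
        }
    }
}
```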

@@ -83,6 +83,10 @@ impl Aggregator {
loop {
let should_publish = tokio::select! {
_ = interval.tick() => true,
_ = self.shared.flush.notified() => {
tracing::debug!("event buffer approaching capacity, flushing...");
false


shouldn't this return true instead so we publish the events?

@CrabNejonas (Contributor, Author) replied


No, the flush notification is about draining the internal event queue (from the Layer to the Aggregator), not about the buffer inside the Aggregator itself; that one is actually fine.
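A self-contained sketch of that distinction in the Aggregator's run loop; the draining and publishing details are assumptions added only to show why the `flush` arm yields `false`:

```rust
use std::{sync::Arc, time::Duration};
use tokio::sync::{mpsc, Notify};

struct Shared {
    flush: Notify,
}

struct Aggregator {
    // Event type simplified to `String` for the sketch.
    rx: mpsc::Receiver<String>,
    shared: Arc<Shared>,
    buffer: Vec<String>,
}

impl Aggregator {
    async fn run(&mut self) {
        let mut interval = tokio::time::interval(Duration::from_millis(200));
        loop {
            // A flush notification only means "drain the internal event queue
            // now"; publishing to clients still happens on the timer tick,
            // which is why the flush arm yields `false`.
            let should_publish = tokio::select! {
                _ = interval.tick() => true,
                _ = self.shared.flush.notified() => {
                    tracing::debug!("event buffer approaching capacity, flushing...");
                    false
                }
            };

            // Drain whatever the Layer has queued into the Aggregator's own buffer.
            while let Ok(event) = self.rx.try_recv() {
                self.buffer.push(event);
            }

            if should_publish {
                // Publish `self.buffer` to connected clients here (omitted).
                self.buffer.clear();
            }
        }
    }
}
```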

@atila-crabnebula (Collaborator) left a comment


I created a follow-up ticket to report dropped events and give the user decent feedback:

DR-625: Report Dropped Events

@CrabNejonas merged commit db5bf8a into main on Nov 21, 2023
11 checks passed
@CrabNejonas deleted the jonas/flush branch on December 5, 2023