fix(collator): message buffer partition 0 clear
drmick authored and SmaGMan committed Jan 28, 2025
1 parent b3e73e0 commit ba240db
Showing 2 changed files with 69 additions and 24 deletions.
11 changes: 11 additions & 0 deletions collator/src/collator/messages_buffer.rs
@@ -101,6 +101,17 @@ impl MessagesBuffer {
         }
     }
 
+    pub fn remove_messages_by_accounts(&mut self, addresses_to_remove: &FastHashSet<HashBytes>) {
+        self.msgs.retain(|k, v| {
+            if addresses_to_remove.contains(k) {
+                self.int_count -= v.len();
+                false
+            } else {
+                true
+            }
+        });
+    }
+
     /// Returns queue keys of collected internal queue messages.
     pub fn fill_message_group<F>(
         &mut self,
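The new `remove_messages_by_accounts` evicts every buffered message destined for the given accounts while keeping the running `int_count` in sync. A minimal sketch of the same retain-and-decrement pattern, assuming a simplified buffer shape (the `Buffer` and `Address` types below are stand-ins, not the crate's real `MessagesBuffer`, `FastHashSet`, or `HashBytes`):

use std::collections::{HashMap, HashSet};

// Illustrative stand-in for HashBytes.
type Address = [u8; 32];

struct Buffer {
    // Buffered internal messages, grouped by destination account.
    msgs: HashMap<Address, Vec<u64>>,
    // Running total of buffered messages across all accounts.
    int_count: usize,
}

impl Buffer {
    fn remove_messages_by_accounts(&mut self, addresses_to_remove: &HashSet<Address>) {
        // Destructure so the closure can update the counter while
        // `retain` holds a mutable borrow of the map.
        let Self { msgs, int_count } = self;
        msgs.retain(|addr, msgs_for_addr| {
            if addresses_to_remove.contains(addr) {
                // Evicting an account must also shrink the running
                // counter, or `int_count` would overstate the buffer.
                *int_count -= msgs_for_addr.len();
                false
            } else {
                true
            }
        });
    }
}

fn main() {
    let mut buf = Buffer {
        msgs: HashMap::from([([1; 32], vec![10, 11]), ([2; 32], vec![20])]),
        int_count: 3,
    };
    buf.remove_messages_by_accounts(&HashSet::from([[1; 32]]));
    assert_eq!(buf.int_count, 1);
    assert!(!buf.msgs.contains_key(&[1; 32]));
}

The patch itself accesses `self.int_count` directly inside the `retain` closure, relying on Rust 2021's disjoint closure captures; the destructuring above just makes that split borrow explicit.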
82 changes: 58 additions & 24 deletions collator/src/collator/messages_reader/mod.rs
@@ -3,8 +3,10 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use anyhow::{Context, Result};
+use everscale_types::cell::HashBytes;
 use everscale_types::models::{MsgsExecutionParams, ShardIdent};
 use tycho_block_util::queue::{QueueKey, QueuePartitionIdx};
+use tycho_util::FastHashSet;
 
 use self::externals_reader::*;
 use self::internals_reader::*;
@@ -20,7 +22,7 @@ use crate::internal_queue::types::{
 use crate::queue_adapter::MessageQueueAdapter;
 use crate::tracing_targets;
 use crate::types::processed_upto::{BlockSeqno, Lt};
-use crate::types::DebugIter;
+use crate::types::{DebugIter, IntAdrExt, ProcessedTo};
 
 mod externals_reader;
 mod internals_reader;
@@ -309,6 +311,21 @@ impl MessagesReader {
         }
     }
 
+    fn get_min_internals_processed_to_by_shards(&self) -> ProcessedTo {
+        let mut min_internals_processed_to = ProcessedTo::default();
+
+        for par_reader in self.internals_partition_readers.values() {
+            for (shard_id, key) in &par_reader.reader_state().processed_to {
+                min_internals_processed_to
+                    .entry(*shard_id)
+                    .and_modify(|min_key| *min_key = std::cmp::min(*min_key, *key))
+                    .or_insert(*key);
+            }
+        }
+
+        min_internals_processed_to
+    }
+
     pub fn finalize(
         mut self,
         current_next_lt: u64,
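The new helper folds every partition reader's `processed_to` map into a per-shard minimum: a shard's queue is only safely considered processed up to the smallest key that all partitions have reached. A hedged sketch with standard-library stand-ins for `ShardIdent`, `QueueKey`, and `ProcessedTo` (all aliases below are assumptions for illustration):

use std::collections::BTreeMap;

// Illustrative aliases; the real types come from the crate.
type ShardIdent = u32;
type QueueKey = u64;
type ProcessedTo = BTreeMap<ShardIdent, QueueKey>;

// For each shard, keep the minimum processed-to key seen across
// all partition readers.
fn min_processed_to_by_shards(per_partition: &[ProcessedTo]) -> ProcessedTo {
    let mut min_processed_to = ProcessedTo::new();
    for processed_to in per_partition {
        for (shard_id, key) in processed_to {
            min_processed_to
                .entry(*shard_id)
                .and_modify(|min_key| *min_key = std::cmp::min(*min_key, *key))
                .or_insert(*key);
        }
    }
    min_processed_to
}

fn main() {
    let par_0 = ProcessedTo::from([(1, 10), (2, 7)]);
    let par_1 = ProcessedTo::from([(1, 4)]);
    let min = min_processed_to_by_shards(&[par_0, par_1]);
    // Shard 1 is only processed up to 4 (the slower partition wins);
    // shard 2 appears in a single partition, so its key is kept as-is.
    assert_eq!(min, ProcessedTo::from([(1, 4), (2, 7)]));
}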
@@ -324,7 +341,7 @@
 
         // collect internals partition readers states
         let mut internals_reader_state = InternalsReaderState::default();
-        for (par_id, mut par_reader) in self.internals_partition_readers {
+        for (par_id, par_reader) in self.internals_partition_readers.iter_mut() {
             // collect aggregated messages stats
             for range_reader in par_reader.range_readers().values() {
                 if range_reader.fully_read && range_reader.reader_state.buffer.msgs_count() == 0 {
@@ -350,36 +367,21 @@
                         .get_last_range_reader()?
                         .1
                         .reader_state()
-                        .get_state_by_partition(par_id)?;
+                        .get_state_by_partition(*par_id)?;
 
                     if last_int_range_reader.reader_state.skip_offset
                         == last_ext_range_reader.skip_offset
                     {
                         par_reader.drop_processing_offset(true)?;
-                        self.externals_reader.drop_processing_offset(par_id, true)?;
+                        self.externals_reader
+                            .drop_processing_offset(*par_id, true)?;
                     }
                 }
             }
+        }
 
-            let par_reader_state = par_reader.finalize(current_next_lt)?;
-            internals_reader_state
-                .partitions
-                .insert(par_id, par_reader_state);
-        }
-
-        // collect externals reader state
-        let FinalizedExternalsReader {
-            externals_reader_state,
-            anchors_cache,
-        } = self.externals_reader.finalize()?;
-
-        let reader_state = ReaderState {
-            externals: externals_reader_state,
-            internals: internals_reader_state,
-        };
-
         // build queue diff
-        let min_internals_processed_to = reader_state.internals.get_min_processed_to_by_shards();
+        let min_internals_processed_to = self.get_min_internals_processed_to_by_shards();
         let mut queue_diff_with_msgs = self
             .new_messages
             .into_queue_diff_with_messages(min_internals_processed_to);
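Around this point the finalize flow is resequenced for ownership reasons: the loop above now borrows the partition readers with `iter_mut()` instead of consuming the map, so the readers stay available for the router reset and partition-0 purge that follow; a second, consuming loop moves below those steps. A tiny sketch of the borrow-first, consume-later pattern (the `Reader` type is invented for illustration):

use std::collections::BTreeMap;

struct Reader { finalized: bool }

impl Reader {
    fn tweak(&mut self) {}
    fn finalize(mut self) -> Self { self.finalized = true; self }
}

fn main() {
    let mut readers: BTreeMap<u8, Reader> =
        BTreeMap::from([(0, Reader { finalized: false })]);

    // Phase 1: mutate in place; the map (and its owner) remain usable.
    for (_id, reader) in readers.iter_mut() {
        reader.tweak();
    }

    // ...intermediate steps that still need the readers go here...

    // Phase 2: consume the map only once nothing else needs it.
    let finalized: BTreeMap<u8, Reader> = readers
        .into_iter()
        .map(|(id, reader)| (id, reader.finalize()))
        .collect();
    assert!(finalized[&0].finalized);
}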
@@ -390,14 +392,43 @@
 
         // reset queue diff partition router
         // according to actual aggregated stats
-        Self::reset_partition_router_by_stats(
+        let moved_from_par_0_accounts = Self::reset_partition_router_by_stats(
            &self.msgs_exec_params,
            &mut queue_diff_with_msgs.partition_router,
            aggregated_stats,
            self.for_shard_id,
            diffs,
        )?;
 
+        // remove moved accounts from partition 0 buffer
+        let par_reader = self.internals_partition_readers.get_mut(&0).unwrap();
+        if let Ok(last_int_range_reader) = par_reader.get_last_range_reader_mut() {
+            if last_int_range_reader.kind == InternalsRangeReaderKind::NewMessages {
+                last_int_range_reader
+                    .reader_state
+                    .buffer
+                    .remove_messages_by_accounts(&moved_from_par_0_accounts);
+            }
+        }
+
+        // collect internals reader state
+        for (par_id, par_reader) in self.internals_partition_readers {
+            internals_reader_state
+                .partitions
+                .insert(par_id, par_reader.finalize(current_next_lt)?);
+        }
+
+        // collect externals reader state
+        let FinalizedExternalsReader {
+            externals_reader_state,
+            anchors_cache,
+        } = self.externals_reader.finalize()?;
+
+        let reader_state = ReaderState {
+            externals: externals_reader_state,
+            internals: internals_reader_state,
+        };
+
         Ok(FinalizedMessagesReader {
             has_unprocessed_messages,
             reader_state,
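This block is the fix named in the commit title: accounts that `reset_partition_router_by_stats` just rerouted out of partition 0 may still have messages sitting in the partition-0 new-messages buffer, and those leftovers are now purged. A condensed sketch of the hazard being avoided, with plain maps standing in for the real buffer and router (the double-delivery reading of the bug is an inference from the commit title, not stated in the diff):

use std::collections::{HashMap, HashSet};

type Account = u8;

fn main() {
    // Partition 0 buffer before the router reset.
    let mut par_0_buffer: HashMap<Account, Vec<&str>> =
        HashMap::from([(7, vec!["msg a", "msg b"]), (9, vec!["msg c"])]);

    // The router reset decided account 7 is too busy for partition 0
    // and moved it to partition 1 (which will re-read its messages).
    let moved_from_par_0: HashSet<Account> = HashSet::from([7]);

    // Without this purge, "msg a"/"msg b" could be delivered from the
    // partition-0 buffer *and* again via partition 1.
    par_0_buffer.retain(|account, _| !moved_from_par_0.contains(account));

    assert!(!par_0_buffer.contains_key(&7));
    assert!(par_0_buffer.contains_key(&9));
}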
@@ -412,8 +443,9 @@
         aggregated_stats: QueueStatistics,
         for_shard_id: ShardIdent,
         top_block_diffs: Vec<(ShardIdent, ShortQueueDiff)>,
-    ) -> Result<()> {
+    ) -> Result<FastHashSet<HashBytes>> {
         let par_0_msgs_count_limit = msgs_exec_params.par_0_int_msgs_count_limit as u64;
+        let mut moved_from_par_0_accounts = FastHashSet::default();
 
         for (dest_int_address, msgs_count) in aggregated_stats {
             let existing_partition = partition_router.get_partition(None, &dest_int_address);
@@ -435,6 +467,7 @@
                         dest_int_address, msgs_count,
                     );
                     partition_router.insert_dst(&dest_int_address, 1)?;
+                    moved_from_par_0_accounts.insert(dest_int_address.get_address());
                 }
             } else {
                 tracing::trace!(target: tracing_targets::COLLATOR,
@@ -513,11 +546,12 @@
                         dest_int_address, total_msgs,
                     );
                     partition_router.insert_dst(&dest_int_address, 1)?;
+                    moved_from_par_0_accounts.insert(dest_int_address.get_address());
                 }
             }
         }
 
-        Ok(())
+        Ok(moved_from_par_0_accounts)
     }
 
     pub fn last_read_to_anchor_chain_time(&self) -> Option<u64> {
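`reset_partition_router_by_stats` now returns the set of accounts it moved, so the caller can purge them from the partition-0 buffer. A self-contained sketch of the routing rule visible in these hunks: accounts whose aggregated message count exceeds the partition-0 limit are routed to partition 1 and recorded. Names, types, and the limit below are illustrative, not the crate's real `PartitionRouter` API:

use std::collections::{HashMap, HashSet};

type Account = u8;
type PartitionRouter = HashMap<Account, u8>; // account -> partition index

// Move any account whose message count exceeds the partition-0 limit
// into partition 1, and return the set of accounts that were moved.
fn reset_partition_router_by_stats(
    par_0_msgs_count_limit: u64,
    partition_router: &mut PartitionRouter,
    aggregated_stats: &HashMap<Account, u64>,
) -> HashSet<Account> {
    let mut moved_from_par_0 = HashSet::new();
    for (account, msgs_count) in aggregated_stats {
        if *msgs_count > par_0_msgs_count_limit {
            partition_router.insert(*account, 1);
            moved_from_par_0.insert(*account);
        }
    }
    moved_from_par_0
}

fn main() {
    let mut router = PartitionRouter::new();
    let stats = HashMap::from([(7, 1_000), (9, 3)]);
    let moved = reset_partition_router_by_stats(100, &mut router, &stats);
    assert_eq!(router.get(&7), Some(&1)); // heavy account rerouted
    assert!(moved.contains(&7));
    assert!(!moved.contains(&9)); // light account stays in partition 0
}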
