fix(collator): int queue routing fixes
drmick authored and SmaGMan committed Jan 15, 2025
1 parent 54db32d commit 7f46b38
Showing 14 changed files with 417 additions and 110 deletions.
39 changes: 20 additions & 19 deletions collator/src/collator/do_collate/finalize.rs
@@ -69,35 +69,36 @@ impl Phase<FinalizeState> {
             .cloned()
             .unwrap_or_default();
 
-        let diffs = if self.state.collation_data.block_id_short.is_masterchain() {
-            let top_shard_blocks = if self.state.collation_data.block_id_short.is_masterchain() {
-                Some(
-                    self.state
-                        .collation_data
-                        .top_shard_blocks
-                        .iter()
-                        .map(|b| (b.block_id.shard, b.block_id.seqno))
-                        .collect(),
-                )
-            } else {
-                None
-            };
-
-            // load diffs by top shard blocks
-            mq_adapter.get_diffs(top_shard_blocks.clone().unwrap_or_default())
+        // getting top shard blocks
+        let top_shard_blocks = if self.state.collation_data.block_id_short.is_masterchain() {
+            self.state
+                .collation_data
+                .top_shard_blocks
+                .iter()
+                .map(|b| (b.block_id.shard, b.block_id.seqno))
+                .collect()
         } else {
-            let blocks = self
+            let mut top_blocks: FastHashMap<ShardIdent, u32> = self
                 .state
                 .mc_data
                 .shards
                 .iter()
-                .filter(|(_, d)| d.top_sc_block_updated)
+                .filter(|(shard, descr)| {
+                    descr.top_sc_block_updated && shard != &self.state.shard_id
+                })
                 .map(|(shard_ident, descr)| (*shard_ident, descr.seqno))
                 .collect();
 
-            mq_adapter.get_diffs(blocks)
+            top_blocks.insert(
+                self.state.mc_data.block_id.shard,
+                self.state.mc_data.block_id.seqno,
+            );
+
+            top_blocks
         };
 
+        let diffs = mq_adapter.get_diffs(top_shard_blocks);
+
         // get queue diff and check for pending internals
         let create_queue_diff_elapsed;
         let FinalizedMessagesReader {
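For context, the reworked shard-collator branch loads queue diffs for every other shard's updated top block and for the referenced masterchain block, while skipping the collator's own shard. A minimal, self-contained sketch of that selection logic follows; the ShardId alias and top_blocks_for_shard function are illustrative stand-ins, not the crate's actual types or API.

use std::collections::HashMap;

/// Illustrative stand-in for a shard identifier (workchain, prefix).
type ShardId = (i32, u64);

/// Pick the blocks whose queue diffs a shard collator should load:
/// every *other* shard with an updated top block, plus the masterchain block.
fn top_blocks_for_shard(
    own_shard: ShardId,
    mc_block: (ShardId, u32),
    shards: &[(ShardId, u32, bool)], // (shard, seqno, top_sc_block_updated)
) -> HashMap<ShardId, u32> {
    let mut top_blocks: HashMap<ShardId, u32> = shards
        .iter()
        .filter(|(shard, _, updated)| *updated && *shard != own_shard)
        .map(|(shard, seqno, _)| (*shard, *seqno))
        .collect();

    // Also include the referenced masterchain block itself.
    top_blocks.insert(mc_block.0, mc_block.1);
    top_blocks
}

fn main() {
    let own: ShardId = (0, 0x8000_0000_0000_0000);
    let other: ShardId = (0, 0x4000_0000_0000_0000);
    let mc_block = ((-1, 0x8000_0000_0000_0000), 100);
    let shards = [(own, 12, true), (other, 15, true)];

    let blocks = top_blocks_for_shard(own, mc_block, &shards);
    assert!(!blocks.contains_key(&own)); // own shard is skipped
    assert_eq!(blocks.get(&other), Some(&15)); // other shard's top block
    assert_eq!(blocks.get(&mc_block.0), Some(&100)); // plus the mc block
    println!("{blocks:?}");
}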
80 changes: 42 additions & 38 deletions collator/src/collator/messages_reader/mod.rs
@@ -353,7 +353,7 @@ impl MessagesReader {
             aggregated_stats,
             self.for_shard_id,
             diffs,
-        );
+        )?;
 
         Ok(FinalizedMessagesReader {
             has_unprocessed_messages,

@@ -368,62 +368,66 @@
         aggregated_stats: QueueStatistics,
         for_shard_id: ShardIdent,
         top_block_diffs: Vec<(ShardIdent, ShortQueueDiff)>,
-    ) {
+    ) -> Result<()> {
         // TODO: msgs-v3: store limit in msgs_exec_params
         const MAX_PAR_0_MSGS_COUNT_LIMIT: u64 = 100_000;
 
-        for (int_address, msgs_count) in aggregated_stats {
-            let int_address_bytes = int_address.as_std().unwrap().address;
-            let acc_for_current_shard = for_shard_id.contains_account(&int_address_bytes);
-
-            let existing_partition = partition_router.get_partition(None, &int_address);
+        for (dest_int_address, msgs_count) in aggregated_stats {
+            let existing_partition = partition_router.get_partition(None, &dest_int_address);
             if existing_partition != 0 {
                 continue;
             }
 
-            if acc_for_current_shard {
+            if for_shard_id.contains_address(&dest_int_address) {
                 // if we have account for current shard then check if we need to move it to partition 1
                 // if we have less than MAX_PAR_0_MSGS_COUNT_LIMIT messages then keep it in partition 0
                 if msgs_count > MAX_PAR_0_MSGS_COUNT_LIMIT {
-                    partition_router
-                        .insert(RouterDirection::Dest, int_address, 1)
-                        .unwrap();
+                    partition_router.insert(RouterDirection::Dest, dest_int_address, 1)?;
                 }
             } else {
                 // if we have account for another shard then take info from that shard
                 let acc_shard_diff_info = top_block_diffs
                     .iter()
-                    .find(|(shard_id, _)| shard_id.contains_account(&int_address_bytes))
+                    .find(|(shard_id, _)| shard_id.contains_address(&dest_int_address))
                     .map(|(_, diff)| diff);
 
-                if let Some(diff) = acc_shard_diff_info {
-                    // if we found low priority partition in remote diff then copy it
-                    let remote_shard_partition = diff.router.get_partition(None, &int_address);
-                    if remote_shard_partition != 0 {
-                        partition_router
-                            .insert(RouterDirection::Dest, int_address, remote_shard_partition)
-                            .unwrap();
-                        continue;
-                    }
-
-                    // if remote partition == 0 then we need to check statistics
-                    let remote_msgs_count = match diff.statistics.partition(0) {
-                        None => 0,
-                        Some(partition) => partition.get(&int_address).copied().unwrap_or(0),
-                    };
-
-                    let total = msgs_count + remote_msgs_count;
-
-                    if total > MAX_PAR_0_MSGS_COUNT_LIMIT {
-                        partition_router
-                            .insert(RouterDirection::Dest, int_address, 1)
-                            .unwrap();
-                    }
-                } else if msgs_count > MAX_PAR_0_MSGS_COUNT_LIMIT {
-                    partition_router
-                        .insert(RouterDirection::Dest, int_address, 1)
-                        .unwrap();
-                };
+                // try to get remote partition from diff
+                let total_msgs = match acc_shard_diff_info {
+                    // if we do not have diff then use aggregated stats
+                    None => msgs_count,
+                    Some(diff) => {
+                        // getting remote shard partition from diff
+                        let remote_shard_partition = diff
+                            .router()
+                            .get_partition(Default::default(), &dest_int_address);
+
+                        if remote_shard_partition != 0 {
+                            partition_router.insert(
+                                RouterDirection::Dest,
+                                dest_int_address,
+                                remote_shard_partition,
+                            )?;
+                            continue;
+                        }
+
+                        // if remote partition == 0 then we need to check statistics
+                        let remote_msgs_count = match diff.statistics().partition(0) {
+                            None => 0,
+                            Some(partition) => {
+                                partition.get(&dest_int_address).copied().unwrap_or(0)
+                            }
+                        };
+
+                        msgs_count + remote_msgs_count
+                    }
+                };
+
+                if total_msgs > MAX_PAR_0_MSGS_COUNT_LIMIT {
+                    partition_router.insert(RouterDirection::Dest, dest_int_address, 1)?;
+                }
             }
         }
+
+        Ok(())
     }
 
     pub fn last_read_to_anchor_chain_time(&self) -> Option<u64> {
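In short, the new routing rule is: a destination already assigned to a non-default partition is left alone; an address owned by the current shard moves to partition 1 only if its local message count exceeds the limit; an address owned by another shard copies that shard's partition when it is non-zero, and otherwise the local and remote partition-0 counts are summed before comparing against the limit. Below is a simplified, self-contained sketch of that decision; route_partition and its parameters are illustrative and do not mirror the crate's actual signatures.

/// Illustrative routing decision for one destination address.
/// `existing` - partition already assigned locally (0 = default);
/// `local_count` - messages to this address aggregated by the current shard;
/// `remote` - info from the owning shard's top-block diff, if the address
/// belongs to another shard: (its partition, its message count in partition 0).
fn route_partition(
    limit: u64,
    existing: u8,
    owned_by_current_shard: bool,
    local_count: u64,
    remote: Option<(u8, u64)>,
) -> u8 {
    if existing != 0 {
        return existing; // already routed to a low-priority partition
    }
    if owned_by_current_shard {
        // Only the local count matters for our own accounts.
        return if local_count > limit { 1 } else { 0 };
    }
    match remote {
        // Copy a non-default partition straight from the remote diff.
        Some((p, _)) if p != 0 => p,
        // Otherwise compare the combined local + remote traffic to the limit.
        Some((_, remote_count)) => {
            if local_count + remote_count > limit { 1 } else { 0 }
        }
        // No diff for the owning shard: fall back to the local count alone.
        None => if local_count > limit { 1 } else { 0 },
    }
}

fn main() {
    const LIMIT: u64 = 100_000;
    assert_eq!(route_partition(LIMIT, 0, true, 150_000, None), 1);
    assert_eq!(route_partition(LIMIT, 0, false, 60_000, Some((0, 50_000))), 1);
    assert_eq!(route_partition(LIMIT, 0, false, 60_000, Some((1, 0))), 1);
    assert_eq!(route_partition(LIMIT, 0, false, 60_000, None), 0);
}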
5 changes: 4 additions & 1 deletion collator/src/collator/mod.rs
@@ -1307,7 +1307,10 @@ impl CollatorStdImpl {
         // return reader state to working state
         let FinalizedMessagesReader {
             mut reader_state, ..
-        } = messages_reader.finalize(0, vec![])?;
+        } = messages_reader.finalize(
+            0, // can pass 0 because new messages reader was not initialized in this case
+            vec![],
+        )?;
         std::mem::swap(&mut working_state.reader_state, &mut reader_state);
 
         working_state.has_unprocessed_messages = Some(has_pending_internals);
4 changes: 4 additions & 0 deletions collator/src/internal_queue/gc.rs
@@ -113,6 +113,10 @@ fn gc_task<V: InternalMessageValue>(
             }
         }
     }
+
+    // the total number of entries in the GC state
+    let total_entries = gc_state.values().map(|map| map.len()).sum::<usize>();
+    metrics::gauge!("tycho_internal_queue_gc_state_size").set(total_entries as f64);
 }
 
 type GcRange = FastHashMap<QueuePartition, FastHashMap<ShardIdent, QueueKey>>;
