diff --git a/tools/congestion-model/src/main.rs b/tools/congestion-model/src/main.rs index 9df750775e3..1abb2f3e73a 100644 --- a/tools/congestion-model/src/main.rs +++ b/tools/congestion-model/src/main.rs @@ -1,3 +1,4 @@ +use bytesize::ByteSize; use chrono::Utc; use clap::Parser; use congestion_model::strategy::{ @@ -8,7 +9,7 @@ use congestion_model::workload::{ AllForOneProducer, BalancedProducer, LinearImbalanceProducer, Producer, }; use congestion_model::{ - summary_table, CongestionStrategy, Model, ShardQueueLengths, StatsWriter, PGAS, + summary_table, CongestionStrategy, Model, ShardQueueLengths, StatsWriter, PGAS, TGAS, }; use std::time::Duration; use tracing_subscriber::layer::SubscriberExt; @@ -176,6 +177,86 @@ fn strategy(strategy_name: &str, num_shards: usize) -> Vec Box::::default(), "Traffic Light" => Box::::default(), "NEP" => Box::::default(), + "NEP 200MB" => Box::new( + NepStrategy::default().with_memory_limits(ByteSize::mb(100), ByteSize::mb(100)), + ), + "NEP 450/50MB" => Box::new( + // keep outgoing limit small + // (1) if we hit this, it's due to another shard's incoming congestion, + // so we are already in a second stage of congestion and should be more aggressive + // (2) this soft limit will be breached quite a bit anyway + // as we don't stop executing receipts + NepStrategy::default().with_memory_limits(ByteSize::mb(450), ByteSize::mb(50)), + ), + "NEP 1GB" => Box::new( + NepStrategy::default().with_memory_limits(ByteSize::mb(500), ByteSize::mb(500)), + ), + "NEP 10 Pgas" => Box::new(NepStrategy::default().with_gas_limits(10 * PGAS, 10 * PGAS)), + "NEP 1 Pgas" => Box::new(NepStrategy::default().with_gas_limits(1 * PGAS, 1 * PGAS)), + "NEP 10/1 Pgas" => { + Box::new(NepStrategy::default().with_gas_limits(10 * PGAS, 1 * PGAS)) + } + // NEP v2 takes results from memory and gas limits into account and fixes those + "NEPv2" => Box::new( + NepStrategy::default() + .with_gas_limits(10 * PGAS, 1 * PGAS) + .with_memory_limits(ByteSize::mb(450), 
ByteSize::mb(50)), + ), + "NEPv2 1GB" => Box::new( + NepStrategy::default() + .with_gas_limits(10 * PGAS, 1 * PGAS) + .with_memory_limits(ByteSize::mb(900), ByteSize::mb(100)), + ), + "NEPv2 early global stop" => Box::new( + NepStrategy::default() + .with_gas_limits(10 * PGAS, 1 * PGAS) + .with_memory_limits(ByteSize::mb(450), ByteSize::mb(50)) + .with_global_stop_limit(0.5), + ), + "NEPv2 late global stop" => Box::new( + NepStrategy::default() + .with_gas_limits(10 * PGAS, 1 * PGAS) + .with_memory_limits(ByteSize::mb(450), ByteSize::mb(50)) + .with_global_stop_limit(1.0), + ), + "NEPv2 less forwarding" => Box::new( + NepStrategy::default() + .with_gas_limits(10 * PGAS, 1 * PGAS) + .with_memory_limits(ByteSize::mb(450), ByteSize::mb(50)) + .with_send_gas_limit_range(PGAS / 2, 2 * PGAS), + ), + "NEPv2 more forwarding" => Box::new( + NepStrategy::default() + .with_gas_limits(10 * PGAS, 1 * PGAS) + .with_memory_limits(ByteSize::mb(450), ByteSize::mb(50)) + .with_send_gas_limit_range(PGAS / 2, 100 * PGAS), + ), + "NEPv2 less tx" => Box::new( + NepStrategy::default() + .with_gas_limits(10 * PGAS, 1 * PGAS) + .with_memory_limits(ByteSize::mb(450), ByteSize::mb(50)) + .with_tx_gas_limit_range(0, 100 * TGAS), + ), + "NEPv2 more tx" => Box::new( + NepStrategy::default() + .with_gas_limits(10 * PGAS, 1 * PGAS) + .with_memory_limits(ByteSize::mb(450), ByteSize::mb(50)) + .with_tx_gas_limit_range(5 * TGAS, 900 * TGAS), + ), + // NEP v3 takes results from v2 into account. + // It is still a work in progress, just something to throw out there for now. + // Note: it showed weird behavior depending on the number of shards, + // doing badly with 8 or 12 shards on balanced workloads but doing + // great with 10 shards. 
+ "NEPv3" => Box::new( + NepStrategy::default() + .with_gas_limits(10 * PGAS, 1 * PGAS) + .with_memory_limits(ByteSize::mb(450), ByteSize::mb(50)) + // less tx is generally better for all-to-one workloads, but balanced workloads need more + .with_tx_gas_limit_range(0, 500 * TGAS) + // less forwarding is generally quite good, so use a lower send limit than the default + .with_send_gas_limit_range(0, 5 * PGAS), + ), _ => panic!("unknown strategy: {}", strategy_name), }; @@ -221,6 +302,21 @@ fn parse_strategy_names(strategy_name: &str) -> Vec { "New TX last".to_string(), "Traffic Light".to_string(), "NEP".to_string(), + "NEP 200MB".to_string(), + "NEP 450/50MB".to_string(), + "NEP 1GB".to_string(), + "NEP 10 Pgas".to_string(), + "NEP 1 Pgas".to_string(), + "NEP 10/1 Pgas".to_string(), + "NEPv2".to_string(), + "NEPv2 1GB".to_string(), + "NEPv2 early global stop".to_string(), + "NEPv2 late global stop".to_string(), + "NEPv2 less forwarding".to_string(), + "NEPv2 more forwarding".to_string(), + "NEPv2 less tx".to_string(), + "NEPv2 more tx".to_string(), + "NEPv3".to_string(), ]; if strategy_name == "all" { diff --git a/tools/congestion-model/src/strategy/nep.rs b/tools/congestion-model/src/strategy/nep.rs index 1360f15ef68..42fd50b1cf1 100644 --- a/tools/congestion-model/src/strategy/nep.rs +++ b/tools/congestion-model/src/strategy/nep.rs @@ -4,7 +4,6 @@ use crate::model::ChunkExecutionContext; use crate::strategy::QueueFactory; use crate::{GGas, QueueId, Receipt, ShardId, TransactionId, GAS_LIMIT, PGAS, TGAS}; -#[derive(Default)] pub struct NepStrategy { pub shard_id: Option, pub all_shards: Vec, @@ -15,6 +14,41 @@ pub struct NepStrategy { // How much gas are we allowed to send to other shards. 
pub outgoing_gas_limit: BTreeMap, + + // Tunable parameters; the `with_*` builder methods override the defaults. + pub min_tx_gas: GGas, + pub max_tx_gas: GGas, + pub min_send_limit: GGas, + pub max_send_limit: GGas, + pub global_outgoing_congestion_limit: f64, + pub max_incoming_gas: GGas, + pub max_incoming_congestion_memory: u64, + pub max_outgoing_congestion_memory: u64, + pub max_outgoing_gas: GGas, +} + +impl Default for NepStrategy { + fn default() -> Self { + Self { + // tunable parameters; defaults equal the values that used to be hard-coded + min_tx_gas: 5 * TGAS, + max_tx_gas: 500 * TGAS, + min_send_limit: 0, + max_send_limit: 30 * PGAS, + global_outgoing_congestion_limit: 0.9, + max_incoming_gas: 100 * PGAS, + max_outgoing_gas: 100 * PGAS, + max_incoming_congestion_memory: 250_000_000, + max_outgoing_congestion_memory: 250_000_000, + + // init fills these + shard_id: Default::default(), + all_shards: Default::default(), + other_shards: Default::default(), + outgoing_queues: Default::default(), + outgoing_gas_limit: Default::default(), + } + } } #[derive(Default, Clone)] @@ -57,14 +91,11 @@ impl crate::CongestionStrategy for NepStrategy { impl NepStrategy { // Step 1: Compute bandwidth limits to other shards based on the congestion information fn init_send_limit(&mut self, ctx: &mut ChunkExecutionContext<'_>) { - let min_send_limit = 0; - let max_send_limit = 30 * PGAS; - self.outgoing_gas_limit.clear(); for shard_id in self.other_shards.clone() { let CongestedShardsInfo { incoming_congestion, .. } = self.get_info(ctx, &shard_id); - let send_limit = mix(max_send_limit, min_send_limit, incoming_congestion); + let send_limit = mix(self.max_send_limit, self.min_send_limit, incoming_congestion); self.outgoing_gas_limit.insert(shard_id, send_limit); } @@ -102,11 +133,8 @@ impl NepStrategy { // // The outgoing receipts are processed as in `process_outgoing_receipts`. 
fn process_new_transactions(&mut self, ctx: &mut ChunkExecutionContext<'_>) { - let min_gas = 5 * TGAS; - let max_gas = 500 * TGAS; - let incoming_congestion = self.get_incoming_congestion(ctx); - let tx_limit = mix(max_gas, min_gas, incoming_congestion); + let tx_limit = mix(self.max_tx_gas, self.min_tx_gas, incoming_congestion); while ctx.gas_burnt() < tx_limit { let Some(tx) = ctx.incoming_transactions().pop_front() else { @@ -132,11 +160,9 @@ impl NepStrategy { // // TODO consider smooth slow down fn get_global_stop(&mut self, ctx: &mut ChunkExecutionContext<'_>) -> bool { - let global_outgoing_congestion_limit = 0.9; - for shard_id in self.all_shards.clone() { let CongestedShardsInfo { outgoing_congestion, .. } = self.get_info(ctx, &shard_id); - if outgoing_congestion > global_outgoing_congestion_limit { + if outgoing_congestion > self.global_outgoing_congestion_limit { return true; } } @@ -201,17 +227,15 @@ impl NepStrategy { } fn incoming_memory_congestion(&self, ctx: &mut ChunkExecutionContext) -> f64 { - let max_congestion_memory_consumption = 250_000_000 as f64; - let memory_consumption = ctx.incoming_receipts().size(); - let memory_congestion = memory_consumption as f64 / max_congestion_memory_consumption; + let memory_congestion = + memory_consumption as f64 / self.max_incoming_congestion_memory as f64; f64::clamp(memory_congestion, 0.0, 1.0) } fn incoming_gas_congestion(&self, ctx: &mut ChunkExecutionContext) -> f64 { - let max_congestion_incoming_gas = (100 * PGAS) as f64; let gas_backlog = ctx.incoming_receipts().attached_gas() as f64; - f64::clamp(gas_backlog / max_congestion_incoming_gas, 0.0, 1.0) + f64::clamp(gas_backlog / self.max_incoming_gas as f64, 0.0, 1.0) } fn get_outgoing_congestion(&self, ctx: &mut ChunkExecutionContext) -> f64 { @@ -219,26 +243,23 @@ impl NepStrategy { } fn outgoing_memory_congestion(&self, ctx: &mut ChunkExecutionContext) -> f64 { - let max_congestion_memory_consumption = 250_000_000 as f64; - let mut 
memory_consumption = 0; for (_, queue_id) in &self.outgoing_queues { memory_consumption += ctx.queue(*queue_id).size(); } - let memory_congestion = memory_consumption as f64 / max_congestion_memory_consumption; + let memory_congestion = + memory_consumption as f64 / self.max_outgoing_congestion_memory as f64; f64::clamp(memory_congestion, 0.0, 1.0) } fn outgoing_gas_congestion(&self, ctx: &mut ChunkExecutionContext) -> f64 { - let max_gas_backlog = (100 * PGAS) as f64; - let mut gas_backlog = 0; for (_, queue_id) in &self.outgoing_queues { gas_backlog += ctx.queue(*queue_id).attached_gas(); } - let gas_congestion = gas_backlog as f64 / max_gas_backlog; + let gas_congestion = gas_backlog as f64 / self.max_outgoing_gas as f64; f64::clamp(gas_congestion, 0.0, 1.0) } @@ -278,6 +299,44 @@ impl NepStrategy { fn shard_id(&self) -> ShardId { self.shard_id.unwrap() } + + /// Gas backlog (incoming, outgoing) that counts as 100% congestion. + pub fn with_gas_limits(mut self, incoming: GGas, outgoing: GGas) -> Self { + self.max_incoming_gas = incoming; + self.max_outgoing_gas = outgoing; + self + } + + /// Memory usage in bytes (incoming, outgoing) that counts as 100% congestion. + pub fn with_memory_limits( + mut self, + incoming: bytesize::ByteSize, + outgoing: bytesize::ByteSize, + ) -> Self { + self.max_incoming_congestion_memory = incoming.as_u64(); + self.max_outgoing_congestion_memory = outgoing.as_u64(); + self + } + + /// Min and max of the gas limit for new transactions; incoming congestion picks the value. + pub fn with_tx_gas_limit_range(mut self, min: GGas, max: GGas) -> Self { + self.min_tx_gas = min; + self.max_tx_gas = max; + self + } + + /// Min and max of the gas allowance to send to other shards. + pub fn with_send_gas_limit_range(mut self, min: GGas, max: GGas) -> Self { + self.min_send_limit = min; + self.max_send_limit = max; + self + } + + /// Outgoing congestion level (fraction, e.g. 0.9 = 90%) above which the global stop kicks in. + pub fn with_global_stop_limit(mut self, congestion_level: f64) -> Self { + self.global_outgoing_congestion_limit = congestion_level; + self + } } fn mix(x: u64, y: u64, a: f64) -> u64 {