Skip to content

Commit

Permalink
congestion: fine-tuning parameters in the model (#11009)
Browse files Browse the repository at this point in the history
This adds the ability to parametrize the NEP strategy.
Plus, it adds several model parametrizations for comparisons.
  • Loading branch information
jakmeier authored Apr 11, 2024
1 parent ad911b7 commit 21c108f
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 24 deletions.
98 changes: 97 additions & 1 deletion tools/congestion-model/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use bytesize::ByteSize;
use chrono::Utc;
use clap::Parser;
use congestion_model::strategy::{
Expand All @@ -8,7 +9,7 @@ use congestion_model::workload::{
AllForOneProducer, BalancedProducer, LinearImbalanceProducer, Producer,
};
use congestion_model::{
summary_table, CongestionStrategy, Model, ShardQueueLengths, StatsWriter, PGAS,
summary_table, CongestionStrategy, Model, ShardQueueLengths, StatsWriter, PGAS, TGAS,
};
use std::time::Duration;
use tracing_subscriber::layer::SubscriberExt;
Expand Down Expand Up @@ -176,6 +177,86 @@ fn strategy(strategy_name: &str, num_shards: usize) -> Vec<Box<dyn CongestionStr
"New TX last" => Box::<NewTxLast>::default(),
"Traffic Light" => Box::<TrafficLight>::default(),
"NEP" => Box::<NepStrategy>::default(),
"NEP 200MB" => Box::new(
NepStrategy::default().with_memory_limits(ByteSize::mb(100), ByteSize::mb(100)),
),
"NEP 450/50MB" => Box::new(
// keep outgoing limit small
// (1) if we hit this, it's due to another shard's incoming congestion,
// so we are already in a second stage of congestion and should be more aggressive
// (2) this soft limit will be breached quite a bit anyway
// as we don't stop executing receipts
NepStrategy::default().with_memory_limits(ByteSize::mb(450), ByteSize::mb(50)),
),
"NEP 1GB" => Box::new(
NepStrategy::default().with_memory_limits(ByteSize::mb(500), ByteSize::mb(500)),
),
"NEP 10 Pgas" => Box::new(NepStrategy::default().with_gas_limits(10 * PGAS, 10 * PGAS)),
// 1 Pgas for both the incoming and the outgoing limit, as the name says.
// (Previously this arm passed 10 Pgas twice, duplicating "NEP 10 Pgas".)
"NEP 1 Pgas" => Box::new(NepStrategy::default().with_gas_limits(1 * PGAS, 1 * PGAS)),
"NEP 10/1 Pgas" => {
    Box::new(NepStrategy::default().with_gas_limits(10 * PGAS, 1 * PGAS))
}
// NEP v2 takes results from memory and gas limits into account and fixes those
"NEPv2" => Box::new(
NepStrategy::default()
.with_gas_limits(10 * PGAS, 1 * PGAS)
.with_memory_limits(ByteSize::mb(450), ByteSize::mb(50)),
),
"NEPv2 1GB" => Box::new(
NepStrategy::default()
.with_gas_limits(10 * PGAS, 1 * PGAS)
.with_memory_limits(ByteSize::mb(900), ByteSize::mb(100)),
),
"NEPv2 early global stop" => Box::new(
NepStrategy::default()
.with_gas_limits(10 * PGAS, 1 * PGAS)
.with_memory_limits(ByteSize::mb(450), ByteSize::mb(50))
.with_global_stop_limit(0.5),
),
"NEPv2 late global stop" => Box::new(
NepStrategy::default()
.with_gas_limits(10 * PGAS, 1 * PGAS)
.with_memory_limits(ByteSize::mb(450), ByteSize::mb(50))
.with_global_stop_limit(1.0),
),
"NEPv2 less forwarding" => Box::new(
NepStrategy::default()
.with_gas_limits(10 * PGAS, 1 * PGAS)
.with_memory_limits(ByteSize::mb(450), ByteSize::mb(50))
.with_send_gas_limit_range(PGAS / 2, 2 * PGAS),
),
"NEPv2 more forwarding" => Box::new(
NepStrategy::default()
.with_gas_limits(10 * PGAS, 1 * PGAS)
.with_memory_limits(ByteSize::mb(450), ByteSize::mb(50))
.with_send_gas_limit_range(PGAS / 2, 100 * PGAS),
),
"NEPv2 less tx" => Box::new(
NepStrategy::default()
.with_gas_limits(10 * PGAS, 1 * PGAS)
.with_memory_limits(ByteSize::mb(450), ByteSize::mb(50))
.with_tx_gas_limit_range(0, 100 * TGAS),
),
"NEPv2 more tx" => Box::new(
NepStrategy::default()
.with_gas_limits(10 * PGAS, 1 * PGAS)
.with_memory_limits(ByteSize::mb(450), ByteSize::mb(50))
.with_tx_gas_limit_range(5 * TGAS, 900 * TGAS),
),
// NEP v3 takes results from v2 into account
// it is still work in progress, just something to throw out there for now
// note: it showed weird behavior depending on number of shards,
// doing bad with 8 or 12 shards on balanced workloads but doing
// great with 10 shards.
"NEPv3" => Box::new(
NepStrategy::default()
.with_gas_limits(10 * PGAS, 1 * PGAS)
.with_memory_limits(ByteSize::mb(450), ByteSize::mb(50))
// less tx is generally better for all-to-one workloads, but balanced workloads need more
.with_tx_gas_limit_range(0, 500 * TGAS)
// less forwarding is generally quite good, also the
.with_send_gas_limit_range(0, 5 * PGAS),
),
_ => panic!("unknown strategy: {}", strategy_name),
};

Expand Down Expand Up @@ -221,6 +302,21 @@ fn parse_strategy_names(strategy_name: &str) -> Vec<String> {
"New TX last".to_string(),
"Traffic Light".to_string(),
"NEP".to_string(),
"NEP 200MB".to_string(),
"NEP 450/50MB".to_string(),
"NEP 1GB".to_string(),
"NEP 10 Pgas".to_string(),
"NEP 1 Pgas".to_string(),
"NEP 10/1 Pgas".to_string(),
"NEPv2".to_string(),
"NEPv2 1GB".to_string(),
"NEPv2 early global stop".to_string(),
"NEPv2 late global stop".to_string(),
"NEPv2 less forwarding".to_string(),
"NEPv2 more forwarding".to_string(),
"NEPv2 less tx".to_string(),
"NEPv2 more tx".to_string(),
"NEPv3".to_string(),
];

if strategy_name == "all" {
Expand Down
105 changes: 82 additions & 23 deletions tools/congestion-model/src/strategy/nep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::model::ChunkExecutionContext;
use crate::strategy::QueueFactory;
use crate::{GGas, QueueId, Receipt, ShardId, TransactionId, GAS_LIMIT, PGAS, TGAS};

#[derive(Default)]
pub struct NepStrategy {
pub shard_id: Option<ShardId>,
pub all_shards: Vec<ShardId>,
Expand All @@ -15,6 +14,41 @@ pub struct NepStrategy {

// How much gas are we allowed to send to other shards.
pub outgoing_gas_limit: BTreeMap<ShardId, GGas>,

// numbers to fine-tune
pub min_tx_gas: GGas,
pub max_tx_gas: GGas,
pub min_send_limit: GGas,
pub max_send_limit: GGas,
pub global_outgoing_congestion_limit: f64,
pub max_incoming_gas: GGas,
pub max_incoming_congestion_memory: u64,
pub max_outgoing_congestion_memory: u64,
pub max_outgoing_gas: GGas,
}

impl Default for NepStrategy {
fn default() -> Self {
Self {
// parameters which can be set by the model runner
min_tx_gas: 5 * TGAS,
max_tx_gas: 500 * TGAS,
min_send_limit: 0,
max_send_limit: 30 * PGAS,
global_outgoing_congestion_limit: 0.9,
max_incoming_gas: 100 * PGAS,
max_outgoing_gas: 100 * PGAS,
max_incoming_congestion_memory: 250_000_000,
max_outgoing_congestion_memory: 250_000_000,

// init fills these
shard_id: Default::default(),
all_shards: Default::default(),
other_shards: Default::default(),
outgoing_queues: Default::default(),
outgoing_gas_limit: Default::default(),
}
}
}

#[derive(Default, Clone)]
Expand Down Expand Up @@ -57,14 +91,11 @@ impl crate::CongestionStrategy for NepStrategy {
impl NepStrategy {
// Step 1: Compute bandwidth limits to other shards based on the congestion information
fn init_send_limit(&mut self, ctx: &mut ChunkExecutionContext<'_>) {
let min_send_limit = 0;
let max_send_limit = 30 * PGAS;

self.outgoing_gas_limit.clear();

for shard_id in self.other_shards.clone() {
let CongestedShardsInfo { incoming_congestion, .. } = self.get_info(ctx, &shard_id);
let send_limit = mix(max_send_limit, min_send_limit, incoming_congestion);
let send_limit = mix(self.max_send_limit, self.min_send_limit, incoming_congestion);

self.outgoing_gas_limit.insert(shard_id, send_limit);
}
Expand Down Expand Up @@ -102,11 +133,8 @@ impl NepStrategy {
//
// The outgoing receipts are processed as in `process_outgoing_receipts`.
fn process_new_transactions(&mut self, ctx: &mut ChunkExecutionContext<'_>) {
let min_gas = 5 * TGAS;
let max_gas = 500 * TGAS;

let incoming_congestion = self.get_incoming_congestion(ctx);
let tx_limit = mix(max_gas, min_gas, incoming_congestion);
let tx_limit = mix(self.max_tx_gas, self.min_tx_gas, incoming_congestion);

while ctx.gas_burnt() < tx_limit {
let Some(tx) = ctx.incoming_transactions().pop_front() else {
Expand All @@ -132,11 +160,9 @@ impl NepStrategy {
//
// TODO consider smooth slow down
fn get_global_stop(&mut self, ctx: &mut ChunkExecutionContext<'_>) -> bool {
let global_outgoing_congestion_limit = 0.9;

for shard_id in self.all_shards.clone() {
let CongestedShardsInfo { outgoing_congestion, .. } = self.get_info(ctx, &shard_id);
if outgoing_congestion > global_outgoing_congestion_limit {
if outgoing_congestion > self.global_outgoing_congestion_limit {
return true;
}
}
Expand Down Expand Up @@ -201,44 +227,39 @@ impl NepStrategy {
}

/// Incoming congestion of this shard measured by memory, in the range `[0.0, 1.0]`.
///
/// Divides the size of the incoming receipts queue by the configurable
/// `max_incoming_congestion_memory` and clamps the ratio, so `1.0` means the
/// limit has been reached or exceeded.
fn incoming_memory_congestion(&self, ctx: &mut ChunkExecutionContext) -> f64 {
    let memory_consumption = ctx.incoming_receipts().size();
    // Use the tunable limit only; the former hard-coded 250 MB limit is superseded.
    let memory_congestion =
        memory_consumption as f64 / self.max_incoming_congestion_memory as f64;
    f64::clamp(memory_congestion, 0.0, 1.0)
}

fn incoming_gas_congestion(&self, ctx: &mut ChunkExecutionContext) -> f64 {
let max_congestion_incoming_gas = (100 * PGAS) as f64;
let gas_backlog = ctx.incoming_receipts().attached_gas() as f64;
f64::clamp(gas_backlog / max_congestion_incoming_gas, 0.0, 1.0)
f64::clamp(gas_backlog / self.max_incoming_gas as f64, 0.0, 1.0)
}

/// Outgoing congestion of this shard: the larger of the memory-based and the
/// gas-based outgoing congestion.
fn get_outgoing_congestion(&self, ctx: &mut ChunkExecutionContext) -> f64 {
    let by_memory = self.outgoing_memory_congestion(ctx);
    let by_gas = self.outgoing_gas_congestion(ctx);
    by_memory.max(by_gas)
}

/// Outgoing congestion of this shard measured by memory, in the range `[0.0, 1.0]`.
///
/// Sums the sizes of all queues listed in `outgoing_queues`, divides the total
/// by the configurable `max_outgoing_congestion_memory` and clamps the ratio.
fn outgoing_memory_congestion(&self, ctx: &mut ChunkExecutionContext) -> f64 {
    let mut memory_consumption = 0;
    for (_, queue_id) in &self.outgoing_queues {
        memory_consumption += ctx.queue(*queue_id).size();
    }

    // Use the tunable limit only; the former hard-coded 250 MB limit is superseded.
    let memory_congestion =
        memory_consumption as f64 / self.max_outgoing_congestion_memory as f64;
    f64::clamp(memory_congestion, 0.0, 1.0)
}

fn outgoing_gas_congestion(&self, ctx: &mut ChunkExecutionContext) -> f64 {
let max_gas_backlog = (100 * PGAS) as f64;

let mut gas_backlog = 0;
for (_, queue_id) in &self.outgoing_queues {
gas_backlog += ctx.queue(*queue_id).attached_gas();
}

let gas_congestion = gas_backlog as f64 / max_gas_backlog;
let gas_congestion = gas_backlog as f64 / self.max_outgoing_gas as f64;
f64::clamp(gas_congestion, 0.0, 1.0)
}

Expand Down Expand Up @@ -278,6 +299,44 @@ impl NepStrategy {
/// Returns the id of the shard this strategy instance belongs to.
///
/// # Panics
///
/// Panics if `shard_id` is still `None`.
fn shard_id(&self) -> ShardId {
    let own_shard = self.shard_id;
    own_shard.unwrap()
}

/// Define 100% congestion limit in gas.
pub fn with_gas_limits(mut self, incoming: GGas, outgoing: GGas) -> Self {
self.max_incoming_gas = incoming;
self.max_outgoing_gas = outgoing;
self
}

/// Define 100% congestion limit in bytes.
pub fn with_memory_limits(
mut self,
incoming: bytesize::ByteSize,
outgoing: bytesize::ByteSize,
) -> Self {
self.max_incoming_congestion_memory = incoming.as_u64();
self.max_outgoing_congestion_memory = outgoing.as_u64();
self
}

/// Gas spent on new transactions.
pub fn with_tx_gas_limit_range(mut self, min: GGas, max: GGas) -> Self {
self.min_tx_gas = min;
self.max_tx_gas = max;
self
}

/// Gas allowance to sent to other shards.
pub fn with_send_gas_limit_range(mut self, min: GGas, max: GGas) -> Self {
self.min_send_limit = min;
self.max_send_limit = max;
self
}

/// At how much % congestion the global stop should kick in.
pub fn with_global_stop_limit(mut self, congestion_level: f64) -> Self {
self.global_outgoing_congestion_limit = congestion_level;
self
}
}

fn mix(x: u64, y: u64, a: f64) -> u64 {
Expand Down

0 comments on commit 21c108f

Please sign in to comment.