feat(infra): concurrent materializer tests #1243
base: main
Conversation
@@ -59,13 +54,14 @@ fn read_manifest(file_name: &str) -> anyhow::Result<Manifest> {

/// Parse a manifest file in the `manifests` directory, clean up any corresponding
/// testnet resources, then materialize a testnet and run some tests.
pub async fn with_testnet<F, G>(manifest_file_name: &str, alter: G, test: F) -> anyhow::Result<()>
pub async fn with_testnet<F, G>(manifest_file_name: &str, concurrency: Option<concurrency::Config>, alter: G, test: F) -> anyhow::Result<()>
- I consider this inversion of control (IoC) a convenience utility -- rather bespoke -- that the original developer of the materializer used to create an initial batch of tests. I would not glorify it and turn it into a focal entrypoint for every future test.
- There isn't a single clear-cut way we'll want all tests that require some concurrency to behave, so I'm not sold on introducing concurrency as a framework feature.

Instead, I think we're better served by introducing a non-IoC API here:
- Extract the logic that actually materializes the definition into a separate function that returns the Manifest, DockerMaterializer, DockerTestnet.
- The test can now call this function to materialize a definition, do whatever it wants (using whatever concurrency it desires).
- Something needs to have a drop guard here that destroys the materialized testnet, probably the DockerTestnet? Not sure if that's implemented.
I've inverted the inversion, as you suggested, now providing a cleanup function. I don't think it's helpful to maintain two patterns, so I migrated the previous tests.
It also helped untangle concurrency from the framework, making it a simple test-lib utility.
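To make the non-IoC idea concrete, here is a minimal, self-contained sketch of the pattern under discussion. The types are empty stand-ins for the real `Manifest` / `DockerMaterializer` / `DockerTestnet`, and `make_testnet` is illustrative; the point is the shape: materialize, hand the parts to the caller, and tear down via a `Drop` guard.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Stub stand-ins for the real Manifest / DockerMaterializer / DockerTestnet.
pub struct Manifest;
pub struct DockerMaterializer;
pub struct DockerTestnet {
    // Flag flipped by the drop guard so a test can observe the teardown.
    pub torn_down: Arc<AtomicBool>,
}

impl Drop for DockerTestnet {
    fn drop(&mut self) {
        // In the real code this would remove containers, networks and volumes.
        self.torn_down.store(true, Ordering::SeqCst);
    }
}

/// Non-IoC entrypoint: materialize the definition and hand the parts back
/// to the caller, who drives the test with whatever concurrency it wants.
pub fn make_testnet(_manifest_file_name: &str) -> (Manifest, DockerMaterializer, DockerTestnet) {
    let testnet = DockerTestnet {
        torn_down: Arc::new(AtomicBool::new(false)),
    };
    (Manifest, DockerMaterializer, testnet)
}
```

The testnet is destroyed as soon as the returned handle goes out of scope, so an early `?` in the test body cannot leak resources.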
}

pub async fn record(&mut self, label: String) {
    let duration = self.start_time.unwrap().elapsed();
I feel `expect` is better here if you assume the caller knows that `start` must be called first.
I'll revise this API once the reporting summary is more solid.
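A small sketch of the `expect`-over-`unwrap` suggestion; `Bencher` here is a simplified stand-in for the real type, kept only to show the message the caller would see on misuse.

```rust
use std::time::{Duration, Instant};

pub struct Bencher {
    pub start_time: Option<Instant>,
}

impl Bencher {
    pub fn start(&mut self) {
        self.start_time = Some(Instant::now());
    }

    /// Panics with an actionable message, instead of a bare `unwrap` panic,
    /// when `start` was never called.
    pub fn record(&mut self) -> Duration {
        self.start_time
            .expect("Bencher::start must be called before record")
            .elapsed()
    }
}
```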
Ok(bencher) => (Some(bencher), None),
Err(err) => (None, Some(err)),
};
step_results.lock().await.push(TestResult {
If this triggers a lot of threads, then everyone is waiting on this lock as well, and as step_results grows, allocation might take some time. Just curious: if step_results is not updated at all, is there a big difference?
This is unlikely to be a bottleneck; it can only impose a small delay in the after-test lifecycle of the future, which isn't recorded and isn't time-sensitive. But I'll double-check that once I get to high max-concurrency figures.
If this slows down, then maybe tokio::sync::mpsc might be a better solution. You can then collect all the messages after all steps have finished.
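A minimal sketch of the channel-based collection pattern, written with `std::sync::mpsc` and threads so it is self-contained; `tokio::sync::mpsc` offers an analogous API for the async version. All names here are illustrative, and `TestResult` is reduced to two fields.

```rust
use std::sync::mpsc;
use std::thread;

pub struct TestResult {
    pub test_id: usize,
    pub ok: bool, // simplified stand-in for the real fields
}

/// Each worker sends its result over the channel instead of pushing into a
/// shared `Mutex<Vec<_>>`; the collector drains everything once all senders
/// have been dropped, so there is no lock contention on the results vector.
pub fn run_and_collect(num_workers: usize) -> Vec<TestResult> {
    let (tx, rx) = mpsc::channel();
    let handles: Vec<_> = (0..num_workers)
        .map(|test_id| {
            let tx = tx.clone();
            thread::spawn(move || {
                tx.send(TestResult { test_id, ok: true }).unwrap();
            })
        })
        .collect();
    drop(tx); // drop the original sender so the receiver can terminate
    for handle in handles {
        handle.join().unwrap();
    }
    rx.into_iter().collect()
}
```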
#[derive(Default)]
pub struct NonceManager {
    nonces: Arc<Mutex<HashMap<H160, U256>>>,
I think this is a bottleneck as well; every address is waiting on the same lock. Maybe this might help: https://github.com/xacrimon/dashmap
Yes, this is just a temporary solution, I was hoping to remove it entirely. If not, I'll optimize it.
I think it would be good to try the built-in NonceManager from Ethers. Was there any problem with that?
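To illustrate the contention point, here is a sketch of per-address locking — the same idea DashMap implements with internal shards. Keys are simplified to `u64` and this is not the real `NonceManager`; the outer map lock is held only long enough to find or insert the per-address slot.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Sketch: one lock per address instead of one global lock over the whole
/// nonce map, so callers for different addresses rarely block each other.
#[derive(Default, Clone)]
pub struct NonceManager {
    nonces: Arc<Mutex<HashMap<u64, Arc<Mutex<u64>>>>>,
}

impl NonceManager {
    /// Fetch-and-increment the nonce for one address.
    pub fn next_nonce(&self, addr: u64) -> u64 {
        // Briefly lock the outer map just to get the per-address slot.
        let slot = {
            let mut map = self.nonces.lock().unwrap();
            map.entry(addr).or_default().clone()
        };
        // All further contention is scoped to this one address.
        let mut nonce = slot.lock().unwrap();
        let current = *nonce;
        *nonce += 1;
        current
    }
}
```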
I have a suggestion about how to improve the framework design to make it cleaner and more intuitive. While the current implementation works, there are some areas where terminology and structure can be refined to improve clarity and usability. Consider the following approach.

The `BenchmarkRunner` orchestrates the entire benchmarking process, executing each `BenchmarkStep` sequentially within the specified duration limit:

```rust
struct BenchmarkRunner {
    steps: Vec<BenchmarkStep>,
    max_duration: Duration,
}

impl BenchmarkRunner {
    fn new(steps: Vec<BenchmarkStep>, max_duration: Duration) -> Self;
    fn run(&self) -> BenchmarkResult;
}
```

The `BenchmarkStep` holds its own test function along with its concurrency settings:

```rust
struct BenchmarkStep<F>
where
    F: Fn(TestInput) -> TestResult + Send + Sync + 'static,
{
    concurrency: usize,     // Number of concurrent test executions (N)
    run_duration: Duration, // Execution time duration (in seconds)
    test_fn: Arc<F>,
}

impl<F> BenchmarkStep<F>
where
    F: Fn(TestInput) -> TestResult + Send + Sync + 'static,
{
    fn execute(&self, stop_flag: Arc<AtomicBool>) -> StepResult;
}
```

The `TestInput` structure can remain as it is. `TestResult` becomes:

```rust
struct TestResult {
    pub test_id: usize,
    pub step_id: usize,
    pub tx_hash: Option<H256>,
    pub tx_tracker: TransactionTracker,
    pub err: Option<anyhow::Error>,
}
```

Instead of the existing one, a `TransactionTracker` whose `new` method automatically sets the submission time, ensuring correct tracking without requiring manual intervention:

```rust
struct TransactionTracker {
    submission_time: Instant,
    mempool_time: Option<Instant>,
    block_time: Option<Instant>,
}

impl TransactionTracker {
    fn new() -> Self;
    fn mark_mempool(&mut self);
    fn mark_block(&mut self);
    fn get_mempool_latency(&self) -> Option<Duration>;
    fn get_block_latency(&self) -> Option<Duration>;
}
```

The `StepResult` aggregates the results of a single step:

```rust
struct StepResult {
    step_id: usize,
    tests: Vec<TestResult>,
    avg_mempool_latency: Duration,
    avg_block_latency: Duration,
    // Additional useful statistics for the step
}

impl StepResult {
    fn new(results: Vec<TestResult>) -> Self;
}
```

The execution engine should support concurrent execution of the test function for a specified duration, allowing precise control over execution time:

```rust
fn run_concurrent<F>(concurrency: usize, run_duration: Duration, test_fn: F, stop_flag: Arc<AtomicBool>)
where
    F: Fn(TestInput) -> TestResult + Send + Sync + 'static;
```

The `stop_flag` ensures the execution stops when the total benchmark duration is reached or when other termination conditions occur.

The overall result:

```rust
struct BenchmarkResult {
    steps: Vec<StepResult>,
}
```

Conclusion

This revised design primarily improves terminology and clarity, making the framework more cohesive and intuitive. The key benefits of the proposed approach include:

- Encapsulation: each `BenchmarkStep` holds its own test function, making it easier to run varied tests within a single benchmark.
- Clarity: replacing the current names with more descriptive ones.
- Intuitive structure: the separation of responsibilities across `BenchmarkRunner`, `BenchmarkStep`, and `TransactionTracker` makes the design easier to understand and maintain.

Overall, this proposal aligns closely with the current design but improves cohesion, intuitiveness, and robustness.
This is the first major review batch (1/2). Tomorrow, a smaller set of reviews will follow.
Outstanding reviews:
- The tests in `benches.rs`
- A thorough review of `summary.rs`
let mut results = Vec::new();
for (step_id, step) in cfg.steps.iter().enumerate() {
    let semaphore = Arc::new(Semaphore::new(step.max_concurrency));
    let mut handles = Vec::new();
I think using FuturesUnordered might be beneficial here, since we do not care about the order.
pub async fn execute<F>(cfg: config::Execution, test_factory: F) -> Vec<Vec<TestResult>>
where
    F: Fn(TestInput) -> Pin<Box<dyn Future<Output = anyhow::Result<TestOutput>> + Send>>,
Would it be possible to get rid of the Pin<Box<dyn Future...>>? Maybe Fut: Future<Output = anyhow::Result<TestOutput>> + Send, or something like that?
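A sketch of the suggested generic bound. A tiny thread-parking `block_on` is included only so the example is self-contained and runnable without tokio; `execute` and its `u32` signature are illustrative, not the real API. The point is that callers can pass a plain `async fn` without boxing.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

// Minimal executor so the sketch runs standalone (the real code runs on tokio).
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(mut fut: F) -> F::Output {
    // Safety: `fut` is shadowed and never moved again after being pinned.
    let mut fut = unsafe { Pin::new_unchecked(&mut fut) };
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Generic over the returned future: no `Pin<Box<dyn Future>>` needed,
/// so an `async fn` can be passed directly.
pub fn execute<F, Fut>(test_factory: F, input: u32) -> u32
where
    F: Fn(u32) -> Fut,
    Fut: Future<Output = u32>,
{
    block_on(test_factory(input))
}
```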
let step_results = Arc::new(tokio::sync::Mutex::new(Vec::new()));
let execution_start = Instant::now();
loop {
    if execution_start.elapsed() > step.duration {
Maybe:
while execution_start.elapsed() < step.duration {
sorted_data.sort_by(|a, b| a.partial_cmp(b).unwrap());

let count = sorted_data.len();
let mean: f64 = sorted_data.iter().sum::<f64>() / count as f64;
let (sum, min, max) = data.iter().fold((0.0, f64::INFINITY, f64::NEG_INFINITY), |(sum, min, max), &x| {
    (sum + x, f64::min(min, x), f64::max(max, x))
});
let mean = sum / count as f64;
let max = *sorted_data.last().unwrap();
let min = *sorted_data.first().unwrap();

let percentile_90_index = ((count as f64) * 0.9).ceil() as usize - 1;
Just to make sure it won't panic. Maybe something like this might be useful?
let percentile_90_index = (((count as f64) * 0.9).ceil() as usize).saturating_sub(1).min(data.len().saturating_sub(1));
let percentile_90_index = ((count as f64) * 0.9).ceil() as usize - 1;
let percentile_90 = sorted_data[percentile_90_index];

Metrics {
I think it might be beneficial to use an external library like statrs to handle these calculations, especially if we plan to introduce more complex ones in the future.
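Short of pulling in statrs, the suggestions above can be combined into one small panic-free helper: a single fold for sum/min/max and a clamped percentile index. The `Metrics` fields here are a simplified assumption, and `Option` signals the empty-input case.

```rust
/// Simplified stand-in for the real `Metrics` struct.
#[derive(Debug, PartialEq)]
pub struct Metrics {
    pub mean: f64,
    pub min: f64,
    pub max: f64,
    pub percentile_90: f64,
}

/// Single pass for sum/min/max, plus a clamped index for the 90th
/// percentile so a tiny input cannot cause an out-of-bounds panic.
pub fn calc_metrics(mut data: Vec<f64>) -> Option<Metrics> {
    if data.is_empty() {
        return None;
    }
    data.sort_by(|a, b| a.partial_cmp(b).expect("NaN in metrics data"));
    let count = data.len();
    let (sum, min, max) = data
        .iter()
        .fold((0.0_f64, f64::INFINITY, f64::NEG_INFINITY), |(s, lo, hi), &x| {
            (s + x, lo.min(x), hi.max(x))
        });
    let p90_index = ((count as f64 * 0.9).ceil() as usize)
        .saturating_sub(1)
        .min(count - 1);
    Some(Metrics {
        mean: sum / count as f64,
        min,
        max,
        percentile_90: data[p90_index],
    })
}
```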
}
}

pub fn calc_metrics(data: Vec<f64>) -> Metrics {
NIT: calculate_metrics
blocks: HashMap<u64, Block<H256>>,
results: Vec<Vec<TestResult>>,
) -> Self {
    let step_txs = Self::map_results_to_txs(&results);
NIT: txs_by_steps
.await
.unwrap();
tx = tx.gas(gas_estimation);
assert!(gas_estimation <= max_tx_gas_limit);
Should this fail the whole test run?
}
input.bencher.mempool();

let receipt = pending
Maybe we should add a timeout here in case it isn't included?
text_tables::render(&mut io::stdout(), data).unwrap();
}

fn map_results_to_txs(results: &[Vec<TestResult>]) -> Vec<Vec<H256>> {
Maybe extract_step_transaction_hashes?
})
.collect()
}
A comment might be useful:
/// Group blocks by the "latest" step that contributed a TX to that block.
let offset = 1; // The first block is skipped for lacking a block interval.
for i in offset..blocks.len() {
    let prev_block = &blocks[i - 1];
    let curr_block = &blocks[i];
for window in blocks.windows(2) {
let prev = &window[0];
let curr = &window[1];
...
}
seems cleaner.
let interval = curr_block.timestamp.saturating_sub(prev_block.timestamp);

if interval.le(&U256::zero()) {
    continue;
}
let interval = curr.timestamp.saturating_sub(prev.timestamp);
if interval.is_zero() {
continue;
}
This is correct, as the subtraction saturates at zero.
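A sketch of the `windows(2)` + `saturating_sub` shape, with `Block` simplified to a `u64` timestamp instead of the real ethers `Block<H256>` with a `U256` timestamp.

```rust
/// Simplified block: just a timestamp.
pub struct Block {
    pub timestamp: u64,
}

/// Collect positive inter-block intervals using `windows(2)`, skipping
/// zero intervals; `saturating_sub` guards against non-monotonic timestamps.
pub fn block_intervals(blocks: &[Block]) -> Vec<u64> {
    blocks
        .windows(2)
        .filter_map(|window| {
            let (prev, curr) = (&window[0], &window[1]);
            let interval = curr.timestamp.saturating_sub(prev.timestamp);
            if interval == 0 {
                None
            } else {
                Some(interval)
            }
        })
        .collect()
}
```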
Both reviews (2/2) are now complete. That should be all for now :)
// Copyright 2022-2024 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

pub struct Signal(tokio::sync::Semaphore);
Uses a loom::sync::Mutex internally and is hence not signal safe, since pthread_mutex_lock is not signal safe. Please correct me if I am wrong.
I was just browsing the code base, but I was curious what your concern is here, e.g. what signal safety means in this context.
It is a generally working pattern to hold blocking locks that guard short sections executed in an async context. Blocking primitives are used consistently in the tokio codebase, one example being https://github.com/tokio-rs/tokio/blob/4b3da20c9847b202cf110f7b7772fd4674edaecf/tokio/src/sync/barrier.rs#L142-L148, and there is some info at https://tokio.rs/tokio/tutorial/shared-state under the "Tasks, threads, and contention" paragraph.
Specifically, in the semaphore it guards a section that doesn't yield by itself and should be very fast to complete (I expect sub-1µs), so preemption by the OS is very unlikely.
That said, there is actually no waiting in this wrapper.
I guess you meant that if this is used directly in the signal interrupt handler, then it doesn't protect from re-entrancy.
I think Signal needs some more context; my assumption was that it is about handling UNIX signals, rather than doing a single pass.
Introducing concurrent tests in `materializer`, enabling the generation of traceable workloads without significantly altering how test scenarios are written. Existing tests were refactored to use `make_testnet` instead of `with_testnet`.

Progress
- `docket_tests::benches::test_native_coin_transfer`
- `docket_tests::benches::test_contract_deployment`
- `docket_tests::benches::test_contract_call`
- `NonceManager` was introduced due to `get_transaction_count` not being reliable; need to double-check that.

For follow-up PRs