Skip to content

Commit

Permalink
dispatch the block to the workers by the selected parent
Browse files Browse the repository at this point in the history
  • Loading branch information
jackzhhuang committed Sep 14, 2024
1 parent df3ed29 commit ef9acf7
Showing 1 changed file with 49 additions and 49 deletions.
98 changes: 49 additions & 49 deletions sync/src/parallel/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ impl<'a> DagBlockSender<'a> {
for executor in &mut self.executors {
match &executor.state {
ExecuteState::Executing(header_id) => {
if *header_id == block.header().parent_hash()
|| block.header.parents_hash().contains(header_id)
{
if *header_id == block.header().parent_hash() {
executor.state = ExecuteState::Executing(block.id());
info!("send block {:?} to executor {:p}", block.id(), &executor);
executor
Expand All @@ -80,23 +78,25 @@ impl<'a> DagBlockSender<'a> {
}
}

for executor in &mut self.executors {
match &executor.state {
ExecuteState::Executed(_) => {
executor.state = ExecuteState::Executing(block.id());
info!("send block {:?} to executor {:p}", block.id(), &executor);
executor
.sender_to_executor
.send(Some(block.clone()))
.await?;
return anyhow::Ok(true);
}

ExecuteState::Executing(_) | ExecuteState::Error(_) | ExecuteState::Closed => {
continue;
}
}
}
// for executor in &mut self.executors {
// match &executor.state {
// ExecuteState::Executed(executed_block) => {
// if executed_block.block().header().id() == block.header().parent_hash() {
// executor.state = ExecuteState::Executing(block.id());
// info!("send block {:?} to executor {:p}", block.id(), &executor);
// executor
// .sender_to_executor
// .send(Some(block.clone()))
// .await?;
// return anyhow::Ok(true);
// }
// }

// ExecuteState::Executing(_) | ExecuteState::Error(_) | ExecuteState::Closed => {
// continue;
// }
// }
// }

anyhow::Ok(false)
}
Expand All @@ -112,7 +112,7 @@ impl<'a> DagBlockSender<'a> {

// Finding the executing state is the priority
if self.dispatch_to_worker(&block).await? {
self.flush_executor_state().await?;
// self.flush_executor_state().await?;
continue;
}

Expand All @@ -137,7 +137,7 @@ impl<'a> DagBlockSender<'a> {

sender_to_worker.send(Some(block)).await?;

self.flush_executor_state().await?;
// self.flush_executor_state().await?;
}

self.wait_for_finish().await?;
Expand All @@ -146,33 +146,33 @@ impl<'a> DagBlockSender<'a> {
Ok(())
}

async fn flush_executor_state(&mut self) -> anyhow::Result<()> {
for worker in &mut self.executors {
match worker.receiver_from_executor.try_recv() {
Ok(state) => {
if let ExecuteState::Executed(executed_block) = state {
info!("finish to execute block {:?}", executed_block.header());
self.notifier.notify((*executed_block).clone())?;
worker.state = ExecuteState::Executed(executed_block);
}
}
Err(e) => match e {
mpsc::error::TryRecvError::Empty => (),
mpsc::error::TryRecvError::Disconnected => worker.state = ExecuteState::Closed,
},
}
}

let len = self.executors.len();
self.executors
.retain(|worker| !matches!(worker.state, ExecuteState::Closed));

if len != self.executors.len() {
info!("sync workers count: {:?}", self.executors.len());
}

anyhow::Ok(())
}
// async fn flush_executor_state(&mut self) -> anyhow::Result<()> {
// for worker in &mut self.executors {
// match worker.receiver_from_executor.try_recv() {
// Ok(state) => {
// if let ExecuteState::Executed(executed_block) = state {
// info!("finish to execute block {:?}", executed_block.header());
// self.notifier.notify((*executed_block).clone())?;
// worker.state = ExecuteState::Executed(executed_block);
// }
// }
// Err(e) => match e {
// mpsc::error::TryRecvError::Empty => (),
// mpsc::error::TryRecvError::Disconnected => worker.state = ExecuteState::Closed,
// },
// }
// }

// let len = self.executors.len();
// self.executors
// .retain(|worker| !matches!(worker.state, ExecuteState::Closed));

// if len != self.executors.len() {
// info!("sync workers count: {:?}", self.executors.len());
// }

// anyhow::Ok(())
// }

async fn wait_for_finish(mut self) -> anyhow::Result<()> {
// tell the workers to exit
Expand Down

0 comments on commit ef9acf7

Please sign in to comment.