From cbd03c24edab015989e33520a6c81422fb8405be Mon Sep 17 00:00:00 2001 From: Micah Wylde Date: Tue, 3 Dec 2024 16:55:55 -0800 Subject: [PATCH] checkpoints and commiting --- Cargo.lock | 51 ++-- .../filesystem/sink/two_phase_committer.rs | 12 +- .../arroyo-connectors/src/kafka/sink/mod.rs | 23 +- crates/arroyo-controller/Cargo.toml | 1 + .../src/job_controller/checkpointer.rs | 16 -- .../src/job_controller/mod.rs | 21 +- .../src/states/scheduling.rs | 146 +++++------ crates/arroyo-operator/src/context.rs | 32 ++- crates/arroyo-operator/src/lib.rs | 1 + crates/arroyo-operator/src/operator.rs | 244 +++++++++++------- crates/arroyo-planner/src/lib.rs | 3 +- crates/arroyo-rpc/default.toml | 2 +- crates/arroyo-rpc/proto/rpc.proto | 2 +- crates/arroyo-rpc/src/config.rs | 10 +- crates/arroyo-state/src/checkpoint_state.rs | 9 +- crates/arroyo-state/src/committing_state.rs | 17 +- .../src/tables/global_keyed_map.rs | 3 +- crates/arroyo-worker/src/arrow/async_udf.rs | 4 +- .../src/arrow/tumbling_aggregating_window.rs | 4 +- .../src/arrow/watermark_generator.rs | 16 +- crates/arroyo-worker/src/engine.rs | 13 + crates/arroyo-worker/src/lib.rs | 15 +- 22 files changed, 356 insertions(+), 289 deletions(-) delete mode 100644 crates/arroyo-controller/src/job_controller/checkpointer.rs diff --git a/Cargo.lock b/Cargo.lock index d5522f980..2b0783482 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -336,7 +336,7 @@ dependencies = [ [[package]] name = "arrow-json" version = "53.2.0" -source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=53.2.0/json#24c93dff8203e766ea30cdd0461d06c10608f53c" +source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=53.2.0%2Fjson#24c93dff8203e766ea30cdd0461d06c10608f53c" dependencies = [ "arrow-array", "arrow-buffer", @@ -644,6 +644,7 @@ dependencies = [ "k8s-openapi", "kube", "lazy_static", + "log", "petgraph", "postgres", "postgres-types", @@ -2821,7 +2822,7 @@ checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" [[package]] name = "datafusion" version = "43.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0/arroyo#319f59b143208e025d7bf952748492d847068a39" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0%2Farroyo#319f59b143208e025d7bf952748492d847068a39" dependencies = [ "ahash", "arrow", @@ -2877,7 +2878,7 @@ dependencies = [ [[package]] name = "datafusion-catalog" version = "43.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0/arroyo#319f59b143208e025d7bf952748492d847068a39" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0%2Farroyo#319f59b143208e025d7bf952748492d847068a39" dependencies = [ "arrow-schema", "async-trait", @@ -2891,7 +2892,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "43.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0/arroyo#319f59b143208e025d7bf952748492d847068a39" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0%2Farroyo#319f59b143208e025d7bf952748492d847068a39" dependencies = [ "ahash", "arrow", @@ -2915,7 +2916,7 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" version = "43.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0/arroyo#319f59b143208e025d7bf952748492d847068a39" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0%2Farroyo#319f59b143208e025d7bf952748492d847068a39" dependencies = [ "log", "tokio", @@ -2924,7 +2925,7 @@ dependencies = [ [[package]] name = "datafusion-execution" version = "43.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0/arroyo#319f59b143208e025d7bf952748492d847068a39" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0%2Farroyo#319f59b143208e025d7bf952748492d847068a39" dependencies = [ "arrow", "chrono", @@ -2944,7 +2945,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "43.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0/arroyo#319f59b143208e025d7bf952748492d847068a39" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0%2Farroyo#319f59b143208e025d7bf952748492d847068a39" dependencies = [ "ahash", "arrow", @@ -2967,7 +2968,7 @@ dependencies = [ [[package]] name = "datafusion-expr-common" version = "43.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0/arroyo#319f59b143208e025d7bf952748492d847068a39" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0%2Farroyo#319f59b143208e025d7bf952748492d847068a39" dependencies = [ "arrow", "datafusion-common", @@ -2978,7 +2979,7 @@ dependencies = [ [[package]] name = "datafusion-functions" version = "43.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0/arroyo#319f59b143208e025d7bf952748492d847068a39" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0%2Farroyo#319f59b143208e025d7bf952748492d847068a39" dependencies = [ "arrow", "arrow-buffer", @@ -3004,7 +3005,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" version = "43.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0/arroyo#319f59b143208e025d7bf952748492d847068a39" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0%2Farroyo#319f59b143208e025d7bf952748492d847068a39" dependencies = [ "ahash", "arrow", @@ -3024,7 +3025,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate-common" version = "43.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0/arroyo#319f59b143208e025d7bf952748492d847068a39" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0%2Farroyo#319f59b143208e025d7bf952748492d847068a39" dependencies = [ "ahash", "arrow", @@ -3048,7 +3049,7 @@ dependencies = [ [[package]] name = "datafusion-functions-nested" version = "43.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0/arroyo#319f59b143208e025d7bf952748492d847068a39" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0%2Farroyo#319f59b143208e025d7bf952748492d847068a39" dependencies = [ "arrow", "arrow-array", @@ -3070,7 +3071,7 @@ dependencies = [ [[package]] name = "datafusion-functions-window" version = "43.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0/arroyo#319f59b143208e025d7bf952748492d847068a39" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0%2Farroyo#319f59b143208e025d7bf952748492d847068a39" dependencies = [ "datafusion-common", "datafusion-expr", @@ -3084,7 +3085,7 @@ dependencies = [ [[package]] name = "datafusion-functions-window-common" version = "43.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0/arroyo#319f59b143208e025d7bf952748492d847068a39" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0%2Farroyo#319f59b143208e025d7bf952748492d847068a39" dependencies = [ "datafusion-common", "datafusion-physical-expr-common", @@ -3093,7 +3094,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "43.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0/arroyo#319f59b143208e025d7bf952748492d847068a39" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0%2Farroyo#319f59b143208e025d7bf952748492d847068a39" dependencies = [ "arrow", "async-trait", @@ -3112,7 +3113,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "43.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0/arroyo#319f59b143208e025d7bf952748492d847068a39" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0%2Farroyo#319f59b143208e025d7bf952748492d847068a39" dependencies = [ "ahash", "arrow", @@ -3139,7 +3140,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" version = "43.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0/arroyo#319f59b143208e025d7bf952748492d847068a39" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0%2Farroyo#319f59b143208e025d7bf952748492d847068a39" dependencies = [ "ahash", "arrow", @@ -3153,7 +3154,7 @@ dependencies = [ [[package]] name = "datafusion-physical-optimizer" version = "43.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0/arroyo#319f59b143208e025d7bf952748492d847068a39" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0%2Farroyo#319f59b143208e025d7bf952748492d847068a39" dependencies = [ "arrow", "arrow-schema", @@ -3168,7 +3169,7 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" version = "43.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0/arroyo#319f59b143208e025d7bf952748492d847068a39" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0%2Farroyo#319f59b143208e025d7bf952748492d847068a39" dependencies = [ "ahash", "arrow", @@ -3202,7 +3203,7 @@ dependencies = [ [[package]] name = "datafusion-proto" version = "43.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0/arroyo#319f59b143208e025d7bf952748492d847068a39" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0%2Farroyo#319f59b143208e025d7bf952748492d847068a39" dependencies = [ "arrow", "chrono", @@ -3217,7 +3218,7 @@ dependencies = [ [[package]] name = "datafusion-proto-common" version = "43.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0/arroyo#319f59b143208e025d7bf952748492d847068a39" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0%2Farroyo#319f59b143208e025d7bf952748492d847068a39" dependencies = [ "arrow", "chrono", @@ -3229,7 +3230,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "43.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0/arroyo#319f59b143208e025d7bf952748492d847068a39" +source = "git+https://github.com/ArroyoSystems/arrow-datafusion?branch=43.0.0%2Farroyo#319f59b143208e025d7bf952748492d847068a39" dependencies = [ "arrow", "arrow-array", @@ -6257,7 +6258,7 @@ dependencies = [ [[package]] name = "object_store" version = "0.11.1" -source = "git+http://github.com/ArroyoSystems/arrow-rs?branch=object_store_0.11.1/arroyo#4cfe48061503161e43cd3cd7960e74ce789bd3b9" +source = "git+http://github.com/ArroyoSystems/arrow-rs?branch=object_store_0.11.1%2Farroyo#4cfe48061503161e43cd3cd7960e74ce789bd3b9" dependencies = [ "async-trait", "base64 0.22.1", @@ -6499,7 +6500,7 @@ dependencies = [ [[package]] name = "parquet" version = "53.2.0" -source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=53.2.0/parquet_bytes#424920f863e1b8286c3ce8261cce16f0360428c5" +source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=53.2.0%2Fparquet_bytes#424920f863e1b8286c3ce8261cce16f0360428c5" dependencies = [ "ahash", "arrow-array", diff --git a/crates/arroyo-connectors/src/filesystem/sink/two_phase_committer.rs b/crates/arroyo-connectors/src/filesystem/sink/two_phase_committer.rs index b180765d9..d72b3e7ed 100644 --- a/crates/arroyo-connectors/src/filesystem/sink/two_phase_committer.rs +++ b/crates/arroyo-connectors/src/filesystem/sink/two_phase_committer.rs @@ -152,6 +152,10 @@ impl ArrowOperator for TwoPhaseCommitterOperator { ); tables } + + fn is_committing(&self) -> bool { + true + } async fn on_start(&mut self, ctx: &mut OperatorContext) { let tracking_key_state: &mut GlobalKeyedView = ctx @@ -184,14 +188,6 @@ impl ArrowOperator for TwoPhaseCommitterOperator { .expect("record inserted"); } - async fn on_close(&mut self, final_message: &Option, ctx: &mut OperatorContext, collector: &mut dyn Collector) { - if let Some(ControlMessage::Commit { epoch, commit_data }) = ctx.control_rx.recv().await { - self.handle_commit(epoch, commit_data, ctx).await; - } else { - warn!("no commit message received, not committing") - } - } - async fn handle_commit( &mut self, epoch: u32, diff --git a/crates/arroyo-connectors/src/kafka/sink/mod.rs b/crates/arroyo-connectors/src/kafka/sink/mod.rs index b5c47c52e..ad3a5e6cf 100644 --- a/crates/arroyo-connectors/src/kafka/sink/mod.rs +++ b/crates/arroyo-connectors/src/kafka/sink/mod.rs @@ -75,10 +75,6 @@ impl From for ConsistencyMode { } impl KafkaSinkFunc { - fn is_committing(&self) -> bool { - matches!(self.consistency_mode, ConsistencyMode::ExactlyOnce { .. }) - } - fn set_timestamp_col(&mut self, schema: &ArroyoSchema) { if let Some(f) = &self.timestamp_field { if let Ok(f) = schema.schema.field_with_name(f) { @@ -271,6 +267,10 @@ impl ArrowOperator for KafkaSinkFunc { } } + fn is_committing(&self) -> bool { + matches!(self.consistency_mode, ConsistencyMode::ExactlyOnce { .. }) + } + async fn on_start(&mut self, ctx: &mut OperatorContext) { self.set_timestamp_col(&ctx.in_schemas[0]); self.set_key_col(&ctx.in_schemas[0]); @@ -352,8 +352,10 @@ impl ArrowOperator for KafkaSinkFunc { }; let Some(committing_producer) = producer_to_complete.take() else { - unimplemented!("received a commit message without a producer ready to commit. Restoring from commit phase not yet implemented"); + error!("received a commit message without a producer ready to commit. Restoring from commit phase not yet implemented"); + return; }; + let mut commits_attempted = 0; loop { if committing_producer @@ -384,19 +386,10 @@ impl ArrowOperator for KafkaSinkFunc { async fn on_close( &mut self, - final_message: &Option, + _: &Option, ctx: &mut OperatorContext, _: &mut dyn Collector, ) { self.flush(ctx).await; - if !self.is_committing() { - return; - } - // if let Some(ControlMessage::Commit { epoch, commit_data }) = ctx.control_rx.recv().await { - // self.handle_commit(epoch, &commit_data, ctx).await; - // } else { - // warn!("no commit message received, not committing") - // } - todo!("committing") } } diff --git a/crates/arroyo-controller/Cargo.toml b/crates/arroyo-controller/Cargo.toml index 45fe8c1d8..2a0978f1b 100644 --- a/crates/arroyo-controller/Cargo.toml +++ b/crates/arroyo-controller/Cargo.toml @@ -66,6 +66,7 @@ uuid = "1.3.3" async-stream = "0.3.5" base64 = "0.22" rusqlite = { version = "0.31.0", features = ["serde_json", "time"] } +log = "0.4.22" [build-dependencies] cornucopia = { workspace = true } diff --git a/crates/arroyo-controller/src/job_controller/checkpointer.rs b/crates/arroyo-controller/src/job_controller/checkpointer.rs deleted file mode 100644 index 9fba2501b..000000000 --- a/crates/arroyo-controller/src/job_controller/checkpointer.rs +++ /dev/null @@ -1,16 +0,0 @@ -use arroyo_state::checkpoint_state::CheckpointState; -use arroyo_state::committing_state::CommittingState; - -pub enum CheckpointingOrCommittingState { - Checkpointing(CheckpointState), - Committing(CommittingState), -} - -impl CheckpointingOrCommittingState { - pub(crate) fn done(&self) -> bool { - match self { - CheckpointingOrCommittingState::Checkpointing(checkpointing) => checkpointing.done(), - CheckpointingOrCommittingState::Committing(committing) => committing.done(), - } - } -} diff --git a/crates/arroyo-controller/src/job_controller/mod.rs b/crates/arroyo-controller/src/job_controller/mod.rs index 48d709134..716b2c74e 100644 --- a/crates/arroyo-controller/src/job_controller/mod.rs +++ b/crates/arroyo-controller/src/job_controller/mod.rs @@ -33,15 +33,26 @@ use tokio::{sync::mpsc::Receiver, task::JoinHandle}; use tonic::{transport::Channel, Request}; use tracing::{debug, error, info, warn}; -use self::checkpointer::CheckpointingOrCommittingState; - -mod checkpointer; pub mod job_metrics; const CHECKPOINTS_TO_KEEP: u32 = 4; const CHECKPOINT_ROWS_TO_KEEP: u32 = 100; const COMPACT_EVERY: u32 = 2; +pub enum CheckpointingOrCommittingState { + Checkpointing(CheckpointState), + Committing(CommittingState), +} + +impl CheckpointingOrCommittingState { + pub(crate) fn done(&self) -> bool { + match self { + CheckpointingOrCommittingState::Checkpointing(checkpointing) => checkpointing.done(), + CheckpointingOrCommittingState::Committing(committing) => committing.done(), + } + } +} + #[derive(Debug, PartialEq, Eq)] pub enum WorkerState { Running, @@ -188,7 +199,8 @@ impl RunningJobModel { CheckpointingOrCommittingState::Committing(committing_state) => { if matches!(c.event_type(), TaskCheckpointEventType::FinishedCommit) { - committing_state.subtask_committed(c.node_id, c.subtask_index); + committing_state + .subtask_committed(c.operator_id.clone(), c.subtask_index); self.compact_state().await?; } else { warn!("unexpected checkpoint event type {:?}", c.event_type()) @@ -452,6 +464,7 @@ impl RunningJobModel { DbCheckpointState::committing, ) .await?; + let committing_data = committing_state.committing_data(); self.checkpoint_state = Some(CheckpointingOrCommittingState::Committing(committing_state)); diff --git a/crates/arroyo-controller/src/states/scheduling.rs b/crates/arroyo-controller/src/states/scheduling.rs index d61641198..7bd62bdb1 100644 --- a/crates/arroyo-controller/src/states/scheduling.rs +++ b/crates/arroyo-controller/src/states/scheduling.rs @@ -362,79 +362,79 @@ impl State for Scheduling { } metadata.min_epoch = min_epoch; if needs_commits { - // let mut commit_subtasks = HashSet::new(); - // // (node_id => (table_name => (subtask => data))) - // let mut committing_data: HashMap>>> = - // HashMap::new(); - // for operator_id in &metadata.operator_ids { - // let operator_metadata = - // StateBackend::load_operator_metadata(&ctx.config.id, operator_id, epoch) - // .await - // .map_err(|err| { - // fatal( - // format!( - // "Failed to restore job; operator metadata for {} not found.", - // operator_id - // ), - // err, - // ) - // })?; - // let Some(operator_metadata) = operator_metadata else { - // return Err(fatal( - // "missing operator metadata", - // anyhow!( - // "operator metadata for {} not found for job {}", - // operator_id, - // ctx.config.id - // ), - // )); - // }; - // for (table_name, table_metadata) in &operator_metadata.table_checkpoint_metadata - // { - // let config = - // operator_metadata - // .table_configs - // .get(table_name) - // .ok_or_else(|| { - // fatal( - // format!( - // "Failed to restore job; table config for {} not found.", - // table_name - // ), - // anyhow!("table config for {} not found", table_name), - // ) - // })?; - // if let Some(commit_data) = match config.table_type() { - // arroyo_rpc::grpc::rpc::TableEnum::MissingTableType => { - // return Err(fatal( - // "Missing table type", - // anyhow!("table type not found"), - // )); - // } - // arroyo_rpc::grpc::rpc::TableEnum::GlobalKeyValue => { - // GlobalKeyedTable::committing_data(config.clone(), table_metadata) - // } - // arroyo_rpc::grpc::rpc::TableEnum::ExpiringKeyedTimeTable => None, - // } { - // committing_data - // .entry(operator_id.clone()) - // .or_default() - // .insert(table_name.to_string(), commit_data); - // let program_node = ctx - // .program - // .graph - // .node_weights() - // .find(|node| node.node_id == *operator_id) - // .unwrap(); - // for subtask_index in 0..program_node.parallelism { - // commit_subtasks.insert((operator_id.clone(), subtask_index as u32)); - // } - // } - // } - // - // committing_state = Some(CommittingState::new(id, commit_subtasks, committing_data)); - // } - todo!("committing") + let mut commit_subtasks = HashSet::new(); + // (operator_id => (table_name => (subtask => data))) + let mut committing_data: HashMap>>> = + HashMap::new(); + for operator_id in &metadata.operator_ids { + let operator_metadata = + StateBackend::load_operator_metadata(&ctx.config.id, operator_id, epoch) + .await + .map_err(|err| { + fatal( + format!( + "Failed to restore job; operator metadata for {} not found.", + operator_id + ), + err, + ) + })?; + let Some(operator_metadata) = operator_metadata else { + return Err(fatal( + "missing operator metadata", + anyhow!( + "operator metadata for {} not found for job {}", + operator_id, + ctx.config.id + ), + )); + }; + for (table_name, table_metadata) in &operator_metadata.table_checkpoint_metadata + { + let config = + operator_metadata + .table_configs + .get(table_name) + .ok_or_else(|| { + fatal( + format!( + "Failed to restore job; table config for {} not found.", + table_name + ), + anyhow!("table config for {} not found", table_name), + ) + })?; + if let Some(commit_data) = match config.table_type() { + arroyo_rpc::grpc::rpc::TableEnum::MissingTableType => { + return Err(fatal( + "Missing table type", + anyhow!("table type not found"), + )); + } + arroyo_rpc::grpc::rpc::TableEnum::GlobalKeyValue => { + GlobalKeyedTable::committing_data(config.clone(), table_metadata) + } + arroyo_rpc::grpc::rpc::TableEnum::ExpiringKeyedTimeTable => None, + } { + committing_data + .entry(operator_id.clone()) + .or_default() + .insert(table_name.to_string(), commit_data); + let program_node = ctx + .program + .graph + .node_weights() + .find(|node| { + node.operator_chain.first().operator_id == *operator_id + }) + .unwrap(); + for subtask_index in 0..program_node.parallelism { + commit_subtasks.insert((operator_id.clone(), subtask_index as u32)); + } + } + } + } + committing_state = Some(CommittingState::new(id, commit_subtasks, committing_data)); } StateBackend::write_checkpoint_metadata(metadata) .await diff --git a/crates/arroyo-operator/src/context.rs b/crates/arroyo-operator/src/context.rs index 23f23535f..4485eaea7 100644 --- a/crates/arroyo-operator/src/context.rs +++ b/crates/arroyo-operator/src/context.rs @@ -533,7 +533,7 @@ impl ErrorReporter { #[async_trait] pub trait Collector: Send { async fn collect(&mut self, batch: RecordBatch); - async fn broadcast(&mut self, message: SignalMessage); + async fn broadcast_watermark(&mut self, watermark: Watermark); } #[derive(Clone)] @@ -649,19 +649,8 @@ impl Collector for ArrowCollector { } } - async fn broadcast(&mut self, message: SignalMessage) { - for out_node in &self.out_qs { - for q in out_node { - q.send(ArrowMessage::Signal(message.clone())) - .await - .unwrap_or_else(|e| { - panic!( - "failed to broadcast message <{:?}> for operator {}: {}", - message, self.chain_info, e - ) - }); - } - } + async fn broadcast_watermark(&mut self, watermark: Watermark) { + self.broadcast(SignalMessage::Watermark(watermark)).await; } } @@ -709,6 +698,21 @@ impl ArrowCollector { tx_queue_bytes_gauges, } } + + pub async fn broadcast(&mut self, message: SignalMessage) { + for out_node in &self.out_qs { + for q in out_node { + q.send(ArrowMessage::Signal(message.clone())) + .await + .unwrap_or_else(|e| { + panic!( + "failed to broadcast message <{:?}> for operator {}: {}", + message, self.chain_info, e + ) + }); + } + } + } } impl OperatorContext { diff --git a/crates/arroyo-operator/src/lib.rs b/crates/arroyo-operator/src/lib.rs index 8ac0eacab..f33cf6aa0 100644 --- a/crates/arroyo-operator/src/lib.rs +++ b/crates/arroyo-operator/src/lib.rs @@ -64,6 +64,7 @@ pub enum ControlOutcome { Stop, StopAndSendStop, Finish, + StopAfterCommit, } #[derive(Debug)] diff --git a/crates/arroyo-operator/src/operator.rs b/crates/arroyo-operator/src/operator.rs index 888ec8c52..d349da594 100644 --- a/crates/arroyo-operator/src/operator.rs +++ b/crates/arroyo-operator/src/operator.rs @@ -151,8 +151,18 @@ impl OperatorNode { } } + pub fn operator_ids(&self) -> Vec { + match self { + OperatorNode::Source(s) => vec![s.context.task_info.operator_id.clone()], + OperatorNode::Chained(s) => s + .iter() + .map(|(_, ctx)| ctx.task_info.operator_id.clone()) + .collect(), + } + } + async fn run_behavior( - mut self, + self, chain_info: &Arc, control_tx: Sender, control_rx: Receiver, @@ -287,9 +297,8 @@ async fn run_checkpoint( task_info: &TaskInfo, watermark: Option, table_manager: &mut TableManager, - collector: &mut dyn Collector, control_tx: &Sender, -) -> bool { +) { table_manager .checkpoint(checkpoint_barrier, watermark) .await; @@ -301,12 +310,6 @@ async fn run_checkpoint( TaskCheckpointEventType::FinishedSync, ) .await; - - collector - .broadcast(SignalMessage::Barrier(checkpoint_barrier)) - .await; - - checkpoint_barrier.then_stop } #[async_trait] @@ -348,10 +351,15 @@ pub trait SourceOperator: Send + 'static { &ctx.task_info, ctx.watermarks.last_present_watermark(), &mut ctx.table_manager, - &mut collector.collector, &ctx.control_tx, ) - .await + .await; + + collector + .broadcast(SignalMessage::Barrier(checkpoint_barrier)) + .await; + + checkpoint_barrier.then_stop } } @@ -383,9 +391,35 @@ macro_rules! call_and_recurse { }; } +macro_rules! call_with_collector { + ($self:expr, $final_collector:expr, $name:ident, $arg:expr) => { + match &mut $self.next { + Some(next) => { + let mut collector = ChainedCollector { + cur: next, + index: 0, + in_partitions: 1, + final_collector: $final_collector, + }; + + $self + .operator + .$name($arg, &mut $self.context, &mut collector) + .await; + } + None => { + $self + .operator + .$name($arg, &mut $self.context, $final_collector) + .await; + } + } + }; +} + pub struct ChainedCollector<'a, 'b> { cur: &'a mut ChainedOperator, - final_collector: &'b mut dyn Collector, + final_collector: &'b mut ArrowCollector, index: usize, in_partitions: usize, } @@ -430,17 +464,10 @@ where }; } - async fn broadcast(&mut self, message: SignalMessage) { - match message { - SignalMessage::Watermark(w) => { - self.cur - .handle_watermark(w, self.index, self.final_collector) - .await; - } - m => { - todo!("Unsupported signal message: {:?}", m); - } - } + async fn broadcast_watermark(&mut self, watermark: Watermark) { + self.cur + .handle_watermark(watermark, self.index, self.final_collector) + .await; } } @@ -513,10 +540,30 @@ impl Future for IndexedFuture { } impl ChainedOperator { - async fn handle_controller_message(&mut self, control_message: &ControlMessage) { + async fn handle_controller_message( + &mut self, + control_message: &ControlMessage, + shutdown_after_commit: bool, + ) -> bool { for (op, ctx) in self.iter_mut() { - op.handle_controller_message(control_message, ctx).await; + match control_message { + ControlMessage::Checkpoint(_) => { + error!("shouldn't receive checkpoint") + } + ControlMessage::Stop { .. } => { + error!("shouldn't receive stop") + } + ControlMessage::Commit { epoch, commit_data } => { + op.handle_commit(*epoch, &commit_data, ctx).await; + return shutdown_after_commit; + } + ControlMessage::LoadCompacted { compacted } => { + ctx.load_compacted(compacted).await; + } + ControlMessage::NoOp => {} + } } + false } pub fn iter(&self) -> ChainIterator { @@ -553,7 +600,7 @@ impl ChainedOperator { index: usize, in_partitions: usize, batch: RecordBatch, - final_collector: &'b mut dyn Collector, + final_collector: &'b mut ArrowCollector, ) where 'a: 'b, { @@ -605,7 +652,7 @@ impl ChainedOperator { in_partitions: usize, control_tx: &Sender, chain_info: &ChainInfo, - collector: &mut dyn Collector, + collector: &mut ArrowCollector, ) -> ControlOutcome { match message { SignalMessage::Barrier(t) => { @@ -628,44 +675,18 @@ impl ChainedOperator { if counter.mark(idx, t) { debug!("Checkpointing {chain_info}"); - send_checkpoint_event( - control_tx, - &self.context.task_info, - *t, - TaskCheckpointEventType::StartedCheckpointing, - ) - .await; - - self.handle_checkpoint(*t, collector).await; + self.run_checkpoint(t, control_tx, collector).await; - send_checkpoint_event( - control_tx, - &self.context.task_info, - *t, - TaskCheckpointEventType::FinishedOperatorSetup, - ) - .await; + collector.broadcast(SignalMessage::Barrier(*t)).await; - // we want the watermark from the last op in the chain, as that will be the smallest - // and this is used to determine whether when we can drop the checkpoint file - let last_watermark = self - .iter() - .last() - .unwrap() - .1 - .watermarks - .last_present_watermark(); - if run_checkpoint( - *t, - &self.context.task_info, - last_watermark, - &mut self.context.table_manager, - collector, - control_tx, - ) - .await - { - return ControlOutcome::Stop; + if t.then_stop { + // if this is a committing operator, we need to wait for the commit message + // before shutting down; otherwise we just stop + return if self.operator.is_committing() { + ControlOutcome::StopAfterCommit + } else { + return ControlOutcome::Stop; + }; } } } @@ -694,7 +715,7 @@ impl ChainedOperator { &mut self, watermark: Watermark, index: usize, - final_collector: &mut dyn Collector, + final_collector: &mut ArrowCollector, ) { trace!( "handling watermark {:?} for {}", @@ -752,7 +773,7 @@ impl ChainedOperator { &mut self, op_index: usize, result: Box, - final_collector: &mut dyn Collector, + final_collector: &mut ArrowCollector, ) { let mut op = self; for _ in 0..op_index { @@ -782,31 +803,69 @@ impl ChainedOperator { } } - async fn handle_checkpoint( + async fn run_checkpoint( &mut self, - b: CheckpointBarrier, - final_collector: &mut dyn Collector, + t: &CheckpointBarrier, + control_tx: &Sender, + final_collector: &mut ArrowCollector, ) { - call_and_recurse!(self, final_collector, handle_checkpoint, b) + send_checkpoint_event( + control_tx, + &self.context.task_info, + *t, + TaskCheckpointEventType::StartedCheckpointing, + ) + .await; + + call_with_collector!(self, final_collector, handle_checkpoint, *t); + + send_checkpoint_event( + control_tx, + &self.context.task_info, + *t, + TaskCheckpointEventType::FinishedOperatorSetup, + ) + .await; + + let last_watermark = self.context.watermarks.last_present_watermark(); + + run_checkpoint( + *t, + &self.context.task_info, + last_watermark, + &mut self.context.table_manager, + control_tx, + ) + .await; + + if let Some(next) = &mut self.next { + Box::pin(next.run_checkpoint(t, control_tx, final_collector)).await; + } } async fn handle_commit( &mut self, epoch: u32, commit_data: &HashMap>>, - ctx: &mut OperatorContext, ) { - todo!() + assert_eq!( + self.iter().count(), + 1, + "commits can only be applied to sinks" + ); + self.operator + .handle_commit(epoch, commit_data, &mut self.context) + .await; } - async fn handle_tick(&mut self, tick: u64, final_collector: &mut dyn Collector) { + async fn handle_tick(&mut self, tick: u64, final_collector: &mut ArrowCollector) { call_and_recurse!(self, final_collector, handle_tick, tick) } async fn on_close( &mut self, final_message: &Option, - final_collector: &mut dyn Collector, + final_collector: &mut ArrowCollector, ) { match &mut self.next { Some(next) => { @@ -878,11 +937,15 @@ async fn operator_run_behavior( interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let mut shutdown_after_commit = false; + loop { let operator_future: OptionFuture<_> = this.future_to_poll().into(); tokio::select! { Some(control_message) = control_rx.recv() => { - this.handle_controller_message(&control_message).await; + if this.handle_controller_message(&control_message, shutdown_after_commit).await { + break; + } } p = sel.next() => { @@ -914,6 +977,9 @@ async fn operator_run_behavior( // a final checkpoint break; } + ControlOutcome::StopAfterCommit => { + shutdown_after_commit = true; + } ControlOutcome::Finish => { final_message = Some(SignalMessage::EndOfData); break; @@ -939,7 +1005,9 @@ async fn operator_run_behavior( } None => { info!("[{}] Stream completed", chain_info); - break; + if !shutdown_after_commit { + break; + } } } } @@ -1025,34 +1093,16 @@ pub struct DisplayableOperator<'a> { #[async_trait::async_trait] pub trait ArrowOperator: Send + 'static { - async fn handle_controller_message( - &mut self, - control_message: &ControlMessage, - ctx: &mut OperatorContext, - ) { - match control_message { - ControlMessage::Checkpoint(_) => { - error!("shouldn't receive checkpoint") - } - ControlMessage::Stop { .. } => { - error!("shouldn't receive stop") - } - ControlMessage::Commit { epoch, commit_data } => { - self.handle_commit(*epoch, &commit_data, ctx).await; - } - ControlMessage::LoadCompacted { compacted } => { - ctx.load_compacted(compacted).await; - } - ControlMessage::NoOp => {} - } - } - fn name(&self) -> String; fn tables(&self) -> HashMap { HashMap::new() } + fn is_committing(&self) -> bool { + false + } + fn tick_interval(&self) -> Option { None } diff --git a/crates/arroyo-planner/src/lib.rs b/crates/arroyo-planner/src/lib.rs index 895b07e6b..db4d5f95f 100644 --- a/crates/arroyo-planner/src/lib.rs +++ b/crates/arroyo-planner/src/lib.rs @@ -60,7 +60,6 @@ use arrow::compute::kernels::cast_utils::parse_interval_day_time; use arroyo_datastream::logical::LogicalProgram; use arroyo_datastream::optimizers::ChainingOptimizer; use arroyo_operator::connector::Connection; -use arroyo_rpc::config::config; use arroyo_rpc::df::ArroyoSchema; use arroyo_rpc::TIMESTAMP_FIELD; use arroyo_udf_host::parse::{inner_type, UdfDef}; @@ -814,7 +813,7 @@ pub async fn parse_and_get_arrow_program( }, ); - if arroyo_rpc::config::config().pipeline.enable_chaining { + if arroyo_rpc::config::config().pipeline.chaining.enabled { program.optimize(&ChainingOptimizer {}); } diff --git a/crates/arroyo-rpc/default.toml b/crates/arroyo-rpc/default.toml index 32b8b0039..2efa34838 100644 --- a/crates/arroyo-rpc/default.toml +++ b/crates/arroyo-rpc/default.toml @@ -10,7 +10,7 @@ worker-heartbeat-timeout = "30s" healthy-duration = "2m" worker-startup-time = "10m" task-startup-time = "2m" -enable-chaining = false +chaining.enabled = true [pipeline.compaction] enabled = false diff --git a/crates/arroyo-rpc/proto/rpc.proto b/crates/arroyo-rpc/proto/rpc.proto index dd6096716..a921d43f0 100644 --- a/crates/arroyo-rpc/proto/rpc.proto +++ b/crates/arroyo-rpc/proto/rpc.proto @@ -352,7 +352,7 @@ message CheckpointResp { message CommitReq { uint32 epoch = 1; - map committing_data = 2; + map committing_data = 2; } message CommitResp { diff --git a/crates/arroyo-rpc/src/config.rs b/crates/arroyo-rpc/src/config.rs index 9f2b38932..0a3d43c2a 100644 --- a/crates/arroyo-rpc/src/config.rs +++ b/crates/arroyo-rpc/src/config.rs @@ -427,12 +427,18 @@ pub struct PipelineConfig { #[serde(default)] pub default_sink: DefaultSink, - /// Whether to enable operator chaining - pub enable_chaining: bool, + pub chaining: ChainingConfig, pub compaction: CompactionConfig, } +#[derive(Debug, Deserialize, Serialize, Clone)] +#[serde(rename_all = "kebab-case", deny_unknown_fields)] +pub struct ChainingConfig { + /// Whether to enable operator chaining + pub enabled: bool, +} + #[derive(Debug, Deserialize, Serialize, Eq, PartialEq, Clone)] #[serde(rename_all = "kebab-case", deny_unknown_fields)] pub enum DatabaseType { diff --git a/crates/arroyo-state/src/checkpoint_state.rs b/crates/arroyo-state/src/checkpoint_state.rs index e8f6dfed6..80dd2b23c 100644 --- a/crates/arroyo-state/src/checkpoint_state.rs +++ b/crates/arroyo-state/src/checkpoint_state.rs @@ -37,9 +37,9 @@ pub struct CheckpointState { operators: usize, operators_checkpointed: usize, operator_state: HashMap, - subtasks_to_commit: HashSet<(u32, u32)>, + subtasks_to_commit: HashSet<(String, u32)>, // map of operator_id -> table_name -> subtask_index -> Data - commit_data: HashMap>>>, + commit_data: HashMap>>>, // Used for the web ui -- eventually should be replaced with some other way of tracking / reporting // this data @@ -316,10 +316,11 @@ impl CheckpointState { } } { for i in 0..operator_state.subtasks_checkpointed { - self.subtasks_to_commit.insert((c.node_id, i as u32)); + self.subtasks_to_commit + .insert((c.operator_id.clone(), i as u32)); } self.commit_data - .entry(c.node_id) + .entry(c.operator_id.clone()) .or_default() .insert(table.clone(), committing_data); } diff --git a/crates/arroyo-state/src/committing_state.rs b/crates/arroyo-state/src/committing_state.rs index 7ff0065bf..12e5138c1 100644 --- a/crates/arroyo-state/src/committing_state.rs +++ b/crates/arroyo-state/src/committing_state.rs @@ -4,15 +4,15 @@ use arroyo_rpc::grpc::rpc::{OperatorCommitData, TableCommitData}; pub struct CommittingState { checkpoint_id: String, - subtasks_to_commit: HashSet<(u32, u32)>, - committing_data: HashMap>>>, + subtasks_to_commit: HashSet<(String, u32)>, + committing_data: HashMap>>>, } impl CommittingState { pub fn new( checkpoint_id: String, - subtasks_to_commit: HashSet<(u32, u32)>, - committing_data: HashMap>>>, + subtasks_to_commit: HashSet<(String, u32)>, + committing_data: HashMap>>>, ) -> Self { Self { checkpoint_id, @@ -25,19 +25,20 @@ impl CommittingState { &self.checkpoint_id } - pub fn subtask_committed(&mut self, node_id: u32, subtask_index: u32) { - self.subtasks_to_commit.remove(&(node_id, subtask_index)); + pub fn subtask_committed(&mut self, operator_id: String, subtask_index: u32) { + self.subtasks_to_commit + .remove(&(operator_id, subtask_index)); } pub fn done(&self) -> bool { self.subtasks_to_commit.is_empty() } - pub fn committing_data(&self) -> HashMap { + pub fn committing_data(&self) -> HashMap { let operators_to_commit: HashSet<_> = self .subtasks_to_commit .iter() - .map(|(node_id, _subtask_id)| *node_id) + .map(|(operator_id, _subtask_id)| operator_id.clone()) .collect(); operators_to_commit diff --git a/crates/arroyo-state/src/tables/global_keyed_map.rs b/crates/arroyo-state/src/tables/global_keyed_map.rs index b09638817..ac240c09b 100644 --- a/crates/arroyo-state/src/tables/global_keyed_map.rs +++ b/crates/arroyo-state/src/tables/global_keyed_map.rs @@ -194,12 +194,13 @@ impl Table for GlobalKeyedTable { ) -> Result> { Ok(checkpoint.files.into_iter().collect()) } + fn committing_data( config: Self::ConfigMessage, table_metadata: Self::TableCheckpointMessage, ) -> Option>> { if config.uses_two_phase_commit { - Some(table_metadata.commit_data_by_subtask.clone()) + Some(table_metadata.commit_data_by_subtask) } else { None } diff --git a/crates/arroyo-worker/src/arrow/async_udf.rs b/crates/arroyo-worker/src/arrow/async_udf.rs index 7acdc7ba0..ecfd89a34 100644 --- a/crates/arroyo-worker/src/arrow/async_udf.rs +++ b/crates/arroyo-worker/src/arrow/async_udf.rs @@ -486,9 +486,7 @@ impl AsyncUdfOperator { if watermark_id <= oldest_unprocessed { // we've processed everything before this watermark, we can emit and drop it - collector - .broadcast(SignalMessage::Watermark(watermark)) - .await; + collector.broadcast_watermark(watermark).await; } else { // we still have messages preceding this watermark to work on self.watermarks.push_front((watermark_id, watermark)); diff --git a/crates/arroyo-worker/src/arrow/tumbling_aggregating_window.rs b/crates/arroyo-worker/src/arrow/tumbling_aggregating_window.rs index 98a573e06..91ce2af92 100644 --- a/crates/arroyo-worker/src/arrow/tumbling_aggregating_window.rs +++ b/crates/arroyo-worker/src/arrow/tumbling_aggregating_window.rs @@ -416,9 +416,7 @@ impl ArrowOperator for TumblingAggregatingWindowFunc { let data: Box> = result.downcast().expect("invalid data in future"); if let Some((bin, batch_option)) = *data { match batch_option { - None => { - debug!("future for {} was finished elsewhere", print_time(bin)); - } + None => {} Some((batch, future)) => match self.execs.get_mut(&bin) { Some(exec) => { exec.finished_batches diff --git a/crates/arroyo-worker/src/arrow/watermark_generator.rs b/crates/arroyo-worker/src/arrow/watermark_generator.rs index e0d4ad268..af253b511 100644 --- a/crates/arroyo-worker/src/arrow/watermark_generator.rs +++ b/crates/arroyo-worker/src/arrow/watermark_generator.rs @@ -135,17 +135,17 @@ impl ArrowOperator for WatermarkGenerator { async fn on_close( &mut self, final_message: &Option, - ctx: &mut OperatorContext, + _: &mut OperatorContext, collector: &mut dyn Collector, ) { if let Some(SignalMessage::EndOfData) = final_message { // send final watermark on close collector - .broadcast(SignalMessage::Watermark( + .broadcast_watermark( // this is in the year 2554, far enough out be close to inifinity, // but can still be formatted. Watermark::EventTime(from_nanos(u64::MAX as u128)), - )) + ) .await; } } @@ -193,7 +193,7 @@ impl ArrowOperator for WatermarkGenerator { to_millis(watermark) ); collector - .broadcast(SignalMessage::Watermark(Watermark::EventTime(watermark))) + .broadcast_watermark(Watermark::EventTime(watermark)) .await; self.state_cache.last_watermark_emitted_at = max_timestamp; self.idle = false; @@ -202,7 +202,7 @@ impl ArrowOperator for WatermarkGenerator { async fn handle_checkpoint( &mut self, - b: CheckpointBarrier, + _: CheckpointBarrier, ctx: &mut OperatorContext, _: &mut dyn Collector, ) { @@ -217,7 +217,7 @@ impl ArrowOperator for WatermarkGenerator { async fn handle_tick( &mut self, - t: u64, + _: u64, ctx: &mut OperatorContext, collector: &mut dyn Collector, ) { @@ -227,9 +227,7 @@ impl ArrowOperator for WatermarkGenerator { "Setting partition {} to idle after {:?}", ctx.task_info.task_index, idle_time ); - collector - .broadcast(SignalMessage::Watermark(Watermark::Idle)) - .await; + collector.broadcast_watermark(Watermark::Idle).await; self.idle = true; } } diff --git a/crates/arroyo-worker/src/engine.rs b/crates/arroyo-worker/src/engine.rs index 79663575c..0fff1f2db 100644 --- a/crates/arroyo-worker/src/engine.rs +++ b/crates/arroyo-worker/src/engine.rs @@ -81,6 +81,7 @@ impl Debug for SubtaskNode { pub struct QueueNode { task_info: Arc, + operator_ids: Vec, tx: Sender, } @@ -128,6 +129,7 @@ impl SubtaskOrQueueNode { let n = SubtaskOrQueueNode::QueueNode(QueueNode { task_info: sn.node.task_info().clone(), + operator_ids: sn.node.operator_ids(), tx, }); @@ -434,6 +436,17 @@ impl RunningEngine { controls } + + pub fn operator_to_node(&self) -> HashMap { + let program = self.program.graph.read().unwrap(); + let mut result = HashMap::new(); + for n in program.node_weights() { + for id in &n.as_queue().operator_ids { + result.insert(id.clone(), n.as_queue().task_info.node_id); + } + } + result + } } impl Engine { diff --git a/crates/arroyo-worker/src/lib.rs b/crates/arroyo-worker/src/lib.rs index 9ea192132..9ffbb0b17 100644 --- a/crates/arroyo-worker/src/lib.rs +++ b/crates/arroyo-worker/src/lib.rs @@ -94,7 +94,8 @@ impl Debug for LogicalNode { struct EngineState { sources: Vec>, sinks: Vec>, - operator_controls: HashMap>>, // operator_id -> vec of control tx + operator_to_node: HashMap, + operator_controls: HashMap>>, // node_id -> vec of control tx shutdown_guard: ShutdownGuard, } @@ -491,11 +492,13 @@ impl WorkerGrpc for WorkerServer { let sources = engine.source_controls(); let sinks = engine.sink_controls(); let operator_controls = engine.operator_controls(); + let operator_to_node = engine.operator_to_node(); let mut state = self.state.lock().unwrap(); *state = Some(EngineState { sources, sinks, + operator_to_node, operator_controls, shutdown_guard: self.shutdown_guard.child("engine-state"), }); @@ -564,6 +567,7 @@ impl WorkerGrpc for WorkerServer { async fn commit(&self, request: Request) -> Result, Status> { let req = request.into_inner(); debug!("received commit request {:?}", req); + let sender_commit_map_pairs = { let state_mutex = self.state.lock().unwrap(); let Some(state) = state_mutex.as_ref() else { @@ -571,9 +575,13 @@ impl WorkerGrpc for WorkerServer { "Worker has not yet started execution", )); }; + let mut sender_commit_map_pairs = vec![]; - for (node_id, commit_operator) in req.committing_data { - let nodes = state.operator_controls.get(&node_id).unwrap().clone(); + for (operator_id, commit_operator) in req.committing_data { + let node_id = state.operator_to_node.get(&operator_id).unwrap_or_else(|| { + panic!("Could not find node for operator id {}", operator_id) + }); + let nodes = state.operator_controls.get(node_id).unwrap().clone(); let commit_map: HashMap<_, _> = commit_operator .committing_data .into_iter() @@ -583,6 +591,7 @@ impl WorkerGrpc for WorkerServer { } sender_commit_map_pairs }; + for (senders, commit_map) in sender_commit_map_pairs { for sender in senders { sender