Skip to content

Commit

Permalink
checkpoints and commiting
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Dec 4, 2024
1 parent a1fbed3 commit cbd03c2
Show file tree
Hide file tree
Showing 22 changed files with 356 additions and 289 deletions.
51 changes: 26 additions & 25 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ impl<TPC: TwoPhaseCommitter> ArrowOperator for TwoPhaseCommitterOperator<TPC> {
);
tables
}

fn is_committing(&self) -> bool {
true
}

async fn on_start(&mut self, ctx: &mut OperatorContext) {
let tracking_key_state: &mut GlobalKeyedView<usize, TPC::DataRecovery> = ctx
Expand Down Expand Up @@ -184,14 +188,6 @@ impl<TPC: TwoPhaseCommitter> ArrowOperator for TwoPhaseCommitterOperator<TPC> {
.expect("record inserted");
}

async fn on_close(&mut self, final_message: &Option<SignalMessage>, ctx: &mut OperatorContext, collector: &mut dyn Collector) {
if let Some(ControlMessage::Commit { epoch, commit_data }) = ctx.control_rx.recv().await {
self.handle_commit(epoch, commit_data, ctx).await;
} else {
warn!("no commit message received, not committing")
}
}

async fn handle_commit(
&mut self,
epoch: u32,
Expand Down
23 changes: 8 additions & 15 deletions crates/arroyo-connectors/src/kafka/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,6 @@ impl From<SinkCommitMode> for ConsistencyMode {
}

impl KafkaSinkFunc {
fn is_committing(&self) -> bool {
matches!(self.consistency_mode, ConsistencyMode::ExactlyOnce { .. })
}

fn set_timestamp_col(&mut self, schema: &ArroyoSchema) {
if let Some(f) = &self.timestamp_field {
if let Ok(f) = schema.schema.field_with_name(f) {
Expand Down Expand Up @@ -271,6 +267,10 @@ impl ArrowOperator for KafkaSinkFunc {
}
}

fn is_committing(&self) -> bool {
matches!(self.consistency_mode, ConsistencyMode::ExactlyOnce { .. })
}

async fn on_start(&mut self, ctx: &mut OperatorContext) {
self.set_timestamp_col(&ctx.in_schemas[0]);
self.set_key_col(&ctx.in_schemas[0]);
Expand Down Expand Up @@ -352,8 +352,10 @@ impl ArrowOperator for KafkaSinkFunc {
};

let Some(committing_producer) = producer_to_complete.take() else {
unimplemented!("received a commit message without a producer ready to commit. Restoring from commit phase not yet implemented");
error!("received a commit message without a producer ready to commit. Restoring from commit phase not yet implemented");
return;
};

let mut commits_attempted = 0;
loop {
if committing_producer
Expand Down Expand Up @@ -384,19 +386,10 @@ impl ArrowOperator for KafkaSinkFunc {

async fn on_close(
&mut self,
final_message: &Option<SignalMessage>,
_: &Option<SignalMessage>,
ctx: &mut OperatorContext,
_: &mut dyn Collector,
) {
self.flush(ctx).await;
if !self.is_committing() {
return;
}
// if let Some(ControlMessage::Commit { epoch, commit_data }) = ctx.control_rx.recv().await {
// self.handle_commit(epoch, &commit_data, ctx).await;
// } else {
// warn!("no commit message received, not committing")
// }
todo!("committing")
}
}
1 change: 1 addition & 0 deletions crates/arroyo-controller/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ uuid = "1.3.3"
async-stream = "0.3.5"
base64 = "0.22"
rusqlite = { version = "0.31.0", features = ["serde_json", "time"] }
log = "0.4.22"

[build-dependencies]
cornucopia = { workspace = true }
Expand Down
16 changes: 0 additions & 16 deletions crates/arroyo-controller/src/job_controller/checkpointer.rs

This file was deleted.

21 changes: 17 additions & 4 deletions crates/arroyo-controller/src/job_controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,26 @@ use tokio::{sync::mpsc::Receiver, task::JoinHandle};
use tonic::{transport::Channel, Request};
use tracing::{debug, error, info, warn};

use self::checkpointer::CheckpointingOrCommittingState;

mod checkpointer;
pub mod job_metrics;

const CHECKPOINTS_TO_KEEP: u32 = 4;
const CHECKPOINT_ROWS_TO_KEEP: u32 = 100;
const COMPACT_EVERY: u32 = 2;

pub enum CheckpointingOrCommittingState {
Checkpointing(CheckpointState),
Committing(CommittingState),
}

impl CheckpointingOrCommittingState {
pub(crate) fn done(&self) -> bool {
match self {
CheckpointingOrCommittingState::Checkpointing(checkpointing) => checkpointing.done(),
CheckpointingOrCommittingState::Committing(committing) => committing.done(),
}
}
}

#[derive(Debug, PartialEq, Eq)]
pub enum WorkerState {
Running,
Expand Down Expand Up @@ -188,7 +199,8 @@ impl RunningJobModel {
CheckpointingOrCommittingState::Committing(committing_state) => {
if matches!(c.event_type(), TaskCheckpointEventType::FinishedCommit)
{
committing_state.subtask_committed(c.node_id, c.subtask_index);
committing_state
.subtask_committed(c.operator_id.clone(), c.subtask_index);
self.compact_state().await?;
} else {
warn!("unexpected checkpoint event type {:?}", c.event_type())
Expand Down Expand Up @@ -452,6 +464,7 @@ impl RunningJobModel {
DbCheckpointState::committing,
)
.await?;

let committing_data = committing_state.committing_data();
self.checkpoint_state =
Some(CheckpointingOrCommittingState::Committing(committing_state));
Expand Down
Loading

0 comments on commit cbd03c2

Please sign in to comment.