
refactor: job isolation done #204

Open · wants to merge 13 commits into main
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Added

- added metadata serialization and deserialization
- Added retry job endpoint for failed jobs
- readme: setup instructions added
- Added : Grafana dashboard
@@ -51,6 +52,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Changed

- refactor: job isolation added; each job now carries the information it needs from its worker
- verify_job now handles VerificationTimeout status
- refactor: expect removed and added error wraps
- refactor: Readme and .env.example
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -54,7 +54,7 @@ axum-macros = "0.4.1"
bincode = "1.3.3"
bytes = "1.7.2"
color-eyre = "0.6.2"
chrono = "0.4.0"
chrono = { version = "0.4", features = ["serde"] }
c-kzg = "1.0.3"
dotenvy = "0.15.7"
futures = "0.3.30"
7 changes: 7 additions & 0 deletions crates/orchestrator/src/constants.rs
@@ -2,3 +2,10 @@ pub const BLOB_DATA_FILE_NAME: &str = "blob_data.txt";
pub const SNOS_OUTPUT_FILE_NAME: &str = "snos_output.json";
pub const PROGRAM_OUTPUT_FILE_NAME: &str = "program_output.txt";
pub const CAIRO_PIE_FILE_NAME: &str = "cairo_pie.zip";

pub const JOB_METADATA_CAIRO_PIE_PATH: &str = "cairo_pie_path";
Contributor: we've a lot of metadata constants here. Should we reuse this?

pub const JOB_METADATA_SNOS_OUTPUT_PATH: &str = "snos_output_path";
pub const JOB_METADATA_PROGRAM_OUTPUT_PATH: &str = "program_output_path";
pub const JOB_METADATA_CROSS_VERIFY: &str = "cross_verify";
pub const JOB_METADATA_SNOS_FULL_OUTPUT: &str = "snos_full_output";
pub const JOB_METADATA_BLOB_DATA_PATH: &str = "blob_data_path";
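These keys read like entries for the existing string-keyed metadata map; a minimal sketch of that style of lookup, assuming a `HashMap<String, String>` metadata is still in play at some call sites during the migration:

```rust
use std::collections::HashMap;

use crate::constants::JOB_METADATA_CAIRO_PIE_PATH;

// Hypothetical helper: read the Cairo PIE path out of a string-keyed
// metadata map, if the producing job recorded one.
fn cairo_pie_path(metadata: &HashMap<String, String>) -> Option<&str> {
    metadata.get(JOB_METADATA_CAIRO_PIE_PATH).map(String::as_str)
}
```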
27 changes: 21 additions & 6 deletions crates/orchestrator/src/database/mongodb/mod.rs
@@ -216,14 +216,29 @@ impl Database for MongoDb {

tracing::debug!(job_type = ?job_type, category = "db_call", "Fetching latest job by type");

// Get the first (and only) result if it exists
match cursor.try_next().await? {
Some(doc) => {
let job: JobItem = mongodb::bson::from_document(doc)?;
let attributes = [KeyValue::new("db_operation_name", "get_latest_job_by_type")];
let duration = start.elapsed();
ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes);
Ok(Some(job))
// Add debug logging to see the raw document
tracing::info!(raw_document = ?doc, "Raw document from MongoDB");
Contributor suggestion:
- tracing::info!(raw_document = ?doc, "Raw document from MongoDB");
+ tracing::debug!(raw_document = ?doc, "Raw document from MongoDB");
// Try to deserialize and log any errors
match mongodb::bson::from_document::<JobItem>(doc.clone()) {
Contributor: we're cloning here so that we can log it in the error case later? If yes, should we add a comment explaining that's why we're cloning it?

Ok(job) => {
tracing::info!(deserialized_job = ?job, "Successfully deserialized job");
let attributes = [KeyValue::new("db_operation_name", "get_latest_job_by_type")];
let duration = start.elapsed();
ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes);
Ok(Some(job))
}
Err(e) => {
tracing::error!(
error = %e,
document = ?doc,
"Failed to deserialize document into JobItem"
);
Err(eyre!("Failed to deserialize document: {}", e))
}
}
}
None => Ok(None),
}
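On the clone question above: one way to keep the error-path logging without cloning the whole `Document` is to capture a debug rendering up front and then move `doc` into the deserializer. A sketch of the `Some(doc)` arm under that assumption (it trades the clone for an eager string allocation):

```rust
// Render the raw document once for logging, then hand ownership to serde.
let raw_doc = format!("{doc:?}");
match mongodb::bson::from_document::<JobItem>(doc) {
    Ok(job) => {
        tracing::debug!(deserialized_job = ?job, "Successfully deserialized job");
        let attributes = [KeyValue::new("db_operation_name", "get_latest_job_by_type")];
        ORCHESTRATOR_METRICS.db_calls_response_time.record(start.elapsed().as_secs_f64(), &attributes);
        Ok(Some(job))
    }
    Err(e) => {
        tracing::error!(error = %e, document = %raw_doc, "Failed to deserialize document into JobItem");
        Err(eyre!("Failed to deserialize document: {}", e))
    }
}
```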
80 changes: 64 additions & 16 deletions crates/orchestrator/src/jobs/da_job/mod.rs
@@ -5,7 +5,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use chrono::{SubsecRound, Utc};
- use color_eyre::eyre::WrapErr;
+ use color_eyre::eyre::{eyre, WrapErr};
use lazy_static::lazy_static;
use num_bigint::{BigUint, ToBigUint};
use num_traits::{Num, Zero};
@@ -19,7 +19,7 @@ use uuid::Uuid;
use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
use super::{Job, JobError, OtherError};
use crate::config::Config;
- use crate::constants::BLOB_DATA_FILE_NAME;
+ use crate::jobs::metadata::{JobMetadata, JobSpecificMetadata};
use crate::jobs::state_update_job::utils::biguint_vec_to_u8_vec;

lazy_static! {
@@ -69,7 +69,7 @@ impl Job for DaJob {
&self,
_config: Arc<Config>,
internal_id: String,
- metadata: HashMap<String, String>,
+ metadata: JobMetadata,
) -> Result<JobItem, JobError> {
let job_id = Uuid::new_v4();
tracing::info!(log_type = "starting", category = "da", function_type = "create_job", block_no = %internal_id, "DA job creation started.");
@@ -91,7 +91,24 @@ impl Job for DaJob {
#[tracing::instrument(fields(category = "da"), skip(self, config), ret, err)]
async fn process_job(&self, config: Arc<Config>, job: &mut JobItem) -> Result<String, JobError> {
let internal_id = job.internal_id.clone();
tracing::info!(log_type = "starting", category = "da", function_type = "process_job", job_id = ?job.id, block_no = %internal_id, "DA job processing started.");
tracing::info!(
log_type = "starting",
category = "da",
function_type = "process_job",
job_id = ?job.id,
block_no = %internal_id,
"DA job processing started."
);

// Get DA-specific metadata
let da_metadata = match &job.metadata.specific {
JobSpecificMetadata::Da(metadata) => metadata,
_ => {
tracing::error!(job_id = ?job.id, "Invalid metadata type for DA job");
return Err(JobError::Other(OtherError(eyre!("Invalid metadata type for DA job"))));
}
};

let block_no = job.internal_id.parse::<u64>().wrap_err("Failed to parse u64".to_string()).map_err(|e| {
tracing::error!(job_id = ?job.id, error = ?e, "Failed to parse block number");
JobError::Other(OtherError(e))
@@ -115,13 +132,14 @@ impl Job for DaJob {
MaybePendingStateUpdate::Update(state_update) => state_update,
};
tracing::debug!(job_id = ?job.id, "Retrieved state update");

// constructing the data from the rpc
let blob_data = state_update_to_blob_data(block_no, state_update, config.clone()).await.map_err(|e| {
tracing::error!(job_id = ?job.id, error = ?e, "Failed to convert state update to blob data");
JobError::Other(OtherError(e))
})?;

// transforming the data so that we can apply FFT on this.
// @note: we can skip this step if in the above step we return vec<BigUint> directly
let blob_data_biguint = convert_to_biguint(blob_data.clone());
tracing::trace!(job_id = ?job.id, "Converted blob data to BigUint");

@@ -130,16 +148,26 @@ impl Job for DaJob {
tracing::error!(job_id = ?job.id, error = ?e, "Failed to apply FFT transformation");
JobError::Other(OtherError(e))
})?;
// data transformation on the data
tracing::trace!(job_id = ?job.id, "Applied FFT transformation");

store_blob_data(transformed_data.clone(), block_no, config.clone()).await?;
// Get blob data path from metadata
let blob_data_path = da_metadata.blob_data_path.as_ref().ok_or_else(|| {
tracing::error!(job_id = ?job.id, "Blob data path not found in metadata");
JobError::Other(OtherError(eyre!("Blob data path not found in metadata")))
})?;

// Store the transformed data
store_blob_data(transformed_data.clone(), blob_data_path, config.clone()).await?;
tracing::debug!(job_id = ?job.id, "Stored blob data");

let max_bytes_per_blob = config.da_client().max_bytes_per_blob().await;
let max_blob_per_txn = config.da_client().max_blob_per_txn().await;
tracing::trace!(job_id = ?job.id, max_bytes_per_blob = max_bytes_per_blob, max_blob_per_txn = max_blob_per_txn, "Retrieved DA client configuration");
// converting BigUints to Vec<u8>, one Vec<u8> represents one blob data
tracing::trace!(
job_id = ?job.id,
max_bytes_per_blob = max_bytes_per_blob,
max_blob_per_txn = max_blob_per_txn,
"Retrieved DA client configuration"
);

let blob_array = data_to_blobs(max_bytes_per_blob, transformed_data)?;
let current_blob_length: u64 = blob_array
Expand All @@ -152,9 +180,14 @@ impl Job for DaJob {
})?;
tracing::debug!(job_id = ?job.id, blob_count = current_blob_length, "Converted data to blobs");

// there is a limit on number of blobs per txn, checking that here
// Check blob limit
if current_blob_length > max_blob_per_txn {
tracing::warn!(job_id = ?job.id, current_blob_length = current_blob_length, max_blob_per_txn = max_blob_per_txn, "Exceeded maximum number of blobs per transaction");
tracing::warn!(
job_id = ?job.id,
current_blob_length = current_blob_length,
max_blob_per_txn = max_blob_per_txn,
"Exceeded maximum number of blobs per transaction"
);
Err(DaError::MaxBlobsLimitExceeded {
max_blob_per_txn,
current_blob_length,
Expand All @@ -163,13 +196,26 @@ impl Job for DaJob {
})?
}

// making the txn to the DA layer
// Publish to DA layer
let external_id = config.da_client().publish_state_diff(blob_array, &[0; 32]).await.map_err(|e| {
tracing::error!(job_id = ?job.id, error = ?e, "Failed to publish state diff to DA layer");
JobError::Other(OtherError(e))
})?;

tracing::info!(log_type = "completed", category = "da", function_type = "process_job", job_id = ?job.id, block_no = %internal_id, external_id = ?external_id, "Successfully published state diff to DA layer.");
// Update DA metadata with successful results
if let JobSpecificMetadata::Da(metadata) = &mut job.metadata.specific {
metadata.tx_hash = Some(external_id.clone());
}

tracing::info!(
log_type = "completed",
category = "da",
function_type = "process_job",
job_id = ?job.id,
block_no = %internal_id,
external_id = ?external_id,
"Successfully published state diff to DA layer."
);
Ok(external_id)
}
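The "extract the job-specific variant or fail" match at the top of `process_job` will presumably be repeated in every job; a hedged sketch of one way to factor it out (a hypothetical helper, not part of this PR):

```rust
use color_eyre::eyre::eyre;

use crate::jobs::metadata::{DaMetadata, JobSpecificMetadata};
use crate::jobs::{JobError, OtherError};

// Hypothetical conversion so each job can write
// `let da_metadata: DaMetadata = job.metadata.specific.clone().try_into()?;`
impl TryFrom<JobSpecificMetadata> for DaMetadata {
    type Error = JobError;

    fn try_from(value: JobSpecificMetadata) -> Result<Self, Self::Error> {
        match value {
            JobSpecificMetadata::Da(metadata) => Ok(metadata),
            _ => Err(JobError::Other(OtherError(eyre!("Invalid metadata type for DA job")))),
        }
    }
}
```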

@@ -344,14 +390,16 @@ pub async fn state_update_to_blob_data(
}

/// To store the blob data using the storage client with path <block_number>/blob_data.txt
- async fn store_blob_data(blob_data: Vec<BigUint>, block_number: u64, config: Arc<Config>) -> Result<(), JobError> {
+ async fn store_blob_data(blob_data: Vec<BigUint>, blob_data_path: &str, config: Arc<Config>) -> Result<(), JobError> {
let storage_client = config.storage();
let key = block_number.to_string() + "/" + BLOB_DATA_FILE_NAME;

let blob_data_vec_u8 = biguint_vec_to_u8_vec(blob_data.as_slice());

if !blob_data_vec_u8.is_empty() {
storage_client.put_data(blob_data_vec_u8.into(), &key).await.map_err(|e| JobError::Other(OtherError(e)))?;
storage_client
.put_data(blob_data_vec_u8.into(), blob_data_path)
.await
.map_err(|e| JobError::Other(OtherError(e)))?;
}

Ok(())
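The worker that enqueues DA jobs isn't shown in this diff; as a sketch of the caller side under that assumption, the blob data path that `process_job` now reads from metadata could be pre-computed like this:

```rust
use crate::constants::BLOB_DATA_FILE_NAME;
use crate::jobs::metadata::{CommonMetadata, DaMetadata, JobMetadata, JobSpecificMetadata};

// Hypothetical helper on the worker side: populate the DA job's metadata up
// front so process_job no longer derives storage keys from the block number.
fn build_da_job_metadata(block_number: u64) -> JobMetadata {
    JobMetadata {
        common: CommonMetadata::default(),
        specific: JobSpecificMetadata::Da(DaMetadata {
            block_number,
            // e.g. "1024/blob_data.txt"
            blob_data_path: Some(format!("{block_number}/{BLOB_DATA_FILE_NAME}")),
            blob_commitment: None,
            blob_proof: None,
            tx_hash: None,
            blob_versioned_hash: None,
        }),
    }
}
```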
86 changes: 86 additions & 0 deletions crates/orchestrator/src/jobs/metadata.rs
Contributor: this is also types, to be precise. Maybe we should add this to types.rs OR create a types folder with jobs.rs and metadata.rs?
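If the types-folder route were taken, the layout the comment seems to suggest would look roughly like this (hypothetical, not part of this PR):

```rust
// crates/orchestrator/src/jobs/types/mod.rs (hypothetical)
pub mod job;       // JobItem, JobStatus, JobType, JobVerificationStatus, ...
pub mod metadata;  // JobMetadata, CommonMetadata, and the per-job structs below

pub use job::*;
pub use metadata::*;
```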
@@ -0,0 +1,86 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
pub struct CommonMetadata {
pub process_attempt_no: u64,
pub process_retry_attempt_no: u64,
pub verification_attempt_no: u64,
pub verification_retry_attempt_no: u64,
#[serde(with = "chrono::serde::ts_seconds_option")]
pub process_completed_at: Option<DateTime<Utc>>,
#[serde(with = "chrono::serde::ts_seconds_option")]
pub verification_completed_at: Option<DateTime<Utc>>,
pub failure_reason: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SnosMetadata {
// Required fields
pub block_number: u64,
pub full_output: bool,

// Optional fields populated during processing
pub cairo_pie_path: Option<String>,
pub snos_output_path: Option<String>,
pub program_output_path: Option<String>,
pub snos_fact: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct StateUpdateMetadata {
// Required fields
pub blocks_to_settle: Vec<u64>,
// Paths for data
pub snos_output_paths: Vec<String>,
pub program_output_paths: Vec<String>,
pub blob_data_paths: Vec<String>,

// State tracking
pub last_failed_block_no: Option<u64>,
pub tx_hashes: Vec<String>, // key: attempt_no, value: comma-separated tx hashes
Contributor suggestion:
- pub tx_hashes: Vec<String>, // key: attempt_no, value: comma-separated tx hashes
+ pub tx_hashes: Vec<String>
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ProvingMetadata {
// Required fields
pub block_number: u64,
pub cairo_pie_path: Option<String>,
Contributor:
- if it's required it should not be optional, right?
- why does the proving job care about block_number?
- cross_verify can be renamed to ensure_on_chain_registration or something more understandable
- let's delete verification_key_path

Contributor: make an input field, e.g. (see the sketch after this struct):

enum Input {
Pie()...
}

Contributor: we can remove snos_fact and club it with ensure_on_chain_registration using an Option.

Contributor: also add comments to explain which metadata fields are inputs and which are added by jobs on the fly.


pub cross_verify: bool,
pub download_proof: bool,

pub snos_fact: String,

// Proof related fields
pub proof_path: Option<String>,
pub verification_key_path: Option<String>,
}
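A hedged sketch of the input-field idea from the comments above; the variant names and payloads are hypothetical, not part of this PR:

```rust
// Hypothetical replacement for the optional cairo_pie_path: make the proving
// input explicit, so a "required" field is never an Option.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ProvingInput {
    /// Storage path of a Cairo PIE to prove.
    CairoPie(String),
    /// Storage path of an existing proof (e.g. when only registration remains).
    Proof(String),
}
```

`ProvingMetadata` could then hold `pub input: ProvingInput`, and `cross_verify`/`snos_fact` could collapse into a single `pub ensure_on_chain_registration: Option<String>` carrying the fact to check.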

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DaMetadata {
// Required fields
pub block_number: u64,

// DA specific fields
pub blob_data_path: Option<String>,
pub blob_commitment: Option<String>,
pub blob_proof: Option<String>,
pub tx_hash: Option<String>,
pub blob_versioned_hash: Option<String>,
Contributor: let's keep what's needed for now.

}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type")]
pub enum JobSpecificMetadata {
Snos(SnosMetadata),
StateUpdate(StateUpdateMetadata),
Proving(ProvingMetadata),
Da(DaMetadata),
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct JobMetadata {
pub common: CommonMetadata,
pub specific: JobSpecificMetadata,
}
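Because `JobSpecificMetadata` is internally tagged with `#[serde(tag = "type")]`, each variant round-trips as a single flat object with a `"type"` discriminator. A minimal sketch, using `serde_json` only for illustration:

```rust
fn metadata_roundtrip() -> serde_json::Result<()> {
    let metadata = JobMetadata {
        common: CommonMetadata::default(),
        specific: JobSpecificMetadata::Snos(SnosMetadata {
            block_number: 1000,
            full_output: false,
            cairo_pie_path: None,
            snos_output_path: None,
            program_output_path: None,
            snos_fact: None,
        }),
    };

    let json = serde_json::to_string_pretty(&metadata)?;
    // `specific` serializes as {"type":"Snos","block_number":1000,...}
    let roundtrip: JobMetadata = serde_json::from_str(&json)?;
    assert_eq!(metadata, roundtrip);
    Ok(())
}
```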