Skip to content

Commit

Permalink
feat: command for merging snapshot files (#3462)
Browse files Browse the repository at this point in the history
  • Loading branch information
lemmih authored Sep 8, 2023
1 parent e0b5e87 commit 1bc3dce
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 9 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@
`--output-path` already exists and a `--force` flag to suppress the prompt.
- [#3439](https://github.com/ChainSafe/forest/pull/3439): Add
`--consume-snapshot` option to `forest` command.
- [#3462](https://github.com/ChainSafe/forest/pull/3462): Add
`forest-tool archive merge` command.

### Changed

Expand Down
88 changes: 79 additions & 9 deletions src/tool/subcommands/archive_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ use crate::db::car::{AnyCar, RandomAccessFileReader};
use crate::ipld::{stream_graph, CidHashSet};
use crate::networks::{calibnet, mainnet, ChainConfig, NetworkChain};
use crate::shim::clock::{ChainEpoch, EPOCHS_IN_DAY, EPOCH_DURATION_SECONDS};
use anyhow::{bail, Context as _};
use anyhow::{bail, Context as _, Result};
use chrono::NaiveDateTime;
use cid::Cid;
use clap::Subcommand;
use dialoguer::{theme::ColorfulTheme, Confirm};
use futures::TryStreamExt;
Expand All @@ -23,6 +24,7 @@ use itertools::Itertools;
use sha2::Sha256;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::io::{AsyncWriteExt, BufWriter};
use tracing::info;

#[derive(Debug, Subcommand)]
Expand Down Expand Up @@ -65,10 +67,24 @@ pub enum ArchiveCommands {
#[arg(required = true)]
snapshot_files: Vec<PathBuf>,
},
/// Merge snapshot archives into a single file. The output snapshot refers
/// to the heaviest tipset in the input set.
Merge {
/// Snapshot input paths. Supports `.car`, `.car.zst`, and `.forest.car.zst`.
#[arg(required = true)]
snapshot_files: Vec<PathBuf>,
/// Snapshot output filename or directory. Defaults to
/// `./forest_snapshot_{chain}_{year}-{month}-{day}_height_{epoch}.car.zst`.
#[arg(short, long, default_value = ".", verbatim_doc_comment)]
output_path: PathBuf,
/// Overwrite output file without prompting.
#[arg(long, default_value_t = false)]
force: bool,
},
}

impl ArchiveCommands {
pub async fn run(self) -> anyhow::Result<()> {
pub async fn run(self) -> Result<()> {
match self {
Self::Info { snapshot } => {
println!(
Expand Down Expand Up @@ -103,6 +119,11 @@ impl ArchiveCommands {
Self::Checkpoints {
snapshot_files: snapshot,
} => print_checkpoints(snapshot),
Self::Merge {
snapshot_files,
output_path,
force,
} => merge_snapshots(snapshot_files, output_path, force).await,
}
}
}
Expand Down Expand Up @@ -130,17 +151,14 @@ impl std::fmt::Display for ArchiveInfo {
impl ArchiveInfo {
// Scan a CAR archive to identify which network it belongs to and how many
// tipsets/messages are available. Progress is rendered to stdout.
fn from_store(store: AnyCar<impl RandomAccessFileReader>) -> anyhow::Result<Self> {
fn from_store(store: AnyCar<impl RandomAccessFileReader>) -> Result<Self> {
Self::from_store_with(store, true)
}

// Scan a CAR archive to identify which network it belongs to and how many
// tipsets/messages are available. Progress is optionally rendered to
// stdout.
fn from_store_with(
store: AnyCar<impl RandomAccessFileReader>,
progress: bool,
) -> anyhow::Result<Self> {
fn from_store_with(store: AnyCar<impl RandomAccessFileReader>, progress: bool) -> Result<Self> {
let root = store.heaviest_tipset()?;
let root_epoch = root.epoch();

Expand Down Expand Up @@ -215,7 +233,7 @@ impl ArchiveInfo {

// Print a mapping of epochs to block headers in yaml format. This mapping can
// be used by Forest to quickly identify tipsets.
fn print_checkpoints(snapshot_files: Vec<PathBuf>) -> anyhow::Result<()> {
fn print_checkpoints(snapshot_files: Vec<PathBuf>) -> Result<()> {
let store = ManyCar::try_from(snapshot_files).context("couldn't read input CAR file")?;
let root = store.heaviest_tipset()?;

Expand Down Expand Up @@ -281,7 +299,7 @@ async fn do_export(
diff: Option<ChainEpoch>,
diff_depth: Option<ChainEpochDelta>,
force: bool,
) -> anyhow::Result<()> {
) -> Result<()> {
let ts = Arc::new(root);

let genesis = ts.genesis(&store)?;
Expand Down Expand Up @@ -362,6 +380,58 @@ async fn do_export(
Ok(())
}

// FIXME: Testing with diff snapshots can be significantly improved. Tracking
// issue: https://github.com/ChainSafe/forest/issues/3347
/// Merge a set of snapshots (diff snapshots or lite snapshots) into a single
/// zstd-framed archive written by `forest::Encoder`. The output snapshot
/// links to the heaviest tipset in the input set.
///
/// # Arguments
///
/// * `snapshot_files` - Input snapshot paths; they are opened together as one
///   `ManyCar` store.
/// * `output_path` - File to create. An existing file is truncated once the
///   overwrite check below has passed.
/// * `force` - When `true`, skip the interactive overwrite confirmation.
///
/// # Errors
///
/// Returns an error if the input store cannot be opened, the heaviest tipset
/// cannot be determined, the output file cannot be created, or
/// encoding/writing/flushing the archive fails.
///
/// Returns `Ok(())` *without writing anything* if `output_path` already
/// exists, `force` is `false`, and the user declines the prompt (or the prompt
/// cannot be shown, e.g. when stdin is not a tty).
//
// NOTE(review): the CLI declares `output_path` with a default of "." and
// documents a default file name, but this function passes the path straight
// to `File::create` without expanding a directory into a file name. With the
// default value the "will be overwritten" prompt is shown for "." and the
// create call then fails. Confirm whether directory handling is intended here.
async fn merge_snapshots(
    snapshot_files: Vec<PathBuf>,
    output_path: PathBuf,
    force: bool,
) -> Result<()> {
    use crate::db::car::forest;

    let store = ManyCar::try_from(snapshot_files)?;
    let heaviest_tipset = store.heaviest_tipset()?;
    // The CIDs of the heaviest tipset become the roots of the merged archive.
    let roots = Vec::<Cid>::from(&heaviest_tipset.key().cids);

    if !force && output_path.exists() {
        let have_permission = Confirm::with_theme(&ColorfulTheme::default())
            .with_prompt(format!(
                "{} will be overwritten. Continue?",
                output_path.to_string_lossy()
            ))
            .default(false)
            .interact()
            // e.g. not a tty (or some other error), so haven't got permission.
            .unwrap_or(false);
        if !have_permission {
            return Ok(());
        }
    }

    // Build the error message lazily (only on failure) and render the path
    // with `display()` so that non-UTF-8 paths still show up in the message
    // instead of collapsing to an empty string.
    let file = tokio::fs::File::create(&output_path)
        .await
        .with_context(|| {
            format!(
                "unable to create a snapshot - is the output path '{}' correct?",
                output_path.display()
            )
        })?;
    let mut writer = BufWriter::new(file);

    // Stream all available blocks from heaviest_tipset to genesis.
    let blocks = stream_graph(&store, heaviest_tipset.chain(&store), 0);

    // Encode Ipld key-value pairs in zstd frames
    let frames = forest::Encoder::compress_stream_default(blocks);

    // Write zstd frames and include a skippable index
    forest::Encoder::write(&mut writer, roots, frames).await?;

    // Flush explicitly: dropping a `BufWriter` would not report write errors.
    writer.flush().await.context("failed to flush")?;

    Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit 1bc3dce

Please sign in to comment.