Skip to content

Commit

Permalink
feat: enable Snapshotter to create archives in subdirectories
Browse files: browse the repository at this point in the history
Co-authored-by: Sébastien Fauvel <[email protected]>
  • Loading branch information
dlachaume and sfauvel committed Jan 6, 2025
1 parent cf84d27 commit 7c925ff
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,13 @@ impl AncillaryArtifactBuilder {
self.compression_algorithm.tar_file_extension()
);

let ancillary_archive_path = Path::new("cardano-database")
.join("ancillary")
.join(&archive_name);

let snapshot = self
.snapshotter
.snapshot_subset(&archive_name, paths_to_include)
.snapshot_subset(&ancillary_archive_path, paths_to_include)
.with_context(|| {
format!(
"Failed to create ancillary archive for immutable file number: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use anyhow::Context;
use async_trait::async_trait;
use semver::Version;
use slog::{debug, warn, Logger};
use std::path::Path;
use std::sync::Arc;
use thiserror::Error;

Expand Down Expand Up @@ -74,7 +75,7 @@ impl CardanoImmutableFilesFullArtifactBuilder {
// spawn a separate thread to prevent blocking
let ongoing_snapshot =
tokio::task::spawn_blocking(move || -> StdResult<OngoingSnapshot> {
snapshotter.snapshot_all(&snapshot_name)
snapshotter.snapshot_all(Path::new(&snapshot_name))
})
.await??;

Expand Down
75 changes: 42 additions & 33 deletions mithril-aggregator/src/snapshotter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,11 @@ use crate::ZstandardCompressionParameters;

/// Define the ability to create snapshots.
pub trait Snapshotter: Sync + Send {
/// Create a new snapshot with the given archive name.
fn snapshot_all(&self, archive_name: &str) -> StdResult<OngoingSnapshot>;
/// Create a new snapshot with the given filepath.
fn snapshot_all(&self, filepath: &Path) -> StdResult<OngoingSnapshot>;

/// Create a new snapshot with the given archive name from a subset of directories and files.
fn snapshot_subset(
&self,
archive_name: &str,
files: Vec<PathBuf>,
) -> StdResult<OngoingSnapshot>;
/// Create a new snapshot with the given filepath from a subset of directories and files.
fn snapshot_subset(&self, filepath: &Path, files: Vec<PathBuf>) -> StdResult<OngoingSnapshot>;
}

/// Compression algorithm and parameters of the [CompressedArchiveSnapshotter].
Expand Down Expand Up @@ -161,17 +157,17 @@ pub enum SnapshotError {
}

impl Snapshotter for CompressedArchiveSnapshotter {
fn snapshot_all(&self, archive_name: &str) -> StdResult<OngoingSnapshot> {
fn snapshot_all(&self, filepath: &Path) -> StdResult<OngoingSnapshot> {
let appender = AppenderDirAll {
db_directory: self.db_directory.clone(),
};

self.snapshot(archive_name, appender)
self.snapshot(filepath, appender)
}

fn snapshot_subset(
&self,
archive_name: &str,
filepath: &Path,
entries: Vec<PathBuf>,
) -> StdResult<OngoingSnapshot> {
if entries.is_empty() {
Expand All @@ -183,7 +179,7 @@ impl Snapshotter for CompressedArchiveSnapshotter {
entries,
};

self.snapshot(archive_name, appender)
self.snapshot(filepath, appender)
}
}

Expand Down Expand Up @@ -222,12 +218,16 @@ impl CompressedArchiveSnapshotter {
})
}

fn snapshot<T: TarAppender>(
&self,
archive_name: &str,
appender: T,
) -> StdResult<OngoingSnapshot> {
let archive_path = self.ongoing_snapshot_directory.join(archive_name);
fn snapshot<T: TarAppender>(&self, filepath: &Path, appender: T) -> StdResult<OngoingSnapshot> {
let archive_path = self.ongoing_snapshot_directory.join(filepath);
if let Some(archive_dir) = archive_path.parent() {
fs::create_dir_all(archive_dir).with_context(|| {
format!(
"Can not create archive directory: '{}'",
archive_dir.display()
)
})?;
}
let filesize = self.create_and_verify_archive(&archive_path, appender).inspect_err(|_err| {
if archive_path.exists() {
if let Err(remove_error) = fs::remove_file(&archive_path) {
Expand Down Expand Up @@ -261,7 +261,11 @@ impl CompressedArchiveSnapshotter {
archive_path.display()
);

let tar_file = File::create(archive_path).map_err(SnapshotError::CreateArchiveError)?;
let tar_file = File::create(archive_path)
.map_err(SnapshotError::CreateArchiveError)
.with_context(|| {
format!("Error while creating the archive with path: {archive_path:?}")
})?;

match self.compression_algorithm {
SnapshotterCompressionAlgorithm::Gzip => {
Expand Down Expand Up @@ -466,13 +470,13 @@ impl Default for DumbSnapshotter {
}

impl Snapshotter for DumbSnapshotter {
fn snapshot_all(&self, archive_name: &str) -> StdResult<OngoingSnapshot> {
fn snapshot_all(&self, archive_name: &Path) -> StdResult<OngoingSnapshot> {
let mut value = self
.last_snapshot
.write()
.map_err(|e| SnapshotError::UploadFileError(e.to_string()))?;
let snapshot = OngoingSnapshot {
filepath: Path::new(archive_name).to_path_buf(),
filepath: archive_name.to_path_buf(),
filesize: 0,
};
*value = Some(snapshot.clone());
Expand All @@ -482,7 +486,7 @@ impl Snapshotter for DumbSnapshotter {

fn snapshot_subset(
&self,
archive_name: &str,
archive_name: &Path,
_files: Vec<PathBuf>,
) -> StdResult<OngoingSnapshot> {
self.snapshot_all(archive_name)
Expand Down Expand Up @@ -535,13 +539,15 @@ mod tests {
#[test]
fn test_dumb_snapshotter_snasphot_return_archive_name_with_size_0() {
let snapshotter = DumbSnapshotter::new();
let snapshot = snapshotter.snapshot_all("archive.tar.gz").unwrap();
let snapshot = snapshotter
.snapshot_all(Path::new("archive.tar.gz"))
.unwrap();

assert_eq!(PathBuf::from("archive.tar.gz"), *snapshot.get_file_path());
assert_eq!(0, *snapshot.get_file_size());

let snapshot = snapshotter
.snapshot_subset("archive.tar.gz", vec![PathBuf::from("whatever")])
.snapshot_subset(Path::new("archive.tar.gz"), vec![PathBuf::from("whatever")])
.unwrap();
assert_eq!(PathBuf::from("archive.tar.gz"), *snapshot.get_file_path());
assert_eq!(0, *snapshot.get_file_size());
Expand All @@ -556,7 +562,7 @@ mod tests {
.is_none());

let snapshot = snapshotter
.snapshot_all("whatever")
.snapshot_all(Path::new("whatever"))
.expect("Dumb snapshotter::snapshot should not fail.");
assert_eq!(
Some(snapshot),
Expand All @@ -566,7 +572,7 @@ mod tests {
);

let snapshot = snapshotter
.snapshot_subset("another_whatever", vec![PathBuf::from("subdir")])
.snapshot_subset(Path::new("another_whatever"), vec![PathBuf::from("subdir")])
.expect("Dumb snapshotter::snapshot should not fail.");
assert_eq!(
Some(snapshot),
Expand Down Expand Up @@ -641,7 +647,7 @@ mod tests {
File::create(pending_snapshot_directory.join("other-process.file")).unwrap();

let _ = snapshotter
.snapshot_all("whatever.tar.gz")
.snapshot_all(Path::new("whatever.tar.gz"))
.expect_err("Snapshotter::snapshot should fail if the db is empty.");
let remaining_files: Vec<String> = fs::read_dir(&pending_snapshot_directory)
.unwrap()
Expand Down Expand Up @@ -687,7 +693,7 @@ mod tests {
.expect("verify_archive should not fail");

snapshotter
.snapshot_all(pending_snapshot_archive_file)
.snapshot_all(Path::new(pending_snapshot_archive_file))
.expect("Snapshotter::snapshot should not fail.");
}

Expand Down Expand Up @@ -728,7 +734,7 @@ mod tests {
.expect("verify_archive should not fail");

snapshotter
.snapshot_all(pending_snapshot_archive_file)
.snapshot_all(Path::new(pending_snapshot_archive_file))
.expect("Snapshotter::snapshot should not fail.");
}

Expand All @@ -753,7 +759,7 @@ mod tests {

let snapshot = snapshotter
.snapshot_subset(
&random_archive_name(),
Path::new(&random_archive_name()),
vec![
directory_to_archive_path.clone(),
file_to_archive_path.clone(),
Expand Down Expand Up @@ -784,7 +790,10 @@ mod tests {
.unwrap();

snapshotter
.snapshot_subset(&random_archive_name(), vec![PathBuf::from("not_exist")])
.snapshot_subset(
Path::new(&random_archive_name()),
vec![PathBuf::from("not_exist")],
)
.expect_err("snapshot_subset should return error when file or directory not exist");
}

Expand All @@ -803,7 +812,7 @@ mod tests {
.unwrap();

snapshotter
.snapshot_subset(&random_archive_name(), vec![])
.snapshot_subset(Path::new(&random_archive_name()), vec![])
.expect_err("snapshot_subset should return error when entries is empty");
}

Expand All @@ -826,7 +835,7 @@ mod tests {

let snapshot = snapshotter
.snapshot_subset(
&random_archive_name(),
Path::new(&random_archive_name()),
vec![
directory_to_archive_path.clone(),
directory_to_archive_path.clone(),
Expand Down

0 comments on commit 7c925ff

Please sign in to comment.