diff --git a/CHANGELOG.md b/CHANGELOG.md index 78fe643b59..0f72dc8a88 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,8 @@ As a minor extension, we have adopted a slightly different versioning convention - Implement the artifact routes of the aggregator for the signed entity type `CardanoDatabase`. - Implement the immutable file digests route in the aggregator. - Implement the artifact ancillary builder in the aggregator. + - Implement the artifact immutable builder in the aggregator. + - Implement the artifact digest builder in the aggregator. - Crates versions: diff --git a/Cargo.lock b/Cargo.lock index 2b0d50b6f6..734b56aae5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3578,7 +3578,7 @@ dependencies = [ [[package]] name = "mithril-aggregator" -version = "0.6.12" +version = "0.6.13" dependencies = [ "anyhow", "async-trait", @@ -3600,6 +3600,7 @@ dependencies = [ "paste", "prometheus", "rayon", + "regex", "reqwest 0.12.12", "semver", "serde", @@ -3738,7 +3739,7 @@ dependencies = [ [[package]] name = "mithril-common" -version = "0.4.104" +version = "0.4.105" dependencies = [ "anyhow", "async-trait", diff --git a/mithril-aggregator/Cargo.toml b/mithril-aggregator/Cargo.toml index df2b0ddbfa..ad6c478af3 100644 --- a/mithril-aggregator/Cargo.toml +++ b/mithril-aggregator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-aggregator" -version = "0.6.12" +version = "0.6.13" description = "A Mithril Aggregator server" authors = { workspace = true } edition = { workspace = true } @@ -35,6 +35,7 @@ openssl-probe = { version = "0.1.5", optional = true } paste = "1.0.15" prometheus = "0.13.4" rayon = "1.10.0" +regex = "1.11.1" reqwest = { version = "0.12.12", features = ["json"] } semver = "1.0.24" serde = { version = "1.0.217", features = ["derive"] } diff --git a/mithril-aggregator/src/artifact_builder/cardano_database.rs b/mithril-aggregator/src/artifact_builder/cardano_database.rs index 9208b69769..970b375e37 100644 --- 
a/mithril-aggregator/src/artifact_builder/cardano_database.rs +++ b/mithril-aggregator/src/artifact_builder/cardano_database.rs @@ -17,11 +17,15 @@ use mithril_common::{ use crate::artifact_builder::{AncillaryArtifactBuilder, ArtifactBuilder}; +use super::{DigestArtifactBuilder, ImmutableArtifactBuilder}; + pub struct CardanoDatabaseArtifactBuilder { db_directory: PathBuf, cardano_node_version: Version, compression_algorithm: CompressionAlgorithm, ancillary_builder: Arc, + immutable_builder: Arc, + digest_builder: Arc, } impl CardanoDatabaseArtifactBuilder { @@ -30,12 +34,16 @@ impl CardanoDatabaseArtifactBuilder { cardano_node_version: &Version, compression_algorithm: CompressionAlgorithm, ancillary_builder: Arc, + immutable_builder: Arc, + digest_builder: Arc, ) -> Self { Self { db_directory, cardano_node_version: cardano_node_version.clone(), compression_algorithm, ancillary_builder, + immutable_builder, + digest_builder, } } } @@ -62,11 +70,16 @@ impl ArtifactBuilder for CardanoDataba let total_db_size_uncompressed = compute_uncompressed_database_size(&self.db_directory)?; let ancillary_locations = self.ancillary_builder.upload(&beacon).await?; + let immutables_locations = self + .immutable_builder + .upload(beacon.immutable_file_number) + .await?; + let digest_locations = self.digest_builder.upload().await?; let locations = ArtifactsLocations { ancillary: ancillary_locations, - digests: vec![], - immutables: vec![], + digests: digest_locations, + immutables: immutables_locations, }; let cardano_database = CardanoDatabaseSnapshot::new( @@ -109,17 +122,24 @@ fn compute_uncompressed_database_size(path: &Path) -> StdResult { #[cfg(test)] mod tests { - use std::path::PathBuf; + use std::{collections::BTreeMap, path::PathBuf}; use mithril_common::{ digesters::DummyCardanoDbBuilder, - entities::{AncillaryLocation, ProtocolMessage, ProtocolMessagePartKey}, + entities::{ + AncillaryLocation, DigestLocation, ImmutablesLocation, MultiFilesUri, ProtocolMessage, + 
ProtocolMessagePartKey, TemplateUri, + }, test_utils::{fake_data, TempDir}, CardanoNetwork, }; + use reqwest::Url; use crate::{ - artifact_builder::MockAncillaryFileUploader, test_tools::TestLogger, DumbSnapshotter, + artifact_builder::{MockAncillaryFileUploader, MockImmutableFilesUploader}, + immutable_file_digest_mapper::MockImmutableFileDigestMapper, + test_tools::TestLogger, + DumbSnapshotter, }; use super::*; @@ -155,6 +175,7 @@ mod tests { async fn should_compute_valid_artifact() { let test_dir = get_test_directory("should_compute_valid_artifact"); + let beacon = fake_data::beacon(); let immutable_trio_file_size = 777; let ledger_file_size = 6666; let volatile_file_size = 99; @@ -168,29 +189,73 @@ mod tests { .build(); let expected_total_size = immutable_trio_file_size + ledger_file_size + volatile_file_size; - let mut ancillary_uploader = MockAncillaryFileUploader::new(); - ancillary_uploader.expect_upload().return_once(|_| { - Ok(AncillaryLocation::CloudStorage { - uri: "ancillary_uri".to_string(), - }) - }); + let ancillary_artifact_builder = { + let mut ancillary_uploader = MockAncillaryFileUploader::new(); + ancillary_uploader.expect_upload().return_once(|_| { + Ok(AncillaryLocation::CloudStorage { + uri: "ancillary_uri".to_string(), + }) + }); + + AncillaryArtifactBuilder::new( + vec![Arc::new(ancillary_uploader)], + Arc::new(DumbSnapshotter::new()), + CardanoNetwork::DevNet(123), + CompressionAlgorithm::Gzip, + TestLogger::stdout(), + ) + .unwrap() + }; + + let immutable_artifact_builder = { + let number_of_immutable_file_loaded = fake_data::beacon().immutable_file_number; + let mut immutable_uploader = MockImmutableFilesUploader::new(); + immutable_uploader + .expect_batch_upload() + .withf(move |paths| paths.len() == number_of_immutable_file_loaded as usize) + .return_once(|_| { + Ok(ImmutablesLocation::CloudStorage { + uri: MultiFilesUri::Template(TemplateUri( + "immutable_template_uri".to_string(), + )), + }) + }); + + 
ImmutableArtifactBuilder::new( + vec![Arc::new(immutable_uploader)], + Arc::new(DumbSnapshotter::new()), + CompressionAlgorithm::Gzip, + TestLogger::stdout(), + ) + .unwrap() + }; + + let digest_artifact_builder = { + let mut immutable_file_digest_mapper = MockImmutableFileDigestMapper::new(); + + immutable_file_digest_mapper + .expect_get_immutable_file_digest_map() + .returning(|| Ok(BTreeMap::new())); + + DigestArtifactBuilder::new( + Url::parse("http://aggregator_uri").unwrap(), + vec![], + test_dir.join("digests"), + Arc::new(immutable_file_digest_mapper), + TestLogger::stdout(), + ) + .unwrap() + }; + let cardano_database_artifact_builder = CardanoDatabaseArtifactBuilder::new( test_dir, &Version::parse("1.0.0").unwrap(), CompressionAlgorithm::Zstandard, - Arc::new( - AncillaryArtifactBuilder::new( - vec![Arc::new(ancillary_uploader)], - Arc::new(DumbSnapshotter::new()), - CardanoNetwork::DevNet(123), - CompressionAlgorithm::Gzip, - TestLogger::stdout(), - ) - .unwrap(), - ), + Arc::new(ancillary_artifact_builder), + Arc::new(immutable_artifact_builder), + Arc::new(digest_artifact_builder), ); - let beacon = fake_data::beacon(); let certificate_with_merkle_root = { let mut protocol_message = ProtocolMessage::new(); protocol_message.set_message_part( @@ -211,14 +276,23 @@ mod tests { let expected_ancillary_locations = vec![AncillaryLocation::CloudStorage { uri: "ancillary_uri".to_string(), }]; + + let expected_immutables_locations = vec![ImmutablesLocation::CloudStorage { + uri: MultiFilesUri::Template(TemplateUri("immutable_template_uri".to_string())), + }]; + + let expected_digest_locations = vec![DigestLocation::Aggregator { + uri: "http://aggregator_uri/artifact/cardano-database/digests".to_string(), + }]; + let artifact_expected = CardanoDatabaseSnapshot::new( "merkleroot".to_string(), beacon, expected_total_size, ArtifactsLocations { ancillary: expected_ancillary_locations, - digests: vec![], - immutables: vec![], + digests: expected_digest_locations, + 
immutables: expected_immutables_locations, }, CompressionAlgorithm::Zstandard, &Version::parse("1.0.0").unwrap(), diff --git a/mithril-aggregator/src/artifact_builder/cardano_database_artifacts/ancillary.rs b/mithril-aggregator/src/artifact_builder/cardano_database_artifacts/ancillary.rs index 15d36c6582..de63f322e0 100644 --- a/mithril-aggregator/src/artifact_builder/cardano_database_artifacts/ancillary.rs +++ b/mithril-aggregator/src/artifact_builder/cardano_database_artifacts/ancillary.rs @@ -183,8 +183,9 @@ mod tests { use mithril_common::{ digesters::{DummyCardanoDbBuilder, IMMUTABLE_DIR, LEDGER_DIR, VOLATILE_DIR}, - test_utils::TempDir, + test_utils::{assert_equivalent, TempDir}, }; + use uuid::Uuid; use crate::{ test_tools::TestLogger, CompressedArchiveSnapshotter, DumbSnapshotter, @@ -309,11 +310,11 @@ mod tests { .await .unwrap(); - assert_eq!( + assert_equivalent( locations, vec![AncillaryLocation::CloudStorage { - uri: "an_uri".to_string() - }] + uri: "an_uri".to_string(), + }], ); } @@ -339,22 +340,22 @@ mod tests { .await .unwrap(); - assert_eq!( + assert_equivalent( locations, vec![ AncillaryLocation::CloudStorage { - uri: "an_uri".to_string() + uri: "an_uri".to_string(), }, AncillaryLocation::CloudStorage { - uri: "another_uri".to_string() - } - ] + uri: "another_uri".to_string(), + }, + ], ); } #[tokio::test] async fn create_archive_should_embed_ledger_volatile_directories_and_last_immutables() { - let test_dir = "cardano_database/create_archive"; + let test_dir = "create_archive/cardano_database"; let cardano_db = DummyCardanoDbBuilder::new(test_dir) .with_immutables(&[1, 2, 3]) .with_ledger_files(&["blocks-0.dat", "blocks-1.dat", "blocks-2.dat"]) @@ -363,15 +364,14 @@ mod tests { std::fs::create_dir(cardano_db.get_dir().join("whatever")).unwrap(); let db_directory = cardano_db.get_dir().to_path_buf(); - let snapshotter = { - CompressedArchiveSnapshotter::new( - db_directory.clone(), - db_directory.parent().unwrap().join("snapshot_dest"), - 
SnapshotterCompressionAlgorithm::Gzip, - TestLogger::stdout(), - ) - .unwrap() - }; + let mut snapshotter = CompressedArchiveSnapshotter::new( + db_directory.clone(), + db_directory.parent().unwrap().join("snapshot_dest"), + SnapshotterCompressionAlgorithm::Gzip, + TestLogger::stdout(), + ) + .unwrap(); + snapshotter.set_sub_temp_dir(Uuid::new_v4().to_string()); let builder = AncillaryArtifactBuilder::new( vec![Arc::new(MockAncillaryFileUploader::new())], @@ -422,15 +422,14 @@ mod tests { #[tokio::test] async fn upload_should_return_error_and_not_upload_when_archive_creation_fails() { - let snapshotter = { - CompressedArchiveSnapshotter::new( - PathBuf::from("directory_not_existing"), - PathBuf::from("whatever"), - SnapshotterCompressionAlgorithm::Gzip, - TestLogger::stdout(), - ) - .unwrap() - }; + let mut snapshotter = CompressedArchiveSnapshotter::new( + PathBuf::from("directory_not_existing"), + PathBuf::from("whatever"), + SnapshotterCompressionAlgorithm::Gzip, + TestLogger::stdout(), + ) + .unwrap(); + snapshotter.set_sub_temp_dir(Uuid::new_v4().to_string()); let mut uploader = MockAncillaryFileUploader::new(); uploader.expect_upload().never(); diff --git a/mithril-aggregator/src/artifact_builder/cardano_database_artifacts/digest.rs b/mithril-aggregator/src/artifact_builder/cardano_database_artifacts/digest.rs new file mode 100644 index 0000000000..d2c33e922c --- /dev/null +++ b/mithril-aggregator/src/artifact_builder/cardano_database_artifacts/digest.rs @@ -0,0 +1,406 @@ +use std::{ + fs, + path::{Path, PathBuf}, + sync::Arc, +}; + +use anyhow::Context; +use async_trait::async_trait; +use mithril_common::{ + entities::DigestLocation, logging::LoggerExtensions, + messages::CardanoDatabaseDigestListItemMessage, StdResult, +}; +use reqwest::Url; +use slog::{error, Logger}; + +use crate::ImmutableFileDigestMapper; + +/// The [DigestFileUploader] trait allows identifying uploaders that return locations for digest files. 
+#[cfg_attr(test, mockall::automock)] +#[async_trait] +pub trait DigestFileUploader: Send + Sync { + /// Uploads the file at the given filepath and returns the location of the uploaded file. + async fn upload(&self, filepath: &Path) -> StdResult; +} + +pub struct DigestArtifactBuilder { + /// Aggregator URL prefix + aggregator_url_prefix: Url, + /// Uploaders + uploaders: Vec>, + + digests_dir: PathBuf, + + immutable_file_digest_mapper: Arc, + + logger: Logger, +} + +impl DigestArtifactBuilder { + /// Creates a new [DigestArtifactBuilder]. + pub fn new( + aggregator_url_prefix: Url, + uploaders: Vec>, + digests_dir: PathBuf, + immutable_file_digest_mapper: Arc, + logger: Logger, + ) -> StdResult { + Ok(Self { + aggregator_url_prefix, + uploaders, + digests_dir, + immutable_file_digest_mapper, + logger: logger.new_with_component_name::(), + }) + } + + pub async fn upload(&self) -> StdResult> { + let digest_path = self.create_digest_file().await?; + + let locations = self.upload_digest_file(&digest_path).await; + fs::remove_file(&digest_path).with_context(|| { + format!("Could not remove digest file: '{}'", digest_path.display()) + })?; + + locations + } + + async fn create_digest_file(&self) -> StdResult { + let immutable_file_digest_map = self + .immutable_file_digest_mapper + .get_immutable_file_digest_map() + .await? 
+ .into_iter() + .map( + |(immutable_file_name, digest)| CardanoDatabaseDigestListItemMessage { + immutable_file_name, + digest, + }, + ) + .collect::>(); + + let digests_file_path = DigestArtifactBuilder::get_digests_file_path(&self.digests_dir); + + if let Some(digests_dir) = digests_file_path.parent() { + fs::create_dir_all(digests_dir).with_context(|| { + format!( + "Can not create digests directory: '{}'", + digests_dir.display() + ) + })?; + } + + let digest_file = fs::File::create(digests_file_path.clone())?; + serde_json::to_writer(digest_file, &immutable_file_digest_map)?; + + Ok(digests_file_path) + } + + /// Uploads the digest file and returns the locations of the uploaded files. + async fn upload_digest_file(&self, digest_filepath: &Path) -> StdResult> { + let mut locations = Vec::::new(); + for uploader in &self.uploaders { + let result = uploader.upload(digest_filepath).await; + match result { + Ok(location) => { + locations.push(location); + } + Err(e) => { + error!( + self.logger, + "Failed to upload digest file"; + "error" => e.to_string() + ); + } + } + } + + locations.push(self.aggregator_digests_route_location()?); + + Ok(locations) + } + + fn aggregator_digests_route_location(&self) -> StdResult { + Ok(DigestLocation::Aggregator { + uri: self + .aggregator_url_prefix + .join("artifact/cardano-database/digests")? 
+ .to_string(), + }) + } + + fn get_digests_file_path>(digests_dir: P) -> PathBuf { + digests_dir.as_ref().join("digests.json") + } +} + +#[cfg(test)] +mod tests { + use std::{collections::BTreeMap, fs::read_to_string}; + + use crate::{ + immutable_file_digest_mapper::MockImmutableFileDigestMapper, test_tools::TestLogger, + }; + use anyhow::anyhow; + use mithril_common::{ + messages::{CardanoDatabaseDigestListItemMessage, CardanoDatabaseDigestListMessage}, + test_utils::{assert_equivalent, TempDir}, + }; + + use super::*; + + fn fake_uploader_returning_error() -> MockDigestFileUploader { + let mut uploader = MockDigestFileUploader::new(); + uploader + .expect_upload() + .return_once(|_| Err(anyhow!("Failure while uploading..."))); + + uploader + } + + fn fake_uploader(location_uri: &str) -> MockDigestFileUploader { + let uri = location_uri.to_string(); + let mut uploader = MockDigestFileUploader::new(); + uploader + .expect_upload() + .times(1) + .return_once(|_| Ok(DigestLocation::CloudStorage { uri })); + + uploader + } + + #[tokio::test] + async fn digest_artifact_builder_return_digests_route_on_aggregator() { + let temp_dir = TempDir::create( + "digest", + "digest_artifact_builder_return_digests_route_on_aggregator", + ); + let mut immutable_file_digest_mapper = MockImmutableFileDigestMapper::new(); + immutable_file_digest_mapper + .expect_get_immutable_file_digest_map() + .returning(|| Ok(BTreeMap::new())); + + let builder = DigestArtifactBuilder::new( + Url::parse("https://aggregator/").unwrap(), + vec![], + temp_dir, + Arc::new(immutable_file_digest_mapper), + TestLogger::stdout(), + ) + .unwrap(); + + let locations = builder.upload().await.unwrap(); + assert_eq!( + vec!(DigestLocation::Aggregator { + uri: "https://aggregator/artifact/cardano-database/digests".to_string() + }), + locations + ); + } + + #[tokio::test] + async fn upload_digest_file_should_log_upload_errors() { + let log_path = TempDir::create("digest", 
"upload_digest_file_should_log_upload_errors") + .join("test.log"); + + let mut uploader = MockDigestFileUploader::new(); + uploader + .expect_upload() + .return_once(|_| Err(anyhow!("Failure while uploading..."))); + + { + let builder = DigestArtifactBuilder::new( + Url::parse("https://aggregator/").unwrap(), + vec![Arc::new(uploader)], + PathBuf::from("/tmp/whatever"), + Arc::new(MockImmutableFileDigestMapper::new()), + TestLogger::file(&log_path), + ) + .unwrap(); + + let _ = builder.upload_digest_file(Path::new("digest_file")).await; + } + + let logs = std::fs::read_to_string(&log_path).unwrap(); + assert!(logs.contains("Failure while uploading...")); + } + + #[tokio::test] + async fn upload_digest_file_should_not_error_even_if_no_location_returned_from_uploaders() { + let uploader = fake_uploader_returning_error(); + + let builder = DigestArtifactBuilder::new( + Url::parse("https://aggregator/").unwrap(), + vec![Arc::new(uploader)], + PathBuf::from("/tmp/whatever"), + Arc::new(MockImmutableFileDigestMapper::new()), + TestLogger::stdout(), + ) + .unwrap(); + + let locations = builder + .upload_digest_file(Path::new("digest_file")) + .await + .unwrap(); + + assert!(!locations.is_empty()); + } + + #[tokio::test] + async fn upload_digest_file_should_return_location_even_with_uploaders_errors() { + let first_uploader = fake_uploader_returning_error(); + let second_uploader = fake_uploader("an_uri"); + let third_uploader = fake_uploader_returning_error(); + + let uploaders: Vec> = vec![ + Arc::new(first_uploader), + Arc::new(second_uploader), + Arc::new(third_uploader), + ]; + + let builder = DigestArtifactBuilder::new( + Url::parse("https://aggregator/").unwrap(), + uploaders, + PathBuf::from("/tmp/whatever"), + Arc::new(MockImmutableFileDigestMapper::new()), + TestLogger::stdout(), + ) + .unwrap(); + + let locations = builder + .upload_digest_file(Path::new("digest_file")) + .await + .unwrap(); + + assert_equivalent( + locations, + vec![ + 
DigestLocation::CloudStorage { + uri: "an_uri".to_string(), + }, + DigestLocation::Aggregator { + uri: "https://aggregator/artifact/cardano-database/digests".to_string(), + }, + ], + ); + } + + #[tokio::test] + async fn upload_digest_file_should_return_all_uploaders_returned_locations() { + let first_uploader = fake_uploader("an_uri"); + let second_uploader = fake_uploader("another_uri"); + + let uploaders: Vec> = + vec![Arc::new(first_uploader), Arc::new(second_uploader)]; + + let builder = DigestArtifactBuilder::new( + Url::parse("https://aggregator/").unwrap(), + uploaders, + PathBuf::from("/tmp/whatever"), + Arc::new(MockImmutableFileDigestMapper::new()), + TestLogger::stdout(), + ) + .unwrap(); + + let locations = builder + .upload_digest_file(Path::new("digest_file")) + .await + .unwrap(); + + assert_equivalent( + locations, + vec![ + DigestLocation::CloudStorage { + uri: "an_uri".to_string(), + }, + DigestLocation::CloudStorage { + uri: "another_uri".to_string(), + }, + DigestLocation::Aggregator { + uri: "https://aggregator/artifact/cardano-database/digests".to_string(), + }, + ], + ); + } + + #[tokio::test] + async fn create_digest_file_should_create_json_file_with_all_digests() { + let temp_dir = TempDir::create( + "digest", + "create_digest_file_should_create_json_file_with_all_digests", + ); + let mut immutable_file_digest_mapper = MockImmutableFileDigestMapper::new(); + immutable_file_digest_mapper + .expect_get_immutable_file_digest_map() + .returning(|| { + Ok(BTreeMap::from([( + "06685.chunk".to_string(), + "0af556ab2620dd9363bf76963a231abe8948a500ea6be31b131d87907ab09b1e".to_string(), + )])) + }); + + let builder = DigestArtifactBuilder::new( + Url::parse("https://aggregator/").unwrap(), + vec![], + temp_dir, + Arc::new(immutable_file_digest_mapper), + TestLogger::stdout(), + ) + .unwrap(); + + let digest_file = builder.create_digest_file().await.unwrap(); + + let file_content = read_to_string(digest_file).unwrap(); + let digest_content: 
CardanoDatabaseDigestListMessage = + serde_json::from_str(&file_content).unwrap(); + + assert_eq!( + digest_content, + vec![CardanoDatabaseDigestListItemMessage { + immutable_file_name: "06685.chunk".to_string(), + digest: "0af556ab2620dd9363bf76963a231abe8948a500ea6be31b131d87907ab09b1e" + .to_string(), + }] + ); + } + + #[tokio::test] + async fn upload_should_call_upload_with_created_digest_file_and_delete_the_file() { + let digests_dir = TempDir::create( + "digests", + "upload_should_call_upload_with_created_digest_file_and_delete_the_file", + ); + let mut immutable_file_digest_mapper = MockImmutableFileDigestMapper::new(); + immutable_file_digest_mapper + .expect_get_immutable_file_digest_map() + .returning(|| Ok(BTreeMap::new())); + + let mut digest_file_uploader = MockDigestFileUploader::new(); + + let digest_file = DigestArtifactBuilder::get_digests_file_path(&digests_dir); + + let digest_file_clone = digest_file.clone(); + digest_file_uploader + .expect_upload() + .withf(move |path| path == digest_file_clone && path.exists()) + .times(1) + .return_once(|_| { + Ok(DigestLocation::CloudStorage { + uri: "an_uri".to_string(), + }) + }); + + let builder = DigestArtifactBuilder::new( + Url::parse("https://aggregator/").unwrap(), + vec![Arc::new(digest_file_uploader)], + digests_dir, + Arc::new(immutable_file_digest_mapper), + TestLogger::stdout(), + ) + .unwrap(); + + let _locations = builder.upload().await.unwrap(); + + assert!(!digest_file.exists()); + } +} diff --git a/mithril-aggregator/src/artifact_builder/cardano_database_artifacts/immutable.rs b/mithril-aggregator/src/artifact_builder/cardano_database_artifacts/immutable.rs new file mode 100644 index 0000000000..8cb3935132 --- /dev/null +++ b/mithril-aggregator/src/artifact_builder/cardano_database_artifacts/immutable.rs @@ -0,0 +1,809 @@ +use std::{ + path::{Path, PathBuf}, + sync::Arc, +}; + +use anyhow::anyhow; +use async_trait::async_trait; +use regex::Regex; +use slog::{error, Logger}; + +use 
mithril_common::{ + digesters::IMMUTABLE_DIR, + entities::{CompressionAlgorithm, ImmutableFileNumber, ImmutablesLocation, MultiFilesUri}, + logging::LoggerExtensions, + StdResult, +}; + +use crate::{file_uploaders::LocalUploader, FileUploader, Snapshotter}; + +fn immmutable_file_number_extractor(file_uri: &str) -> StdResult> { + let regex = Regex::new(r".*(\d{5})")?; + + Ok(regex + .captures(file_uri) + .and_then(|mat| mat.get(1)) + .map(|immutable_match| { + let mut template = file_uri.to_string(); + template.replace_range(immutable_match.range(), "{immutable_file_number}"); + + template + })) +} + +/// The [ImmutableFilesUploader] trait allows identifying uploaders that return locations for immutable files archive. +#[cfg_attr(test, mockall::automock)] +#[async_trait] +pub trait ImmutableFilesUploader: Send + Sync { + /// Uploads the archives at the given filepaths and returns the location of the uploaded file. + async fn batch_upload(&self, filepaths: &[PathBuf]) -> StdResult; +} + +#[async_trait] +impl ImmutableFilesUploader for LocalUploader { + async fn batch_upload(&self, filepaths: &[PathBuf]) -> StdResult { + let mut file_uris = Vec::new(); + for filepath in filepaths { + file_uris.push(self.upload(filepath).await?.into()); + } + + let template_uri = + MultiFilesUri::extract_template_from_uris(file_uris, immmutable_file_number_extractor)? 
+ .ok_or_else(|| anyhow!("No matching template found in the uploaded files"))?; + + Ok(ImmutablesLocation::CloudStorage { + uri: MultiFilesUri::Template(template_uri), + }) + } +} + +pub struct ImmutableArtifactBuilder { + uploaders: Vec>, + snapshotter: Arc, + compression_algorithm: CompressionAlgorithm, + logger: Logger, +} + +impl ImmutableArtifactBuilder { + pub fn new( + uploaders: Vec>, + snapshotter: Arc, + compression_algorithm: CompressionAlgorithm, + logger: Logger, + ) -> StdResult { + if uploaders.is_empty() { + return Err(anyhow!( + "At least one uploader is required to create an 'ImmutableArtifactBuilder'" + )); + } + + Ok(Self { + uploaders, + snapshotter, + compression_algorithm, + logger: logger.new_with_component_name::(), + }) + } + + pub async fn upload( + &self, + up_to_immutable_file_number: ImmutableFileNumber, + ) -> StdResult> { + let archives_paths = + self.immutable_archives_paths_creating_the_missing_ones(up_to_immutable_file_number)?; + let locations = self.upload_immutable_archives(&archives_paths).await?; + + Ok(locations) + } + + pub fn immutable_archives_paths_creating_the_missing_ones( + &self, + up_to_immutable_file_number: ImmutableFileNumber, + ) -> StdResult> { + fn immutable_trio_names(immutable_file_number: ImmutableFileNumber) -> Vec { + vec![ + format!("{:05}.chunk", immutable_file_number), + format!("{:05}.primary", immutable_file_number), + format!("{:05}.secondary", immutable_file_number), + ] + } + + let immutable_archive_dir_path = Path::new("cardano-database").join("immutable"); + + let mut archive_paths = vec![]; + for immutable_file_number in 1..=up_to_immutable_file_number { + let files_to_archive = immutable_trio_names(immutable_file_number) + .iter() + .map(|filename| PathBuf::from(IMMUTABLE_DIR).join(filename)) + .collect(); + + let archive_name = format!( + "{:05}.{}", + immutable_file_number, + self.compression_algorithm.tar_file_extension() + ); + + let immutable_archive_file_path = 
immutable_archive_dir_path.join(archive_name); + if !self + .snapshotter + .does_snapshot_exist(&immutable_archive_file_path) + { + self.snapshotter + .snapshot_subset(&immutable_archive_file_path, files_to_archive)?; + } + archive_paths.push(self.snapshotter.get_file_path(&immutable_archive_file_path)); + } + + Ok(archive_paths) + } + + async fn upload_immutable_archives( + &self, + archive_paths: &[PathBuf], + ) -> StdResult> { + let mut locations = Vec::new(); + for uploader in &self.uploaders { + let result = uploader.batch_upload(archive_paths).await; + match result { + Ok(location) => { + locations.push(location); + } + Err(e) => { + error!( + self.logger, + "Failed to upload immutable archive"; + "error" => e.to_string() + ); + } + } + } + + if locations.is_empty() { + return Err(anyhow!( + "Failed to upload immutable archive with all uploaders" + )); + } + + Ok(locations) + } +} + +#[cfg(test)] +mod tests { + use mithril_common::{ + digesters::DummyCardanoDbBuilder, + entities::TemplateUri, + test_utils::{assert_equivalent, equivalent_to}, + }; + use mockall::predicate::{always, eq}; + use uuid::Uuid; + + use crate::{ + snapshotter::{MockSnapshotter, OngoingSnapshot}, + test_tools::TestLogger, + CompressedArchiveSnapshotter, DumbSnapshotter, SnapshotterCompressionAlgorithm, + }; + + use super::*; + + fn fake_uploader(archive_paths: Vec<&str>, location_uri: &str) -> MockImmutableFilesUploader { + let uri = location_uri.to_string(); + let archive_paths: Vec<_> = archive_paths.into_iter().map(String::from).collect(); + + let mut uploader = MockImmutableFilesUploader::new(); + uploader + .expect_batch_upload() + .withf(move |p| { + let paths: Vec<_> = p.iter().map(|s| s.to_string_lossy().into_owned()).collect(); + + equivalent_to(paths, archive_paths.clone()) + }) + .times(1) + .return_once(|_| { + Ok(ImmutablesLocation::CloudStorage { + uri: MultiFilesUri::Template(TemplateUri(uri)), + }) + }); + + uploader + } + + fn fake_uploader_returning_error() -> 
MockImmutableFilesUploader { + let mut uploader = MockImmutableFilesUploader::new(); + uploader + .expect_batch_upload() + .return_once(|_| Err(anyhow!("Failure while uploading..."))); + + uploader + } + + fn dummy_ongoing_snapshot() -> OngoingSnapshot { + OngoingSnapshot::new(PathBuf::from("/whatever"), 0) + } + + #[tokio::test] + async fn upload_call_archive_creation_and_upload_to_retrieve_locations() { + let snapshotter = { + let mut snapshotter = MockSnapshotter::new(); + snapshotter + .expect_does_snapshot_exist() + .returning(|_| false); + snapshotter + .expect_get_file_path() + .returning(move |f| Path::new("/destination").join(f)); + + snapshotter + .expect_snapshot_subset() + .times(1) + .with( + eq(Path::new("cardano-database/immutable/00001.tar.gz")), + always(), + ) + .returning(|_, _| Ok(dummy_ongoing_snapshot())); + + snapshotter + .expect_snapshot_subset() + .times(1) + .with( + eq(Path::new("cardano-database/immutable/00002.tar.gz")), + always(), + ) + .returning(|_, _| Ok(dummy_ongoing_snapshot())); + snapshotter + }; + + let uploader = fake_uploader( + vec![ + "/destination/cardano-database/immutable/00001.tar.gz", + "/destination/cardano-database/immutable/00002.tar.gz", + ], + "archive.tar.gz", + ); + + let builder = ImmutableArtifactBuilder::new( + vec![Arc::new(uploader)], + Arc::new(snapshotter), + CompressionAlgorithm::Gzip, + TestLogger::stdout(), + ) + .unwrap(); + + let archive_paths = builder.upload(2).await.unwrap(); + + assert_equivalent( + archive_paths, + vec![ImmutablesLocation::CloudStorage { + uri: MultiFilesUri::Template(TemplateUri("archive.tar.gz".to_string())), + }], + ) + } + + mod create_archive { + + use super::*; + + #[test] + fn snapshot_immutables_files_up_to_the_given_immutable_file_number() { + let mut snapshotter = MockSnapshotter::new(); + snapshotter + .expect_get_file_path() + .returning(move |f| Path::new("/tmp").join(f)); + + snapshotter + .expect_does_snapshot_exist() + .returning(|_| false); + + snapshotter + 
.expect_snapshot_subset() + .times(1) + .with( + eq(Path::new("cardano-database/immutable/00001.tar.gz")), + eq(vec![ + PathBuf::from(IMMUTABLE_DIR).join("00001.chunk"), + PathBuf::from(IMMUTABLE_DIR).join("00001.primary"), + PathBuf::from(IMMUTABLE_DIR).join("00001.secondary"), + ]), + ) + .returning(|_, _| Ok(dummy_ongoing_snapshot())); + + snapshotter + .expect_snapshot_subset() + .times(1) + .with( + eq(Path::new("cardano-database/immutable/00002.tar.gz")), + eq(vec![ + PathBuf::from(IMMUTABLE_DIR).join("00002.chunk"), + PathBuf::from(IMMUTABLE_DIR).join("00002.primary"), + PathBuf::from(IMMUTABLE_DIR).join("00002.secondary"), + ]), + ) + .returning(|_, _| Ok(dummy_ongoing_snapshot())); + + let builder = ImmutableArtifactBuilder::new( + vec![Arc::new(MockImmutableFilesUploader::new())], + Arc::new(snapshotter), + CompressionAlgorithm::Gzip, + TestLogger::stdout(), + ) + .unwrap(); + + let archive_paths = builder + .immutable_archives_paths_creating_the_missing_ones(2) + .unwrap(); + + assert_equivalent( + archive_paths, + vec![ + PathBuf::from("/tmp/cardano-database/immutable/00001.tar.gz"), + PathBuf::from("/tmp/cardano-database/immutable/00002.tar.gz"), + ], + ) + } + + #[test] + fn return_error_when_one_of_the_three_immutable_files_is_missing() { + let test_dir = + "error_when_one_of_the_three_immutable_files_is_missing/cardano_database"; + let cardano_db = DummyCardanoDbBuilder::new(test_dir) + .with_immutables(&[1, 2]) + .build(); + + let file_to_remove = cardano_db.get_immutable_dir().join("00002.chunk"); + std::fs::remove_file(file_to_remove).unwrap(); + + let db_directory = cardano_db.get_dir().to_path_buf(); + let mut snapshotter = CompressedArchiveSnapshotter::new( + db_directory.clone(), + db_directory.parent().unwrap().join("snapshot_dest"), + SnapshotterCompressionAlgorithm::Gzip, + TestLogger::stdout(), + ) + .unwrap(); + snapshotter.set_sub_temp_dir(Uuid::new_v4().to_string()); + + let builder = ImmutableArtifactBuilder::new( + 
vec![Arc::new(MockImmutableFilesUploader::new())], + Arc::new(snapshotter), + CompressionAlgorithm::Gzip, + TestLogger::stdout(), + ) + .unwrap(); + + builder + .immutable_archives_paths_creating_the_missing_ones(2) + .expect_err( + "Should return an error when one of the three immutable files is missing", + ); + } + + #[test] + fn return_error_when_an_immutable_file_trio_is_missing() { + let test_dir = "error_when_an_immutable_file_trio_is_missing/cardano_database"; + let cardano_db = DummyCardanoDbBuilder::new(test_dir) + .with_immutables(&[1, 3]) + .build(); + + let db_directory = cardano_db.get_dir().to_path_buf(); + let mut snapshotter = CompressedArchiveSnapshotter::new( + db_directory.clone(), + db_directory.parent().unwrap().join("snapshot_dest"), + SnapshotterCompressionAlgorithm::Gzip, + TestLogger::stdout(), + ) + .unwrap(); + snapshotter.set_sub_temp_dir(Uuid::new_v4().to_string()); + + let builder = ImmutableArtifactBuilder::new( + vec![Arc::new(MockImmutableFilesUploader::new())], + Arc::new(snapshotter), + CompressionAlgorithm::Gzip, + TestLogger::stdout(), + ) + .unwrap(); + + builder + .immutable_archives_paths_creating_the_missing_ones(3) + .expect_err("Should return an error when an immutable file trio is missing"); + } + + #[test] + fn return_error_when_immutable_file_number_is_not_produced_yet() { + let test_dir = "error_when_up_to_immutable_file_number_is_missing/cardano_database"; + let cardano_db = DummyCardanoDbBuilder::new(test_dir) + .with_immutables(&[1, 2]) + .build(); + + let db_directory = cardano_db.get_dir().to_path_buf(); + let mut snapshotter = CompressedArchiveSnapshotter::new( + db_directory.clone(), + db_directory.parent().unwrap().join("snapshot_dest"), + SnapshotterCompressionAlgorithm::Gzip, + TestLogger::stdout(), + ) + .unwrap(); + snapshotter.set_sub_temp_dir(Uuid::new_v4().to_string()); + + let builder = ImmutableArtifactBuilder::new( + vec![Arc::new(MockImmutableFilesUploader::new())], + Arc::new(snapshotter), + 
CompressionAlgorithm::Gzip, + TestLogger::stdout(), + ) + .unwrap(); + + builder + .immutable_archives_paths_creating_the_missing_ones(3) + .expect_err("Should return an error when an immutable file trio is missing"); + } + + #[test] + fn return_all_archives_but_not_rebuild_archives_already_compressed() { + let mut snapshotter = MockSnapshotter::new(); + snapshotter + .expect_get_file_path() + .returning(move |f| Path::new("/tmp").join(f)); + + snapshotter + .expect_does_snapshot_exist() + .times(1) + .with(eq(Path::new("cardano-database/immutable/00001.tar.gz"))) + .returning(|_| true); + + snapshotter + .expect_does_snapshot_exist() + .times(1) + .with(eq(Path::new("cardano-database/immutable/00002.tar.gz"))) + .returning(|_| true); + + snapshotter + .expect_does_snapshot_exist() + .times(1) + .with(eq(Path::new("cardano-database/immutable/00003.tar.gz"))) + .returning(|_| false); + snapshotter + .expect_snapshot_subset() + .times(1) + .with( + eq(Path::new("cardano-database/immutable/00003.tar.gz")), + eq(vec![ + PathBuf::from(IMMUTABLE_DIR).join("00003.chunk"), + PathBuf::from(IMMUTABLE_DIR).join("00003.primary"), + PathBuf::from(IMMUTABLE_DIR).join("00003.secondary"), + ]), + ) + .returning(|_, _| Ok(dummy_ongoing_snapshot())); + + let builder = ImmutableArtifactBuilder::new( + vec![Arc::new(MockImmutableFilesUploader::new())], + Arc::new(snapshotter), + CompressionAlgorithm::Gzip, + TestLogger::stdout(), + ) + .unwrap(); + + let archive_paths = builder + .immutable_archives_paths_creating_the_missing_ones(3) + .unwrap(); + + assert_equivalent( + archive_paths, + vec![ + PathBuf::from("/tmp/cardano-database/immutable/00001.tar.gz"), + PathBuf::from("/tmp/cardano-database/immutable/00002.tar.gz"), + PathBuf::from("/tmp/cardano-database/immutable/00003.tar.gz"), + ], + ) + } + + #[test] + fn return_all_archives_paths_even_if_all_archives_already_exist() { + let mut snapshotter = MockSnapshotter::new(); + snapshotter + .expect_get_file_path() + .returning(move 
|f| Path::new("/tmp").join(f)); + snapshotter.expect_does_snapshot_exist().returning(|_| true); + snapshotter.expect_snapshot_subset().never(); + + let builder = ImmutableArtifactBuilder::new( + vec![Arc::new(MockImmutableFilesUploader::new())], + Arc::new(snapshotter), + CompressionAlgorithm::Gzip, + TestLogger::stdout(), + ) + .unwrap(); + + let archive_paths = builder + .immutable_archives_paths_creating_the_missing_ones(3) + .unwrap(); + + assert_equivalent( + archive_paths, + vec![ + PathBuf::from("/tmp/cardano-database/immutable/00001.tar.gz"), + PathBuf::from("/tmp/cardano-database/immutable/00002.tar.gz"), + PathBuf::from("/tmp/cardano-database/immutable/00003.tar.gz"), + ], + ) + } + } + + mod upload { + use mithril_common::test_utils::TempDir; + + use super::MockImmutableFilesUploader; + + use super::*; + + #[test] + fn create_immutable_builder_should_error_when_no_uploader() { + let result = ImmutableArtifactBuilder::new( + vec![], + Arc::new(DumbSnapshotter::new()), + CompressionAlgorithm::Gzip, + TestLogger::stdout(), + ); + + assert!(result.is_err(), "Should return an error when no uploaders") + } + + #[tokio::test] + async fn upload_immutable_archives_should_log_upload_errors() { + let log_path = TempDir::create( + "immutable", + "upload_immutable_archives_should_log_upload_errors", + ) + .join("test.log"); + + let mut uploader = MockImmutableFilesUploader::new(); + uploader + .expect_batch_upload() + .return_once(|_| Err(anyhow!("Failure while uploading..."))); + + { + let builder = ImmutableArtifactBuilder::new( + vec![Arc::new(uploader)], + Arc::new(MockSnapshotter::new()), + CompressionAlgorithm::Gzip, + TestLogger::file(&log_path), + ) + .unwrap(); + + let _ = builder + .upload_immutable_archives(&[ + PathBuf::from("01.tar.gz"), + PathBuf::from("02.tar.gz"), + ]) + .await; + } + + let logs = std::fs::read_to_string(&log_path).unwrap(); + assert!(logs.contains("Failure while uploading...")); + } + + #[tokio::test] + async fn 
upload_immutable_archives_should_error_when_no_location_is_returned() { + let uploaders: Vec> = + vec![Arc::new(fake_uploader_returning_error())]; + + let builder = ImmutableArtifactBuilder::new( + uploaders, + Arc::new(MockSnapshotter::new()), + CompressionAlgorithm::Gzip, + TestLogger::stdout(), + ) + .unwrap(); + + let result = builder + .upload_immutable_archives(&[ + PathBuf::from("01.tar.gz"), + PathBuf::from("02.tar.gz"), + ]) + .await; + + assert!( + result.is_err(), + "Should return an error when no location is returned" + ); + } + + #[tokio::test] + async fn upload_immutable_archives_should_return_location_even_with_uploaders_errors() { + let uploaders: Vec> = vec![ + Arc::new(fake_uploader_returning_error()), + Arc::new(fake_uploader( + vec!["01.tar.gz", "02.tar.gz"], + "archive_2.tar.gz", + )), + Arc::new(fake_uploader_returning_error()), + ]; + + let builder = ImmutableArtifactBuilder::new( + uploaders, + Arc::new(MockSnapshotter::new()), + CompressionAlgorithm::Gzip, + TestLogger::stdout(), + ) + .unwrap(); + + let archive_paths = builder + .upload_immutable_archives(&[ + PathBuf::from("01.tar.gz"), + PathBuf::from("02.tar.gz"), + ]) + .await + .unwrap(); + + assert_equivalent( + archive_paths, + vec![ImmutablesLocation::CloudStorage { + uri: MultiFilesUri::Template(TemplateUri("archive_2.tar.gz".to_string())), + }], + ) + } + + #[tokio::test] + async fn upload_immutable_archives_should_return_all_uploaders_returned_locations() { + let uploaders: Vec> = vec![ + Arc::new(fake_uploader( + vec!["01.tar.gz", "02.tar.gz"], + "archive_1.tar.gz", + )), + Arc::new(fake_uploader( + vec!["01.tar.gz", "02.tar.gz"], + "archive_2.tar.gz", + )), + ]; + + let builder = ImmutableArtifactBuilder::new( + uploaders, + Arc::new(MockSnapshotter::new()), + CompressionAlgorithm::Gzip, + TestLogger::stdout(), + ) + .unwrap(); + + let archive_paths = builder + .upload_immutable_archives(&[ + PathBuf::from("01.tar.gz"), + PathBuf::from("02.tar.gz"), + ]) + .await + .unwrap(); 
+ + assert_equivalent( + archive_paths, + vec![ + ImmutablesLocation::CloudStorage { + uri: MultiFilesUri::Template(TemplateUri("archive_1.tar.gz".to_string())), + }, + ImmutablesLocation::CloudStorage { + uri: MultiFilesUri::Template(TemplateUri("archive_2.tar.gz".to_string())), + }, + ], + ) + } + } + + mod batch_upload { + use std::fs::File; + use std::io::Write; + + use mithril_common::test_utils::TempDir; + use reqwest::Url; + + use super::*; + + fn create_fake_archive(dir: &Path, name: &str) -> PathBuf { + let file_path = dir.join(name); + let mut file = File::create(&file_path).unwrap(); + writeln!( + file, + "I swear, this is an archive, not a temporary test file." + ) + .unwrap(); + + file_path + } + + #[tokio::test] + async fn extract_archive_name_to_deduce_template_location() { + let source_dir = TempDir::create( + "immutable", + "extract_archive_name_to_deduce_template_location_source", + ); + let target_dir = TempDir::create( + "immutable", + "extract_archive_name_to_deduce_template_location_target", + ); + + let archive_1 = create_fake_archive(&source_dir, "00001.tar.gz"); + let archive_2 = create_fake_archive(&source_dir, "00002.tar.gz"); + + let url_prefix = Url::parse("http://test.com:8080/base-root").unwrap(); + let uploader = + LocalUploader::new(url_prefix, &target_dir, TestLogger::stdout()).unwrap(); + let location = ImmutableFilesUploader::batch_upload( + &uploader, + &[archive_1.clone(), archive_2.clone()], + ) + .await + .expect("local upload should not fail"); + + assert!(target_dir.join(archive_1.file_name().unwrap()).exists()); + assert!(target_dir.join(archive_2.file_name().unwrap()).exists()); + + let expected_location = ImmutablesLocation::CloudStorage { + uri: MultiFilesUri::Template(TemplateUri( + "http://test.com:8080/base-root/artifact/snapshot/{immutable_file_number}.tar.gz" + .to_string(), + ))}; + assert_eq!(expected_location, location); + } + + #[tokio::test] + async fn 
returns_error_when_uploaded_filename_not_templatable_without_5_digits() { + let source_dir = TempDir::create( + "immutable", + "returns_error_when_uploaded_filename_not_templatable", + ); + let target_dir = TempDir::create( + "immutable", + "returns_error_when_uploaded_filename_not_templatable", + ); + + let archive = create_fake_archive(&source_dir, "not-templatable.tar.gz"); + + let url_prefix = Url::parse("http://test.com:8080/base-root").unwrap(); + let uploader = + LocalUploader::new(url_prefix, &target_dir, TestLogger::stdout()).unwrap(); + + ImmutableFilesUploader::batch_upload(&uploader, &[archive]) + .await + .expect_err("Should return an error when not template found"); + } + } + + mod immutable_file_number_extractor { + use super::*; + + #[test] + fn returns_none_when_not_templatable_without_5_digits() { + let template = immmutable_file_number_extractor("not-templatable.tar.gz").unwrap(); + + assert!(template.is_none()); + } + + #[test] + fn returns_template() { + let template = + immmutable_file_number_extractor("http://whatever/00001.tar.gz").unwrap(); + + assert_eq!( + template, + Some("http://whatever/{immutable_file_number}.tar.gz".to_string()) + ); + } + + #[test] + fn replaces_last_occurence_of_5_digits() { + let template = + immmutable_file_number_extractor("http://00001/whatever/00001.tar.gz").unwrap(); + + assert_eq!( + template, + Some("http://00001/whatever/{immutable_file_number}.tar.gz".to_string()) + ); + } + + #[test] + fn replaces_last_occurence_when_more_than_5_digits() { + let template = + immmutable_file_number_extractor("http://whatever/123456789.tar.gz").unwrap(); + + assert_eq!( + template, + Some("http://whatever/1234{immutable_file_number}.tar.gz".to_string()) + ); + } + } +} diff --git a/mithril-aggregator/src/artifact_builder/cardano_database_artifacts/mod.rs b/mithril-aggregator/src/artifact_builder/cardano_database_artifacts/mod.rs index 0537f92220..4210d9cbee 100644 --- 
a/mithril-aggregator/src/artifact_builder/cardano_database_artifacts/mod.rs +++ b/mithril-aggregator/src/artifact_builder/cardano_database_artifacts/mod.rs @@ -1,4 +1,8 @@ //! The module is responsible for creating and uploading the archives of the Cardano database artifacts. mod ancillary; +mod digest; +mod immutable; pub use ancillary::*; +pub use digest::*; +pub use immutable::*; diff --git a/mithril-aggregator/src/artifact_builder/cardano_immutable_files_full.rs b/mithril-aggregator/src/artifact_builder/cardano_immutable_files_full.rs index cbfa9430ff..91a068138b 100644 --- a/mithril-aggregator/src/artifact_builder/cardano_immutable_files_full.rs +++ b/mithril-aggregator/src/artifact_builder/cardano_immutable_files_full.rs @@ -1,12 +1,13 @@ use anyhow::Context; use async_trait::async_trait; +use mithril_common::entities::FileUri; use semver::Version; use slog::{debug, warn, Logger}; use std::path::Path; use std::sync::Arc; use thiserror::Error; -use crate::{file_uploaders::FileUri, snapshotter::OngoingSnapshot, FileUploader, Snapshotter}; +use crate::{snapshotter::OngoingSnapshot, FileUploader, Snapshotter}; use super::ArtifactBuilder; use mithril_common::logging::LoggerExtensions; diff --git a/mithril-aggregator/src/dependency_injection/builder.rs b/mithril-aggregator/src/dependency_injection/builder.rs index 7aa1036ccd..e0b1f1ecf2 100644 --- a/mithril-aggregator/src/dependency_injection/builder.rs +++ b/mithril-aggregator/src/dependency_injection/builder.rs @@ -55,7 +55,8 @@ use crate::{ artifact_builder::{ AncillaryArtifactBuilder, CardanoDatabaseArtifactBuilder, CardanoImmutableFilesFullArtifactBuilder, CardanoStakeDistributionArtifactBuilder, - CardanoTransactionsArtifactBuilder, MithrilStakeDistributionArtifactBuilder, + CardanoTransactionsArtifactBuilder, DigestArtifactBuilder, ImmutableArtifactBuilder, + MithrilStakeDistributionArtifactBuilder, }, configuration::ExecutionEnvironment, database::repository::{ @@ -1229,6 +1230,7 @@ impl DependenciesBuilder 
{ logger: &Logger, cardano_node_version: Version, snapshotter: Arc, + immutable_file_digest_mapper: Arc, ) -> Result { let artifacts_dir = Path::new("cardano-database").join("ancillary"); let snapshot_dir = self @@ -1246,17 +1248,37 @@ impl DependenciesBuilder { LocalUploader::new(self.get_server_url_prefix()?, &snapshot_dir, logger.clone())?; let ancillary_builder = Arc::new(AncillaryArtifactBuilder::new( vec![Arc::new(local_uploader)], - snapshotter, + snapshotter.clone(), self.configuration.get_network()?, self.configuration.snapshot_compression_algorithm, logger.clone(), )?); + let local_uploader = + LocalUploader::new(self.get_server_url_prefix()?, &snapshot_dir, logger.clone())?; + let immutable_builder = Arc::new(ImmutableArtifactBuilder::new( + vec![Arc::new(local_uploader)], + snapshotter, + self.configuration.snapshot_compression_algorithm, + logger.clone(), + )?); + + let digests_dir = Path::new("cardano-database").join("digests"); + let digest_builder = Arc::new(DigestArtifactBuilder::new( + self.get_server_url_prefix()?, + vec![], + self.configuration.get_snapshot_dir()?.join(digests_dir), + immutable_file_digest_mapper, + logger.clone(), + )?); + Ok(CardanoDatabaseArtifactBuilder::new( self.configuration.db_directory.clone(), &cardano_node_version, self.configuration.snapshot_compression_algorithm, ancillary_builder, + immutable_builder, + digest_builder, )) } @@ -1287,11 +1309,13 @@ impl DependenciesBuilder { let stake_store = self.get_stake_store().await?; let cardano_stake_distribution_artifact_builder = Arc::new(CardanoStakeDistributionArtifactBuilder::new(stake_store)); + let immutable_file_digest_mapper = self.get_immutable_file_digest_mapper().await?; let cardano_database_artifact_builder = Arc::new(self.create_cardano_database_artifact_builder( &logger, cardano_node_version, snapshotter, + immutable_file_digest_mapper, )?); let dependencies = SignedEntityServiceArtifactsDependencies::new( mithril_stake_distribution_artifact_builder, @@ -1807,7 
+1831,9 @@ impl DependenciesBuilder { mod tests { use mithril_common::{entities::SignedEntityTypeDiscriminants, test_utils::TempDir}; - use crate::test_tools::TestLogger; + use crate::{ + immutable_file_digest_mapper::MockImmutableFileDigestMapper, test_tools::TestLogger, + }; use super::*; @@ -1870,11 +1896,14 @@ mod tests { assert!(!ancillary_dir.exists()); + let immutable_file_digest_mapper = MockImmutableFileDigestMapper::new(); + dep_builder .create_cardano_database_artifact_builder( &TestLogger::stdout(), Version::parse("1.0.0").unwrap(), Arc::new(DumbSnapshotter::new()), + Arc::new(immutable_file_digest_mapper), ) .unwrap(); diff --git a/mithril-aggregator/src/file_uploaders/dumb_uploader.rs b/mithril-aggregator/src/file_uploaders/dumb_uploader.rs index 0ad7fef61b..08e6ded8d0 100644 --- a/mithril-aggregator/src/file_uploaders/dumb_uploader.rs +++ b/mithril-aggregator/src/file_uploaders/dumb_uploader.rs @@ -1,9 +1,9 @@ use anyhow::anyhow; use async_trait::async_trait; -use mithril_common::StdResult; +use mithril_common::{entities::FileUri, StdResult}; use std::{path::Path, sync::RwLock}; -use crate::file_uploaders::{FileUploader, FileUri}; +use crate::file_uploaders::FileUploader; /// Dummy uploader for test purposes. 
/// diff --git a/mithril-aggregator/src/file_uploaders/gcp_uploader.rs b/mithril-aggregator/src/file_uploaders/gcp_uploader.rs index a7f2824b2d..92fb0bc051 100644 --- a/mithril-aggregator/src/file_uploaders/gcp_uploader.rs +++ b/mithril-aggregator/src/file_uploaders/gcp_uploader.rs @@ -8,9 +8,9 @@ use slog::{info, Logger}; use std::{env, path::Path}; use tokio_util::codec::{BytesCodec, FramedRead}; -use mithril_common::{logging::LoggerExtensions, StdResult}; +use mithril_common::{entities::FileUri, logging::LoggerExtensions, StdResult}; -use crate::{file_uploaders::FileUri, FileUploader}; +use crate::FileUploader; /// GcpUploader represents a Google Cloud Platform file uploader interactor pub struct GcpUploader { diff --git a/mithril-aggregator/src/file_uploaders/interface.rs b/mithril-aggregator/src/file_uploaders/interface.rs index a46334aa5b..eda9e0c7ea 100644 --- a/mithril-aggregator/src/file_uploaders/interface.rs +++ b/mithril-aggregator/src/file_uploaders/interface.rs @@ -1,17 +1,7 @@ use async_trait::async_trait; -use mithril_common::StdResult; +use mithril_common::{entities::FileUri, StdResult}; use std::path::Path; -/// FileUri represents a file URI used to identify the file's location -#[derive(Debug, PartialEq, Clone)] -pub struct FileUri(pub String); - -impl From for String { - fn from(file_uri: FileUri) -> Self { - file_uri.0 - } -} - /// FileUploader represents a file uploader interactor #[cfg_attr(test, mockall::automock)] #[async_trait] diff --git a/mithril-aggregator/src/file_uploaders/local_snapshot_uploader.rs b/mithril-aggregator/src/file_uploaders/local_snapshot_uploader.rs index b53928b526..960891bb77 100644 --- a/mithril-aggregator/src/file_uploaders/local_snapshot_uploader.rs +++ b/mithril-aggregator/src/file_uploaders/local_snapshot_uploader.rs @@ -1,5 +1,6 @@ use anyhow::Context; use async_trait::async_trait; +use mithril_common::entities::FileUri; use reqwest::Url; use slog::{debug, Logger}; use std::path::{Path, PathBuf}; @@ -7,7 +8,7 
@@ use std::path::{Path, PathBuf}; use mithril_common::logging::LoggerExtensions; use mithril_common::StdResult; -use crate::file_uploaders::{url_sanitizer::sanitize_url_path, FileUploader, FileUri}; +use crate::file_uploaders::{url_sanitizer::sanitize_url_path, FileUploader}; use crate::tools; // It's only used by the legacy snapshot that uploads the entire Cardano database. @@ -70,7 +71,7 @@ mod tests { use std::path::{Path, PathBuf}; use tempfile::tempdir; - use crate::file_uploaders::{FileUploader, FileUri}; + use crate::file_uploaders::FileUploader; use crate::test_tools::TestLogger; use super::*; diff --git a/mithril-aggregator/src/file_uploaders/local_uploader.rs b/mithril-aggregator/src/file_uploaders/local_uploader.rs index 21e2a1157b..8f7609f5b3 100644 --- a/mithril-aggregator/src/file_uploaders/local_uploader.rs +++ b/mithril-aggregator/src/file_uploaders/local_uploader.rs @@ -4,10 +4,10 @@ use reqwest::Url; use slog::{debug, Logger}; use std::path::{Path, PathBuf}; -use mithril_common::logging::LoggerExtensions; use mithril_common::StdResult; +use mithril_common::{entities::FileUri, logging::LoggerExtensions}; -use crate::file_uploaders::{url_sanitizer::sanitize_url_path, FileUploader, FileUri}; +use crate::file_uploaders::{url_sanitizer::sanitize_url_path, FileUploader}; /// LocalUploader is a file uploader working using local files pub struct LocalUploader { @@ -102,8 +102,7 @@ mod tests { let url_prefix = Url::parse("http://test.com:8080/base-root").unwrap(); let uploader = LocalUploader::new(url_prefix, &target_dir, TestLogger::stdout()).unwrap(); - let location = uploader - .upload(&archive) + let location = FileUploader::upload(&uploader, &archive) .await .expect("local upload should not fail"); @@ -128,7 +127,7 @@ mod tests { TestLogger::stdout(), ) .unwrap(); - uploader.upload(&archive).await.unwrap(); + FileUploader::upload(&uploader, &archive).await.unwrap(); assert!(target_dir.join(archive.file_name().unwrap()).exists()); } @@ -149,8 +148,7 
@@ mod tests { TestLogger::stdout(), ) .unwrap(); - uploader - .upload(&source_dir) + FileUploader::upload(&uploader, &source_dir) .await .expect_err("Uploading a directory should fail"); } diff --git a/mithril-aggregator/src/file_uploaders/mod.rs b/mithril-aggregator/src/file_uploaders/mod.rs index 428d385136..f98c412d41 100644 --- a/mithril-aggregator/src/file_uploaders/mod.rs +++ b/mithril-aggregator/src/file_uploaders/mod.rs @@ -8,7 +8,6 @@ pub mod url_sanitizer; pub use dumb_uploader::*; pub use gcp_uploader::GcpUploader; pub use interface::FileUploader; -pub use interface::FileUri; pub use local_snapshot_uploader::LocalSnapshotUploader; pub use local_uploader::LocalUploader; diff --git a/mithril-aggregator/src/snapshotter.rs b/mithril-aggregator/src/snapshotter.rs index d624ac3b5f..eb3558fb76 100644 --- a/mithril-aggregator/src/snapshotter.rs +++ b/mithril-aggregator/src/snapshotter.rs @@ -16,6 +16,7 @@ use mithril_common::StdResult; use crate::dependency_injection::DependenciesBuilderError; use crate::ZstandardCompressionParameters; +#[cfg_attr(test, mockall::automock)] /// Define the ability to create snapshots. pub trait Snapshotter: Sync + Send { /// Create a new snapshot with the given filepath. @@ -23,6 +24,12 @@ pub trait Snapshotter: Sync + Send { /// Create a new snapshot with the given filepath from a subset of directories and files. fn snapshot_subset(&self, filepath: &Path, files: Vec) -> StdResult; + + /// Check if the snapshot exists. + fn does_snapshot_exist(&self, filepath: &Path) -> bool; + + /// Give the full target path for the filepath. + fn get_file_path(&self, filepath: &Path) -> PathBuf; } /// Compression algorithm and parameters of the [CompressedArchiveSnapshotter]. 
@@ -109,6 +116,9 @@ pub struct CompressedArchiveSnapshotter { /// Compression algorithm used for the archive compression_algorithm: SnapshotterCompressionAlgorithm, + // Temporary directory to store the unpacked archive for verification + temp_dir: PathBuf, + logger: Logger, } @@ -181,6 +191,14 @@ impl Snapshotter for CompressedArchiveSnapshotter { self.snapshot(filepath, appender) } + + fn does_snapshot_exist(&self, filepath: &Path) -> bool { + self.get_file_path(filepath).exists() + } + + fn get_file_path(&self, filepath: &Path) -> PathBuf { + self.ongoing_snapshot_directory.join(filepath) + } } impl CompressedArchiveSnapshotter { @@ -214,12 +232,19 @@ impl CompressedArchiveSnapshotter { db_directory, ongoing_snapshot_directory, compression_algorithm, + temp_dir: std::env::temp_dir(), logger: logger.new_with_component_name::(), }) } + #[cfg(test)] + /// Allow to use a custom temporary directory to avoid conflicts during the snapshot verification. + pub fn set_sub_temp_dir>(&mut self, sub_dir: P) { + self.temp_dir = std::env::temp_dir().join(sub_dir); + } + fn snapshot(&self, filepath: &Path, appender: T) -> StdResult { - let archive_path = self.ongoing_snapshot_directory.join(filepath); + let archive_path = self.get_file_path(filepath); if let Some(archive_dir) = archive_path.parent() { fs::create_dir_all(archive_dir).with_context(|| { format!( @@ -358,7 +383,8 @@ impl CompressedArchiveSnapshotter { } }; - let unpack_temp_dir = std::env::temp_dir() + let unpack_temp_dir = self + .temp_dir .join("mithril_snapshotter_verify_archive") // Add the archive name to the directory to allow two verifications at the same time // (useful for tests). 
@@ -476,7 +502,7 @@ impl Snapshotter for DumbSnapshotter { .write() .map_err(|e| SnapshotError::UploadFileError(e.to_string()))?; let snapshot = OngoingSnapshot { - filepath: archive_name.to_path_buf(), + filepath: self.get_file_path(archive_name), filesize: 0, }; *value = Some(snapshot.clone()); @@ -491,6 +517,14 @@ impl Snapshotter for DumbSnapshotter { ) -> StdResult { self.snapshot_all(archive_name) } + + fn does_snapshot_exist(&self, _filepath: &Path) -> bool { + false + } + + fn get_file_path(&self, filepath: &Path) -> PathBuf { + filepath.to_path_buf() + } } #[cfg(test)] @@ -537,7 +571,7 @@ mod tests { } #[test] - fn test_dumb_snapshotter_snasphot_return_archive_name_with_size_0() { + fn test_dumb_snapshotter_snapshot_return_archive_name_with_size_0() { let snapshotter = DumbSnapshotter::new(); let snapshot = snapshotter .snapshot_all(Path::new("archive.tar.gz")) @@ -850,4 +884,96 @@ mod tests { assert!(unpack_path.join(directory_to_archive_path).is_dir()); assert!(unpack_path.join(file_to_archive_path).is_file()); } + + #[test] + fn snapshot_overwrite_archive_already_existing() { + let test_dir = get_test_directory("snapshot_overwrite_archive_already_existing"); + let destination = test_dir.join(create_dir(&test_dir, "destination")); + let source = test_dir.join(create_dir(&test_dir, "source")); + + create_file(&source, "file_to_archive.txt"); + + let snapshotter = CompressedArchiveSnapshotter::new( + source.clone(), + destination, + SnapshotterCompressionAlgorithm::Gzip, + TestLogger::stdout(), + ) + .unwrap(); + + let first_snapshot = snapshotter + .snapshot_all(Path::new(&random_archive_name())) + .unwrap(); + let first_snapshot_size = first_snapshot.get_file_size(); + + create_file(&source, "another_file_to_archive.txt"); + + let second_snapshot = snapshotter + .snapshot_all(Path::new(&random_archive_name())) + .unwrap(); + let second_snapshot_size = second_snapshot.get_file_size(); + + assert_ne!(first_snapshot_size, second_snapshot_size); + + let 
unpack_path = unpack_gz_decoder(test_dir, second_snapshot); + assert!(unpack_path.join("another_file_to_archive.txt").exists()); + } + + #[test] + fn is_snapshot_exist_return_true_when_file_exists() { + let test_dir = get_test_directory("is_snapshot_exist_return_true_when_file_exists"); + let destination = test_dir.join(create_dir(&test_dir, "destination")); + let source = test_dir.join(create_dir(&test_dir, "source")); + + create_file(&source, "file_to_archive.txt"); + + let snapshotter = CompressedArchiveSnapshotter::new( + source, + destination, + SnapshotterCompressionAlgorithm::Gzip, + TestLogger::stdout(), + ) + .unwrap(); + + let snapshot_path = PathBuf::from(random_archive_name()); + assert!(!snapshotter.does_snapshot_exist(&snapshot_path)); + + snapshotter.snapshot_all(&snapshot_path).unwrap(); + + assert!(snapshotter.does_snapshot_exist(&snapshot_path)); + } + + #[test] + fn can_return_full_archive_path() { + let destination = get_test_directory("can_return_full_archive_path"); + let source = PathBuf::from("/source"); + + let snapshotter = CompressedArchiveSnapshotter::new( + source, + destination.clone(), + SnapshotterCompressionAlgorithm::Gzip, + TestLogger::stdout(), + ) + .unwrap(); + + let snapshot_path = PathBuf::from("snapshot.tar.gz"); + let full_path = snapshotter.get_file_path(&snapshot_path); + + assert_eq!(destination.join("snapshot.tar.gz"), full_path); + } + + #[test] + fn can_set_temp_dir_with_path_or_str() { + let mut snapshotter = CompressedArchiveSnapshotter::new( + PathBuf::from("db"), + PathBuf::from("pending_snapshot"), + SnapshotterCompressionAlgorithm::Gzip, + TestLogger::stdout(), + ) + .unwrap(); + + snapshotter.set_sub_temp_dir(Path::new("sub_dir")); + snapshotter.set_sub_temp_dir(PathBuf::from("sub_dir")); + snapshotter.set_sub_temp_dir("sub_dir"); + } } diff --git a/mithril-common/Cargo.toml b/mithril-common/Cargo.toml index e2fad47c05..4a2367215d 100644 --- a/mithril-common/Cargo.toml +++ b/mithril-common/Cargo.toml @@ -1,6 
+1,6 @@ [package] name = "mithril-common" -version = "0.4.104" +version = "0.4.105" description = "Common types, interfaces, and utilities for Mithril nodes." authors = { workspace = true } edition = { workspace = true } diff --git a/mithril-common/src/entities/cardano_database.rs b/mithril-common/src/entities/cardano_database.rs index a7b78d8249..fa9e35f246 100644 --- a/mithril-common/src/entities/cardano_database.rs +++ b/mithril-common/src/entities/cardano_database.rs @@ -8,6 +8,8 @@ use crate::{ signable_builder::Artifact, }; +use super::MultiFilesUri; + /// Cardano database snapshot. #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct CardanoDatabaseSnapshot { @@ -68,8 +70,8 @@ impl CardanoDatabaseSnapshot { } } -/// Locations of the the immutable file digests. -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +/// Locations of the immutable file digests. +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] #[serde(rename_all = "snake_case", tag = "type")] pub enum DigestLocation { /// Aggregator digest route location. @@ -84,19 +86,21 @@ pub enum DigestLocation { }, } -/// Locations of the ancillary files. -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +/// Locations of the immutable files. +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] #[serde(rename_all = "snake_case", tag = "type")] pub enum ImmutablesLocation { /// Cloud storage location. CloudStorage { /// URI of the cloud storage location. - uri: String, + uri: MultiFilesUri, }, } /// Locations of the ancillary files. -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, EnumDiscriminants)] +#[derive( + Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, EnumDiscriminants, +)] #[serde(rename_all = "snake_case", tag = "type")] pub enum AncillaryLocation { /// Cloud storage location. 
@@ -145,89 +149,93 @@ mod tests { ) } - #[test] - fn test_cardano_database_snapshot_compute_hash() { - let cardano_database_snapshot = CardanoDatabaseSnapshot { - merkle_root: "mk-root-123".to_string(), - beacon: CardanoDbBeacon::new(123, 98), - ..dummy() - }; - - assert_eq!( - "b1cc5e0deaa7856e8e811e349d6e639fa667aa70288602955f438c5893ce29c8", - cardano_database_snapshot.compute_hash() - ); - } - - #[test] - fn compute_hash_returns_same_hash_with_same_cardano_database_snapshot() { - assert_eq!( - CardanoDatabaseSnapshot { - merkle_root: "mk-root-123".to_string(), - beacon: CardanoDbBeacon::new(123, 98), - ..dummy() - } - .compute_hash(), - CardanoDatabaseSnapshot { - merkle_root: "mk-root-123".to_string(), - beacon: CardanoDbBeacon::new(123, 98), - ..dummy() - } - .compute_hash() - ); - } - - #[test] - fn compute_hash_returns_different_hash_with_different_merkle_root() { - assert_ne!( - CardanoDatabaseSnapshot { - merkle_root: "mk-root-123".to_string(), - beacon: CardanoDbBeacon::new(123, 98), - ..dummy() - } - .compute_hash(), - CardanoDatabaseSnapshot { - merkle_root: "mk-root-456".to_string(), - beacon: CardanoDbBeacon::new(123, 98), - ..dummy() - } - .compute_hash() - ); - } - - #[test] - fn compute_hash_returns_different_hash_with_same_epoch_in_beacon() { - assert_eq!( - CardanoDatabaseSnapshot { - merkle_root: "mk-root-123".to_string(), - beacon: CardanoDbBeacon::new(123, 98), - ..dummy() - } - .compute_hash(), - CardanoDatabaseSnapshot { - merkle_root: "mk-root-123".to_string(), - beacon: CardanoDbBeacon::new(123, 12), - ..dummy() - } - .compute_hash() - ); - } + mod cardano_database_snapshot_compute_hash { + use super::*; - #[test] - fn compute_hash_returns_different_hash_with_different_beacon() { - assert_ne!( - CardanoDatabaseSnapshot { + #[test] + fn test_cardano_database_snapshot_compute_hash() { + let cardano_database_snapshot = CardanoDatabaseSnapshot { merkle_root: "mk-root-123".to_string(), beacon: CardanoDbBeacon::new(123, 98), ..dummy() - } - 
.compute_hash(), - CardanoDatabaseSnapshot { - merkle_root: "mk-root-123".to_string(), - beacon: CardanoDbBeacon::new(456, 98), - ..dummy() - } - .compute_hash() - ); + }; + + assert_eq!( + "b1cc5e0deaa7856e8e811e349d6e639fa667aa70288602955f438c5893ce29c8", + cardano_database_snapshot.compute_hash() + ); + } + + #[test] + fn compute_hash_returns_same_hash_with_same_cardano_database_snapshot() { + assert_eq!( + CardanoDatabaseSnapshot { + merkle_root: "mk-root-123".to_string(), + beacon: CardanoDbBeacon::new(123, 98), + ..dummy() + } + .compute_hash(), + CardanoDatabaseSnapshot { + merkle_root: "mk-root-123".to_string(), + beacon: CardanoDbBeacon::new(123, 98), + ..dummy() + } + .compute_hash() + ); + } + + #[test] + fn compute_hash_returns_different_hash_with_different_merkle_root() { + assert_ne!( + CardanoDatabaseSnapshot { + merkle_root: "mk-root-123".to_string(), + beacon: CardanoDbBeacon::new(123, 98), + ..dummy() + } + .compute_hash(), + CardanoDatabaseSnapshot { + merkle_root: "mk-root-456".to_string(), + beacon: CardanoDbBeacon::new(123, 98), + ..dummy() + } + .compute_hash() + ); + } + + #[test] + fn compute_hash_returns_different_hash_with_same_epoch_in_beacon() { + assert_eq!( + CardanoDatabaseSnapshot { + merkle_root: "mk-root-123".to_string(), + beacon: CardanoDbBeacon::new(123, 98), + ..dummy() + } + .compute_hash(), + CardanoDatabaseSnapshot { + merkle_root: "mk-root-123".to_string(), + beacon: CardanoDbBeacon::new(123, 12), + ..dummy() + } + .compute_hash() + ); + } + + #[test] + fn compute_hash_returns_different_hash_with_different_beacon() { + assert_ne!( + CardanoDatabaseSnapshot { + merkle_root: "mk-root-123".to_string(), + beacon: CardanoDbBeacon::new(123, 98), + ..dummy() + } + .compute_hash(), + CardanoDatabaseSnapshot { + merkle_root: "mk-root-123".to_string(), + beacon: CardanoDbBeacon::new(456, 98), + ..dummy() + } + .compute_hash() + ); + } } } diff --git a/mithril-common/src/entities/file_uri.rs b/mithril-common/src/entities/file_uri.rs 
new file mode 100644 index 0000000000..88a870cd6e --- /dev/null +++ b/mithril-common/src/entities/file_uri.rs @@ -0,0 +1,92 @@ +use std::collections::HashSet; + +use anyhow::anyhow; +use serde::{Deserialize, Serialize}; + +use crate::StdResult; + +/// FileUri represents a file URI used to identify the file's location +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] +pub struct FileUri(pub String); + +impl From for String { + fn from(file_uri: FileUri) -> Self { + file_uri.0 + } +} + +/// [TemplateUri] represents an URI pattern used to build a file's location +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize)] +pub struct TemplateUri(pub String); + +/// [MultiFilesUri] represents a unique location uri for multiple files +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize)] +pub enum MultiFilesUri { + /// URI template representing several URI + Template(TemplateUri), +} + +impl MultiFilesUri { + /// Extract a template from a list of URIs + pub fn extract_template_from_uris( + file_uris: Vec, + extractor: impl Fn(&str) -> StdResult>, + ) -> StdResult> { + let mut templates = HashSet::new(); + for file_uri in file_uris { + let template_uri = extractor(&file_uri)?; + template_uri.map(|template| templates.insert(template)); + } + + if templates.len() > 1 { + return Err(anyhow!("Multiple templates found in the file URIs")); + } + + Ok(templates.into_iter().next().map(TemplateUri)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn returns_template() { + let file_uris = vec![ + "http://whatever/00001.tar.gz".to_string(), + "http://whatever/00002.tar.gz".to_string(), + ]; + fn extractor_returning_same_uri(_file_uri: &str) -> StdResult> { + Ok(Some( + "http://whatever/{immutable_file_number}.tar.gz".to_string(), + )) + } + + let template = + MultiFilesUri::extract_template_from_uris(file_uris, extractor_returning_same_uri) + .unwrap(); + + assert_eq!( + template, + Some(TemplateUri( + 
"http://whatever/{immutable_file_number}.tar.gz".to_string() + )) + ); + } + + #[test] + fn returns_error_with_multiple_templates() { + let file_uris = vec![ + "http://whatever/00001.tar.gz".to_string(), + "http://00002.tar.gz/whatever".to_string(), + ]; + fn extractor_returning_different_uri(file_uri: &str) -> StdResult> { + Ok(Some(file_uri.to_string())) + } + + MultiFilesUri::extract_template_from_uris(file_uris, extractor_returning_different_uri) + .expect_err( + "Should return an error when multiple templates are found in the file URIs", + ); + } +} diff --git a/mithril-common/src/entities/mod.rs b/mithril-common/src/entities/mod.rs index e98b700347..2c0eeb04f3 100644 --- a/mithril-common/src/entities/mod.rs +++ b/mithril-common/src/entities/mod.rs @@ -16,6 +16,7 @@ mod certificate_metadata; mod certificate_pending; mod compression_algorithm; mod epoch; +mod file_uri; mod http_server_error; mod mithril_stake_distribution; mod protocol_message; @@ -48,6 +49,7 @@ pub use certificate_metadata::{CertificateMetadata, StakeDistributionParty}; pub use certificate_pending::CertificatePending; pub use compression_algorithm::*; pub use epoch::{Epoch, EpochError}; +pub use file_uri::{FileUri, MultiFilesUri, TemplateUri}; pub use http_server_error::{ClientError, ServerError}; pub use mithril_stake_distribution::MithrilStakeDistribution; pub use protocol_message::{ProtocolMessage, ProtocolMessagePartKey, ProtocolMessagePartValue}; diff --git a/mithril-common/src/messages/cardano_database.rs b/mithril-common/src/messages/cardano_database.rs index 0ae8215da8..2ee6b40f73 100644 --- a/mithril-common/src/messages/cardano_database.rs +++ b/mithril-common/src/messages/cardano_database.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; use crate::entities::{ AncillaryLocation, ArtifactsLocations, CardanoDbBeacon, CompressionAlgorithm, DigestLocation, - Epoch, ImmutablesLocation, + Epoch, ImmutablesLocation, MultiFilesUri, TemplateUri, }; /// Locations of the Cardano database 
related files. @@ -81,10 +81,14 @@ impl CardanoDatabaseSnapshotMessage { }], immutables: vec![ ImmutablesLocation::CloudStorage { - uri: "https://host-1/immutables-2".to_string(), + uri: MultiFilesUri::Template(TemplateUri( + "https://host-1/immutables-2".to_string(), + )), }, ImmutablesLocation::CloudStorage { - uri: "https://host-2/immutables-2".to_string(), + uri: MultiFilesUri::Template(TemplateUri( + "https://host-2/immutables-2".to_string(), + )), }, ], ancillary: vec![AncillaryLocation::CloudStorage { @@ -121,11 +125,15 @@ mod tests { "immutables": [ { "type": "cloud_storage", - "uri": "https://host-1/immutables-2" + "uri": { + "Template": "https://host-1/immutables-{immutable_file_number}" + } }, { "type": "cloud_storage", - "uri": "https://host-2/immutables-2" + "uri": { + "Template": "https://host-2/immutables-{immutable_file_number}" + } } ], "ancillary": [ @@ -161,10 +169,14 @@ mod tests { }], immutables: vec![ ImmutablesLocation::CloudStorage { - uri: "https://host-1/immutables-2".to_string(), + uri: MultiFilesUri::Template(TemplateUri( + "https://host-1/immutables-{immutable_file_number}".to_string(), + )), }, ImmutablesLocation::CloudStorage { - uri: "https://host-2/immutables-2".to_string(), + uri: MultiFilesUri::Template(TemplateUri( + "https://host-2/immutables-{immutable_file_number}".to_string(), + )), }, ], ancillary: vec![AncillaryLocation::CloudStorage { diff --git a/openapi.yaml b/openapi.yaml index d697ce2b10..ce3c48e1bf 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -4,7 +4,7 @@ info: # `mithril-common/src/lib.rs` file. If you plan to update it # here to reflect changes in the API, please also update the constant in the # Rust file. - version: 0.1.41 + version: 0.1.42 title: Mithril Aggregator Server description: | The REST API provided by a Mithril Aggregator Node in a Mithril network. 
@@ -1871,28 +1871,45 @@ components: immutables: type: array items: - $ref: "#/components/schemas/CardanoDatabaseArtifactLocationMessagePart" + $ref: "#/components/schemas/CardanoDatabaseImmutableLocationMessagePart" ancillary: type: array items: $ref: "#/components/schemas/CardanoDatabaseArtifactLocationMessagePart" examples: - [ - { - "merkle_root": "c8224920b9f5ad7377594eb8a15f34f08eb3103cc5241d57cafc5638403ec7c6", - "beacon": + { + "digests": + [ { - "network": "preview", - "epoch": 123, - "immutable_file_number": 2345 + "type": "cloud_storage", + "uri": "https://aggregator-endpoint/artifact/cardano-database/digests" + } + ], + "immutables": + [ + { + "type": "cloud_storage", + "uri": + { + "Template": "https://mithril-cdn-us.iohk.io/snapshot/{immutable_file_number}.tar.zst" + } }, - "certificate_hash": "f6c01b373bafc4e039844071d5da3ace4a9c0745b9e9560e3e2af01823e9abfb", - "total_db_size_uncompressed": 800796318, - "compression_algorithm": "gzip", - "cardano_node_version": "0.0.1", - "created_at": "2023-01-19T13:43:05.618857482Z" - } - ] + { + "type": "cloud_storage", + "uri": + { + "Template": "https://mithril-cdn-eu.iohk.io/snapshot/{immutable_file_number}.tar.zst" + } + } + ], + "ancillary": + [ + { + "type": "cloud_storage", + "uri": "https://mithril-cdn-us.iohk.io/snapshot/ancillary-6367ee65d0d1272e6e70736a1ea2cae34015874517f6328364f6b73930966732.tar.zst" + } + ] + } CardanoDatabaseArtifactLocationMessagePart: description: CardanoDatabaseArtifactLocationMessagePart represents the location of a single Cardano database artifact @@ -1913,6 +1930,45 @@ components: "uri": "https://mithril-cdn-us.iohk.io/snapshot/digests-6367ee65d0d1272e6e70736a1ea2cae34015874517f6328364f6b73930966732.txt" } + CardanoDatabaseImmutableLocationMessagePart: + description: CardanoDatabaseImmutableLocationMessagePart represents the location of a single Cardano database immutable archive + type: object + required: + - type + - uri + properties: + type: + description: Type of the 
artifact location + type: string + uri: + $ref: "#/components/schemas/MultiFilesUri" + examples: + { + "type": "cloud_storage", + "uri": + { "Template": "https://aggregator/{immutable_file_number}.tar.gz" } + } + + MultiFilesUri: + description: MultiFilesUri represents information to retrieve multiple files + oneOf: + - $ref: "#/components/schemas/TemplateUri" + examples: + { "Template": "https://aggregator/{immutable_file_number}.tar.gz" } + + TemplateUri: + description: TemplateUri represents a URI template for multiple files + type: object + additionalProperties: false + required: + - Template + properties: + Template: + description: URI template + type: string + examples: + { "Template": "https://aggregator/{immutable_file_number}.tar.gz" } + CardanoDatabaseSnapshotListMessage: description: CardanoDatabaseSnapshotListMessage represents a list of Cardano database snapshots type: array @@ -2055,18 +2111,24 @@ components: [ { "type": "cloud_storage", - "uri": "https://mithril-cdn-us.iohk.io/snapshot/digests-6367ee65d0d1272e6e70736a1ea2cae34015874517f6328364f6b73930966732.txt" + "uri": "https://aggregator-endpoint/artifact/cardano-database/digests" } ], "immutables": [ { "type": "cloud_storage", - "uri": "https://mithril-cdn-us.iohk.io/snapshot/immutables-{immutable-file_number}.tar.zst" + "uri": + { + "Template": "https://mithril-cdn-us.iohk.io/snapshot/{immutable_file_number}.tar.zst" + } }, { "type": "cloud_storage", - "uri": "https://mithril-cdn-eu.iohk.io/snapshot/immutables-6367ee65d0d1272e6e70736a1ea2cae34015874517f6328364f6b73930966732-{immutable-file_number}.tar.zst" + "uri": + { + "Template": "https://mithril-cdn-eu.iohk.io/snapshot/{immutable_file_number}.tar.zst" + } } ], "ancillary":