diff --git a/Cargo.lock b/Cargo.lock
index 1ce9a33ca69..041bf4a1236 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -367,6 +367,15 @@ version = "1.9.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b"
 
+[[package]]
+name = "bytesize"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a3e368af43e418a04d52505cf3dbc23dda4e3407ae2fa99fd0e4f308ce546acc"
+dependencies = [
+ "serde",
+]
+
 [[package]]
 name = "cast"
 version = "0.3.0"
@@ -3082,6 +3091,7 @@ dependencies = [
  "anyhow",
  "base64 0.22.1",
  "bytes",
+ "bytesize",
  "chrono",
  "clap",
  "criterion",
diff --git a/relay/sources/relayd/Cargo.toml b/relay/sources/relayd/Cargo.toml
index b32e5c0a9b9..936fa89a8b8 100644
--- a/relay/sources/relayd/Cargo.toml
+++ b/relay/sources/relayd/Cargo.toml
@@ -18,6 +18,7 @@ name = "benches"
 # (it works since https://github.com/openssl/openssl/commit/e2590c3a162eb118c36b09c2168164283aa099b4)
 anyhow = "1"
 base64 = "0.22"
+bytesize = { version = "1.3.0", features = ["serde"] }
 bytes = "1"
 chrono = { version = "0.4", default-features = false, features = ["clock", "std", "serde"] }
 clap = { version = "4.4.6", features = ["derive"] }
diff --git a/relay/sources/relayd/src/api/shared_files.rs b/relay/sources/relayd/src/api/shared_files.rs
index 561fdd4fdcb..1f3dcdb0887 100644
--- a/relay/sources/relayd/src/api/shared_files.rs
+++ b/relay/sources/relayd/src/api/shared_files.rs
@@ -43,8 +43,11 @@ pub fn routes_1(job_config: Arc<JobConfig>) -> BoxedFilter<(impl Reply,)> {
         });
 
     let job_config_put = job_config;
+    let max_size = job_config_put.cfg.shared_files.max_size;
     let put = method::put()
         .map(move || job_config_put.clone())
+        // Checking the header is enough as hyper will not read more.
+        .and(body::content_length_limit(max_size.as_u64()))
         .and(base)
         .and(query::<SharedFilesPutParams>())
         .and(body::bytes())
diff --git a/relay/sources/relayd/src/configuration/main.rs b/relay/sources/relayd/src/configuration/main.rs
index b808c656ab1..e5a34d28cea 100644
--- a/relay/sources/relayd/src/configuration/main.rs
+++ b/relay/sources/relayd/src/configuration/main.rs
@@ -12,6 +12,7 @@ use std::{
 };
 
 use anyhow::{anyhow, bail, Context, Error};
+use bytesize::ByteSize;
 use secrecy::SecretString;
 use serde::{
     de::{Deserializer, Error as SerdeError, Unexpected, Visitor},
@@ -316,6 +317,8 @@ pub struct InventoryConfig {
     pub catchup: CatchupConfig,
     #[serde(default)]
     pub cleanup: CleanupConfig,
+    #[serde_inline_default(ByteSize::mib(5))]
+    pub max_size: ByteSize,
 }
 
 impl Default for InventoryConfig {
@@ -350,6 +353,8 @@ pub struct ReportingConfig {
     pub cleanup: CleanupConfig,
     #[serde(default)]
     pub skip_event_types: HashSet<String>,
+    #[serde_inline_default(ByteSize::mib(1))]
+    pub max_size: ByteSize,
 }
 
 impl Default for ReportingConfig {
@@ -427,6 +432,8 @@ pub struct SharedFiles {
     pub path: PathBuf,
     #[serde(default)]
     pub cleanup: SharedFilesCleanupConfig,
+    #[serde_inline_default(ByteSize::mib(50))]
+    pub max_size: ByteSize,
 }
 
 impl Default for SharedFiles {
@@ -539,7 +546,7 @@ pub struct RsyncConfig {
     #[serde_inline_default("localhost:5310".into())]
     listen: String,
 
-    // False to allow non authenticated clients
+    // False to allow non-authenticated clients
     #[serde_inline_default(true)]
     authentication: bool,
 }
@@ -614,6 +621,7 @@ mod tests {
                         frequency: Duration::from_secs(3600),
                         retention: Duration::from_secs(3600 * 24 * 7),
                     },
+                    max_size: ByteSize::mib(5),
                 },
                 reporting: ReportingConfig {
                     directory: PathBuf::from("/var/rudder/reports/"),
@@ -627,6 +635,7 @@ mod tests {
                         retention: Duration::from_secs(3600 * 24 * 7),
                     },
                     skip_event_types: HashSet::new(),
+                    max_size: ByteSize::mib(1),
                 },
             },
             output: OutputConfig {
@@ -655,6 +664,7 @@ mod tests {
                 cleanup: SharedFilesCleanupConfig {
                     frequency: Duration::from_secs(600),
                 },
+                max_size: ByteSize::mib(50),
             },
             shared_folder: SharedFolder {
                 path: PathBuf::from("/var/rudder/configuration-repository/shared-files/"),
@@ -720,6 +730,7 @@ mod tests {
                         frequency: Duration::from_secs(10),
                         retention: Duration::from_secs(10),
                     },
+                    max_size: ByteSize::mib(150),
                 },
                 reporting: ReportingConfig {
                     directory: PathBuf::from("target/tmp/reporting/"),
@@ -733,6 +744,7 @@ mod tests {
                         retention: Duration::from_secs(30 * 60 + 20),
                     },
                     skip_event_types: HashSet::new(),
+                    max_size: ByteSize::gib(1),
                 },
             },
             output: OutputConfig {
@@ -763,6 +775,7 @@ mod tests {
                 cleanup: SharedFilesCleanupConfig {
                     frequency: Duration::from_secs(43),
                 },
+                max_size: ByteSize::kib(42),
             },
             shared_folder: SharedFolder {
                 path: PathBuf::from("tests/api_shared_folder"),
diff --git a/relay/sources/relayd/src/processing.rs b/relay/sources/relayd/src/processing.rs
index 5deef1ad4b4..130a4e4393f 100644
--- a/relay/sources/relayd/src/processing.rs
+++ b/relay/sources/relayd/src/processing.rs
@@ -6,7 +6,8 @@ use std::{
     path::{Path, PathBuf},
 };
 
-use anyhow::Error;
+use anyhow::{bail, Error};
+use bytesize::ByteSize;
 use sha2::{Digest, Sha256};
 use tokio::fs::{remove_file, rename};
 use tracing::debug;
@@ -66,6 +67,33 @@ async fn failure(file: ReceivedFile, directory: RootDirectory) -> Result<(), Error> {
     Ok(())
 }
 
+/// Returns Ok if the file is smaller than the limit, otherwise moves it to the failed directory
+/// and returns an error.
+async fn ensure_file_size_limit(
+    file: ReceivedFile,
+    limit: ByteSize,
+    directory: PathBuf,
+) -> Result<(), Error> {
+    match file.metadata().map(|m| m.len()) {
+        Ok(size) => {
+            if size > limit.as_u64() {
+                failure(file.clone(), directory).await?;
+                bail!(
+                    "skipping {:#?} as it is too large ({} bytes > {})",
+                    file,
+                    size,
+                    limit
+                );
+            } else {
+                Ok(())
+            }
+        }
+        Err(e) => {
+            bail!("skipping {:#?} as it could not be read: {}", file, e);
+        }
+    }
+}
+
 /// Computes an id from a file name. It is added as key-value in tracing and allows following a file across relays
 /// with a simple id.
 /// It does not need to be cryptographic but only to have reasonable statistic quality, but we already have sha2 available
diff --git a/relay/sources/relayd/src/processing/inventory.rs b/relay/sources/relayd/src/processing/inventory.rs
index cc7a088a39d..73b37193237 100644
--- a/relay/sources/relayd/src/processing/inventory.rs
+++ b/relay/sources/relayd/src/processing/inventory.rs
@@ -7,6 +7,7 @@ use anyhow::Error;
 use tokio::sync::mpsc;
 use tracing::{debug, error, info, instrument, span, Instrument, Level};
 
+use crate::processing::ensure_file_size_limit;
 use crate::{
     configuration::main::InventoryOutputSelect,
     input::watch::*,
@@ -83,6 +84,20 @@ async fn serve(
             continue;
         }
 
+        match ensure_file_size_limit(
+            file.clone(),
+            job_config.cfg.processing.inventory.max_size,
+            job_config.cfg.processing.inventory.directory.clone(),
+        )
+        .await
+        {
+            Ok(_) => (),
+            Err(e) => {
+                error!("{:?}", e);
+                continue;
+            }
+        }
+
         let queue_id = queue_id_from_file(&file);
         debug!("received: {:?}", file);
         let span = span!(
diff --git a/relay/sources/relayd/src/processing/reporting.rs b/relay/sources/relayd/src/processing/reporting.rs
index 5d7834ee483..77a56efba03 100644
--- a/relay/sources/relayd/src/processing/reporting.rs
+++ b/relay/sources/relayd/src/processing/reporting.rs
@@ -7,6 +7,7 @@ use anyhow::Error;
 use tokio::{sync::mpsc, task::spawn_blocking};
 use tracing::{debug, error, info, instrument, span, warn, Instrument, Level};
 
+use crate::processing::ensure_file_size_limit;
 use crate::{
     configuration::main::ReportingOutputSelect,
     data::{RunInfo, RunLog},
@@ -65,6 +66,20 @@ async fn serve(job_config: Arc<JobConfig>, mut rx: mpsc::Receiver<ReceivedFile>) {
             continue;
         }
 
+        match ensure_file_size_limit(
+            file.clone(),
+            job_config.cfg.processing.reporting.max_size,
+            job_config.cfg.processing.reporting.directory.clone(),
+        )
+        .await
+        {
+            Ok(_) => (),
+            Err(e) => {
+                error!("{:?}", e);
+                continue;
+            }
+        }
+
         let queue_id = queue_id_from_file(&file);
         let span = span!(
             Level::INFO,
diff --git a/relay/sources/relayd/tests/files/config/main.conf b/relay/sources/relayd/tests/files/config/main.conf
index 0ff537adfbf..d407afcfd25 100644
--- a/relay/sources/relayd/tests/files/config/main.conf
+++ b/relay/sources/relayd/tests/files/config/main.conf
@@ -16,6 +16,7 @@ https_idle_timeout = "42s"
 [processing.inventory]
 directory = "target/tmp/inventories/"
 output = "upstream"
+max_size = "150 MiB"
 
 [processing.inventory.catchup]
 # to test compatibility with previous syntax
@@ -31,6 +32,7 @@ retention = "10s"
 directory = "target/tmp/reporting/"
 output = "database"
 skip_event_types = []
+max_size = "1GiB"
 
 [processing.reporting.catchup]
 frequency = "10s"
@@ -58,6 +60,7 @@ use_sudo = false
 
 [shared_files]
 path = "tests/api_shared_files"
+max_size = "42kiB"
 
 [shared_files.cleanup]
 frequency = "43s"
diff --git a/relay/sources/relayd/tests/shared_files.rs b/relay/sources/relayd/tests/shared_files.rs
index c691b72f870..13d2950e416 100644
--- a/relay/sources/relayd/tests/shared_files.rs
+++ b/relay/sources/relayd/tests/shared_files.rs
@@ -77,6 +77,12 @@ fn it_shares_files() {
         .send().unwrap();
     assert_eq!(500, upload.status());
 
+    // Too big
+
+    let upload = client.put("http://127.0.0.1:3030/rudder/relay-api/1/shared-files/37817c4d-fbf7-4850-a985-50021f4e8f41/e745a140-40bc-4b86-b6dc-084488fc906b/file2?ttl=1d").body(format!("{}\n{}", signature, content))
+        .header("Content-Length", "10000000000").send().unwrap();
+    assert_eq!(500, upload.status());
+
     // Correct upload
     let upload = client.put("http://127.0.0.1:3030/rudder/relay-api/1/shared-files/37817c4d-fbf7-4850-a985-50021f4e8f41/e745a140-40bc-4b86-b6dc-084488fc906b/file2?ttl=1d").body(format!("{}\n{}", signature, content))
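For context on the configuration change above, here is a minimal standalone sketch (not part of the patch) of the parsing behaviour the new max_size fields rely on: with the "serde" feature, bytesize::ByteSize deserializes human-readable strings such as those used in tests/files/config/main.conf. The Limits struct and the use of the toml crate are assumptions made for this example only; the values mirror the test config and the inline default for InventoryConfig.

// Sketch only (assumed example, not relayd code): check that a human-readable
// size string deserializes into bytesize::ByteSize through serde.
use bytesize::ByteSize;
use serde::Deserialize;

#[derive(Deserialize)]
struct Limits {
    // Mirrors the new fields added in configuration/main.rs
    max_size: ByteSize,
}

fn main() {
    // "150 MiB" is the value set for processing.inventory in tests/files/config/main.conf
    let limits: Limits = toml::from_str(r#"max_size = "150 MiB""#).unwrap();
    assert_eq!(limits.max_size.as_u64(), 150 * 1024 * 1024);

    // The inline default for InventoryConfig is ByteSize::mib(5), i.e. 5 MiB in bytes
    assert_eq!(ByteSize::mib(5).as_u64(), 5 * 1024 * 1024);
}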