From 3fd3dc848ec2a076b3565556dbaa1c7f5ca4acae Mon Sep 17 00:00:00 2001 From: Mihir Nanavati Date: Fri, 24 May 2024 00:55:06 -0400 Subject: [PATCH 1/4] exposition: impose row group limit for parquet Remove internal buffering from the ParquetWriter and instead write single snapshots to the ArrowWriter which buffers data internally until the row group size limit is reached. Set the row group limit for the ArrowWriter from the specified batch size in the ParquetOptions. Finally, reduce the default row group size from 1M to 50K. --- Cargo.lock | 48 ++++ metriken-exposition/Cargo.toml | 3 + metriken-exposition/src/lib.rs | 2 +- metriken-exposition/src/parquet.rs | 393 ++++++++++++++++++++--------- 4 files changed, 322 insertions(+), 124 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2a1bcd8..714745e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -448,6 +448,22 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "errno" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba" +dependencies = [ + "libc", + "windows-sys", +] + +[[package]] +name = "fastrand" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" + [[package]] name = "flatbuffers" version = "23.5.26" @@ -677,6 +693,12 @@ dependencies = [ "syn", ] +[[package]] +name = "linux-raw-sys" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" + [[package]] name = "lock_api" version = "0.4.12" @@ -752,6 +774,7 @@ dependencies = [ "rmp-serde", "serde", "serde_json", + "tempfile", ] [[package]] @@ -1072,6 +1095,19 @@ dependencies = [ "semver", ] +[[package]] +name = "rustix" +version = "0.38.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" +dependencies = [ + "bitflags 2.5.0", + "errno", + "libc", + "linux-raw-sys", + "windows-sys", +] + [[package]] name = "ryu" version = "1.0.18" @@ -1171,6 +1207,18 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "tempfile" +version = "3.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" +dependencies = [ + "cfg-if", + "fastrand", + "rustix", + "windows-sys", +] + [[package]] name = "termcolor" version = "1.4.1" diff --git a/metriken-exposition/Cargo.toml b/metriken-exposition/Cargo.toml index ab4787a..15fac6a 100644 --- a/metriken-exposition/Cargo.toml +++ b/metriken-exposition/Cargo.toml @@ -21,6 +21,9 @@ rmp-serde = { version = "1.1.2", optional = true } serde = { version = "1.0.196", features = ["derive"], optional = true } serde_json = { version = "1.0.114", optional = true } +[dev-dependencies] +tempfile = "3.10.1" + [features] serde = ["dep:serde", "chrono/serde", "histogram/serde"] json = ["dep:serde", "dep:serde_json"] diff --git a/metriken-exposition/src/lib.rs b/metriken-exposition/src/lib.rs index c1ba8da..e5d4938 100644 --- a/metriken-exposition/src/lib.rs +++ b/metriken-exposition/src/lib.rs @@ -14,7 +14,7 @@ mod snapshotter; pub use convert::MsgpackToParquet; #[cfg(feature = "parquet")] pub use parquet::{ - ParquetCompression, ParquetHistogramStorage, ParquetOptions, ParquetSchema, ParquetWriter, + ParquetCompression, ParquetHistogramType, ParquetOptions, ParquetSchema, ParquetWriter, }; pub use snapshot::{Counter, Gauge, Histogram, Snapshot}; pub use snapshotter::{Snapshotter, SnapshotterBuilder}; diff --git a/metriken-exposition/src/parquet.rs b/metriken-exposition/src/parquet.rs index 532e26f..3dcace2 100644 --- a/metriken-exposition/src/parquet.rs +++ b/metriken-exposition/src/parquet.rs @@ -4,8 +4,6 @@ use std::sync::Arc; use arrow::array::*; use arrow::datatypes::*; -use arrow::error::ArrowError; -use histogram::Histogram; use parquet::arrow::ArrowWriter; use parquet::basic::{Compression, ZstdLevel}; use parquet::errors::ParquetError; @@ -14,7 +12,7 @@ use parquet::format::{FileMetaData, KeyValue}; use crate::snapshot::{HashedSnapshot, Snapshot}; -const DEFAULT_MAX_BATCH_SIZE: usize = 1024 * 1024; +const DEFAULT_MAX_BATCH_SIZE: usize = 50_000; #[derive(Clone, Debug)] pub struct ParquetCompression { @@ -46,9 +44,9 @@ impl Default for ParquetCompression { } } -/// Storage representation for histograms within the parquet file. +/// Type representation for histograms within the parquet file. #[derive(Clone, Copy, Debug, PartialEq)] -pub enum ParquetHistogramStorage { +pub enum ParquetHistogramType { Standard, Sparse, } @@ -61,7 +59,7 @@ pub struct ParquetOptions { /// Number of rows cached in memory before being written as a `RecordBatch` max_batch_size: usize, /// Type of representation used to store histograms - histogram: ParquetHistogramStorage, + histogram_type: ParquetHistogramType, } impl ParquetOptions { @@ -73,7 +71,7 @@ impl ParquetOptions { /// Sets the compression level for the parquet file. The default is no /// compression. Set the compression level to a corresponding zstd level to /// enable compression. - pub fn compression(mut self, compression: ParquetCompression) -> Self { + pub fn with_compression(mut self, compression: ParquetCompression) -> Self { self.compression = compression; self } @@ -81,15 +79,15 @@ impl ParquetOptions { /// Sets the number of rows to be cache in memory before being written as a /// `RecordBatch`. Large values have better performance at the cost of /// additional memory usage. The default is ~1M rows (2^20). - pub fn max_batch_size(mut self, batch_size: usize) -> Self { + pub fn with_max_batch_size(mut self, batch_size: usize) -> Self { self.max_batch_size = batch_size; self } - /// Sets the storage type for histogram data: standard or sparse. The - /// default is the standard (dense) histogram. - pub fn histogram(mut self, histogram: ParquetHistogramStorage) -> Self { - self.histogram = histogram; + /// Sets the type for histogram data: standard or sparse. The default is + /// the standard (dense) histogram. + pub fn with_histogram_type(mut self, histogram: ParquetHistogramType) -> Self { + self.histogram_type = histogram; self } } @@ -99,7 +97,7 @@ impl Default for ParquetOptions { Self { compression: Default::default(), max_batch_size: DEFAULT_MAX_BATCH_SIZE, - histogram: ParquetHistogramStorage::Standard, + histogram_type: ParquetHistogramType::Standard, } } } @@ -194,7 +192,7 @@ impl ParquetSchema { )])), ); - let mut counters = BTreeMap::new(); + let mut counters = Vec::with_capacity(self.counters.len()); // Create one column field per-counter for (counter, mut metadata) in self.counters.into_iter() { @@ -206,10 +204,10 @@ impl ParquetSchema { .push(Field::new(counter.clone(), DataType::UInt64, true).with_metadata(metadata)); // initialize storage for the counter values - counters.insert(counter, Vec::with_capacity(self.rows)); + counters.push(counter); } - let mut gauges = BTreeMap::new(); + let mut gauges = Vec::with_capacity(self.gauges.len()); // Create one column field per-gauge for (gauge, mut metadata) in self.gauges.into_iter() { @@ -220,10 +218,10 @@ impl ParquetSchema { fields.push(Field::new(gauge.clone(), DataType::Int64, true).with_metadata(metadata)); // initialize storage for the gauge values - gauges.insert(gauge, Vec::with_capacity(self.rows)); + gauges.push(gauge); } - let mut histograms = BTreeMap::new(); + let mut histograms = Vec::with_capacity(self.histograms.len()); // Create columns for the snapshot: the buckets are stored as a // nested list type where each list element is an array of `u64`s. @@ -233,8 +231,8 @@ impl ParquetSchema { // representation, the non-zero bucket indices and counts are stored // in separate columns. for (histogram, mut metadata) in self.histograms.into_iter() { - match options.histogram { - ParquetHistogramStorage::Standard => { + match options.histogram_type { + ParquetHistogramType::Standard => { // merge metric annotations into the metric metadata metadata.insert("metric_type".to_string(), "histogram".to_string()); @@ -247,13 +245,13 @@ impl ParquetSchema { .with_metadata(metadata.clone()), ); } - ParquetHistogramStorage::Sparse => { + ParquetHistogramType::Sparse => { // merge metric annotations into the metric metadata metadata.insert("metric_type".to_string(), "sparse histogram".to_string()); fields.push( Field::new( - format!("{histogram}:bucket_index"), + format!("{histogram}:bucket_indices"), DataType::new_list(DataType::UInt64, true), true, ) @@ -261,7 +259,7 @@ impl ParquetSchema { ); fields.push( Field::new( - format!("{histogram}:bucket_count"), + format!("{histogram}:bucket_counts"), DataType::new_list(DataType::UInt64, true), true, ) @@ -271,7 +269,7 @@ impl ParquetSchema { }; // initialize storage for the histogram values - histograms.insert(histogram, Vec::with_capacity(self.rows)); + histograms.push(histogram); } let metadata: Option> = if self.metadata.is_empty() { @@ -292,6 +290,7 @@ impl ParquetSchema { let props = WriterProperties::builder() .set_compression(options.compression.inner) .set_key_value_metadata(metadata) + .set_max_row_group_size(options.max_batch_size) .build(); let arrow_writer = ArrowWriter::try_new(writer, schema.clone(), Some(props))?; @@ -299,7 +298,6 @@ impl ParquetSchema { writer: arrow_writer, options, schema, - timestamps: Vec::new(), counters, gauges, histograms, @@ -313,135 +311,284 @@ pub struct ParquetWriter { options: ParquetOptions, schema: Arc, - /// Columnar data for timestamps - timestamps: Vec, - - /// Schema-ordered columnar data for counters - counters: BTreeMap>>, - - /// Schema-ordered columnar data for gauges - gauges: BTreeMap>>, - - /// Schema-ordered columnar data for histograms - histograms: BTreeMap>>, + /// Schema-ordered list of counters, gauges, and histograms + counters: Vec, + gauges: Vec, + histograms: Vec, } impl ParquetWriter { /// Process individual snapshots of metrics and store them in a columnar /// representation. Fill in the gaps for missing data, i.e., missing or /// dynamic metrics with `None` so that all columns have the same length. - /// Writes out batches of aggregated columns once they reach a certain size. + /// Writes them to the ArrowWriter, which internally buffers batches until + /// the maximum row group size is reached. pub fn push(&mut self, snapshot: Snapshot) -> Result<(), ParquetError> { + let mut columns: Vec> = Vec::with_capacity(self.schema.fields().len()); + let mut hs: HashedSnapshot = HashedSnapshot::from(snapshot); - // Aggregate timestamps into a column - self.timestamps.push(hs.ts); + // Create a single element column for the timestamp + columns.push(Arc::new(UInt64Array::from(vec![hs.ts]))); - // Aggregate metrics in the existing columns. Since `remove` returns + // Create single element columns for metrics. Since `remove` returns // `None` if a metric in the schema does not exist in the snapshot gaps // are automatically filled without additional handling. - for (key, v) in self.counters.iter_mut() { - v.push(hs.counters.remove(key).map(|v| v.value)); + for counter in self.counters.iter_mut() { + columns.push(Arc::new(UInt64Array::from(vec![hs + .counters + .remove(counter) + .map(|v| v.value)]))); } - for (key, v) in self.gauges.iter_mut() { - v.push(hs.gauges.remove(key).map(|v| v.value)); + for gauge in self.gauges.iter_mut() { + columns.push(Arc::new(Int64Array::from(vec![hs + .gauges + .remove(gauge) + .map(|v| v.value)]))); } - for (key, v) in self.histograms.iter_mut() { - v.push(hs.histograms.remove(key).map(|v| v.value)); - } - - // Check and flush if the max batch size of rows have been processed - if self.timestamps.len() == self.options.max_batch_size { - let batch = self.snapshots_to_recordbatch()?; - self.writer.write(&batch)?; + for h in self.histograms.iter_mut() { + let histogram = hs.histograms.remove(h).map(|v| v.value); + if let Some(hist) = histogram { + match self.options.histogram_type { + ParquetHistogramType::Standard => { + columns.push(Self::listu64_entry_from_slice(hist.as_slice())) + } + ParquetHistogramType::Sparse => { + let sparse = histogram::SparseHistogram::from(&hist); + columns.push(Self::listu64_entry_from_vec(sparse.index)); + columns.push(Self::listu64_entry_from_slice(sparse.count.as_slice())); + } + }; + } else { + match self.options.histogram_type { + ParquetHistogramType::Standard => columns.push(Self::listu64_entry_null()), + ParquetHistogramType::Sparse => columns.append(&mut vec![ + Self::listu64_entry_null(), + Self::listu64_entry_null(), + ]), + }; + } } - Ok(()) + let batch = RecordBatch::try_new(self.schema.clone(), columns)?; + self.writer.write(&batch) } /// Finish writing any buffered metrics and the parquet footer. - pub fn finalize(mut self) -> Result { - let batch = self.snapshots_to_recordbatch()?; - self.writer.write(&batch)?; + pub fn finalize(self) -> Result { self.writer.close() } - /// Convert buffered metrics to a parquet `RecordBatch`. - fn snapshots_to_recordbatch(&mut self) -> Result { - let mut columns: Vec> = Vec::with_capacity(self.schema.fields().len()); + /// Create a list entry for an arrow lists of u64s from a slice. + fn listu64_entry_from_slice(v: &[u64]) -> Arc { + Arc::new(ListArray::from_iter_primitive::([Some( + v.iter().map(|x| Some(*x)).collect::>>(), + )])) + } + + /// Create a list entry for an arrow lists of u64s from a vector. + fn listu64_entry_from_vec(v: Vec) -> Arc { + Arc::new(ListArray::from_iter_primitive::([Some( + v.into_iter() + .map(|x| Some(x as u64)) + .collect::>>(), + )])) + } + + /// Create a null list entry for an arrow lists of u64s. + fn listu64_entry_null() -> Arc { + Arc::new(ListArray::from_iter_primitive::< + UInt64Type, + Vec>, + _, + >([None])) + } +} - // Move existing timestamp array into an arrow array and clear - columns.push(Arc::new(UInt64Array::from(std::mem::take( - &mut self.timestamps, - )))); +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::fs::File; + use std::io::Seek; + use std::time::{Duration, SystemTime}; + + use ::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use arrow::array::*; + use metriken::histogram::Histogram as H2Histogram; + + use crate::*; + + fn build_snapshots() -> Vec { + let h1 = H2Histogram::from_buckets(1, 3, vec![0, 1, 1, 0, 0, 0]).unwrap(); + let s1 = Snapshot { + systemtime: SystemTime::now(), + metadata: HashMap::new(), + counters: vec![Counter { + name: "counter".to_string(), + value: 100, + metadata: HashMap::new(), + }], + gauges: vec![Gauge { + name: "gauge".to_string(), + value: 16, + metadata: HashMap::new(), + }], + histograms: vec![Histogram { + name: "histogram".to_string(), + value: h1, + metadata: HashMap::new(), + }], + }; - // One column per-counter with a similar swap-and-clear of the vector - for (_, val) in self.counters.iter_mut() { - let col = std::mem::take(val); - columns.push(Arc::new(UInt64Array::from(col))); + let h2 = H2Histogram::from_buckets(1, 3, vec![0, 1, 1, 0, 1, 0]).unwrap(); + let s2 = Snapshot { + systemtime: SystemTime::now() + .checked_add(Duration::from_secs(600)) + .unwrap(), + metadata: HashMap::new(), + counters: vec![Counter { + name: "counter".to_string(), + value: 121, + metadata: HashMap::new(), + }], + gauges: vec![Gauge { + name: "gauge".to_string(), + value: 6, + metadata: HashMap::new(), + }], + histograms: vec![Histogram { + name: "histogram".to_string(), + value: h2, + metadata: HashMap::new(), + }], + }; + + vec![s1, s2] + } + + fn write_parquet(snapshots: Vec, options: ParquetOptions) -> File { + let mut schema = ParquetSchema::new(); + for s in &snapshots { + schema.push(s.clone()); } - // One column per-gauge with a similar swap-and-clear of the vector - for (_, val) in self.gauges.iter_mut() { - let col = std::mem::take(val); - columns.push(Arc::new(Int64Array::from(col))); + let mut tmpfile = tempfile::tempfile().unwrap(); + let mut writer = schema + .finalize(tmpfile.try_clone().unwrap(), options) + .unwrap(); + for s in &snapshots { + let _ = writer.push(s.clone()).unwrap(); } + let _ = writer.finalize(); - // One column, per-histogram, for the buckets if the histogram is - // stored in its standard representation; two columns for buckets - // per-histogram if it is stored in its sparse representation. - for (_, val) in self.histograms.iter_mut() { - let hists = std::mem::take(val); - let mut buckets = match self.options.histogram { - ParquetHistogramStorage::Standard => vec![ListBuilder::new(UInt64Builder::new())], - ParquetHistogramStorage::Sparse => vec![ - ListBuilder::new(UInt64Builder::new()), - ListBuilder::new(UInt64Builder::new()), - ], - }; + let _ = tmpfile.rewind(); + tmpfile + } - for h in hists { - if let Some(x) = h { - match self.options.histogram { - ParquetHistogramStorage::Standard => buckets[0].append_value( - x.into_iter() - .map(|x| Some(x.count())) - .collect::>>(), - ), - ParquetHistogramStorage::Sparse => { - let sparse = histogram::SparseHistogram::from(&x); - buckets[0].append_value( - sparse - .index - .into_iter() - .map(|x| Some(x as u64)) - .collect::>>(), - ); - buckets[1].append_value( - sparse - .count - .into_iter() - .map(Some) - .collect::>>(), - ); - } - }; - } else { - // Histogram missing; store `None` for buckets and summaries - buckets[0].append_null(); - if self.options.histogram == ParquetHistogramStorage::Sparse { - buckets[1].append_null(); - } - } - } - columns.push(Arc::new(buckets[0].finish())); - if self.options.histogram == ParquetHistogramStorage::Sparse { - columns.push(Arc::new(buckets[1].finish())); - } - } + fn validate_i64_array(col: ArrayRef, vals: &[i64]) { + let v = col.as_any().downcast_ref::().unwrap(); + assert_eq!(v.values(), vals); + } + + fn validate_u64_array(col: ArrayRef, vals: &[u64]) { + let v = col.as_any().downcast_ref::().unwrap(); + assert_eq!(v.values(), vals); + } + + #[test] + fn test_row_groups() { + let snapshots = build_snapshots(); + let tmpfile = write_parquet(snapshots, ParquetOptions::new().with_max_batch_size(1)); + let builder = ParquetRecordBatchReaderBuilder::try_new(tmpfile).unwrap(); - RecordBatch::try_new(self.schema.clone(), columns) + // Check row groups + assert_eq!(builder.metadata().row_groups().len(), 2); + assert_eq!(builder.metadata().row_group(0).num_rows(), 1); + assert_eq!(builder.metadata().row_group(1).num_rows(), 1); + } + + #[test] + fn test_default() { + let snapshots = build_snapshots(); + let tmpfile = write_parquet(snapshots, ParquetOptions::new()); + let builder = ParquetRecordBatchReaderBuilder::try_new(tmpfile).unwrap(); + + // Check row groups + assert_eq!(builder.metadata().row_groups().len(), 1); + assert_eq!(builder.metadata().row_group(0).num_rows(), 2); + + // Check schema + let fields: Vec<&String> = builder.schema().fields().iter().map(|x| x.name()).collect(); + let expected = vec!["timestamp", "counter", "gauge", "histogram:buckets"]; + assert_eq!(fields.len(), expected.len()); + assert_eq!(fields, expected); + + // Check data + let batch = builder.build().unwrap().next().unwrap().unwrap(); + assert_eq!(batch.num_columns(), 4); + assert_eq!(batch.num_rows(), 2); + + validate_u64_array(batch.column(1).clone(), &[100, 121]); + validate_i64_array(batch.column(2).clone(), &[16, 6]); + + let histograms = batch + .column(3) + .as_any() + .downcast_ref::() + .unwrap(); + validate_u64_array(histograms.value(0), &[0, 1, 1, 0, 0, 0]); + validate_u64_array(histograms.value(1), &[0, 1, 1, 0, 1, 0]); + } + + #[test] + fn test_sparse() { + let snapshots = build_snapshots(); + let tmpfile = write_parquet( + snapshots, + ParquetOptions::new().with_histogram_type(ParquetHistogramType::Sparse), + ); + let builder = ParquetRecordBatchReaderBuilder::try_new(tmpfile).unwrap(); + + // Check row groups + assert_eq!(builder.metadata().row_groups().len(), 1); + assert_eq!(builder.metadata().row_group(0).num_rows(), 2); + + // Check schema + let fields: Vec<&String> = builder.schema().fields().iter().map(|x| x.name()).collect(); + let expected = vec![ + "timestamp", + "counter", + "gauge", + "histogram:bucket_indices", + "histogram:bucket_counts", + ]; + assert_eq!(fields.len(), expected.len()); + assert_eq!(fields, expected); + + // Check data + let batch = builder.build().unwrap().next().unwrap().unwrap(); + assert_eq!(batch.num_columns(), 5); + assert_eq!(batch.num_rows(), 2); + + validate_u64_array(batch.column(1).clone(), &[100, 121]); + validate_i64_array(batch.column(2).clone(), &[16, 6]); + + let indices = batch + .column(3) + .as_any() + .downcast_ref::() + .unwrap(); + validate_u64_array(indices.value(0), &[1, 2]); + validate_u64_array(indices.value(1), &[1, 2, 4]); + + let counts = batch + .column(4) + .as_any() + .downcast_ref::() + .unwrap(); + validate_u64_array(counts.value(0), &[1, 1]); + validate_u64_array(counts.value(1), &[1, 1, 1]); } } From a21426ae3653dd6982734102c2b2d4b6305ab07d Mon Sep 17 00:00:00 2001 From: Mihir Nanavati Date: Tue, 28 May 2024 18:04:17 -0400 Subject: [PATCH 2/4] Fix clippy warning --- metriken-exposition/src/parquet.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metriken-exposition/src/parquet.rs b/metriken-exposition/src/parquet.rs index 3dcace2..3d62524 100644 --- a/metriken-exposition/src/parquet.rs +++ b/metriken-exposition/src/parquet.rs @@ -479,7 +479,7 @@ mod tests { .finalize(tmpfile.try_clone().unwrap(), options) .unwrap(); for s in &snapshots { - let _ = writer.push(s.clone()).unwrap(); + let _ = writer.push(s.clone()); } let _ = writer.finalize(); From f18ebf1548a0818796840b3d199d077b5864ff5b Mon Sep 17 00:00:00 2001 From: Mihir Nanavati Date: Tue, 28 May 2024 18:39:01 -0400 Subject: [PATCH 3/4] s/with_/set_ --- metriken-exposition/src/parquet.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/metriken-exposition/src/parquet.rs b/metriken-exposition/src/parquet.rs index 3d62524..f02c1c7 100644 --- a/metriken-exposition/src/parquet.rs +++ b/metriken-exposition/src/parquet.rs @@ -71,7 +71,7 @@ impl ParquetOptions { /// Sets the compression level for the parquet file. The default is no /// compression. Set the compression level to a corresponding zstd level to /// enable compression. - pub fn with_compression(mut self, compression: ParquetCompression) -> Self { + pub fn set_compression(mut self, compression: ParquetCompression) -> Self { self.compression = compression; self } @@ -79,14 +79,14 @@ impl ParquetOptions { /// Sets the number of rows to be cache in memory before being written as a /// `RecordBatch`. Large values have better performance at the cost of /// additional memory usage. The default is ~1M rows (2^20). - pub fn with_max_batch_size(mut self, batch_size: usize) -> Self { + pub fn set_max_batch_size(mut self, batch_size: usize) -> Self { self.max_batch_size = batch_size; self } /// Sets the type for histogram data: standard or sparse. The default is /// the standard (dense) histogram. - pub fn with_histogram_type(mut self, histogram: ParquetHistogramType) -> Self { + pub fn set_histogram_type(mut self, histogram: ParquetHistogramType) -> Self { self.histogram_type = histogram; self } @@ -247,7 +247,7 @@ impl ParquetSchema { } ParquetHistogramType::Sparse => { // merge metric annotations into the metric metadata - metadata.insert("metric_type".to_string(), "sparse histogram".to_string()); + metadata.insert("metric_type".to_string(), "sparse_histogram".to_string()); fields.push( Field::new( @@ -500,7 +500,7 @@ mod tests { #[test] fn test_row_groups() { let snapshots = build_snapshots(); - let tmpfile = write_parquet(snapshots, ParquetOptions::new().with_max_batch_size(1)); + let tmpfile = write_parquet(snapshots, ParquetOptions::new().set_max_batch_size(1)); let builder = ParquetRecordBatchReaderBuilder::try_new(tmpfile).unwrap(); // Check row groups @@ -547,7 +547,7 @@ mod tests { let snapshots = build_snapshots(); let tmpfile = write_parquet( snapshots, - ParquetOptions::new().with_histogram_type(ParquetHistogramType::Sparse), + ParquetOptions::new().set_histogram_type(ParquetHistogramType::Sparse), ); let builder = ParquetRecordBatchReaderBuilder::try_new(tmpfile).unwrap(); From 6d74329587c5331039dc712efc309c0ac130e08a Mon Sep 17 00:00:00 2001 From: Mihir Nanavati Date: Wed, 29 May 2024 00:24:25 -0400 Subject: [PATCH 4/4] s/set_/ --- metriken-exposition/src/parquet.rs | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/metriken-exposition/src/parquet.rs b/metriken-exposition/src/parquet.rs index f02c1c7..0424f38 100644 --- a/metriken-exposition/src/parquet.rs +++ b/metriken-exposition/src/parquet.rs @@ -12,6 +12,19 @@ use parquet::format::{FileMetaData, KeyValue}; use crate::snapshot::{HashedSnapshot, Snapshot}; +/// The batch size (or maximum row group size) is the number of rows that +/// the `ArrowWriter` caches in memory before attempting to write them to +/// the file as a single row group. The size of the row group represents +/// a trade-off on a few axes: larger row groups have better compression, +/// but also a larger memory footprint during creation. Operations on a +/// parquet file can also be parallelized per-row group, so too few row +/// groups limit the number of cores that can operate on the file. +/// +/// The general recommendation is to have at least as many row groups +/// as cores and benchmarks from DuckDB show that the value of larger +/// row groups starts tapering after 50-100K (though this is dependent +/// on the workload). The default for the `ArrowWriter` is 1M, which is +/// too large for histograms, so pick a more conservative default. const DEFAULT_MAX_BATCH_SIZE: usize = 50_000; #[derive(Clone, Debug)] @@ -71,7 +84,7 @@ impl ParquetOptions { /// Sets the compression level for the parquet file. The default is no /// compression. Set the compression level to a corresponding zstd level to /// enable compression. - pub fn set_compression(mut self, compression: ParquetCompression) -> Self { + pub fn compression(mut self, compression: ParquetCompression) -> Self { self.compression = compression; self } @@ -79,14 +92,14 @@ impl ParquetOptions { /// Sets the number of rows to be cache in memory before being written as a /// `RecordBatch`. Large values have better performance at the cost of /// additional memory usage. The default is ~1M rows (2^20). - pub fn set_max_batch_size(mut self, batch_size: usize) -> Self { + pub fn max_batch_size(mut self, batch_size: usize) -> Self { self.max_batch_size = batch_size; self } /// Sets the type for histogram data: standard or sparse. The default is /// the standard (dense) histogram. - pub fn set_histogram_type(mut self, histogram: ParquetHistogramType) -> Self { + pub fn histogram_type(mut self, histogram: ParquetHistogramType) -> Self { self.histogram_type = histogram; self } @@ -500,7 +513,7 @@ mod tests { #[test] fn test_row_groups() { let snapshots = build_snapshots(); - let tmpfile = write_parquet(snapshots, ParquetOptions::new().set_max_batch_size(1)); + let tmpfile = write_parquet(snapshots, ParquetOptions::new().max_batch_size(1)); let builder = ParquetRecordBatchReaderBuilder::try_new(tmpfile).unwrap(); // Check row groups @@ -547,7 +560,7 @@ mod tests { let snapshots = build_snapshots(); let tmpfile = write_parquet( snapshots, - ParquetOptions::new().set_histogram_type(ParquetHistogramType::Sparse), + ParquetOptions::new().histogram_type(ParquetHistogramType::Sparse), ); let builder = ParquetRecordBatchReaderBuilder::try_new(tmpfile).unwrap();