diff --git a/crates/iceberg/src/puffin/blob.rs b/crates/iceberg/src/puffin/blob.rs new file mode 100644 index 000000000..2b1a0f697 --- /dev/null +++ b/crates/iceberg/src/puffin/blob.rs @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; + +/// A serialized form of a "compact" Theta sketch produced by the Apache DataSketches library. +pub(crate) const APACHE_DATASKETCHES_THETA_V1: &str = "apache-datasketches-theta-v1"; + +/// The blob +#[derive(Debug, PartialEq, Clone)] +pub(crate) struct Blob { + /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types + pub(crate) r#type: String, + /// List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob. + pub(crate) fields: Vec, + /// ID of the Iceberg table's snapshot the blob was computed from + pub(crate) snapshot_id: i64, + /// Sequence number of the Iceberg table's snapshot the blob was computed from + pub(crate) sequence_number: i64, + /// The actual blob data + pub(crate) data: Vec, + /// Arbitrary meta-information about the blob + pub(crate) properties: HashMap, +} diff --git a/crates/iceberg/src/puffin/mod.rs b/crates/iceberg/src/puffin/mod.rs index 91bdf125f..ee8c8077a 100644 --- a/crates/iceberg/src/puffin/mod.rs +++ b/crates/iceberg/src/puffin/mod.rs @@ -21,8 +21,10 @@ // Temporarily allowing this while crate is under active development #![allow(dead_code)] +mod blob; mod compression; mod metadata; +mod reader; #[cfg(test)] mod test_utils; diff --git a/crates/iceberg/src/puffin/reader.rs b/crates/iceberg/src/puffin/reader.rs new file mode 100644 index 000000000..072ffdb85 --- /dev/null +++ b/crates/iceberg/src/puffin/reader.rs @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::io::{FileRead, InputFile}; +use crate::puffin::blob::Blob; +use crate::puffin::metadata::{BlobMetadata, FileMetadata}; +use crate::Result; + +/// Puffin reader +pub(crate) struct PuffinReader { + input_file: InputFile, + file_metadata: Option, +} + +impl PuffinReader { + /// Returns a new Puffin reader + pub(crate) fn new(input_file: InputFile) -> Self { + Self { + input_file, + file_metadata: None, + } + } + + /// Returns file metadata + pub(crate) async fn file_metadata(&mut self) -> Result<&FileMetadata> { + if let Some(ref file_metadata) = self.file_metadata { + Ok(file_metadata) + } else { + let file_metadata = FileMetadata::read(&self.input_file).await?; + Ok(self.file_metadata.insert(file_metadata)) + } + } + + /// Returns blob + pub(crate) async fn blob(&self, blob_metadata: BlobMetadata) -> Result { + let file_read = self.input_file.reader().await?; + let start = blob_metadata.offset; + let end = start + blob_metadata.length; + let bytes = file_read.read(start..end).await?.to_vec(); + let data = blob_metadata.compression_codec.decompress(bytes)?; + + Ok(Blob { + r#type: blob_metadata.r#type, + fields: blob_metadata.fields, + snapshot_id: blob_metadata.snapshot_id, + sequence_number: blob_metadata.sequence_number, + data, + properties: blob_metadata.properties, + }) + } +} + +#[cfg(test)] +mod tests { + + use crate::puffin::reader::PuffinReader; + use crate::puffin::test_utils::{ + blob_0, blob_1, java_uncompressed_metric_input_file, + java_zstd_compressed_metric_input_file, uncompressed_metric_file_metadata, + zstd_compressed_metric_file_metadata, + }; + + #[tokio::test] + async fn test_puffin_reader_uncompressed_metric_data() { + let input_file = java_uncompressed_metric_input_file(); + let mut puffin_reader = PuffinReader::new(input_file); + + let file_metadata = puffin_reader.file_metadata().await.unwrap().clone(); + assert_eq!(file_metadata, uncompressed_metric_file_metadata()); + + assert_eq!( + puffin_reader + .blob(file_metadata.blobs.first().unwrap().clone()) + .await + .unwrap(), + blob_0() + ); + + assert_eq!( + puffin_reader + .blob(file_metadata.blobs.get(1).unwrap().clone()) + .await + .unwrap(), + blob_1(), + ) + } + + #[tokio::test] + async fn test_puffin_reader_zstd_compressed_metric_data() { + let input_file = java_zstd_compressed_metric_input_file(); + let mut puffin_reader = PuffinReader::new(input_file); + + let file_metadata = puffin_reader.file_metadata().await.unwrap().clone(); + assert_eq!(file_metadata, zstd_compressed_metric_file_metadata()); + + assert_eq!( + puffin_reader + .blob(file_metadata.blobs.first().unwrap().clone()) + .await + .unwrap(), + blob_0() + ); + + assert_eq!( + puffin_reader + .blob(file_metadata.blobs.get(1).unwrap().clone()) + .await + .unwrap(), + blob_1(), + ) + } +} diff --git a/crates/iceberg/src/puffin/test_utils.rs b/crates/iceberg/src/puffin/test_utils.rs index e49e51d50..8efb13b9a 100644 --- a/crates/iceberg/src/puffin/test_utils.rs +++ b/crates/iceberg/src/puffin/test_utils.rs @@ -17,6 +17,7 @@ use std::collections::HashMap; +use super::blob::Blob; use crate::io::{FileIOBuilder, InputFile}; use crate::puffin::compression::CompressionCodec; use crate::puffin::metadata::{BlobMetadata, FileMetadata, CREATED_BY_PROPERTY}; @@ -68,6 +69,7 @@ pub(crate) const METRIC_BLOB_0_TYPE: &str = "some-blob"; pub(crate) const METRIC_BLOB_0_INPUT_FIELDS: [i32; 1] = [1]; pub(crate) const METRIC_BLOB_0_SNAPSHOT_ID: i64 = 2; pub(crate) const METRIC_BLOB_0_SEQUENCE_NUMBER: i64 = 1; +pub(crate) const METRIC_BLOB_0_DATA: &str = "abcdefghi"; pub(crate) fn zstd_compressed_metric_blob_0_metadata() -> BlobMetadata { BlobMetadata { @@ -95,10 +97,23 @@ pub(crate) fn uncompressed_metric_blob_0_metadata() -> BlobMetadata { } } +pub(crate) fn blob_0() -> Blob { + Blob { + r#type: METRIC_BLOB_0_TYPE.to_string(), + fields: METRIC_BLOB_0_INPUT_FIELDS.to_vec(), + snapshot_id: METRIC_BLOB_0_SNAPSHOT_ID, + sequence_number: METRIC_BLOB_0_SEQUENCE_NUMBER, + data: METRIC_BLOB_0_DATA.as_bytes().to_vec(), + properties: HashMap::new(), + } +} + pub(crate) const METRIC_BLOB_1_TYPE: &str = "some-other-blob"; pub(crate) const METRIC_BLOB_1_INPUT_FIELDS: [i32; 1] = [2]; pub(crate) const METRIC_BLOB_1_SNAPSHOT_ID: i64 = 2; pub(crate) const METRIC_BLOB_1_SEQUENCE_NUMBER: i64 = 1; +pub(crate) const METRIC_BLOB_1_DATA: &str = + "some blob \u{0000} binary data 🤯 that is not very very very very very very long, is it?"; pub(crate) fn uncompressed_metric_blob_1_metadata() -> BlobMetadata { BlobMetadata { @@ -126,6 +141,17 @@ pub(crate) fn zstd_compressed_metric_blob_1_metadata() -> BlobMetadata { } } +pub(crate) fn blob_1() -> Blob { + Blob { + r#type: METRIC_BLOB_1_TYPE.to_string(), + fields: METRIC_BLOB_1_INPUT_FIELDS.to_vec(), + snapshot_id: METRIC_BLOB_1_SNAPSHOT_ID, + sequence_number: METRIC_BLOB_1_SEQUENCE_NUMBER, + data: METRIC_BLOB_1_DATA.as_bytes().to_vec(), + properties: HashMap::new(), + } +} + pub(crate) const CREATED_BY_PROPERTY_VALUE: &str = "Test 1234"; pub(crate) fn file_properties() -> HashMap {