diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/metadata_scan.rs index 942d7605c..16604d781 100644 --- a/crates/iceberg/src/metadata_scan.rs +++ b/crates/iceberg/src/metadata_scan.rs @@ -19,12 +19,13 @@ use std::sync::Arc; -use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder}; -use arrow_array::types::{Int64Type, TimestampMillisecondType}; +use arrow_array::builder::{ + BooleanBuilder, ListBuilder, MapBuilder, PrimitiveBuilder, StringBuilder, StructBuilder, +}; +use arrow_array::types::{Int32Type, Int64Type, Int8Type, TimestampMillisecondType}; use arrow_array::RecordBatch; -use arrow_schema::{DataType, Field, Schema, TimeUnit}; +use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit}; -use crate::spec::TableMetadata; use crate::table::Table; use crate::Result; @@ -45,19 +46,18 @@ impl MetadataTable { /// Get the snapshots table. pub fn snapshots(&self) -> SnapshotsTable { - SnapshotsTable { - metadata_table: self, - } + SnapshotsTable { table: &self.0 } } - fn metadata(&self) -> &TableMetadata { - self.0.metadata() + /// Get the manifests table. + pub fn manifests(&self) -> ManifestsTable { + ManifestsTable { table: &self.0 } } } /// Snapshots table. pub struct SnapshotsTable<'a> { - metadata_table: &'a MetadataTable, + table: &'a Table, } impl<'a> SnapshotsTable<'a> { @@ -104,7 +104,7 @@ impl<'a> SnapshotsTable<'a> { let mut manifest_list = StringBuilder::new(); let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new()); - for snapshot in self.metadata_table.metadata().snapshots() { + for snapshot in self.table.metadata().snapshots() { committed_at.append_value(snapshot.timestamp_ms()); snapshot_id.append_value(snapshot.snapshot_id()); parent_id.append_option(snapshot.parent_snapshot_id()); @@ -128,6 +128,133 @@ impl<'a> SnapshotsTable<'a> { } } +/// Manifests table. 
+pub struct ManifestsTable<'a> { + table: &'a Table, +} + +impl<'a> ManifestsTable<'a> { + fn partition_summary_fields(&self) -> Vec<Field> { + vec![ + Field::new("contains_null", DataType::Boolean, false), + Field::new("contains_nan", DataType::Boolean, true), + Field::new("lower_bound", DataType::Utf8, true), + Field::new("upper_bound", DataType::Utf8, true), + ] + } + + /// Returns the schema of the manifests table. + pub fn schema(&self) -> Schema { + Schema::new(vec![ + Field::new("content", DataType::Int8, false), + Field::new("path", DataType::Utf8, false), + Field::new("length", DataType::Int64, false), + Field::new("partition_spec_id", DataType::Int32, false), + Field::new("added_snapshot_id", DataType::Int64, false), + Field::new("added_data_files_count", DataType::Int32, false), + Field::new("existing_data_files_count", DataType::Int32, false), + Field::new("deleted_data_files_count", DataType::Int32, false), + Field::new("added_delete_files_count", DataType::Int32, false), + Field::new("existing_delete_files_count", DataType::Int32, false), + Field::new("deleted_delete_files_count", DataType::Int32, false), + Field::new( + "partition_summaries", + DataType::List(Arc::new(Field::new_struct( + "item", + self.partition_summary_fields(), + false, + ))), + false, + ), + ]) + } + + /// Scans the manifests table. 
+ pub async fn scan(&self) -> Result<RecordBatch> { + let mut content = PrimitiveBuilder::<Int8Type>::new(); + let mut path = StringBuilder::new(); + let mut length = PrimitiveBuilder::<Int64Type>::new(); + let mut partition_spec_id = PrimitiveBuilder::<Int32Type>::new(); + let mut added_snapshot_id = PrimitiveBuilder::<Int64Type>::new(); + let mut added_data_files_count = PrimitiveBuilder::<Int32Type>::new(); + let mut existing_data_files_count = PrimitiveBuilder::<Int32Type>::new(); + let mut deleted_data_files_count = PrimitiveBuilder::<Int32Type>::new(); + let mut added_delete_files_count = PrimitiveBuilder::<Int32Type>::new(); + let mut existing_delete_files_count = PrimitiveBuilder::<Int32Type>::new(); + let mut deleted_delete_files_count = PrimitiveBuilder::<Int32Type>::new(); + let mut partition_summaries = ListBuilder::new(StructBuilder::from_fields( + Fields::from(self.partition_summary_fields()), + 0, + )) + .with_field(Arc::new(Field::new_struct( + "item", + self.partition_summary_fields(), + false, + ))); + + if let Some(snapshot) = self.table.metadata().current_snapshot() { + let manifest_list = snapshot + .load_manifest_list(self.table.file_io(), &self.table.metadata_ref()) + .await?; + for manifest in manifest_list.entries() { + content.append_value(manifest.content as i8); + path.append_value(manifest.manifest_path.clone()); + length.append_value(manifest.manifest_length); + partition_spec_id.append_value(manifest.partition_spec_id); + added_snapshot_id.append_value(manifest.added_snapshot_id); + added_data_files_count.append_value(manifest.added_files_count.unwrap_or(0) as i32); + existing_data_files_count + .append_value(manifest.existing_files_count.unwrap_or(0) as i32); + deleted_data_files_count + .append_value(manifest.deleted_files_count.unwrap_or(0) as i32); + added_delete_files_count + .append_value(manifest.added_files_count.unwrap_or(0) as i32); + existing_delete_files_count + .append_value(manifest.existing_files_count.unwrap_or(0) as i32); + deleted_delete_files_count + .append_value(manifest.deleted_files_count.unwrap_or(0) as i32); + + let 
+ partition_summaries_builder = partition_summaries.values(); + for summary in &manifest.partitions { + partition_summaries_builder + .field_builder::<BooleanBuilder>(0) + .unwrap() + .append_value(summary.contains_null); + partition_summaries_builder + .field_builder::<BooleanBuilder>(1) + .unwrap() + .append_option(summary.contains_nan); + partition_summaries_builder + .field_builder::<StringBuilder>(2) + .unwrap() + .append_option(summary.lower_bound.as_ref().map(|v| v.to_string())); + partition_summaries_builder + .field_builder::<StringBuilder>(3) + .unwrap() + .append_option(summary.upper_bound.as_ref().map(|v| v.to_string())); + partition_summaries_builder.append(true); + } + partition_summaries.append(true); + } + } + + Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![ + Arc::new(content.finish()), + Arc::new(path.finish()), + Arc::new(length.finish()), + Arc::new(partition_spec_id.finish()), + Arc::new(added_snapshot_id.finish()), + Arc::new(added_data_files_count.finish()), + Arc::new(existing_data_files_count.finish()), + Arc::new(deleted_data_files_count.finish()), + Arc::new(added_delete_files_count.finish()), + Arc::new(existing_delete_files_count.finish()), + Arc::new(deleted_delete_files_count.finish()), + Arc::new(partition_summaries.finish()), + ])?) 
+ } +} + #[cfg(test)] mod tests { use expect_test::{expect, Expect}; @@ -253,4 +380,106 @@ mod tests { Some("committed_at"), ); } + + #[tokio::test] + async fn test_manifests_table() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + let record_batch = fixture + .table + .metadata_table() + .manifests() + .scan() + .await + .unwrap(); + + check_record_batch( + record_batch, + expect![[r#" + Field { name: "content", data_type: Int8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "length", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "partition_spec_id", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "added_snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "added_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "existing_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "deleted_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "added_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "existing_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "deleted_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "partition_summaries", data_type: List(Field { name: "item", data_type: Struct([Field { name: "contains_null", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: 
false, metadata: {} }, Field { name: "contains_nan", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]], + expect![[r#" + content: PrimitiveArray<Int8> + [ + 0, + ], + path: (skipped), + length: (skipped), + partition_spec_id: PrimitiveArray<Int32> + [ + 0, + ], + added_snapshot_id: PrimitiveArray<Int64> + [ + 3055729675574597004, + ], + added_data_files_count: PrimitiveArray<Int32> + [ + 1, + ], + existing_data_files_count: PrimitiveArray<Int32> + [ + 1, + ], + deleted_data_files_count: PrimitiveArray<Int32> + [ + 1, + ], + added_delete_files_count: PrimitiveArray<Int32> + [ + 1, + ], + existing_delete_files_count: PrimitiveArray<Int32> + [ + 1, + ], + deleted_delete_files_count: PrimitiveArray<Int32> + [ + 1, + ], + partition_summaries: ListArray + [ + StructArray + -- validity: + [ + valid, + ] + [ + -- child 0: "contains_null" (Boolean) + BooleanArray + [ + false, + ] + -- child 1: "contains_nan" (Boolean) + BooleanArray + [ + false, + ] + -- child 2: "lower_bound" (Utf8) + StringArray + [ + "100", + ] + -- child 3: "upper_bound" (Utf8) + StringArray + [ + "300", + ] + ], + ]"#]], + &["path", "length"], + Some("path"), + ); + } } diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index cb3e5d8c8..5a97e74e7 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -1050,7 +1050,7 @@ pub mod tests { .unwrap() } - async fn setup_manifest_files(&mut self) { + pub async fn setup_manifest_files(&mut self) { let current_snapshot = self.table.metadata().current_snapshot().unwrap(); let parent_snapshot = current_snapshot .parent_snapshot(self.table.metadata()) diff --git 
a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index 97d259ad3..4618c8a4f 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -597,7 +597,7 @@ impl ManifestFile { } /// The type of files tracked by the manifest, either data or delete files; Data(0) for all v1 manifests -#[derive(Debug, PartialEq, Clone, Eq)] +#[derive(Debug, PartialEq, Clone, Copy, Eq)] pub enum ManifestContentType { /// The manifest content is data. Data = 0,