Skip to content

Commit

Permalink
merge origin/main
Browse files Browse the repository at this point in the history
  • Loading branch information
flaneur2020 committed Dec 31, 2024
2 parents fc780a2 + 328e18e commit 940ddc1
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 82 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ cargo 1.69.0 (6e9a83356 2023-04-12)

Currently, iceberg-rust uses Docker to set up environment for integration tests. Native Docker has some limitations, please check (https://github.com/apache/iceberg-rust/pull/748). Please use Orbstack or Podman.

For MacOS users, you can install [OrbStack](https://orbstack.dev/) as a docker alternative.
For MacOS users, you can install [OrbStack as a docker alternative](docs/contributing/orbstack.md).

For podman, refer to [Using Podman instead of Docker](docs/contributing/podman.md)

Expand Down
41 changes: 29 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/catalog/s3tables/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ keywords = ["iceberg", "sql", "catalog"]
anyhow = { workspace = true }
async-trait = { workspace = true }
aws-config = { workspace = true }
aws-sdk-s3tables = "1.0.0"
aws-sdk-s3tables = "1.2.0"
iceberg = { workspace = true }
serde_json = { workspace = true }
uuid = { workspace = true, features = ["v4"] }
Expand Down
124 changes: 60 additions & 64 deletions crates/iceberg/src/metadata_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,67 +25,53 @@ use arrow_array::builder::{
use arrow_array::types::{Int32Type, Int64Type, Int8Type, TimestampMillisecondType};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};
use async_trait::async_trait;

use crate::io::FileIO;
use crate::spec::TableMetadataRef;
use crate::spec::TableMetadata;
use crate::table::Table;
use crate::Result;

/// Table metadata scan.
/// Metadata table is used to inspect a table's history, snapshots, and other metadata as a table.
///
/// Used to inspect a table's history, snapshots, and other metadata as a table.
///
/// See also <https://iceberg.apache.org/docs/latest/spark-queries/#inspecting-tables>.
/// References:
/// - <https://github.com/apache/iceberg/blob/ac865e334e143dfd9e33011d8cf710b46d91f1e5/core/src/main/java/org/apache/iceberg/MetadataTableType.java#L23-L39>
/// - <https://iceberg.apache.org/docs/latest/spark-queries/#querying-with-sql>
/// - <https://py.iceberg.apache.org/api/#inspecting-tables>
#[derive(Debug)]
pub struct MetadataScan {
metadata_ref: TableMetadataRef,
io: FileIO,
}
pub struct MetadataTable(Table);

impl MetadataScan {
impl MetadataTable {
/// Creates a new metadata scan.
pub fn new(table: &Table) -> Self {
Self {
metadata_ref: table.metadata_ref(),
io: table.file_io().clone(),
}
pub(super) fn new(table: Table) -> Self {
Self(table)
}

/// Returns the snapshots of the table.
pub async fn snapshots(&self) -> Result<RecordBatch> {
SnapshotsTable::scan(self).await
/// Get the snapshots table.
pub fn snapshots(&self) -> SnapshotsTable {
SnapshotsTable {
metadata_table: self,
}
}

/// Returns the manifests of the table.
pub async fn manifests(&self) -> Result<RecordBatch> {
ManifestsTable::scan(self).await
/// Get the manifests table.
pub fn manifests(&self) -> ManifestsTable {
ManifestsTable {
metadata_table: self,
}
}
}

/// Table metadata scan.
///
/// Use to inspect a table's history, snapshots, and other metadata as a table.
///
/// References:
/// - <https://github.com/apache/iceberg/blob/ac865e334e143dfd9e33011d8cf710b46d91f1e5/core/src/main/java/org/apache/iceberg/MetadataTableType.java#L23-L39>
/// - <https://iceberg.apache.org/docs/latest/spark-queries/#querying-with-sql>
/// - <https://py.iceberg.apache.org/api/#inspecting-tables>
#[async_trait]
pub trait MetadataTable {
/// Returns the schema of the metadata table.
fn schema() -> Schema;

/// Scans the metadata table.
async fn scan(scan: &MetadataScan) -> Result<RecordBatch>;
fn metadata(&self) -> &TableMetadata {
self.0.metadata()
}
}

/// Snapshots table.
pub struct SnapshotsTable;
pub struct SnapshotsTable<'a> {
metadata_table: &'a MetadataTable,
}

#[async_trait]
impl MetadataTable for SnapshotsTable {
fn schema() -> Schema {
impl<'a> SnapshotsTable<'a> {
/// Returns the schema of the snapshots table.
pub fn schema(&self) -> Schema {
Schema::new(vec![
Field::new(
"committed_at",
Expand Down Expand Up @@ -117,7 +103,8 @@ impl MetadataTable for SnapshotsTable {
])
}

async fn scan(scan: &MetadataScan) -> Result<RecordBatch> {
/// Scans the snapshots table.
pub fn scan(&self) -> Result<RecordBatch> {
let mut committed_at =
PrimitiveBuilder::<TimestampMillisecondType>::new().with_timezone("+00:00");
let mut snapshot_id = PrimitiveBuilder::<Int64Type>::new();
Expand All @@ -126,7 +113,7 @@ impl MetadataTable for SnapshotsTable {
let mut manifest_list = StringBuilder::new();
let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new());

for snapshot in scan.metadata_ref.snapshots() {
for snapshot in self.metadata_table.metadata().snapshots() {
committed_at.append_value(snapshot.timestamp_ms());
snapshot_id.append_value(snapshot.snapshot_id());
parent_id.append_option(snapshot.parent_snapshot_id());
Expand All @@ -139,7 +126,7 @@ impl MetadataTable for SnapshotsTable {
summary.append(true)?;
}

Ok(RecordBatch::try_new(Arc::new(Self::schema()), vec![
Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![
Arc::new(committed_at.finish()),
Arc::new(snapshot_id.finish()),
Arc::new(parent_id.finish()),
Expand All @@ -151,22 +138,21 @@ impl MetadataTable for SnapshotsTable {
}

/// Manifests table.
pub struct ManifestsTable;
pub struct ManifestsTable<'a> {
metadata_table: &'a MetadataTable,
}

impl ManifestsTable {
fn partition_summary_fields() -> Vec<Field> {
impl<'a> ManifestsTable<'a> {
fn partition_summary_fields(&self) -> Vec<Field> {
vec![
Field::new("contains_null", DataType::Boolean, false),
Field::new("contains_nan", DataType::Boolean, true),
Field::new("lower_bound", DataType::Utf8, true),
Field::new("upper_bound", DataType::Utf8, true),
]
}
}

#[async_trait]
impl MetadataTable for ManifestsTable {
fn schema() -> Schema {
fn schema(&self) -> Schema {
Schema::new(vec![
Field::new("content", DataType::Int8, false),
Field::new("path", DataType::Utf8, false),
Expand All @@ -183,15 +169,16 @@ impl MetadataTable for ManifestsTable {
"partition_summaries",
DataType::List(Arc::new(Field::new_struct(
"item",
ManifestsTable::partition_summary_fields(),
self.partition_summary_fields(),
false,
))),
false,
),
])
}

async fn scan(scan: &MetadataScan) -> Result<RecordBatch> {
/// Scans the manifests table.
pub async fn scan(&self) -> Result<RecordBatch> {
let mut content = PrimitiveBuilder::<Int8Type>::new();
let mut path = StringBuilder::new();
let mut length = PrimitiveBuilder::<Int64Type>::new();
Expand All @@ -204,18 +191,21 @@ impl MetadataTable for ManifestsTable {
let mut existing_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut deleted_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut partition_summaries = ListBuilder::new(StructBuilder::from_fields(
Fields::from(ManifestsTable::partition_summary_fields()),
Fields::from(self.partition_summary_fields()),
0,
))
.with_field(Arc::new(Field::new_struct(
"item",
ManifestsTable::partition_summary_fields(),
self.partition_summary_fields(),
false,
)));

if let Some(snapshot) = scan.metadata_ref.current_snapshot() {
if let Some(snapshot) = self.metadata_table.metadata().current_snapshot() {
let manifest_list = snapshot
.load_manifest_list(&scan.io, &scan.metadata_ref)
.load_manifest_list(
&self.metadata_table.0.file_io(),
&self.metadata_table.0.metadata_ref(),
)
.await?;
for manifest in manifest_list.entries() {
content.append_value(manifest.content.clone() as i8);
Expand Down Expand Up @@ -259,7 +249,7 @@ impl MetadataTable for ManifestsTable {
}
}

Ok(RecordBatch::try_new(Arc::new(Self::schema()), vec![
Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![
Arc::new(content.finish()),
Arc::new(path.finish()),
Arc::new(length.finish()),
Expand Down Expand Up @@ -331,10 +321,10 @@ mod tests {
));
}

#[tokio::test]
async fn test_snapshots_table() {
#[test]
fn test_snapshots_table() {
let table = TableTestFixture::new().table;
let record_batch = table.metadata_scan().snapshots().await.unwrap();
let record_batch = table.metadata_table().snapshots().scan().unwrap();
check_record_batch(
record_batch,
expect![[r#"
Expand Down Expand Up @@ -407,7 +397,13 @@ mod tests {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;

let record_batch = fixture.table.metadata_scan().manifests().await.unwrap();
let record_batch = fixture
.table
.metadata_table()
.manifests()
.scan()
.await
.unwrap();

check_record_batch(
record_batch,
Expand Down
9 changes: 5 additions & 4 deletions crates/iceberg/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::sync::Arc;
use crate::arrow::ArrowReaderBuilder;
use crate::io::object_cache::ObjectCache;
use crate::io::FileIO;
use crate::metadata_scan::MetadataScan;
use crate::metadata_scan::MetadataTable;
use crate::scan::TableScanBuilder;
use crate::spec::{TableMetadata, TableMetadataRef};
use crate::{Error, ErrorKind, Result, TableIdent};
Expand Down Expand Up @@ -201,9 +201,10 @@ impl Table {
TableScanBuilder::new(self)
}

/// Creates a metadata scan. See [`MetadataScan`] for more details.
pub fn metadata_scan(&self) -> MetadataScan {
MetadataScan::new(self)
/// Creates a metadata table which provides table-like APIs for inspecting metadata.
/// See [`MetadataTable`] for more details.
pub fn metadata_table(self) -> MetadataTable {
MetadataTable::new(self)
}

/// Returns the flag indicating whether the `Table` is readonly or not
Expand Down
Loading

0 comments on commit 940ddc1

Please sign in to comment.