From 25e89094bf859cefb159574c6d2b925d4148d992 Mon Sep 17 00:00:00 2001 From: Willi Raschkowski Date: Tue, 7 Jan 2025 10:41:54 +0100 Subject: [PATCH] Split metadata table into separate modules (#872) --- .../manifests.rs} | 235 +----------------- crates/iceberg/src/inspect/metadata_table.rs | 99 ++++++++ crates/iceberg/src/inspect/mod.rs | 26 ++ crates/iceberg/src/inspect/snapshots.rs | 183 ++++++++++++++ crates/iceberg/src/lib.rs | 2 +- crates/iceberg/src/table.rs | 2 +- 6 files changed, 320 insertions(+), 227 deletions(-) rename crates/iceberg/src/{metadata_scan.rs => inspect/manifests.rs} (56%) create mode 100644 crates/iceberg/src/inspect/metadata_table.rs create mode 100644 crates/iceberg/src/inspect/mod.rs create mode 100644 crates/iceberg/src/inspect/snapshots.rs diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/inspect/manifests.rs similarity index 56% rename from crates/iceberg/src/metadata_scan.rs rename to crates/iceberg/src/inspect/manifests.rs index 16604d781..ed9afc326 100644 --- a/crates/iceberg/src/metadata_scan.rs +++ b/crates/iceberg/src/inspect/manifests.rs @@ -15,125 +15,29 @@ // specific language governing permissions and limitations // under the License. -//! Metadata table api. - use std::sync::Arc; use arrow_array::builder::{ - BooleanBuilder, ListBuilder, MapBuilder, PrimitiveBuilder, StringBuilder, StructBuilder, + BooleanBuilder, ListBuilder, PrimitiveBuilder, StringBuilder, StructBuilder, }; -use arrow_array::types::{Int32Type, Int64Type, Int8Type, TimestampMillisecondType}; +use arrow_array::types::{Int32Type, Int64Type, Int8Type}; use arrow_array::RecordBatch; -use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit}; +use arrow_schema::{DataType, Field, Fields, Schema}; use crate::table::Table; use crate::Result; -/// Metadata table is used to inspect a table's history, snapshots, and other metadata as a table. -/// -/// References: -/// - -/// - -/// - -#[derive(Debug)] -pub struct MetadataTable(Table); - -impl MetadataTable { - /// Creates a new metadata scan. - pub(super) fn new(table: Table) -> Self { - Self(table) - } - - /// Get the snapshots table. - pub fn snapshots(&self) -> SnapshotsTable { - SnapshotsTable { table: &self.0 } - } - - /// Get the manifests table. - pub fn manifests(&self) -> ManifestsTable { - ManifestsTable { table: &self.0 } - } -} - -/// Snapshots table. -pub struct SnapshotsTable<'a> { - table: &'a Table, -} - -impl<'a> SnapshotsTable<'a> { - /// Returns the schema of the snapshots table. - pub fn schema(&self) -> Schema { - Schema::new(vec![ - Field::new( - "committed_at", - DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())), - false, - ), - Field::new("snapshot_id", DataType::Int64, false), - Field::new("parent_id", DataType::Int64, true), - Field::new("operation", DataType::Utf8, false), - Field::new("manifest_list", DataType::Utf8, false), - Field::new( - "summary", - DataType::Map( - Arc::new(Field::new( - "entries", - DataType::Struct( - vec![ - Field::new("keys", DataType::Utf8, false), - Field::new("values", DataType::Utf8, true), - ] - .into(), - ), - false, - )), - false, - ), - false, - ), - ]) - } - - /// Scans the snapshots table. - pub fn scan(&self) -> Result { - let mut committed_at = - PrimitiveBuilder::::new().with_timezone("+00:00"); - let mut snapshot_id = PrimitiveBuilder::::new(); - let mut parent_id = PrimitiveBuilder::::new(); - let mut operation = StringBuilder::new(); - let mut manifest_list = StringBuilder::new(); - let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new()); - - for snapshot in self.table.metadata().snapshots() { - committed_at.append_value(snapshot.timestamp_ms()); - snapshot_id.append_value(snapshot.snapshot_id()); - parent_id.append_option(snapshot.parent_snapshot_id()); - manifest_list.append_value(snapshot.manifest_list()); - operation.append_value(snapshot.summary().operation.as_str()); - for (key, value) in &snapshot.summary().additional_properties { - summary.keys().append_value(key); - summary.values().append_value(value); - } - summary.append(true)?; - } - - Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![ - Arc::new(committed_at.finish()), - Arc::new(snapshot_id.finish()), - Arc::new(parent_id.finish()), - Arc::new(operation.finish()), - Arc::new(manifest_list.finish()), - Arc::new(summary.finish()), - ])?) - } -} - /// Manifests table. pub struct ManifestsTable<'a> { table: &'a Table, } impl<'a> ManifestsTable<'a> { + /// Create a new Manifests table instance. + pub fn new(table: &'a Table) -> Self { + Self { table } + } + fn partition_summary_fields(&self) -> Vec { vec![ Field::new("contains_null", DataType::Boolean, false), @@ -257,130 +161,11 @@ impl<'a> ManifestsTable<'a> { #[cfg(test)] mod tests { - use expect_test::{expect, Expect}; - use itertools::Itertools; + use expect_test::expect; - use super::*; + use crate::inspect::metadata_table::tests::check_record_batch; use crate::scan::tests::TableTestFixture; - /// Snapshot testing to check the resulting record batch. - /// - /// - `expected_schema/data`: put `expect![[""]]` as a placeholder, - /// and then run test with `UPDATE_EXPECT=1 cargo test` to automatically update the result, - /// or use rust-analyzer (see [video](https://github.com/rust-analyzer/expect-test)). - /// Check the doc of [`expect_test`] for more details. - /// - `ignore_check_columns`: Some columns are not stable, so we can skip them. - /// - `sort_column`: The order of the data might be non-deterministic, so we can sort it by a column. - fn check_record_batch( - record_batch: RecordBatch, - expected_schema: Expect, - expected_data: Expect, - ignore_check_columns: &[&str], - sort_column: Option<&str>, - ) { - let mut columns = record_batch.columns().to_vec(); - if let Some(sort_column) = sort_column { - let column = record_batch.column_by_name(sort_column).unwrap(); - let indices = arrow_ord::sort::sort_to_indices(column, None, None).unwrap(); - columns = columns - .iter() - .map(|column| arrow_select::take::take(column.as_ref(), &indices, None).unwrap()) - .collect_vec(); - } - - expected_schema.assert_eq(&format!( - "{}", - record_batch.schema().fields().iter().format(",\n") - )); - expected_data.assert_eq(&format!( - "{}", - record_batch - .schema() - .fields() - .iter() - .zip_eq(columns) - .map(|(field, column)| { - if ignore_check_columns.contains(&field.name().as_str()) { - format!("{}: (skipped)", field.name()) - } else { - format!("{}: {:?}", field.name(), column) - } - }) - .format(",\n") - )); - } - - #[test] - fn test_snapshots_table() { - let table = TableTestFixture::new().table; - let record_batch = table.metadata_table().snapshots().scan().unwrap(); - check_record_batch( - record_batch, - expect![[r#" - Field { name: "committed_at", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "parent_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "operation", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "manifest_list", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, - Field { name: "summary", data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "keys", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]], - expect![[r#" - committed_at: PrimitiveArray - [ - 2018-01-04T21:22:35.770+00:00, - 2019-04-12T20:29:15.770+00:00, - ], - snapshot_id: PrimitiveArray - [ - 3051729675574597004, - 3055729675574597004, - ], - parent_id: PrimitiveArray - [ - null, - 3051729675574597004, - ], - operation: StringArray - [ - "append", - "append", - ], - manifest_list: (skipped), - summary: MapArray - [ - StructArray - -- validity: - [ - ] - [ - -- child 0: "keys" (Utf8) - StringArray - [ - ] - -- child 1: "values" (Utf8) - StringArray - [ - ] - ], - StructArray - -- validity: - [ - ] - [ - -- child 0: "keys" (Utf8) - StringArray - [ - ] - -- child 1: "values" (Utf8) - StringArray - [ - ] - ], - ]"#]], - &["manifest_list"], - Some("committed_at"), - ); - } - #[tokio::test] async fn test_manifests_table() { let mut fixture = TableTestFixture::new(); diff --git a/crates/iceberg/src/inspect/metadata_table.rs b/crates/iceberg/src/inspect/metadata_table.rs new file mode 100644 index 000000000..2e6055be8 --- /dev/null +++ b/crates/iceberg/src/inspect/metadata_table.rs @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::{ManifestsTable, SnapshotsTable}; +use crate::table::Table; + +/// Metadata table is used to inspect a table's history, snapshots, and other metadata as a table. +/// +/// References: +/// - +/// - +/// - +#[derive(Debug)] +pub struct MetadataTable(Table); + +impl MetadataTable { + /// Creates a new metadata scan. + pub fn new(table: Table) -> Self { + Self(table) + } + + /// Get the snapshots table. + pub fn snapshots(&self) -> SnapshotsTable { + SnapshotsTable::new(&self.0) + } + + /// Get the manifests table. + pub fn manifests(&self) -> ManifestsTable { + ManifestsTable::new(&self.0) + } +} + +#[cfg(test)] +pub mod tests { + use arrow_array::RecordBatch; + use expect_test::Expect; + use itertools::Itertools; + + /// Snapshot testing to check the resulting record batch. + /// + /// - `expected_schema/data`: put `expect![[""]]` as a placeholder, + /// and then run test with `UPDATE_EXPECT=1 cargo test` to automatically update the result, + /// or use rust-analyzer (see [video](https://github.com/rust-analyzer/expect-test)). + /// Check the doc of [`expect_test`] for more details. + /// - `ignore_check_columns`: Some columns are not stable, so we can skip them. + /// - `sort_column`: The order of the data might be non-deterministic, so we can sort it by a column. + pub fn check_record_batch( + record_batch: RecordBatch, + expected_schema: Expect, + expected_data: Expect, + ignore_check_columns: &[&str], + sort_column: Option<&str>, + ) { + let mut columns = record_batch.columns().to_vec(); + if let Some(sort_column) = sort_column { + let column = record_batch.column_by_name(sort_column).unwrap(); + let indices = arrow_ord::sort::sort_to_indices(column, None, None).unwrap(); + columns = columns + .iter() + .map(|column| arrow_select::take::take(column.as_ref(), &indices, None).unwrap()) + .collect_vec(); + } + + expected_schema.assert_eq(&format!( + "{}", + record_batch.schema().fields().iter().format(",\n") + )); + expected_data.assert_eq(&format!( + "{}", + record_batch + .schema() + .fields() + .iter() + .zip_eq(columns) + .map(|(field, column)| { + if ignore_check_columns.contains(&field.name().as_str()) { + format!("{}: (skipped)", field.name()) + } else { + format!("{}: {:?}", field.name(), column) + } + }) + .format(",\n") + )); + } +} diff --git a/crates/iceberg/src/inspect/mod.rs b/crates/iceberg/src/inspect/mod.rs new file mode 100644 index 000000000..b64420ea1 --- /dev/null +++ b/crates/iceberg/src/inspect/mod.rs @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Metadata table APIs. + +mod manifests; +mod metadata_table; +mod snapshots; + +pub use manifests::ManifestsTable; +pub use metadata_table::*; +pub use snapshots::SnapshotsTable; diff --git a/crates/iceberg/src/inspect/snapshots.rs b/crates/iceberg/src/inspect/snapshots.rs new file mode 100644 index 000000000..209fc65b1 --- /dev/null +++ b/crates/iceberg/src/inspect/snapshots.rs @@ -0,0 +1,183 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder}; +use arrow_array::types::{Int64Type, TimestampMillisecondType}; +use arrow_array::RecordBatch; +use arrow_schema::{DataType, Field, Schema, TimeUnit}; + +use crate::table::Table; +use crate::Result; + +/// Snapshots table. +pub struct SnapshotsTable<'a> { + table: &'a Table, +} + +impl<'a> SnapshotsTable<'a> { + /// Create a new Snapshots table instance. + pub fn new(table: &'a Table) -> Self { + Self { table } + } + + /// Returns the schema of the snapshots table. + pub fn schema(&self) -> Schema { + Schema::new(vec![ + Field::new( + "committed_at", + DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())), + false, + ), + Field::new("snapshot_id", DataType::Int64, false), + Field::new("parent_id", DataType::Int64, true), + Field::new("operation", DataType::Utf8, false), + Field::new("manifest_list", DataType::Utf8, false), + Field::new( + "summary", + DataType::Map( + Arc::new(Field::new( + "entries", + DataType::Struct( + vec![ + Field::new("keys", DataType::Utf8, false), + Field::new("values", DataType::Utf8, true), + ] + .into(), + ), + false, + )), + false, + ), + false, + ), + ]) + } + + /// Scans the snapshots table. + pub fn scan(&self) -> Result { + let mut committed_at = + PrimitiveBuilder::::new().with_timezone("+00:00"); + let mut snapshot_id = PrimitiveBuilder::::new(); + let mut parent_id = PrimitiveBuilder::::new(); + let mut operation = StringBuilder::new(); + let mut manifest_list = StringBuilder::new(); + let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new()); + + for snapshot in self.table.metadata().snapshots() { + committed_at.append_value(snapshot.timestamp_ms()); + snapshot_id.append_value(snapshot.snapshot_id()); + parent_id.append_option(snapshot.parent_snapshot_id()); + manifest_list.append_value(snapshot.manifest_list()); + operation.append_value(snapshot.summary().operation.as_str()); + for (key, value) in &snapshot.summary().additional_properties { + summary.keys().append_value(key); + summary.values().append_value(value); + } + summary.append(true)?; + } + + Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![ + Arc::new(committed_at.finish()), + Arc::new(snapshot_id.finish()), + Arc::new(parent_id.finish()), + Arc::new(operation.finish()), + Arc::new(manifest_list.finish()), + Arc::new(summary.finish()), + ])?) + } +} + +#[cfg(test)] +mod tests { + use expect_test::expect; + + use crate::inspect::metadata_table::tests::check_record_batch; + use crate::scan::tests::TableTestFixture; + + #[test] + fn test_snapshots_table() { + let table = TableTestFixture::new().table; + let record_batch = table.metadata_table().snapshots().scan().unwrap(); + check_record_batch( + record_batch, + expect![[r#" + Field { name: "committed_at", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "parent_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "operation", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "manifest_list", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "summary", data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "keys", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]], + expect![[r#" + committed_at: PrimitiveArray + [ + 2018-01-04T21:22:35.770+00:00, + 2019-04-12T20:29:15.770+00:00, + ], + snapshot_id: PrimitiveArray + [ + 3051729675574597004, + 3055729675574597004, + ], + parent_id: PrimitiveArray + [ + null, + 3051729675574597004, + ], + operation: StringArray + [ + "append", + "append", + ], + manifest_list: (skipped), + summary: MapArray + [ + StructArray + -- validity: + [ + ] + [ + -- child 0: "keys" (Utf8) + StringArray + [ + ] + -- child 1: "values" (Utf8) + StringArray + [ + ] + ], + StructArray + -- validity: + [ + ] + [ + -- child 0: "keys" (Utf8) + StringArray + [ + ] + -- child 1: "values" (Utf8) + StringArray + [ + ] + ], + ]"#]], + &["manifest_list"], + Some("committed_at"), + ); + } +} diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 1946f35f3..fe5a52999 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -73,7 +73,7 @@ mod avro; pub mod io; pub mod spec; -pub mod metadata_scan; +pub mod inspect; pub mod scan; pub mod expr; diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index fa5304855..a43281ce4 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -20,9 +20,9 @@ use std::sync::Arc; use crate::arrow::ArrowReaderBuilder; +use crate::inspect::MetadataTable; use crate::io::object_cache::ObjectCache; use crate::io::FileIO; -use crate::metadata_scan::MetadataTable; use crate::scan::TableScanBuilder; use crate::spec::{TableMetadata, TableMetadataRef}; use crate::{Error, ErrorKind, Result, TableIdent};