Metadata table scans as streams
rshkv committed Jan 3, 2025
1 parent 1c632b8 commit 359bbf2
Showing 1 changed file with 62 additions and 32 deletions.
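
This commit changes the snapshots and manifests metadata tables so that `scan()` returns an `ArrowRecordBatchStream` instead of a single eagerly-built `RecordBatch`. A minimal sketch of the new calling convention, based on the test changes in the diff below (the `iceberg::table::Table` and `iceberg::Result` paths are assumed from the crate-internal imports; this helper is illustrative, not part of the commit):

```rust
use arrow_array::RecordBatch;
use futures::TryStreamExt;
use iceberg::table::Table;
use iceberg::Result;

// Drain the snapshots metadata-table stream into a Vec of record batches.
// `metadata_table()` and `snapshots()` are used exactly as in the tests below.
async fn collect_snapshot_batches(table: &Table) -> Result<Vec<RecordBatch>> {
    let batch_stream = table.metadata_table().snapshots().scan()?;
    batch_stream.try_collect().await
}
```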
94 changes: 62 additions & 32 deletions crates/iceberg/src/metadata_scan.rs
@@ -24,8 +24,12 @@ use arrow_array::builder::{
};
use arrow_array::types::{Int32Type, Int64Type, Int8Type, TimestampMillisecondType};
use arrow_array::RecordBatch;
-use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};
+use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit};
+use futures::StreamExt;

+use crate::io::FileIO;
+use crate::scan::ArrowRecordBatchStream;
+use crate::spec::TableMetadata;
use crate::table::Table;
use crate::Result;

@@ -95,7 +99,17 @@ impl<'a> SnapshotsTable<'a> {
}

/// Scans the snapshots table.
-pub fn scan(&self) -> Result<RecordBatch> {
+pub fn scan(&self) -> Result<ArrowRecordBatchStream> {
+let arrow_schema = Arc::new(self.schema());
+let table_metadata = self.table.metadata_ref();
+
+Ok(
+futures::stream::once(async move { Self::build_batch(arrow_schema, &table_metadata) })
+.boxed(),
+)
+}
+
+fn build_batch(arrow_schema: SchemaRef, table_metadata: &TableMetadata) -> Result<RecordBatch> {
let mut committed_at =
PrimitiveBuilder::<TimestampMillisecondType>::new().with_timezone("+00:00");
let mut snapshot_id = PrimitiveBuilder::<Int64Type>::new();
@@ -104,7 +118,7 @@ impl<'a> SnapshotsTable<'a> {
let mut manifest_list = StringBuilder::new();
let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new());

-for snapshot in self.table.metadata().snapshots() {
+for snapshot in table_metadata.snapshots() {
committed_at.append_value(snapshot.timestamp_ms());
snapshot_id.append_value(snapshot.snapshot_id());
parent_id.append_option(snapshot.parent_snapshot_id());
@@ -117,7 +131,7 @@ impl<'a> SnapshotsTable<'a> {
summary.append(true)?;
}

-Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![
+Ok(RecordBatch::try_new(arrow_schema, vec![
Arc::new(committed_at.finish()),
Arc::new(snapshot_id.finish()),
Arc::new(parent_id.finish()),
@@ -134,7 +148,7 @@ pub struct ManifestsTable<'a> {
}

impl<'a> ManifestsTable<'a> {
-fn partition_summary_fields(&self) -> Vec<Field> {
+fn partition_summary_fields() -> Vec<Field> {
vec![
Field::new("contains_null", DataType::Boolean, false),
Field::new("contains_nan", DataType::Boolean, true),
@@ -161,7 +175,7 @@ impl<'a> ManifestsTable<'a> {
"partition_summaries",
DataType::List(Arc::new(Field::new_struct(
"item",
-self.partition_summary_fields(),
+Self::partition_summary_fields(),
false,
))),
false,
@@ -170,7 +184,22 @@
}

/// Scans the manifests table.
-pub async fn scan(&self) -> Result<RecordBatch> {
+pub fn scan(&self) -> Result<ArrowRecordBatchStream> {
+let arrow_schema = Arc::new(self.schema());
+let table_metadata = self.table.metadata_ref();
+let file_io = self.table.file_io().clone();
+
+Ok(futures::stream::once(async move {
+Self::build_batch(arrow_schema, &table_metadata, &file_io).await
+})
+.boxed())
+}
+
+async fn build_batch(
+arrow_schema: SchemaRef,
+table_metadata: &TableMetadata,
+file_io: &FileIO,
+) -> Result<RecordBatch> {
let mut content = PrimitiveBuilder::<Int8Type>::new();
let mut path = StringBuilder::new();
let mut length = PrimitiveBuilder::<Int64Type>::new();
@@ -183,19 +212,17 @@ impl<'a> ManifestsTable<'a> {
let mut existing_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut deleted_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut partition_summaries = ListBuilder::new(StructBuilder::from_fields(
-Fields::from(self.partition_summary_fields()),
+Fields::from(Self::partition_summary_fields()),
0,
))
.with_field(Arc::new(Field::new_struct(
"item",
-self.partition_summary_fields(),
+Self::partition_summary_fields(),
false,
)));

-if let Some(snapshot) = self.table.metadata().current_snapshot() {
-let manifest_list = snapshot
-.load_manifest_list(self.table.file_io(), &self.table.metadata_ref())
-.await?;
+if let Some(snapshot) = table_metadata.current_snapshot() {
+let manifest_list = snapshot.load_manifest_list(file_io, table_metadata).await?;
for manifest in manifest_list.entries() {
content.append_value(manifest.content as i8);
path.append_value(manifest.manifest_path.clone());
@@ -238,7 +265,7 @@ }
}
}

-Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![
+Ok(RecordBatch::try_new(arrow_schema, vec![
Arc::new(content.finish()),
Arc::new(path.finish()),
Arc::new(length.finish()),
@@ -257,7 +284,9 @@ impl<'a> ManifestsTable<'a> {

#[cfg(test)]
mod tests {
+use arrow_select::concat::concat_batches;
use expect_test::{expect, Expect};
+use futures::TryStreamExt;
use itertools::Itertools;

use super::*;
@@ -271,13 +300,20 @@ mod tests {
/// Check the doc of [`expect_test`] for more details.
/// - `ignore_check_columns`: Some columns are not stable, so we can skip them.
/// - `sort_column`: The order of the data might be non-deterministic, so we can sort it by a column.
-fn check_record_batch(
-record_batch: RecordBatch,
+async fn check_record_batches(
+batch_stream: ArrowRecordBatchStream,
expected_schema: Expect,
expected_data: Expect,
ignore_check_columns: &[&str],
sort_column: Option<&str>,
) {
+let record_batches = batch_stream.try_collect::<Vec<_>>().await.unwrap();
+assert!(!record_batches.is_empty(), "Empty record batches");
+
+// Combine record batches using the first batch's schema
+let first_batch = record_batches.first().unwrap();
+let record_batch = concat_batches(&first_batch.schema(), &record_batches).unwrap();
+
let mut columns = record_batch.columns().to_vec();
if let Some(sort_column) = sort_column {
let column = record_batch.column_by_name(sort_column).unwrap();
@@ -310,12 +346,12 @@ mod tests {
));
}

-#[test]
-fn test_snapshots_table() {
+#[tokio::test]
+async fn test_snapshots_table() {
let table = TableTestFixture::new().table;
-let record_batch = table.metadata_table().snapshots().scan().unwrap();
-check_record_batch(
-record_batch,
+let batch_stream = table.metadata_table().snapshots().scan().unwrap();
+check_record_batches(
+batch_stream,
expect![[r#"
Field { name: "committed_at", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
@@ -378,24 +414,18 @@ mod tests {
]"#]],
&["manifest_list"],
Some("committed_at"),
-);
+).await;
}

#[tokio::test]
async fn test_manifests_table() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;

-let record_batch = fixture
-.table
-.metadata_table()
-.manifests()
-.scan()
-.await
-.unwrap();
+let batch_stream = fixture.table.metadata_table().manifests().scan().unwrap();

-check_record_batch(
-record_batch,
+check_record_batches(
+batch_stream,
expect![[r#"
Field { name: "content", data_type: Int8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
@@ -480,6 +510,6 @@ mod tests {
]"#]],
&["path", "length"],
Some("path"),
-);
+).await;
}
}
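
Note on the pattern used in both `scan()` implementations above: each batch is still built eagerly, and the single fallible result is wrapped in a one-element stream via `futures::stream::once(...).boxed()` to satisfy the `ArrowRecordBatchStream` signature. A self-contained sketch of that wrapping, with illustrative names (`build_batch` and `scan` here are stand-ins, not the crate's API; assumes the `futures` and `tokio` crates):

```rust
use futures::stream::{self, BoxStream};
use futures::{StreamExt, TryStreamExt};

// Stand-in for an eagerly-computed, fallible batch builder.
fn build_batch() -> Result<i32, String> {
    Ok(42)
}

// Wrap the single result as a boxed one-element async stream, mirroring
// `futures::stream::once(async move { ... }).boxed()` in the diff above.
fn scan() -> BoxStream<'static, Result<i32, String>> {
    stream::once(async move { build_batch() }).boxed()
}

#[tokio::main]
async fn main() {
    let items = scan().try_collect::<Vec<_>>().await.unwrap();
    assert_eq!(items, vec![42]);
}
```

Callers get a stream-shaped API now, so the implementation can later produce batches incrementally without another signature change.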
