Skip to content

Commit

Permalink
refine interface of manifest writer:
Browse files Browse the repository at this point in the history
1. adopt factory method to build different type manifest writer
2. provide add, exist, delete method
  • Loading branch information
ZENOTME committed Dec 19, 2024
1 parent 74a85e7 commit 491d60f
Show file tree
Hide file tree
Showing 4 changed files with 643 additions and 274 deletions.
55 changes: 25 additions & 30 deletions crates/iceberg/src/io/object_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,8 @@ mod tests {
use super::*;
use crate::io::{FileIO, OutputFile};
use crate::spec::{
DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Manifest,
ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus,
ManifestWriter, Struct, TableMetadata,
DataContentType, DataFileBuilder, DataFileFormat, Literal, ManifestEntry,
ManifestListWriter, ManifestStatus, ManifestWriterBuilder, Struct, TableMetadata,
};
use crate::table::Table;
use crate::TableIdent;
Expand Down Expand Up @@ -263,37 +262,33 @@ mod tests {
let current_partition_spec = self.table.metadata().default_partition_spec();

// Write data files
let data_file_manifest = ManifestWriter::new(
let mut writer = ManifestWriterBuilder::new(
self.next_manifest_file(),
current_snapshot.snapshot_id(),
vec![],
current_schema.clone(),
current_partition_spec.as_ref().clone(),
)
.write(Manifest::new(
ManifestMetadata::builder()
.schema(current_schema.clone())
.content(ManifestContentType::Data)
.format_version(FormatVersion::V2)
.partition_spec((**current_partition_spec).clone())
.schema_id(current_schema.schema_id())
.build(),
vec![ManifestEntry::builder()
.status(ManifestStatus::Added)
.data_file(
DataFileBuilder::default()
.content(DataContentType::Data)
.file_path(format!("{}/1.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(100))]))
.key_metadata(None)
.build()
.unwrap(),
)
.build()],
))
.await
.unwrap();
.build_v2_data();
writer
.add(
ManifestEntry::builder()
.status(ManifestStatus::Added)
.data_file(
DataFileBuilder::default()
.content(DataContentType::Data)
.file_path(format!("{}/1.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(100))]))
.build()
.unwrap(),
)
.build(),
)
.unwrap();
let data_file_manifest = writer.to_manifest_file().await.unwrap();

// Write to manifest list
let mut manifest_list_write = ManifestListWriter::v2(
Expand Down
37 changes: 20 additions & 17 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -974,9 +974,9 @@ mod tests {
use crate::io::{FileIO, OutputFile};
use crate::scan::FileScanTask;
use crate::spec::{
DataContentType, DataFileBuilder, DataFileFormat, Datum, FormatVersion, Literal, Manifest,
ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus,
ManifestWriter, NestedField, PrimitiveType, Schema, Struct, TableMetadata, Type,
DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry,
ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PrimitiveType,
Schema, Struct, TableMetadata, Type,
};
use crate::table::Table;
use crate::TableIdent;
Expand Down Expand Up @@ -1049,20 +1049,16 @@ mod tests {
let current_partition_spec = self.table.metadata().default_partition_spec();

// Write data files
let data_file_manifest = ManifestWriter::new(
let mut writer = ManifestWriterBuilder::new(
self.next_manifest_file(),
current_snapshot.snapshot_id(),
vec![],
current_schema.clone(),
current_partition_spec.as_ref().clone(),
)
.write(Manifest::new(
ManifestMetadata::builder()
.schema(current_schema.clone())
.content(ManifestContentType::Data)
.format_version(FormatVersion::V2)
.partition_spec((**current_partition_spec).clone())
.schema_id(current_schema.schema_id())
.build(),
vec![
.build_v2_data();
writer
.add(
ManifestEntry::builder()
.status(ManifestStatus::Added)
.data_file(
Expand All @@ -1078,6 +1074,10 @@ mod tests {
.unwrap(),
)
.build(),
)
.unwrap();
writer
.delete(
ManifestEntry::builder()
.status(ManifestStatus::Deleted)
.snapshot_id(parent_snapshot.snapshot_id())
Expand All @@ -1095,6 +1095,10 @@ mod tests {
.unwrap(),
)
.build(),
)
.unwrap();
writer
.existing(
ManifestEntry::builder()
.status(ManifestStatus::Existing)
.snapshot_id(parent_snapshot.snapshot_id())
Expand All @@ -1112,10 +1116,9 @@ mod tests {
.unwrap(),
)
.build(),
],
))
.await
.unwrap();
)
.unwrap();
let data_file_manifest = writer.to_manifest_file().await.unwrap();

// Write to manifest list
let mut manifest_list_write = ManifestListWriter::v2(
Expand Down
Loading

0 comments on commit 491d60f

Please sign in to comment.