Skip to content

Commit

Permalink
Merge pull request #58 from JanKaul/fix-manifest-entry-status
Browse files Browse the repository at this point in the history
fix the status of the manifest entry for existing files
  • Loading branch information
JanKaul authored Nov 12, 2024
2 parents c3fb3a1 + 9b3011d commit b8dee87
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 6 deletions.
4 changes: 4 additions & 0 deletions iceberg-rust-spec/src/spec/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ impl ManifestEntry {
pub fn builder() -> ManifestEntryBuilder {
ManifestEntryBuilder::default()
}

pub fn status_mut(&mut self) -> &mut Status {
&mut self.status
}
}

impl ManifestEntry {
Expand Down
18 changes: 14 additions & 4 deletions iceberg-rust/src/table/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ use std::{
};

use apache_avro::{
types::Value as AvroValue, Reader as AvroReader, Schema as AvroSchema, Writer as AvroWriter,
to_value, types::Value as AvroValue, Reader as AvroReader, Schema as AvroSchema,
Writer as AvroWriter,
};
use iceberg_rust_spec::{
manifest::{ManifestEntry, ManifestEntryV1, ManifestEntryV2},
manifest::{ManifestEntry, ManifestEntryV1, ManifestEntryV2, Status},
manifest_list::{self, FieldSummary, ManifestListEntry},
partition::{PartitionField, PartitionSpec},
schema::{Schema, SchemaV1, SchemaV2},
Expand Down Expand Up @@ -202,7 +203,7 @@ impl<'schema, 'metadata> ManifestWriter<'schema, 'metadata> {
table_metadata: &'metadata TableMetadata,
branch: Option<&str>,
) -> Result<Self, Error> {
let manifest_reader = apache_avro::Reader::new(bytes)?;
let manifest_reader = ManifestReader::new(bytes)?;

let mut writer = AvroWriter::new(schema, Vec::new());

Expand Down Expand Up @@ -254,7 +255,16 @@ impl<'schema, 'metadata> ManifestWriter<'schema, 'metadata> {

writer.add_user_metadata("content".to_string(), "data")?;

writer.extend(manifest_reader.filter_map(Result::ok))?;
writer.extend(
manifest_reader
.map(|entry| {
let mut entry = entry
.map_err(|err| apache_avro::Error::DeserializeValue(err.to_string()))?;
*entry.status_mut() = Status::Existing;
to_value(entry)
})
.filter_map(Result::ok),
)?;

Ok(ManifestWriter {
manifest,
Expand Down
9 changes: 7 additions & 2 deletions iceberg-rust/src/table/transaction/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,13 @@ impl Operation {
.await?
.into();

let manifest_reader =
ManifestReader::new(&*manifest_bytes)?.map(|x| x.map_err(Error::from));
let manifest_reader = ManifestReader::new(&*manifest_bytes)?
.map(|x| x.map_err(Error::from))
.map(|entry| {
let mut entry = entry?;
*entry.status_mut() = Status::Existing;
Ok(entry)
});

split_datafiles(
new_datafile_iter.chain(manifest_reader),
Expand Down

0 comments on commit b8dee87

Please sign in to comment.