Skip to content

Commit

Permalink
fix(delete): fix position delete (#8)
Browse files Browse the repository at this point in the history
* fix position

* fix comm

* fix comm
  • Loading branch information
xxhZs authored and xxchan committed Dec 29, 2024
1 parent ae45aef commit 3ed4198
Showing 1 changed file with 19 additions and 11 deletions.
30 changes: 19 additions & 11 deletions crates/iceberg/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator
use crate::expr::{BoundPredicate, BoundReference};
use crate::io::{FileIO, FileMetadata, FileRead};
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
use crate::spec::{Datum, PrimitiveType, Schema};
use crate::spec::{DataContentType, Datum, PrimitiveType, Schema};
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind};

Expand Down Expand Up @@ -189,12 +189,6 @@ impl ArrowReader {
)?;
record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask);

// RecordBatchTransformer performs any required transformations on the RecordBatches
// that come back from the file, such as type promotion, default column insertion
// and column re-ordering
let mut record_batch_transformer =
RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids());

if let Some(batch_size) = batch_size {
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
}
Expand Down Expand Up @@ -246,13 +240,27 @@ impl ArrowReader {

// Build the batch stream and send all the RecordBatches that it generates
// to the requester.
let record_batch_stream = record_batch_stream_builder.build()?;

let record_batch_stream =
record_batch_stream_builder
.build()?
.map(move |batch| match batch {
if matches!(task.data_file_content, DataContentType::PositionDeletes) {
// The schema of the xxx file doesn't change, so we don't need to convert the schema.
record_batch_stream.map(move |batch| match batch {
Ok(batch) => record_batch_transformer.process_record_batch(batch),
Err(err) => Err(err.into()),
})
} else {
// RecordBatchTransformer performs any required transformations on the RecordBatches
// that come back from the file, such as type promotion, default column insertion
// and column re-ordering.
let mut record_batch_transformer =
RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids());

record_batch_stream.map(move |batch| match batch {
Ok(batch) => record_batch_transformer.process_record_batch(batch),
Err(err) => Err(err.into()),
});
})
};

Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
}
Expand Down

0 comments on commit 3ed4198

Please sign in to comment.