
Commit ec545f2

Improve IcebergCommitExec to correctly populate properties/schema (#1721)
## Which issue does this PR close?

This PR fixes schema mismatch errors (similar to the example shown below) when using `IcebergCommitExec` with DataFusion. The error occurs when `IcebergCommitExec` is not the top-level plan but is instead wrapped as the input to another plan node, for example when it is added by a custom optimization rule (such as a cache-invalidation step).

> An internal error occurred. Internal error: PhysicalOptimizer rule 'OutputRequirements' failed. Schema mismatch. Expected original schema: Schema { fields: [Field { name: "count", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, got new schema: Schema { fields: [Field { name: "r_regionkey", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} }, Field { name: "r_name", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} }, Field { name: "r_comment", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} }], metadata: {} }. This issue was likely caused by a bug in DataFusion's code. Please help us to resolve this by filing a bug report in our issue tracker: https://github.com/apache/datafusion/issues

## What changes are included in this PR?

This PR updates the `compute_properties` logic to use the target (output) schema instead of the input schema. Below is DataFusion's own `DataSinkExec` implementation, demonstrating that plan properties must be created from the target schema, not the input schema:

https://github.com/apache/datafusion/blob/4eacb6046773b759dae0b3d801fe8cb1c6b65c0f/datafusion/datasource/src/sink.rs#L101C1-L117C6

```rust
impl DataSinkExec {
    /// Create a plan to write to `sink`
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        sink: Arc<dyn DataSink>,
        sort_order: Option<LexRequirement>,
    ) -> Self {
        let count_schema = make_count_schema();
        let cache = Self::create_schema(&input, count_schema);
        Self {
            input,
            sink,
            count_schema: make_count_schema(),
            sort_order,
            cache,
        }
    }

    // ....

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }
}
```

## Are these changes tested?

Tested manually, expanded an existing test to verify the output schema, and tested as part of the [Spice Iceberg write automated tests](https://github.com/spiceai/spiceai/blob/trunk/crates/runtime/tests/iceberg/write/mod.rs).
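For illustration, here is a minimal sketch of the corrected pieces. This is an assumption-laden sketch, not the committed iceberg-rust code: it targets a recent DataFusion API (where `PlanProperties::new` takes an `EmissionType` and a `Boundedness`), and the `count` field name and type are inferred from the error message above.

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::{Partitioning, PlanProperties};

/// Output schema of the commit node: a single non-nullable UInt64 "count"
/// column (the row count reported after the commit).
fn make_count_schema() -> SchemaRef {
    Arc::new(Schema::new(vec![Field::new("count", DataType::UInt64, false)]))
}

/// Build the cached PlanProperties from the *output* (count) schema rather
/// than the input/table schema, so the plan advertises what it actually emits.
fn compute_properties(schema: SchemaRef) -> PlanProperties {
    PlanProperties::new(
        EquivalenceProperties::new(schema),
        Partitioning::UnknownPartitioning(1),
        EmissionType::Final,
        Boundedness::Bounded,
    )
}

fn main() {
    let props = compute_properties(make_count_schema());
    // The schema recorded in the properties is the count schema, which is
    // what optimizer rules such as OutputRequirements compare against.
    assert_eq!(props.eq_properties().schema(), &make_count_schema());
}
```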
Parent: b4a44a3

1 file changed: +7 −2 lines

crates/integrations/datafusion/src/physical_plan/commit.rs

Lines changed: 7 additions & 2 deletions
```diff
@@ -57,14 +57,16 @@ impl IcebergCommitExec {
         input: Arc<dyn ExecutionPlan>,
         schema: ArrowSchemaRef,
     ) -> Self {
-        let plan_properties = Self::compute_properties(schema.clone());
+        let count_schema = Self::make_count_schema();
+
+        let plan_properties = Self::compute_properties(Arc::clone(&count_schema));
 
         Self {
             table,
             catalog,
             input,
             schema,
-            count_schema: Self::make_count_schema(),
+            count_schema,
             plan_properties,
         }
     }
@@ -469,6 +471,9 @@ mod tests {
         let commit_exec =
             IcebergCommitExec::new(table.clone(), catalog.clone(), input_exec, arrow_schema);
 
+        // Verify Execution Plan schema matches the count schema
+        assert_eq!(commit_exec.schema(), IcebergCommitExec::make_count_schema());
+
         // Execute the commit exec
         let task_ctx = Arc::new(TaskContext::default());
         let stream = commit_exec.execute(0, task_ctx)?;
```
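The new assertion holds because, in recent DataFusion versions, the `ExecutionPlan::schema()` provided method is derived from the cached `PlanProperties`, so building the properties from the count schema is exactly what makes `commit_exec.schema()` return it. A hedged paraphrase of that behavior (the helper name `reported_schema` is ours, not DataFusion's):

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::physical_plan::ExecutionPlan;

// Roughly what ExecutionPlan::schema() does by default in recent DataFusion:
// read the schema back out of the plan's cached PlanProperties. If those
// properties were built from the input schema, every parent node and
// optimizer rule would see the wrong output schema for the commit node.
fn reported_schema(plan: &dyn ExecutionPlan) -> SchemaRef {
    Arc::clone(plan.properties().eq_properties().schema())
}
```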
