
Conversation

sgrebnov (Member) commented Oct 2, 2025

Which issue does this PR close?

This PR fixes schema mismatch errors (similar to the example shown below) when using IcebergCommitExec with DataFusion. The error occurs when IcebergCommitExec is not the top-level plan but is instead wrapped as the input to another plan node, for example one added by a custom optimization rule (such as a cache-invalidation step).

An internal error occurred. Internal error: PhysicalOptimizer rule 'OutputRequirements' failed. Schema mismatch. Expected original schema: Schema { fields: [Field { name: "count", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, got new schema: Schema { fields: [Field { name: "r_regionkey", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} }, Field { name: "r_name", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} }, Field { name: "r_comment", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} }], metadata: {} }.
This issue was likely caused by a bug in DataFusion's code. Please help us to resolve this by filing a bug report in our issue tracker: https://github.com/apache/datafusion/issues
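
For reference, a minimal sketch of the single-column count schema the optimizer expects the commit node to report, matching the "Expected original schema" in the error above. The helper name and the use of DataFusion's re-exported Arrow types are illustrative assumptions, not code from this PR:

use std::sync::Arc;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};

// Hypothetical helper mirroring the "Expected original schema" in the
// error above: one non-nullable UInt64 "count" column. Sketch only.
fn count_schema() -> SchemaRef {
    Arc::new(Schema::new(vec![Field::new("count", DataType::UInt64, false)]))
}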

What changes are included in this PR?

This PR updates the compute_properties logic to use the target (output) schema instead of the input schema. Below is DataFusion's DataSinkExec implementation, which demonstrates that plan properties must be built from the target schema, not the input schema.

https://github.com/apache/datafusion/blob/4eacb6046773b759dae0b3d801fe8cb1c6b65c0f/datafusion/datasource/src/sink.rs#L101C1-L117C6

impl DataSinkExec {
    /// Create a plan to write to `sink`
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        sink: Arc<dyn DataSink>,
        sort_order: Option<LexRequirement>,
    ) -> Self {
        // Plan properties (cache) are derived from the count (output)
        // schema, not from the input's schema.
        let count_schema = make_count_schema();
        let cache = Self::create_schema(&input, count_schema);
        Self {
            input,
            sink,
            count_schema: make_count_schema(),
            sort_order,
            cache,
        }
    }
    // ...

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }
}
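
Following that pattern, a minimal sketch of the corrected approach for IcebergCommitExec. The identifiers and the exact PlanProperties arguments are illustrative assumptions, not the exact code in this PR:

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::{Partitioning, PlanProperties};

// Sketch only: build the node's cached properties from the schema it
// emits (the count schema), not from its input's schema. Using the
// input schema is what triggered the OutputRequirements mismatch.
fn compute_properties(output_schema: SchemaRef) -> PlanProperties {
    PlanProperties::new(
        EquivalenceProperties::new(output_schema),
        Partitioning::UnknownPartitioning(1),
        EmissionType::Final,  // assumed: the commit emits once, at the end
        Boundedness::Bounded, // assumed: a commit node is bounded
    )
}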

Are these changes tested?

Tested manually; expanded an existing test to verify the output schema; also exercised by Spice's automated Iceberg write tests.

CTTY (Contributor) left a comment

LGTM, thanks for the fix!

I'm questioning whether we actually need to store the table schema in IcebergCommitExec. The commit node should take data files and row counts and output only the sum of the row counts, so it doesn't really need to be aware of the table schema. Either way, that can be done in a separate PR and shouldn't block this fix.
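
A trivial sketch of that point (hypothetical, not code from this PR or from iceberg-rust): the commit step would only aggregate per-task row counts, with no table schema involved.

// Hypothetical: committing only aggregates per-task row counts (plus the
// data file handles, omitted here); the table schema never appears.
fn total_row_count(rows_per_task: &[u64]) -> u64 {
    rows_per_task.iter().sum()
}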

sgrebnov (Member, Author) commented Oct 12, 2025

@CTTY @liurenjie1024 - thank you for the review! Please let me know if there's anything to improve before we can merge this.

liurenjie1024 (Contributor) left a comment

Thanks @sgrebnov for this fix!

liurenjie1024 merged commit ec545f2 into apache:main on Oct 14, 2025
16 checks passed