[Spark] Handle type widening in Delta streaming source #4042

Open
johanl-db wants to merge 2 commits into master

Conversation

johanl-db (Collaborator) commented Jan 13, 2025

Description

Type widening introduced the ability to change the type of an existing column or field in a Delta table. Streaming reads from a Delta source currently don't allow any form of type change to propagate: the stream fails with a non-retryable error.

This change allows widening type changes to propagate during streaming reads from a Delta source, using the same mechanism as the non-additive schema changes introduced by column mapping (drop/rename).
To let a widening type change propagate, the user must:

  • Provide a schema tracking location via .option("schemaTrackingLocation", ...) (see the sketch after this list)
  • Review and accept the type change by setting one of the available SQL confs (detailed in the error message)
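
As a rough sketch (all paths below are hypothetical placeholders), a streaming read with a schema tracking location looks like this:

```scala
// Streaming read from a Delta source with a schema tracking location,
// allowing widening type changes to be picked up and propagated.
val stream = spark.readStream
  .format("delta")
  .option("schemaTrackingLocation", "/checkpoints/my_stream/_schema_log")
  .load("/data/source_table")

stream.writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoints/my_stream")
  .start("/data/sink_table")
```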

Note that the current check for column mapping had a loophole: as long as a schema tracking location was provided, it only gated column drops and renames but allowed any type change to get through. This is fixed here by properly checking and rejecting non-widening type changes.
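
For reference, a widening type change on the source table looks like the sketch below (hypothetical table name; delta.enableTypeWidening is the table property that enables the type widening feature):

```scala
// Hypothetical source table with the type widening table feature enabled.
spark.sql("ALTER TABLE source_table SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")

// Widening type change (INT -> BIGINT): with this change it can now propagate
// to downstream streaming reads once the user accepts it via the SQL confs
// described below. A non-widening change (e.g. BIGINT -> INT) is rejected.
spark.sql("ALTER TABLE source_table ALTER COLUMN col_a TYPE BIGINT")
```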

How was this patch tested?

  • Added unit tests in DeltaSourceMetadataEvolutionSupportSuite covering the logic to detect and gate non-additive schema changes.
  • Added test suite TypeWideningStreamingSourceSuite to cover streaming reads from a Delta source when a column is widened.

This PR introduces the following user-facing changes:

When reading from a Delta table that had a column type widened, using a streaming query:

Before this change:

The stream fails with a non-retryable error:

[DELTA_SCHEMA_CHANGED_WITH_VERSION]
Detected schema change in version <version>:
streaming source schema: <readSchema>
data file schema: <dataSchema>
Please try restarting the query. If this issue repeats across query restarts without
making progress, you have made an incompatible schema change and need to start your
query from scratch using a new checkpoint directory.

Note: users could get around this by providing a schema tracking location and applying a column drop or rename to their source. This allowed arbitrary type changes to go through unchecked.

After this change:

Users must provide a schema tracking location via .option("schemaTrackingLocation", ...), otherwise the stream fails with:

[DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE_USE_SCHEMA_LOG]
Streaming read is not supported on tables with read-incompatible schema changes (e.g. rename or drop or datatype changes).
Please provide a 'schemaTrackingLocation' to enable non-additive schema evolution for Delta stream processing.
See <docLink> for more details.
Read schema: <readSchema>. Incompatible data schema: <incompatibleSchema>.

When a schema tracking location is provided, non-widening type changes are now properly rejected and fail with error [DELTA_SCHEMA_CHANGED_WITH_VERSION].

When reading the batch that contains the type change, the stream first fails and records the tracked schema:

[DELTA_STREAMING_METADATA_EVOLUTION]
The schema, table configuration or protocol of your Delta table has changed during streaming.
The schema or metadata tracking log has been updated.
Please restart the stream to continue processing using the updated metadata.
Updated schema: <schema>.
Updated table configurations: <config>.
Updated table protocol: <protocol>

On retry, the stream fails and the user is prompted with a call to action:

[DELTA_STREAMING_CANNOT_CONTINUE_PROCESSING_TYPE_WIDENING]
We've detected one or more data types being widened between Delta version 2 and 3:
  col_a: INT -> BIGINT

Your streaming query contains operations that may fail or produce different results after the type change(s).
Please check if you want to update your streaming query before we proceed with stream processing using the finalized schema at version 2.
Once you have updated your streaming query or have decided there is no need to update it, you can set (one of) the following SQL configurations to unblock the type change(s) and continue stream processing.
To unblock for this particular stream just for this series of type change(s): set `spark.databricks.delta.streaming.allowSourceTypeWidening.ckpt_123456 = 2`.
To unblock for this particular stream: set `spark.databricks.delta.streaming.allowSourceTypeWidening.ckpt_123456 = always`
To unblock for all streams: set `spark.databricks.delta.streaming.allowSourceTypeWidening = always`.

Users can set one of the proposed configurations to resume processing, for example:
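
As a rough sketch (the conf names are taken from the error message above; ckpt_123456 is the checkpoint hash shown in this example):

```scala
// Unblock this particular stream for this series of type changes only,
// pinning the Delta version reported in the error message.
spark.conf.set("spark.databricks.delta.streaming.allowSourceTypeWidening.ckpt_123456", "2")

// Or unblock type widening for all streams.
spark.conf.set("spark.databricks.delta.streaming.allowSourceTypeWidening", "always")
```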
