Skip to content

Commit c6beaf0

Browse files
authored
batch processing for updating TI UUIDs (#49015)
closes #43438
1 parent 23965c4 commit c6beaf0

File tree

3 files changed

+33
-6
lines changed

3 files changed

+33
-6
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
7d6f2aa31fb10d8006b6b7f572bedcc4be78eb828edfd42386e0b872b6999afc
1+
d79700d79f51a3f70709131183df2e80e6be0f0e73ffdbcc21731890a0a030fd

airflow-core/src/airflow/config_templates/config.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -703,6 +703,14 @@ database:
703703
type: string
704704
example: "airflow.providers.fab.auth_manager.models.db.FABDBManager"
705705
default: ~
706+
migration_batch_size:
707+
description: |
708+
The number of rows to process in each batch when performing a migration.
709+
This is useful for large tables to avoid locking and failure due to query timeouts.
710+
version_added: 3.0.0
711+
type: integer
712+
example: ~
713+
default: "10000"
706714
logging:
707715
description: ~
708716
options:

airflow-core/src/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
from sqlalchemy import text
3232
from sqlalchemy.dialects import postgresql
3333

34+
from airflow.configuration import conf
35+
3436
# revision identifiers, used by Alembic.
3537
revision = "d59cbbef95eb"
3638
down_revision = "05234396c6fc"
@@ -194,6 +196,7 @@ def create_foreign_keys():
194196

195197
def upgrade():
196198
"""Add UUID primary key to task instance table."""
199+
batch_size = conf.getint("database", "migration_batch_size", fallback=1000)
197200
conn = op.get_bind()
198201
dialect_name = conn.dialect.name
199202

@@ -202,12 +205,28 @@ def upgrade():
202205
if dialect_name == "postgresql":
203206
op.execute(pg_uuid7_fn)
204207

205-
# TODO: Add batching to handle updates in smaller chunks for large tables to avoid locking
206208
# Migrate existing rows with UUID v7 using a timestamp-based generation
207-
op.execute(
208-
"UPDATE task_instance SET id = uuid_generate_v7(coalesce(queued_dttm, start_date, clock_timestamp()))"
209-
)
210-
209+
while True:
210+
result = conn.execute(
211+
text(
212+
"""
213+
WITH cte AS (
214+
SELECT ctid
215+
FROM task_instance
216+
WHERE id IS NULL
217+
LIMIT :batch_size
218+
)
219+
UPDATE task_instance
220+
SET id = uuid_generate_v7(coalesce(queued_dttm, start_date, clock_timestamp()))
221+
FROM cte
222+
WHERE task_instance.ctid = cte.ctid
223+
"""
224+
).bindparams(batch_size=batch_size)
225+
)
226+
row_count = result.rowcount
227+
if row_count == 0:
228+
break
229+
print(f"Migrated {row_count} task_instance rows in this batch...")
211230
op.execute(pg_uuid7_fn_drop)
212231

213232
# Drop existing primary key constraint to task_instance table

0 commit comments

Comments
 (0)