From af400e5c832aaace334c845073c6c994222cefb6 Mon Sep 17 00:00:00 2001 From: maxi297 Date: Tue, 25 Mar 2025 17:20:41 -0400 Subject: [PATCH 1/2] fix substreampartitionrouter state migration from child when it is empty --- .../substream_partition_router.py | 2 +- .../test_substream_partition_router.py | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py index 50fe4f0bc..59fda9396 100644 --- a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py +++ b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py @@ -374,7 +374,7 @@ def _migrate_child_state_to_parent_state(self, stream_state: StreamState) -> Str # Ignore per-partition states or invalid formats. if isinstance(substream_state, (list, dict)) or len(substream_state_values) != 1: # If a global state is present under the key "state", use its first value. - if "state" in stream_state and isinstance(stream_state["state"], dict): + if "state" in stream_state and isinstance(stream_state["state"], dict) and stream_state["state"] != {}: substream_state = list(stream_state["state"].values())[0] else: return {} diff --git a/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py b/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py index c4a608f4a..7df7023c2 100644 --- a/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py +++ b/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py @@ -473,6 +473,19 @@ def test_substream_partition_router_invalid_parent_record_type(): }, {"parent_stream_cursor": "2023-05-27T00:00:00Z"}, ), + # Case 7: Migrate child state to parent state but child state is empty + ( + { + "state": {}, + "states": [], + "parent_state": { + "posts": {} + }, + "lookback_window": 1, + "use_global_cursor": False + }, + {}, + ), ], ids=[ "empty_initial_state", @@ -481,6 +494,7 @@ def test_substream_partition_router_invalid_parent_record_type(): "initial_state_no_parent_per_partition_state", "initial_state_with_parent_state", "initial_state_no_parent_global_state_declarative", + "initial_state_no_parent_and_no_child", ], ) def test_set_initial_state(initial_state, expected_parent_state): From 26095cb5a676a8ea1fb11dfc7657bd20989a485c Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Tue, 25 Mar 2025 21:39:02 +0000 Subject: [PATCH 2/2] Auto-fix lint and format issues --- .../substream_partition_router.py | 6 +++++- .../test_substream_partition_router.py | 18 ++++++++---------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py index 59fda9396..000beeff9 100644 --- a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py +++ b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py @@ -374,7 +374,11 @@ def _migrate_child_state_to_parent_state(self, stream_state: StreamState) -> Str # Ignore per-partition states or invalid formats. if isinstance(substream_state, (list, dict)) or len(substream_state_values) != 1: # If a global state is present under the key "state", use its first value. - if "state" in stream_state and isinstance(stream_state["state"], dict) and stream_state["state"] != {}: + if ( + "state" in stream_state + and isinstance(stream_state["state"], dict) + and stream_state["state"] != {} + ): substream_state = list(stream_state["state"].values())[0] else: return {} diff --git a/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py b/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py index 7df7023c2..675e76a95 100644 --- a/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py +++ b/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py @@ -475,16 +475,14 @@ def test_substream_partition_router_invalid_parent_record_type(): ), # Case 7: Migrate child state to parent state but child state is empty ( - { - "state": {}, - "states": [], - "parent_state": { - "posts": {} - }, - "lookback_window": 1, - "use_global_cursor": False - }, - {}, + { + "state": {}, + "states": [], + "parent_state": {"posts": {}}, + "lookback_window": 1, + "use_global_cursor": False, + }, + {}, ), ], ids=[