From 544effb7308f451b1d2c70ffae80eb6b2175f9ce Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Fri, 24 Jan 2025 00:50:04 +0200 Subject: [PATCH] Fix legacy state migration in SubstreamPartitionRouter --- .../substream_partition_router.py | 8 +- .../test_substream_partition_router.py | 109 ++++++++++++++++++ 2 files changed, 115 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py index 1c7bb6961..ae558c634 100644 --- a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py +++ b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py @@ -296,8 +296,12 @@ def set_initial_state(self, stream_state: StreamState) -> None: if not parent_state and incremental_dependency: # Attempt to retrieve child state - substream_state = list(stream_state.values()) - substream_state = substream_state[0] if substream_state else {} # type: ignore [assignment] # Incorrect type for assignment + substream_state_values = list(stream_state.values()) + substream_state = substream_state_values[0] if substream_state_values else {} + # Filter out per partition state. Because we pass the state to the parent stream in the format {cursor_field: substream_state} + if isinstance(substream_state, (list, dict)): + substream_state = {} + parent_state = {} # Copy child state to parent streams with incremental dependencies diff --git a/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py b/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py index 5814805e2..52306d348 100644 --- a/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py +++ b/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py @@ -402,6 +402,115 @@ def test_substream_partition_router_invalid_parent_record_type(): _ = [s for s in partition_router.stream_slices()] +@pytest.mark.parametrize( + "initial_state, expected_parent_state", + [ + # Case 1: Empty initial state, no parent state expected + ({}, {}), + # Case 2: Initial state with no `parent_state`, migrate `updated_at` to `parent_stream_cursor` + ( + {"updated_at": "2023-05-27T00:00:00Z"}, + {"parent_stream_cursor": "2023-05-27T00:00:00Z"}, + ), + # Case 3: Initial state with global `state`, no migration expected + ( + {"state": {"updated": "2023-05-27T00:00:00Z"}}, + {}, + ), + # Case 4: Initial state with per-partition `states`, no migration expected + ( + { + "states": [ + { + "partition": { + "issue_id": "10012", + "parent_slice": { + "parent_slice": {}, + "project_id": "10000", + }, + }, + "cursor": {"updated": "2021-01-01T00:00:00+0000"}, + }, + { + "partition": { + "issue_id": "10019", + "parent_slice": { + "parent_slice": {}, + "project_id": "10000", + }, + }, + "cursor": {"updated": "2021-01-01T00:00:00+0000"}, + }, + { + "partition": { + "issue_id": "10000", + "parent_slice": { + "parent_slice": {}, + "project_id": "10000", + }, + }, + "cursor": {"updated": "2021-01-01T00:00:00+0000"}, + }, + ] + }, + {}, + ), + # Case 5: Initial state with `parent_state`, existing parent state persists + ( + { + "parent_state": { + "parent_stream_name1": {"parent_stream_cursor": "2023-05-27T00:00:00Z"}, + }, + }, + {"parent_stream_cursor": "2023-05-27T00:00:00Z"}, + ), + ], + ids=[ + "empty_initial_state", + "initial_state_no_parent_legacy_state", + "initial_state_no_parent_global_state", + "initial_state_no_parent_per_partition_state", + "initial_state_with_parent_state", + ], +) +def test_set_initial_state(initial_state, expected_parent_state): + """ + Test the `set_initial_state` method of SubstreamPartitionRouter. + + This test verifies that the method correctly handles different initial state formats + and sets the appropriate parent stream state. + """ + parent_stream = MockStream( + slices=[{}], + records=[], + name="parent_stream_name1", + cursor_field="parent_stream_cursor", + ) + parent_stream.state = {} + parent_stream_config = ParentStreamConfig( + stream=parent_stream, + parent_key="id", + partition_field="parent_stream_id", + parameters={}, + config={}, + incremental_dependency=True, + ) + + partition_router = SubstreamPartitionRouter( + parent_stream_configs=[parent_stream_config], + parameters={}, + config={}, + ) + + partition_router.set_initial_state(initial_state) + + # Assert the state of the parent stream + assert parent_stream.state == expected_parent_state, ( + f"Unexpected parent state. Initial state: {initial_state}, " + f"Expected: {expected_parent_state}, Got: {parent_stream.state}" + ) + + @pytest.mark.parametrize( "parent_stream_request_parameters, expected_req_params, expected_headers, expected_body_json, expected_body_data", [