From 0a444a5e91bd3933e72af15d12e9448ee1218437 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Fri, 7 Feb 2025 14:25:42 +0200 Subject: [PATCH 1/2] Add migration from global state --- .../substream_partition_router.py | 32 +++++++++++++------ .../test_concurrent_perpartitioncursor.py | 13 +++++++- 2 files changed, 35 insertions(+), 10 deletions(-) diff --git a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py index 6ccb055e8..73a747f02 100644 --- a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py +++ b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py @@ -299,23 +299,33 @@ def set_initial_state(self, stream_state: StreamState) -> None: def _migrate_child_state_to_parent_state(self, stream_state: StreamState) -> StreamState: """ - Migrate the child stream state to the parent stream's state format. + Migrate the child or global stream state into the parent stream's state format. - This method converts the global or child state into a format compatible with parent - streams. The migration occurs only for parent streams with incremental dependencies. - The method filters out per-partition states and retains only the global state in the - format `{cursor_field: cursor_value}`. + This method converts the child stream state—or, if present, the global state—into a format that is + compatible with parent streams that use incremental synchronization. The migration occurs only for + parent streams with incremental dependencies. It filters out per-partition states and retains only the + global state in the form {cursor_field: cursor_value}. + + The method supports multiple input formats: + - A simple global state, e.g.: + {"updated_at": "2023-05-27T00:00:00Z"} + - A state object that contains a "state" key (which is assumed to hold the global state), e.g.: + {"state": {"updated_at": "2023-05-27T00:00:00Z"}, ...} + In this case, the migration uses the first value from the "state" dictionary. + - Any per-partition state formats or other non-simple structures are ignored during migration. Args: stream_state (StreamState): The state to migrate. Expected formats include: - {"updated_at": "2023-05-27T00:00:00Z"} - - {"states": [...] } (ignored during migration) + - {"state": {"updated_at": "2023-05-27T00:00:00Z"}, ...} + (In this format, only the first global state value is used, and per-partition states are ignored.) Returns: StreamState: A migrated state for parent streams in the format: { "parent_stream_name": {"parent_stream_cursor": "2023-05-27T00:00:00Z"} } + where each parent stream with an incremental dependency is assigned its corresponding cursor value. Example: Input: {"updated_at": "2023-05-27T00:00:00Z"} @@ -326,11 +336,15 @@ def _migrate_child_state_to_parent_state(self, stream_state: StreamState) -> Str substream_state_values = list(stream_state.values()) substream_state = substream_state_values[0] if substream_state_values else {} - # Ignore per-partition states or invalid formats + # Ignore per-partition states or invalid formats. if isinstance(substream_state, (list, dict)) or len(substream_state_values) != 1: - return {} + # If a global state is present under the key "state", use its first value. + if "state" in stream_state and isinstance(stream_state["state"], dict): + substream_state = list(stream_state["state"].values())[0] + else: + return {} - # Copy child state to parent streams with incremental dependencies + # Build the parent state for all parent streams with incremental dependencies. parent_state = {} if substream_state: for parent_config in self.parent_stream_configs: diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index cca92ae46..ef06676f5 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -1450,8 +1450,19 @@ def test_incremental_parent_state( }, STATE_MIGRATION_GLOBAL_EXPECTED_STATE, ), + ( + { + "state": {"created_at": PARTITION_SYNC_START_TIME}, + }, + STATE_MIGRATION_EXPECTED_STATE, + ), + ], + ids=[ + "legacy_python_format", + "low_code_per_partition_state", + "low_code_global_format", + "global_state_no_parent", ], - ids=["legacy_python_format", "low_code_per_partition_state", "low_code_global_format"], ) def test_incremental_parent_state_migration( test_name, manifest, mock_requests, expected_records, initial_state, expected_state From 3e37a7262432990b6aca3fa5fb0645514d41567a Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Fri, 7 Feb 2025 14:53:28 +0200 Subject: [PATCH 2/2] Fix unit test --- .../partition_routers/test_substream_partition_router.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py b/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py index 089d2bf07..c4a608f4a 100644 --- a/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py +++ b/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py @@ -415,7 +415,7 @@ def test_substream_partition_router_invalid_parent_record_type(): # Case 3: Initial state with global `state`, no migration expected ( {"state": {"updated": "2023-05-27T00:00:00Z"}}, - {}, + {"parent_stream_cursor": "2023-05-27T00:00:00Z"}, ), # Case 4: Initial state with per-partition `states`, no migration expected ( @@ -471,7 +471,7 @@ def test_substream_partition_router_invalid_parent_record_type(): "use_global_cursor": True, "state": {"updated": "2023-05-27T00:00:00Z"}, }, - {}, + {"parent_stream_cursor": "2023-05-27T00:00:00Z"}, ), ], ids=[