Skip to content

Commit

Permalink
fix(low-code): Fix legacy state migration in SubstreamPartitionRouter (
Browse files Browse the repository at this point in the history
  • Loading branch information
tolik0 authored Jan 23, 2025
1 parent ec7e961 commit 97419d0
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,12 @@ def set_initial_state(self, stream_state: StreamState) -> None:

if not parent_state and incremental_dependency:
# Attempt to retrieve child state
substream_state = list(stream_state.values())
substream_state = substream_state[0] if substream_state else {} # type: ignore [assignment] # Incorrect type for assignment
substream_state_values = list(stream_state.values())
substream_state = substream_state_values[0] if substream_state_values else {}
# Filter out per partition state. Because we pass the state to the parent stream in the format {cursor_field: substream_state}
if isinstance(substream_state, (list, dict)):
substream_state = {}

parent_state = {}

# Copy child state to parent streams with incremental dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,115 @@ def test_substream_partition_router_invalid_parent_record_type():
_ = [s for s in partition_router.stream_slices()]


@pytest.mark.parametrize(
"initial_state, expected_parent_state",
[
# Case 1: Empty initial state, no parent state expected
({}, {}),
# Case 2: Initial state with no `parent_state`, migrate `updated_at` to `parent_stream_cursor`
(
{"updated_at": "2023-05-27T00:00:00Z"},
{"parent_stream_cursor": "2023-05-27T00:00:00Z"},
),
# Case 3: Initial state with global `state`, no migration expected
(
{"state": {"updated": "2023-05-27T00:00:00Z"}},
{},
),
# Case 4: Initial state with per-partition `states`, no migration expected
(
{
"states": [
{
"partition": {
"issue_id": "10012",
"parent_slice": {
"parent_slice": {},
"project_id": "10000",
},
},
"cursor": {"updated": "2021-01-01T00:00:00+0000"},
},
{
"partition": {
"issue_id": "10019",
"parent_slice": {
"parent_slice": {},
"project_id": "10000",
},
},
"cursor": {"updated": "2021-01-01T00:00:00+0000"},
},
{
"partition": {
"issue_id": "10000",
"parent_slice": {
"parent_slice": {},
"project_id": "10000",
},
},
"cursor": {"updated": "2021-01-01T00:00:00+0000"},
},
]
},
{},
),
# Case 5: Initial state with `parent_state`, existing parent state persists
(
{
"parent_state": {
"parent_stream_name1": {"parent_stream_cursor": "2023-05-27T00:00:00Z"},
},
},
{"parent_stream_cursor": "2023-05-27T00:00:00Z"},
),
],
ids=[
"empty_initial_state",
"initial_state_no_parent_legacy_state",
"initial_state_no_parent_global_state",
"initial_state_no_parent_per_partition_state",
"initial_state_with_parent_state",
],
)
def test_set_initial_state(initial_state, expected_parent_state):
"""
Test the `set_initial_state` method of SubstreamPartitionRouter.
This test verifies that the method correctly handles different initial state formats
and sets the appropriate parent stream state.
"""
parent_stream = MockStream(
slices=[{}],
records=[],
name="parent_stream_name1",
cursor_field="parent_stream_cursor",
)
parent_stream.state = {}
parent_stream_config = ParentStreamConfig(
stream=parent_stream,
parent_key="id",
partition_field="parent_stream_id",
parameters={},
config={},
incremental_dependency=True,
)

partition_router = SubstreamPartitionRouter(
parent_stream_configs=[parent_stream_config],
parameters={},
config={},
)

partition_router.set_initial_state(initial_state)

# Assert the state of the parent stream
assert parent_stream.state == expected_parent_state, (
f"Unexpected parent state. Initial state: {initial_state}, "
f"Expected: {expected_parent_state}, Got: {parent_stream.state}"
)


@pytest.mark.parametrize(
"parent_stream_request_parameters, expected_req_params, expected_headers, expected_body_json, expected_body_data",
[
Expand Down

0 comments on commit 97419d0

Please sign in to comment.