Skip to content

Commit

Permalink
feat(low-code concurrent): Add use_global_cursor flag to ConcurrentPerPartitionCursor (#279)
Browse files Browse the repository at this point in the history
  • Loading branch information
tolik0 authored Jan 28, 2025
1 parent d9d93ab commit d318618
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def __init__(
self._lookback_window: int = 0
self._parent_state: Optional[StreamState] = None
self._over_limit: int = 0
self._use_global_cursor: bool = False
self._partition_serializer = PerPartitionKeySerializer()

self._set_initial_state(stream_state)
Expand All @@ -105,16 +106,18 @@ def cursor_field(self) -> CursorField:

@property
def state(self) -> MutableMapping[str, Any]:
states = []
for partition_tuple, cursor in self._cursor_per_partition.items():
if cursor.state:
states.append(
{
"partition": self._to_dict(partition_tuple),
"cursor": copy.deepcopy(cursor.state),
}
)
state: dict[str, Any] = {self._PERPARTITION_STATE_KEY: states}
state: dict[str, Any] = {"use_global_cursor": self._use_global_cursor}
if not self._use_global_cursor:
states = []
for partition_tuple, cursor in self._cursor_per_partition.items():
if cursor.state:
states.append(
{
"partition": self._to_dict(partition_tuple),
"cursor": copy.deepcopy(cursor.state),
}
)
state[self._PERPARTITION_STATE_KEY] = states

if self._global_cursor:
state[self._GLOBAL_STATE_KEY] = self._global_cursor
Expand Down Expand Up @@ -147,7 +150,8 @@ def close_partition(self, partition: Partition) -> None:
< cursor.state[self.cursor_field.cursor_field_key]
):
self._new_global_cursor = copy.deepcopy(cursor.state)
self._emit_state_message()
if not self._use_global_cursor:
self._emit_state_message()

def ensure_at_least_one_state_emitted(self) -> None:
"""
Expand Down Expand Up @@ -225,14 +229,18 @@ def _ensure_partition_limit(self) -> None:
"""
with self._lock:
while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
self._over_limit += 1
# Try removing finished partitions first
for partition_key in list(self._cursor_per_partition.keys()):
if partition_key in self._finished_partitions:
if (
partition_key in self._finished_partitions
and self._semaphore_per_partition[partition_key]._value == 0
):
oldest_partition = self._cursor_per_partition.pop(
partition_key
) # Remove the oldest partition
logger.warning(
f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._over_limit}."
f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._over_limit}."
)
break
else:
Expand Down Expand Up @@ -297,6 +305,8 @@ def _set_initial_state(self, stream_state: StreamState) -> None:
self._new_global_cursor = deepcopy(stream_state)

else:
self._use_global_cursor = stream_state.get("use_global_cursor", False)

self._lookback_window = int(stream_state.get("lookback_window", 0))

for state in stream_state.get(self._PERPARTITION_STATE_KEY, []):
Expand All @@ -320,6 +330,9 @@ def _set_initial_state(self, stream_state: StreamState) -> None:
self._partition_router.set_initial_state(stream_state)

def observe(self, record: Record) -> None:
if not self._use_global_cursor and self.limit_reached():
self._use_global_cursor = True

if not record.associated_slice:
raise ValueError(
"Invalid state as stream slices that are emitted should refer to an existing cursor"
Expand Down Expand Up @@ -358,3 +371,6 @@ def _get_cursor(self, record: Record) -> ConcurrentCursor:
)
cursor = self._cursor_per_partition[partition_key]
return cursor

def limit_reached(self) -> bool:
    """Report whether the over-limit counter has exceeded the partition cap.

    Returns:
        True when ``self._over_limit`` is strictly greater than
        ``self.DEFAULT_MAX_PARTITIONS_NUMBER``; False otherwise (including
        when the two values are equal).
    """
    overflow_count = self._over_limit
    partition_cap = self.DEFAULT_MAX_PARTITIONS_NUMBER
    return overflow_count > partition_cap
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

import copy
from copy import deepcopy
from datetime import datetime, timedelta
from typing import Any, List, Mapping, MutableMapping, Optional, Union
Expand Down Expand Up @@ -721,6 +721,7 @@ def _run_read(
"cursor": {"created_at": VOTE_300_CREATED_AT},
},
],
"use_global_cursor": False,
"lookback_window": 1,
"parent_state": {},
"state": {"created_at": VOTE_100_CREATED_AT},
Expand Down Expand Up @@ -1121,6 +1122,7 @@ def run_incremental_parent_state_test(
}
},
"lookback_window": 1,
"use_global_cursor": False,
"states": [
{
"partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}},
Expand Down Expand Up @@ -1170,8 +1172,66 @@ def test_incremental_parent_state(
)


# Expected final stream state shared by the parametrized cases of
# test_incremental_parent_state_migration. This variant is the per-partition
# form: "use_global_cursor" is False and a "states" list carries one cursor
# entry per partition.
STATE_MIGRATION_EXPECTED_STATE = {
"state": {"created_at": VOTE_100_CREATED_AT},
"parent_state": {
"post_comments": {
"use_global_cursor": False,
"state": {"updated_at": COMMENT_10_UPDATED_AT},
"parent_state": {"posts": {"updated_at": POST_1_UPDATED_AT}},
"lookback_window": 1,
"states": [
{
"partition": {"id": 1, "parent_slice": {}},
"cursor": {"updated_at": COMMENT_10_UPDATED_AT},
},
{
"partition": {"id": 2, "parent_slice": {}},
"cursor": {"updated_at": COMMENT_20_UPDATED_AT},
},
{
"partition": {"id": 3, "parent_slice": {}},
"cursor": {"updated_at": COMMENT_30_UPDATED_AT},
},
],
}
},
"lookback_window": 1,
"use_global_cursor": False,
"states": [
{
"partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}},
"cursor": {"created_at": VOTE_100_CREATED_AT},
},
{
"partition": {"id": 11, "parent_slice": {"id": 1, "parent_slice": {}}},
"cursor": {"created_at": VOTE_111_CREATED_AT},
},
{
"partition": {"id": 12, "parent_slice": {"id": 1, "parent_slice": {}}},
"cursor": {"created_at": PARTITION_SYNC_START_TIME},
},
{
"partition": {"id": 20, "parent_slice": {"id": 2, "parent_slice": {}}},
"cursor": {"created_at": VOTE_200_CREATED_AT},
},
{
"partition": {"id": 21, "parent_slice": {"id": 2, "parent_slice": {}}},
"cursor": {"created_at": VOTE_210_CREATED_AT},
},
{
"partition": {"id": 30, "parent_slice": {"id": 3, "parent_slice": {}}},
"cursor": {"created_at": VOTE_300_CREATED_AT},
},
],
}
# Global-cursor variant of the expected state: a deep copy (so the
# per-partition constant above is not mutated) with the top-level "states"
# list removed and "use_global_cursor" set to True. Only the top-level keys
# are altered; the nested "post_comments" parent state keeps its own "states"
# list and "use_global_cursor": False.
STATE_MIGRATION_GLOBAL_EXPECTED_STATE = copy.deepcopy(STATE_MIGRATION_EXPECTED_STATE)
del STATE_MIGRATION_GLOBAL_EXPECTED_STATE["states"]
STATE_MIGRATION_GLOBAL_EXPECTED_STATE["use_global_cursor"] = True


@pytest.mark.parametrize(
"test_name, manifest, mock_requests, expected_records, expected_state",
"test_name, manifest, mock_requests, expected_records",
[
(
"test_incremental_parent_state",
Expand Down Expand Up @@ -1326,80 +1386,45 @@ def test_incremental_parent_state(
"id": 300,
},
],
# Expected state
),
],
)
@pytest.mark.parametrize(
"initial_state, expected_state",
[
({"created_at": PARTITION_SYNC_START_TIME}, STATE_MIGRATION_EXPECTED_STATE),
(
{
"state": {"created_at": VOTE_100_CREATED_AT},
"state": {"created_at": PARTITION_SYNC_START_TIME},
"lookback_window": 0,
"use_global_cursor": False,
"parent_state": {
"post_comments": {
"use_global_cursor": False,
"state": {"updated_at": COMMENT_10_UPDATED_AT},
"parent_state": {"posts": {"updated_at": POST_1_UPDATED_AT}},
"lookback_window": 1,
"states": [
{
"partition": {"id": 1, "parent_slice": {}},
"cursor": {"updated_at": COMMENT_10_UPDATED_AT},
},
{
"partition": {"id": 2, "parent_slice": {}},
"cursor": {"updated_at": COMMENT_20_UPDATED_AT},
},
{
"partition": {"id": 3, "parent_slice": {}},
"cursor": {"updated_at": COMMENT_30_UPDATED_AT},
},
],
"state": {"updated_at": PARTITION_SYNC_START_TIME},
"parent_state": {"posts": {"updated_at": PARTITION_SYNC_START_TIME}},
"lookback_window": 0,
}
},
"lookback_window": 1,
"states": [
{
"partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}},
"cursor": {"created_at": VOTE_100_CREATED_AT},
},
{
"partition": {"id": 11, "parent_slice": {"id": 1, "parent_slice": {}}},
"cursor": {"created_at": VOTE_111_CREATED_AT},
},
{
"partition": {"id": 12, "parent_slice": {"id": 1, "parent_slice": {}}},
"cursor": {"created_at": PARTITION_SYNC_START_TIME},
},
{
"partition": {"id": 20, "parent_slice": {"id": 2, "parent_slice": {}}},
"cursor": {"created_at": VOTE_200_CREATED_AT},
},
{
"partition": {"id": 21, "parent_slice": {"id": 2, "parent_slice": {}}},
"cursor": {"created_at": VOTE_210_CREATED_AT},
},
{
"partition": {"id": 30, "parent_slice": {"id": 3, "parent_slice": {}}},
"cursor": {"created_at": VOTE_300_CREATED_AT},
},
],
},
STATE_MIGRATION_EXPECTED_STATE,
),
],
)
@pytest.mark.parametrize(
"initial_state",
[
{"created_at": PARTITION_SYNC_START_TIME},
{
"state": {"created_at": PARTITION_SYNC_START_TIME},
"lookback_window": 0,
"use_global_cursor": True,
"parent_state": {
"post_comments": {
"state": {"updated_at": PARTITION_SYNC_START_TIME},
"parent_state": {"posts": {"updated_at": PARTITION_SYNC_START_TIME}},
"lookback_window": 0,
}
(
{
"state": {"created_at": PARTITION_SYNC_START_TIME},
"lookback_window": 0,
"use_global_cursor": True,
"parent_state": {
"post_comments": {
"state": {"updated_at": PARTITION_SYNC_START_TIME},
"parent_state": {"posts": {"updated_at": PARTITION_SYNC_START_TIME}},
"lookback_window": 0,
}
},
},
},
STATE_MIGRATION_GLOBAL_EXPECTED_STATE,
),
],
ids=["legacy_python_format", "low_code_global_format"],
ids=["legacy_python_format", "low_code_per_partition_state", "low_code_global_format"],
)
def test_incremental_parent_state_migration(
test_name, manifest, mock_requests, expected_records, initial_state, expected_state
Expand Down Expand Up @@ -1510,6 +1535,7 @@ def test_incremental_parent_state_migration(
],
"state": {"created_at": INITIAL_GLOBAL_CURSOR},
"lookback_window": 1,
"use_global_cursor": False,
},
),
],
Expand Down Expand Up @@ -1677,13 +1703,14 @@ def test_incremental_parent_state_no_slices(
"cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR},
},
],
"use_global_cursor": True,
"use_global_cursor": False,
"state": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR},
"lookback_window": 0,
},
# Expected state
{
"lookback_window": 1,
"use_global_cursor": False,
"state": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR},
"states": [
{
Expand Down Expand Up @@ -1953,6 +1980,7 @@ def test_incremental_parent_state_no_records(
},
"state": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR},
"lookback_window": 86400,
"use_global_cursor": False,
"states": [
{
"partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}},
Expand Down Expand Up @@ -2220,6 +2248,7 @@ def test_incremental_substream_error(
},
# Expected state
{
"use_global_cursor": False,
"lookback_window": 1,
"state": {"updated_at": "2024-01-25T00:00:00Z"},
"states": [
Expand Down Expand Up @@ -2318,6 +2347,7 @@ def test_incremental_list_partition_router(
# Expected state
{
"lookback_window": 0,
"use_global_cursor": False,
"state": {"updated_at": "2024-01-08T00:00:00Z"},
"states": [
{"cursor": {"updated_at": "2024-01-20T00:00:00Z"}, "partition": {"id": "1"}},
Expand Down Expand Up @@ -2845,6 +2875,7 @@ def test_incremental_error(
}
},
"lookback_window": 1,
"use_global_cursor": False,
"states": [
{
"partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}},
Expand Down

0 comments on commit d318618

Please sign in to comment.