Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(concurrent-partition-cursor): Fix cursor comparison error #298

Merged
merged 4 commits
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
)
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, Cursor, CursorField
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import (
AbstractStreamStateConverter,
)
from airbyte_cdk.sources.types import Record, StreamSlice, StreamState

logger = logging.getLogger("airbyte")
Expand Down Expand Up @@ -72,13 +75,15 @@ def __init__(
stream_state: Any,
message_repository: MessageRepository,
connector_state_manager: ConnectorStateManager,
connector_state_converter: AbstractStreamStateConverter,
cursor_field: CursorField,
) -> None:
self._global_cursor: Optional[StreamState] = {}
self._stream_name = stream_name
self._stream_namespace = stream_namespace
self._message_repository = message_repository
self._connector_state_manager = connector_state_manager
self._connector_state_converter = connector_state_converter
self._cursor_field = cursor_field

self._cursor_factory = cursor_factory
Expand Down Expand Up @@ -144,11 +149,9 @@ def close_partition(self, partition: Partition) -> None:
partition_key in self._finished_partitions
and self._semaphore_per_partition[partition_key]._value == 0
):
if (
self._new_global_cursor is None
or self._new_global_cursor[self.cursor_field.cursor_field_key]
< cursor.state[self.cursor_field.cursor_field_key]
):
if self._new_global_cursor is None or self._extract_cursor_value_from_state(
self._new_global_cursor
) < self._extract_cursor_value_from_state(cursor.state):
self._new_global_cursor = copy.deepcopy(cursor.state)
if not self._use_global_cursor:
self._emit_state_message()
Expand Down Expand Up @@ -372,5 +375,10 @@ def _get_cursor(self, record: Record) -> ConcurrentCursor:
cursor = self._cursor_per_partition[partition_key]
return cursor

def _extract_cursor_value_from_state(self, state: StreamState) -> Any:
tolik0 marked this conversation as resolved.
Show resolved Hide resolved
return self._connector_state_converter.parse_value(
state[self.cursor_field.cursor_field_key]
)

def limit_reached(self) -> bool:
    """Report whether the tracked overflow count has passed the partition cap."""
    cap = self.DEFAULT_MAX_PARTITIONS_NUMBER
    return self._over_limit > cap
Original file line number Diff line number Diff line change
Expand Up @@ -1202,6 +1202,22 @@ def create_concurrent_cursor_from_perpartition_cursor(
)
cursor_field = CursorField(interpolated_cursor_field.eval(config=config))

datetime_format = datetime_based_cursor_model.datetime_format

cursor_granularity = (
parse_duration(datetime_based_cursor_model.cursor_granularity)
if datetime_based_cursor_model.cursor_granularity
else None
)

connector_state_converter: DateTimeStreamStateConverter
connector_state_converter = CustomFormatConcurrentStreamStateConverter(
tolik0 marked this conversation as resolved.
Show resolved Hide resolved
datetime_format=datetime_format,
input_datetime_formats=datetime_based_cursor_model.cursor_datetime_formats,
is_sequential_state=True, # ConcurrentPerPartitionCursor only works with sequential state
cursor_granularity=cursor_granularity,
)

# Create the cursor factory
cursor_factory = ConcurrentCursorFactory(
partial(
Expand All @@ -1225,6 +1241,7 @@ def create_concurrent_cursor_from_perpartition_cursor(
stream_state=stream_state,
message_repository=self._message_repository, # type: ignore
connector_state_manager=state_manager,
connector_state_converter=connector_state_converter,
cursor_field=cursor_field,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
},
"cursor_incremental_sync": {
"type": "DatetimeBasedCursor",
"cursor_datetime_formats": ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z"],
"cursor_datetime_formats": ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z", "%ms"],
"datetime_format": "%Y-%m-%dT%H:%M:%SZ",
"cursor_field": "{{ parameters.get('cursor_field', 'updated_at') }}",
"start_datetime": {"datetime": "{{ config.get('start_date')}}"},
Expand Down Expand Up @@ -399,6 +399,7 @@ def _run_read(
VOTE_200_CREATED_AT = "2024-01-12T00:00:00Z" # Latest vote in partition 20
VOTE_210_CREATED_AT = "2024-01-12T00:00:15Z" # Latest vote in partition 21
VOTE_300_CREATED_AT = "2024-01-10T00:00:00Z" # Latest vote in partition 30
VOTE_300_CREATED_AT_TIMESTAMP = 1704844800000 # Latest vote in partition 30

# Initial State Constants
PARENT_COMMENT_CURSOR_PARTITION_1 = "2023-01-04T00:00:00Z" # Parent comment cursor (partition)
Expand Down Expand Up @@ -596,7 +597,7 @@ def _run_read(
{
"id": 300,
"comment_id": 30,
"created_at": VOTE_300_CREATED_AT,
"created_at": VOTE_300_CREATED_AT_TIMESTAMP,
}
]
},
Expand Down Expand Up @@ -637,7 +638,7 @@ def _run_read(
{
"comment_id": 30,
"comment_updated_at": COMMENT_30_UPDATED_AT,
"created_at": VOTE_300_CREATED_AT,
"created_at": str(VOTE_300_CREATED_AT_TIMESTAMP),
"id": 300,
},
],
Expand Down Expand Up @@ -981,7 +982,15 @@ def run_incremental_parent_state_test(
# Fetch the first page of votes for comment 30 of post 3
(
f"https://api.example.com/community/posts/3/comments/30/votes?per_page=100&start_time={LOOKBACK_DATE}",
{"votes": [{"id": 300, "comment_id": 30, "created_at": VOTE_300_CREATED_AT}]},
{
"votes": [
{
"id": 300,
"comment_id": 30,
"created_at": VOTE_300_CREATED_AT_TIMESTAMP,
}
]
},
),
# Requests with intermediate states
# Fetch votes for comment 10 of post 1
Expand Down Expand Up @@ -1018,7 +1027,15 @@ def run_incremental_parent_state_test(
# Fetch votes for comment 30 of post 3
(
f"https://api.example.com/community/posts/3/comments/30/votes?per_page=100&start_time={VOTE_300_CREATED_AT}",
{"votes": [{"id": 300, "comment_id": 30, "created_at": VOTE_300_CREATED_AT}]},
{
"votes": [
{
"id": 300,
"comment_id": 30,
"created_at": VOTE_300_CREATED_AT_TIMESTAMP,
}
]
},
),
],
# Expected records
Expand Down Expand Up @@ -1056,7 +1073,7 @@ def run_incremental_parent_state_test(
{
"comment_id": 30,
"comment_updated_at": COMMENT_30_UPDATED_AT,
"created_at": VOTE_300_CREATED_AT,
"created_at": str(VOTE_300_CREATED_AT_TIMESTAMP),
"id": 300,
},
],
Expand Down Expand Up @@ -1344,7 +1361,15 @@ def test_incremental_parent_state(
(
f"https://api.example.com/community/posts/3/comments/30/votes"
f"?per_page=100&start_time={PARTITION_SYNC_START_TIME}",
{"votes": [{"id": 300, "comment_id": 30, "created_at": VOTE_300_CREATED_AT}]},
{
"votes": [
{
"id": 300,
"comment_id": 30,
"created_at": VOTE_300_CREATED_AT_TIMESTAMP,
}
]
},
),
],
# Expected records
Expand Down Expand Up @@ -1382,7 +1407,7 @@ def test_incremental_parent_state(
{
"comment_id": 30,
"comment_updated_at": COMMENT_30_UPDATED_AT,
"created_at": VOTE_300_CREATED_AT,
"created_at": str(VOTE_300_CREATED_AT_TIMESTAMP),
"id": 300,
},
],
Expand Down Expand Up @@ -1896,7 +1921,15 @@ def test_incremental_parent_state_no_records(
(
f"https://api.example.com/community/posts/3/comments/30/votes"
f"?per_page=100&start_time={LOOKBACK_DATE}",
{"votes": [{"id": 300, "comment_id": 30, "created_at": VOTE_300_CREATED_AT}]},
{
"votes": [
{
"id": 300,
"comment_id": 30,
"created_at": VOTE_300_CREATED_AT_TIMESTAMP,
}
]
},
),
],
# Expected records
Expand Down Expand Up @@ -1928,7 +1961,7 @@ def test_incremental_parent_state_no_records(
{
"comment_id": 30,
"comment_updated_at": COMMENT_30_UPDATED_AT,
"created_at": VOTE_300_CREATED_AT,
"created_at": str(VOTE_300_CREATED_AT_TIMESTAMP),
"id": 300,
},
],
Expand Down
Loading