fix(concurrent-partition-cursor): Fix cursor comparison error #298

Merged · 4 commits · Jan 30, 2025
@@ -22,6 +22,9 @@
)
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, Cursor, CursorField
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import (
AbstractStreamStateConverter,
)
from airbyte_cdk.sources.types import Record, StreamSlice, StreamState

logger = logging.getLogger("airbyte")
@@ -72,13 +75,15 @@ def __init__(
stream_state: Any,
message_repository: MessageRepository,
connector_state_manager: ConnectorStateManager,
connector_state_converter: AbstractStreamStateConverter,
cursor_field: CursorField,
) -> None:
self._global_cursor: Optional[StreamState] = {}
self._stream_name = stream_name
self._stream_namespace = stream_namespace
self._message_repository = message_repository
self._connector_state_manager = connector_state_manager
self._connector_state_converter = connector_state_converter
self._cursor_field = cursor_field

self._cursor_factory = cursor_factory
@@ -301,8 +306,7 @@ def _set_initial_state(self, stream_state: StreamState) -> None:
):
# We assume that `stream_state` is in a global format that can be applied to all partitions.
# Example: {"global_state_format_key": "global_state_format_value"}
self._global_cursor = deepcopy(stream_state)
self._new_global_cursor = deepcopy(stream_state)
self._set_global_state(stream_state)

else:
self._use_global_cursor = stream_state.get("use_global_cursor", False)
@@ -319,8 +323,7 @@ def _set_initial_state(self, stream_state: StreamState) -> None:

# set default state for missing partitions if it is per partition with fallback to global
if self._GLOBAL_STATE_KEY in stream_state:
self._global_cursor = deepcopy(stream_state[self._GLOBAL_STATE_KEY])
self._new_global_cursor = deepcopy(stream_state[self._GLOBAL_STATE_KEY])
self._set_global_state(stream_state[self._GLOBAL_STATE_KEY])

# Set initial parent state
if stream_state.get("parent_state"):
@@ -329,6 +332,27 @@ def _set_initial_state(self, stream_state: StreamState) -> None:
# Set parent state for partition routers based on parent streams
self._partition_router.set_initial_state(stream_state)

def _set_global_state(self, stream_state: Mapping[str, Any]) -> None:
"""
Initializes the global cursor state from the provided stream state.

If the cursor field key is present in the stream state, its value is parsed,
formatted, and stored as the global cursor. This ensures consistency in state
representation across partitions.
"""
if self.cursor_field.cursor_field_key in stream_state:
global_state_value = stream_state[self.cursor_field.cursor_field_key]
final_format_global_state_value = self._connector_state_converter.output_format(
self._connector_state_converter.parse_value(global_state_value)
)

fixed_global_state = {
self.cursor_field.cursor_field_key: final_format_global_state_value
}

self._global_cursor = deepcopy(fixed_global_state)
self._new_global_cursor = deepcopy(fixed_global_state)
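
As an illustration of the new method, here is a minimal sketch (not part of this PR) of the normalization it performs, built from the `parse_value`/`output_format` calls shown above. The converter's import path and the sample values are assumptions, not code from this diff:

```python
# Hedged sketch: normalize an incoming global cursor value the way
# _set_global_state does. The import path and sample values are assumptions.
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import (
    CustomFormatConcurrentStreamStateConverter,
)

converter = CustomFormatConcurrentStreamStateConverter(
    datetime_format="%Y-%m-%dT%H:%M:%SZ",
    input_datetime_formats=["%Y-%m-%dT%H:%M:%SZ", "%ms"],
    is_sequential_state=True,
    cursor_granularity=None,
)

# Incoming state may carry the cursor in any accepted input format,
# e.g. epoch milliseconds ("%ms") returned by the API.
incoming_value = "1704844800000"

normalized = converter.output_format(converter.parse_value(incoming_value))
# normalized is expected to be "2024-01-10T00:00:00Z", so the stored global
# cursor uses the same format as the per-partition cursors.
```

Before this change, the raw value was deep-copied into `self._global_cursor` unparsed, so a millisecond timestamp could end up being compared against an ISO-8601 string, which is the cursor comparison error this PR fixes.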

def observe(self, record: Record) -> None:
if not self._use_global_cursor and self.limit_reached():
self._use_global_cursor = True
@@ -1210,6 +1210,22 @@ def create_concurrent_cursor_from_perpartition_cursor(
)
cursor_field = CursorField(interpolated_cursor_field.eval(config=config))

datetime_format = datetime_based_cursor_model.datetime_format

cursor_granularity = (
parse_duration(datetime_based_cursor_model.cursor_granularity)
if datetime_based_cursor_model.cursor_granularity
else None
)

connector_state_converter: DateTimeStreamStateConverter
connector_state_converter = CustomFormatConcurrentStreamStateConverter(
datetime_format=datetime_format,
input_datetime_formats=datetime_based_cursor_model.cursor_datetime_formats,
is_sequential_state=True, # ConcurrentPerPartitionCursor only works with sequential state
cursor_granularity=cursor_granularity,
)
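
As a side note, the `cursor_granularity` handled above arrives as an ISO-8601 duration string on the declarative model. The sketch below assumes `parse_duration` is isodate's duration parser; that assumption is not confirmed by this diff:

```python
# Hedged sketch of the cursor_granularity parsing above; assumes parse_duration
# is isodate's ISO-8601 duration parser.
from isodate import parse_duration

granularity = parse_duration("PT1S")  # -> datetime.timedelta(seconds=1)
# When the model leaves cursor_granularity unset, None is passed to the
# converter instead, as in the code above.
```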

# Create the cursor factory
cursor_factory = ConcurrentCursorFactory(
partial(
@@ -1233,6 +1249,7 @@ def create_concurrent_cursor_from_perpartition_cursor(
stream_state=stream_state,
message_repository=self._message_repository, # type: ignore
connector_state_manager=state_manager,
connector_state_converter=connector_state_converter,
cursor_field=cursor_field,
)

@@ -65,7 +65,7 @@
},
"cursor_incremental_sync": {
"type": "DatetimeBasedCursor",
"cursor_datetime_formats": ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z"],
"cursor_datetime_formats": ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z", "%ms"],
"datetime_format": "%Y-%m-%dT%H:%M:%SZ",
"cursor_field": "{{ parameters.get('cursor_field', 'updated_at') }}",
"start_datetime": {"datetime": "{{ config.get('start_date')}}"},
@@ -399,13 +399,16 @@ def _run_read(
VOTE_200_CREATED_AT = "2024-01-12T00:00:00Z" # Latest vote in partition 20
VOTE_210_CREATED_AT = "2024-01-12T00:00:15Z" # Latest vote in partition 21
VOTE_300_CREATED_AT = "2024-01-10T00:00:00Z" # Latest vote in partition 30
VOTE_300_CREATED_AT_TIMESTAMP = 1704844800000 # Latest vote in partition 30

# Initial State Constants
PARENT_COMMENT_CURSOR_PARTITION_1 = "2023-01-04T00:00:00Z" # Parent comment cursor (partition)
PARENT_POSTS_CURSOR = "2024-01-05T00:00:00Z" # Parent posts cursor (expected in state)

INITIAL_STATE_PARTITION_10_CURSOR = "2024-01-02T00:00:01Z"
INITIAL_STATE_PARTITION_10_CURSOR_TIMESTAMP = 1704153601000
INITIAL_STATE_PARTITION_11_CURSOR = "2024-01-03T00:00:02Z"
INITIAL_STATE_PARTITION_11_CURSOR_TIMESTAMP = 1704240002000
INITIAL_GLOBAL_CURSOR = INITIAL_STATE_PARTITION_11_CURSOR
INITIAL_GLOBAL_CURSOR_DATE = datetime.fromisoformat(
INITIAL_STATE_PARTITION_11_CURSOR.replace("Z", "")
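
For reference, the new `*_TIMESTAMP` constants are the epoch-millisecond equivalents of the ISO-8601 constants they accompany; a quick stdlib-only sanity check (not part of the test file) confirms the values:

```python
# Stdlib-only sanity check (not part of the test file) that the millisecond
# constants match the ISO-8601 constants they sit next to.
from datetime import datetime


def to_epoch_ms(iso_z: str) -> int:
    return int(datetime.fromisoformat(iso_z.replace("Z", "+00:00")).timestamp() * 1000)


assert to_epoch_ms("2024-01-10T00:00:00Z") == 1704844800000  # VOTE_300_CREATED_AT_TIMESTAMP
assert to_epoch_ms("2024-01-02T00:00:01Z") == 1704153601000  # INITIAL_STATE_PARTITION_10_CURSOR_TIMESTAMP
assert to_epoch_ms("2024-01-03T00:00:02Z") == 1704240002000  # INITIAL_STATE_PARTITION_11_CURSOR_TIMESTAMP
```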
@@ -596,7 +599,7 @@ def _run_read(
{
"id": 300,
"comment_id": 30,
"created_at": VOTE_300_CREATED_AT,
"created_at": VOTE_300_CREATED_AT_TIMESTAMP,
}
]
},
@@ -637,7 +640,7 @@ def _run_read(
{
"comment_id": 30,
"comment_updated_at": COMMENT_30_UPDATED_AT,
"created_at": VOTE_300_CREATED_AT,
"created_at": str(VOTE_300_CREATED_AT_TIMESTAMP),
"id": 300,
},
],
@@ -662,7 +665,7 @@ def _run_read(
"id": 10,
"parent_slice": {"id": 1, "parent_slice": {}},
},
"cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR},
"cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR_TIMESTAMP},
},
{
"partition": {
@@ -672,7 +675,7 @@ def _run_read(
"cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR},
},
],
"state": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR},
"state": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR_TIMESTAMP},
"lookback_window": 86400,
},
# Expected state
@@ -981,7 +984,15 @@ def run_incremental_parent_state_test(
# Fetch the first page of votes for comment 30 of post 3
(
f"https://api.example.com/community/posts/3/comments/30/votes?per_page=100&start_time={LOOKBACK_DATE}",
{"votes": [{"id": 300, "comment_id": 30, "created_at": VOTE_300_CREATED_AT}]},
{
"votes": [
{
"id": 300,
"comment_id": 30,
"created_at": VOTE_300_CREATED_AT_TIMESTAMP,
}
]
},
),
# Requests with intermediate states
# Fetch votes for comment 10 of post 1
@@ -1018,7 +1029,15 @@ def run_incremental_parent_state_test(
# Fetch votes for comment 30 of post 3
(
f"https://api.example.com/community/posts/3/comments/30/votes?per_page=100&start_time={VOTE_300_CREATED_AT}",
{"votes": [{"id": 300, "comment_id": 30, "created_at": VOTE_300_CREATED_AT}]},
{
"votes": [
{
"id": 300,
"comment_id": 30,
"created_at": VOTE_300_CREATED_AT_TIMESTAMP,
}
]
},
),
],
# Expected records
@@ -1056,7 +1075,7 @@ def run_incremental_parent_state_test(
{
"comment_id": 30,
"comment_updated_at": COMMENT_30_UPDATED_AT,
"created_at": VOTE_300_CREATED_AT,
"created_at": str(VOTE_300_CREATED_AT_TIMESTAMP),
"id": 300,
},
],
@@ -1344,7 +1363,15 @@ def test_incremental_parent_state(
(
f"https://api.example.com/community/posts/3/comments/30/votes"
f"?per_page=100&start_time={PARTITION_SYNC_START_TIME}",
{"votes": [{"id": 300, "comment_id": 30, "created_at": VOTE_300_CREATED_AT}]},
{
"votes": [
{
"id": 300,
"comment_id": 30,
"created_at": VOTE_300_CREATED_AT_TIMESTAMP,
}
]
},
),
],
# Expected records
Expand Down Expand Up @@ -1382,7 +1409,7 @@ def test_incremental_parent_state(
{
"comment_id": 30,
"comment_updated_at": COMMENT_30_UPDATED_AT,
"created_at": VOTE_300_CREATED_AT,
"created_at": str(VOTE_300_CREATED_AT_TIMESTAMP),
"id": 300,
},
],
@@ -1896,7 +1923,15 @@ def test_incremental_parent_state_no_records(
(
f"https://api.example.com/community/posts/3/comments/30/votes"
f"?per_page=100&start_time={LOOKBACK_DATE}",
{"votes": [{"id": 300, "comment_id": 30, "created_at": VOTE_300_CREATED_AT}]},
{
"votes": [
{
"id": 300,
"comment_id": 30,
"created_at": VOTE_300_CREATED_AT_TIMESTAMP,
}
]
},
),
],
# Expected records
@@ -1928,7 +1963,7 @@ def test_incremental_parent_state_no_records(
{
"comment_id": 30,
"comment_updated_at": COMMENT_30_UPDATED_AT,
"created_at": VOTE_300_CREATED_AT,
"created_at": str(VOTE_300_CREATED_AT_TIMESTAMP),
"id": 300,
},
],