diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index ab667c655..054006a9a 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -26,6 +26,7 @@ AbstractStreamStateConverter, ) from airbyte_cdk.sources.types import Record, StreamSlice, StreamState +from airbyte_cdk.utils.datetime_helpers import AirbyteDateTime, ab_datetime_parse logger = logging.getLogger("airbyte") @@ -151,8 +152,12 @@ def close_partition(self, partition: Partition) -> None: ): if ( self._new_global_cursor is None - or self._new_global_cursor[self.cursor_field.cursor_field_key] - < cursor.state[self.cursor_field.cursor_field_key] + or ab_datetime_parse( + self._new_global_cursor[self.cursor_field.cursor_field_key] + ).to_epoch_millis() + < ab_datetime_parse( + cursor.state[self.cursor_field.cursor_field_key] + ).to_epoch_millis() ): self._new_global_cursor = copy.deepcopy(cursor.state) if not self._use_global_cursor: