diff --git a/airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/concurrent_source_adapter.py b/airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/concurrent_source_adapter.py index e36d05f235fd..bbffe8f88f71 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/concurrent_source_adapter.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/concurrent_source_adapter.py @@ -66,7 +66,9 @@ def _select_abstract_streams(self, config: Mapping[str, Any], configured_catalog abstract_streams.append(stream_instance.get_underlying_stream()) return abstract_streams - def convert_to_concurrent_stream(self, logger: logging.Logger, stream: Stream, cursor: Optional[Cursor] = None) -> Stream: + def convert_to_concurrent_stream( + self, logger: logging.Logger, stream: Stream, state_manager: ConnectorStateManager, cursor: Optional[Cursor] = None + ) -> Stream: """ Prepares a stream for concurrent processing by initializing or assigning a cursor, managing the stream's state, and returning an updated Stream instance. @@ -74,7 +76,7 @@ def convert_to_concurrent_stream(self, logger: logging.Logger, stream: Stream, c state: MutableMapping[str, Any] = {} if cursor: - state = cursor.state + state = state_manager.get_stream_state(stream.name, stream.namespace) stream.cursor = cursor # type: ignore[assignment] # cursor is of type ConcurrentCursor, which inherits from Cursor if hasattr(stream, "parent"): diff --git a/airbyte-cdk/python/unit_tests/sources/concurrent_source/test_concurrent_source_adapter.py b/airbyte-cdk/python/unit_tests/sources/concurrent_source/test_concurrent_source_adapter.py index 84616269064a..22c5d34a6a9c 100644 --- a/airbyte-cdk/python/unit_tests/sources/concurrent_source/test_concurrent_source_adapter.py +++ b/airbyte-cdk/python/unit_tests/sources/concurrent_source/test_concurrent_source_adapter.py @@ -41,7 +41,7 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> def streams(self, config: Mapping[str, Any]) -> List[Stream]: return [ - self.convert_to_concurrent_stream(self._logger, s) + self.convert_to_concurrent_stream(self._logger, s, Mock()) if is_concurrent else s for s, is_concurrent in self._streams_to_is_concurrent.items() diff --git a/airbyte-cdk/python/unit_tests/sources/streams/concurrent/scenarios/stream_facade_builder.py b/airbyte-cdk/python/unit_tests/sources/streams/concurrent/scenarios/stream_facade_builder.py index aa63948f0ef6..4d4fb5c474f2 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/concurrent/scenarios/stream_facade_builder.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/concurrent/scenarios/stream_facade_builder.py @@ -63,7 +63,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: state_converter = StreamFacadeConcurrentConnectorStateConverter() return [ - self.convert_to_concurrent_stream(stream.logger, stream, self.initialize_cursor( + self.convert_to_concurrent_stream(stream.logger, stream, state_manager, self.initialize_cursor( stream, state_manager, state_converter, self._cursor_boundaries, None, EpochValueConcurrentStreamStateConverter.get_end_provider()) ) for stream in self._streams