Skip to content

Commit

Permalink
fix(concurrent-cdk): follow-up #44946 update state to state_manager s…
Browse files Browse the repository at this point in the history
…tate in convert to concurrent stream (#45718)
  • Loading branch information
lazebnyi authored Oct 1, 2024
1 parent 1e205f2 commit d727af0
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,17 @@ def _select_abstract_streams(self, config: Mapping[str, Any], configured_catalog
abstract_streams.append(stream_instance.get_underlying_stream())
return abstract_streams

def convert_to_concurrent_stream(self, logger: logging.Logger, stream: Stream, cursor: Optional[Cursor] = None) -> Stream:
def convert_to_concurrent_stream(
self, logger: logging.Logger, stream: Stream, state_manager: ConnectorStateManager, cursor: Optional[Cursor] = None
) -> Stream:
"""
Prepares a stream for concurrent processing by initializing or assigning a cursor,
managing the stream's state, and returning an updated Stream instance.
"""
state: MutableMapping[str, Any] = {}

if cursor:
state = cursor.state
state = state_manager.get_stream_state(stream.name, stream.namespace)

stream.cursor = cursor # type: ignore[assignment] # cursor is of type ConcurrentCursor, which inherits from Cursor
if hasattr(stream, "parent"):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
return [
self.convert_to_concurrent_stream(self._logger, s)
self.convert_to_concurrent_stream(self._logger, s, Mock())
if is_concurrent
else s
for s, is_concurrent in self._streams_to_is_concurrent.items()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
state_converter = StreamFacadeConcurrentConnectorStateConverter()

return [
self.convert_to_concurrent_stream(stream.logger, stream, self.initialize_cursor(
self.convert_to_concurrent_stream(stream.logger, stream, state_manager, self.initialize_cursor(
stream, state_manager, state_converter, self._cursor_boundaries, None, EpochValueConcurrentStreamStateConverter.get_end_provider())
)
for stream in self._streams
Expand Down

0 comments on commit d727af0

Please sign in to comment.