Skip to content

Commit

Permalink
CDK: ThreadBasedConcurrentStream skeleton and top-level AbstractStream (
Browse files Browse the repository at this point in the history
#30111)

Co-authored-by: girarda <[email protected]>
Co-authored-by: Maxime Carbonneau-Leclerc <[email protected]>
Co-authored-by: Catherine Noll <[email protected]>
  • Loading branch information
4 people authored Oct 11, 2023
1 parent 156ee4c commit 25fc396
Show file tree
Hide file tree
Showing 22 changed files with 1,617 additions and 27 deletions.
34 changes: 8 additions & 26 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,7 @@ def _read_stream(
"cursor_field": configured_stream.cursor_field,
},
)
logger.debug(
f"Syncing stream instance: {stream_instance.name}",
extra={
"primary_key": stream_instance.primary_key,
"cursor_field": stream_instance.cursor_field,
},
)
stream_instance.log_stream_sync_configuration()

use_incremental = configured_stream.sync_mode == SyncMode.incremental and stream_instance.supports_incremental
if use_incremental:
Expand Down Expand Up @@ -294,26 +288,14 @@ def _read_full_refresh(
configured_stream: ConfiguredAirbyteStream,
internal_config: InternalConfig,
) -> Iterator[AirbyteMessage]:
slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh, cursor_field=configured_stream.cursor_field)
logger.debug(
f"Processing stream slices for {configured_stream.stream.name} (sync_mode: full_refresh)", extra={"stream_slices": slices}
)
total_records_counter = 0
for _slice in slices:
if self._slice_logger.should_log_slice_message(logger):
yield self._slice_logger.create_slice_log_message(_slice)
record_data_or_messages = stream_instance.read_records(
stream_slice=_slice,
sync_mode=SyncMode.full_refresh,
cursor_field=configured_stream.cursor_field,
)
for record_data_or_message in record_data_or_messages:
message = self._get_message(record_data_or_message, stream_instance)
yield message
if message.type == MessageType.RECORD:
total_records_counter += 1
if internal_config.is_limit_reached(total_records_counter):
return
for record_data_or_message in stream_instance.read_full_refresh(configured_stream.cursor_field, logger, self._slice_logger):
message = self._get_message(record_data_or_message, stream_instance)
yield message
if message.type == MessageType.RECORD:
total_records_counter += 1
if internal_config.is_limit_reached(total_records_counter):
return

def _checkpoint_state(self, stream: Stream, stream_state: Mapping[str, Any], state_manager: ConnectorStateManager) -> AirbyteMessage:
# First attempt to retrieve the current state using the stream's state property. We receive an AttributeError if the state
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/airbyte_cdk/sources/streams/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

# Initialize Streams Package
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from typing import Any, Iterable, Mapping, Optional

from airbyte_cdk.models import AirbyteStream
from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability
from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
from deprecated.classic import deprecated


@deprecated("This class is experimental. Use at your own risk.")
class AbstractStream(ABC):
    """
    AbstractStream is an experimental interface for streams developed as part of the Concurrent CDK.
    This interface is not yet stable and may change in the future. Use at your own risk.
    Why create a new interface instead of adding concurrency capabilities the existing Stream?
    We learnt a lot since the initial design of the Stream interface, and we wanted to take the opportunity to improve.
    High level, the changes we are targeting are:
    - Removing superfluous or leaky parameters from the methods' interfaces
    - Using composition instead of inheritance to add new capabilities
    To allow us to iterate fast while ensuring backwards compatibility, we are creating a new interface with a facade object that will bridge the old and the new interfaces.
    Source connectors that wish to leverage concurrency need to implement this new interface. An example will be available shortly
    Current restrictions on sources that implement this interface. Not all of these restrictions will be lifted in the future, but most will as we iterate on the design.
    - Only full refresh is supported. This will be addressed in the future.
    - The read method does not accept a cursor_field. Streams must be internally aware of the cursor field to use. User-defined cursor fields can be implemented by modifying the connector's main method to instantiate the streams with the configured cursor field.
    - Streams cannot return user-friendly messages by overriding Stream.get_error_display_message. This will be addressed in the future.
    - The Stream's behavior cannot depend on a namespace
    - TypeTransformer is not supported. This will be addressed in the future.
    - Nested cursor and primary keys are not supported
    """

    @abstractmethod
    def read(self) -> Iterable[Record]:
        """
        Read a stream in full refresh mode (the only sync mode this interface currently supports).
        :return: The stream's records
        """

    @property
    @abstractmethod
    def name(self) -> str:
        """
        :return: The stream name
        """

    @property
    @abstractmethod
    def cursor_field(self) -> Optional[str]:
        """
        Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
        :return: The name of the field used as a cursor. Nested cursor fields are not supported.
        """

    @abstractmethod
    def check_availability(self) -> StreamAvailability:
        """
        Verify that the stream is available, e.g. that it exists and can be read from the source.
        :return: The stream's availability
        """

    @abstractmethod
    def get_json_schema(self) -> Mapping[str, Any]:
        """
        :return: A dict of the JSON schema representing this stream.
        """

    @abstractmethod
    def as_airbyte_stream(self) -> AirbyteStream:
        """
        :return: The AirbyteStream protocol representation of this stream (name, JSON schema, supported sync modes, etc.).
        """

    @abstractmethod
    def log_stream_sync_configuration(self) -> None:
        """
        Logs the stream's configuration for debugging purposes.
        """
Loading

0 comments on commit 25fc396

Please sign in to comment.