WIP: asyncio-based CDK #34424

Closed
wants to merge 74 commits into from

Changes from all commits (74 commits)
e6d93ce
Working: new async_http.py module
clnoll Dec 22, 2023
8793464
Working: new file for async exceptions (some overlap with existing)
clnoll Dec 22, 2023
3e39da7
file rename
clnoll Dec 22, 2023
e979302
Working: new core_async.py with AsyncStream
clnoll Dec 22, 2023
bb5f26b
Working: stream_helper_async; not sure if needed
clnoll Dec 22, 2023
ee29b91
Working: AsyncAbstractSource
clnoll Dec 23, 2023
d138bcf
Working: AsyncAbstractStream
clnoll Dec 23, 2023
14fe32b
Working: AsyncDefaultStream
clnoll Dec 23, 2023
6fb1eea
Working: AsyncPartitionGenerator; not used
clnoll Dec 23, 2023
65e8241
Not used: async_call_rate and rate_limiting_async
clnoll Dec 23, 2023
83a8126
Working: async adapters for stream
clnoll Dec 23, 2023
48a725d
Working: async Salesforce
clnoll Dec 23, 2023
52ca102
Code for local testing
clnoll Dec 23, 2023
eef1b4f
Working but not really used: AsyncPartition
clnoll Dec 23, 2023
a6493ea
Use AsyncStream instead of Stream
clnoll Dec 23, 2023
37b049c
Working: use Limiter sessions (but need to test limiters)
clnoll Dec 23, 2023
d928fff
Add json header if sending json data
clnoll Dec 23, 2023
aaf19cf
Update request args
clnoll Dec 23, 2023
5f16581
Fix aiohttp cache
clnoll Dec 23, 2023
343ba2b
Working: handle token-based auth
clnoll Dec 24, 2023
b8675a7
Working: refactor BaseHttpStream out of HttpStream & AsyncHttpStream
clnoll Dec 24, 2023
8572328
Working: move session logic from Salesforce into AsyncHttpStream
clnoll Dec 24, 2023
da1f8b8
Working: simplify session logic
clnoll Dec 24, 2023
b84069f
Manage sessions in the source
clnoll Dec 26, 2023
db25080
Close sessions as the streams are finished processing
clnoll Dec 26, 2023
d3e2875
Working but awkward: handle session with SourceReader
clnoll Dec 26, 2023
a5e05f0
Working: add back in availability check
clnoll Dec 26, 2023
77fcbd6
Working: re-add timer
clnoll Dec 26, 2023
021fd30
Working: async stream_slices
clnoll Dec 26, 2023
c943190
Working: allow a default concurrency limit to be set
clnoll Dec 26, 2023
34a90e3
Working: don't use adapters.py
clnoll Dec 26, 2023
5fcbd8d
Passing: test_streams_core_async.py
clnoll Dec 27, 2023
90e61de
Working: cleanup
clnoll Dec 27, 2023
61cb63a
Working: check for streams before starting the reads
clnoll Dec 27, 2023
eed55f6
Working: only sync stream if it's in the catalog
clnoll Dec 27, 2023
98e5e0d
Working: move ensure_session onto AsyncStream
clnoll Dec 27, 2023
de25233
Working: handle errors from reader thread
clnoll Dec 27, 2023
0a0d0a7
Working: some error handling fixes
clnoll Dec 27, 2023
c2b7265
Passing: test_abstract_source_async.py
clnoll Dec 28, 2023
5840f9d
WIP: test_http_async.py
clnoll Dec 28, 2023
298bff1
Fix backoff handling
clnoll Dec 28, 2023
b52c8a1
Fix caching
clnoll Dec 29, 2023
8f685f0
Fix cache test
clnoll Dec 29, 2023
55445f2
All new tests passing except form
clnoll Dec 29, 2023
fcaa5e1
Remove rate limiter; it can be added back in later
clnoll Dec 29, 2023
fd0dd81
test_rate_limiting_async.py
clnoll Dec 29, 2023
ec8f2ed
WIP: test_availability_strategy_async.py
clnoll Dec 30, 2023
7f3f48d
Passing: test_availability_strategy_async.py
clnoll Dec 30, 2023
f98f61c
Add availability_strategy_async.py and tests
clnoll Dec 31, 2023
29af3ed
Reorganize scenario-based test helpers
clnoll Jan 1, 2024
b5122ff
Revert an unintended change
clnoll Jan 2, 2024
abb30f0
Reorganize scenario builder
clnoll Jan 2, 2024
b47821b
Remove capsys from scenario-based tests as it's flaky; instead,
clnoll Jan 2, 2024
5722338
Add scenario-based tests for asyncio-based concurrency
clnoll Jan 2, 2024
1dda388
wip
clnoll Jan 2, 2024
333d482
fix tests
clnoll Jan 2, 2024
5b4d658
Update Salesforce
clnoll Jan 3, 2024
81b7e7e
Fix Salesforce
clnoll Jan 6, 2024
3741cc9
Salesforce tests
clnoll Jan 6, 2024
475f33f
Handle check_availability StopAsyncIteration
clnoll Jan 6, 2024
67fe34e
cleanup
clnoll Jan 6, 2024
96d1a78
StopIteration in SourceReader.__next__ should not be StopAsyncIteration
clnoll Jan 6, 2024
537010d
Clean up error handling; CDK passing, Salesforce 2 failing due to exc…
clnoll Jan 6, 2024
b8ce8db
Fix error handling; all Salesforce & CDK tests & comparison with mast…
clnoll Jan 6, 2024
145ee23
Reorg into separate async_cdk module
clnoll Jan 6, 2024
c5f4908
__init__.py
clnoll Jan 7, 2024
cebca84
Use SourceDispatcher to route requests to async source
clnoll Jan 7, 2024
5874671
Fix types and formatting
clnoll Jan 7, 2024
8e3544c
WIP
clnoll Jan 8, 2024
63a379c
Make unified sync/async error
clnoll Jan 8, 2024
9c7bd9c
Remove some repetitive rate limiting code
clnoll Jan 9, 2024
bfa7124
Salesforce: start splitting into sync vs async
clnoll Jan 9, 2024
6e93ca9
Split out Salesforce async
clnoll Jan 9, 2024
d6cfc45
Salesforce async full refresh & sync incremental
clnoll Jan 9, 2024
8 changes: 6 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/entrypoint.py
@@ -207,10 +207,14 @@ def _emit_queued_messages(self, source: Source) -> Iterable[AirbyteMessage]:
         return
 
 
-def launch(source: Source, args: List[str]) -> None:
+def get_source_iter(source: Source, args: List[str]) -> Iterable[str]:
     source_entrypoint = AirbyteEntrypoint(source)
     parsed_args = source_entrypoint.parse_args(args)
-    for message in source_entrypoint.run(parsed_args):
+    return source_entrypoint.run(parsed_args)
+
+
+def launch(source: Source, args: List[str]) -> None:
+    for message in get_source_iter(source, args):
         print(message)
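
The hunk above splits message iteration out of launch so that callers other than the CLI print loop can consume a source's serialized messages. A minimal usage sketch; MySource is a hypothetical AbstractSource subclass, and the argument list mirrors a normal CLI invocation:

from airbyte_cdk.entrypoint import get_source_iter

# MySource is a hypothetical AbstractSource subclass, for illustration only.
source = MySource()
args = ["read", "--config", "secrets/config.json", "--catalog", "configured_catalog.json"]

# Consume the serialized AirbyteMessage strings without printing them,
# e.g. to hand them to a dispatcher or a test harness.
messages = list(get_source_iter(source, args))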
135 changes: 105 additions & 30 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
@@ -4,7 +4,17 @@
 
 import logging
 from abc import ABC, abstractmethod
-from typing import Any, Dict, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union
+from typing import (
+    Any,
+    Iterable,
+    Iterator,
+    List,
+    Mapping,
+    MutableMapping,
+    Optional,
+    Tuple,
+    Union,
+)
 
 from airbyte_cdk.models import (
     AirbyteCatalog,
@@ -28,7 +38,9 @@
 from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, split_config
 from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger, SliceLogger
 from airbyte_cdk.utils.event_timing import create_timer
-from airbyte_cdk.utils.stream_status_utils import as_airbyte_message as stream_status_as_airbyte_message
+from airbyte_cdk.utils.stream_status_utils import (
+    as_airbyte_message as stream_status_as_airbyte_message,
+)
 from airbyte_cdk.utils.traced_exception import AirbyteTracedException
 
 _default_message_repository = InMemoryMessageRepository()
@@ -41,7 +53,9 @@ class AbstractSource(Source, ABC):
     """
 
     @abstractmethod
-    def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
+    def check_connection(
+        self, logger: logging.Logger, config: Mapping[str, Any]
+    ) -> Tuple[bool, Optional[Any]]:
         """
         :param logger: source logger
         :param config: The user-provided configuration as specified by the source's spec.
@@ -62,22 +76,26 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
         """
 
     # Stream name to instance map for applying output object transformation
-    _stream_to_instance_map: Dict[str, Stream] = {}
+    _stream_to_instance_map: Mapping[str, Stream] = {}
     _slice_logger: SliceLogger = DebugSliceLogger()
 
     @property
     def name(self) -> str:
         """Source name"""
         return self.__class__.__name__
 
-    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
+    def discover(
+        self, logger: logging.Logger, config: Mapping[str, Any]
+    ) -> AirbyteCatalog:
         """Implements the Discover operation from the Airbyte Specification.
         See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover.
         """
         streams = [stream.as_airbyte_stream() for stream in self.streams(config=config)]
         return AirbyteCatalog(streams=streams)
 
-    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
+    def check(
+        self, logger: logging.Logger, config: Mapping[str, Any]
+    ) -> AirbyteConnectionStatus:
         """Implements the Check Connection operation from the Airbyte Specification.
         See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.
         """
@@ -91,15 +109,19 @@ def read(
         logger: logging.Logger,
         config: Mapping[str, Any],
         catalog: ConfiguredAirbyteCatalog,
-        state: Optional[Union[List[AirbyteStateMessage], MutableMapping[str, Any]]] = None,
+        state: Optional[
+            Union[List[AirbyteStateMessage], MutableMapping[str, Any]]
+        ] = None,
     ) -> Iterator[AirbyteMessage]:
         """Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/."""
         logger.info(f"Starting syncing {self.name}")
         config, internal_config = split_config(config)
         # TODO assert all streams exist in the connector
         # get the streams once in case the connector needs to make any queries to generate them
         stream_instances = {s.name: s for s in self.streams(config)}
-        state_manager = ConnectorStateManager(stream_instance_map=stream_instances, state=state)
+        state_manager = ConnectorStateManager(
+            stream_instance_map=stream_instances, state=state
+        )
         self._stream_to_instance_map = stream_instances
 
         stream_name_to_exception: MutableMapping[str, AirbyteTracedException] = {}
@@ -117,43 +139,69 @@
 
             try:
                 timer.start_event(f"Syncing stream {configured_stream.stream.name}")
-                stream_is_available, reason = stream_instance.check_availability(logger, self)
+                stream_is_available, reason = stream_instance.check_availability(
+                    logger, self
+                )
                 if not stream_is_available:
-                    logger.warning(f"Skipped syncing stream '{stream_instance.name}' because it was unavailable. {reason}")
+                    logger.warning(
+                        f"Skipped syncing stream '{stream_instance.name}' because it was unavailable. {reason}"
+                    )
                     continue
-                logger.info(f"Marking stream {configured_stream.stream.name} as STARTED")
-                yield stream_status_as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.STARTED)
+                logger.info(
+                    f"Marking stream {configured_stream.stream.name} as STARTED"
+                )
+                yield stream_status_as_airbyte_message(
+                    configured_stream.stream, AirbyteStreamStatus.STARTED
+                )
                 yield from self._read_stream(
                     logger=logger,
                     stream_instance=stream_instance,
                     configured_stream=configured_stream,
                     state_manager=state_manager,
                     internal_config=internal_config,
                 )
-                logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
-                yield stream_status_as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.COMPLETE)
+                logger.info(
+                    f"Marking stream {configured_stream.stream.name} as STOPPED"
+                )
+                yield stream_status_as_airbyte_message(
+                    configured_stream.stream, AirbyteStreamStatus.COMPLETE
+                )
             except AirbyteTracedException as e:
-                yield stream_status_as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
+                yield stream_status_as_airbyte_message(
+                    configured_stream.stream, AirbyteStreamStatus.INCOMPLETE
+                )
                 if self.continue_sync_on_stream_failure:
                     stream_name_to_exception[stream_instance.name] = e
                 else:
                     raise e
             except Exception as e:
                 yield from self._emit_queued_messages()
-                logger.exception(f"Encountered an exception while reading stream {configured_stream.stream.name}")
-                logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
-                yield stream_status_as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
+                logger.exception(
+                    f"Encountered an exception while reading stream {configured_stream.stream.name}"
+                )
+                logger.info(
+                    f"Marking stream {configured_stream.stream.name} as STOPPED"
+                )
+                yield stream_status_as_airbyte_message(
+                    configured_stream.stream, AirbyteStreamStatus.INCOMPLETE
+                )
                 display_message = stream_instance.get_error_display_message(e)
                 if display_message:
-                    raise AirbyteTracedException.from_exception(e, message=display_message) from e
+                    raise AirbyteTracedException.from_exception(
+                        e, message=display_message
+                    ) from e
                 raise e
             finally:
                 timer.finish_event()
                 logger.info(f"Finished syncing {configured_stream.stream.name}")
                 logger.info(timer.report())
 
         if self.continue_sync_on_stream_failure and len(stream_name_to_exception) > 0:
-            raise AirbyteTracedException(message=self._generate_failed_streams_error_message(stream_name_to_exception))
+            raise AirbyteTracedException(
+                message=self._generate_failed_streams_error_message(
+                    stream_name_to_exception
+                )
+            )
         logger.info(f"Finished syncing {self.name}")
 
     @property
@@ -173,7 +221,9 @@ def _read_stream(
         internal_config: InternalConfig,
     ) -> Iterator[AirbyteMessage]:
         if internal_config.page_size and isinstance(stream_instance, HttpStream):
-            logger.info(f"Setting page size for {stream_instance.name} to {internal_config.page_size}")
+            logger.info(
+                f"Setting page size for {stream_instance.name} to {internal_config.page_size}"
+            )
             stream_instance.page_size = internal_config.page_size
         logger.debug(
             f"Syncing configured stream: {configured_stream.stream.name}",
@@ -185,7 +235,10 @@
         )
         stream_instance.log_stream_sync_configuration()
 
-        use_incremental = configured_stream.sync_mode == SyncMode.incremental and stream_instance.supports_incremental
+        use_incremental = (
+            configured_stream.sync_mode == SyncMode.incremental
+            and stream_instance.supports_incremental
+        )
         if use_incremental:
             record_iterator = self._read_incremental(
                 logger,
@@ -195,7 +248,9 @@
                 internal_config,
             )
         else:
-            record_iterator = self._read_full_refresh(logger, stream_instance, configured_stream, internal_config)
+            record_iterator = self._read_full_refresh(
+                logger, stream_instance, configured_stream, internal_config
+            )
 
         record_counter = 0
         stream_name = configured_stream.stream.name
@@ -206,7 +261,9 @@
                 if record_counter == 1:
                     logger.info(f"Marking stream {stream_name} as RUNNING")
                     # If we just read the first record of the stream, emit the transition to the RUNNING state
-                    yield stream_status_as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.RUNNING)
+                    yield stream_status_as_airbyte_message(
+                        configured_stream.stream, AirbyteStreamStatus.RUNNING
+                    )
             yield from self._emit_queued_messages()
             yield record
 
@@ -230,7 +287,9 @@ def _read_incremental(
         :return:
         """
         stream_name = configured_stream.stream.name
-        stream_state = state_manager.get_stream_state(stream_name, stream_instance.namespace)
+        stream_state = state_manager.get_stream_state(
+            stream_name, stream_instance.namespace
+        )
 
         if stream_state and "state" in dir(stream_instance):
             stream_instance.state = stream_state  # type: ignore # we check that state in the dir(stream_instance)
@@ -260,22 +319,31 @@
         internal_config: InternalConfig,
     ) -> Iterator[AirbyteMessage]:
         total_records_counter = 0
-        for record_data_or_message in stream_instance.read_full_refresh(configured_stream.cursor_field, logger, self._slice_logger):
+        for record_data_or_message in stream_instance.read_full_refresh(
+            configured_stream.cursor_field, logger, self._slice_logger
+        ):
             message = self._get_message(record_data_or_message, stream_instance)
             yield message
             if message.type == MessageType.RECORD:
                 total_records_counter += 1
                 if internal_config.is_limit_reached(total_records_counter):
                     return
 
-    def _get_message(self, record_data_or_message: Union[StreamData, AirbyteMessage], stream: Stream) -> AirbyteMessage:
+    def _get_message(
+        self, record_data_or_message: Union[StreamData, AirbyteMessage], stream: Stream
+    ) -> AirbyteMessage:
         """
        Converts the input to an AirbyteMessage if it is a StreamData. Returns the input as is if it is already an AirbyteMessage
         """
         if isinstance(record_data_or_message, AirbyteMessage):
             return record_data_or_message
         else:
-            return stream_data_to_airbyte_message(stream.name, record_data_or_message, stream.transformer, stream.get_json_schema())
+            return stream_data_to_airbyte_message(
+                stream.name,
+                record_data_or_message,
+                stream.transformer,
+                stream.get_json_schema(),
+            )
 
     @property
     def message_repository(self) -> Union[None, MessageRepository]:
@@ -293,6 +361,13 @@ def continue_sync_on_stream_failure(self) -> bool:
         return False
 
     @staticmethod
-    def _generate_failed_streams_error_message(stream_failures: Mapping[str, AirbyteTracedException]) -> str:
-        failures = ", ".join([f"{stream}: {exception.__repr__()}" for stream, exception in stream_failures.items()])
+    def _generate_failed_streams_error_message(
+        stream_failures: Mapping[str, AirbyteTracedException]
+    ) -> str:
+        failures = ", ".join(
+            [
+                f"{stream}: {exception.__repr__()}"
+                for stream, exception in stream_failures.items()
+            ]
+        )
         return f"During the sync, the following streams did not sync successfully: {failures}"