Alex/concurrent with facade #30596

Closed
wants to merge 173 commits into from
27f2ca3
AbstractStream
girarda Sep 1, 2023
f0dc3ff
missing update
girarda Sep 1, 2023
79b4268
concurrency
girarda Sep 1, 2023
312555e
delete unused files
girarda Sep 1, 2023
b5b72f9
update
girarda Sep 1, 2023
b44b3f7
Automated Commit - Formatting Changes
girarda Sep 1, 2023
8366b00
Update airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/conc…
girarda Sep 2, 2023
9071c7b
override supports_incremental for concurrent_stream
girarda Sep 19, 2023
f298c6d
Update comment and check
girarda Sep 19, 2023
3069988
Merge branch 'master' into alex/abstract_stream
girarda Sep 19, 2023
4a16d57
decouple AbstractStream and Stream
girarda Sep 19, 2023
3f54c36
merge
girarda Sep 19, 2023
cff94f3
use a facade and a legacy adapter
girarda Sep 19, 2023
0e23a91
implement availability strategy adapter
girarda Sep 20, 2023
dbd9b91
move code
girarda Sep 20, 2023
8ebfc4b
reset
girarda Sep 20, 2023
6b45dce
missing file
girarda Sep 20, 2023
6d686f5
reset to master
girarda Sep 20, 2023
033818e
docs and move code
girarda Sep 20, 2023
d008cb2
comment
girarda Sep 20, 2023
0ae6b2e
comment
girarda Sep 20, 2023
2f0b05c
use facades
girarda Sep 20, 2023
f01ed1f
reset to master
girarda Sep 20, 2023
27dd603
reset to master
girarda Sep 20, 2023
0465e14
reset to master
girarda Sep 20, 2023
e55c19b
Update concurrent stream
girarda Sep 20, 2023
65de389
remove duplicate file
girarda Sep 20, 2023
9a8c4ce
bridge stream_slices
girarda Sep 20, 2023
834b90e
comment
girarda Sep 20, 2023
bb01ca3
move read_full_refresh_logic
girarda Sep 20, 2023
ba9d018
Update abstract_stream.py
girarda Sep 20, 2023
d4bd169
minor cleanup
girarda Sep 20, 2023
b6a6ab8
Merge branch 'alex/abstract_stream' of github.com:airbytehq/airbyte i…
girarda Sep 20, 2023
6856165
Remove generate_partitions from the interface
girarda Sep 20, 2023
11109db
merge
girarda Sep 20, 2023
3dd11fa
name is abstractmethod
girarda Sep 20, 2023
6d9190c
Merge branch 'alex/abstract_stream' into alex/concurrent_with_facade
girarda Sep 20, 2023
0b190b8
reset
girarda Sep 20, 2023
21eeb30
remove unused import
girarda Sep 20, 2023
ae2393f
Merge branch 'alex/abstract_stream' into alex/concurrent_with_facade
girarda Sep 20, 2023
b75b3b9
add missing __init__.py
girarda Sep 20, 2023
0d9ec29
Merge branch 'alex/abstract_stream' into alex/concurrent_with_facade
girarda Sep 20, 2023
8abe4e4
Update airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/conc…
girarda Sep 22, 2023
d8a1791
Merge branch 'alex/concurrency_stream' of github.com:airbytehq/airbyt…
girarda Sep 22, 2023
bac35ea
update
girarda Sep 22, 2023
996225d
use a single queue
girarda Sep 22, 2023
25f8bfd
Add error message parser
girarda Sep 22, 2023
281b1b3
Update
girarda Sep 27, 2023
86e58bf
wip
girarda Sep 27, 2023
c130996
read all records
girarda Sep 27, 2023
56a312b
verify all partitions are processed
girarda Sep 27, 2023
a92c258
hash the partitions
girarda Sep 27, 2023
9b435eb
fix tests
girarda Sep 27, 2023
056afa4
Add missing methods
girarda Sep 27, 2023
0736bd6
Merge branch 'alex/abstract_stream' into alex/concurrent_with_facade
girarda Sep 27, 2023
523356c
Automated Commit - Formatting Changes
girarda Sep 27, 2023
d138f36
rename file
girarda Sep 27, 2023
2de827b
unit tests
girarda Sep 27, 2023
e89d523
Merge branch 'alex/abstract_stream' of github.com:airbytehq/airbyte i…
girarda Sep 27, 2023
c7d4afd
merge
girarda Sep 27, 2023
871c35a
Revert "Automated Commit - Formatting Changes"
girarda Sep 27, 2023
1bfd66f
Merge branch 'alex/abstract_stream' into alex/concurrent_with_facade
girarda Sep 27, 2023
1e3f59e
Automated Commit - Formatting Changes
girarda Sep 27, 2023
2c148b9
use a typedef
girarda Sep 27, 2023
e44def2
Revert "Automated Commit - Formatting Changes"
girarda Sep 27, 2023
9d12b35
Use typedef
girarda Sep 27, 2023
6fd58f4
Merge branch 'alex/abstract_stream' into alex/concurrent_with_facade
girarda Sep 27, 2023
4570248
merge
girarda Sep 27, 2023
f959ef3
Merge branch 'master' into alex/abstract_stream
girarda Sep 27, 2023
1bfca28
Automated Commit - Formatting Changes
girarda Sep 27, 2023
2ba4318
Update docs
girarda Sep 27, 2023
dd85074
Automated Commit - Formatting Changes
girarda Sep 27, 2023
164db6f
mypy
girarda Sep 27, 2023
2decbdf
Merge branch 'alex/concurrent_with_facade' of github.com:airbytehq/ai…
girarda Sep 27, 2023
ba5355c
Revert "Automated Commit - Formatting Changes"
girarda Sep 27, 2023
ab3bc9c
Automated Commit - Formatting Changes
girarda Sep 27, 2023
1c9c944
Revert "Automated Commit - Formatting Changes"
girarda Sep 27, 2023
fe64b5c
update
girarda Sep 27, 2023
de0a511
comments
girarda Sep 27, 2023
7b91371
comments
girarda Sep 27, 2023
a6d3881
update
girarda Sep 27, 2023
e998bf6
more comments
girarda Sep 27, 2023
da8c809
move to a legacy file
girarda Sep 27, 2023
4f5e996
update interface
girarda Sep 27, 2023
739cc81
update interface
girarda Sep 27, 2023
7e4544c
docstrings
girarda Sep 27, 2023
4108e6b
docstrings
girarda Sep 27, 2023
1eae17f
remove support for nested cursor fields
girarda Sep 27, 2023
236cc01
fix cursor field
girarda Sep 27, 2023
0aa4a3d
merge
girarda Sep 27, 2023
b5a5661
Use message repository to emit non-record messages
girarda Sep 27, 2023
1f5c0ad
rename
girarda Sep 27, 2023
aee6850
docstrings
girarda Sep 27, 2023
393d2f7
docstrings
girarda Sep 27, 2023
e813b34
rename
girarda Sep 27, 2023
d1d5293
updat interface
girarda Sep 27, 2023
e0a81f0
merge
girarda Sep 27, 2023
b02cb17
fix
girarda Sep 27, 2023
0669074
reset
girarda Sep 27, 2023
64dbbe7
update
girarda Sep 27, 2023
70f1c18
Merge branch 'alex/abstract_stream' into alex/concurrent_with_facade
girarda Sep 27, 2023
0d1afbd
Update doc string
girarda Sep 27, 2023
2d6e34e
merge
girarda Sep 27, 2023
6d8db92
define a type for stream availability
girarda Sep 27, 2023
81ddf3b
test the singleton too
girarda Sep 27, 2023
e8e9dca
Merge branch 'alex/abstract_stream' into alex/concurrent_with_facade
girarda Sep 27, 2023
29a6b96
reset
girarda Sep 27, 2023
4f31fe7
typo
girarda Sep 27, 2023
faf2f4a
Merge branch 'alex/abstract_stream' into alex/concurrent_with_facade
girarda Sep 27, 2023
d9371db
fix availability facade
girarda Sep 27, 2023
ff29740
Add missing tests
girarda Sep 28, 2023
a40bf9a
more unit tests
girarda Sep 28, 2023
324b146
more tests
girarda Sep 28, 2023
87f6ea8
delete print
girarda Sep 28, 2023
153bbcf
more tests
girarda Sep 28, 2023
313be2a
even more tests
girarda Sep 28, 2023
6e6a498
fix
girarda Sep 28, 2023
1e21ff3
merge
girarda Sep 28, 2023
e37d01b
rename typedef
girarda Sep 28, 2023
389ce7d
Update interface
girarda Sep 28, 2023
7bb8fe2
merge
girarda Sep 28, 2023
6c6f9f1
docstring
girarda Sep 28, 2023
333017c
Update as per code review
girarda Sep 28, 2023
c38bbc0
Merge branch 'master' into alex/abstract_stream
girarda Sep 28, 2023
d5670f3
Revert "Update as per code review"
girarda Sep 28, 2023
062f00d
merge
girarda Sep 28, 2023
8599d3d
Update as per code review
girarda Sep 28, 2023
039ff8e
delete duplicate files
girarda Sep 28, 2023
855c729
Merge branch 'alex/concurrency_stream' into alex/concurrent_with_facade
girarda Sep 28, 2023
461d841
delete duplicate files
girarda Sep 28, 2023
1ed3089
Automated Commit - Formatting Changes
girarda Sep 28, 2023
bbba964
Automated Commit - Formatting Changes
girarda Sep 28, 2023
9164a08
Automated Commit - Formatting Changes
girarda Sep 28, 2023
e3267f0
Don't check for errors at every iteration
girarda Sep 29, 2023
b74a575
Merge branch 'alex/concurrency_stream' of github.com:airbytehq/airbyt…
girarda Sep 29, 2023
bbb9959
Merge branch 'alex/concurrency_stream' into alex/concurrent_with_facade
girarda Sep 29, 2023
c37d40e
pass local_cdk flag
girarda Sep 29, 2023
bf31e03
Merge branch 'alex/concurrent_with_facade' of github.com:airbytehq/ai…
girarda Sep 29, 2023
0697ae0
Update airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/abst…
girarda Oct 2, 2023
5819170
hide primary key (implementation part)
girarda Oct 2, 2023
4be3ebe
hide primary key
girarda Oct 2, 2023
a11bbfa
Automated Commit - Formatting Changes
girarda Oct 2, 2023
b8dc55b
merge
girarda Oct 2, 2023
35b1224
update with hidden primary key
girarda Oct 2, 2023
3dd8c4c
fix mypy issues
girarda Oct 2, 2023
2ea3ae2
merge
girarda Oct 2, 2023
7c60736
update
girarda Oct 2, 2023
bff0742
merge
girarda Oct 2, 2023
7f1dba3
stop exposing the logger
girarda Oct 2, 2023
ad7d7de
instantiate the stream with a logger
girarda Oct 2, 2023
d5bb48f
remove logger from the interface
girarda Oct 2, 2023
cc5ca35
Add missing abstractmethod annotation
girarda Oct 2, 2023
9d1686c
Merge branch 'alex/abstract_stream' into alex/concurrency_stream
girarda Oct 2, 2023
eb2c7fb
Update the tests
girarda Oct 2, 2023
5a8dce8
Merge branch 'alex/concurrency_stream' into alex/concurrent_with_facade
girarda Oct 3, 2023
0307bbf
Remove get_error_display_message from interface
girarda Oct 3, 2023
519e9fb
Automated Commit - Formatting Changes
girarda Oct 3, 2023
c968ab9
docstrings
girarda Oct 3, 2023
5f855c9
Merge branch 'alex/abstract_stream' of github.com:airbytehq/airbyte i…
girarda Oct 3, 2023
e53abac
merge
girarda Oct 3, 2023
9747bac
Merge branch 'alex/concurrency_stream' into alex/concurrent_with_facade
girarda Oct 3, 2023
9c75714
remove references to legacy
girarda Oct 9, 2023
19350b2
merge
girarda Oct 9, 2023
dee8662
remove instances of legacy
girarda Oct 9, 2023
e256c67
rename
girarda Oct 9, 2023
8412bfb
Merge branch 'alex/concurrency_stream' into alex/concurrent_with_facade
girarda Oct 9, 2023
6b6016c
merge
girarda Oct 9, 2023
460a298
wait if there are too many pending tasks
girarda Oct 9, 2023
56e412d
rename
girarda Oct 9, 2023
7c7eb2e
remove dead code
girarda Oct 9, 2023
89aa89c
Automated Commit - Formatting Changes
girarda Oct 9, 2023
bea6dcc
delete commented out code
girarda Oct 10, 2023
b1ac25a
Merge branch 'alex/concurrency_stream' into alex/concurrent_with_facade
girarda Oct 10, 2023
34 changes: 8 additions & 26 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
@@ -174,13 +174,7 @@ def _read_stream(
                 "cursor_field": configured_stream.cursor_field,
             },
         )
-        logger.debug(
-            f"Syncing stream instance: {stream_instance.name}",
-            extra={
-                "primary_key": stream_instance.primary_key,
-                "cursor_field": stream_instance.cursor_field,
-            },
-        )
+        stream_instance.log_stream_sync_configuration()

         use_incremental = configured_stream.sync_mode == SyncMode.incremental and stream_instance.supports_incremental
         if use_incremental:
@@ -294,26 +288,14 @@ def _read_full_refresh(
         configured_stream: ConfiguredAirbyteStream,
         internal_config: InternalConfig,
     ) -> Iterator[AirbyteMessage]:
-        slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh, cursor_field=configured_stream.cursor_field)
-        logger.debug(
-            f"Processing stream slices for {configured_stream.stream.name} (sync_mode: full_refresh)", extra={"stream_slices": slices}
-        )
         total_records_counter = 0
-        for _slice in slices:
-            if self._slice_logger.should_log_slice_message(logger):
-                yield self._slice_logger.create_slice_log_message(_slice)
-            record_data_or_messages = stream_instance.read_records(
-                stream_slice=_slice,
-                sync_mode=SyncMode.full_refresh,
-                cursor_field=configured_stream.cursor_field,
-            )
-            for record_data_or_message in record_data_or_messages:
-                message = self._get_message(record_data_or_message, stream_instance)
-                yield message
-                if message.type == MessageType.RECORD:
-                    total_records_counter += 1
-                    if internal_config.is_limit_reached(total_records_counter):
-                        return
+        for record_data_or_message in stream_instance.read_full_refresh(configured_stream.cursor_field, logger, self._slice_logger):
+            message = self._get_message(record_data_or_message, stream_instance)
+            yield message
+            if message.type == MessageType.RECORD:
+                total_records_counter += 1
+                if internal_config.is_limit_reached(total_records_counter):
+                    return

     def _checkpoint_state(self, stream: Stream, stream_state: Mapping[str, Any], state_manager: ConnectorStateManager) -> AirbyteMessage:
         # First attempt to retrieve the current state using the stream's state property. We receive an AttributeError if the state
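The second hunk replaces the per-slice loop in `_read_full_refresh` with a single call to `stream_instance.read_full_refresh(...)`, leaving only the record counting in the caller. The body of `read_full_refresh` is not part of this excerpt. The following is a minimal sketch of what such a method could look like, reconstructed from the removed lines. `SketchStream` and `SketchSliceLogger` are hypothetical stand-ins rather than CDK classes, and the slice log message is simplified to a plain string.

```python
import logging
from typing import Any, Iterable, Iterator, List, Mapping, Optional


class SketchSliceLogger:
    """Hypothetical stand-in for the CDK slice logger."""

    def should_log_slice_message(self, logger: logging.Logger) -> bool:
        # Assumption: slice messages are only emitted at debug level.
        return logger.isEnabledFor(logging.DEBUG)

    def create_slice_log_message(self, _slice: Optional[Mapping[str, Any]]) -> str:
        return f"slice:{dict(_slice or {})}"


class SketchStream:
    """Hypothetical stream holding pre-computed slices and their records."""

    def __init__(self, records_by_slice: Mapping[str, List[Mapping[str, Any]]]):
        self._records_by_slice = records_by_slice

    def stream_slices(self, cursor_field: Optional[List[str]] = None) -> Iterable[Mapping[str, Any]]:
        return [{"slice_id": key} for key in self._records_by_slice]

    def read_records(
        self, stream_slice: Mapping[str, Any], cursor_field: Optional[List[str]] = None
    ) -> Iterator[Mapping[str, Any]]:
        yield from self._records_by_slice[stream_slice["slice_id"]]

    def read_full_refresh(
        self,
        cursor_field: Optional[List[str]],
        logger: logging.Logger,
        slice_logger: SketchSliceLogger,
    ) -> Iterator[Any]:
        # Same loop the diff removes from the source class. Counting records
        # and enforcing the limit stay with the caller.
        for _slice in self.stream_slices(cursor_field=cursor_field):
            if slice_logger.should_log_slice_message(logger):
                yield slice_logger.create_slice_log_message(_slice)
            yield from self.read_records(stream_slice=_slice, cursor_field=cursor_field)
```

With this shape, the caller iterates one flat sequence of slice messages and records, which is what lets the new `_read_full_refresh` shrink to a single loop.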
@@ -0,0 +1,3 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
@@ -0,0 +1,83 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
+
+from abc import ABC, abstractmethod
+from typing import Any, Iterable, Mapping, Optional
+
+from airbyte_cdk.models import AirbyteStream
+from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability
+from airbyte_cdk.sources.streams.concurrent.partitions.record import Record
+from deprecated.classic import deprecated
+
+
+@deprecated("This class is experimental. Use at your own risk.")
+class AbstractStream(ABC):
+    """
+    AbstractStream is an experimental interface for streams developed as part of the Concurrent CDK.
+    This interface is not yet stable and may change in the future. Use at your own risk.
+
+    Why create a new interface instead of adding concurrency capabilities to the existing Stream?
+    We learnt a lot since the initial design of the Stream interface, and we wanted to take the opportunity to improve.
+
+    High level, the changes we are targeting are:
+    - Removing superfluous or leaky parameters from the methods' interfaces
+    - Using composition instead of inheritance to add new capabilities
+
+    To allow us to iterate fast while ensuring backwards compatibility, we are creating a new interface with a facade object that will bridge the old and the new interfaces.
+    Source connectors that wish to leverage concurrency need to implement this new interface. An example will be available shortly.
+
+    Current restrictions on sources that implement this interface are listed below. Not all of these restrictions will be lifted in the future, but most will as we iterate on the design.
+    - Only full refresh is supported. This will be addressed in the future.
+    - The read method does not accept a cursor_field. Streams must be internally aware of the cursor field to use. User-defined cursor fields can be implemented by modifying the connector's main method to instantiate the streams with the configured cursor field.
+    - Streams cannot return user-friendly messages by overriding Stream.get_error_display_message. This will be addressed in the future.
+    - The Stream's behavior cannot depend on a namespace.
+    - TypeTransformer is not supported. This will be addressed in the future.
+    - Nested cursor fields and nested primary keys are not supported.
+    """
+
+    @abstractmethod
+    def read(self) -> Iterable[Record]:
+        """
+        Read a stream in full refresh mode
+        :return: The stream's records
+        """
+
+    @property
+    @abstractmethod
+    def name(self) -> str:
+        """
+        :return: The stream name
+        """
+
+    @property
+    @abstractmethod
+    def cursor_field(self) -> Optional[str]:
+        """
+        Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.
+        :return: The name of the field used as a cursor. Nested cursor fields are not supported.
+        """
+
+    @abstractmethod
+    def check_availability(self) -> StreamAvailability:
+        """
+        :return: The stream's availability
+        """
+
+    @abstractmethod
+    def get_json_schema(self) -> Mapping[str, Any]:
+        """
+        :return: A dict of the JSON schema representing this stream.
+        """
+
+    @abstractmethod
+    def as_airbyte_stream(self) -> AirbyteStream:
+        """
+        :return: This stream represented as an AirbyteStream.
+        """
+
+    @abstractmethod
+    def log_stream_sync_configuration(self) -> None:
+        """
+        Logs the stream's configuration for debugging purposes.
+        """
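The class docstring makes two design points: the cursor field is supplied when the stream is instantiated rather than passed to `read`, and a facade bridges the old and new interfaces through composition. The sketch below illustrates both under stated assumptions. `SketchAbstractStream` is a trimmed stand-in for the interface with plain dicts in place of `Record`, and `InMemoryStream` and `StreamFacadeSketch` are hypothetical names, not classes from this PR. The direction of the wrapping (legacy-style methods on top of a new-style stream) is also an assumption, since the facade itself is not shown in this excerpt.

```python
from abc import ABC, abstractmethod
from typing import Any, Iterable, Iterator, List, Mapping, Optional


class SketchAbstractStream(ABC):
    """Trimmed, hypothetical copy of the interface; records are plain dicts."""

    @abstractmethod
    def read(self) -> Iterable[Mapping[str, Any]]:
        """Read the stream in full refresh mode."""

    @property
    @abstractmethod
    def name(self) -> str:
        """The stream name."""

    @property
    @abstractmethod
    def cursor_field(self) -> Optional[str]:
        """The (non-nested) cursor field, if any."""


class InMemoryStream(SketchAbstractStream):
    """Concrete stream that receives its cursor field at construction time."""

    def __init__(self, name: str, records: List[Mapping[str, Any]], cursor_field: Optional[str] = None):
        self._name = name
        self._records = records
        self._cursor_field = cursor_field

    def read(self) -> Iterator[Mapping[str, Any]]:
        yield from self._records

    @property
    def name(self) -> str:
        return self._name

    @property
    def cursor_field(self) -> Optional[str]:
        return self._cursor_field


class StreamFacadeSketch:
    """Exposes a legacy-style read_records() by delegating to a new-style stream."""

    def __init__(self, stream: SketchAbstractStream):
        self._stream = stream  # composition, not inheritance

    @property
    def name(self) -> str:
        return self._stream.name

    @property
    def cursor_field(self) -> Optional[str]:
        return self._stream.cursor_field

    def read_records(
        self,
        sync_mode: str = "full_refresh",
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterator[Mapping[str, Any]]:
        # The legacy parameters are accepted for compatibility and ignored;
        # only full refresh is supported, mirroring the listed restrictions.
        if sync_mode != "full_refresh":
            raise NotImplementedError("Only full refresh is supported in this sketch")
        yield from self._stream.read()
```

A connector's entry point would then build something like `InMemoryStream("users", records, cursor_field=configured_cursor)` and hand `StreamFacadeSketch(stream)` to code that still expects the older method names.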