-
Notifications
You must be signed in to change notification settings - Fork 30
chore: clean up ConcurrentDeclarativeSource only processing DefaultStream and streamline inheritance to Source interface #743
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…treamline inheritance to Source interface, deprecate legacy components
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. Testing This CDK VersionYou can test this version of the CDK using the following: # Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@brian/streamline_cds_move_legacy_components#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch brian/streamline_cds_move_legacy_componentsHelpful ResourcesPR Slash CommandsAirbyte Maintainers can execute the following slash commands on your PR:
|
PyTest Results (Fast)3 742 tests - 10 3 730 ✅ - 10 6m 12s ⏱️ -11s Results for commit e6f9485. ± Comparison against base commit dd52cfe. This pull request removes 82 and adds 72 tests. Note that renamed tests count towards both.♻️ This comment has been updated with latest results. |
PyTest Results (Full)3 745 tests - 10 3 733 ✅ - 10 10m 49s ⏱️ -20s Results for commit e6f9485. ± Comparison against base commit dd52cfe. This pull request removes 82 and adds 72 tests. Note that renamed tests count towards both.♻️ This comment has been updated with latest results. |
📝 WalkthroughWalkthroughRoutes DeclarativeStream and cursor types to legacy modules, removes CustomIncrementalSync from schema/models/parsing, refactors ConcurrentDeclarativeSource to subclass Source with a Status-based check and public streams(config) API, adds local utilities to concurrent partition cursor, and updates many tests/imports accordingly. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor User
participant S as ConcurrentDeclarativeSource
participant F as ModelToComponentFactory
participant C as ConnectionChecker
participant R as AirbyteConnectionStatus
User->>S: check(logger, config)
S->>F: build ConnectionChecker component (uses self._config)
F-->>S: ConnectionChecker
S->>C: check_connection(Source, logger, config)
C-->>S: result {status: SUCCEEDED|FAILED, message}
S-->>User: AirbyteConnectionStatus(status, message)
Note right of S: check() returns Status-based result replacing (bool, message)
sequenceDiagram
autonumber
actor User
participant S as ConcurrentDeclarativeSource
participant F as ModelToComponentFactory
User->>S: streams(config)
S->>F: create stream components (may use partition_router)
F-->>S: stream instances
S->>S: select/filter concurrent streams
S-->>User: List[Stream]
Note right of S: public streams(config) replaces previous _group_streams tuple flow
Estimated code review effort🎯 4 (Complex) | ⏱️ ~75 minutes Possibly related PRs
Suggested reviewers
Would you like me to flag the highest-risk files for deeper review (ConcurrentDeclarativeSource, ModelToComponentFactory, concurrent_partition_cursor) — wdyt? 📜 Recent review detailsConfiguration used: CodeRabbit UI Review profile: CHILL Plan: Pro 💡 Knowledge Base configuration:
You can enable these sources in your CodeRabbit configuration. 📒 Files selected for processing (4)
🚧 Files skipped from review as they are similar to previous changes (4)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (10)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)
220-239: Don’t block on a semaphore while holding the global lock.
acquire()may block for a while; doing it underself._lockserializes unrelated partitions and can hurt throughput. Grab the semaphore under lock, release the lock, block on the semaphore, then re-acquire the lock for the rest. This keeps invariants without long critical sections. wdyt?- with self._lock: - self._semaphore_per_partition[partition_key].acquire() - if not self._use_global_cursor: - self._cursor_per_partition[partition_key].close_partition(partition=partition) - cursor = self._cursor_per_partition[partition_key] - if ( - partition_key in self._partitions_done_generating_stream_slices - and self._semaphore_per_partition[partition_key]._value == 0 - ): - self._update_global_cursor(cursor.state[self.cursor_field.cursor_field_key]) - - # Clean up the partition if it is fully processed - self._cleanup_if_done(partition_key) - - self._check_and_update_parent_state() - - self._emit_state_message() + # Read semaphore under lock, then block outside the lock + with self._lock: + semaphore = self._semaphore_per_partition[partition_key] + semaphore.acquire() + + with self._lock: + if not self._use_global_cursor: + self._cursor_per_partition[partition_key].close_partition(partition=partition) + cursor = self._cursor_per_partition[partition_key] + if ( + partition_key in self._partitions_done_generating_stream_slices + and self._semaphore_per_partition[partition_key]._value == 0 + ): + self._update_global_cursor(cursor.state[self.cursor_field.cursor_field_key]) + + # Clean up the partition if it is fully processed + self._cleanup_if_done(partition_key) + + self._check_and_update_parent_state() + + self._emit_state_message()
266-273: Guard Timer.finish() to avoid RuntimeError when no slices were generated.If no partitions/slices were ever generated and
ensure_at_least_one_state_emitted()is called,finish()can raise because_timer.start()never ran. Guarding withis_running()makes this safe. wdyt?- if not any( + if not any( semaphore_item[1]._value for semaphore_item in self._semaphore_per_partition.items() ): self._global_cursor = self._new_global_cursor - self._lookback_window = self._timer.finish() + if self._timer.is_running(): + self._lookback_window = self._timer.finish() + else: + self._lookback_window = 0 self._parent_state = self._partition_router.get_stream_state() self._emit_state_message(throttle=False)airbyte_cdk/legacy/sources/declarative/incremental/global_substream_cursor.py (2)
63-66: Bug: calling float.ceil directly raises AttributeError at runtime.
((...)/1e9).__ceil__()will fail because float doesn’t expose__ceil__publicly; usemath.ceiland aNonecheck.Apply this diff?
@@ - def finish(self) -> int: - if self._start: - return ((time.perf_counter_ns() - self._start) / 1e9).__ceil__() - else: - raise RuntimeError("Global substream cursor timer not started") + def finish(self) -> int: + import math + if self._start is not None: + seconds = (time.perf_counter_ns() - self._start) / 1e9 + return math.ceil(seconds) + raise RuntimeError("Global substream cursor timer not started")
238-241: Avoid relying on Semaphore._value (private, non-portable).Reading
_slice_semaphore._valueis implementation detail and racy across Python versions. Would you track an explicit “inflight slices” counter guarded by the same lock (or a Condition) and checkinflight == 0instead, wdyt?@@ class GlobalSubstreamCursor(DeclarativeCursor): - self._slice_semaphore = threading.Semaphore(0) + self._slice_semaphore = threading.Semaphore(0) + self._inflight = 0 @@ - self._slice_semaphore.release() + self._slice_semaphore.release() + with self._lock: + self._inflight += 1 if last: self._all_slices_yielded = True @@ - self._slice_semaphore.acquire() - if self._all_slices_yielded and self._slice_semaphore._value == 0: + self._slice_semaphore.acquire() + self._inflight -= 1 + if self._all_slices_yielded and self._inflight == 0: self._lookback_window = self._timer.finish() self._stream_cursor.close_slice( StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), *args )unit_tests/sources/declarative/custom_state_migration.py (2)
24-26: Bug: computedself._cursor_fieldis never used; rawself._cursor.cursor_fieldis used instead.This can break when the cursor field is interpolated. Use the evaluated
self._cursor_fieldfor state lookups and emitted keys.Apply this diff?
- self._cursor_field = InterpolatedString.create( - self._cursor.cursor_field, parameters=self._parameters - ).eval(self._config) + self._cursor_field = InterpolatedString.create( + self._cursor.cursor_field, parameters=self._parameters + ).eval(self._config) @@ - updated_at = stream_state[self._cursor.cursor_field] + updated_at = stream_state[self._cursor_field] @@ - "cursor": {self._cursor.cursor_field: updated_at}, + "cursor": {self._cursor_field: updated_at}, @@ - "cursor": {self._cursor.cursor_field: updated_at}, + "cursor": {self._cursor_field: updated_at},Also, do you want to guard against a missing key in
stream_stateand raise a clearer error, wdyt?Also applies to: 31-45
19-23: Defensive check: ensureincremental_syncexists.If
declarative_stream.incremental_synccan beNone, accessing.cursor_fieldwill crash. Add a clear check?def __init__(self, declarative_stream: DeclarativeStreamModel, config: Config): self._config = config - self.declarative_stream = declarative_stream - self._cursor = declarative_stream.incremental_sync + self.declarative_stream = declarative_stream + self._cursor = declarative_stream.incremental_sync + if self._cursor is None: + raise ValueError("CustomStateMigration requires an incremental_sync with a cursor_field")Wdyt?
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
389-399: Narrowstreams(...)to return only concurrent streams and assert at runtime.Since CDS now “only processes DefaultStream”, can we tighten the annotation and assert to fail fast if a non-concurrent stream slips in?
- def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStream]]: # type: ignore # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder + def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]: # type: ignore @@ - return source_streams + # Fail fast if any non-concurrent stream appears (should not happen per design). + assert all(isinstance(s, AbstractStream) for s in source_streams), "ConcurrentDeclarativeSource.streams expected AbstractStream instances only" + return source_streams # type: ignoreWdyt?
Also applies to: 409-421
632-644: Filter to concurrent streams in_select_streams(...).If a non-concurrent
Streamever gets through,ConcurrentSource.readcould break. Filtering is cheap insurance.- for configured_stream in configured_catalog.streams: - stream_instance = stream_name_to_instance.get(configured_stream.stream.name) - if stream_instance: - abstract_streams.append(stream_instance) + for configured_stream in configured_catalog.streams: + stream_instance = stream_name_to_instance.get(configured_stream.stream.name) + if stream_instance and isinstance(stream_instance, AbstractStream): + abstract_streams.append(stream_instance)Wdyt?
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
1966-1973: Bug: partition_field_start uses partition_field_end (leads to wrong request option injection).This will send both start/end under the end field key. Should be start->partition_field_start. Patch below, wdyt?
- request_options_provider = DatetimeBasedRequestOptionsProvider( - start_time_option=start_time_option, - end_time_option=end_time_option, - partition_field_start=cursor_model.partition_field_end, - partition_field_end=cursor_model.partition_field_end, - config=config, - parameters=model.parameters or {}, - ) + request_options_provider = DatetimeBasedRequestOptionsProvider( + start_time_option=start_time_option, + end_time_option=end_time_option, + partition_field_start=cursor_model.partition_field_start, + partition_field_end=cursor_model.partition_field_end, + config=config, + parameters=model.parameters or {}, + )
2656-2665: Missing required kw-only arg: stream_slicer in create_dynamic_schema_loader.create_simple_retriever requires stream_slicer (kw-only). This call omits it and will raise TypeError. Pass None explicitly while keeping partition_router for request option routing, wdyt?
retriever = self._create_component_from_model( model=model.retriever, config=config, name=name, primary_key=None, - partition_router=self._build_stream_slicer_from_partition_router( + stream_slicer=None, + partition_router=self._build_stream_slicer_from_partition_router( model.retriever, config ), transformations=[], use_cache=True,
🧹 Nitpick comments (36)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (9)
33-48: Fix docstring to match the function signature (zero-arg state supplier).The docstring still claims
get_stream_state_func“takes an element,” but it’s a zero-arg callable invoked between elements. Also clarify that the function is a generator yielding (element, is_last, state). Adjusting the doc clarifies usage and avoids confusion, wdyt?Apply this diff to update the docstring:
- Iterates over the given generator, yielding tuples containing the element, a flag - indicating whether it's the last element in the generator, and the result of - `get_stream_state_func` applied to the element. + Iterates over the given generator and yields (element, is_last, state) tuples. + `get_stream_state_func` is a zero-argument callable that is invoked before the + first element and between successive elements to fetch the current stream state. @@ - get_stream_state_func: A function that takes an element from the generator and - returns its state. + get_stream_state_func: A zero-argument callable returning the current stream + state mapping (or None) for the stream. @@ - An iterator that yields tuples of the form (element, is_last, state). + A generator that yields tuples of the form (element, is_last, state).
66-85: Harden Timer.finish() None-check; keep semantics intact.Using
if self._start:can mask a legitimate0value and is less explicit. Switching to a None check makes intent clear and avoids surprises. Do you want to also reset_startafterfinish()to prevent accidental reuse, or do you intentionally rely onis_running()staying True as a “stream_slices was invoked” sentinel, wdyt?def finish(self) -> int: - if self._start: + if self._start is not None: return ((time.perf_counter_ns() - self._start) / 1e9).__ceil__() else: raise RuntimeError("Global substream cursor timer not started")
303-313: Avoid unused variable ‘last’.
lastfrom the outer partition iterator isn’t used. Renaming to_(or_is_last) avoids lint noise and clarifies intent. wdyt?- for partition, last, parent_state in iterate_with_last_flag_and_state( + for partition, _, parent_state in iterate_with_last_flag_and_state( slices, self._partition_router.get_stream_state ):
320-331: Reuse computed partition_key to avoid duplicate work.Nit: you compute
partition_keyat Line 320, then recompute the same key at Line 322. Reusing it improves clarity and avoids extra serialization. wdyt?- cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition)) + cursor = self._cursor_per_partition.get(partition_key)
371-379: Docstring references a non-existent_finished_partitions.The implementation removes finished partitions by checking membership in
_partition_key_to_indexand tracking_partitions_done_generating_stream_slices. Update the docstring to reflect current behavior. wdyt?- 1. Attempt to remove partitions that are marked as finished in `_finished_partitions`. - These partitions are considered processed and safe to delete. + 1. Attempt to remove partitions that have completed processing + (i.e., not present in `_partition_key_to_index`). These are safe to delete.
387-407: Make the partition limit condition clearer.Using
>= DEFAULT_MAX_PARTITIONS_NUMBERis clearer than> DEFAULT_MAX_PARTITIONS_NUMBER - 1. Same semantics, better readability. wdyt?- while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1: + while len(self._cursor_per_partition) >= self.DEFAULT_MAX_PARTITIONS_NUMBER:
109-113: Typo in class docstring: “CurrentPerPartitionCursor” → “ConcurrentPerPartitionCursor”.Tiny nit but helps searchability and avoids confusion. wdyt?
- CurrentPerPartitionCursor expects the state of the ConcurrentCursor to follow the format {cursor_field: cursor_value}. + ConcurrentPerPartitionCursor expects the state of the ConcurrentCursor to follow the format {cursor_field: cursor_value}.
221-231: Avoid relying onSemaphore._value(private API).Accessing
_valueis CPython-specific and not thread-safe. Could we track “in-flight slices per partition” explicitly (e.g., anintcounter in a dict updated alongside the semaphore) or use aBoundedSemaphoreand a separate completion flag to determine when to advance the global cursor, wdyt?
5-11: Import cleanup (minor).Both
import copyandfrom copy import deepcopyare used. Would you prefer to standardize on one (e.g., onlydeepcopy) and replacecopy.deepcopy(...)calls accordingly, wdyt?airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py (1)
35-41: Type narrowing to DatetimeBasedCursor only — confirm callers/manifests are migratedConstraining cursor to DatetimeBasedCursor aligns with the schema changes. Could we double-check that any manifests/tests that previously provided CustomIncrementalSync are updated, and add a short note in the migration docstring that only DatetimeBasedCursor is supported now, wdyt?
airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py (1)
9-9: Adopt Source/Stream in check_dynamic_stream — aligned with refactorThe type updates are coherent with the rest of the PR. One small hardening idea: if an exception occurs before the first iteration assigns stream, referencing stream.name in the except could raise an UnboundLocalError. Would you like to guard the name used in the error message, wdyt?
For example:
current_stream_name = "<unknown>" try: for stream in streams[: min(self.stream_count, len(streams))]: current_stream_name = getattr(stream, "name", "<unknown>") stream_is_available, reason = evaluate_availability(stream, logger) if not stream_is_available: logger.warning(f"Stream {current_stream_name} is not available: {reason}") return False, reason except Exception as error: error_message = f"Encountered an error trying to connect to stream {current_stream_name}. Error: {error}" logger.error(error_message, exc_info=True) return False, error_messageAlso applies to: 12-12, 36-39
unit_tests/sources/streams/concurrent/test_cursor.py (2)
16-18: Tests updated to legacy import — nice. Add a guard for compatibility?Would you add a tiny test asserting
from airbyte_cdk import DatetimeBasedCursorstill works (re-export), to protect downstreams relying on the top-level import, wdyt?
1288-1296: Tests rely on private attrs of DatetimeBasedCursor.The test accesses
_start_datetime,_partition_field_start/_end,_step, etc. Could we expose small accessors or a thin helper for these to reduce brittleness against internal refactors, wdyt?Also applies to: 1299-1303
airbyte_cdk/legacy/sources/declarative/incremental/per_partition_cursor.py (2)
89-96: Partition limit loop is a bit opaque with “- 1”.Would
>= self.DEFAULT_MAX_PARTITIONS_NUMBERread clearer and avoid off-by-one confusion, wdyt?- while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1: + while len(self._cursor_per_partition) >= self.DEFAULT_MAX_PARTITIONS_NUMBER:
204-209: Unreachable code after raise.The second
if not stream_slice:is dead. Remove for clarity, wdyt?def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]: if not stream_slice: raise ValueError("A partition needs to be provided in order to extract a state") - if not stream_slice: - return None - return self._get_state_for_partition(stream_slice.partition)unit_tests/sources/declarative/partition_routers/helpers.py (2)
55-58: Type hints: make optional explicitcursor_field and stream_state can be None in stream_slices; would you update hints to Optional[...] for accuracy, wdyt?
- def stream_slices( - self, - *, - sync_mode: SyncMode, - cursor_field: List[str] = None, - stream_state: Mapping[str, Any] = None, - ) -> Iterable[Optional[StreamSlice]]: + def stream_slices( + self, + *, + sync_mode: SyncMode, + cursor_field: Optional[List[str]] = None, + stream_state: Optional[Mapping[str, Any]] = None, + ) -> Iterable[Optional[StreamSlice]]:
64-83: read_records return type vs yielded Recordsread_records advertises Iterable[Mapping[str, Any]] but yields Record objects when stream_slice is set. For clarity, would you either change the annotation or yield dicts consistently, wdyt?
- ) -> Iterable[Mapping[str, Any]]: + ) -> Iterable[Union[Mapping[str, Any], Record]]:airbyte_cdk/legacy/sources/declarative/declarative_stream.py (1)
8-12: Prefer public re-exports to reduce internal couplingSince these classes are re-exported at airbyte_cdk.init, would you consider importing via the top-level package to avoid binding tests/runtime to internal legacy paths, wdyt?
-from airbyte_cdk.legacy.sources.declarative.incremental import ( - GlobalSubstreamCursor, - PerPartitionCursor, - PerPartitionWithGlobalCursor, -) +from airbyte_cdk import ( + GlobalSubstreamCursor, + PerPartitionCursor, + PerPartitionWithGlobalCursor, +)unit_tests/sources/declarative/retrievers/test_simple_retriever.py (1)
15-20: Optional: import via top-level re-exports to decouple testsTo avoid tests depending on internal legacy structure, would you switch to top-level re-exports, wdyt?
-from airbyte_cdk.legacy.sources.declarative.incremental import ( - DatetimeBasedCursor, - DeclarativeCursor, - ResumableFullRefreshCursor, -) +from airbyte_cdk import ( + DatetimeBasedCursor, + DeclarativeCursor, + ResumableFullRefreshCursor, +)unit_tests/sources/declarative/partition_routers/test_cartesian_product_partition_router.py (1)
7-9: Optionally use top-level re-exportWould using the package-level re-export help keep this test insulated from internal module layout, wdyt?
-from airbyte_cdk.legacy.sources.declarative.incremental.datetime_based_cursor import ( - DatetimeBasedCursor, -) +from airbyte_cdk import DatetimeBasedCursorairbyte_cdk/legacy/sources/declarative/incremental/per_partition_with_global.py (1)
10-13: Minor: avoid importing helpers from deep modules?iterate_with_last_flag_and_state coming from global_substream_cursor creates a subtle coupling; would you consider relocating that helper to a small shared utils module within legacy.incremental and import from there, wdyt?
unit_tests/sources/declarative/stream_slicers/test_stream_slicer_read_decorator.py (1)
143-147: Typo in test name (“aysnc”) — rename for clarity.Would you mind renaming the test to “async” to avoid confusion, wdyt?
-def test_isinstance_global_cursor_aysnc_job_partition_router(): +def test_isinstance_global_cursor_async_job_partition_router():unit_tests/sources/declarative/checks/test_check_dynamic_stream.py (2)
125-127: Make parametrized test IDs unique for readability.Both cases use “test_stream_available”; shall we differentiate them to ease test triage, wdyt?
- pytest.param(200, Status.SUCCEEDED, True, [], id="test_stream_available"), - pytest.param(200, Status.SUCCEEDED, False, [], id="test_stream_available"), + pytest.param(200, Status.SUCCEEDED, True, [], id="test_stream_available_with_availability"), + pytest.param(200, Status.SUCCEEDED, False, [], id="test_stream_available_without_availability"),
169-171: Guard against None message on failure-expected cases.To avoid surprises if message is missing, would you add a small guard when expected_messages is non-empty, wdyt?
- for message in expected_messages: - assert message in connection_status.message + if expected_messages: + assert connection_status.message, "Expected failure message missing" + for message in expected_messages: + assert message in connection_status.messageairbyte_cdk/sources/declarative/checks/check_stream.py (2)
66-71: Signature updated tosource: Source—keep tuple return forConnectionCheckerbut consider future-proofing.Since the public check surface across the PR is moving to Status-based results, would you like to add a small note to the docstring here clarifying why this still returns
(bool, Any)as an internal component contract, to avoid confusion, wdyt?
122-127: Param type updated toSource—guarding already handled.
should_check_dynamic_streamspre-check avoids attribute errors; looks good. Tiny nit: the comment/docstring forevaluate_availabilitystill references “migrate everything to AbstractStream” which is outdated with the DefaultStream-only direction—want to update that for clarity, wdyt?unit_tests/sources/declarative/checks/test_check_stream.py (2)
372-399: Status-based expectations read well; consider asserting messages on failures too.You already validate HTTP error messages elsewhere; would you like to also assert
connection_status.messagewhenexpected_result is Status.FAILEDhere to tighten failure-path coverage, wdyt?Also applies to: 400-470, 471-519, 520-569, 570-617
694-694: ExpectingValueErrorwith onlytypeprovided.If the model validation raises
ValidationErrorinstead, this test could become brittle. Do you want to accept eitherValueErrororValidationErrorto future-proof it, wdyt?airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
514-533: ConditionalStreams evaluation uses emptyparameters.If any legacy manifests rely on
$parametersin the condition, this could surprise them. Should we pass through manifest-level parameters (if present) toInterpolatedBooleanfor parity, wdyt?unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)
18-25: Use top-level re-exports for DeclarativeStream and DatetimeBasedCursor?
We’ve confirmed they’re re-exported in airbyte_cdk/init.py (imports on lines 68–69, all at 237, 241), so you can simplify the test imports:-from airbyte_cdk.legacy.sources.declarative.declarative_stream import DeclarativeStream +from airbyte_cdk import DeclarativeStream @@ -from airbyte_cdk.legacy.sources.declarative.incremental import ( - CursorFactory, - DatetimeBasedCursor, - PerPartitionCursor, - PerPartitionWithGlobalCursor, - ResumableFullRefreshCursor, -) +from airbyte_cdk.legacy.sources.declarative.incremental import ( + CursorFactory, + PerPartitionCursor, + PerPartitionWithGlobalCursor, + ResumableFullRefreshCursor, +) +from airbyte_cdk import DatetimeBasedCursorwdyt?
airbyte_cdk/legacy/sources/declarative/incremental/__init__.py (1)
2-2: Add a short module docstring to signal deprecation surface?A tiny docstring will guide users toward preferred import paths without breaking anything. Add something like below, wdyt?
# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +""" +Legacy declarative incremental cursor re-exports. +Prefer importing from `airbyte_cdk` top-level or concurrent modules where available. +This module exists for backward compatibility. +""" + from airbyte_cdk.legacy.sources.declarative.incremental.datetime_based_cursor import (unit_tests/sources/declarative/test_concurrent_declarative_source.py (4)
715-731: Stabilize stream selection: prefer name-based lookup over list indexesIndexing into
streams(e.g.,[0],[2],[7]) is brittle as ordering may shift with manifest or constructor changes. Shall we switch to a helper that resolves byname, and update these spots, wdyt?Apply within the shown ranges:
- party_members_stream = streams[0] + party_members_stream = get_stream_by_name(streams, "party_members") ... - locations_stream = streams[2] + locations_stream = get_stream_by_name(streams, "locations") ... - incremental_counting_stream = streams[7] + incremental_counting_stream = get_stream_by_name(streams, "incremental_counting_stream")Add this small helper once (outside these ranges):
def get_stream_by_name(streams, name: str): return next(s for s in streams if s.name == name)Also applies to: 733-747, 758-771
1393-1398: Avoid index-based access in other tests, tooSame ask here to reduce flakiness; resolve by stream name instead, wdyt?
- streams = source.streams(_CONFIG) - assert streams[0].cursor.state.get("state") != state_blob.__dict__, "State was not migrated." - assert streams[0].cursor.state.get("states") == [ + streams = source.streams(_CONFIG) + party_members_stream = get_stream_by_name(streams, "party_members") + assert party_members_stream.cursor.state.get("state") != state_blob.__dict__, "State was not migrated." + assert party_members_stream.cursor.state.get("states") == [ {"cursor": {"updated_at": "2024-08-21"}, "partition": {"type": "type_1"}}, {"cursor": {"updated_at": "2024-08-21"}, "partition": {"type": "type_2"}}, ], "State was migrated, but actual state don't match expected"- async_job_stream = streams[6] + async_job_stream = get_stream_by_name(streams, "async_job_stream")- locations_stream = streams[2] + locations_stream = get_stream_by_name(streams, "locations")- locations_stream = streams[2] + locations_stream = get_stream_by_name(streams, "locations")Also applies to: 1637-1639, 1695-1697, 1754-1756
903-906: Harden the patched init against signature driftPatching
AbstractStreamStateConverter.__init__with a fixed signature is fragile. Can we accept*args, **kwargsso future params don’t break tests, wdyt?Proposed replacement for
mocked_init(defined above this patch):def mocked_init(self, *args, **kwargs): # Force concurrent state format in tests without depending on the ctor signature self._is_sequential_state = False
2183-2187: DRY the check-path mocking and avoid coupling to a private methodTwo tests patch
SimpleRetriever._fetch_next_pageforcheck(). Would you consider a small fixture/context helper to centralize this and reduce reliance on a private method, wdyt?Example fixture:
import contextlib import pytest from unittest.mock import patch @pytest.fixture def mock_check_pages(): @contextlib.contextmanager def _mock(pages): with patch.object(SimpleRetriever, "_fetch_next_page", side_effect=pages): yield return _mockUsage:
with mock_check_pages([_create_page({...})]): connection_status = source.check(logging.getLogger(""), config) assert connection_status.status == Status.SUCCEEDEDAlso applies to: 2962-2973
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
3387-3391: Be defensive when swapping request_options_provider for partition_router.PartitionRouter may not always implement the RequestOptionsProvider methods. Guard with hasattr to avoid runtime AttributeError and cast to satisfy typing, wdyt?
- if isinstance(request_options_provider, DefaultRequestOptionsProvider) and isinstance( - partition_router, PartitionRouter - ): - request_options_provider = partition_router + if ( + isinstance(request_options_provider, DefaultRequestOptionsProvider) + and isinstance(partition_router, PartitionRouter) + and hasattr(partition_router, "get_request_params") + ): + request_options_provider = cast(RequestOptionsProvider, partition_router)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (38)
airbyte_cdk/__init__.py(1 hunks)airbyte_cdk/legacy/sources/declarative/declarative_stream.py(1 hunks)airbyte_cdk/legacy/sources/declarative/incremental/__init__.py(1 hunks)airbyte_cdk/legacy/sources/declarative/incremental/datetime_based_cursor.py(1 hunks)airbyte_cdk/legacy/sources/declarative/incremental/global_substream_cursor.py(1 hunks)airbyte_cdk/legacy/sources/declarative/incremental/per_partition_cursor.py(1 hunks)airbyte_cdk/legacy/sources/declarative/incremental/per_partition_with_global.py(1 hunks)airbyte_cdk/legacy/sources/declarative/incremental/resumable_full_refresh_cursor.py(1 hunks)airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py(2 hunks)airbyte_cdk/sources/declarative/checks/check_stream.py(3 hunks)airbyte_cdk/sources/declarative/checks/connection_checker.py(2 hunks)airbyte_cdk/sources/declarative/concurrent_declarative_source.py(8 hunks)airbyte_cdk/sources/declarative/declarative_component_schema.yaml(0 hunks)airbyte_cdk/sources/declarative/incremental/__init__.py(0 hunks)airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py(2 hunks)airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py(1 hunks)airbyte_cdk/sources/declarative/models/declarative_component_schema.py(1 hunks)airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py(0 hunks)airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py(4 hunks)airbyte_cdk/sources/declarative/retrievers/simple_retriever.py(1 hunks)unit_tests/connector_builder/test_connector_builder_handler.py(1 hunks)unit_tests/legacy/sources/declarative/incremental/test_datetime_based_cursor.py(1 hunks)unit_tests/legacy/sources/declarative/incremental/test_per_partition_cursor.py(1 hunks)unit_tests/legacy/sources/declarative/incremental/test_resumable_full_refresh_cursor.py(1 hunks)unit_tests/legacy/sources/declarative/test_declarative_stream.py(1 hunks)unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py(0 hunks)unit_tests/sources/declarative/checks/test_check_dynamic_stream.py(3 hunks)unit_tests/sources/declarative/checks/test_check_stream.py(17 hunks)unit_tests/sources/declarative/custom_state_migration.py(1 hunks)unit_tests/sources/declarative/parsers/test_model_to_component_factory.py(2 hunks)unit_tests/sources/declarative/partition_routers/helpers.py(1 hunks)unit_tests/sources/declarative/partition_routers/test_cartesian_product_partition_router.py(1 hunks)unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py(1 hunks)unit_tests/sources/declarative/requesters/paginators/test_stop_condition.py(2 hunks)unit_tests/sources/declarative/retrievers/test_simple_retriever.py(1 hunks)unit_tests/sources/declarative/stream_slicers/test_stream_slicer_read_decorator.py(1 hunks)unit_tests/sources/declarative/test_concurrent_declarative_source.py(15 hunks)unit_tests/sources/streams/concurrent/test_cursor.py(2 hunks)
💤 Files with no reviewable changes (4)
- airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py
- unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py
- airbyte_cdk/sources/declarative/declarative_component_schema.yaml
- airbyte_cdk/sources/declarative/incremental/init.py
🧰 Additional context used
🧠 Learnings (7)
📓 Common learnings
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
📚 Learning: 2024-11-15T00:59:08.154Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/spec.json:9-15
Timestamp: 2024-11-15T00:59:08.154Z
Learning: When code in `airbyte_cdk/cli/source_declarative_manifest/` is being imported from another repository, avoid suggesting modifications to it during the import process.
Applied to files:
unit_tests/legacy/sources/declarative/incremental/test_resumable_full_refresh_cursor.pyunit_tests/legacy/sources/declarative/test_declarative_stream.pyairbyte_cdk/legacy/sources/declarative/incremental/datetime_based_cursor.pyairbyte_cdk/legacy/sources/declarative/declarative_stream.pyunit_tests/sources/declarative/partition_routers/helpers.pyairbyte_cdk/legacy/sources/declarative/incremental/resumable_full_refresh_cursor.pyairbyte_cdk/__init__.py
📚 Learning: 2024-11-15T01:04:21.272Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.
Applied to files:
unit_tests/legacy/sources/declarative/incremental/test_resumable_full_refresh_cursor.pyunit_tests/legacy/sources/declarative/test_declarative_stream.pyairbyte_cdk/legacy/sources/declarative/incremental/datetime_based_cursor.pyairbyte_cdk/legacy/sources/declarative/declarative_stream.pyunit_tests/sources/declarative/partition_routers/helpers.pyairbyte_cdk/legacy/sources/declarative/incremental/per_partition_cursor.pyunit_tests/connector_builder/test_connector_builder_handler.pyairbyte_cdk/sources/declarative/retrievers/simple_retriever.pyairbyte_cdk/legacy/sources/declarative/incremental/resumable_full_refresh_cursor.pyairbyte_cdk/__init__.pyunit_tests/sources/declarative/parsers/test_model_to_component_factory.pyunit_tests/sources/declarative/stream_slicers/test_stream_slicer_read_decorator.pyunit_tests/sources/declarative/test_concurrent_declarative_source.py
📚 Learning: 2025-01-13T23:39:15.457Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#174
File: unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py:21-29
Timestamp: 2025-01-13T23:39:15.457Z
Learning: The CustomPageIncrement class in unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py is imported from another connector definition and should not be modified in this context.
Applied to files:
unit_tests/sources/declarative/requesters/paginators/test_stop_condition.pyunit_tests/connector_builder/test_connector_builder_handler.py
📚 Learning: 2024-11-18T23:40:06.391Z
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
Applied to files:
unit_tests/legacy/sources/declarative/test_declarative_stream.pyairbyte_cdk/legacy/sources/declarative/declarative_stream.pyunit_tests/sources/declarative/partition_routers/helpers.pyairbyte_cdk/sources/declarative/concurrent_declarative_source.pyunit_tests/sources/declarative/parsers/test_model_to_component_factory.pyairbyte_cdk/sources/declarative/checks/check_stream.py
📚 Learning: 2024-12-11T16:34:46.319Z
Learnt from: pnilan
PR: airbytehq/airbyte-python-cdk#0
File: :0-0
Timestamp: 2024-12-11T16:34:46.319Z
Learning: In the airbytehq/airbyte-python-cdk repository, the `declarative_component_schema.py` file is auto-generated from `declarative_component_schema.yaml` and should be ignored in the recommended reviewing order.
Applied to files:
airbyte_cdk/sources/declarative/models/declarative_component_schema.pyairbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
📚 Learning: 2025-01-14T00:20:32.310Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#174
File: airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1093-1102
Timestamp: 2025-01-14T00:20:32.310Z
Learning: In the `airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py` file, the strict module name checks in `_get_class_from_fully_qualified_class_name` (requiring `module_name` to be "components" and `module_name_full` to be "source_declarative_manifest.components") are intentionally designed to provide early, clear feedback when class declarations won't be found later in execution. These restrictions may be loosened in the future if the requirements for class definition locations change.
Applied to files:
airbyte_cdk/sources/declarative/concurrent_declarative_source.pyunit_tests/sources/declarative/parsers/test_model_to_component_factory.pyairbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
🧬 Code graph analysis (33)
unit_tests/sources/declarative/requesters/paginators/test_stop_condition.py (2)
airbyte_cdk/sources/streams/concurrent/adapters.py (1)
cursor(196-197)airbyte_cdk/sources/streams/concurrent/cursor.py (1)
Cursor(51-87)
unit_tests/legacy/sources/declarative/test_declarative_stream.py (2)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (1)
DeclarativeStream(32-241)airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
DeclarativeStream(2406-2473)
unit_tests/legacy/sources/declarative/incremental/test_datetime_based_cursor.py (1)
airbyte_cdk/legacy/sources/declarative/incremental/datetime_based_cursor.py (1)
DatetimeBasedCursor(28-448)
airbyte_cdk/legacy/sources/declarative/incremental/datetime_based_cursor.py (1)
airbyte_cdk/legacy/sources/declarative/incremental/declarative_cursor.py (1)
DeclarativeCursor(9-13)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (3)
airbyte_cdk/legacy/sources/declarative/incremental/global_substream_cursor.py (1)
GlobalSubstreamCursor(73-353)airbyte_cdk/legacy/sources/declarative/incremental/per_partition_cursor.py (1)
PerPartitionCursor(27-365)airbyte_cdk/legacy/sources/declarative/incremental/per_partition_with_global.py (1)
PerPartitionWithGlobalCursor(22-199)
unit_tests/legacy/sources/declarative/incremental/test_per_partition_cursor.py (2)
airbyte_cdk/legacy/sources/declarative/incremental/declarative_cursor.py (1)
DeclarativeCursor(9-13)airbyte_cdk/legacy/sources/declarative/incremental/global_substream_cursor.py (1)
GlobalSubstreamCursor(73-353)
airbyte_cdk/legacy/sources/declarative/incremental/global_substream_cursor.py (2)
airbyte_cdk/legacy/sources/declarative/incremental/datetime_based_cursor.py (1)
DatetimeBasedCursor(28-448)airbyte_cdk/legacy/sources/declarative/incremental/declarative_cursor.py (1)
DeclarativeCursor(9-13)
unit_tests/sources/streams/concurrent/test_cursor.py (1)
airbyte_cdk/legacy/sources/declarative/incremental/datetime_based_cursor.py (1)
DatetimeBasedCursor(28-448)
unit_tests/sources/declarative/partition_routers/helpers.py (1)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (1)
DeclarativeStream(32-241)
airbyte_cdk/legacy/sources/declarative/incremental/per_partition_cursor.py (1)
airbyte_cdk/legacy/sources/declarative/incremental/declarative_cursor.py (1)
DeclarativeCursor(9-13)
airbyte_cdk/legacy/sources/declarative/incremental/per_partition_with_global.py (3)
airbyte_cdk/legacy/sources/declarative/incremental/datetime_based_cursor.py (1)
DatetimeBasedCursor(28-448)airbyte_cdk/legacy/sources/declarative/incremental/global_substream_cursor.py (2)
GlobalSubstreamCursor(73-353)iterate_with_last_flag_and_state(19-49)airbyte_cdk/legacy/sources/declarative/incremental/per_partition_cursor.py (2)
CursorFactory(19-24)PerPartitionCursor(27-365)
unit_tests/sources/declarative/partition_routers/test_cartesian_product_partition_router.py (2)
airbyte_cdk/legacy/sources/declarative/incremental/datetime_based_cursor.py (1)
DatetimeBasedCursor(28-448)airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
DatetimeBasedCursor(1781-1889)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (3)
airbyte_cdk/legacy/sources/declarative/incremental/datetime_based_cursor.py (1)
DatetimeBasedCursor(28-448)airbyte_cdk/legacy/sources/declarative/incremental/declarative_cursor.py (1)
DeclarativeCursor(9-13)airbyte_cdk/legacy/sources/declarative/incremental/resumable_full_refresh_cursor.py (1)
ResumableFullRefreshCursor(12-93)
unit_tests/connector_builder/test_connector_builder_handler.py (2)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (1)
DeclarativeStream(32-241)airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
DeclarativeStream(2406-2473)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)
airbyte_cdk/legacy/sources/declarative/incremental/resumable_full_refresh_cursor.py (1)
ResumableFullRefreshCursor(12-93)airbyte_cdk/legacy/sources/declarative/incremental/declarative_cursor.py (1)
DeclarativeCursor(9-13)
airbyte_cdk/sources/declarative/checks/connection_checker.py (1)
airbyte_cdk/sources/source.py (1)
Source(55-95)
unit_tests/sources/declarative/checks/test_check_dynamic_stream.py (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
check(487-505)airbyte_cdk/test/mock_http/mocker.py (1)
assert_number_of_calls(131-140)
airbyte_cdk/legacy/sources/declarative/incremental/__init__.py (6)
airbyte_cdk/legacy/sources/declarative/incremental/datetime_based_cursor.py (1)
DatetimeBasedCursor(28-448)airbyte_cdk/legacy/sources/declarative/incremental/declarative_cursor.py (1)
DeclarativeCursor(9-13)airbyte_cdk/legacy/sources/declarative/incremental/global_substream_cursor.py (1)
GlobalSubstreamCursor(73-353)airbyte_cdk/legacy/sources/declarative/incremental/per_partition_cursor.py (2)
CursorFactory(19-24)PerPartitionCursor(27-365)airbyte_cdk/legacy/sources/declarative/incremental/per_partition_with_global.py (1)
PerPartitionWithGlobalCursor(22-199)airbyte_cdk/legacy/sources/declarative/incremental/resumable_full_refresh_cursor.py (2)
ChildPartitionResumableFullRefreshCursor(97-116)ResumableFullRefreshCursor(12-93)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
airbyte_cdk/legacy/sources/declarative/incremental/datetime_based_cursor.py (1)
DatetimeBasedCursor(28-448)
airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py (2)
airbyte_cdk/legacy/sources/declarative/incremental/datetime_based_cursor.py (1)
DatetimeBasedCursor(28-448)airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
DatetimeBasedCursor(1781-1889)
unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (1)
airbyte_cdk/legacy/sources/declarative/incremental/resumable_full_refresh_cursor.py (2)
ChildPartitionResumableFullRefreshCursor(97-116)ResumableFullRefreshCursor(12-93)
airbyte_cdk/legacy/sources/declarative/incremental/resumable_full_refresh_cursor.py (1)
airbyte_cdk/legacy/sources/declarative/incremental/declarative_cursor.py (1)
DeclarativeCursor(9-13)
airbyte_cdk/__init__.py (3)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (1)
DeclarativeStream(32-241)airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)
DeclarativeStream(2406-2473)DatetimeBasedCursor(1781-1889)airbyte_cdk/legacy/sources/declarative/incremental/datetime_based_cursor.py (1)
DatetimeBasedCursor(28-448)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (6)
airbyte_cdk/sources/source.py (3)
Source(55-95)read(36-45)discover(48-52)airbyte_cdk/sources/abstract_source.py (5)
streams(74-79)read(101-211)discover(85-90)check(92-99)check_connection(59-71)airbyte_cdk/entrypoint.py (3)
read(271-283)discover(260-269)check(219-258)airbyte_cdk/sources/declarative/checks/connection_checker.py (2)
ConnectionChecker(12-38)check_connection(18-38)airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py (1)
check_connection(35-61)airbyte_cdk/sources/declarative/checks/check_stream.py (1)
check_connection(66-102)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (6)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (1)
DeclarativeStream(32-241)airbyte_cdk/legacy/sources/declarative/incremental/per_partition_cursor.py (2)
CursorFactory(19-24)PerPartitionCursor(27-365)airbyte_cdk/legacy/sources/declarative/incremental/datetime_based_cursor.py (1)
DatetimeBasedCursor(28-448)airbyte_cdk/legacy/sources/declarative/incremental/per_partition_with_global.py (1)
PerPartitionWithGlobalCursor(22-199)airbyte_cdk/legacy/sources/declarative/incremental/resumable_full_refresh_cursor.py (1)
ResumableFullRefreshCursor(12-93)airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
ConcurrentPerPartitionCursor(99-596)
unit_tests/sources/declarative/stream_slicers/test_stream_slicer_read_decorator.py (5)
airbyte_cdk/legacy/sources/declarative/incremental/per_partition_cursor.py (1)
CursorFactory(19-24)airbyte_cdk/legacy/sources/declarative/incremental/datetime_based_cursor.py (1)
DatetimeBasedCursor(28-448)airbyte_cdk/legacy/sources/declarative/incremental/global_substream_cursor.py (1)
GlobalSubstreamCursor(73-353)airbyte_cdk/legacy/sources/declarative/incremental/per_partition_with_global.py (1)
PerPartitionWithGlobalCursor(22-199)airbyte_cdk/legacy/sources/declarative/incremental/declarative_cursor.py (1)
DeclarativeCursor(9-13)
unit_tests/sources/declarative/checks/test_check_stream.py (3)
unit_tests/sources/test_source.py (2)
source(65-66)check(44-45)airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
check(487-505)airbyte_cdk/sources/abstract_source.py (1)
check(92-99)
airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py (5)
airbyte_cdk/sources/source.py (1)
Source(55-95)airbyte_cdk/sources/declarative/checks/check_stream.py (1)
evaluate_availability(17-29)airbyte_cdk/sources/declarative/checks/connection_checker.py (1)
ConnectionChecker(12-38)airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
streams(389-421)airbyte_cdk/sources/streams/core.py (1)
Stream(118-703)
unit_tests/sources/declarative/test_concurrent_declarative_source.py (4)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (1)
DeclarativeStream(32-241)airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
streams(389-421)check(487-505)airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py (2)
streams(301-338)check(412-414)airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (3)
state(573-574)state(577-580)SimpleRetriever(53-602)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
airbyte_cdk/legacy/sources/declarative/incremental/global_substream_cursor.py (5)
iterate_with_last_flag_and_state(19-49)Timer(52-70)start(60-61)finish(63-67)is_running(69-70)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (8)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (1)
DeclarativeStream(32-241)airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)
DeclarativeStream(2406-2473)DatetimeBasedCursor(1781-1889)airbyte_cdk/legacy/sources/declarative/incremental/per_partition_cursor.py (1)
CursorFactory(19-24)airbyte_cdk/legacy/sources/declarative/incremental/datetime_based_cursor.py (1)
DatetimeBasedCursor(28-448)airbyte_cdk/legacy/sources/declarative/incremental/global_substream_cursor.py (1)
GlobalSubstreamCursor(73-353)airbyte_cdk/legacy/sources/declarative/incremental/per_partition_with_global.py (1)
PerPartitionWithGlobalCursor(22-199)airbyte_cdk/sources/declarative/partition_routers/partition_router.py (1)
PartitionRouter(14-62)airbyte_cdk/sources/declarative/requesters/request_options/default_request_options_provider.py (1)
DefaultRequestOptionsProvider(15-60)
airbyte_cdk/sources/declarative/checks/check_stream.py (1)
airbyte_cdk/sources/source.py (1)
Source(55-95)
unit_tests/sources/declarative/custom_state_migration.py (2)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (1)
DeclarativeStream(32-241)airbyte_cdk/sources/declarative/models/declarative_component_schema.py (18)
DeclarativeStream(2406-2473)Config(136-137)Config(150-151)Config(164-165)Config(178-179)Config(192-193)Config(206-207)Config(220-221)Config(234-235)Config(248-249)Config(262-263)Config(276-277)Config(292-293)Config(306-307)Config(320-321)Config(354-355)Config(378-379)CustomStateMigration(291-302)
🪛 GitHub Actions: Pytest (Fast)
unit_tests/connector_builder/test_connector_builder_handler.py
[error] 820-820: Test 'test_config_update' failed: latest_config_update was None.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
- GitHub Check: Validate PR title
- GitHub Check: Check: source-shopify
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Check: source-intercom
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Manifest Server Docker Image Build
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
Show resolved
Hide resolved
unit_tests/sources/declarative/stream_slicers/test_stream_slicer_read_decorator.py
Outdated
Show resolved
Hide resolved
unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a couple of comments. Should we redo a bit of regression tests once this is merged with the removal of DeclarativeStream stuff?
unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
2065-2083: TypeError risk: passing both stream_slicer and partition_router into retriever constructioncreate_simple_retriever no longer accepts stream_slicer, so this call will raise got an unexpected keyword argument 'stream_slicer' when retriever is SimpleRetrieverModel. Can we branch by retriever type and pass only the param each constructor supports, wdyt?
Apply:
- retriever = self._create_component_from_model( - model=model.retriever, - config=config, - name=model.name, - primary_key=primary_key, - request_options_provider=request_options_provider, - stream_slicer=stream_slicer, - partition_router=partition_router, - stop_condition_cursor=concurrent_cursor - if self._is_stop_condition_on_cursor(model) - else None, - client_side_incremental_sync={"cursor": concurrent_cursor} - if self._is_client_side_filtering_enabled(model) - else None, - transformations=transformations, - file_uploader=file_uploader, - incremental_sync=model.incremental_sync, - ) + common_kwargs = dict( + model=model.retriever, + config=config, + name=model.name, + primary_key=primary_key, + request_options_provider=request_options_provider, + stop_condition_cursor=concurrent_cursor if self._is_stop_condition_on_cursor(model) else None, + client_side_incremental_sync={"cursor": concurrent_cursor} if self._is_client_side_filtering_enabled(model) else None, + transformations=transformations, + file_uploader=file_uploader, + incremental_sync=model.incremental_sync, + ) + if isinstance(model.retriever, AsyncRetrieverModel): + retriever = self._create_component_from_model(**common_kwargs, stream_slicer=stream_slicer) + else: + retriever = self._create_component_from_model(**common_kwargs, partition_router=partition_router)
3868-3875: Potential mismatch: passing stream_slicer to retriever may break when retriever is SimpleRetrieverModelSame issue as above: SimpleRetriever expects partition_router, AsyncRetriever expects stream_slicer. Can we branch on model.retriever type here too, wdyt?
- retriever = self._create_component_from_model( - model=model.retriever, - config=config, - name=f"{stream_name if stream_name else '__http_components_resolver'}", - primary_key=None, - stream_slicer=self._build_stream_slicer_from_partition_router(model.retriever, config), - transformations=[], - ) + slicer = self._build_stream_slicer_from_partition_router(model.retriever, config) + if isinstance(model.retriever, AsyncRetrieverModel): + retriever = self._create_component_from_model( + model=model.retriever, + config=config, + name=f"{stream_name if stream_name else '__http_components_resolver'}", + primary_key=None, + stream_slicer=slicer, + transformations=[], + ) + else: + retriever = self._create_component_from_model( + model=model.retriever, + config=config, + name=f"{stream_name if stream_name else '__http_components_resolver'}", + primary_key=None, + partition_router=slicer, + transformations=[], + )
🧹 Nitpick comments (5)
unit_tests/sources/declarative/test_concurrent_declarative_source.py (4)
715-721: Prefer name-based stream lookup to avoid brittle index assertionsIndexing into
streams[...]ties tests to ordering in the manifest/resolution and makes them fragile when streams are added/removed/reordered. Can we key bynameinstead, and apply the same pattern in the other spots listed above, wdyt?Example for test_create_concurrent_cursor (apply similar changes in the other call sites):
- streams = source.streams(config=_CONFIG) - - party_members_stream = streams[0] + streams_by_name = {s.name: s for s in source.streams(config=_CONFIG)} + party_members_stream = streams_by_name["party_members"] @@ - locations_stream = streams[2] + locations_stream = streams_by_name["locations"] @@ - incremental_counting_stream = streams[7] + incremental_counting_stream = streams_by_name["incremental_counting_stream"]Also applies to: 733-737, 758-762, 1393-1396, 1637-1639, 1694-1697, 1754-1757, 2974-2983, 3853-3885, 4564-4567
1597-1606: Use the locally mutated manifest variable (minor correctness/clarity)You deepcopy
_MANIFESTintomanifestand mutate it, but then pass_MANIFESTtoConcurrentDeclarativeSource. This works today but is confusing and error-prone. Shall we passmanifestinstead, wdyt?- source = ConcurrentDeclarativeSource( - source_config=_MANIFEST, config=config, catalog=catalog, state=[] - ) + source = ConcurrentDeclarativeSource( + source_config=manifest, config=config, catalog=catalog, state=[] + )
2183-2187: Align mocked response bodies with configured record_selector pathsSeveral checks patch
SimpleRetriever._fetch_next_pagebut return payloads under keys that don’t match the configuredrecord_selector(e.g., using"records"or"students"while the selector is["result"]). It currently passes because the check logic doesn’t assert on extracted records, but keeping shapes consistent reduces future flakiness if check behavior evolves. Update the fixtures to use"result", wdyt?- pages = [_create_page({"records": [{"id": 0}], "_metadata": {}})] + pages = [_create_page({"result": [{"id": 0}], "_metadata": {}})]- pages = [ - _create_page( - { - "students": [{"id": 0, "first_name": "yu", "last_name": "narukami"}], - "_metadata": {}, - } - ) - ] + pages = [ + _create_page( + { + "result": [{"id": 0, "first_name": "yu", "last_name": "narukami"}], + "_metadata": {}, + } + ) + ]Also applies to: 2962-2973
1964-1969: Guard spec.yaml writes to avoid cross-test interferenceThe fixture writes to a fixed
spec.yamlpath under the tests tree. If tests run in parallel (xdist) or another test relies on that file, this can race. Would you consider backing up and restoring if the file already exists (or marking the test serial), wdyt?- with open(yaml_path, "w") as f: - f.write(yaml.dump(spec)) - yield - os.remove(yaml_path) + backup = None + if os.path.exists(yaml_path): + backup = yaml_path + ".bak" + os.replace(yaml_path, backup) + try: + with open(yaml_path, "w") as f: + f.write(yaml.dump(spec)) + yield + finally: + try: + os.remove(yaml_path) + finally: + if backup and os.path.exists(backup): + os.replace(backup, yaml_path)airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
2616-2632: Stub for IncrementingCountCursor: emit an explicit runtime warningSince this returns a DatetimeBasedCursor as a stub, can we warn loudly to aid debugging if it ever leaks to runtime, wdyt?
def create_incrementing_count_cursor( model: IncrementingCountCursorModel, config: Config, **kwargs: Any ) -> DatetimeBasedCursor: - # This should not actually get used anywhere at runtime, but needed to add this to pass checks since + # This should not actually get used anywhere at runtime, but needed to add this to pass checks since # we still parse models into components. The issue is that there's no runtime implementation of a # IncrementingCountCursor. # A known and expected issue with this stub is running a check with the declared IncrementingCountCursor because it is run without ConcurrentCursor. + logging.getLogger(__name__).warning( + "IncrementingCountCursor is deprecated in CDS runtime; returning placeholder DatetimeBasedCursor. " + "If you see this at runtime, please migrate off IncrementingCountCursor." + ) return DatetimeBasedCursor( cursor_field=model.cursor_field, datetime_format="%Y-%m-%d", start_datetime="2024-12-12", config=config, parameters={}, )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (10)
airbyte_cdk/legacy/sources/declarative/incremental/global_substream_cursor.py(1 hunks)airbyte_cdk/legacy/sources/declarative/incremental/per_partition_cursor.py(1 hunks)airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py(2 hunks)airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py(3 hunks)unit_tests/connector_builder/test_connector_builder_handler.py(1 hunks)unit_tests/legacy/sources/declarative/incremental/test_per_partition_cursor.py(1 hunks)unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py(0 hunks)unit_tests/sources/declarative/parsers/test_model_to_component_factory.py(2 hunks)unit_tests/sources/declarative/partition_routers/test_cartesian_product_partition_router.py(1 hunks)unit_tests/sources/declarative/test_concurrent_declarative_source.py(15 hunks)
💤 Files with no reviewable changes (1)
- unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py
🚧 Files skipped from review as they are similar to previous changes (6)
- unit_tests/connector_builder/test_connector_builder_handler.py
- airbyte_cdk/legacy/sources/declarative/incremental/global_substream_cursor.py
- unit_tests/legacy/sources/declarative/incremental/test_per_partition_cursor.py
- unit_tests/sources/declarative/partition_routers/test_cartesian_product_partition_router.py
- airbyte_cdk/legacy/sources/declarative/incremental/per_partition_cursor.py
- airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
🧰 Additional context used
🧠 Learnings (5)
📓 Common learnings
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
📚 Learning: 2025-01-14T00:20:32.310Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#174
File: airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1093-1102
Timestamp: 2025-01-14T00:20:32.310Z
Learning: In the `airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py` file, the strict module name checks in `_get_class_from_fully_qualified_class_name` (requiring `module_name` to be "components" and `module_name_full` to be "source_declarative_manifest.components") are intentionally designed to provide early, clear feedback when class declarations won't be found later in execution. These restrictions may be loosened in the future if the requirements for class definition locations change.
Applied to files:
unit_tests/sources/declarative/parsers/test_model_to_component_factory.pyairbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
📚 Learning: 2024-11-15T01:04:21.272Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.
Applied to files:
unit_tests/sources/declarative/parsers/test_model_to_component_factory.pyunit_tests/sources/declarative/test_concurrent_declarative_source.py
📚 Learning: 2024-11-18T23:40:06.391Z
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
Applied to files:
unit_tests/sources/declarative/parsers/test_model_to_component_factory.pyunit_tests/sources/declarative/test_concurrent_declarative_source.py
📚 Learning: 2024-12-11T16:34:46.319Z
Learnt from: pnilan
PR: airbytehq/airbyte-python-cdk#0
File: :0-0
Timestamp: 2024-12-11T16:34:46.319Z
Learning: In the airbytehq/airbyte-python-cdk repository, the `declarative_component_schema.py` file is auto-generated from `declarative_component_schema.yaml` and should be ignored in the recommended reviewing order.
Applied to files:
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
🧬 Code graph analysis (3)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (5)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (1)
DeclarativeStream(32-241)airbyte_cdk/legacy/sources/declarative/incremental/per_partition_cursor.py (2)
CursorFactory(19-24)PerPartitionCursor(27-367)airbyte_cdk/legacy/sources/declarative/incremental/datetime_based_cursor.py (1)
DatetimeBasedCursor(28-448)airbyte_cdk/legacy/sources/declarative/incremental/per_partition_with_global.py (1)
PerPartitionWithGlobalCursor(22-199)airbyte_cdk/legacy/sources/declarative/incremental/resumable_full_refresh_cursor.py (1)
ResumableFullRefreshCursor(12-93)
unit_tests/sources/declarative/test_concurrent_declarative_source.py (5)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (1)
DeclarativeStream(32-241)airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)
DeclarativeStream(2406-2473)SimpleRetriever(2786-2840)airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
streams(389-421)check(487-505)airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py (2)
streams(301-338)check(412-414)airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (3)
state(573-574)state(577-580)SimpleRetriever(53-602)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (1)
DeclarativeStream(32-241)airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)
DeclarativeStream(2406-2473)DatetimeBasedCursor(1781-1889)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Check: destination-motherduck
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (5)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (2)
54-54: ConcurrentPerPartitionCursor import path is correct
Verified with a ripgrep search that no imports of ConcurrentPerPartitionCursor remain underairbyte_cdk.legacy; approving the change.
18-25: Import path hygiene confirmed. All DeclarativeStream and cursor imports now reference airbyte_cdk.legacy and no stray non-legacy imports remain. LGTM!airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)
3249-3254: Nice: routing DefaultRequestOptionsProvider through PartitionRouterGood improvement to avoid request option leakage across partitions. LGTM.
3148-3149: CustomIncrementalSync removal fully addressedThe migration guide in
cdk-migrations.mdclearly documents the removal in v7.0.0, and a grep over the codebase shows no remainingCustomIncrementalSyncreferences. wdyt, shall we resolve?
36-39: Update imports to legacy path and add deprecation notesThere are still direct imports from
airbyte_cdk.sources.declarative.declarative_streamand…incremental(e.g., in unit tests,incremental/__init__.py,model_to_component_factory.py), which should be changed toairbyte_cdk.legacy.sources.declarativeequivalents. Also ensure user-facing deprecation warnings are documented in the v7 release notes and relevant docs. wdyt?⛔ Skipped due to learnings
Learnt from: aaronsteers PR: airbytehq/airbyte-python-cdk#174 File: airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1093-1102 Timestamp: 2025-01-14T00:20:32.310Z Learning: In the `airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py` file, the strict module name checks in `_get_class_from_fully_qualified_class_name` (requiring `module_name` to be "components" and `module_name_full` to be "source_declarative_manifest.components") are intentionally designed to provide early, clear feedback when class declarations won't be found later in execution. These restrictions may be loosened in the future if the requirements for class definition locations change.Learnt from: aaronsteers PR: airbytehq/airbyte-python-cdk#58 File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65 Timestamp: 2024-11-15T01:04:21.272Z Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.Learnt from: aaronsteers PR: airbytehq/airbyte-python-cdk#58 File: airbyte_cdk/cli/source_declarative_manifest/spec.json:9-15 Timestamp: 2024-11-15T00:59:08.154Z Learning: When code in `airbyte_cdk/cli/source_declarative_manifest/` is being imported from another repository, avoid suggesting modifications to it during the import process.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (4)
unit_tests/sources/streams/concurrent/test_cursor.py (1)
23-35: Nice decoupling from legacy cursors.Switching tests to work directly with
ConcurrentCursorand concurrent state converters addresses the past concern about instantiatingDatetimeBasedCursorjust to extract values. LGTM.airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)
373-382: Add a debug log when no streams selected (keep using initialized config).Totally get the intent to prefer
self._config. For observability, can we log when nothing is read to help users debug catalogs? This was mentioned earlier; reiterating just the logging piece, wdyt?if len(selected_concurrent_streams) > 0: yield from self._concurrent_source.read(selected_concurrent_streams) + else: + logger.debug("No streams selected in catalog; skipping concurrent read.")
383-386: Discover: acknowledge deprecation and add a gentle log?Previously suggested using
config or self._config; since you’re intentionally deprecating the arg, would adding a debug to signal it’s ignored help avoid confusion, wdyt?def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog: + if config and config is not self._config: + logger.debug("discover(config=...) is deprecated; using initialized config.") return AirbyteCatalog( streams=[stream.as_airbyte_stream() for stream in self.streams(config=self._config)] )
487-509: Gracefully handle missing/unknown check types and improve error message.Could we default to
CheckStreamwhencheckis absent and validate the type mapping to avoid aKeyError? Also, usingstr(error)yields a cleaner user message thanrepr(error), wdyt?- check = self._source_config.get("check") - if not check: - raise ValueError(f"Missing 'check' component definition within the manifest.") + check = dict(self._source_config.get("check", {})) if "type" not in check: check["type"] = "CheckStream" - connection_checker = self._constructor.create_component( - COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]], + checker_model = COMPONENTS_CHECKER_TYPE_MAPPING.get(check["type"]) + if not checker_model: + raise ValueError(f"Unknown check type '{check['type']}'. Expected one of {list(COMPONENTS_CHECKER_TYPE_MAPPING.keys())}.") + connection_checker = self._constructor.create_component( + checker_model, check, dict(), emit_connector_builder_messages=self._emit_connector_builder_messages, ) @@ - check_succeeded, error = connection_checker.check_connection(self, logger, self._config) + check_succeeded, error = connection_checker.check_connection(self, logger, self._config) if not check_succeeded: - return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error)) + return AirbyteConnectionStatus(status=Status.FAILED, message=str(error)) return AirbyteConnectionStatus(status=Status.SUCCEEDED)
🧹 Nitpick comments (14)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (2)
18-19: Legacy imports look right; optionally avoid runtime dependency on DeclarativeStreamShifting DeclarativeStream/DatetimeBasedCursor to legacy matches the v7 plan. Since DeclarativeStream is only used for typing here, would you guard that import behind TYPE_CHECKING to avoid importing legacy code at runtime, wdyt?
Apply within-range change:
-from airbyte_cdk.legacy.sources.declarative.declarative_stream import DeclarativeStream +if TYPE_CHECKING: + from airbyte_cdk.legacy.sources.declarative.declarative_stream import DeclarativeStreamAnd outside this hunk, update typing import and the annotation:
# near line 9 from typing import Any, Iterable, Mapping, Optional, Union, TYPE_CHECKING # near lines 4664-4670 def get_retriever(stream: Union["DeclarativeStream", DefaultStream]): ...
742-742: Fix nit: TODO wording mismatches (class vs test) and tie to v7This references a “class” but the next item is a test function. Update the note and mention the v7 removal for clarity, wdyt?
-# todo: delete this class once we deprecate SimpleRetriever.cursor and SimpleRetriever.state methods +# TODO: delete this test once we deprecate SimpleRetriever.cursor and SimpleRetriever.state methods (CDK v7)unit_tests/sources/streams/concurrent/test_cursor.py (6)
53-60: Defaulting_stream_nameto aMockcan mask issues; default to a real name instead.Returning a
Mockfrompartition.stream_name()can leak into assertions/logs; using the stable_A_STREAM_NAMEis safer. Wdyt?-def _partition( - _slice: Optional[Mapping[str, Any]], _stream_name: Optional[str] = Mock() -) -> Partition: +def _partition( + _slice: Optional[Mapping[str, Any]], _stream_name: str = _A_STREAM_NAME +) -> Partition: partition = Mock(spec=Partition) partition.to_slice.return_value = _slice partition.stream_name.return_value = _stream_name return partition
62-70: Avoid defaulting parameters toMockinstances.Default arguments are evaluated once; the same
Mockwill be reused across calls. Let’s default toNoneand create a partition on demand, ensuring isolation. Wdyt?-def _record( - cursor_value: CursorValueType, partition: Optional[Partition] = Mock(spec=Partition) -) -> Record: +def _record( + cursor_value: CursorValueType, partition: Optional[Partition] = None +) -> Record: + if partition is None: + partition = _partition(_NO_SLICE) return Record( data={_A_CURSOR_FIELD_KEY: cursor_value}, associated_slice=partition.to_slice(), stream_name=_A_STREAM_NAME, )
1214-1221: Unifystate_typeto the enum value for consistency.There’s a mix of
"date-range"string literals andConcurrencyCompatibleStateType.date_range.value. Prefer the enum for resilience to typos/refactors. Apply similarly to other occurrences in this file, wdyt?- { - "slices": [], - "state_type": "date-range", - }, + { + "slices": [], + "state_type": ConcurrencyCompatibleStateType.date_range.value, + },
1009-1034: Add a type hint forend_providerin_cursorhelper.It’s passed a
ClampingEndProvider; hinting improves readability and editor help. Wdyt?- def _cursor( + def _cursor( self, start: datetime, - end_provider, + end_provider: ClampingEndProvider, slice_range: timedelta, granularity: Optional[timedelta], clamping_strategy: ClampingStrategy, ) -> ConcurrentCursor:
1199-1210: Assert style nit: preferassert/assert notover== True/False.Cleaner and yields better failure messages. Wdyt?
- assert ( - cursor.should_be_synced( - Record(data={_A_CURSOR_FIELD_KEY: state_value - 1}, stream_name="test_stream") - ) - == False - ) - assert ( - cursor.should_be_synced( - Record(data={_A_CURSOR_FIELD_KEY: state_value}, stream_name="test_stream") - ) - == True - ) + assert not cursor.should_be_synced( + Record(data={_A_CURSOR_FIELD_KEY: state_value - 1}, stream_name="test_stream") + ) + assert cursor.should_be_synced( + Record(data={_A_CURSOR_FIELD_KEY: state_value}, stream_name="test_stream") + )
258-349: Consider a small factory to reduce repeatedConcurrentCursorconstruction.Many tests inline nearly identical constructors; extracting a parameterized factory (start/state/slice_range/granularity) would trim noise and make intent pop. Want me to draft it, wdyt?
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (6)
245-257: Docstring return type mismatch.
_pre_process_manifestreturns a dict but the docstring saysNone. Can we fix the docstring to prevent confusion in readers and tools, wdyt?- Returns: - None + Returns: + Dict[str, Any]: The fully resolved and transformed manifest.
290-295: Docstring claims a return value but method returns None.The method mutates state in place and returns nothing. Adjusting the docstring would remove ambiguity, wdyt?
- The original manifest is returned if any error occurs during migration. + Mutates self._source_config in place. If migration fails, the previous manifest remains.
346-349: Name the effective config and warn about deprecation.Since the goal is to prefer
self._config, would you like to (a) avoid shadowing the param and (b) emit a deprecation hint if the caller passesconfigwhich is ignored, wdyt?- def configure(self, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]: - config = self._config or config - return super().configure(config, temp_dir) + def configure(self, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]: + effective_config = self._config or config + if config and config is not self._config: + self.logger.debug("configure(config=...) is deprecated; using initialized config.") + return super().configure(effective_config, temp_dir)
388-401: Tighten signature and make the unused param explicit?Now that this class only deals with concurrent
AbstractStreams, would you like to (a) narrow the return type and (b) mark theconfigparam as intentionally unused to reduce confusion, wdyt?- def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStream]]: # type: ignore # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder + def streams(self, _config: Mapping[str, Any]) -> List[AbstractStream]: # We only return concurrent-compatible streams @@ - if self._spec_component: - self._spec_component.validate_config(self._config) + if self._spec_component: + self._spec_component.validate_config(self._config)If external callers still rely on
Union[Stream, AbstractStream], we can keep the return type but add a docstring note that only concurrent-compatible streams are produced now. Wdyt?Also applies to: 409-421
525-531: Pass manifest parameters into ConditionalStreams evaluation?
InterpolatedBoolean(..., parameters={})ignores manifest-level params. Should we forwardself._source_config.get("parameters", {})(or per-stream parameters) so conditions can reference them, wdyt?- interpolated_boolean = InterpolatedBoolean( - condition=current_stream_config.get("condition"), - parameters={}, - ) + interpolated_boolean = InterpolatedBoolean( + condition=current_stream_config.get("condition"), + parameters=self._source_config.get("parameters", {}), + )
378-381: Optional: track the “empty streams causes loop” with a TODO or issue link.Since this notes an upstream behavior, would you like to add a TODO with an issue reference so we don’t lose it during the v7 migration, wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py(8 hunks)unit_tests/sources/declarative/parsers/test_model_to_component_factory.py(3 hunks)unit_tests/sources/streams/concurrent/test_cursor.py(1 hunks)
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
📚 Learning: 2024-11-15T01:04:21.272Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.
Applied to files:
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
📚 Learning: 2025-01-14T00:20:32.310Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#174
File: airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1093-1102
Timestamp: 2025-01-14T00:20:32.310Z
Learning: In the `airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py` file, the strict module name checks in `_get_class_from_fully_qualified_class_name` (requiring `module_name` to be "components" and `module_name_full` to be "source_declarative_manifest.components") are intentionally designed to provide early, clear feedback when class declarations won't be found later in execution. These restrictions may be loosened in the future if the requirements for class definition locations change.
Applied to files:
unit_tests/sources/declarative/parsers/test_model_to_component_factory.pyairbyte_cdk/sources/declarative/concurrent_declarative_source.py
📚 Learning: 2024-11-18T23:40:06.391Z
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
Applied to files:
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
- GitHub Check: Check: source-intercom
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: SDM Docker Image Build
- GitHub Check: Analyze (python)
🔇 Additional comments (3)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)
48-48: Update deprecated declarative.incremental imports
Multiple tests and source files still import symbols (ConcurrentPerPartitionCursor,CursorFactory,PerPartitionWithGlobalCursor,ResumableFullRefreshCursor) fromairbyte_cdk.sources.declarative.incremental. Can you migrate these to the new export paths or remove legacy exports? wdyt?⛔ Skipped due to learnings
Learnt from: aaronsteers PR: airbytehq/airbyte-python-cdk#58 File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65 Timestamp: 2024-11-15T01:04:21.272Z Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.unit_tests/sources/streams/concurrent/test_cursor.py (1)
505-560: Confirm inclusive/exclusive upper-bound semantics with granularity.With a 1-second granularity, earlier slices use
[20,29]/[30,39]but the last slice is[40,50](not[40,49]). Is the final upper bound intentionally inclusive, or should all be consistent? Can you double-check againstConcurrentCursor’s contract, wdyt?airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
46-47: No logger fallback neededThe base
Sourceclass defines a@property namereturning the class’s__name__, soself.nameis always available at initialization. We can safely uselogging.getLogger(f"airbyte.{self.name}")without a fallback—wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
372-381: Add a debug log when no streams are selected to avoid silent no-opWhen
selected_concurrent_streamsis empty,readreturns without emitting anything. Could we log a debug line to aid troubleshooting, without changing the config handling? Wdyt?Apply this diff:
if len(selected_concurrent_streams) > 0: yield from self._concurrent_source.read(selected_concurrent_streams) + else: + self.logger.debug("No streams selected in catalog; skipping concurrent read.")
486-508: Prefer a backward-compatible default for missing 'check' block (or confirm v7 intention)Currently a missing
checkraises, which could break existing manifests that relied on implicitCheckStream. Do we want to default toCheckStreamwhencheckis absent to preserve behavior, or is this an intentional v7 break? If it’s intentional, can we mention it in the migration notes? Wdyt?Apply this diff if we want the non-breaking path:
- check = self._source_config.get("check") - if not check: - raise ValueError(f"Missing 'check' component definition within the manifest.") + check = dict(self._source_config.get("check", {})) + # Default to CheckStream if not provided, for compatibility + if "type" not in check: + check["type"] = "CheckStream"
🧹 Nitpick comments (28)
unit_tests/sources/declarative/extractors/test_dpath_extractor.py (1)
11-12: Unify imports via package init for consistency?If
JsonLineParseris re-exported fromairbyte_cdk.sources.declarative.decoders.__init__, would you like to import it from the same package path to keep all decoder imports uniform, wdyt?-from airbyte_cdk.sources.declarative.decoders import CompositeRawDecoder, Decoder -from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import JsonLineParser +from airbyte_cdk.sources.declarative.decoders import CompositeRawDecoder, Decoder, JsonLineParserunit_tests/sources/streams/concurrent/test_partition_reader.py (4)
32-44: Deflake: add a timeout and assert the queue is empty to truly enforce “only sentinel”
Queue.get()can block forever if a regression drops the sentinel. Also, the test doesn’t currently assert that nothing else was queued. Shall we add a timeout and an emptiness check, wdyt?- while queue_item := self._queue.get(): - if not isinstance(queue_item, PartitionCompleteSentinel): - pytest.fail("Only one PartitionCompleteSentinel is expected") - break + queue_item = self._queue.get(timeout=2) + if not isinstance(queue_item, PartitionCompleteSentinel): + pytest.fail("Only one PartitionCompleteSentinel is expected") + assert self._queue.empty()
62-67: Stabilize mock expectations with spec and explicit stream nameUsing a bare
Mock()may yield surprising equality behavior forpartition.stream_name()inStreamThreadException. Would you set a spec and a concrete return value to make this test stricter and less brittle, wdyt?- partition = Mock() + partition = Mock(spec=Partition) + partition.stream_name.return_value = "stream" cursor = Mock() exception = ValueError() partition.read.side_effect = self._read_with_exception(_RECORDS, exception) self._partition_reader.process_partition(partition, cursor) @@ - assert queue_content == _RECORDS + [ - StreamThreadException(exception, partition.stream_name()), + assert queue_content == _RECORDS + [ + StreamThreadException(exception, "stream"), PartitionCompleteSentinel(partition), ]Also applies to: 70-73
114-121: Deflake: avoid potential infinite wait in _consume_queueSame blocking-risk here; adding a timeout and handling
Emptykeeps the test from hanging on regressions. Good to keep CI resilient, wdyt?-from queue import Queue +from queue import Queue, Empty @@ - while queue_item := self._queue.get(): - queue_content.append(queue_item) - if isinstance(queue_item, PartitionCompleteSentinel): - break + while True: + try: + queue_item = self._queue.get(timeout=2) + except Empty: + pytest.fail("Timed out waiting for queue item") + queue_content.append(queue_item) + if isinstance(queue_item, PartitionCompleteSentinel): + breakAlso applies to: 4-4
75-78: Naming nit: align with “partition” terminologyTest name still says “close_slice.” For consistency with the code and sentinel type, shall we rename to
test_given_exception_from_close_partition_when_process_partition_then_queue_records_and_exception_and_sentinel, wdyt?airbyte_cdk/sources/declarative/concurrent_declarative_source.py (4)
382-385: Document and surface that discover ignores the incoming configGiven the plan to deprecate the
configparameter, could we log once at debug level when a non-emptyconfigdiffers fromself._config, and add a short docstring note so callers aren’t surprised? Wdyt?Apply this diff:
def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog: - return AirbyteCatalog( + # NOTE: discover ignores the incoming `config` and relies on the initialized config. + if config and config != self._config: + self.logger.debug("discover(config=...) argument is ignored; using initialized config.") + return AirbyteCatalog( streams=[stream.as_airbyte_stream() for stream in self.streams(config=self._config)] )
504-507: Use str(error) for user-facing message
repr(error)can include noisy type wrappers. Wouldstr(error)be cleaner for end users?Apply this diff:
- if not check_succeeded: - return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error)) + if not check_succeeded: + return AirbyteConnectionStatus(status=Status.FAILED, message=str(error))
566-568: Be defensive when toggling use_cache in dynamic resolverSome manifests may omit
requesterunderretriever. Should we guard these writes to avoid KeyErrors on misconfigurations, while still keeping the happy path unchanged?Apply this diff:
- if "retriever" in components_resolver_config: - components_resolver_config["retriever"]["requester"]["use_cache"] = True + retriever_cfg = components_resolver_config.get("retriever") + if isinstance(retriever_cfg, dict): + requester_cfg = retriever_cfg.get("requester") + if isinstance(requester_cfg, dict): + requester_cfg["use_cache"] = True
463-473: Optional: safer cache flagging for parent streamsDirect indexing into nested structures assumes exact shapes. Would using
get/type checks reduce surprises from schema drift in manifests, while leaving behavior intact?Apply this diff:
- if stream_config["type"] == "StateDelegatingStream": - stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = ( - True - ) - stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = ( - True - ) - else: - stream_config["retriever"]["requester"]["use_cache"] = True + if stream_config.get("type") == "StateDelegatingStream": + fr = stream_config.get("full_refresh_stream", {}).get("retriever", {}).get("requester") + inc = stream_config.get("incremental_stream", {}).get("retriever", {}).get("requester") + if isinstance(fr, dict): + fr["use_cache"] = True + if isinstance(inc, dict): + inc["use_cache"] = True + else: + requester = stream_config.get("retriever", {}).get("requester") + if isinstance(requester, dict): + requester["use_cache"] = Trueairbyte_cdk/sources/declarative/requesters/http_job_repository.py (6)
143-149: Preferis Noneover truthiness to avoid conflating non-2xx Responses with “no response.”This keeps semantics consistent with
_get_validated_polling_response. Shall we align it, wdyt?- if not response: + if response is None: raise AirbyteTracedException( internal_message="Always expect a response or an exception from creation_requester", failure_type=FailureType.system_error, )
62-74: Fix “pooling” typos in docstring.Minor text nit for clarity, wdyt?
- Validates and retrieves the pooling response for a given stream slice. + Validates and retrieves the polling response for a given stream slice. @@ - stream_slice (StreamSlice): The stream slice to send the pooling request for. + stream_slice (StreamSlice): The stream slice to send the polling request for. @@ - requests.Response: The validated pooling response. + requests.Response: The validated polling response.
286-297: PEP 8 membership style and tiny readability polish.Flip membership checks for readability; no behavior change, wdyt?
- if not "headers" in creation_response_context: + if "headers" not in creation_response_context: creation_response_context["headers"] = self._create_job_response_by_id[ job.api_job_id() ].headers - if not "request" in creation_response_context: + if "request" not in creation_response_context: creation_response_context["request"] = self._create_job_response_by_id[ job.api_job_id() ].request- if not "headers" in polling_response_context: + if "headers" not in polling_response_context: polling_response_context["headers"] = self._polling_job_response_by_id[ job.api_job_id() ].headers - if not "request" in polling_response_context: + if "request" not in polling_response_context: polling_response_context["request"] = self._polling_job_response_by_id[ job.api_job_id() ].requestAlso applies to: 312-320
259-269: Typo:delete_job_reponse→delete_job_response.Purely cosmetic but helps grep-ability, wdyt?
- delete_job_reponse = self.delete_requester.send_request( + delete_job_response = self.delete_requester.send_request( stream_slice=self._get_create_job_stream_slice(job), - log_formatter=lambda delete_job_reponse: format_http_message( - response=delete_job_reponse, + log_formatter=lambda delete_job_response: format_http_message( + response=delete_job_response, title="Async Job -- Delete", description="Delete the specified job from the list of Jobs.", stream_name=None, is_auxiliary=True, type="ASYNC_DELETE", ), )
273-275: Avoid KeyError on cleanup usingpop(..., None).This is safer if
delete()is called before a polling response is recorded, wdyt?- del self._create_job_response_by_id[job_id] - del self._polling_job_response_by_id[job_id] + self._create_job_response_by_id.pop(job_id, None) + self._polling_job_response_by_id.pop(job_id, None)
237-237: Remove no-opyield from [].Redundant; can be dropped, wdyt?
- yield from [] + # no-opunit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_wait_time_from_header.py (4)
21-46: Drop the unusedtest_namearg and use pytest ids for readability.The
test_nameparam isn’t used and duplicates some labels. Usingpytest.param(..., id=...)keeps parametrization self-documenting. Apply?-@pytest.mark.parametrize( - "test_name, header, header_value, regex, expected_backoff_time", - [ - ("test_wait_time_from_header", "wait_time", SOME_BACKOFF_TIME, None, SOME_BACKOFF_TIME), - ("test_wait_time_from_header_string", "wait_time", "60", None, SOME_BACKOFF_TIME), - ( - "test_wait_time_from_header_parameters", - "{{ parameters['wait_time'] }}", - "60", - None, - SOME_BACKOFF_TIME, - ), - ( - "test_wait_time_from_header_config", - "{{ config['wait_time'] }}", - "60", - None, - SOME_BACKOFF_TIME, - ), - ("test_wait_time_from_header_not_a_number", "wait_time", "61,60", None, None), - ("test_wait_time_from_header_with_regex", "wait_time", "61,60", r"([-+]?\d+)", 61), # noqa - ("test_wait_time_fœrom_header_with_regex_no_match", "wait_time", "...", "[-+]?\d+", None), # noqa - ("test_wait_time_from_header", "absent_header", None, None, None), - ], -) -def test_wait_time_from_header(test_name, header, header_value, regex, expected_backoff_time): +@pytest.mark.parametrize( + "header, header_value, regex, expected_backoff_time", + [ + pytest.param("wait_time", SOME_BACKOFF_TIME, None, SOME_BACKOFF_TIME, id="int-header"), + pytest.param("wait_time", "60", None, SOME_BACKOFF_TIME, id="str-header"), + pytest.param("{{ parameters['wait_time'] }}", "60", None, SOME_BACKOFF_TIME, id="parameter-ref"), + pytest.param("{{ config['wait_time'] }}", "60", None, SOME_BACKOFF_TIME, id="config-ref"), + pytest.param("wait_time", "61,60", None, None, id="not-a-number"), + pytest.param("wait_time", "61,60", r"([-+]?\d+)", 61, id="regex-first-number"), + pytest.param("wait_time", "...", "[-+]?\d+", None, id="regex-no-match"), + pytest.param("absent_header", None, None, None, id="missing-header"), + ], +) +def test_wait_time_from_header(header, header_value, regex, expected_backoff_time):Also applies to: 46-46
41-44: Minor label nits: fix typo and duplicate label while you’re here?The id string “test_wait_time_fœrom_header_with_regex_no_match” includes a typo (“fœrom”), and “test_wait_time_from_header” appears twice. If you adopt parametrization ids (prior comment), these go away—otherwise, shall we correct them, wdyt?
47-49: Mimic requests’ case-insensitive headers to avoid surprises.Requests’
Response.headersis aCaseInsensitiveDict. Using it in the mock keeps behavior realistic. Update?+from requests.structures import CaseInsensitiveDict @@ - response_mock = MagicMock(spec=Response) - response_mock.headers = {"wait_time": header_value} + response_mock = MagicMock(spec=Response) + response_mock.headers = CaseInsensitiveDict({"wait_time": header_value})
70-80: Add boundary test for== max_waiting_time_in_seconds.You cover “< max” and “> max”. Adding “== max” locks intended behavior. Should it return
maxor raise? Suggest returningmax. Add?+def test_given_retry_after_equal_to_max_time_then_use_max_time(): + response_mock = MagicMock(spec=Response) + response_mock.headers = {_A_RETRY_HEADER: str(_A_MAX_TIME)} + backoff_strategy = WaitTimeFromHeaderBackoffStrategy( + header=_A_RETRY_HEADER, max_waiting_time_in_seconds=_A_MAX_TIME, parameters={}, config={} + ) + assert backoff_strategy.backoff_time(response_mock, 1) == _A_MAX_TIMEunit_tests/sources/declarative/async_job/test_job_orchestrator.py (5)
59-64: Fix misnamed test: it returns COMPLETED, not RUNNING.Rename for clarity to match the asserted status, wdyt?
-def test_given_only_completed_jobs_when_status_then_return_running(self) -> None: +def test_given_only_completed_jobs_when_status_then_return_completed(self) -> None:
320-361: This method won’t run (missingtest_prefix) and is timing-sensitive; rename and patch sleep?Renaming ensures pytest collects it; patching
sleepbrings it in line with adjacent tests and reduces flakiness. Apply?- def given_budget_already_taken_before_start_when_create_and_get_completed_partitions_then_wait_for_budget_to_be_freed( - self, - ) -> None: + @mock.patch(sleep_mock_target) + def test_given_budget_already_taken_before_start_when_create_and_get_completed_partitions_then_wait_for_budget_to_be_freed( + self, mock_sleep: MagicMock + ) -> None: @@ - threading.Thread(target=wait_and_free_intent, args=[job_tracker, intent_to_free]).start() - partitions = list(orchestrator.create_and_get_completed_partitions()) + t = threading.Thread(target=wait_and_free_intent, args=[job_tracker, intent_to_free]) + t.start() + partitions = list(orchestrator.create_and_get_completed_partitions()) + t.join()
26-29: Use realStreamSliceobjects to exercise types and reduce mocking.Switching from bare
Mock()s to minimalStreamSliceinstances tightens the contract and future-proofs assertions. Interested?-_ANY_STREAM_SLICE = Mock() -_A_STREAM_SLICE = Mock() -_ANOTHER_STREAM_SLICE = Mock() +_ANY_STREAM_SLICE = StreamSlice(partition={"p": "any"}, cursor_slice={"c": 0}) +_A_STREAM_SLICE = StreamSlice(partition={"p": "a"}, cursor_slice={"c": 1}) +_ANOTHER_STREAM_SLICE = StreamSlice(partition={"p": "b"}, cursor_slice={"c": 2})
31-31: Trim the buffer to speed tests.
_BUFFER = 10000may slow the tight loop; could we drop to 100 without losing intent, wdyt?-_BUFFER = 10000 # this buffer allows us to be unconcerned with the number of times the update status is called +_BUFFER = 100 # sufficient buffer for status updates in tests
66-78: Type hint consistency: prefertyping.Set(or drop the import).You import
Setbut annotate withset[...]. For consistency (and older tooling), shall we switch toSet[AsyncJob], wdyt?-) -> Callable[[set[AsyncJob]], None]: +) -> Callable[[Set[AsyncJob]], None]:unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (2)
742-742: Nit: fix the TODO wording (it’s a test, not a class).Minor clarity tweak so future readers don’t trip over this. Also, would you like to track this with a GH issue ID in the comment, wdyt?
-# todo: delete this class once we deprecate SimpleRetriever.cursor and SimpleRetriever.state methods +# TODO: delete this test once we deprecate SimpleRetriever.cursor and SimpleRetriever.state methods
17-18: Add a re-export for DeclarativeStream and consider aliasing legacy imports?
- DeclarativeStream is defined in
legacy/sources/declarative/declarative_stream.pybut not re-exported fromlegacy/sources/declarative/__init__.py, so your explicit import is required—would you addto the package init to mirror the incremental export?from .declarative_stream import DeclarativeStream- DatetimeBasedCursor is already re-exported in
legacy/sources/declarative/incremental/__init__.py, so that import works. To avoid confusion with its model counterpart, would you alias it aswdyt?from airbyte_cdk.legacy.sources.declarative.incremental import DatetimeBasedCursor as LegacyDatetimeBasedCursorairbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
1973-1976: Return type now AbstractStream — align annotations and drop unused importcreate_default_stream now returns AbstractStream. However, create_state_delegating_stream is still annotated to return DeclarativeStream while it will also produce DefaultStream via the same mapping. To avoid confusion and type ignores, shall we update the annotation and remove the now-unneeded DeclarativeStream import, wdyt?
Apply:
- from airbyte_cdk.legacy.sources.declarative.declarative_stream import DeclarativeStream + # (import no longer needed if not referenced elsewhere) @@ - def create_state_delegating_stream( + def create_state_delegating_stream( self, model: StateDelegatingStreamModel, config: Config, has_parent_state: Optional[bool] = None, **kwargs: Any, - ) -> DeclarativeStream: + ) -> AbstractStream: @@ - return self._create_component_from_model(stream_model, config=config, **kwargs) # type: ignore[no-any-return] # DeclarativeStream will be created as stream_model is alwyas DeclarativeStreamModel + return self._create_component_from_model(stream_model, config=config, **kwargs)Also applies to: 3381-3395, 36-39
2616-2633: Stub for IncrementingCountCursor: prefer fail-fast or a stable epoch baselineThis stub returns a DatetimeBasedCursor with a hard-coded 2024-12-12 start. That can cause surprising behavior if it’s accidentally used outside tests. Would you either fail fast or choose a neutral epoch to minimize impact, wdyt?
Option A (fail fast):
def create_incrementing_count_cursor( model: IncrementingCountCursorModel, config: Config, **kwargs: Any ) -> DatetimeBasedCursor: - # This should not actually get used anywhere at runtime, but needed to add this to pass checks since - # we still parse models into components. The issue is that there's no runtime implementation of a - # IncrementingCountCursor. - # A known and expected issue with this stub is running a check with the declared IncrementingCountCursor because it is run without ConcurrentCursor. - return DatetimeBasedCursor( - cursor_field=model.cursor_field, - datetime_format="%Y-%m-%d", - start_datetime="2024-12-12", - config=config, - parameters={}, - ) + # This path should never be hit at runtime; IncrementingCount is only supported via ConcurrentCursor. + raise NotImplementedError("IncrementingCountCursor has no legacy runtime; use concurrent path.")Option B (safer default if fail-fast breaks checks):
- start_datetime="2024-12-12", + start_datetime="1970-01-01",[submitter can pick one]
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (13)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py(9 hunks)airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py(4 hunks)airbyte_cdk/sources/declarative/requesters/http_job_repository.py(2 hunks)airbyte_cdk/sources/declarative/transformations/keys_replace_transformation.py(1 hunks)unit_tests/sources/declarative/async_job/test_integration.py(1 hunks)unit_tests/sources/declarative/async_job/test_job_orchestrator.py(1 hunks)unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py(1 hunks)unit_tests/sources/declarative/extractors/test_dpath_extractor.py(1 hunks)unit_tests/sources/declarative/interpolation/test_jinja.py(1 hunks)unit_tests/sources/declarative/parsers/test_model_to_component_factory.py(4 hunks)unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_wait_time_from_header.py(1 hunks)unit_tests/sources/declarative/retrievers/test_simple_retriever.py(1 hunks)unit_tests/sources/streams/concurrent/test_partition_reader.py(1 hunks)
✅ Files skipped from review due to trivial changes (1)
- airbyte_cdk/sources/declarative/transformations/keys_replace_transformation.py
🚧 Files skipped from review as they are similar to previous changes (1)
- unit_tests/sources/declarative/retrievers/test_simple_retriever.py
🧰 Additional context used
🧠 Learnings (6)
📓 Common learnings
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
📚 Learning: 2024-11-18T23:40:06.391Z
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
Applied to files:
unit_tests/sources/declarative/decoders/test_decoders_memory_usage.pyunit_tests/sources/declarative/async_job/test_integration.pyairbyte_cdk/sources/declarative/concurrent_declarative_source.py
📚 Learning: 2024-11-15T00:59:08.154Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/spec.json:9-15
Timestamp: 2024-11-15T00:59:08.154Z
Learning: When code in `airbyte_cdk/cli/source_declarative_manifest/` is being imported from another repository, avoid suggesting modifications to it during the import process.
Applied to files:
unit_tests/sources/declarative/decoders/test_decoders_memory_usage.pyunit_tests/sources/declarative/async_job/test_integration.py
📚 Learning: 2024-11-15T01:04:21.272Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.
Applied to files:
unit_tests/sources/declarative/decoders/test_decoders_memory_usage.pyunit_tests/sources/declarative/async_job/test_job_orchestrator.pyunit_tests/sources/declarative/async_job/test_integration.pyairbyte_cdk/sources/declarative/concurrent_declarative_source.pyunit_tests/sources/declarative/parsers/test_model_to_component_factory.py
📚 Learning: 2024-12-11T16:34:46.319Z
Learnt from: pnilan
PR: airbytehq/airbyte-python-cdk#0
File: :0-0
Timestamp: 2024-12-11T16:34:46.319Z
Learning: In the airbytehq/airbyte-python-cdk repository, the `declarative_component_schema.py` file is auto-generated from `declarative_component_schema.yaml` and should be ignored in the recommended reviewing order.
Applied to files:
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
📚 Learning: 2025-01-14T00:20:32.310Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#174
File: airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1093-1102
Timestamp: 2025-01-14T00:20:32.310Z
Learning: In the `airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py` file, the strict module name checks in `_get_class_from_fully_qualified_class_name` (requiring `module_name` to be "components" and `module_name_full` to be "source_declarative_manifest.components") are intentionally designed to provide early, clear feedback when class declarations won't be found later in execution. These restrictions may be loosened in the future if the requirements for class definition locations change.
Applied to files:
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.pyairbyte_cdk/sources/declarative/concurrent_declarative_source.pyunit_tests/sources/declarative/parsers/test_model_to_component_factory.py
🧬 Code graph analysis (11)
unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py (1)
airbyte_cdk/sources/declarative/yaml_declarative_source.py (1)
YamlDeclarativeSource(17-69)
unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_wait_time_from_header.py (1)
airbyte_cdk/utils/traced_exception.py (1)
AirbyteTracedException(25-145)
unit_tests/sources/declarative/async_job/test_job_orchestrator.py (2)
airbyte_cdk/sources/types.py (1)
StreamSlice(75-169)airbyte_cdk/utils/traced_exception.py (1)
AirbyteTracedException(25-145)
unit_tests/sources/declarative/interpolation/test_jinja.py (1)
airbyte_cdk/sources/types.py (1)
StreamSlice(75-169)
unit_tests/sources/declarative/async_job/test_integration.py (4)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3)
DeclarativeStream(2406-2473)AsyncRetriever(2843-2946)InlineSchemaLoader(825-832)airbyte_cdk/sources/abstract_source.py (1)
AbstractSource(52-326)airbyte_cdk/sources/declarative/partition_routers/single_partition_router.py (1)
SinglePartitionRouter(13-57)airbyte_cdk/sources/types.py (1)
StreamSlice(75-169)
unit_tests/sources/declarative/extractors/test_dpath_extractor.py (2)
airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py (1)
CompositeRawDecoder(130-221)airbyte_cdk/sources/declarative/decoders/decoder.py (1)
Decoder(15-32)
airbyte_cdk/sources/declarative/requesters/http_job_repository.py (2)
airbyte_cdk/models/airbyte_protocol.py (1)
AirbyteMessage(79-88)airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
FailureType(743-746)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (1)
DeclarativeStream(32-241)airbyte_cdk/legacy/sources/declarative/incremental/datetime_based_cursor.py (1)
DatetimeBasedCursor(28-448)airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1)
AbstractStream(21-96)
unit_tests/sources/streams/concurrent/test_partition_reader.py (1)
airbyte_cdk/sources/message/repository.py (1)
InMemoryMessageRepository(74-95)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (3)
airbyte_cdk/sources/source.py (1)
Source(55-95)airbyte_cdk/sources/abstract_source.py (4)
streams(74-79)read(101-211)check(92-99)check_connection(59-71)airbyte_cdk/sources/declarative/checks/connection_checker.py (2)
ConnectionChecker(12-38)check_connection(18-38)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (3)
airbyte_cdk/legacy/sources/declarative/declarative_stream.py (1)
DeclarativeStream(32-241)airbyte_cdk/legacy/sources/declarative/incremental/datetime_based_cursor.py (1)
DatetimeBasedCursor(28-448)airbyte_cdk/utils/traced_exception.py (1)
AirbyteTracedException(25-145)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-shopify
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
🔇 Additional comments (14)
unit_tests/sources/declarative/extractors/test_dpath_extractor.py (2)
11-11: Import path consolidation LGTMSwitching to the package-level
airbyte_cdk.sources.declarative.decodersimport reads cleaner and matches the refactor direction. Nice touch.
11-11: Consolidate deep decoder imports
I’m still seeing direct imports fromairbyte_cdk.sources.declarative.decoders.decoderor…decoders.composite_raw_decoderin these files:
• unit_tests/sources/declarative/extractors/test_dpath_extractor.py
• unit_tests/sources/declarative/decoders/test_json_decoder.py
• airbyte_cdk/sources/declarative/decoders/xml_decoder.py
• airbyte_cdk/sources/declarative/decoders/noop_decoder.py
• airbyte_cdk/sources/declarative/decoders/json_decoder.py
• airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py
• airbyte_cdk/sources/declarative/auth/token_provider.py
• airbyte_cdk/sources/declarative/decoders/init.pyCould we switch these to the package-level imports (e.g.
from airbyte_cdk.sources.declarative.decoders import Decoder, CompositeRawDecoder, JsonLineParser)? wdyt?⛔ Skipped due to learnings
Learnt from: aaronsteers PR: airbytehq/airbyte-python-cdk#58 File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65 Timestamp: 2024-11-15T01:04:21.272Z Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.Learnt from: aaronsteers PR: airbytehq/airbyte-python-cdk#58 File: airbyte_cdk/cli/source_declarative_manifest/spec.json:9-15 Timestamp: 2024-11-15T00:59:08.154Z Learning: When code in `airbyte_cdk/cli/source_declarative_manifest/` is being imported from another repository, avoid suggesting modifications to it during the import process.unit_tests/sources/declarative/interpolation/test_jinja.py (1)
12-12: Import path shift to sources.types looks goodThe switch to
airbyte_cdk.sources.types.StreamSlicealigns with the new typing module layout; no test logic change. LGTM.unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py (1)
16-16: YamlDeclarativeSource import relocation is correctUsing
airbyte_cdk.sources.declarative.yaml_declarative_sourcematches the refactor. No other changes needed here.unit_tests/sources/declarative/async_job/test_integration.py (1)
8-10: Imports updated to new public/legacy paths — LGTM
DeclarativeStreamfrom legacy path,AbstractSourcefromairbyte_cdk.sources, andSinglePartitionRouter/Stream/StreamSlicefrom their new homes all look consistent with the v7 layout.Also applies to: 18-18, 27-28
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
128-128: Switching to Source (vs AbstractSource) matches PR goalsThe inheritance change is consistent with the v7 surface area reduction and keeps concerns localized.
airbyte_cdk/sources/declarative/requesters/http_job_repository.py (2)
1-2: LGTM: Updated header year.The 2025 copyright header and spacing look good, wdyt?
13-13: Import-path consolidation verified — no FailureType collisions detected
No imports ofFailureTypefrom anyairbyte_cdk.sources.declarative.modelspath were found in this module or nearby packages.unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_wait_time_from_header.py (1)
14-14: Update stale import of AirbyteTracedExceptionIn
airbyte_cdk/sources/file_based/availability_strategy/default_file_based_availability_strategy.py(line 11), replace the import:- from airbyte_cdk import AirbyteTracedException + from airbyte_cdk.utils import AirbyteTracedExceptionwdyt?
⛔ Skipped due to learnings
Learnt from: aaronsteers PR: airbytehq/airbyte-python-cdk#58 File: airbyte_cdk/cli/source_declarative_manifest/spec.json:9-15 Timestamp: 2024-11-15T00:59:08.154Z Learning: When code in `airbyte_cdk/cli/source_declarative_manifest/` is being imported from another repository, avoid suggesting modifications to it during the import process.Learnt from: aaronsteers PR: airbytehq/airbyte-python-cdk#58 File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65 Timestamp: 2024-11-15T01:04:21.272Z Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.unit_tests/sources/declarative/async_job/test_job_orchestrator.py (1)
23-25: Update stale StreamSlice and AirbyteTracedException imports to new public APICould you update the following occurrences to use
from airbyte_cdk.sources.types import StreamSlice
and
from airbyte_cdk.utils import AirbyteTracedException?
- unit_tests/sources/declarative/async_job/test_job.py: line 9
- unit_tests/sources/declarative/requesters/request_options/test_per_partition_request_option_provider.py: line 9
- unit_tests/sources/declarative/requesters/request_options/test_datetime_based_request_options_provider.py: line 14
- unit_tests/sources/declarative/extractors/test_record_filter.py: line 26
- airbyte_cdk/sources/file_based/stream/default_file_based_stream.py: line 36
- airbyte_cdk/sources/file_based/stream/abstract_file_based_stream.py: line 32
- airbyte_cdk/sources/file_based/stream/permissions_file_based_stream.py: line 14
- airbyte_cdk/sources/file_based/stream/concurrent/adapters.py: line 31
- airbyte_cdk/sources/file_based/availability_strategy/default_file_based_availability_strategy.py: line 11
⛔ Skipped due to learnings
Learnt from: aaronsteers PR: airbytehq/airbyte-python-cdk#58 File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65 Timestamp: 2024-11-15T01:04:21.272Z Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (2)
177-177: No changes needed for AirbyteTracedException import
airbyte_cdk/utils/__init__.pyre-exportsAirbyteTracedExceptionin its__all__, so importing fromairbyte_cdk.utilsis valid.
47-47: Confirmed export of ConcurrentPerPartitionCursor
ConcurrentPerPartitionCursor is re-exported in airbyte_cdk/sources/declarative/incremental/init.py, so this import is safe.airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
36-39: Legacy imports alignment — LGTMRe-routing DeclarativeStream and DatetimeBasedCursor to legacy paths matches the v7 migration goals.
3146-3150: No remaining CustomIncrementalSync referencesCode and config searches return no lingering usages; only cdk-migrations.md mentions it (as intended).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
unit_tests/sources/declarative/stream_slicers/test_stream_slicer_read_decorator.py (1)
35-39: Past TODO addressed; any remaining legacy references to prune?Looks like you’ve moved to concurrent components as planned. Do you want a quick sweep to ensure no tests still import legacy cursors, wdyt?
#!/bin/bash # Find any lingering legacy cursor usages rg -nP 'DeclarativeCursor|GlobalSubstreamCursor|PerPartitionWithGlobalCursor|DatetimeBasedCursor|MinMaxDatetime' -g '!**/site-packages/**' -C2
🧹 Nitpick comments (7)
unit_tests/sources/declarative/stream_slicers/test_stream_slicer_read_decorator.py (7)
49-67: Remove unused MockedCursorBuilder.This helper isn’t referenced anywhere; dropping it will keep the test leaner, wdyt?
-class MockedCursorBuilder: - def __init__(self): - self._stream_slices = [] - self._stream_state = {} - - def with_stream_slices(self, stream_slices): - self._stream_slices = stream_slices - return self - - def with_stream_state(self, stream_state): - self._stream_state = stream_state - return self - - def build(self): - cursor = Mock(spec=Cursor) - cursor.get_stream_state.return_value = self._stream_state - cursor.stream_slices.return_value = self._stream_slices - return cursor
81-98: PreferNoneover empty-string forstream_namespaceand use stable no-op collaborators.Using
Nonereads clearer for “no namespace”. Also, since this is a unit test that doesn’t inspect side-effects,NoopMessageRepository()and a realConnectorStateManager()reduce mocking overhead. Shall we simplify, wdyt?- return ConcurrentCursor( - stream_name="test", - stream_namespace="", - stream_state={}, - message_repository=mocked_message_repository(), - connector_state_manager=mocked_connector_state_manager(), + return ConcurrentCursor( + stream_name="test", + stream_namespace=None, + stream_state={}, + message_repository=NoopMessageRepository(), + connector_state_manager=ConnectorStateManager(), connector_state_converter=state_converter, cursor_field=CursorField("created_at"), slice_boundary_fields=None, start=ab_datetime_parse("2021-01-01"), end_provider=state_converter.get_end_provider(), )
193-204: Alignstream_namespaceand collaborators in ConcurrentPerPartitionCursor setup.For consistency with the earlier factory, would you switch
stream_namespacetoNoneand considerNoopMessageRepository()/ConnectorStateManager()to avoid brittle mocks, wdyt?- substream_cursor = ConcurrentPerPartitionCursor( + substream_cursor = ConcurrentPerPartitionCursor( cursor_factory=cursor_factory, partition_router=partition_router, - stream_name="test", - stream_namespace="", - stream_state={}, - message_repository=mocked_message_repository(), - connector_state_manager=mocked_connector_state_manager(), + stream_name="test", + stream_namespace=None, + stream_state={}, + message_repository=NoopMessageRepository(), + connector_state_manager=ConnectorStateManager(), connector_state_converter=connector_state_converter, cursor_field=CursorField(cursor_field_key="updated_at"), use_global_cursor=use_global_cursor, )
211-235: Avoid asserting on protected attributes; remove duplicate assertion.Asserting on underscored fields (
_cursor_factory,_partition_router,_use_global_cursor) couples tests to internals. Could we expose read-only properties (or validate via behavior) and forward them through the decorator instead, wdyt? Also, the_use_global_cursorcheck is duplicated.- assert wrapped_slicer._cursor_factory == cursor_factory + # Prefer public surface; consider adding properties for these if we want to assert identity. + assert wrapped_slicer._cursor_factory == cursor_factory @@ - assert wrapped_slicer._use_global_cursor == use_global_cursor + assert wrapped_slicer._use_global_cursor == use_global_cursor @@ - assert wrapped_slicer._use_global_cursor == use_global_cursor + # (duplicate) removed; already asserted aboveIf adding properties is feasible, something like:
# in ConcurrentPerPartitionCursor (production code) @property def cursor_factory(self) -> ConcurrentCursorFactory: return self._cursor_factory @property def partition_router(self) -> PartitionRouter: return self._partition_router @property def use_global_cursor(self) -> bool: return self._use_global_cursorThen tests can assert
wrapped_slicer.cursor_factory is cursor_factory, etc.
15-17: Minor: consistent import aliasing.Since both classes are used frequently, would aliasing to short names (
ConcurrentFactory,PerPartitionCursor) improve readability across assertions, or shall we keep the explicit names for clarity, wdyt?
141-149: Double-check intendedisinstancebehavior through the decorator.
StreamSlicerTestReadDecoratorpassingisinstance(..., ConcurrentPerPartitionCursor)/SubstreamPartitionRouterimplies intentional type proxying. If that’s the contract, all good; otherwise, consider usingisinstance(..., StreamSlicerTestReadDecorator)only to reduce type coupling, wdyt?
35-41: Import MessageRepository/NoopMessageRepository fromairbyte_cdk.sources.message.repositoryfor consistency? Some modules (e.g.partition_reader.py,file_based_concurrent_cursor.py) already use the concrete path—unifying the rest can avoid surprises, wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
unit_tests/sources/declarative/stream_slicers/test_stream_slicer_read_decorator.py(7 hunks)
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
🧬 Code graph analysis (1)
unit_tests/sources/declarative/stream_slicers/test_stream_slicer_read_decorator.py (6)
airbyte_cdk/sources/connector_state_manager.py (1)
ConnectorStateManager(32-161)airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (4)
ConcurrentCursorFactory(94-103)ConcurrentPerPartitionCursor(106-646)cursor_field(194-195)start(81-82)airbyte_cdk/sources/message/repository.py (2)
MessageRepository(45-60)NoopMessageRepository(63-71)airbyte_cdk/sources/streams/concurrent/cursor.py (2)
ConcurrentCursor(135-518)CursorField(40-48)airbyte_cdk/sources/streams/concurrent/state_converters/datetime_stream_state_converter.py (1)
CustomFormatConcurrentStreamStateConverter(193-223)airbyte_cdk/utils/datetime_helpers.py (1)
ab_datetime_parse(361-442)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
🔇 Additional comments (3)
unit_tests/sources/declarative/stream_slicers/test_stream_slicer_read_decorator.py (3)
119-149: Type assertions around AsyncJobPartitionRouter look good.These checks accurately validate the decorator/wrapper layering and ensure we’re not accidentally constructing a concurrent cursor here. LGTM.
160-173: SubstreamPartitionRouter assertions look good.This mirrors the async-job case and correctly avoids concurrent-cursor types here. LGTM.
176-183: Parametrization reads well.Clear IDs and coverage for both global and non-global cursor scenarios. LGTM.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
364-380: Read path ignores incoming state; risks losing incremental progress.
statepassed toread(...)is not propagated into the existingConnectorStateManagernor the factory, so concurrent components likely initialize with empty state. Givenentrypoint.read(...)passes state at runtime, this can cause incorrect checkpoints and full re-syncs. Can we hydrate the state manager and factory at read time, before building streams? Example:def read( self, logger: logging.Logger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: Optional[List[AirbyteStateMessage]] = None, ) -> Iterator[AirbyteMessage]: + # Ensure the concurrent components see the latest runtime state + if state is not None: + # Reuse the same message repository to preserve queueing behavior + self._connector_state_manager = ConnectorStateManager(state=state) + self._constructor = ModelToComponentFactory( + emit_connector_builder_messages=self._emit_connector_builder_messages, + message_repository=self._message_repository, + connector_state_manager=self._connector_state_manager, + max_concurrent_async_job_count=self._source_config.get("max_concurrent_async_job_count"), + limit_pages_fetched_per_slice=self._limits.max_pages_per_slice if self._limits else None, + limit_slices_fetched=self._limits.max_slices if self._limits else None, + disable_retries=True if self._limits else False, + disable_cache=True if self._limits else False, + ) selected_concurrent_streams = self._select_streams( streams=self.streams(config=self._config), # type: ignore configured_catalog=catalog, ) # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor. # This is also evident in concurrent_source_adapter.py so I'll leave this out of scope to fix for now if len(selected_concurrent_streams) > 0: yield from self._concurrent_source.read(selected_concurrent_streams) + else: + self.logger.debug("No streams selected in catalog; skipping concurrent read.")
♻️ Duplicate comments (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
381-384: Discover should accept the passedconfig(duplicate of prior feedback).Understood you plan to deprecate the
configparameter, but until then, callers may expectdiscover(logger, config)to honor it. Do you want to keep this as-is and document the deprecation, or switch toconfig or self._configfor now? Wdyt?
485-507: Check should degrade gracefully and avoid uncaught ValueError; also pass config and return cleaner messages.
- Raising
ValueErroron missingcheckwill bypassentrypoint.check(...)’s exception handling (it only catchesAirbyteTracedException), potentially crashing the CLI.- Defaulting to
CheckStreampreserves legacy behavior.create_component(...)should receiveself._configto resolve interpolations in thecheckblock.- Prefer
str(error)overrepr(error)for user-facing messages.Proposed patch:
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: - check = self._source_config.get("check") - if not check: - raise ValueError(f"Missing 'check' component definition within the manifest.") - - if "type" not in check: - check["type"] = "CheckStream" - connection_checker = self._constructor.create_component( + check = dict(self._source_config.get("check", {})) + if "type" not in check: + check["type"] = "CheckStream" + connection_checker = self._constructor.create_component( COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]], check, - dict(), + self._config, emit_connector_builder_messages=self._emit_connector_builder_messages, ) if not isinstance(connection_checker, ConnectionChecker): raise ValueError( f"Expected to generate a ConnectionChecker component, but received {connection_checker.__class__}" ) - check_succeeded, error = connection_checker.check_connection(self, logger, self._config) - if not check_succeeded: - return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error)) + check_succeeded, error = connection_checker.check_connection(self, logger, self._config) + if not check_succeeded: + return AirbyteConnectionStatus(status=Status.FAILED, message=str(error)) return AirbyteConnectionStatus(status=Status.SUCCEEDED)
🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
214-221: Pass transformed config to the ConcurrencyLevel component?
self._configis available by this point (Line 212), but the factory here usesconfig or {}. Using the migrated/transformed config keeps behavior consistent with the rest of the class. Wdyt?- config=config or {}, + config=self._config,
253-255: Docstring return type is wrong.
_pre_process_manifestreturns the processed manifest, notNone. Update to avoid misleading readers and tooling, wdyt?- Returns: - None + Returns: + Dict[str, Any]: The processed manifest with references resolved and types/parameters propagated.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py(9 hunks)airbyte_cdk/sources/streams/concurrent/abstract_stream.py(0 hunks)
💤 Files with no reviewable changes (1)
- airbyte_cdk/sources/streams/concurrent/abstract_stream.py
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
📚 Learning: 2024-11-18T23:40:06.391Z
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
Applied to files:
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
📚 Learning: 2025-01-14T00:20:32.310Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#174
File: airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1093-1102
Timestamp: 2025-01-14T00:20:32.310Z
Learning: In the `airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py` file, the strict module name checks in `_get_class_from_fully_qualified_class_name` (requiring `module_name` to be "components" and `module_name_full` to be "source_declarative_manifest.components") are intentionally designed to provide early, clear feedback when class declarations won't be found later in execution. These restrictions may be loosened in the future if the requirements for class definition locations change.
Applied to files:
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
📚 Learning: 2024-11-15T01:04:21.272Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.
Applied to files:
airbyte_cdk/sources/declarative/concurrent_declarative_source.py
🧬 Code graph analysis (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (4)
airbyte_cdk/sources/source.py (2)
Source(55-95)discover(48-52)airbyte_cdk/sources/abstract_source.py (5)
streams(74-79)read(101-211)discover(85-90)check(92-99)check_connection(59-71)airbyte_cdk/entrypoint.py (3)
read(271-283)discover(260-269)check(219-258)airbyte_cdk/sources/declarative/checks/connection_checker.py (2)
ConnectionChecker(12-38)check_connection(18-38)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: Check: source-shopify
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
127-131: Good move: switch to Source interface is cleaner.Directly implementing
Sourcesimplifies the flow and decouples fromAbstractSource. Nice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, let's do this!
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
Show resolved
Hide resolved
unit_tests/sources/declarative/partition_routers/test_cartesian_product_partition_router.py
Show resolved
Hide resolved
unit_tests/sources/declarative/retrievers/test_simple_retriever.py
Outdated
Show resolved
Hide resolved
|
Regression test results run against Results look good and based on the changes in the PR would catastrophically fail during a sync if there was an issue. There are a few small mismatches, but nothing beyond what was already being observed. I'll do a final local validation sanity check prior to the merge and release of the version. |
Note: This should not be merged until after #707 since this PR depends on logic implemented there. And this will fail a lot of tests until after that change is rebased into this one
What
Warning this will be breaking and part of the CDK release to v7
Now that the
ConcurrentDeclarativeSourcehas been updated to only receive and processDefaultStreamfrom the model to component factory here, we can significantly simplify how we handle connector operations and process syncs without account for the legacy synchronous flow.All the legacy
DeclarativeStreamandAbstractStreaminheritance has been extracted fromConcurrentDeclarativeSource.How
Broadly the big changes are:
ConcurrentDeclarativeSourceno longer implementsAbstractSourceand instead directly implements the primarySourceinterfaceDefaultStream.message_repository()are removed since onlyAbstractSourcerequired themConnectionCheckercomponent takes in the source itself. TheCheckStreamcomponent does a lot of shenanigans withe source object itself so extracting it is a bit annoyingDeclarativeStreamto the legacy folderSomething worth noting is that I did not move
AbstractSourceto the legacy package. It has a lot of dependencies in the file CDK, scenario builder, and various Legacy <> Synchronous CDK adapters, so its not quite a full on legacy class yet. Despite it still being in the main package, it has been fully extracted out of theConcurrentDeclarativeSourceflow so we're still left with a streamline code flow when processing all low-code syncs.Out of scope
Two things that I've left out of scope to make the change more manageable and because they should have minimal impact on the customer facing interface
SimpleRetrieverstill has state methods and fields for legacy cursors. These should be more easily extractable because they're not invoked in the flow anymoreAbstractStreamstill makes references to things like RFR and checkpoint readers. While these are effectively legacy components, they're not referenced by the concurrent CDK so cleanup is neither urgent nor breaking.Summary by CodeRabbit
New Features
Refactor
Removal
Tests