[low-code CDK] Resumable full refresh support for low-code streams #38300

Merged on May 22, 2024 (16 commits)

Changes from 3 commits
@@ -8,9 +8,11 @@
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.streams.checkpoint import Cursor
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.types import Config, StreamSlice

@@ -157,3 +159,8 @@ def state_checkpoint_interval(self) -> Optional[int]:
important state is the one at the beginning of the slice
"""
return None

def get_cursor(self) -> Optional[Cursor]:
Contributor comment: More evidence the cursor belongs to the stream, not the retriever.

if self.retriever and isinstance(self.retriever, SimpleRetriever):
return self.retriever.cursor
return None
@@ -5,5 +5,6 @@
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import CursorFactory, PerPartitionCursor
from airbyte_cdk.sources.declarative.incremental.resumable_full_refresh_cursor import ResumableFullRefreshCursor

__all__ = ["CursorFactory", "DatetimeBasedCursor", "DeclarativeCursor", "PerPartitionCursor"]
__all__ = ["CursorFactory", "DatetimeBasedCursor", "DeclarativeCursor", "PerPartitionCursor", "ResumableFullRefreshCursor"]
@@ -168,6 +168,11 @@ def stream_slices(self) -> Iterable[StreamSlice]:
start_datetime = self._calculate_earliest_possible_value(self._select_best_end_datetime())
return self._partition_daterange(start_datetime, end_datetime, self._step)

def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
# Datetime based cursors operate over slices made up of datetime ranges. Stream state is based on the progress
# through each slice and does not belong to a specific slice. We just return stream state as it is.
return self.get_stream_state()

def _calculate_earliest_possible_value(self, end_datetime: datetime.datetime) -> datetime.datetime:
lookback_delta = self._parse_timedelta(self._lookback_window.eval(self.config) if self._lookback_window else "P0D")
earliest_possible_start_datetime = min(self._start_datetime.get_datetime(self.config), end_datetime)
@@ -0,0 +1,96 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from dataclasses import InitVar, dataclass
from typing import Any, Iterable, Mapping, Optional

from airbyte_cdk.sources.declarative.incremental import DeclarativeCursor
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState


@dataclass
class ResumableFullRefreshCursor(DeclarativeCursor):
parameters: InitVar[Mapping[str, Any]]

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._cursor: StreamState = {}

def get_stream_state(self) -> StreamState:
return self._cursor

def set_initial_state(self, stream_state: StreamState) -> None:
self._cursor = stream_state

def observe(self, stream_slice: StreamSlice, record: Record) -> None:
"""
Resumable full refresh manages state using a page number so it does not need to update state by observing incoming records.
"""
pass

def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
# The ResumableFullRefreshCursor doesn't support nested streams yet so receiving a partition is unexpected
if stream_slice.partition:
raise ValueError(f"Stream slice {stream_slice} should not have a partition. Got {stream_slice.partition}.")
self._cursor = stream_slice.cursor_slice

def should_be_synced(self, record: Record) -> bool:
"""
Unlike date-based cursors which filter out records outside slice boundaries, resumable full refresh records exist within pages
that don't have filterable bounds. We should always return them.
"""
return True

def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
Contributor (author) comment: This is the one field that really doesn't fit the existing cursor interface. We can default to false, and ultimately we don't even care about the record since we close the slice based on the page number.

"""
RFR records don't have an ordering that can be compared against one another.
"""
return False

def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
Contributor (author) comment: This is a new interface method that we need in order to sanely read through state from the CheckpointReader. It isn't that useful for unnested streams, but it is critical for us to be able to sanely parse substream state as we iterate over parent record state in the CheckpointReader when we do that work.

# A top-level RFR cursor only manages the state of a single partition
return self._cursor

def stream_slices(self) -> Iterable[StreamSlice]:
"""
Resumable full refresh cursors only return a single slice and can't perform partitioning because iteration is done per-page
along an unbounded set.
"""
yield from [StreamSlice(cursor_slice=self._cursor, partition={})]

# This is an interesting pattern that might not seem obvious at first glance. This cursor itself has no functional need to
# inject any request values into the outbound request because the up-to-date pagination state is already loaded and
# maintained by the paginator component
def get_request_params(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return {}

def get_request_headers(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return {}

def get_request_body_data(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return {}

def get_request_body_json(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return {}
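To make the intended lifecycle of the new cursor concrete, here is a minimal usage sketch based only on the class added above. The "next_page_token" keys are illustrative placeholders; in practice the cursor_slice contents are whatever pagination state the retriever checkpoints.

from airbyte_cdk.sources.declarative.incremental import ResumableFullRefreshCursor
from airbyte_cdk.sources.declarative.types import StreamSlice

# Resume a previously interrupted full refresh by seeding the cursor with prior state.
cursor = ResumableFullRefreshCursor(parameters={})
cursor.set_initial_state({"next_page_token": 3})

# The cursor emits exactly one unpartitioned slice whose cursor_slice carries that state.
only_slice = next(iter(cursor.stream_slices()))
assert only_slice.cursor_slice == {"next_page_token": 3}
assert only_slice.partition == {}

# When the retriever finishes a page it closes the slice with the latest pagination state,
# which becomes the stream state that gets checkpointed.
cursor.close_slice(StreamSlice(cursor_slice={"next_page_token": 4}, partition={}))
assert cursor.get_stream_state() == {"next_page_token": 4}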
@@ -28,7 +28,13 @@
from airbyte_cdk.sources.declarative.decoders import JsonDecoder
from airbyte_cdk.sources.declarative.extractors import DpathExtractor, RecordFilter, RecordSelector
from airbyte_cdk.sources.declarative.extractors.record_selector import SCHEMA_TRANSFORMER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.incremental import CursorFactory, DatetimeBasedCursor, DeclarativeCursor, PerPartitionCursor
from airbyte_cdk.sources.declarative.incremental import (
CursorFactory,
DatetimeBasedCursor,
DeclarativeCursor,
PerPartitionCursor,
ResumableFullRefreshCursor,
)
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
from airbyte_cdk.sources.declarative.migrations.legacy_to_per_partition_state_migration import LegacyToPerPartitionStateMigration
@@ -668,6 +674,10 @@ def _merge_stream_slicers(self, model: DeclarativeStreamModel, config: Config) -
)
elif model.incremental_sync:
return self._create_component_from_model(model=model.incremental_sync, config=config) if model.incremental_sync else None
elif hasattr(model.retriever, "paginator") and model.retriever.paginator and not stream_slicer:
# To incrementally deliver RFR for low-code we're first implementing this for streams that do not use
# nested state like substreams or those using list partition routers
return ResumableFullRefreshCursor(parameters={})
Contributor comment: 👍 Is there a follow-up issue to support substreams?

Contributor (author) reply: Yep, I have it filed here from when the original spec was written: https://github.com/airbytehq/airbyte-internal-issues/issues/7528

elif stream_slicer:
return stream_slicer
else:
@@ -99,7 +99,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
raise ValueError("page_size_option cannot be set if the pagination strategy does not have a page_size")
if isinstance(self.url_base, str):
self.url_base = InterpolatedString(string=self.url_base, parameters=parameters)
self._token = self.pagination_strategy.initial_token
self._token: Optional[Any] = self.pagination_strategy.initial_token

def next_page_token(
self, response: requests.Response, last_page_size: int, last_record: Optional[Record]
@@ -153,8 +153,8 @@ def get_request_body_json(
) -> Mapping[str, Any]:
return self._get_request_options(RequestOptionType.body_json)

def reset(self) -> None:
self.pagination_strategy.reset()
def reset(self, reset_value: Optional[Any] = None) -> None:
self.pagination_strategy.reset(reset_value=reset_value)
self._token = self.pagination_strategy.initial_token

def _get_request_options(self, option_type: RequestOptionType) -> MutableMapping[str, Any]:
@@ -235,6 +235,6 @@ def get_request_body_json(
) -> Mapping[str, Any]:
return self._decorated.get_request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)

def reset(self) -> None:
def reset(self, reset_value: Optional[Any] = None) -> None:
self._decorated.reset()
self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL
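A hedged sketch of how the new reset_value parameter is intended to flow through DefaultPaginator into its strategy. The constructor arguments below (config, url_base, page_size, start_from_page) are assumptions about the surrounding CDK version rather than part of this diff; only the reset() call reflects the change above.

from airbyte_cdk.sources.declarative.requesters.paginators import DefaultPaginator
from airbyte_cdk.sources.declarative.requesters.paginators.strategies import PageIncrement

# Assumed constructor arguments for illustration only.
strategy = PageIncrement(config={}, page_size=25, start_from_page=1, parameters={})
paginator = DefaultPaginator(
    pagination_strategy=strategy,
    config={},
    url_base="https://api.example.com/v1",  # placeholder base URL
    parameters={},
)

# On resume, reset() forwards the checkpointed page number to the strategy and then
# refreshes the paginator's internal token from the strategy's initial_token.
paginator.reset(reset_value=7)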
@@ -60,6 +60,6 @@
def next_page_token(self, response: requests.Response, last_page_size: int, last_record: Optional[Record]) -> Mapping[str, Any]:
return {}

def reset(self) -> None:
def reset(self, reset_value: Optional[Any] = None) -> None:
# No state to reset
pass
@@ -21,7 +21,7 @@ class Paginator(ABC, RequestOptionsProvider):
"""

@abstractmethod
def reset(self) -> None:
def reset(self, reset_value: Optional[Any] = None) -> None:
"""
Reset the pagination's inner state
"""
@@ -35,6 +35,7 @@ class CursorPaginationStrategy(PaginationStrategy):
decoder: Decoder = JsonDecoder(parameters={})

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._initial_cursor = None
if isinstance(self.cursor_value, str):
self._cursor_value = InterpolatedString.create(self.cursor_value, parameters=parameters)
else:
@@ -46,7 +47,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:

@property
def initial_token(self) -> Optional[Any]:
return None
return self._initial_cursor

def next_page_token(self, response: requests.Response, last_page_size: int, last_record: Optional[Record]) -> Optional[Any]:
decoded_response = self.decoder.decode(response)
@@ -74,9 +75,8 @@ def next_page_token(self, response: requests.Response, last_page_size: int, last
)
return token if token else None

def reset(self) -> None:
# No state to reset
pass
def reset(self, reset_value: Optional[Any] = None) -> None:
self._initial_cursor = reset_value

def get_page_size(self) -> Optional[int]:
return self.page_size
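A small sketch of the behavior change above, assuming the cursor_value, config, and parameters constructor fields: before this diff initial_token was hard-coded to None, whereas now a value passed through reset() is replayed as the initial token on the next attempt.

from airbyte_cdk.sources.declarative.requesters.paginators.strategies import CursorPaginationStrategy

strategy = CursorPaginationStrategy(
    cursor_value="{{ response.next_page }}",  # illustrative interpolation, not from this diff
    config={},
    parameters={},
)

assert strategy.initial_token is None      # fresh sync: no cursor yet
strategy.reset(reset_value="abc123")       # resuming: seed the cursor from checkpointed state
assert strategy.initial_token == "abc123"  # the first request picks up where the last attempt left off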
@@ -66,8 +66,13 @@ def next_page_token(self, response: requests.Response, last_page_size: int, last
self._offset += last_page_size
return self._offset

def reset(self) -> None:
self._offset = 0
def reset(self, reset_value: Optional[Any] = None) -> None:
if reset_value is None:
self._offset = 0
elif not isinstance(reset_value, int):
raise ValueError(f"Reset value {reset_value} for OffsetIncrement pagination strategy was not an integer")
else:
self._offset = reset_value

def get_page_size(self) -> Optional[int]:
if self._page_size:
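The same contract, sketched for OffsetIncrement (the constructor arguments are assumptions about the surrounding CDK version): a missing reset_value falls back to offset 0, an integer resumes from that offset, and anything else is rejected. PageIncrement, shown next, follows the identical pattern with the page number.

from airbyte_cdk.sources.declarative.requesters.paginators.strategies import OffsetIncrement

strategy = OffsetIncrement(config={}, page_size=100, parameters={})

strategy.reset()                 # no checkpoint: start over from offset 0
strategy.reset(reset_value=400)  # checkpointed offset: resume from record 400

try:
    strategy.reset(reset_value="400")  # non-integer checkpoints are rejected loudly
except ValueError as error:
    print(error)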
@@ -51,8 +51,13 @@ def next_page_token(self, response: requests.Response, last_page_size: int, last
self._page += 1
return self._page

def reset(self) -> None:
self._page = self.start_from_page
def reset(self, reset_value: Optional[Any] = None) -> None:
if reset_value is None:
self._page = self.start_from_page
elif not isinstance(reset_value, int):
raise ValueError(f"Reset value {reset_value} for PageIncrement pagination strategy was not an integer")
else:
self._page = reset_value

def get_page_size(self) -> Optional[int]:
return self._page_size
@@ -34,7 +34,7 @@ def next_page_token(self, response: requests.Response, last_page_size: int, last
pass

@abstractmethod
def reset(self) -> None:
def reset(self, reset_value: Optional[Any] = None) -> None:
"""
Reset the pagination's inner state
"""