Skip to content

Commit

Permalink
[low-code CDK] Rsumable full refresh support for low-code streams (#3…
Browse files Browse the repository at this point in the history
  • Loading branch information
brianjlai authored May 22, 2024
1 parent 29723a7 commit 040f141
Show file tree
Hide file tree
Showing 36 changed files with 1,318 additions and 311 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.streams.checkpoint import Cursor
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.types import Config, StreamSlice

Expand Down Expand Up @@ -157,3 +159,8 @@ def state_checkpoint_interval(self) -> Optional[int]:
important state is the one at the beginning of the slice
"""
return None

def get_cursor(self) -> Optional[Cursor]:
if self.retriever and isinstance(self.retriever, SimpleRetriever):
return self.retriever.cursor
return None
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import CursorFactory, PerPartitionCursor
from airbyte_cdk.sources.declarative.incremental.resumable_full_refresh_cursor import ResumableFullRefreshCursor

__all__ = ["CursorFactory", "DatetimeBasedCursor", "DeclarativeCursor", "PerPartitionCursor"]
__all__ = ["CursorFactory", "DatetimeBasedCursor", "DeclarativeCursor", "PerPartitionCursor", "ResumableFullRefreshCursor"]
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ def stream_slices(self) -> Iterable[StreamSlice]:
start_datetime = self._calculate_earliest_possible_value(self._select_best_end_datetime())
return self._partition_daterange(start_datetime, end_datetime, self._step)

def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
# Datetime based cursors operate over slices made up of datetime ranges. Stream state is based on the progress
# through each slice and does not belong to a specific slice. We just return stream state as it is.
return self.get_stream_state()

def _calculate_earliest_possible_value(self, end_datetime: datetime.datetime) -> datetime.datetime:
lookback_delta = self._parse_timedelta(self._lookback_window.eval(self.config) if self._lookback_window else "P0D")
earliest_possible_start_datetime = min(self._start_datetime.get_datetime(self.config), end_datetime)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from dataclasses import InitVar, dataclass
from typing import Any, Iterable, Mapping, Optional

from airbyte_cdk.sources.declarative.incremental import DeclarativeCursor
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState


@dataclass
class ResumableFullRefreshCursor(DeclarativeCursor):
parameters: InitVar[Mapping[str, Any]]

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._cursor: StreamState = {}

def get_stream_state(self) -> StreamState:
return self._cursor

def set_initial_state(self, stream_state: StreamState) -> None:
self._cursor = stream_state

def observe(self, stream_slice: StreamSlice, record: Record) -> None:
"""
Resumable full refresh manages state using a page number so it does not need to update state by observing incoming records.
"""
pass

def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
# The ResumableFullRefreshCursor doesn't support nested streams yet so receiving a partition is unexpected
if stream_slice.partition:
raise ValueError(f"Stream slice {stream_slice} should not have a partition. Got {stream_slice.partition}.")
self._cursor = stream_slice.cursor_slice

def should_be_synced(self, record: Record) -> bool:
"""
Unlike date-based cursors which filter out records outside slice boundaries, resumable full refresh records exist within pages
that don't have filterable bounds. We should always return them.
"""
return True

def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
"""
RFR record don't have ordering to be compared between one another.
"""
return False

def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
# A top-level RFR cursor only manages the state of a single partition
return self._cursor

def stream_slices(self) -> Iterable[StreamSlice]:
"""
Resumable full refresh cursors only return a single slice and can't perform partitioning because iteration is done per-page
along an unbounded set.
"""
yield from [StreamSlice(cursor_slice=self._cursor, partition={})]

# This is an interesting pattern that might not seem obvious at first glance. This cursor itself has no functional need to
# inject any request values into the outbound response because the up-to-date pagination state is already loaded and
# maintained by the paginator component
def get_request_params(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return {}

def get_request_headers(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return {}

def get_request_body_data(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return {}

def get_request_body_json(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return {}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@
from airbyte_cdk.sources.declarative.decoders import JsonDecoder
from airbyte_cdk.sources.declarative.extractors import DpathExtractor, RecordFilter, RecordSelector
from airbyte_cdk.sources.declarative.extractors.record_selector import SCHEMA_TRANSFORMER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.incremental import CursorFactory, DatetimeBasedCursor, DeclarativeCursor, PerPartitionCursor
from airbyte_cdk.sources.declarative.incremental import (
CursorFactory,
DatetimeBasedCursor,
DeclarativeCursor,
PerPartitionCursor,
ResumableFullRefreshCursor,
)
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
from airbyte_cdk.sources.declarative.migrations.legacy_to_per_partition_state_migration import LegacyToPerPartitionStateMigration
Expand Down Expand Up @@ -668,6 +674,10 @@ def _merge_stream_slicers(self, model: DeclarativeStreamModel, config: Config) -
)
elif model.incremental_sync:
return self._create_component_from_model(model=model.incremental_sync, config=config) if model.incremental_sync else None
elif hasattr(model.retriever, "paginator") and model.retriever.paginator and not stream_slicer:
# To incrementally deliver RFR for low-code we're first implementing this for streams that do not use
# nested state like substreams or those using list partition routers
return ResumableFullRefreshCursor(parameters={})
elif stream_slicer:
return stream_slicer
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
raise ValueError("page_size_option cannot be set if the pagination strategy does not have a page_size")
if isinstance(self.url_base, str):
self.url_base = InterpolatedString(string=self.url_base, parameters=parameters)
self._token = self.pagination_strategy.initial_token
self._token: Optional[Any] = self.pagination_strategy.initial_token

def next_page_token(
self, response: requests.Response, last_page_size: int, last_record: Optional[Record]
Expand Down Expand Up @@ -153,8 +153,11 @@ def get_request_body_json(
) -> Mapping[str, Any]:
return self._get_request_options(RequestOptionType.body_json)

def reset(self) -> None:
self.pagination_strategy.reset()
def reset(self, reset_value: Optional[Any] = None) -> None:
if reset_value:
self.pagination_strategy.reset(reset_value=reset_value)
else:
self.pagination_strategy.reset()
self._token = self.pagination_strategy.initial_token

def _get_request_options(self, option_type: RequestOptionType) -> MutableMapping[str, Any]:
Expand Down Expand Up @@ -235,6 +238,6 @@ def get_request_body_json(
) -> Mapping[str, Any]:
return self._decorated.get_request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)

def reset(self) -> None:
def reset(self, reset_value: Optional[Any] = None) -> None:
self._decorated.reset()
self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,6 @@ def get_request_body_json(
def next_page_token(self, response: requests.Response, last_page_size: int, last_record: Optional[Record]) -> Mapping[str, Any]:
return {}

def reset(self) -> None:
def reset(self, reset_value: Optional[Any] = None) -> None:
# No state to reset
pass
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class Paginator(ABC, RequestOptionsProvider):
"""

@abstractmethod
def reset(self) -> None:
def reset(self, reset_value: Optional[Any] = None) -> None:
"""
Reset the pagination's inner state
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class CursorPaginationStrategy(PaginationStrategy):
decoder: Decoder = JsonDecoder(parameters={})

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._initial_cursor = None
if isinstance(self.cursor_value, str):
self._cursor_value = InterpolatedString.create(self.cursor_value, parameters=parameters)
else:
Expand All @@ -46,7 +47,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:

@property
def initial_token(self) -> Optional[Any]:
return None
return self._initial_cursor

def next_page_token(self, response: requests.Response, last_page_size: int, last_record: Optional[Record]) -> Optional[Any]:
decoded_response = self.decoder.decode(response)
Expand Down Expand Up @@ -74,9 +75,8 @@ def next_page_token(self, response: requests.Response, last_page_size: int, last
)
return token if token else None

def reset(self) -> None:
# No state to reset
pass
def reset(self, reset_value: Optional[Any] = None) -> None:
self._initial_cursor = reset_value

def get_page_size(self) -> Optional[int]:
return self.page_size
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,11 @@ def next_page_token(self, response: requests.Response, last_page_size: int, last
self._offset += last_page_size
return self._offset

def reset(self) -> None:
self._offset = 0
def reset(self, reset_value: Optional[Any] = 0) -> None:
if not isinstance(reset_value, int):
raise ValueError(f"Reset value {reset_value} for OffsetIncrement pagination strategy was not an integer")
else:
self._offset = reset_value

def get_page_size(self) -> Optional[int]:
if self._page_size:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,13 @@ def next_page_token(self, response: requests.Response, last_page_size: int, last
self._page += 1
return self._page

def reset(self) -> None:
self._page = self.start_from_page
def reset(self, reset_value: Optional[Any] = None) -> None:
if reset_value is None:
self._page = self.start_from_page
elif not isinstance(reset_value, int):
raise ValueError(f"Reset value {reset_value} for PageIncrement pagination strategy was not an integer")
else:
self._page = reset_value

def get_page_size(self) -> Optional[int]:
return self._page_size
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def next_page_token(self, response: requests.Response, last_page_size: int, last
pass

@abstractmethod
def reset(self) -> None:
def reset(self, reset_value: Optional[Any] = None) -> None:
"""
Reset the pagination's inner state
"""
Expand Down
Loading

0 comments on commit 040f141

Please sign in to comment.