Skip to content

Commit

Permalink
Merge branch 'christo/klaviyo/state' of https://github.com/airbytehq/…
Browse files Browse the repository at this point in the history
…airbyte into christo/klaviyo/state
  • Loading branch information
ChristoGrab committed Jun 3, 2024
2 parents 59a6add + 84df5c6 commit b90146b
Show file tree
Hide file tree
Showing 170 changed files with 7,497 additions and 2,437 deletions.
1 change: 1 addition & 0 deletions .github/workflows/connectors_version_increment_check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ jobs:
connectors_ci:
name: Connectors Version Increment Check
runs-on: connector-test-large
if: github.event.pull_request.head.repo.fork != true
timeout-minutes: 10
steps:
- name: Checkout Airbyte
Expand Down
3 changes: 0 additions & 3 deletions .github/workflows/format-fix-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@ jobs:
continue-on-error: true
with:
context: "manual"
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcs_credentials: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
github_token: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/publish-cdk-command-manually.yml
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,8 @@ jobs:
uses: peter-evans/create-pull-request@v6
with:
token: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
commit-message: Updating CDK version following release
title: Updating CDK version following release
commit-message: "chore: update CDK version following release"
title: "chore: update CDK version following release"
body: This is an automatically generated PR triggered by a CDK release
branch: automatic-cdk-release
base: master
Expand Down
6 changes: 6 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## 1.2.1
Python 3.11 compatibility bugfixes

## 1.2.0
add client side incremental sync

## 1.1.3
Removed experimental suffix for unstructured file type

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from typing import Any, Dict, List, Literal, Optional, Union

import dpath.util
import dpath
from airbyte_cdk.utils.oneof_option_config import OneOfOptionConfig
from airbyte_cdk.utils.spec_schema_transformations import resolve_refs
from pydantic import BaseModel, Field
Expand Down Expand Up @@ -264,7 +264,7 @@ class Config:
@staticmethod
def remove_discriminator(schema: Dict[str, Any]) -> None:
"""pydantic adds "discriminator" to the schema for oneOfs, which is not treated right by the platform as we inline all references"""
dpath.util.delete(schema, "properties/**/discriminator")
dpath.delete(schema, "properties/**/discriminator")

@classmethod
def schema(cls, by_alias: bool = True, ref_template: str = "") -> Dict[str, Any]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from dataclasses import dataclass
from typing import Any, Dict, List, Mapping, Optional, Tuple

import dpath.util
import dpath
from airbyte_cdk.destinations.vector_db_based.config import ProcessingConfigModel, SeparatorSplitterConfigModel, TextSplitterConfigModel
from airbyte_cdk.destinations.vector_db_based.utils import create_stream_identifier
from airbyte_cdk.models import AirbyteRecordMessage, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, DestinationSyncMode
Expand Down Expand Up @@ -137,7 +137,7 @@ def _extract_relevant_fields(self, record: AirbyteRecordMessage, fields: Optiona
relevant_fields = {}
if fields and len(fields) > 0:
for field in fields:
values = dpath.util.values(record.data, field, separator=".")
values = dpath.values(record.data, field, separator=".")
if values and len(values) > 0:
relevant_fields[field] = values if len(values) > 1 else values[0]
else:
Expand All @@ -162,7 +162,7 @@ def _extract_primary_key(self, record: AirbyteRecordMessage) -> Optional[str]:
primary_key = []
for key in current_stream.primary_key:
try:
primary_key.append(str(dpath.util.get(record.data, key)))
primary_key.append(str(dpath.get(record.data, key)))
except KeyError:
primary_key.append("__not_found__")
stringified_primary_key = "_".join(primary_key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __new__( # type: ignore[misc]
**kwargs: Any,
) -> DeclarativeAuthenticator:
try:
selected_key = str(dpath.util.get(config, authenticator_selection_path))
selected_key = str(dpath.get(config, authenticator_selection_path))
except KeyError as err:
raise ValueError("The path from `authenticator_selection_path` is not found in the config.") from err

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@

import datetime
from abc import abstractmethod
from dataclasses import InitVar, dataclass
from dataclasses import InitVar, dataclass, field
from typing import Any, List, Mapping, Optional, Union

import dpath.util
import dpath
import pendulum
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
Expand Down Expand Up @@ -36,7 +36,7 @@ class SessionTokenProvider(TokenProvider):
parameters: InitVar[Mapping[str, Any]]
message_repository: MessageRepository = NoopMessageRepository()

_decoder: Decoder = JsonDecoder(parameters={})
_decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))
_next_expiration_time: Optional[DateTime] = None
_token: Optional[str] = None

Expand All @@ -62,7 +62,7 @@ def _refresh(self) -> None:
)
if response is None:
raise ReadException("Failed to get session token, response got ignored by requester")
session_token = dpath.util.get(self._decoder.decode(response), self.session_token_path)
session_token = dpath.get(self._decoder.decode(response), self.session_token_path)
if self.expiration_duration is not None:
self._next_expiration_time = pendulum.now() + self.expiration_duration
self._token = session_token
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,10 @@ definitions:
title: Whether the target API is formatted as a data feed
description: A data feed API is an API that does not allow filtering and paginates the content from the most recent to the least recent. Given this, the CDK needs to know when to stop paginating and this field will generate a stop condition for pagination.
type: boolean
is_client_side_incremental:
title: Whether the target API does not support filtering and returns all data (the cursor filters records in the client instead of the API side)
description: If the target API endpoint does not take cursor values to filter records and returns all records anyway, the connector with this cursor will filter out records locally, and only emit new records from the last sync, hence incremental. This means that all records would be read from the API, but only new records will be emitted to the destination.
type: boolean
lookback_window:
title: Lookback Window
description: Time interval before the start_datetime to read data for, e.g. P1M for looking back one month.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass
from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, List, Mapping, Union

import dpath.util
import dpath
import requests
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
Expand Down Expand Up @@ -56,7 +56,7 @@ class DpathExtractor(RecordExtractor):
field_path: List[Union[InterpolatedString, str]]
config: Config
parameters: InitVar[Mapping[str, Any]]
decoder: Decoder = JsonDecoder(parameters={})
decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._field_path = [InterpolatedString.create(path, parameters=parameters) for path in self.field_path]
Expand All @@ -71,9 +71,9 @@ def extract_records(self, response: requests.Response) -> Iterable[Mapping[str,
else:
path = [path.eval(self.config) for path in self._field_path]
if "*" in path:
extracted = dpath.util.values(response_body, path)
extracted = dpath.values(response_body, path)
else:
extracted = dpath.util.get(response_body, path, default=[])
extracted = dpath.get(response_body, path, default=[])
if isinstance(extracted, list):
yield from extracted
elif extracted:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import datetime
from dataclasses import InitVar, dataclass
from typing import Any, Iterable, Mapping, Optional

from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor, PerPartitionCursor
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState

Expand Down Expand Up @@ -36,3 +37,80 @@ def filter_records(
for record in records:
if self._filter_interpolator.eval(self.config, record=record, **kwargs):
yield record


class ClientSideIncrementalRecordFilterDecorator(RecordFilter):
"""
Applies a filter to a list of records to exclude those that are older than the stream_state/start_date.
:param DatetimeBasedCursor date_time_based_cursor: Cursor used to extract datetime values
:param PerPartitionCursor per_partition_cursor: Optional Cursor used for mapping cursor value in nested stream_state
"""

def __init__(
self, date_time_based_cursor: DatetimeBasedCursor, per_partition_cursor: Optional[PerPartitionCursor] = None, **kwargs: Any
):
super().__init__(**kwargs)
self._date_time_based_cursor = date_time_based_cursor
self._per_partition_cursor = per_partition_cursor

@property
def _cursor_field(self) -> str:
return self._date_time_based_cursor.cursor_field.eval(self._date_time_based_cursor.config) # type: ignore # eval returns a string in this context

@property
def _start_date_from_config(self) -> datetime.datetime:
return self._date_time_based_cursor._start_datetime.get_datetime(self._date_time_based_cursor.config)

@property
def _end_datetime(self) -> datetime.datetime:
return (
self._date_time_based_cursor._end_datetime.get_datetime(self._date_time_based_cursor.config)
if self._date_time_based_cursor._end_datetime
else datetime.datetime.max
)

def filter_records(
self,
records: Iterable[Mapping[str, Any]],
stream_state: StreamState,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Iterable[Mapping[str, Any]]:
state_value = self._get_state_value(stream_state, stream_slice or StreamSlice(partition={}, cursor_slice={}))
filter_date: datetime.datetime = self._get_filter_date(state_value)
records = (
record
for record in records
if self._end_datetime > self._date_time_based_cursor.parse_date(record[self._cursor_field]) > filter_date
)
if self.condition:
records = super().filter_records(
records=records, stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
)
yield from records

def _get_state_value(self, stream_state: StreamState, stream_slice: StreamSlice) -> Optional[str]:
"""
Return cursor_value or None in case it was not found.
Cursor_value may be empty if:
1. It is an initial sync => no stream_state exist at all.
2. In Parent-child stream, and we already make initial sync, so stream_state is present.
During the second read, we receive one extra record from parent and therefore no stream_state for this record will be found.
:param StreamState stream_state: State
:param StreamSlice stream_slice: Current Stream slice
:return Optional[str]: cursor_value in case it was found, otherwise None.
"""
if self._per_partition_cursor:
# self._per_partition_cursor is the same object that DeclarativeStream uses to save/update stream_state
partition_state = self._per_partition_cursor.select_state(stream_slice=stream_slice)
return partition_state.get(self._cursor_field) if partition_state else None
return stream_state.get(self._cursor_field)

def _get_filter_date(self, state_value: Optional[str]) -> datetime.datetime:
start_date_parsed = self._start_date_from_config
if state_value:
return max(start_date_parsed, self._date_time_based_cursor.parse_date(state_value))
else:
return start_date_parsed
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
else datetime.timedelta.max
)
self._cursor_granularity = self._parse_timedelta(self.cursor_granularity)
self._cursor_field = InterpolatedString.create(self.cursor_field, parameters=parameters)
self.cursor_field = InterpolatedString.create(self.cursor_field, parameters=parameters)
self._lookback_window = InterpolatedString.create(self.lookback_window, parameters=parameters) if self.lookback_window else None
self._partition_field_start = InterpolatedString.create(self.partition_field_start or "start_time", parameters=parameters)
self._partition_field_end = InterpolatedString.create(self.partition_field_end or "end_time", parameters=parameters)
Expand All @@ -103,7 +103,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self.cursor_datetime_formats = [self.datetime_format]

def get_stream_state(self) -> StreamState:
return {self._cursor_field.eval(self.config): self._cursor} if self._cursor else {}
return {self.cursor_field.eval(self.config): self._cursor} if self._cursor else {} # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__

def set_initial_state(self, stream_state: StreamState) -> None:
"""
Expand All @@ -112,7 +112,7 @@ def set_initial_state(self, stream_state: StreamState) -> None:
:param stream_state: The state of the stream as returned by get_stream_state
"""
self._cursor = stream_state.get(self._cursor_field.eval(self.config)) if stream_state else None
self._cursor = stream_state.get(self.cursor_field.eval(self.config)) if stream_state else None # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__

def observe(self, stream_slice: StreamSlice, record: Record) -> None:
"""
Expand All @@ -122,7 +122,7 @@ def observe(self, stream_slice: StreamSlice, record: Record) -> None:
:param record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the
stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
"""
record_cursor_value = record.get(self._cursor_field.eval(self.config))
record_cursor_value = record.get(self.cursor_field.eval(self.config)) # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
# if the current record has no cursor value, we cannot meaningfully update the state based on it, so there is nothing more to do
if not record_cursor_value:
return
Expand Down Expand Up @@ -186,8 +186,8 @@ def _select_best_end_datetime(self) -> datetime.datetime:
return min(self._end_datetime.get_datetime(self.config), now)

def _calculate_cursor_datetime_from_state(self, stream_state: Mapping[str, Any]) -> datetime.datetime:
if self._cursor_field.eval(self.config, stream_state=stream_state) in stream_state:
return self.parse_date(stream_state[self._cursor_field.eval(self.config)])
if self.cursor_field.eval(self.config, stream_state=stream_state) in stream_state: # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
return self.parse_date(stream_state[self.cursor_field.eval(self.config)]) # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)

def _format_datetime(self, dt: datetime.datetime) -> str:
Expand Down Expand Up @@ -300,7 +300,7 @@ def _get_request_options(self, option_type: RequestOptionType, stream_slice: Opt
return options

def should_be_synced(self, record: Record) -> bool:
cursor_field = self._cursor_field.eval(self.config)
cursor_field = self.cursor_field.eval(self.config) # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
record_cursor_value = record.get(cursor_field)
if not record_cursor_value:
self._send_log(
Expand All @@ -315,7 +315,7 @@ def should_be_synced(self, record: Record) -> bool:
def _is_within_daterange_boundaries(
self, record: Record, start_datetime_boundary: Union[datetime.datetime, str], end_datetime_boundary: Union[datetime.datetime, str]
) -> bool:
cursor_field = self._cursor_field.eval(self.config)
cursor_field = self.cursor_field.eval(self.config) # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
record_cursor_value = record.get(cursor_field)
if not record_cursor_value:
self._send_log(
Expand All @@ -339,7 +339,7 @@ def _send_log(self, level: Level, message: str) -> None:
)

def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
cursor_field = self._cursor_field.eval(self.config)
cursor_field = self.cursor_field.eval(self.config) # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
first_cursor_value = first.get(cursor_field)
second_cursor_value = second.get(cursor_field)
if first_cursor_value and second_cursor_value:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,11 @@ class DatetimeBasedCursor(BaseModel):
description='A data feed API is an API that does not allow filtering and paginates the content from the most recent to the least recent. Given this, the CDK needs to know when to stop paginating and this field will generate a stop condition for pagination.',
title='Whether the target API is formatted as a data feed',
)
is_client_side_incremental: Optional[bool] = Field(
None,
description='If the target API endpoint does not take cursor values to filter records and returns all records anyway, the connector with this cursor will filter out records locally, and only emit new records from the last sync, hence incremental. This means that all records would be read from the API, but only new records will be emitted to the destination.',
title='Whether the target API does not support filtering and returns all data (the cursor filters records in the client instead of the API side)',
)
lookback_window: Optional[str] = Field(
None,
description='Time interval before the start_datetime to read data for, e.g. P1M for looking back one month.',
Expand Down
Loading

0 comments on commit b90146b

Please sign in to comment.