From 86976c25914ee4e4877d00bc33e5e0f2e34d0797 Mon Sep 17 00:00:00 2001 From: Anton Karpets Date: Mon, 23 Oct 2023 18:17:11 +0300 Subject: [PATCH 1/5] Source Klaviyo: make start_date optional --- .../connectors/source-klaviyo/metadata.yaml | 2 +- .../source-klaviyo/source_klaviyo/source.py | 10 ++++- .../source-klaviyo/source_klaviyo/spec.json | 10 +++-- .../source-klaviyo/source_klaviyo/streams.py | 41 ++++++++++--------- .../source-klaviyo/unit_tests/test_source.py | 4 +- docs/integrations/sources/klaviyo.md | 18 +++++--- 6 files changed, 53 insertions(+), 32 deletions(-) diff --git a/airbyte-integrations/connectors/source-klaviyo/metadata.yaml b/airbyte-integrations/connectors/source-klaviyo/metadata.yaml index 844e4840d221..4b3a93f7f7d0 100644 --- a/airbyte-integrations/connectors/source-klaviyo/metadata.yaml +++ b/airbyte-integrations/connectors/source-klaviyo/metadata.yaml @@ -8,7 +8,7 @@ data: definitionId: 95e8cffd-b8c4-4039-968e-d32fb4a69bde connectorBuildOptions: baseImage: docker.io/airbyte/python-connector-base:1.1.0@sha256:bd98f6505c6764b1b5f99d3aedc23dfc9e9af631a62533f60eb32b1d3dbab20c - dockerImageTag: 1.0.0 + dockerImageTag: 1.1.0 dockerRepository: airbyte/source-klaviyo githubIssueLabel: source-klaviyo icon: klaviyo.svg diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py index 4843cd213aa1..ed4000da57d3 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py @@ -3,7 +3,9 @@ # import re +from http import HTTPStatus from typing import Any, List, Mapping, Tuple +from requests.exceptions import HTTPError from airbyte_cdk.models import SyncMode from airbyte_cdk.sources import AbstractSource @@ -21,6 +23,12 @@ def check_connection(self, logger, config: Mapping[str, Any]) -> Tuple[bool, Any try: # we use metrics endpoint because it never returns an error _ = list(Metrics(api_key=config["api_key"]).read_records(sync_mode=SyncMode.full_refresh)) + except HTTPError as e: + if e.response.status_code in (HTTPStatus.FORBIDDEN, HTTPStatus.UNAUTHORIZED): + message = "Please provide a valid API key and make sure it has permissions to read specified streams." + else: + message = f"Unable to connect to Klaviyo API with provided credentials." + return False, message except Exception as e: original_error_message = repr(e) @@ -39,7 +47,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: :param config: A Mapping of the user input configuration as defined in the connector spec. """ api_key = config["api_key"] - start_date = config["start_date"] + start_date = config.get("start_date") return [ Campaigns(api_key=api_key), Events(api_key=api_key, start_date=start_date), diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/spec.json b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/spec.json index e5b1408af651..1f03cf982e63 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/spec.json +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/spec.json @@ -10,17 +10,19 @@ "title": "Api Key", "description": "Klaviyo API Key. See our docs if you need help finding this key.", "airbyte_secret": true, - "type": "string" + "type": "string", + "order": 0 }, "start_date": { "title": "Start Date", - "description": "UTC date and time in the format 2017-01-25T00:00:00Z. Any data before this date will not be replicated.", + "description": "UTC date and time in the format 2017-01-25T00:00:00Z. Any data before this date will not be replicated. This field is optional - if not provided, all data will be replicated.", "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$", "examples": ["2017-01-25T00:00:00Z"], "type": "string", - "format": "date-time" + "format": "date-time", + "order": 1 } }, - "required": ["api_key", "start_date"] + "required": ["api_key"] } } diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py index a6671796bb61..2d47b6017e2d 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py @@ -83,7 +83,7 @@ def map_record(self, record: Mapping): class IncrementalKlaviyoStreamLatest(KlaviyoStreamLatest, ABC): """Base class for all incremental streams, requires cursor_field to be declared""" - def __init__(self, start_date: str, **kwargs): + def __init__(self, start_date: Optional[str], **kwargs): super().__init__(**kwargs) self._start_ts = start_date @@ -103,11 +103,13 @@ def request_params(self, stream_state: Mapping[str, Any] = None, next_page_token params = super().request_params(stream_state=stream_state, next_page_token=next_page_token, **kwargs) if not params.get("filter"): - latest_cursor = pendulum.parse(self._start_ts) stream_state_cursor_value = stream_state.get(self.cursor_field) - if stream_state_cursor_value: - latest_cursor = max(latest_cursor, pendulum.parse(stream_state[self.cursor_field])) - params["filter"] = "greater-than(" + self.cursor_field + "," + latest_cursor.isoformat() + ")" + latest_cursor = self._start_ts or stream_state_cursor_value + if latest_cursor: + latest_cursor = pendulum.parse(latest_cursor) + if stream_state_cursor_value: + latest_cursor = max(latest_cursor, pendulum.parse(stream_state_cursor_value)) + params["filter"] = f"greater-than({self.cursor_field},{latest_cursor.isoformat()})" params["sort"] = self.cursor_field return params @@ -118,8 +120,9 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late Required for incremental. """ current_stream_cursor_value = current_stream_state.get(self.cursor_field, self._start_ts) - latest_record_cursor_value = latest_record[self.cursor_field] - latest_cursor = max(pendulum.parse(latest_record_cursor_value), pendulum.parse(current_stream_cursor_value)) + latest_cursor = pendulum.parse(latest_record[self.cursor_field]) + if current_stream_cursor_value: + latest_cursor = max(latest_cursor, pendulum.parse(current_stream_cursor_value)) return {self.cursor_field: latest_cursor.isoformat()} @@ -193,9 +196,9 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp class IncrementalKlaviyoStreamV1(KlaviyoStreamV1, ABC): """Base class for all incremental streams, requires cursor_field to be declared""" - def __init__(self, start_date: str, **kwargs): + def __init__(self, start_date: Optional[str], **kwargs): super().__init__(**kwargs) - self._start_ts = int(pendulum.parse(start_date).timestamp()) + self._start_ts = int(pendulum.parse(start_date).timestamp()) if start_date else 0 self._start_sync = int(pendulum.now().timestamp()) @property @@ -254,9 +257,9 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, class ReverseIncrementalKlaviyoStreamV1(KlaviyoStreamV1, ABC): """Base class for all streams that natively incremental but supports desc & asc order""" - def __init__(self, start_date: str, **kwargs): + def __init__(self, start_date: Optional[str], **kwargs): super().__init__(**kwargs) - self._start_datetime = pendulum.parse(start_date) + self._start_datetime = pendulum.parse(start_date) if start_date else None self._reversed = False self._reached_old_records = False self._low_boundary = None @@ -280,7 +283,9 @@ def request_params(self, stream_state=None, **kwargs): stream_state = stream_state or {} if stream_state: self._reversed = True - self._low_boundary = max(pendulum.parse(stream_state[self.cursor_field]), self._start_datetime) + self._low_boundary = pendulum.parse(stream_state[self.cursor_field]) + if self._start_datetime: + self._low_boundary = max(pendulum.parse(stream_state[self.cursor_field]), self._start_datetime) params = super().request_params(stream_state=stream_state, **kwargs) params["sort"] = "desc" if self._reversed else "asc" @@ -317,13 +322,11 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp """:return an iterable containing each record in the response""" for record in super().parse_response(response=response, **kwargs): - if self._reversed: - if pendulum.parse(record[self.cursor_field]) < self._low_boundary: - self._reached_old_records = True - continue - else: - if pendulum.parse(record[self.cursor_field]) < self._start_datetime: - continue + if self._reversed and pendulum.parse(record[self.cursor_field]) < self._low_boundary: + self._reached_old_records = True + continue + elif self._start_datetime and pendulum.parse(record[self.cursor_field]) < self._start_datetime: + continue yield record diff --git a/airbyte-integrations/connectors/source-klaviyo/unit_tests/test_source.py b/airbyte-integrations/connectors/source-klaviyo/unit_tests/test_source.py index c9176b00cacb..6844ef5edf11 100644 --- a/airbyte-integrations/connectors/source-klaviyo/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-klaviyo/unit_tests/test_source.py @@ -15,13 +15,13 @@ 400, "Bad request", False, - "HTTPError('400 Client Error: None for url: https://a.klaviyo.com/api/v1/metrics?api_key=***&count=100')", + "Unable to connect to Klaviyo API with provided credentials.", ), ( 403, "Forbidden", False, - "HTTPError('403 Client Error: None for url: https://a.klaviyo.com/api/v1/metrics?api_key=***&count=100')", + "Please provide a valid API key and make sure it has permissions to read specified streams.", ), ), ) diff --git a/docs/integrations/sources/klaviyo.md b/docs/integrations/sources/klaviyo.md index b6408c4ffa99..c0eebd8b5e05 100644 --- a/docs/integrations/sources/klaviyo.md +++ b/docs/integrations/sources/klaviyo.md @@ -4,14 +4,22 @@ This page contains the setup guide and reference information for the Klaviyo sou ## Prerequisites -To set up the Klaviyo source connector, you'll need the [Klaviyo Private API key](https://help.klaviyo.com/hc/en-us/articles/115005062267-How-to-Manage-Your-Account-s-API-Keys#your-private-api-keys3). +- Klaviyo [account](https://www.klaviyo.com) +- [Klaviyo Private API key](https://help.klaviyo.com/hc/en-us/articles/115005062267-How-to-Manage-Your-Account-s-API-Keys#your-private-api-keys3) -## Set up the Klaviyo connector in Airbyte +## Setup guide -1. [Log into your Airbyte Cloud](https://cloud.airbyte.com/workspaces) or navigate to the Airbyte Open Source dashboard. +### Step 1: Set up Klaviyo + +1. Create a [Klaviyo account](https://www.klaviyo.com) +2. Create a [Private API key](https://help.klaviyo.com/hc/en-us/articles/115005062267-How-to-Manage-Your-Account-s-API-Keys#your-private-api-keys3). Make sure you selected all [scopes](https://help.klaviyo.com/hc/en-us/articles/7423954176283) corresponding to the streams you would like to replicate. + +### Step 2: Set up the Klaviyo connector in Airbyte + +1. [Log into your Airbyte Cloud](https://cloud.airbyte.io/workspaces) account. 2. Click **Sources** and then click **+ New source**. -3. On the Set up the source page, select **Klaviyo** from the Source type dropdown. -4. Enter the name for the Klaviyo connector. +3. On the Set up the source page, select **Klaviyo** from the **Source type** dropdown. +4. Enter a name for the Klaviyo connector. 5. For **Api Key**, enter the Klaviyo [Private API key](https://help.klaviyo.com/hc/en-us/articles/115005062267-How-to-Manage-Your-Account-s-API-Keys#your-private-api-keys3). 6. For **Start Date**, enter the date in YYYY-MM-DD format. The data added on and after this date will be replicated. 7. Click **Set up source**. From f44787473f9d0726a5a3c0151664e07fcf6a9462 Mon Sep 17 00:00:00 2001 From: askarpets Date: Mon, 23 Oct 2023 15:38:48 +0000 Subject: [PATCH 2/5] Automated Commit - Formatting Changes --- .../connectors/source-klaviyo/source_klaviyo/source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py index ed4000da57d3..96e6bf50d584 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py @@ -5,11 +5,11 @@ import re from http import HTTPStatus from typing import Any, List, Mapping, Tuple -from requests.exceptions import HTTPError from airbyte_cdk.models import SyncMode from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream +from requests.exceptions import HTTPError from source_klaviyo.streams import Campaigns, EmailTemplates, Events, Flows, GlobalExclusions, Lists, Metrics, Profiles From 451efb7e82110af5bd16b920c16b55b12fbb2d9b Mon Sep 17 00:00:00 2001 From: Anton Karpets Date: Mon, 23 Oct 2023 18:40:04 +0300 Subject: [PATCH 3/5] Update changelog --- .../source-klaviyo/source_klaviyo/source.py | 2 +- docs/integrations/sources/klaviyo.md | 41 ++++++++++--------- 2 files changed, 22 insertions(+), 21 deletions(-) diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py index ed4000da57d3..96e6bf50d584 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py @@ -5,11 +5,11 @@ import re from http import HTTPStatus from typing import Any, List, Mapping, Tuple -from requests.exceptions import HTTPError from airbyte_cdk.models import SyncMode from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream +from requests.exceptions import HTTPError from source_klaviyo.streams import Campaigns, EmailTemplates, Events, Flows, GlobalExclusions, Lists, Metrics, Profiles diff --git a/docs/integrations/sources/klaviyo.md b/docs/integrations/sources/klaviyo.md index c0eebd8b5e05..1dd066442fdd 100644 --- a/docs/integrations/sources/klaviyo.md +++ b/docs/integrations/sources/klaviyo.md @@ -60,23 +60,24 @@ The Klaviyo connector should not run into Klaviyo API limitations under normal u ## Changelog -| Version | Date | Pull Request | Subject | -|:---------|:-----------| :--------------------------------------------------------- |:-------------------------------------------------------------------------------------------| -| `1.0.0` | 2023-10-18 | [31565](https://github.com/airbytehq/airbyte/pull/31565) | added new known fields for 'events' stream | -| `0.5.0` | 2023-10-19 | [31611](https://github.com/airbytehq/airbyte/pull/31611) | Add `date-time` format for `datetime` field in `Events` stream | -| `0.4.0` | 2023-10-18 | [31562](https://github.com/airbytehq/airbyte/pull/31562) | Add `archived` field to `Flows` stream | -| `0.3.3` | 2023-10-13 | [31379](https://github.com/airbytehq/airbyte/pull/31379) | Skip streams that the connector no longer has access to | -| `0.3.2` | 2023-06-20 | [27498](https://github.com/airbytehq/airbyte/pull/27498) | Do not store state in the future | -| `0.3.1` | 2023-06-08 | [27162](https://github.com/airbytehq/airbyte/pull/27162) | Anonymize check connection error message | -| `0.3.0` | 2023-02-18 | [23236](https://github.com/airbytehq/airbyte/pull/23236) | Add ` Email Templates` stream | -| `0.2.0` | 2023-03-13 | [22942](https://github.com/airbytehq/airbyte/pull/23968) | Add `Profiles` stream | -| `0.1.13` | 2023-02-13 | [22942](https://github.com/airbytehq/airbyte/pull/22942) | Specified date formatting in specification | -| `0.1.12` | 2023-01-30 | [22071](https://github.com/airbytehq/airbyte/pull/22071) | Fix `Events` stream schema | -| `0.1.11` | 2023-01-27 | [22012](https://github.com/airbytehq/airbyte/pull/22012) | Set `AvailabilityStrategy` for streams explicitly to `None` | -| `0.1.10` | 2022-09-29 | [17422](https://github.com/airbytehq/airbyte/issues/17422) | Update CDK dependency | -| `0.1.9` | 2022-09-28 | [17304](https://github.com/airbytehq/airbyte/issues/17304) | Migrate to per-stream state. | -| `0.1.6` | 2022-07-20 | [14872](https://github.com/airbytehq/airbyte/issues/14872) | Increase test coverage | -| `0.1.5` | 2022-07-12 | [14617](https://github.com/airbytehq/airbyte/issues/14617) | Set max_retries = 10 for `lists` stream. | -| `0.1.4` | 2022-04-15 | [11723](https://github.com/airbytehq/airbyte/issues/11723) | Enhance klaviyo source for flows stream and update to events stream. | -| `0.1.3` | 2021-12-09 | [8592](https://github.com/airbytehq/airbyte/pull/8592) | Improve performance, make Global Exclusions stream incremental and enable Metrics stream. | -| `0.1.2` | 2021-10-19 | [6952](https://github.com/airbytehq/airbyte/pull/6952) | Update schema validation in SAT | +| Version | Date | Pull Request | Subject | +|:---------|:-----------| :--------------------------------------------------------- |:------------------------------------------------------------------------------------------| +| `1.1.0` | 2023-10-23 | [31710](https://github.com/airbytehq/airbyte/pull/31710) | Make `start_date` config field optional | +| `1.0.0` | 2023-10-18 | [31565](https://github.com/airbytehq/airbyte/pull/31565) | added new known fields for 'events' stream | +| `0.5.0` | 2023-10-19 | [31611](https://github.com/airbytehq/airbyte/pull/31611) | Add `date-time` format for `datetime` field in `Events` stream | +| `0.4.0` | 2023-10-18 | [31562](https://github.com/airbytehq/airbyte/pull/31562) | Add `archived` field to `Flows` stream | +| `0.3.3` | 2023-10-13 | [31379](https://github.com/airbytehq/airbyte/pull/31379) | Skip streams that the connector no longer has access to | +| `0.3.2` | 2023-06-20 | [27498](https://github.com/airbytehq/airbyte/pull/27498) | Do not store state in the future | +| `0.3.1` | 2023-06-08 | [27162](https://github.com/airbytehq/airbyte/pull/27162) | Anonymize check connection error message | +| `0.3.0` | 2023-02-18 | [23236](https://github.com/airbytehq/airbyte/pull/23236) | Add ` Email Templates` stream | +| `0.2.0` | 2023-03-13 | [22942](https://github.com/airbytehq/airbyte/pull/23968) | Add `Profiles` stream | +| `0.1.13` | 2023-02-13 | [22942](https://github.com/airbytehq/airbyte/pull/22942) | Specified date formatting in specification | +| `0.1.12` | 2023-01-30 | [22071](https://github.com/airbytehq/airbyte/pull/22071) | Fix `Events` stream schema | +| `0.1.11` | 2023-01-27 | [22012](https://github.com/airbytehq/airbyte/pull/22012) | Set `AvailabilityStrategy` for streams explicitly to `None` | +| `0.1.10` | 2022-09-29 | [17422](https://github.com/airbytehq/airbyte/issues/17422) | Update CDK dependency | +| `0.1.9` | 2022-09-28 | [17304](https://github.com/airbytehq/airbyte/issues/17304) | Migrate to per-stream state. | +| `0.1.6` | 2022-07-20 | [14872](https://github.com/airbytehq/airbyte/issues/14872) | Increase test coverage | +| `0.1.5` | 2022-07-12 | [14617](https://github.com/airbytehq/airbyte/issues/14617) | Set max_retries = 10 for `lists` stream. | +| `0.1.4` | 2022-04-15 | [11723](https://github.com/airbytehq/airbyte/issues/11723) | Enhance klaviyo source for flows stream and update to events stream. | +| `0.1.3` | 2021-12-09 | [8592](https://github.com/airbytehq/airbyte/pull/8592) | Improve performance, make Global Exclusions stream incremental and enable Metrics stream. | +| `0.1.2` | 2021-10-19 | [6952](https://github.com/airbytehq/airbyte/pull/6952) | Update schema validation in SAT | From 912a7218cdbb030fc61b6b810acb886fd61ec7c1 Mon Sep 17 00:00:00 2001 From: Anton Karpets Date: Mon, 23 Oct 2023 19:07:50 +0300 Subject: [PATCH 4/5] Fix formatting --- .../connectors/source-klaviyo/source_klaviyo/source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py index 96e6bf50d584..d7a66141be92 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py @@ -27,7 +27,7 @@ def check_connection(self, logger, config: Mapping[str, Any]) -> Tuple[bool, Any if e.response.status_code in (HTTPStatus.FORBIDDEN, HTTPStatus.UNAUTHORIZED): message = "Please provide a valid API key and make sure it has permissions to read specified streams." else: - message = f"Unable to connect to Klaviyo API with provided credentials." + message = "Unable to connect to Klaviyo API with provided credentials." return False, message except Exception as e: original_error_message = repr(e) From 3305c043f14fb06ff5cdc87ccf32b836c11a588d Mon Sep 17 00:00:00 2001 From: Anton Karpets Date: Tue, 24 Oct 2023 11:16:12 +0300 Subject: [PATCH 5/5] Update tests --- .../unit_tests/test_latest_streams.py | 47 ++++++++++++++----- .../source-klaviyo/unit_tests/test_source.py | 12 +++++ .../source-klaviyo/unit_tests/test_streams.py | 26 ++++++++-- 3 files changed, 69 insertions(+), 16 deletions(-) diff --git a/airbyte-integrations/connectors/source-klaviyo/unit_tests/test_latest_streams.py b/airbyte-integrations/connectors/source-klaviyo/unit_tests/test_latest_streams.py index da8768375f3d..7e816782a052 100644 --- a/airbyte-integrations/connectors/source-klaviyo/unit_tests/test_latest_streams.py +++ b/airbyte-integrations/connectors/source-klaviyo/unit_tests/test_latest_streams.py @@ -113,18 +113,18 @@ def test_availability_strategy(self): class TestIncrementalKlaviyoStreamLatest: api_key = "some_key" - start_date = START_DATE.isoformat() def test_cursor_field_is_required(self): with pytest.raises( TypeError, match="Can't instantiate abstract class IncrementalKlaviyoStreamLatest with abstract methods cursor_field, path" ): - IncrementalKlaviyoStreamLatest(api_key=self.api_key, start_date=self.start_date) + IncrementalKlaviyoStreamLatest(api_key=self.api_key, start_date=START_DATE.isoformat()) @pytest.mark.parametrize( - ("stream_state_date", "next_page_token", "expected_params"), + ("config_start_date", "stream_state_date", "next_page_token", "expected_params"), ( ( + START_DATE.isoformat(), {"updated": "2023-01-01T00:00:00+00:00"}, {"page[cursor]": "aaA0aAo0aAA0AaAaAaa0AaaAAAaaA00AAAa0AA00A0AAAaAa"}, { @@ -134,6 +134,7 @@ def test_cursor_field_is_required(self): }, ), ( + START_DATE.isoformat(), None, {"page[cursor]": "aaA0aAo0aAA0AaAaAaa0AaaAAAaaA00AAAa0AA00A0AAAaAa"}, { @@ -143,28 +144,52 @@ def test_cursor_field_is_required(self): }, ), ( + START_DATE.isoformat(), None, {"filter": "some_filter"}, {"filter": "some_filter"}, ), + ( + None, + None, + {"page[cursor]": "aaA0aAo0aAA0AaAaAaa0AaaAAAaaA00AAAa0AA00A0AAAaAa"}, + { + "page[cursor]": "aaA0aAo0aAA0AaAaAaa0AaaAAAaaA00AAAa0AA00A0AAAaAa", + "sort": "updated", + }, + ), + ( + None, + {"updated": "2023-01-01T00:00:00+00:00"}, + {"page[cursor]": "aaA0aAo0aAA0AaAaAaa0AaaAAAaaA00AAAa0AA00A0AAAaAa"}, + { + "filter": "greater-than(updated,2023-01-01T00:00:00+00:00)", + "page[cursor]": "aaA0aAo0aAA0AaAaAaa0AaaAAAaaA00AAAa0AA00A0AAAaAa", + "sort": "updated", + }, + ), ), ) - def test_request_params(self, stream_state_date, next_page_token, expected_params): - stream = SomeIncrementalStream(api_key=self.api_key, start_date=self.start_date) + def test_request_params(self, config_start_date, stream_state_date, next_page_token, expected_params): + stream = SomeIncrementalStream(api_key=self.api_key, start_date=config_start_date) inputs = {"stream_state": stream_state_date, "next_page_token": next_page_token} assert stream.request_params(**inputs) == expected_params @pytest.mark.parametrize( - ("current_cursor", "latest_cursor", "expected_cursor"), + ("config_start_date", "current_cursor", "latest_cursor", "expected_cursor"), ( - ("2023-01-01T00:00:00+00:00", "2023-01-02T00:00:00+00:00", "2023-01-02T00:00:00+00:00"), - ("2023-01-02T00:00:00+00:00", "2023-01-01T00:00:00+00:00", "2023-01-02T00:00:00+00:00"), + (START_DATE.isoformat(), "2023-01-01T00:00:00+00:00", "2023-01-02T00:00:00+00:00", "2023-01-02T00:00:00+00:00"), + (START_DATE.isoformat(), "2023-01-02T00:00:00+00:00", "2023-01-01T00:00:00+00:00", "2023-01-02T00:00:00+00:00"), + (START_DATE.isoformat(), None, "2019-01-01T00:00:00+00:00", "2020-10-10T00:00:00+00:00"), + (None, "2020-10-10T00:00:00+00:00", "2019-01-01T00:00:00+00:00", "2020-10-10T00:00:00+00:00"), + (None, None, "2019-01-01T00:00:00+00:00", "2019-01-01T00:00:00+00:00"), ), ) - def test_get_updated_state(self, current_cursor, latest_cursor, expected_cursor): - stream = SomeIncrementalStream(api_key=self.api_key, start_date=self.start_date) + def test_get_updated_state(self, config_start_date, current_cursor, latest_cursor, expected_cursor): + stream = SomeIncrementalStream(api_key=self.api_key, start_date=config_start_date) inputs = { - "current_stream_state": {stream.cursor_field: current_cursor}, + # {"key": "value"} is needed to mimic the case when current_stream_state doesn't have cursor key + "current_stream_state": {stream.cursor_field: current_cursor} if current_cursor else {"key": "value"}, "latest_record": {stream.cursor_field: latest_cursor}, } assert stream.get_updated_state(**inputs) == {stream.cursor_field: expected_cursor} diff --git a/airbyte-integrations/connectors/source-klaviyo/unit_tests/test_source.py b/airbyte-integrations/connectors/source-klaviyo/unit_tests/test_source.py index 6844ef5edf11..1781ec0683ae 100644 --- a/airbyte-integrations/connectors/source-klaviyo/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-klaviyo/unit_tests/test_source.py @@ -38,6 +38,18 @@ def test_check_connection(requests_mock, status_code, response, is_connection_su assert error == error_msg +def test_check_connection_unexpected_error(requests_mock): + requests_mock.register_uri( + "GET", + "https://a.klaviyo.com/api/v1/metrics?api_key=api_key&count=100", + exc=Exception("Something went wrong, api_key=some_api_key"), + ) + source = SourceKlaviyo() + success, error = source.check_connection(logger=None, config={"api_key": "api_key"}) + assert success is False + assert error == "Exception('Something went wrong, api_key=***')" + + def test_streams(): source = SourceKlaviyo() config = {"api_key": "some_key", "start_date": pendulum.datetime(2020, 10, 10).isoformat()} diff --git a/airbyte-integrations/connectors/source-klaviyo/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-klaviyo/unit_tests/test_streams.py index caa292b65e15..ad678a18ab0c 100644 --- a/airbyte-integrations/connectors/source-klaviyo/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-klaviyo/unit_tests/test_streams.py @@ -92,40 +92,56 @@ def test_cursor_field_is_required(self): IncrementalKlaviyoStreamV1(api_key="some_key", start_date=START_DATE.isoformat()) @pytest.mark.parametrize( - ["next_page_token", "stream_state", "expected_params"], + ["config_start_date", "next_page_token", "stream_state", "expected_params"], [ # start with start_date - (None, {}, {"api_key": "some_key", "count": 100, "sort": "asc", "since": START_DATE.int_timestamp}), + (START_DATE.isoformat(), None, {}, {"api_key": "some_key", "count": 100, "sort": "asc", "since": START_DATE.int_timestamp}), # pagination overrule - ({"since": 123}, {}, {"api_key": "some_key", "count": 100, "sort": "asc", "since": 123}), + (START_DATE.isoformat(), {"since": 123}, {}, {"api_key": "some_key", "count": 100, "sort": "asc", "since": 123}), # start_date overrule state if state < start_date ( + START_DATE.isoformat(), None, {"updated_at": START_DATE.int_timestamp - 1}, {"api_key": "some_key", "count": 100, "sort": "asc", "since": START_DATE.int_timestamp}, ), # but pagination still overrule ( + START_DATE.isoformat(), {"since": 123}, {"updated_at": START_DATE.int_timestamp - 1}, {"api_key": "some_key", "count": 100, "sort": "asc", "since": 123}, ), # and again ( + START_DATE.isoformat(), {"since": 123}, {"updated_at": START_DATE.int_timestamp + 1}, {"api_key": "some_key", "count": 100, "sort": "asc", "since": 123}, ), # finally state > start_date and can be used ( + START_DATE.isoformat(), + None, + {"updated_at": START_DATE.int_timestamp + 1}, + {"api_key": "some_key", "count": 100, "sort": "asc", "since": START_DATE.int_timestamp + 1}, + ), + ( + None, None, {"updated_at": START_DATE.int_timestamp + 1}, {"api_key": "some_key", "count": 100, "sort": "asc", "since": START_DATE.int_timestamp + 1}, ), + ( + None, + None, + None, + {"api_key": "some_key", "count": 100, "sort": "asc", "since": 0}, + ), ], ) - def test_request_params(self, next_page_token, stream_state, expected_params): - stream = SomeIncrementalStream(api_key="some_key", start_date=START_DATE.isoformat()) + def test_request_params(self, config_start_date, next_page_token, stream_state, expected_params): + stream = SomeIncrementalStream(api_key="some_key", start_date=config_start_date) result = stream.request_params(stream_state=stream_state, next_page_token=next_page_token) assert result == expected_params