Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨Source Klaviyo: make start_date optional #31710

Merged
merged 7 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ data:
definitionId: 95e8cffd-b8c4-4039-968e-d32fb4a69bde
connectorBuildOptions:
baseImage: docker.io/airbyte/python-connector-base:1.1.0@sha256:bd98f6505c6764b1b5f99d3aedc23dfc9e9af631a62533f60eb32b1d3dbab20c
dockerImageTag: 1.0.0
dockerImageTag: 1.1.0
dockerRepository: airbyte/source-klaviyo
githubIssueLabel: source-klaviyo
icon: klaviyo.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
#

import re
from http import HTTPStatus
from typing import Any, List, Mapping, Tuple

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from requests.exceptions import HTTPError
from source_klaviyo.streams import Campaigns, EmailTemplates, Events, Flows, GlobalExclusions, Lists, Metrics, Profiles


Expand All @@ -21,6 +23,12 @@ def check_connection(self, logger, config: Mapping[str, Any]) -> Tuple[bool, Any
try:
# we use metrics endpoint because it never returns an error
_ = list(Metrics(api_key=config["api_key"]).read_records(sync_mode=SyncMode.full_refresh))
except HTTPError as e:
if e.response.status_code in (HTTPStatus.FORBIDDEN, HTTPStatus.UNAUTHORIZED):
message = "Please provide a valid API key and make sure it has permissions to read specified streams."
else:
message = "Unable to connect to Klaviyo API with provided credentials."
return False, message
except Exception as e:
original_error_message = repr(e)

Expand All @@ -39,7 +47,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
:param config: A Mapping of the user input configuration as defined in the connector spec.
"""
api_key = config["api_key"]
start_date = config["start_date"]
start_date = config.get("start_date")
return [
Campaigns(api_key=api_key),
Events(api_key=api_key, start_date=start_date),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,19 @@
"title": "Api Key",
"description": "Klaviyo API Key. See our <a href=\"https://docs.airbyte.com/integrations/sources/klaviyo\">docs</a> if you need help finding this key.",
"airbyte_secret": true,
"type": "string"
"type": "string",
"order": 0
},
"start_date": {
"title": "Start Date",
"description": "UTC date and time in the format 2017-01-25T00:00:00Z. Any data before this date will not be replicated.",
"description": "UTC date and time in the format 2017-01-25T00:00:00Z. Any data before this date will not be replicated. This field is optional - if not provided, all data will be replicated.",
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
"examples": ["2017-01-25T00:00:00Z"],
"type": "string",
"format": "date-time"
"format": "date-time",
"order": 1
}
},
"required": ["api_key", "start_date"]
"required": ["api_key"]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def map_record(self, record: Mapping):
class IncrementalKlaviyoStreamLatest(KlaviyoStreamLatest, ABC):
"""Base class for all incremental streams, requires cursor_field to be declared"""

def __init__(self, start_date: str, **kwargs):
def __init__(self, start_date: Optional[str], **kwargs):
super().__init__(**kwargs)
self._start_ts = start_date

Expand All @@ -103,11 +103,13 @@ def request_params(self, stream_state: Mapping[str, Any] = None, next_page_token
params = super().request_params(stream_state=stream_state, next_page_token=next_page_token, **kwargs)

if not params.get("filter"):
latest_cursor = pendulum.parse(self._start_ts)
stream_state_cursor_value = stream_state.get(self.cursor_field)
if stream_state_cursor_value:
latest_cursor = max(latest_cursor, pendulum.parse(stream_state[self.cursor_field]))
params["filter"] = "greater-than(" + self.cursor_field + "," + latest_cursor.isoformat() + ")"
latest_cursor = self._start_ts or stream_state_cursor_value
if latest_cursor:
latest_cursor = pendulum.parse(latest_cursor)
if stream_state_cursor_value:
latest_cursor = max(latest_cursor, pendulum.parse(stream_state_cursor_value))
params["filter"] = f"greater-than({self.cursor_field},{latest_cursor.isoformat()})"
params["sort"] = self.cursor_field
return params

Expand All @@ -118,8 +120,9 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
Required for incremental.
"""
current_stream_cursor_value = current_stream_state.get(self.cursor_field, self._start_ts)
latest_record_cursor_value = latest_record[self.cursor_field]
latest_cursor = max(pendulum.parse(latest_record_cursor_value), pendulum.parse(current_stream_cursor_value))
latest_cursor = pendulum.parse(latest_record[self.cursor_field])
if current_stream_cursor_value:
latest_cursor = max(latest_cursor, pendulum.parse(current_stream_cursor_value))
return {self.cursor_field: latest_cursor.isoformat()}


Expand Down Expand Up @@ -193,9 +196,9 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
class IncrementalKlaviyoStreamV1(KlaviyoStreamV1, ABC):
"""Base class for all incremental streams, requires cursor_field to be declared"""

def __init__(self, start_date: str, **kwargs):
def __init__(self, start_date: Optional[str], **kwargs):
super().__init__(**kwargs)
self._start_ts = int(pendulum.parse(start_date).timestamp())
self._start_ts = int(pendulum.parse(start_date).timestamp()) if start_date else 0
self._start_sync = int(pendulum.now().timestamp())

@property
Expand Down Expand Up @@ -254,9 +257,9 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
class ReverseIncrementalKlaviyoStreamV1(KlaviyoStreamV1, ABC):
"""Base class for all streams that natively incremental but supports desc & asc order"""

def __init__(self, start_date: str, **kwargs):
def __init__(self, start_date: Optional[str], **kwargs):
super().__init__(**kwargs)
self._start_datetime = pendulum.parse(start_date)
self._start_datetime = pendulum.parse(start_date) if start_date else None
self._reversed = False
self._reached_old_records = False
self._low_boundary = None
Expand All @@ -280,7 +283,9 @@ def request_params(self, stream_state=None, **kwargs):
stream_state = stream_state or {}
if stream_state:
self._reversed = True
self._low_boundary = max(pendulum.parse(stream_state[self.cursor_field]), self._start_datetime)
self._low_boundary = pendulum.parse(stream_state[self.cursor_field])
if self._start_datetime:
self._low_boundary = max(pendulum.parse(stream_state[self.cursor_field]), self._start_datetime)
params = super().request_params(stream_state=stream_state, **kwargs)
params["sort"] = "desc" if self._reversed else "asc"

Expand Down Expand Up @@ -317,13 +322,11 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
""":return an iterable containing each record in the response"""

for record in super().parse_response(response=response, **kwargs):
if self._reversed:
if pendulum.parse(record[self.cursor_field]) < self._low_boundary:
self._reached_old_records = True
continue
else:
if pendulum.parse(record[self.cursor_field]) < self._start_datetime:
continue
if self._reversed and pendulum.parse(record[self.cursor_field]) < self._low_boundary:
self._reached_old_records = True
continue
elif self._start_datetime and pendulum.parse(record[self.cursor_field]) < self._start_datetime:
continue
yield record


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
400,
"Bad request",
False,
"HTTPError('400 Client Error: None for url: https://a.klaviyo.com/api/v1/metrics?api_key=***&count=100')",
"Unable to connect to Klaviyo API with provided credentials.",
),
(
403,
"Forbidden",
False,
"HTTPError('403 Client Error: None for url: https://a.klaviyo.com/api/v1/metrics?api_key=***&count=100')",
"Please provide a valid API key and make sure it has permissions to read specified streams.",
),
),
)
Expand Down
59 changes: 34 additions & 25 deletions docs/integrations/sources/klaviyo.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,22 @@ This page contains the setup guide and reference information for the Klaviyo sou

## Prerequisites

To set up the Klaviyo source connector, you'll need the [Klaviyo Private API key](https://help.klaviyo.com/hc/en-us/articles/115005062267-How-to-Manage-Your-Account-s-API-Keys#your-private-api-keys3).
- Klaviyo [account](https://www.klaviyo.com)
- [Klaviyo Private API key](https://help.klaviyo.com/hc/en-us/articles/115005062267-How-to-Manage-Your-Account-s-API-Keys#your-private-api-keys3)

## Set up the Klaviyo connector in Airbyte
## Setup guide

1. [Log into your Airbyte Cloud](https://cloud.airbyte.com/workspaces) or navigate to the Airbyte Open Source dashboard.
### Step 1: Set up Klaviyo

1. Create a [Klaviyo account](https://www.klaviyo.com)
2. Create a [Private API key](https://help.klaviyo.com/hc/en-us/articles/115005062267-How-to-Manage-Your-Account-s-API-Keys#your-private-api-keys3). Make sure you select all [scopes](https://help.klaviyo.com/hc/en-us/articles/7423954176283) corresponding to the streams you would like to replicate.

### Step 2: Set up the Klaviyo connector in Airbyte

1. [Log into your Airbyte Cloud](https://cloud.airbyte.io/workspaces) account.
2. Click **Sources** and then click **+ New source**.
3. On the Set up the source page, select **Klaviyo** from the Source type dropdown.
4. Enter the name for the Klaviyo connector.
3. On the Set up the source page, select **Klaviyo** from the **Source type** dropdown.
4. Enter a name for the Klaviyo connector.
5. For **Api Key**, enter the Klaviyo [Private API key](https://help.klaviyo.com/hc/en-us/articles/115005062267-How-to-Manage-Your-Account-s-API-Keys#your-private-api-keys3).
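6. (Optional) For **Start Date**, enter the UTC date and time in the format `YYYY-MM-DDTHH:MM:SSZ` (for example, `2017-01-25T00:00:00Z`). The data added on and after this date will be replicated. If this field is left blank, all data will be replicated.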
7. Click **Set up source**.
Expand Down Expand Up @@ -52,23 +60,24 @@ The Klaviyo connector should not run into Klaviyo API limitations under normal u

## Changelog

| Version | Date | Pull Request | Subject |
|:---------|:-----------| :--------------------------------------------------------- |:-------------------------------------------------------------------------------------------|
| `1.0.0` | 2023-10-18 | [31565](https://github.com/airbytehq/airbyte/pull/31565) | added new known fields for 'events' stream |
| `0.5.0` | 2023-10-19 | [31611](https://github.com/airbytehq/airbyte/pull/31611) | Add `date-time` format for `datetime` field in `Events` stream |
| `0.4.0` | 2023-10-18 | [31562](https://github.com/airbytehq/airbyte/pull/31562) | Add `archived` field to `Flows` stream |
| `0.3.3` | 2023-10-13 | [31379](https://github.com/airbytehq/airbyte/pull/31379) | Skip streams that the connector no longer has access to |
| `0.3.2` | 2023-06-20 | [27498](https://github.com/airbytehq/airbyte/pull/27498) | Do not store state in the future |
| `0.3.1` | 2023-06-08 | [27162](https://github.com/airbytehq/airbyte/pull/27162) | Anonymize check connection error message |
| `0.3.0` | 2023-02-18 | [23236](https://github.com/airbytehq/airbyte/pull/23236) | Add ` Email Templates` stream |
| `0.2.0` | 2023-03-13 | [22942](https://github.com/airbytehq/airbyte/pull/23968) | Add `Profiles` stream |
| `0.1.13` | 2023-02-13 | [22942](https://github.com/airbytehq/airbyte/pull/22942) | Specified date formatting in specification |
| `0.1.12` | 2023-01-30 | [22071](https://github.com/airbytehq/airbyte/pull/22071) | Fix `Events` stream schema |
| `0.1.11` | 2023-01-27 | [22012](https://github.com/airbytehq/airbyte/pull/22012) | Set `AvailabilityStrategy` for streams explicitly to `None` |
| `0.1.10` | 2022-09-29 | [17422](https://github.com/airbytehq/airbyte/issues/17422) | Update CDK dependency |
| `0.1.9` | 2022-09-28 | [17304](https://github.com/airbytehq/airbyte/issues/17304) | Migrate to per-stream state. |
| `0.1.6` | 2022-07-20 | [14872](https://github.com/airbytehq/airbyte/issues/14872) | Increase test coverage |
| `0.1.5` | 2022-07-12 | [14617](https://github.com/airbytehq/airbyte/issues/14617) | Set max_retries = 10 for `lists` stream. |
| `0.1.4` | 2022-04-15 | [11723](https://github.com/airbytehq/airbyte/issues/11723) | Enhance klaviyo source for flows stream and update to events stream. |
| `0.1.3` | 2021-12-09 | [8592](https://github.com/airbytehq/airbyte/pull/8592) | Improve performance, make Global Exclusions stream incremental and enable Metrics stream. |
| `0.1.2` | 2021-10-19 | [6952](https://github.com/airbytehq/airbyte/pull/6952) | Update schema validation in SAT |
| Version | Date | Pull Request | Subject |
|:---------|:-----------| :--------------------------------------------------------- |:------------------------------------------------------------------------------------------|
| `1.1.0` | 2023-10-23 | [31710](https://github.com/airbytehq/airbyte/pull/31710) | Make `start_date` config field optional |
| `1.0.0` | 2023-10-18 | [31565](https://github.com/airbytehq/airbyte/pull/31565) | added new known fields for 'events' stream |
| `0.5.0` | 2023-10-19 | [31611](https://github.com/airbytehq/airbyte/pull/31611) | Add `date-time` format for `datetime` field in `Events` stream |
| `0.4.0` | 2023-10-18 | [31562](https://github.com/airbytehq/airbyte/pull/31562) | Add `archived` field to `Flows` stream |
| `0.3.3` | 2023-10-13 | [31379](https://github.com/airbytehq/airbyte/pull/31379) | Skip streams that the connector no longer has access to |
| `0.3.2` | 2023-06-20 | [27498](https://github.com/airbytehq/airbyte/pull/27498) | Do not store state in the future |
| `0.3.1` | 2023-06-08 | [27162](https://github.com/airbytehq/airbyte/pull/27162) | Anonymize check connection error message |
| `0.3.0` | 2023-02-18 | [23236](https://github.com/airbytehq/airbyte/pull/23236) | Add `Email Templates` stream |
| `0.2.0` | 2023-03-13 | [23968](https://github.com/airbytehq/airbyte/pull/23968) | Add `Profiles` stream |
| `0.1.13` | 2023-02-13 | [22942](https://github.com/airbytehq/airbyte/pull/22942) | Specified date formatting in specification |
| `0.1.12` | 2023-01-30 | [22071](https://github.com/airbytehq/airbyte/pull/22071) | Fix `Events` stream schema |
| `0.1.11` | 2023-01-27 | [22012](https://github.com/airbytehq/airbyte/pull/22012) | Set `AvailabilityStrategy` for streams explicitly to `None` |
| `0.1.10` | 2022-09-29 | [17422](https://github.com/airbytehq/airbyte/issues/17422) | Update CDK dependency |
| `0.1.9` | 2022-09-28 | [17304](https://github.com/airbytehq/airbyte/issues/17304) | Migrate to per-stream state. |
| `0.1.6` | 2022-07-20 | [14872](https://github.com/airbytehq/airbyte/issues/14872) | Increase test coverage |
| `0.1.5` | 2022-07-12 | [14617](https://github.com/airbytehq/airbyte/issues/14617) | Set max_retries = 10 for `lists` stream. |
| `0.1.4` | 2022-04-15 | [11723](https://github.com/airbytehq/airbyte/issues/11723) | Enhance klaviyo source for flows stream and update to events stream. |
| `0.1.3` | 2021-12-09 | [8592](https://github.com/airbytehq/airbyte/pull/8592) | Improve performance, make Global Exclusions stream incremental and enable Metrics stream. |
| `0.1.2` | 2021-10-19 | [6952](https://github.com/airbytehq/airbyte/pull/6952) | Update schema validation in SAT |
Loading