Skip to content

Commit

Permalink
🐛Source Klaviyo: add availability strategy (airbytehq#31379)
Browse files Browse the repository at this point in the history
  • Loading branch information
Anton Karpets authored and ariesgun committed Oct 23, 2023
1 parent 43b3189 commit 7f489c8
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 23 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-klaviyo/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_klaviyo ./source_klaviyo
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.3.2
LABEL io.airbyte.version=0.3.3
LABEL io.airbyte.name=airbyte/source-klaviyo

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 95e8cffd-b8c4-4039-968e-d32fb4a69bde
dockerImageTag: 0.3.2
dockerImageTag: 0.3.3
dockerRepository: airbyte/source-klaviyo
githubIssueLabel: source-klaviyo
icon: klaviyo.svg
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import logging
from typing import Dict, Optional

from airbyte_cdk.sources import Source
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
from requests import HTTPError, codes


class KlaviyoAvailabilityStrategyLatest(HttpAvailabilityStrategy):
def reasons_for_unavailable_status_codes(
self, stream: Stream, logger: logging.Logger, source: Optional[Source], error: HTTPError
) -> Dict[int, str]:
reasons_for_codes: Dict[int, str] = super().reasons_for_unavailable_status_codes(stream, logger, source, error)
reasons_for_codes[codes.UNAUTHORIZED] = (
"This is most likely due to insufficient permissions on the credentials in use. "
"Try to grant required permissions/scopes or re-authenticate"
)

return reasons_for_codes
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer

from .availability_strategy import KlaviyoAvailabilityStrategyLatest


class KlaviyoStreamLatest(HttpStream, ABC):
"""Base stream for api version v2023-02-22"""
Expand All @@ -26,8 +28,8 @@ def __init__(self, api_key: str, **kwargs):
self._api_key = api_key

@property
def availability_strategy(self) -> Optional["AvailabilityStrategy"]:
return None
def availability_strategy(self) -> Optional[AvailabilityStrategy]:
return KlaviyoAvailabilityStrategyLatest()

def request_headers(self, **kwargs) -> Mapping[str, Any]:
base_headers = super().request_headers(**kwargs)
Expand Down Expand Up @@ -148,10 +150,6 @@ def __init__(self, api_key: str, **kwargs):
transform_function = self.get_custom_transform()
self.transformer.registerCustomTransform(transform_function)

@property
def availability_strategy(self) -> Optional["AvailabilityStrategy"]:
return None

def get_custom_transform(self):
def custom_transform_date_rfc3339(original_value, field_schema):
if original_value and "format" in field_schema and field_schema["format"] == "date-time":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pytest
import requests
from pydantic import BaseModel
from source_klaviyo.availability_strategy import KlaviyoAvailabilityStrategyLatest
from source_klaviyo.streams import IncrementalKlaviyoStreamLatest, Profiles

START_DATE = pendulum.datetime(2020, 10, 10)
Expand Down Expand Up @@ -100,3 +101,7 @@ def test_parse_response(self, mocker):
"properties": {"Status": "onboarding_started"},
},
]

def test_availability_strategy(self):
stream = Profiles(api_key="some_key", start_date=START_DATE.isoformat())
assert isinstance(stream.availability_strategy, KlaviyoAvailabilityStrategyLatest)
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pendulum
import pytest
import requests
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
from pydantic import BaseModel
from source_klaviyo.streams import EmailTemplates, Events, IncrementalKlaviyoStreamV1, KlaviyoStreamV1, ReverseIncrementalKlaviyoStreamV1

Expand Down Expand Up @@ -78,6 +79,10 @@ def test_parse_response(self, response):

assert list(result) == response.json.return_value["data"]

def test_availability_strategy(self):
stream = SomeStream(api_key="some_key")
assert isinstance(stream.availability_strategy, HttpAvailabilityStrategy)


class TestIncrementalKlaviyoStreamV1:
def test_cursor_field_is_required(self):
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/sources/klaviyo.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ The Klaviyo connector should not run into Klaviyo API limitations under normal u
## Changelog

| Version | Date | Pull Request | Subject |
| :------- | :--------- | :--------------------------------------------------------- | :---------------------------------------------------------------------------------------- |
|:---------|:-----------| :--------------------------------------------------------- |:------------------------------------------------------------------------------------------|
| `0.3.3` | 2023-10-13 | [31379](https://github.com/airbytehq/airbyte/pull/31379) | Skip streams that the connector no longer has access to |
| `0.3.2` | 2023-06-20 | [27498](https://github.com/airbytehq/airbyte/pull/27498) | Do not store state in the future |
| `0.3.1` | 2023-06-08 | [27162](https://github.com/airbytehq/airbyte/pull/27162) | Anonymize check connection error message |
| `0.3.0` | 2023-02-18 | [23236](https://github.com/airbytehq/airbyte/pull/23236) | Add ` Email Templates` stream |
Expand Down

0 comments on commit 7f489c8

Please sign in to comment.