Skip to content

Commit

Permalink
Source Klaviyo: add availability strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
Anton Karpets committed Oct 13, 2023
1 parent e757fb8 commit d0e276a
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import logging
from typing import Dict, Optional

from airbyte_cdk.sources import Source
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
from requests import HTTPError, codes


class KlaviyoAvailabilityStrategyLatest(HttpAvailabilityStrategy):
def reasons_for_unavailable_status_codes(
self, stream: Stream, logger: logging.Logger, source: Optional[Source], error: HTTPError
) -> Dict[int, str]:
reasons_for_codes: Dict[int, str] = super().reasons_for_unavailable_status_codes(stream, logger, source, error)
reasons_for_codes[codes.UNAUTHORIZED] = (
"This is most likely due to insufficient permissions on the credentials in use. "
"Try to grant required permissions/scopes or re-authenticate"
)

return reasons_for_codes
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer

from .availability_strategy import KlaviyoAvailabilityStrategyLatest


class KlaviyoStreamLatest(HttpStream, ABC):
"""Base stream for api version v2023-02-22"""
Expand All @@ -26,8 +28,8 @@ def __init__(self, api_key: str, **kwargs):
self._api_key = api_key

@property
def availability_strategy(self) -> Optional["AvailabilityStrategy"]:
return None
def availability_strategy(self) -> Optional[AvailabilityStrategy]:
return KlaviyoAvailabilityStrategyLatest()

def request_headers(self, **kwargs) -> Mapping[str, Any]:
base_headers = super().request_headers(**kwargs)
Expand Down Expand Up @@ -148,10 +150,6 @@ def __init__(self, api_key: str, **kwargs):
transform_function = self.get_custom_transform()
self.transformer.registerCustomTransform(transform_function)

@property
def availability_strategy(self) -> Optional["AvailabilityStrategy"]:
return None

def get_custom_transform(self):
def custom_transform_date_rfc3339(original_value, field_schema):
if original_value and "format" in field_schema and field_schema["format"] == "date-time":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pytest
import requests
from pydantic import BaseModel
from source_klaviyo.availability_strategy import KlaviyoAvailabilityStrategyLatest
from source_klaviyo.streams import IncrementalKlaviyoStreamLatest, Profiles

START_DATE = pendulum.datetime(2020, 10, 10)
Expand Down Expand Up @@ -100,3 +101,7 @@ def test_parse_response(self, mocker):
"properties": {"Status": "onboarding_started"},
},
]

def test_availability_strategy(self):
stream = Profiles(api_key="some_key", start_date=START_DATE.isoformat())
assert isinstance(stream.availability_strategy, KlaviyoAvailabilityStrategyLatest)
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pendulum
import pytest
import requests
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
from pydantic import BaseModel
from source_klaviyo.streams import EmailTemplates, Events, IncrementalKlaviyoStreamV1, KlaviyoStreamV1, ReverseIncrementalKlaviyoStreamV1

Expand Down Expand Up @@ -78,6 +79,10 @@ def test_parse_response(self, response):

assert list(result) == response.json.return_value["data"]

def test_availability_strategy(self):
stream = SomeStream(api_key="some_key")
assert isinstance(stream.availability_strategy, HttpAvailabilityStrategy)


class TestIncrementalKlaviyoStreamV1:
def test_cursor_field_is_required(self):
Expand Down

0 comments on commit d0e276a

Please sign in to comment.