diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/availability_strategy.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/availability_strategy.py new file mode 100644 index 000000000000..6f71ec8d59ac --- /dev/null +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/availability_strategy.py @@ -0,0 +1,24 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +import logging +from typing import Dict, Optional + +from airbyte_cdk.sources import Source +from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy +from requests import HTTPError, codes + + +class KlaviyoAvailabilityStrategyLatest(HttpAvailabilityStrategy): + def reasons_for_unavailable_status_codes( + self, stream: Stream, logger: logging.Logger, source: Optional[Source], error: HTTPError + ) -> Dict[int, str]: + reasons_for_codes: Dict[int, str] = super().reasons_for_unavailable_status_codes(stream, logger, source, error) + reasons_for_codes[codes.UNAUTHORIZED] = ( + "This is most likely due to insufficient permissions on the credentials in use. " + "Try to grant required permissions/scopes or re-authenticate" + ) + + return reasons_for_codes diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py index f2b1b085449f..16a7ad0cf425 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py @@ -13,6 +13,8 @@ from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer +from .availability_strategy import KlaviyoAvailabilityStrategyLatest + class KlaviyoStreamLatest(HttpStream, ABC): """Base stream for api version v2023-02-22""" @@ -26,8 +28,8 @@ def __init__(self, api_key: str, **kwargs): self._api_key = api_key @property - def availability_strategy(self) -> Optional["AvailabilityStrategy"]: - return None + def availability_strategy(self) -> Optional[AvailabilityStrategy]: + return KlaviyoAvailabilityStrategyLatest() def request_headers(self, **kwargs) -> Mapping[str, Any]: base_headers = super().request_headers(**kwargs) @@ -148,10 +150,6 @@ def __init__(self, api_key: str, **kwargs): transform_function = self.get_custom_transform() self.transformer.registerCustomTransform(transform_function) - @property - def availability_strategy(self) -> Optional["AvailabilityStrategy"]: - return None - def get_custom_transform(self): def custom_transform_date_rfc3339(original_value, field_schema): if original_value and "format" in field_schema and field_schema["format"] == "date-time": diff --git a/airbyte-integrations/connectors/source-klaviyo/unit_tests/test_latest_streams.py b/airbyte-integrations/connectors/source-klaviyo/unit_tests/test_latest_streams.py index e5101d04808d..47402095159a 100644 --- a/airbyte-integrations/connectors/source-klaviyo/unit_tests/test_latest_streams.py +++ b/airbyte-integrations/connectors/source-klaviyo/unit_tests/test_latest_streams.py @@ -8,6 +8,7 @@ import pytest import requests from pydantic import BaseModel +from source_klaviyo.availability_strategy import KlaviyoAvailabilityStrategyLatest from source_klaviyo.streams import IncrementalKlaviyoStreamLatest, Profiles START_DATE = pendulum.datetime(2020, 10, 10) @@ -100,3 +101,7 @@ def test_parse_response(self, mocker): "properties": {"Status": "onboarding_started"}, }, ] + + def test_availability_strategy(self): + stream = Profiles(api_key="some_key", start_date=START_DATE.isoformat()) + assert isinstance(stream.availability_strategy, KlaviyoAvailabilityStrategyLatest) diff --git a/airbyte-integrations/connectors/source-klaviyo/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-klaviyo/unit_tests/test_streams.py index e71ea9aec942..caa292b65e15 100644 --- a/airbyte-integrations/connectors/source-klaviyo/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-klaviyo/unit_tests/test_streams.py @@ -8,6 +8,7 @@ import pendulum import pytest import requests +from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy from pydantic import BaseModel from source_klaviyo.streams import EmailTemplates, Events, IncrementalKlaviyoStreamV1, KlaviyoStreamV1, ReverseIncrementalKlaviyoStreamV1 @@ -78,6 +79,10 @@ def test_parse_response(self, response): assert list(result) == response.json.return_value["data"] + def test_availability_strategy(self): + stream = SomeStream(api_key="some_key") + assert isinstance(stream.availability_strategy, HttpAvailabilityStrategy) + class TestIncrementalKlaviyoStreamV1: def test_cursor_field_is_required(self):