Skip to content

Commit

Permalink
Source Klaviyo: enable concurrency (#48452)
Browse files Browse the repository at this point in the history
Co-authored-by: btkcodedev <[email protected]>
Co-authored-by: Octavia Squidington III <[email protected]>
  • Loading branch information
3 people authored Nov 20, 2024
1 parent 6f25f82 commit cfcf6a2
Show file tree
Hide file tree
Showing 11 changed files with 609 additions and 256 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ acceptance_tests:
- config_path: secrets/config.json
configured_catalog_path: integration_tests/configured_catalog.json
future_state:
future_state_path: integration_tests/abnormal_state.json
bypass_reason: "This test does not make sense using Concurrent CDK"
skip_comprehensive_incremental_tests: true
timeout_seconds: 7200
spec:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ data:
definitionId: 95e8cffd-b8c4-4039-968e-d32fb4a69bde
connectorBuildOptions:
baseImage: docker.io/airbyte/python-connector-base:2.0.0@sha256:c44839ba84406116e8ba68722a0f30e8f6e7056c726f447681bb9e9ece8bd916
dockerImageTag: 2.10.14
dockerImageTag: 2.11.0
dockerRepository: airbyte/source-klaviyo
githubIssueLabel: source-klaviyo
icon: klaviyo.svg
Expand Down
600 changes: 432 additions & 168 deletions airbyte-integrations/connectors/source-klaviyo/poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-klaviyo/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.10.14"
version = "2.11.0"
name = "source-klaviyo"
description = "Source implementation for Klaviyo."
authors = [ "Airbyte <[email protected]>",]
Expand All @@ -17,7 +17,7 @@ include = "source_klaviyo"

[tool.poetry.dependencies]
python = "^3.10,<3.12"
airbyte_cdk = "^4"
airbyte_cdk = "^6"

[tool.poetry.scripts]
source-klaviyo = "source_klaviyo.run:run"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ definitions:
- "%Y-%m-%dT%H:%M:%S.%f%z"
- "%Y-%m-%dT%H:%M:%S%z"
- "%Y-%m-%d %H:%M:%S%z"
step: P1M
step: P7D
cursor_granularity: PT1S
retriever:
$ref: "#/definitions/base_retriever"
Expand Down Expand Up @@ -991,6 +991,18 @@ spec:
be improved by not fetching this field. WARNING: Enabling this setting will stop the
"predictive_analytics" column from being populated in your downstream destination.
order: 2
num_workers:
type: integer
title: Number of concurrent workers
minimum: 1
maximum: 50
default: 10
examples: [1, 2, 3]
description: >-
The number of worker threads to use for the sync.
The performance upper boundary is based on the limit of your Chargebee plan.
More info about the rate limit plan tiers can be found on Chargebee's API <a href="https://developers.klaviyo.com/en/docs/rate_limits_and_error_handling">docs</a>.
order: 3
required: ["api_key"]

metadata:
Expand Down Expand Up @@ -1044,3 +1056,29 @@ metadata:
hasRecords: true
primaryKeysArePresent: true
primaryKeysAreUnique: true

# Klaviyo's rate limiting is different by endpoints:
# - XS: 1/s burst; 15/m steady
# - S: 3/s burst; 60/m steady
# - M: 10/s burst; 150/m steady
# - L: 75/s burst; 700/m steady
# - XL: 350/s burst; 3500/m steady

# As of 2024-11-11, we have the following streams:
# | Stream | Endpoint | Klaviyo Rate Limit Size | Source Concurrency Between Streams | Source Concurrency Within Stream | Source Max Number of Threads Sharing Rate Limits | |
#|-------------------|----------------------------------------------------------------------|-------------------------|------------------------------------|---------------------------------------------------|------------------------------------------------------------------|---------------------------------------------------------------------------------|
#| profiles | https://developers.klaviyo.com/en/v2023-02-22/reference/get_profiles | M | Yes, shared with global_exclusions | No as `step` is not defined in `incremental_sync` | 2 | With other streams (global_exclusions), not within stream as `step` not defined |
#| global_exclusions | https://developers.klaviyo.com/en/v2023-02-22/reference/get_profiles | M | Yes, shared with profiles | No as `step` is not defined in `incremental_sync` | 2 | With other streams (profiles), not within stream as `step` not defined |
#| events | https://developers.klaviyo.com/en/reference/get_events | XL | Yes, shared with events_detailed | Yes | number of steps for events + number of steps for events_detailed | With other streams (events_detailed) and within stream as sliced on `datetime` |
#| events_detailed | https://developers.klaviyo.com/en/reference/get_events | XL | Yes, shared with events | Yes | number of steps for events + number of steps for events_detailed | With other streams (events) and within stream as sliced on `datetime` |
#| email_templates | https://developers.klaviyo.com/en/reference/get_templates | M | None | No as `step` is not defined in `incremental_sync` | 1 | None |
#| metrics | https://developers.klaviyo.com/en/reference/get_metrics | M | None | No as `step` is not defined in `incremental_sync` | 1 | None |
#| lists | https://developers.klaviyo.com/en/reference/get_lists | L | Yes, shared with lists_detailed | No as `step` is not defined in `incremental_sync` | 2 | With other streams (lists_detailed), not within stream as `step` not defined |
#| lists_detailed | https://developers.klaviyo.com/en/reference/get_lists | L | Yes, shared with lists | No as `step` is not defined in `incremental_sync` | 2 | With other streams (lists), not within stream as `step` not defined |
# Note: As of 2024-11-11, `metrics`, `lists` and `lists_detailed` are not supported by the Concurrent CDK as they do client side-filtering.

# Based on the above, the only threads that allow for slicing and hence might perform more concurrent HTTP requests are `events` and `events_detailed`. There are no slicing for the others and hence the concurrency is limited by the number of streams querying the same endpoint. Given that the event endpoint is XL, we will set a default concurrency to 10.
concurrency_level:
type: ConcurrencyLevel
default_concurrency: "{{ config.get('num_workers', 10) }}"
max_concurrency: 50
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,48 @@


import sys
import time
import traceback
from typing import List

from airbyte_cdk.entrypoint import launch
from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch
from airbyte_cdk.models import AirbyteErrorTraceMessage, AirbyteMessage, AirbyteMessageSerializer, AirbyteTraceMessage, TraceType, Type
from orjson import orjson
from source_klaviyo import SourceKlaviyo


def run():
source = SourceKlaviyo()
launch(source, sys.argv[1:])
def _get_source(args: List[str]):
catalog_path = AirbyteEntrypoint.extract_catalog(args)
config_path = AirbyteEntrypoint.extract_config(args)
state_path = AirbyteEntrypoint.extract_state(args)
try:
return SourceKlaviyo(
SourceKlaviyo.read_catalog(catalog_path) if catalog_path else None,
SourceKlaviyo.read_config(config_path) if config_path else None,
SourceKlaviyo.read_state(state_path) if state_path else None,
)
except Exception as error:
print(
orjson.dumps(
AirbyteMessageSerializer.dump(
AirbyteMessage(
type=Type.TRACE,
trace=AirbyteTraceMessage(
type=TraceType.ERROR,
emitted_at=time.time_ns() // 1_000_000,
error=AirbyteErrorTraceMessage(
message=f"Error starting the sync. This could be due to an invalid configuration or catalog. Please contact Support for assistance. Error: {error}",
stack_trace=traceback.format_exc(),
),
),
)
)
).decode()
)
raise


def run() -> None:
args = sys.argv[1:]
source = _get_source(args)
launch(source, args)
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@
#


from typing import Any, List, Mapping
from typing import Any, List, Mapping, Optional

from airbyte_cdk import TState
from airbyte_cdk.models import ConfiguredAirbyteCatalog
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from airbyte_cdk.sources.streams import Stream
from source_klaviyo.streams import Campaigns, CampaignsDetailed, Flows


class SourceKlaviyo(YamlDeclarativeSource):
def __init__(self) -> None:
super().__init__(**{"path_to_yaml": "manifest.yaml"})
def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], state: TState, **kwargs):
super().__init__(catalog=catalog, config=config, state=state, **{"path_to_yaml": "manifest.yaml"})

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""
Expand All @@ -31,6 +33,3 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
]
)
return streams

def continue_sync_on_stream_failure(self) -> bool:
return True
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Any, Dict, Optional
from unittest import TestCase

from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode
from airbyte_cdk.test.catalog_builder import CatalogBuilder
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest
Expand All @@ -15,7 +16,6 @@
create_response_builder,
find_template,
)
from airbyte_protocol.models import ConfiguredAirbyteCatalog, SyncMode
from integration.config import KlaviyoConfigBuilder
from source_klaviyo import SourceKlaviyo

Expand Down Expand Up @@ -66,7 +66,7 @@ def _read(
) -> EntrypointOutput:
catalog = _catalog(sync_mode)
config = config_builder.build()
return read(SourceKlaviyo(), config, catalog, state, expecting_exception)
return read(SourceKlaviyo(catalog, config, state), config, catalog, state, expecting_exception)


class FullRefreshTest(TestCase):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,21 @@

import pendulum
import pytest
from airbyte_cdk.test.catalog_builder import CatalogBuilder
from airbyte_cdk.test.state_builder import StateBuilder
from integration.config import KlaviyoConfigBuilder
from source_klaviyo.source import SourceKlaviyo

logger = logging.getLogger("airbyte")


def _source() -> SourceKlaviyo:
catalog = CatalogBuilder().build()
config = KlaviyoConfigBuilder().build()
state = StateBuilder().build()
return SourceKlaviyo(catalog, config, state)


@pytest.mark.parametrize(
("status_code", "is_connection_successful", "error_msg"),
(
Expand Down Expand Up @@ -39,7 +49,7 @@ def test_check_connection(requests_mock, status_code, is_connection_successful,
status_code=status_code,
json={"end": 1, "total": 1} if 200 >= status_code < 300 else {},
)
source = SourceKlaviyo()
source = _source()
success, error = source.check_connection(logger=logger, config={"api_key": "api_key"})
assert success is is_connection_successful
assert error == error_msg
Expand All @@ -48,14 +58,14 @@ def test_check_connection(requests_mock, status_code, is_connection_successful,
def test_check_connection_unexpected_error(requests_mock):
exception_info = "Something went wrong"
requests_mock.register_uri("GET", "https://a.klaviyo.com/api/metrics", exc=Exception(exception_info))
source = SourceKlaviyo()
source = _source()
success, error = source.check_connection(logger=logger, config={"api_key": "api_key"})
assert success is False
assert error == f"Unable to connect to stream metrics - {exception_info}"


def test_streams():
source = SourceKlaviyo()
source = _source()
config = {"api_key": "some_key", "start_date": pendulum.datetime(2020, 10, 10).isoformat()}
streams = source.streams(config)
expected_streams_number = 11
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#


import math
import urllib.parse
from datetime import datetime, timedelta
from typing import Any, List, Mapping, Optional
Expand All @@ -16,7 +17,10 @@
from airbyte_cdk import AirbyteTracedException
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.test.catalog_builder import CatalogBuilder
from airbyte_cdk.test.state_builder import StateBuilder
from dateutil.relativedelta import relativedelta
from integration.config import KlaviyoConfigBuilder
from pydantic import BaseModel
from source_klaviyo.availability_strategy import KlaviyoAvailabilityStrategy
from source_klaviyo.source import SourceKlaviyo
Expand All @@ -32,17 +36,16 @@
EVENTS_STREAM_STATE_DATE = (datetime.fromisoformat(EVENTS_STREAM_CONFIG_START_DATE) + relativedelta(years=1)).isoformat()
EVENTS_STREAM_TESTING_FREEZE_TIME = "2023-12-12 12:00:00"

def get_months_diff(provided_date: str) -> int:
def get_step_diff(provided_date: str) -> int:
"""
This function returns the difference in months between provided date and freeze time.
This function returns the difference in weeks between provided date and freeze time.
"""
provided_date = datetime.fromisoformat(provided_date).replace(tzinfo=None)
freeze_date = datetime.strptime(EVENTS_STREAM_TESTING_FREEZE_TIME, "%Y-%m-%d %H:%M:%S")
difference = relativedelta(freeze_date, provided_date)
return difference.years * 12 + difference.months
return (freeze_date - provided_date).days // 7

def get_stream_by_name(stream_name: str, config: Mapping[str, Any]) -> Stream:
source = SourceKlaviyo()
source = SourceKlaviyo(CatalogBuilder().build(), KlaviyoConfigBuilder().build(), StateBuilder().build())
matches_by_name = [stream_config for stream_config in source.streams(config) if stream_config.name == stream_name]
if not matches_by_name:
raise ValueError("Please provide a valid stream name.")
Expand Down Expand Up @@ -187,8 +190,9 @@ def generate_api_urls(start_date_str: str) -> list[(str, str)]:
start_date = datetime.fromisoformat(start_date_str)
current_date = datetime.now(start_date.tzinfo)
urls = []
step = relativedelta(days=7)
while start_date < current_date:
end_date = start_date + relativedelta(months=1) - timedelta(seconds=1)
end_date = start_date + step - timedelta(seconds=1)
if end_date > current_date:
end_date = current_date
start_date_str = start_date.strftime("%Y-%m-%dT%H:%M:%S") + start_date.strftime("%z")
Expand All @@ -204,7 +208,7 @@ def generate_api_urls(start_date_str: str) -> list[(str, str)]:
encoded_url = f"{base_url}?{encoded_query}"
dummy_record = {"attributes": {"datetime": start_date_str}, "datetime": start_date_str}
urls.append((encoded_url, dummy_record))
start_date = start_date + relativedelta(months=1)
start_date = start_date + step
return urls

def test_cursor_field_is_required(self):
Expand Down Expand Up @@ -293,20 +297,20 @@ def test_get_updated_state(self, config_start_date, current_cursor, latest_curso
(
(
# we pick the state
EVENTS_STREAM_CONFIG_START_DATE,
EVENTS_STREAM_STATE_DATE,
get_months_diff(EVENTS_STREAM_STATE_DATE) + 1 # adding last request
EVENTS_STREAM_CONFIG_START_DATE,
EVENTS_STREAM_STATE_DATE,
get_step_diff(EVENTS_STREAM_STATE_DATE) + 1 # adding last request
),
(
# we pick the config start date
EVENTS_STREAM_CONFIG_START_DATE,
None,
get_months_diff(EVENTS_STREAM_CONFIG_START_DATE) + 1 # adding last request
get_step_diff(EVENTS_STREAM_CONFIG_START_DATE) + 1 # adding last request
),
(
"",
"",
get_months_diff(EVENTS_STREAM_DEFAULT_START_DATE) + 1 # adding last request
get_step_diff(EVENTS_STREAM_DEFAULT_START_DATE) + 1 # adding last request
),
),
)
Expand Down
Loading

0 comments on commit cfcf6a2

Please sign in to comment.