From 7b4adceec3b000757eb5214bd539aa588c587638 Mon Sep 17 00:00:00 2001 From: Maxime Carbonneau-Leclerc Date: Thu, 4 Jan 2024 10:06:37 -0500 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#32871]=20adding=20integration=20tests?= =?UTF-8?q?=20for=20UpdatedCursorIncrementalS=E2=80=A6=20(#33597)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../unit_tests/integration/request_builder.py | 40 ++ .../test_application_fees_refunds.py | 518 ++++++++++++++++ .../integration/test_bank_accounts.py | 563 ++++++++++++++++++ .../integration/test_early_fraud_warnings.py | 342 +++++++++++ .../test_external_account_bank_accounts.py | 361 +++++++++++ .../test_external_account_cards.py | 366 ++++++++++++ .../integration/test_payment_methods.py | 347 +++++++++++ .../response/application_fees_refunds.json | 17 + .../resource/http/response/bank_accounts.json | 23 + .../customers_expand_data_source.json | 43 ++ .../http/response/external_account_cards.json | 34 ++ .../http/response/external_bank_accounts.json | 23 + .../http/response/payment_methods.json | 52 ++ .../response/radar_early_fraud_warnings.json | 16 + .../resource/http/response/refunds.json | 32 + 15 files changed, 2777 insertions(+) create mode 100644 airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_application_fees_refunds.py create mode 100644 airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_bank_accounts.py create mode 100644 airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_early_fraud_warnings.py create mode 100644 airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_bank_accounts.py create mode 100644 airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_cards.py create mode 100644 airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_payment_methods.py create mode 100644 airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/application_fees_refunds.json create mode 100644 airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/bank_accounts.json create mode 100644 airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/customers_expand_data_source.json create mode 100644 airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/external_account_cards.json create mode 100644 airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/external_bank_accounts.json create mode 100644 airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/payment_methods.json create mode 100644 airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/radar_early_fraud_warnings.json create mode 100644 airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/refunds.json diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/request_builder.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/request_builder.py index 56b475f8907b..1c8a8bc39bd7 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/request_builder.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/request_builder.py @@ -13,10 +13,28 @@ class StripeRequestBuilder: def application_fees_endpoint(cls, account_id: str, client_secret: str) -> "StripeRequestBuilder": return cls("application_fees", account_id, client_secret) + @classmethod + def application_fees_refunds_endpoint(cls, application_fee_id: str, account_id: str, client_secret: str) -> "StripeRequestBuilder": + return cls(f"application_fees/{application_fee_id}/refunds", account_id, client_secret) + + @classmethod + def customers_endpoint(cls, account_id: str, client_secret: str) -> "StripeRequestBuilder": + return cls("customers", account_id, client_secret) + + @classmethod + def customers_sources_endpoint(cls, customer_id: str, account_id: str, client_secret: str) -> "StripeRequestBuilder": + # FIXME this endpoint is not available in the documentation and stripe mentions explicitly that the sources API is deprecated + # (see https://stripe.com/docs/sources/customers and https://github.com/airbytehq/airbyte/issues/33714) + return cls(f"customers/{customer_id}/sources", account_id, client_secret) + @classmethod def events_endpoint(cls, account_id: str, client_secret: str) -> "StripeRequestBuilder": return cls("events", account_id, client_secret) + @classmethod + def external_accounts_endpoint(cls, account_id: str, client_secret: str) -> "StripeRequestBuilder": + return cls(f"accounts/{account_id}/external_accounts", account_id, client_secret) + @classmethod def issuing_authorizations_endpoint(cls, account_id: str, client_secret: str) -> "StripeRequestBuilder": return cls("issuing/authorizations", account_id, client_secret) @@ -29,6 +47,14 @@ def issuing_cards_endpoint(cls, account_id: str, client_secret: str) -> "StripeR def issuing_transactions_endpoint(cls, account_id: str, client_secret: str) -> "StripeRequestBuilder": return cls("issuing/transactions", account_id, client_secret) + @classmethod + def payment_methods_endpoint(cls, account_id: str, client_secret: str) -> "StripeRequestBuilder": + return cls("payment_methods", account_id, client_secret) + + @classmethod + def radar_early_fraud_warnings_endpoint(cls, account_id: str, client_secret: str) -> "StripeRequestBuilder": + return cls("radar/early_fraud_warnings", account_id, client_secret) + @classmethod def reviews_endpoint(cls, account_id: str, client_secret: str) -> "StripeRequestBuilder": return cls("reviews", account_id, client_secret) @@ -45,8 +71,10 @@ def __init__(self, resource: str, account_id: str, client_secret: str) -> None: self._created_gte: Optional[datetime] = None self._created_lte: Optional[datetime] = None self._limit: Optional[int] = None + self._object: Optional[str] = None self._starting_after_id: Optional[str] = None self._types: List[str] = [] + self._expands: List[str] = [] def with_created_gte(self, created_gte: datetime) -> "StripeRequestBuilder": self._created_gte = created_gte @@ -60,6 +88,10 @@ def with_limit(self, limit: int) -> "StripeRequestBuilder": self._limit = limit return self + def with_object(self, object_name: str) -> "StripeRequestBuilder": + self._object = object_name + return self + def with_starting_after(self, starting_after_id: str) -> "StripeRequestBuilder": self._starting_after_id = starting_after_id return self @@ -72,6 +104,10 @@ def with_types(self, types: List[str]) -> "StripeRequestBuilder": self._types = types return self + def with_expands(self, expands: List[str]) -> "StripeRequestBuilder": + self._expands = expands + return self + def build(self) -> HttpRequest: query_params = {} if self._created_gte: @@ -84,6 +120,10 @@ def build(self) -> HttpRequest: query_params["starting_after"] = self._starting_after_id if self._types: query_params["types[]"] = self._types + if self._object: + query_params["object"] = self._object + if self._expands: + query_params["expand[]"] = self._expands if self._any_query_params: if query_params: diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_application_fees_refunds.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_application_fees_refunds.py new file mode 100644 index 000000000000..245e743636f7 --- /dev/null +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_application_fees_refunds.py @@ -0,0 +1,518 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. + + +import json +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, Optional +from unittest import TestCase + +import freezegun +from airbyte_cdk.test.catalog_builder import CatalogBuilder +from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read +from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse +from airbyte_cdk.test.mock_http.response_builder import ( + FieldPath, + HttpResponseBuilder, + NestedPath, + RecordBuilder, + create_record_builder, + create_response_builder, + find_template, +) +from airbyte_cdk.test.state_builder import StateBuilder +from airbyte_protocol.models import ConfiguredAirbyteCatalog, FailureType, SyncMode +from integration.config import ConfigBuilder +from integration.pagination import StripePaginationStrategy +from integration.request_builder import StripeRequestBuilder +from integration.response_builder import a_response_with_status +from source_stripe import SourceStripe + +_EVENT_TYPES = ["application_fee.refund.updated"] + +_DATA_FIELD = NestedPath(["data", "object"]) +_REFUNDS_FIELD = FieldPath("refunds") +_STREAM_NAME = "application_fees_refunds" +_APPLICATION_FEES_TEMPLATE_NAME = "application_fees" +_REFUNDS_TEMPLATE_NAME = "application_fees_refunds" +_NOW = datetime.now(timezone.utc) +_A_START_DATE = _NOW - timedelta(days=60) +_ACCOUNT_ID = "account_id" +_CLIENT_SECRET = "client_secret" +_NO_STATE = {} +_AVOIDING_INCLUSIVE_BOUNDARIES = timedelta(seconds=1) + + +def _application_fees_request() -> StripeRequestBuilder: + return StripeRequestBuilder.application_fees_endpoint(_ACCOUNT_ID, _CLIENT_SECRET) + + +def _application_fees_refunds_request(application_fee_id: str) -> StripeRequestBuilder: + return StripeRequestBuilder.application_fees_refunds_endpoint(application_fee_id, _ACCOUNT_ID, _CLIENT_SECRET) + + +def _events_request() -> StripeRequestBuilder: + return StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET) + + +def _config() -> ConfigBuilder: + return ConfigBuilder().with_start_date(_NOW - timedelta(days=75)).with_account_id(_ACCOUNT_ID).with_client_secret(_CLIENT_SECRET) + + +def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: + return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build() + + +def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe: + return SourceStripe(catalog, config) + + +def _an_event() -> RecordBuilder: + return create_record_builder( + find_template("events", __file__), + FieldPath("data"), + record_id_path=FieldPath("id"), + record_cursor_path=FieldPath("created"), + ) + + +def _events_response() -> HttpResponseBuilder: + return create_response_builder( + find_template("events", __file__), + FieldPath("data"), + pagination_strategy=StripePaginationStrategy() + ) + + +def _an_application_fee() -> RecordBuilder: + return create_record_builder( + find_template(_APPLICATION_FEES_TEMPLATE_NAME, __file__), + FieldPath("data"), + record_id_path=FieldPath("id"), + record_cursor_path=FieldPath("created"), + ) + + +def _application_fees_response() -> HttpResponseBuilder: + return create_response_builder( + find_template(_APPLICATION_FEES_TEMPLATE_NAME, __file__), + FieldPath("data"), + pagination_strategy=StripePaginationStrategy() + ) + + +def _a_refund() -> RecordBuilder: + return create_record_builder( + find_template(_REFUNDS_TEMPLATE_NAME, __file__), + FieldPath("data"), + record_id_path=FieldPath("id"), + record_cursor_path=FieldPath("created"), + ) + + +def _refunds_response() -> HttpResponseBuilder: + return create_response_builder( + find_template(_REFUNDS_TEMPLATE_NAME, __file__), + FieldPath("data"), + pagination_strategy=StripePaginationStrategy() + ) + + +def _given_application_fees_availability_check(http_mocker: HttpMocker) -> None: + http_mocker.get( + StripeRequestBuilder.application_fees_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build(), + _application_fees_response().with_record(_an_application_fee()).build() # there needs to be a record in the parent stream for the child to be available + ) + + +def _given_events_availability_check(http_mocker: HttpMocker) -> None: + http_mocker.get( + StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build(), + _events_response().build() + ) + + +def _as_dict(response_builder: HttpResponseBuilder) -> Dict[str, Any]: + return json.loads(response_builder.build().body) + + +def _read( + config_builder: ConfigBuilder, + sync_mode: SyncMode, + state: Optional[Dict[str, Any]] = None, + expecting_exception: bool = False +) -> EntrypointOutput: + catalog = _catalog(sync_mode) + config = config_builder.build() + return read(_source(catalog, config), config, catalog, state, expecting_exception) + + +def _assert_not_available(output: EntrypointOutput) -> None: + # right now, no stream statuses means stream unavailable + assert len(output.get_stream_statuses(_STREAM_NAME)) == 0 + + +@freezegun.freeze_time(_NOW.isoformat()) +class FullRefreshTest(TestCase): + @HttpMocker() + def test_given_one_page_when_read_then_return_record(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _application_fees_request().with_created_gte(_A_START_DATE).with_created_lte(_NOW).with_limit(100).build(), + _application_fees_response() + .with_record( + _an_application_fee() + .with_field( + _REFUNDS_FIELD, + _as_dict( + _refunds_response() + .with_record(_a_refund()) + .with_record(_a_refund()) + ) + ) + ) + .with_record( + _an_application_fee() + .with_field(_REFUNDS_FIELD, _as_dict(_refunds_response().with_record(_a_refund()))) + ).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE)) + + assert len(output.records) == 3 + + @HttpMocker() + def test_given_multiple_refunds_pages_when_read_then_query_pagination_on_child(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _application_fees_request().with_created_gte(_A_START_DATE).with_created_lte(_NOW).with_limit(100).build(), + _application_fees_response() + .with_record( + _an_application_fee() + .with_id("parent_id") + .with_field( + _REFUNDS_FIELD, + _as_dict( + _refunds_response() + .with_pagination() + .with_record(_a_refund().with_id("latest_refund_id")) + ) + ) + ).build(), + ) + http_mocker.get( + # we do not use slice boundaries here because: + # * there should be no duplicates parents (application fees) returned by the stripe API as it is using cursor pagination + # * it is implicitly lower bounder by the parent creation + # * the upper boundary is not configurable and is always + _application_fees_refunds_request("parent_id").with_limit(100).with_starting_after("latest_refund_id").build(), + _refunds_response().with_record(_a_refund()).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE)) + + assert len(output.records) == 2 + + @HttpMocker() + def test_given_multiple_application_fees_pages_when_read_then_query_pagination_on_parent(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _application_fees_request().with_created_gte(_A_START_DATE).with_created_lte(_NOW).with_limit(100).build(), + _application_fees_response() + .with_pagination() + .with_record( + _an_application_fee() + .with_id("parent_id") + .with_field( + _REFUNDS_FIELD, + _as_dict( + _refunds_response() + .with_record(_a_refund()) + ) + ) + ).build(), + ) + http_mocker.get( + _application_fees_request().with_starting_after("parent_id").with_created_gte(_A_START_DATE).with_created_lte(_NOW).with_limit(100).build(), + _application_fees_response() + .with_record( + _an_application_fee() + .with_field( + _REFUNDS_FIELD, + _as_dict( + _refunds_response() + .with_record(_a_refund()) + ) + ) + ).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE)) + + assert len(output.records) == 2 + + @HttpMocker() + def test_given_parent_stream_without_refund_when_read_then_stream_is_unavailable(self, http_mocker: HttpMocker) -> None: + # events stream is not validated as application fees is validated first + http_mocker.get( + _application_fees_request().with_created_gte(_A_START_DATE).with_created_lte(_NOW).with_limit(100).build(), + _application_fees_response().build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE)) + + _assert_not_available(output) + + @HttpMocker() + def test_given_slice_range_when_read_then_perform_multiple_requests(self, http_mocker: HttpMocker) -> None: + start_date = _NOW - timedelta(days=30) + slice_range = timedelta(days=20) + slice_datetime = start_date + slice_range + + _given_events_availability_check(http_mocker) + http_mocker.get( + _application_fees_request().with_created_gte(start_date).with_created_lte(slice_datetime).with_limit(100).build(), + _application_fees_response().with_record( + _an_application_fee() + .with_field(_REFUNDS_FIELD, _as_dict(_refunds_response().with_record(_a_refund()))) + ).build(), + ) + http_mocker.get( + _application_fees_request().with_created_gte(slice_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).build(), + _application_fees_response().with_record( + _an_application_fee() + .with_field(_REFUNDS_FIELD, _as_dict(_refunds_response().with_record(_a_refund()))) + ).build(), + ) + + output = self._read(_config().with_start_date(start_date).with_slice_range_in_days(slice_range.days)) + + assert len(output.records) == 2 + + @HttpMocker() + def test_given_slice_range_and_refunds_pagination_when_read_then_do_not_slice_child(self, http_mocker: HttpMocker) -> None: + """ + This means that if the user attempt to configure the slice range, it will only apply on the parent stream + """ + start_date = _NOW - timedelta(days=30) + slice_range = timedelta(days=20) + slice_datetime = start_date + slice_range + + _given_events_availability_check(http_mocker) + http_mocker.get( + StripeRequestBuilder.application_fees_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build(), + _application_fees_response().build() + ) # catching subsequent slicing request that we don't really care for this test + http_mocker.get( + _application_fees_request().with_created_gte(start_date).with_created_lte(slice_datetime).with_limit(100).build(), + _application_fees_response().with_record( + _an_application_fee() + .with_id("parent_id") + .with_field( + _REFUNDS_FIELD, + _as_dict( + _refunds_response() + .with_pagination() + .with_record(_a_refund().with_id("latest_refund_id")) + ) + ) + ).build(), + ) + http_mocker.get( + # slice range is not applied here + _application_fees_refunds_request("parent_id").with_limit(100).with_starting_after("latest_refund_id").build(), + _refunds_response().with_record(_a_refund()).build(), + ) + + self._read(_config().with_start_date(start_date).with_slice_range_in_days(slice_range.days)) + + # request matched http_mocker + + @HttpMocker() + def test_given_no_state_when_read_then_return_ignore_lookback(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _application_fees_request().with_created_gte(_A_START_DATE).with_created_lte(_NOW).with_limit(100).build(), + _application_fees_response().with_record(_an_application_fee()).build(), + ) + + self._read(_config().with_start_date(_A_START_DATE).with_lookback_window_in_days(10)) + + # request matched http_mocker + + @HttpMocker() + def test_given_one_page_when_read_then_cursor_field_is_set(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _application_fees_request().with_created_gte(_A_START_DATE).with_created_lte(_NOW).with_limit(100).build(), + _application_fees_response() + .with_record( + _an_application_fee() + .with_field( + _REFUNDS_FIELD, + _as_dict( + _refunds_response() + .with_record(_a_refund()) + ) + ) + ).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE)) + + assert output.records[0].record.data["updated"] == output.records[0].record.data["created"] + + @HttpMocker() + def test_given_http_status_401_when_read_then_system_error(self, http_mocker: HttpMocker) -> None: + http_mocker.get( + _application_fees_request().with_any_query_params().build(), + a_response_with_status(401), + ) + output = self._read(_config(), expecting_exception=True) + assert output.errors[-1].trace.error.failure_type == FailureType.system_error + + @HttpMocker() + def test_given_rate_limited_when_read_then_retry_and_return_records(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _application_fees_request().with_any_query_params().build(), + [ + a_response_with_status(429), + _application_fees_response().with_record(_an_application_fee().with_field( + _REFUNDS_FIELD, + _as_dict( + _refunds_response() + .with_record(_a_refund()) + ) + )).build(), + ], + ) + output = self._read(_config().with_start_date(_A_START_DATE)) + assert len(output.records) == 1 + + @HttpMocker() + def test_given_http_status_500_on_availability_when_read_then_raise_system_error(self, http_mocker: HttpMocker) -> None: + request = _application_fees_request().with_any_query_params().build() + http_mocker.get( + request, + a_response_with_status(500), + ) + output = self._read(_config(), expecting_exception=True) + assert output.errors[-1].trace.error.failure_type == FailureType.system_error + + def _read(self, config: ConfigBuilder, expecting_exception: bool = False) -> EntrypointOutput: + return _read(config, SyncMode.full_refresh, expecting_exception=expecting_exception) + + +@freezegun.freeze_time(_NOW.isoformat()) +class IncrementalTest(TestCase): + + @HttpMocker() + def test_given_no_state_when_read_then_use_application_fees_endpoint(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + cursor_value = int(_A_START_DATE.timestamp()) + 1 + http_mocker.get( + _application_fees_request().with_created_gte(_A_START_DATE).with_created_lte(_NOW).with_limit(100).build(), + _application_fees_response().with_record( + _an_application_fee() + .with_field(_REFUNDS_FIELD, _as_dict(_refunds_response().with_record(_a_refund().with_cursor(cursor_value)))) + ).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE), _NO_STATE) + + assert output.most_recent_state == {_STREAM_NAME: {"updated": cursor_value}} + + @HttpMocker() + def test_given_state_when_read_then_query_events_using_types_and_state_value_plus_1(self, http_mocker: HttpMocker) -> None: + start_date = _NOW - timedelta(days=40) + state_datetime = _NOW - timedelta(days=5) + cursor_value = int(state_datetime.timestamp()) + 1 + + _given_application_fees_availability_check(http_mocker) + _given_events_availability_check(http_mocker) + http_mocker.get( + _events_request().with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record( + _an_event().with_cursor(cursor_value).with_field(_DATA_FIELD, _a_refund().build()) + ).build(), + ) + + output = self._read( + _config().with_start_date(start_date), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert output.most_recent_state == {_STREAM_NAME: {"updated": cursor_value}} + + @HttpMocker() + def test_given_state_and_pagination_when_read_then_return_records(self, http_mocker: HttpMocker) -> None: + _given_application_fees_availability_check(http_mocker) + _given_events_availability_check(http_mocker) + state_datetime = _NOW - timedelta(days=5) + http_mocker.get( + _events_request().with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_pagination().with_record( + _an_event().with_id("last_record_id_from_first_page").with_field(_DATA_FIELD, _a_refund().build()) + ).build(), + ) + http_mocker.get( + _events_request().with_starting_after("last_record_id_from_first_page").with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._a_refund_event()).build(), + ) + + output = self._read( + _config(), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert len(output.records) == 2 + + @HttpMocker() + def test_given_state_and_small_slice_range_when_read_then_perform_multiple_queries(self, http_mocker: HttpMocker) -> None: + state_datetime = _NOW - timedelta(days=5) + slice_range = timedelta(days=3) + slice_datetime = state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES + slice_range + + _given_application_fees_availability_check(http_mocker) + _given_events_availability_check(http_mocker) # the availability check does not consider the state so we need to define a generic availability check + http_mocker.get( + _events_request().with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(slice_datetime).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._a_refund_event()).build(), + ) + http_mocker.get( + _events_request().with_created_gte(slice_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._a_refund_event()).with_record(self._a_refund_event()).build(), + ) + + output = self._read( + _config().with_start_date(_NOW - timedelta(days=30)).with_slice_range_in_days(slice_range.days), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert len(output.records) == 3 + + @HttpMocker() + def test_given_state_earlier_than_30_days_when_read_then_query_events_using_types_and_event_lower_boundary(self, http_mocker: HttpMocker) -> None: + # this seems odd as we would miss some data between start_date and events_lower_boundary. In that case, we should hit the + # application fees endpoint + _given_application_fees_availability_check(http_mocker) + start_date = _NOW - timedelta(days=40) + state_value = _NOW - timedelta(days=39) + events_lower_boundary = _NOW - timedelta(days=30) + http_mocker.get( + _events_request().with_created_gte(events_lower_boundary).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._a_refund_event()).build(), + ) + + self._read( + _config().with_start_date(start_date), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_value.timestamp())}).build(), + ) + + # request matched http_mocker + + def _a_refund_event(self) -> RecordBuilder: + return _an_event().with_field(_DATA_FIELD, _a_refund().build()) + + def _read(self, config: ConfigBuilder, state: Optional[Dict[str, Any]], expecting_exception: bool = False) -> EntrypointOutput: + return _read(config, SyncMode.incremental, state, expecting_exception) diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_bank_accounts.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_bank_accounts.py new file mode 100644 index 000000000000..6d858eb398a6 --- /dev/null +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_bank_accounts.py @@ -0,0 +1,563 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. + + +import json +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, Optional +from unittest import TestCase + +import freezegun +from airbyte_cdk.test.catalog_builder import CatalogBuilder +from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read +from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse +from airbyte_cdk.test.mock_http.response_builder import ( + FieldPath, + HttpResponseBuilder, + NestedPath, + RecordBuilder, + create_record_builder, + create_response_builder, + find_template, +) +from airbyte_cdk.test.state_builder import StateBuilder +from airbyte_protocol.models import ConfiguredAirbyteCatalog, FailureType, SyncMode +from integration.config import ConfigBuilder +from integration.pagination import StripePaginationStrategy +from integration.request_builder import StripeRequestBuilder +from integration.response_builder import a_response_with_status +from source_stripe import SourceStripe + +_EVENT_TYPES = ["customer.source.created", "customer.source.expiring", "customer.source.updated", "customer.source.deleted"] + +_DATA_FIELD = NestedPath(["data", "object"]) +_SOURCES_FIELD = FieldPath("sources") +_STREAM_NAME = "bank_accounts" +_CUSTOMERS_TEMPLATE_NAME = "customers_expand_data_source" +_BANK_ACCOUNTS_TEMPLATE_NAME = "bank_accounts" +_NOW = datetime.now(timezone.utc) +_A_START_DATE = _NOW - timedelta(days=60) +_ACCOUNT_ID = "account_id" +_CLIENT_SECRET = "client_secret" +# FIXME expand[] is not documented anymore in stripe API doc (see https://github.com/airbytehq/airbyte/issues/33714) +_EXPANDS = ["data.sources"] +_OBJECT = "bank_account" +_NOT_A_BANK_ACCOUNT = RecordBuilder({"object": "NOT a bank account"}, None, None) +_NO_STATE = {} +_AVOIDING_INCLUSIVE_BOUNDARIES = timedelta(seconds=1) + + +def _customers_request() -> StripeRequestBuilder: + return StripeRequestBuilder.customers_endpoint(_ACCOUNT_ID, _CLIENT_SECRET) + + +def _customers_sources_request(customer_id: str) -> StripeRequestBuilder: + return StripeRequestBuilder.customers_sources_endpoint(customer_id, _ACCOUNT_ID, _CLIENT_SECRET) + + +def _events_request() -> StripeRequestBuilder: + return StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET) + + +def _config() -> ConfigBuilder: + return ConfigBuilder().with_start_date(_NOW - timedelta(days=75)).with_account_id(_ACCOUNT_ID).with_client_secret(_CLIENT_SECRET) + + +def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: + return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build() + + +def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe: + return SourceStripe(catalog, config) + + +def _an_event() -> RecordBuilder: + return create_record_builder( + find_template("events", __file__), + FieldPath("data"), + record_id_path=FieldPath("id"), + record_cursor_path=FieldPath("created"), + ) + + +def _events_response() -> HttpResponseBuilder: + return create_response_builder( + find_template("events", __file__), + FieldPath("data"), + pagination_strategy=StripePaginationStrategy() + ) + + +def _a_customer() -> RecordBuilder: + return create_record_builder( + find_template(_CUSTOMERS_TEMPLATE_NAME, __file__), + FieldPath("data"), + record_id_path=FieldPath("id"), + record_cursor_path=FieldPath("created"), + ) + + +def _customers_response() -> HttpResponseBuilder: + return create_response_builder( + find_template(_CUSTOMERS_TEMPLATE_NAME, __file__), + FieldPath("data"), + pagination_strategy=StripePaginationStrategy() + ) + + +def _a_bank_account() -> RecordBuilder: + return create_record_builder( + find_template(_BANK_ACCOUNTS_TEMPLATE_NAME, __file__), + FieldPath("data"), + record_id_path=FieldPath("id"), + ) + + +def _bank_accounts_response() -> HttpResponseBuilder: + return create_response_builder( + find_template(_BANK_ACCOUNTS_TEMPLATE_NAME, __file__), + FieldPath("data"), + pagination_strategy=StripePaginationStrategy() + ) + + +def _given_customers_availability_check(http_mocker: HttpMocker) -> None: + http_mocker.get( + StripeRequestBuilder.customers_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build(), + _customers_response().with_record(_a_customer()).build() # there needs to be a record in the parent stream for the child to be available + ) + + +def _given_events_availability_check(http_mocker: HttpMocker) -> None: + http_mocker.get( + StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build(), + _events_response().build() + ) + + +def _as_dict(response_builder: HttpResponseBuilder) -> Dict[str, Any]: + return json.loads(response_builder.build().body) + + +def _read( + config_builder: ConfigBuilder, + sync_mode: SyncMode, + state: Optional[Dict[str, Any]] = None, + expecting_exception: bool = False +) -> EntrypointOutput: + catalog = _catalog(sync_mode) + config = config_builder.build() + return read(_source(catalog, config), config, catalog, state, expecting_exception) + + +def _assert_not_available(output: EntrypointOutput) -> None: + # right now, no stream statuses means stream unavailable + assert len(output.get_stream_statuses(_STREAM_NAME)) == 0 + + +@freezegun.freeze_time(_NOW.isoformat()) +class FullRefreshTest(TestCase): + @HttpMocker() + def test_given_one_page_when_read_then_return_record(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _customers_request().with_expands(_EXPANDS).with_created_gte(_A_START_DATE).with_created_lte(_NOW).with_limit(100).build(), + _customers_response() + .with_record( + _a_customer() + .with_field( + _SOURCES_FIELD, + _as_dict( + _bank_accounts_response() + .with_record(_a_bank_account()) + .with_record(_a_bank_account()) + ) + ) + ) + .with_record( + _a_customer() + .with_field(_SOURCES_FIELD, _as_dict(_bank_accounts_response().with_record(_a_bank_account()))) + ).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE)) + + assert len(output.records) == 3 + + @HttpMocker() + def test_given_source_is_not_bank_account_when_read_then_filter_record(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _customers_request().with_expands(_EXPANDS).with_created_gte(_A_START_DATE).with_created_lte(_NOW).with_limit(100).build(), + _customers_response() + .with_record( + _a_customer() + .with_field( + _SOURCES_FIELD, + _as_dict( + _bank_accounts_response() + .with_record(_NOT_A_BANK_ACCOUNT) + ) + ) + ).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE)) + + assert len(output.records) == 0 + + @HttpMocker() + def test_given_multiple_bank_accounts_pages_when_read_then_query_pagination_on_child(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _customers_request().with_expands(_EXPANDS).with_created_gte(_A_START_DATE).with_created_lte(_NOW).with_limit(100).build(), + _customers_response() + .with_record( + _a_customer() + .with_id("parent_id") + .with_field( + _SOURCES_FIELD, + _as_dict( + _bank_accounts_response() + .with_pagination() + .with_record(_a_bank_account().with_id("latest_bank_account_id")) + ) + ) + ).build(), + ) + http_mocker.get( + # we do not use slice boundaries here because: + # * there should be no duplicates parents (application fees) returned by the stripe API as it is using cursor pagination + # * it is implicitly lower bounder by the parent creation + # * the upper boundary is not configurable and is always + _customers_sources_request("parent_id").with_object(_OBJECT).with_limit(100).with_starting_after("latest_bank_account_id").build(), + _bank_accounts_response().with_record(_a_bank_account()).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE)) + + assert len(output.records) == 2 + + @HttpMocker() + def test_given_multiple_customers_pages_when_read_then_query_pagination_on_parent(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _customers_request().with_expands(_EXPANDS).with_created_gte(_A_START_DATE).with_created_lte(_NOW).with_limit(100).build(), + _customers_response() + .with_pagination() + .with_record( + _a_customer() + .with_id("parent_id") + .with_field( + _SOURCES_FIELD, + _as_dict( + _bank_accounts_response() + .with_record(_a_bank_account()) + ) + ) + ).build(), + ) + http_mocker.get( + _customers_request().with_expands(_EXPANDS).with_starting_after("parent_id").with_created_gte(_A_START_DATE).with_created_lte(_NOW).with_limit(100).build(), + _customers_response() + .with_record( + _a_customer() + .with_field( + _SOURCES_FIELD, + _as_dict( + _bank_accounts_response() + .with_record(_a_bank_account()) + ) + ) + ).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE)) + + assert len(output.records) == 2 + + @HttpMocker() + def test_given_parent_stream_without_bank_accounts_when_read_then_stream_is_unavailable(self, http_mocker: HttpMocker) -> None: + # events stream is not validated as application fees is validated first + http_mocker.get( + _customers_request().with_expands(_EXPANDS).with_created_gte(_A_START_DATE).with_created_lte(_NOW).with_limit(100).build(), + _customers_response().build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE)) + + _assert_not_available(output) + + @HttpMocker() + def test_given_slice_range_when_read_then_perform_multiple_requests(self, http_mocker: HttpMocker) -> None: + start_date = _NOW - timedelta(days=30) + slice_range = timedelta(days=20) + slice_datetime = start_date + slice_range + + _given_events_availability_check(http_mocker) + http_mocker.get( + _customers_request().with_expands(_EXPANDS).with_created_gte(start_date).with_created_lte(slice_datetime).with_limit(100).build(), + _customers_response().with_record( + _a_customer() + .with_field(_SOURCES_FIELD, _as_dict(_bank_accounts_response().with_record(_a_bank_account()))) + ).build(), + ) + http_mocker.get( + _customers_request().with_expands(_EXPANDS).with_created_gte(slice_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).build(), + _customers_response().with_record( + _a_customer() + .with_field(_SOURCES_FIELD, _as_dict(_bank_accounts_response().with_record(_a_bank_account()))) + ).build(), + ) + + output = self._read(_config().with_start_date(start_date).with_slice_range_in_days(slice_range.days)) + + assert len(output.records) == 2 + + @HttpMocker() + def test_given_slice_range_and_bank_accounts_pagination_when_read_then_do_not_slice_child(self, http_mocker: HttpMocker) -> None: + """ + This means that if the user attempt to configure the slice range, it will only apply on the parent stream + """ + start_date = _NOW - timedelta(days=30) + slice_range = timedelta(days=20) + slice_datetime = start_date + slice_range + + _given_events_availability_check(http_mocker) + http_mocker.get( + StripeRequestBuilder.customers_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build(), + _customers_response().build() + ) # catching subsequent slicing request that we don't really care for this test + http_mocker.get( + _customers_request().with_expands(_EXPANDS).with_created_gte(start_date).with_created_lte(slice_datetime).with_limit(100).build(), + _customers_response().with_record( + _a_customer() + .with_id("parent_id") + .with_field( + _SOURCES_FIELD, + _as_dict( + _bank_accounts_response() + .with_pagination() + .with_record(_a_bank_account().with_id("latest_bank_account_id")) + ) + ) + ).build(), + ) + http_mocker.get( + # slice range is not applied here + _customers_sources_request("parent_id").with_object(_OBJECT).with_limit(100).with_starting_after("latest_bank_account_id").build(), + _bank_accounts_response().with_record(_a_bank_account()).build(), + ) + + self._read(_config().with_start_date(start_date).with_slice_range_in_days(slice_range.days)) + + # request matched http_mocker + + @HttpMocker() + def test_given_no_state_when_read_then_return_ignore_lookback(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _customers_request().with_expands(_EXPANDS).with_created_gte(_A_START_DATE).with_created_lte(_NOW).with_limit(100).build(), + _customers_response().with_record(_a_customer()).build(), + ) + + self._read(_config().with_start_date(_A_START_DATE).with_lookback_window_in_days(10)) + + # request matched http_mocker + + @HttpMocker() + def test_given_one_page_when_read_then_cursor_field_is_set(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _customers_request().with_expands(_EXPANDS).with_created_gte(_A_START_DATE).with_created_lte(_NOW).with_limit(100).build(), + _customers_response() + .with_record( + _a_customer() + .with_field( + _SOURCES_FIELD, + _as_dict( + _bank_accounts_response() + .with_record(_a_bank_account()) + ) + ) + ).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE)) + + assert output.records[0].record.data["updated"] == int(_NOW.timestamp()) + + @HttpMocker() + def test_given_http_status_401_when_read_then_system_error(self, http_mocker: HttpMocker) -> None: + http_mocker.get( + _customers_request().with_any_query_params().build(), + a_response_with_status(401), + ) + output = self._read(_config(), expecting_exception=True) + assert output.errors[-1].trace.error.failure_type == FailureType.system_error + + @HttpMocker() + def test_given_rate_limited_when_read_then_retry_and_return_records(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _customers_request().with_any_query_params().build(), + [ + a_response_with_status(429), + _customers_response().with_record(_a_customer().with_field( + _SOURCES_FIELD, + _as_dict( + _bank_accounts_response() + .with_record(_a_bank_account()) + ) + )).build(), + ], + ) + output = self._read(_config().with_start_date(_A_START_DATE)) + assert len(output.records) == 1 + + @HttpMocker() + def test_given_http_status_500_on_availability_when_read_then_raise_system_error(self, http_mocker: HttpMocker) -> None: + request = _customers_request().with_any_query_params().build() + http_mocker.get( + request, + a_response_with_status(500), + ) + output = self._read(_config(), expecting_exception=True) + assert output.errors[-1].trace.error.failure_type == FailureType.system_error + + def _read(self, config: ConfigBuilder, expecting_exception: bool = False) -> EntrypointOutput: + return _read(config, SyncMode.full_refresh, expecting_exception=expecting_exception) + + +@freezegun.freeze_time(_NOW.isoformat()) +class IncrementalTest(TestCase): + + @HttpMocker() + def test_given_no_state_and_successful_sync_when_read_then_set_state_to_now(self, http_mocker: HttpMocker) -> None: + # If stripe takes some time to ingest the data, we should recommend to use a lookback window when syncing the bank_accounts stream + # to make sure that we don't lose data between the first and the second sync + _given_events_availability_check(http_mocker) + http_mocker.get( + _customers_request().with_expands(_EXPANDS).with_created_gte(_A_START_DATE).with_created_lte(_NOW).with_limit(100).build(), + _customers_response().with_record( + _a_customer() + .with_field(_SOURCES_FIELD, _as_dict(_bank_accounts_response().with_record(_a_bank_account()))) + ).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE), _NO_STATE) + + assert output.most_recent_state == {_STREAM_NAME: {"updated": int(_NOW.timestamp())}} + + @HttpMocker() + def test_given_state_when_read_then_query_events_using_types_and_state_value_plus_1(self, http_mocker: HttpMocker) -> None: + start_date = _NOW - timedelta(days=40) + state_datetime = _NOW - timedelta(days=5) + cursor_value = int(state_datetime.timestamp()) + 1 + + _given_customers_availability_check(http_mocker) + _given_events_availability_check(http_mocker) + http_mocker.get( + _events_request().with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record( + _an_event().with_cursor(cursor_value).with_field(_DATA_FIELD, _a_bank_account().build()) + ).build(), + ) + + output = self._read( + _config().with_start_date(start_date), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert output.most_recent_state == {_STREAM_NAME: {"updated": cursor_value}} + + @HttpMocker() + def test_given_state_and_pagination_when_read_then_return_records(self, http_mocker: HttpMocker) -> None: + _given_customers_availability_check(http_mocker) + _given_events_availability_check(http_mocker) + state_datetime = _NOW - timedelta(days=5) + http_mocker.get( + _events_request().with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_pagination().with_record( + _an_event().with_id("last_record_id_from_first_page").with_field(_DATA_FIELD, _a_bank_account().build()) + ).build(), + ) + http_mocker.get( + _events_request().with_starting_after("last_record_id_from_first_page").with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._a_bank_account_event()).build(), + ) + + output = self._read( + _config(), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert len(output.records) == 2 + + @HttpMocker() + def test_given_state_and_small_slice_range_when_read_then_perform_multiple_queries(self, http_mocker: HttpMocker) -> None: + state_datetime = _NOW - timedelta(days=5) + slice_range = timedelta(days=3) + slice_datetime = state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES + slice_range + + _given_customers_availability_check(http_mocker) + _given_events_availability_check(http_mocker) # the availability check does not consider the state so we need to define a generic availability check + http_mocker.get( + _events_request().with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(slice_datetime).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._a_bank_account_event()).build(), + ) + http_mocker.get( + _events_request().with_created_gte(slice_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._a_bank_account_event()).with_record(self._a_bank_account_event()).build(), + ) + + output = self._read( + _config().with_start_date(_NOW - timedelta(days=30)).with_slice_range_in_days(slice_range.days), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert len(output.records) == 3 + + @HttpMocker() + def test_given_state_earlier_than_30_days_when_read_then_query_events_using_types_and_event_lower_boundary(self, http_mocker: HttpMocker) -> None: + # this seems odd as we would miss some data between start_date and events_lower_boundary. In that case, we should hit the + # customer endpoint + _given_customers_availability_check(http_mocker) + start_date = _NOW - timedelta(days=40) + state_value = _NOW - timedelta(days=39) + events_lower_boundary = _NOW - timedelta(days=30) + http_mocker.get( + _events_request().with_created_gte(events_lower_boundary).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._a_bank_account_event()).build(), + ) + + self._read( + _config().with_start_date(start_date), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_value.timestamp())}).build(), + ) + + # request matched http_mocker + + @HttpMocker() + def test_given_source_is_not_bank_account_when_read_then_filter_record(self, http_mocker: HttpMocker) -> None: + _given_customers_availability_check(http_mocker) + _given_events_availability_check(http_mocker) + state_datetime = _NOW - timedelta(days=5) + http_mocker.get( + _events_request().with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record( + _an_event().with_field(_DATA_FIELD, _NOT_A_BANK_ACCOUNT.build()) + ).build(), + ) + + output = self._read( + _config(), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert len(output.records) == 0 + + def _a_bank_account_event(self) -> RecordBuilder: + return _an_event().with_field(_DATA_FIELD, _a_bank_account().build()) + + def _read(self, config: ConfigBuilder, state: Optional[Dict[str, Any]], expecting_exception: bool = False) -> EntrypointOutput: + return _read(config, SyncMode.incremental, state, expecting_exception) diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_early_fraud_warnings.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_early_fraud_warnings.py new file mode 100644 index 000000000000..7673ee72b14c --- /dev/null +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_early_fraud_warnings.py @@ -0,0 +1,342 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. + +import json +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, Optional +from unittest import TestCase + +import freezegun +from airbyte_cdk.test.catalog_builder import CatalogBuilder +from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read +from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse +from airbyte_cdk.test.mock_http.response_builder import ( + FieldPath, + HttpResponseBuilder, + NestedPath, + RecordBuilder, + create_record_builder, + create_response_builder, + find_template, +) +from airbyte_cdk.test.state_builder import StateBuilder +from airbyte_protocol.models import ConfiguredAirbyteCatalog, FailureType, SyncMode +from integration.config import ConfigBuilder +from integration.pagination import StripePaginationStrategy +from integration.request_builder import StripeRequestBuilder +from integration.response_builder import a_response_with_status +from source_stripe import SourceStripe + +_EVENT_TYPES = ["radar.early_fraud_warning.created", "radar.early_fraud_warning.updated"] + +_DATA_FIELD = NestedPath(["data", "object"]) +_STREAM_NAME = "early_fraud_warnings" +_ENDPOINT_TEMPLATE_NAME = "radar_early_fraud_warnings" +_NOW = datetime.now(timezone.utc) +_A_START_DATE = _NOW - timedelta(days=60) +_ACCOUNT_ID = "account_id" +_CLIENT_SECRET = "client_secret" +_NO_STATE = {} +_AVOIDING_INCLUSIVE_BOUNDARIES = timedelta(seconds=1) + + +def _early_fraud_warnings_request() -> StripeRequestBuilder: + return StripeRequestBuilder.radar_early_fraud_warnings_endpoint(_ACCOUNT_ID, _CLIENT_SECRET) + + +def _events_request() -> StripeRequestBuilder: + return StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET) + + +def _config() -> ConfigBuilder: + return ConfigBuilder().with_start_date(_NOW - timedelta(days=75)).with_account_id(_ACCOUNT_ID).with_client_secret(_CLIENT_SECRET) + + +def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: + return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build() + + +def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe: + return SourceStripe(catalog, config) + + +def _an_event() -> RecordBuilder: + return create_record_builder( + find_template("events", __file__), + FieldPath("data"), + record_id_path=FieldPath("id"), + record_cursor_path=FieldPath("created"), + ) + + +def _events_response() -> HttpResponseBuilder: + return create_response_builder( + find_template("events", __file__), + FieldPath("data"), + pagination_strategy=StripePaginationStrategy() + ) + + +def _an_early_fraud_warning() -> RecordBuilder: + return create_record_builder( + find_template(_ENDPOINT_TEMPLATE_NAME, __file__), + FieldPath("data"), + record_id_path=FieldPath("id"), + record_cursor_path=FieldPath("created"), + ) + + +def _early_fraud_warnings_response() -> HttpResponseBuilder: + return create_response_builder( + find_template(_ENDPOINT_TEMPLATE_NAME, __file__), + FieldPath("data"), + pagination_strategy=StripePaginationStrategy() + ) + + +def _given_early_fraud_warnings_availability_check(http_mocker: HttpMocker) -> None: + http_mocker.get( + StripeRequestBuilder.radar_early_fraud_warnings_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build(), + _early_fraud_warnings_response().build() + ) + + +def _given_events_availability_check(http_mocker: HttpMocker) -> None: + http_mocker.get( + StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build(), + _events_response().build() + ) + + +def _read( + config_builder: ConfigBuilder, + sync_mode: SyncMode, + state: Optional[Dict[str, Any]] = None, + expecting_exception: bool = False +) -> EntrypointOutput: + catalog = _catalog(sync_mode) + config = config_builder.build() + return read(_source(catalog, config), config, catalog, state, expecting_exception) + + +@freezegun.freeze_time(_NOW.isoformat()) +class FullRefreshTest(TestCase): + + @HttpMocker() + def test_given_one_page_when_read_then_return_record(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _early_fraud_warnings_request().with_limit(100).build(), + _early_fraud_warnings_response().with_record(_an_early_fraud_warning()).with_record(_an_early_fraud_warning()).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE)) + + assert len(output.records) == 2 + + @HttpMocker() + def test_given_many_pages_when_read_then_return_records(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _early_fraud_warnings_request().with_limit(100).build(), + _early_fraud_warnings_response().with_pagination().with_record(_an_early_fraud_warning().with_id("last_record_id_from_first_page")).build(), + ) + http_mocker.get( + _early_fraud_warnings_request().with_starting_after("last_record_id_from_first_page").with_limit(100).build(), + _early_fraud_warnings_response().with_record(_an_early_fraud_warning()).with_record(_an_early_fraud_warning()).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE)) + + assert len(output.records) == 3 + + @HttpMocker() + def test_when_read_then_add_cursor_field(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _early_fraud_warnings_request().with_limit(100).build(), + _early_fraud_warnings_response().with_record(_an_early_fraud_warning()).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE).with_lookback_window_in_days(10)) + + assert output.records[0].record.data["updated"] == output.records[0].record.data["created"] + + @HttpMocker() + def test_given_http_status_400_when_read_then_stream_is_ignored(self, http_mocker: HttpMocker) -> None: + http_mocker.get( + _early_fraud_warnings_request().with_any_query_params().build(), + a_response_with_status(400), + ) + output = self._read(_config()) + assert len(output.get_stream_statuses(_STREAM_NAME)) == 0 + + @HttpMocker() + def test_given_http_status_401_when_read_then_system_error(self, http_mocker: HttpMocker) -> None: + http_mocker.get( + _early_fraud_warnings_request().with_any_query_params().build(), + a_response_with_status(401), + ) + output = self._read(_config(), expecting_exception=True) + assert output.errors[-1].trace.error.failure_type == FailureType.system_error + + @HttpMocker() + def test_given_rate_limited_when_read_then_retry_and_return_records(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _early_fraud_warnings_request().with_any_query_params().build(), + [ + a_response_with_status(429), + _early_fraud_warnings_response().with_record(_an_early_fraud_warning()).build(), + ], + ) + output = self._read(_config().with_start_date(_A_START_DATE)) + assert len(output.records) == 1 + + @HttpMocker() + def test_given_http_status_500_once_before_200_when_read_then_retry_and_return_records(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _early_fraud_warnings_request().with_any_query_params().build(), + [a_response_with_status(500), _early_fraud_warnings_response().with_record(_an_early_fraud_warning()).build()], + ) + output = self._read(_config()) + assert len(output.records) == 1 + + @HttpMocker() + def test_given_http_status_500_on_availability_when_read_then_raise_system_error(self, http_mocker: HttpMocker) -> None: + http_mocker.get( + _early_fraud_warnings_request().with_any_query_params().build(), + a_response_with_status(500), + ) + output = self._read(_config(), expecting_exception=True) + assert output.errors[-1].trace.error.failure_type == FailureType.system_error + + @HttpMocker() + def test_given_small_slice_range_when_read_then_availability_check_performs_too_many_queries(self, http_mocker: HttpMocker) -> None: + # see https://github.com/airbytehq/airbyte/issues/33499 + events_requests = StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build() + http_mocker.get( + events_requests, + _events_response().build() # it is important that the event response does not have a record. This is not far fetched as this is what would happend 30 days before now + ) + http_mocker.get( + _early_fraud_warnings_request().with_any_query_params().build(), + _early_fraud_warnings_response().build(), + ) + + self._read(_config().with_start_date(_NOW - timedelta(days=60)).with_slice_range_in_days(1)) + + http_mocker.assert_number_of_calls(events_requests, 30) + + def _read(self, config: ConfigBuilder, expecting_exception: bool = False) -> EntrypointOutput: + return _read(config, SyncMode.full_refresh, expecting_exception=expecting_exception) + + +@freezegun.freeze_time(_NOW.isoformat()) +class IncrementalTest(TestCase): + + @HttpMocker() + def test_given_no_state_when_read_then_use_early_fraud_warnings_endpoint(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + cursor_value = int(_A_START_DATE.timestamp()) + 1 + http_mocker.get( + _early_fraud_warnings_request().with_limit(100).build(), + _early_fraud_warnings_response().with_record(_an_early_fraud_warning().with_cursor(cursor_value)).build(), + ) + output = self._read(_config().with_start_date(_A_START_DATE), _NO_STATE) + assert output.most_recent_state == {_STREAM_NAME: {"updated": cursor_value}} + + @HttpMocker() + def test_given_state_when_read_then_query_events_using_types_and_state_value_plus_1(self, http_mocker: HttpMocker) -> None: + start_date = _NOW - timedelta(days=40) + state_datetime = _NOW - timedelta(days=5) + cursor_value = int(state_datetime.timestamp()) + 1 + + _given_early_fraud_warnings_availability_check(http_mocker) + _given_events_availability_check(http_mocker) + http_mocker.get( + _events_request().with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record( + _an_event().with_cursor(cursor_value).with_field(_DATA_FIELD, _an_early_fraud_warning().build()) + ).build(), + ) + + output = self._read( + _config().with_start_date(start_date), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert output.most_recent_state == {_STREAM_NAME: {"updated": cursor_value}} + + @HttpMocker() + def test_given_state_and_pagination_when_read_then_return_records(self, http_mocker: HttpMocker) -> None: + _given_early_fraud_warnings_availability_check(http_mocker) + _given_events_availability_check(http_mocker) + state_datetime = _NOW - timedelta(days=5) + http_mocker.get( + _events_request().with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_pagination().with_record( + _an_event().with_id("last_record_id_from_first_page").with_field(_DATA_FIELD, _an_early_fraud_warning().build()) + ).build(), + ) + http_mocker.get( + _events_request().with_starting_after("last_record_id_from_first_page").with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._an_early_fraud_warning_event()).build(), + ) + + output = self._read( + _config(), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert len(output.records) == 2 + + @HttpMocker() + def test_given_state_and_small_slice_range_when_read_then_perform_multiple_queries(self, http_mocker: HttpMocker) -> None: + state_datetime = _NOW - timedelta(days=5) + slice_range = timedelta(days=3) + slice_datetime = state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES + slice_range + + _given_early_fraud_warnings_availability_check(http_mocker) + _given_events_availability_check(http_mocker) # the availability check does not consider the state so we need to define a generic availability check + http_mocker.get( + _events_request().with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(slice_datetime).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._an_early_fraud_warning_event()).build(), + ) + http_mocker.get( + _events_request().with_created_gte(slice_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._an_early_fraud_warning_event()).with_record(self._an_early_fraud_warning_event()).build(), + ) + + output = self._read( + _config().with_start_date(_NOW - timedelta(days=30)).with_slice_range_in_days(slice_range.days), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert len(output.records) == 3 + + @HttpMocker() + def test_given_state_earlier_than_30_days_when_read_then_query_events_using_types_and_event_lower_boundary(self, http_mocker: HttpMocker) -> None: + # this seems odd as we would miss some data between start_date and events_lower_boundary. In that case, we should hit the + # radar/early_fraud_warnings endpoint + _given_early_fraud_warnings_availability_check(http_mocker) + start_date = _NOW - timedelta(days=40) + state_value = _NOW - timedelta(days=39) + events_lower_boundary = _NOW - timedelta(days=30) + http_mocker.get( + _events_request().with_created_gte(events_lower_boundary).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._an_early_fraud_warning_event()).build(), + ) + + self._read( + _config().with_start_date(start_date), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_value.timestamp())}).build(), + ) + + # request matched http_mocker + + def _an_early_fraud_warning_event(self) -> RecordBuilder: + return _an_event().with_field(_DATA_FIELD, _an_early_fraud_warning().build()) + + def _read(self, config: ConfigBuilder, state: Optional[Dict[str, Any]], expecting_exception: bool = False) -> EntrypointOutput: + return _read(config, SyncMode.incremental, state, expecting_exception) diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_bank_accounts.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_bank_accounts.py new file mode 100644 index 000000000000..e872320e5f21 --- /dev/null +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_bank_accounts.py @@ -0,0 +1,361 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. + +import json +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, Optional +from unittest import TestCase + +import freezegun +from airbyte_cdk.test.catalog_builder import CatalogBuilder +from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read +from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse +from airbyte_cdk.test.mock_http.response_builder import ( + FieldPath, + HttpResponseBuilder, + NestedPath, + RecordBuilder, + create_record_builder, + create_response_builder, + find_template, +) +from airbyte_cdk.test.state_builder import StateBuilder +from airbyte_protocol.models import ConfiguredAirbyteCatalog, FailureType, SyncMode +from integration.config import ConfigBuilder +from integration.pagination import StripePaginationStrategy +from integration.request_builder import StripeRequestBuilder +from integration.response_builder import a_response_with_status +from source_stripe import SourceStripe + +_EVENT_TYPES = ["account.external_account.created", "account.external_account.updated", "account.external_account.deleted"] + +_DATA_FIELD = NestedPath(["data", "object"]) +_OBJECT = "bank_account" +_STREAM_NAME = "external_account_bank_accounts" +_ENDPOINT_TEMPLATE_NAME = "external_bank_accounts" +_NOW = datetime.now(timezone.utc) +_A_START_DATE = _NOW - timedelta(days=60) +_ACCOUNT_ID = "account_id" +_CLIENT_SECRET = "client_secret" +_NO_STATE = {} +_AVOIDING_INCLUSIVE_BOUNDARIES = timedelta(seconds=1) + + +def _external_accounts_request() -> StripeRequestBuilder: + return StripeRequestBuilder.external_accounts_endpoint(_ACCOUNT_ID, _CLIENT_SECRET) + + +def _events_request() -> StripeRequestBuilder: + return StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET) + + +def _config() -> ConfigBuilder: + return ConfigBuilder().with_start_date(_NOW - timedelta(days=75)).with_account_id(_ACCOUNT_ID).with_client_secret(_CLIENT_SECRET) + + +def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: + return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build() + + +def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe: + return SourceStripe(catalog, config) + + +def _an_event() -> RecordBuilder: + return create_record_builder( + find_template("events", __file__), + FieldPath("data"), + record_id_path=FieldPath("id"), + record_cursor_path=FieldPath("created"), + ) + + +def _events_response() -> HttpResponseBuilder: + return create_response_builder( + find_template("events", __file__), + FieldPath("data"), + pagination_strategy=StripePaginationStrategy() + ) + + +def _an_external_bank_account() -> RecordBuilder: + return create_record_builder( + find_template(_ENDPOINT_TEMPLATE_NAME, __file__), + FieldPath("data"), + record_id_path=FieldPath("id"), + ) + + +def _external_bank_accounts_response() -> HttpResponseBuilder: + return create_response_builder( + find_template(_ENDPOINT_TEMPLATE_NAME, __file__), + FieldPath("data"), + pagination_strategy=StripePaginationStrategy() + ) + + +def _given_external_accounts_availability_check(http_mocker: HttpMocker) -> None: + http_mocker.get( + StripeRequestBuilder.external_accounts_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build(), + _external_bank_accounts_response().build() + ) + + +def _given_events_availability_check(http_mocker: HttpMocker) -> None: + http_mocker.get( + StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build(), + _events_response().build() + ) + + +def _read( + config_builder: ConfigBuilder, + sync_mode: SyncMode, + state: Optional[Dict[str, Any]] = None, + expecting_exception: bool = False +) -> EntrypointOutput: + catalog = _catalog(sync_mode) + config = config_builder.build() + return read(_source(catalog, config), config, catalog, state, expecting_exception) + + +@freezegun.freeze_time(_NOW.isoformat()) +class FullRefreshTest(TestCase): + + @HttpMocker() + def test_given_one_page_when_read_then_return_record(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _external_accounts_request().with_object(_OBJECT).with_limit(100).build(), + _external_bank_accounts_response().with_record(_an_external_bank_account()).with_record(_an_external_bank_account()).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE)) + + assert len(output.records) == 2 + + @HttpMocker() + def test_given_many_pages_when_read_then_return_records(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _external_accounts_request().with_object(_OBJECT).with_limit(100).build(), + _external_bank_accounts_response().with_pagination().with_record(_an_external_bank_account().with_id("last_record_id_from_first_page")).build(), + ) + http_mocker.get( + _external_accounts_request().with_starting_after("last_record_id_from_first_page").with_object(_OBJECT).with_limit(100).build(), + _external_bank_accounts_response().with_record(_an_external_bank_account()).with_record(_an_external_bank_account()).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE)) + + assert len(output.records) == 3 + + @HttpMocker() + def test_when_read_then_add_cursor_field(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _external_accounts_request().with_object(_OBJECT).with_limit(100).build(), + _external_bank_accounts_response().with_record(_an_external_bank_account()).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE).with_lookback_window_in_days(10)) + + assert output.records[0].record.data["updated"] == int(_NOW.timestamp()) + + @HttpMocker() + def test_given_http_status_400_when_read_then_stream_is_ignored(self, http_mocker: HttpMocker) -> None: + http_mocker.get( + _external_accounts_request().with_any_query_params().build(), + a_response_with_status(400), + ) + output = self._read(_config()) + assert len(output.get_stream_statuses(_STREAM_NAME)) == 0 + + @HttpMocker() + def test_given_http_status_401_when_read_then_system_error(self, http_mocker: HttpMocker) -> None: + http_mocker.get( + _external_accounts_request().with_any_query_params().build(), + a_response_with_status(401), + ) + output = self._read(_config(), expecting_exception=True) + assert output.errors[-1].trace.error.failure_type == FailureType.system_error + + @HttpMocker() + def test_given_rate_limited_when_read_then_retry_and_return_records(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _external_accounts_request().with_any_query_params().build(), + [ + a_response_with_status(429), + _external_bank_accounts_response().with_record(_an_external_bank_account()).build(), + ], + ) + output = self._read(_config().with_start_date(_A_START_DATE)) + assert len(output.records) == 1 + + @HttpMocker() + def test_given_http_status_500_once_before_200_when_read_then_retry_and_return_records(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _external_accounts_request().with_any_query_params().build(), + [a_response_with_status(500), _external_bank_accounts_response().with_record(_an_external_bank_account()).build()], + ) + output = self._read(_config()) + assert len(output.records) == 1 + + @HttpMocker() + def test_given_http_status_500_on_availability_when_read_then_raise_system_error(self, http_mocker: HttpMocker) -> None: + http_mocker.get( + _external_accounts_request().with_any_query_params().build(), + a_response_with_status(500), + ) + output = self._read(_config(), expecting_exception=True) + assert output.errors[-1].trace.error.failure_type == FailureType.system_error + + @HttpMocker() + def test_given_small_slice_range_when_read_then_availability_check_performs_too_many_queries(self, http_mocker: HttpMocker) -> None: + # see https://github.com/airbytehq/airbyte/issues/33499 + events_requests = StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build() + http_mocker.get( + events_requests, + _events_response().build() # it is important that the event response does not have a record. This is not far fetched as this is what would happend 30 days before now + ) + http_mocker.get( + _external_accounts_request().with_any_query_params().build(), + _external_bank_accounts_response().build(), + ) + + self._read(_config().with_start_date(_NOW - timedelta(days=60)).with_slice_range_in_days(1)) + + http_mocker.assert_number_of_calls(events_requests, 30) + + def _read(self, config: ConfigBuilder, expecting_exception: bool = False) -> EntrypointOutput: + return _read(config, SyncMode.full_refresh, expecting_exception=expecting_exception) + + +@freezegun.freeze_time(_NOW.isoformat()) +class IncrementalTest(TestCase): + + @HttpMocker() + def test_given_no_state_when_read_then_use_external_accounts_endpoint(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _external_accounts_request().with_object(_OBJECT).with_limit(100).build(), + _external_bank_accounts_response().with_record(_an_external_bank_account()).build(), + ) + output = self._read(_config().with_start_date(_A_START_DATE), _NO_STATE) + assert output.most_recent_state == {_STREAM_NAME: {"updated": int(_NOW.timestamp())}} + + @HttpMocker() + def test_given_state_when_read_then_query_events_using_types_and_state_value_plus_1(self, http_mocker: HttpMocker) -> None: + start_date = _NOW - timedelta(days=40) + state_datetime = _NOW - timedelta(days=5) + cursor_value = int(state_datetime.timestamp()) + 1 + + _given_external_accounts_availability_check(http_mocker) + _given_events_availability_check(http_mocker) + http_mocker.get( + _events_request().with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record( + _an_event().with_cursor(cursor_value).with_field(_DATA_FIELD, _an_external_bank_account().build()) + ).build(), + ) + + output = self._read( + _config().with_start_date(start_date), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert output.most_recent_state == {_STREAM_NAME: {"updated": cursor_value}} + + @HttpMocker() + def test_given_object_is_not_back_account_when_read_then_filter_out(self, http_mocker: HttpMocker) -> None: + start_date = _NOW - timedelta(days=40) + state_datetime = _NOW - timedelta(days=5) + + _given_external_accounts_availability_check(http_mocker) + http_mocker.get( + StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build(), + _events_response().with_record( + _an_event().with_field(_DATA_FIELD, {"object": "not a bank account"}) + ).build(), + ) + + output = self._read( + _config().with_start_date(start_date), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert len(output.records) == 0 + + @HttpMocker() + def test_given_state_and_pagination_when_read_then_return_records(self, http_mocker: HttpMocker) -> None: + _given_external_accounts_availability_check(http_mocker) + _given_events_availability_check(http_mocker) + state_datetime = _NOW - timedelta(days=5) + http_mocker.get( + _events_request().with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_pagination().with_record( + _an_event().with_id("last_record_id_from_first_page").with_field(_DATA_FIELD, _an_external_bank_account().build()) + ).build(), + ) + http_mocker.get( + _events_request().with_starting_after("last_record_id_from_first_page").with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._an_external_account_event()).build(), + ) + + output = self._read( + _config(), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert len(output.records) == 2 + + @HttpMocker() + def test_given_state_and_small_slice_range_when_read_then_perform_multiple_queries(self, http_mocker: HttpMocker) -> None: + state_datetime = _NOW - timedelta(days=5) + slice_range = timedelta(days=3) + slice_datetime = state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES + slice_range + + _given_external_accounts_availability_check(http_mocker) + _given_events_availability_check(http_mocker) # the availability check does not consider the state so we need to define a generic availability check + http_mocker.get( + _events_request().with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(slice_datetime).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._an_external_account_event()).build(), + ) + http_mocker.get( + _events_request().with_created_gte(slice_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._an_external_account_event()).with_record(self._an_external_account_event()).build(), + ) + + output = self._read( + _config().with_start_date(_NOW - timedelta(days=30)).with_slice_range_in_days(slice_range.days), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert len(output.records) == 3 + + @HttpMocker() + def test_given_state_earlier_than_30_days_when_read_then_query_events_using_types_and_event_lower_boundary(self, http_mocker: HttpMocker) -> None: + # this seems odd as we would miss some data between start_date and events_lower_boundary. In that case, we should hit the + # external_accounts endpoint + _given_external_accounts_availability_check(http_mocker) + start_date = _NOW - timedelta(days=40) + state_value = _NOW - timedelta(days=39) + events_lower_boundary = _NOW - timedelta(days=30) + http_mocker.get( + _events_request().with_created_gte(events_lower_boundary).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._an_external_account_event()).build(), + ) + + self._read( + _config().with_start_date(start_date), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_value.timestamp())}).build(), + ) + + # request matched http_mocker + + def _an_external_account_event(self) -> RecordBuilder: + return _an_event().with_field(_DATA_FIELD, _an_external_bank_account().build()) + + def _read(self, config: ConfigBuilder, state: Optional[Dict[str, Any]], expecting_exception: bool = False) -> EntrypointOutput: + return _read(config, SyncMode.incremental, state, expecting_exception) diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_cards.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_cards.py new file mode 100644 index 000000000000..b5787ca4a2a2 --- /dev/null +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_cards.py @@ -0,0 +1,366 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. + +import json +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, Optional +from unittest import TestCase + +import freezegun +from airbyte_cdk.test.catalog_builder import CatalogBuilder +from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read +from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse +from airbyte_cdk.test.mock_http.response_builder import ( + FieldPath, + HttpResponseBuilder, + NestedPath, + RecordBuilder, + create_record_builder, + create_response_builder, + find_template, +) +from airbyte_cdk.test.state_builder import StateBuilder +from airbyte_protocol.models import ConfiguredAirbyteCatalog, FailureType, SyncMode +from integration.config import ConfigBuilder +from integration.pagination import StripePaginationStrategy +from integration.request_builder import StripeRequestBuilder +from integration.response_builder import a_response_with_status +from source_stripe import SourceStripe + +_EVENT_TYPES = ["account.external_account.created", "account.external_account.updated", "account.external_account.deleted"] + +_DATA_FIELD = NestedPath(["data", "object"]) +_OBJECT = "card" +_STREAM_NAME = "external_account_cards" +_ENDPOINT_TEMPLATE_NAME = "external_account_cards" +_NOW = datetime.now(timezone.utc) +_A_START_DATE = _NOW - timedelta(days=60) +_ACCOUNT_ID = "account_id" +_CLIENT_SECRET = "client_secret" +_NO_STATE = {} +_AVOIDING_INCLUSIVE_BOUNDARIES = timedelta(seconds=1) + + +def _external_accounts_request() -> StripeRequestBuilder: + return StripeRequestBuilder.external_accounts_endpoint(_ACCOUNT_ID, _CLIENT_SECRET) + + +def _events_request() -> StripeRequestBuilder: + return StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET) + + +def _config() -> ConfigBuilder: + return ConfigBuilder().with_start_date(_NOW - timedelta(days=75)).with_account_id(_ACCOUNT_ID).with_client_secret(_CLIENT_SECRET) + + +def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: + return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build() + + +def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe: + return SourceStripe(catalog, config) + + +def _an_event() -> RecordBuilder: + return create_record_builder( + find_template("events", __file__), + FieldPath("data"), + record_id_path=FieldPath("id"), + record_cursor_path=FieldPath("created"), + ) + + +def _events_response() -> HttpResponseBuilder: + return create_response_builder( + find_template("events", __file__), + FieldPath("data"), + pagination_strategy=StripePaginationStrategy() + ) + + +def _an_external_account_card() -> RecordBuilder: + return create_record_builder( + find_template(_ENDPOINT_TEMPLATE_NAME, __file__), + FieldPath("data"), + record_id_path=FieldPath("id"), + ) + + +def _external_accounts_card_response() -> HttpResponseBuilder: + """ + WARNING: this response will not fully match the template as external accounts card are queried by ID and the field "url" is not updated + to match that (it is currently hardcoded to "/v1/accounts/acct_1032D82eZvKYlo2C/external_accounts"). As this has no impact on the + tests, we will leave it as is for now. + """ + return create_response_builder( + find_template(_ENDPOINT_TEMPLATE_NAME, __file__), + FieldPath("data"), + pagination_strategy=StripePaginationStrategy() + ) + + +def _given_external_accounts_availability_check(http_mocker: HttpMocker) -> None: + http_mocker.get( + StripeRequestBuilder.external_accounts_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build(), + _external_accounts_card_response().build() + ) + + +def _given_events_availability_check(http_mocker: HttpMocker) -> None: + http_mocker.get( + StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build(), + _events_response().build() + ) + + +def _read( + config_builder: ConfigBuilder, + sync_mode: SyncMode, + state: Optional[Dict[str, Any]] = None, + expecting_exception: bool = False +) -> EntrypointOutput: + catalog = _catalog(sync_mode) + config = config_builder.build() + return read(_source(catalog, config), config, catalog, state, expecting_exception) + + +@freezegun.freeze_time(_NOW.isoformat()) +class FullRefreshTest(TestCase): + + @HttpMocker() + def test_given_one_page_when_read_then_return_record(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _external_accounts_request().with_object(_OBJECT).with_limit(100).build(), + _external_accounts_card_response().with_record(_an_external_account_card()).with_record(_an_external_account_card()).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE)) + + assert len(output.records) == 2 + + @HttpMocker() + def test_given_many_pages_when_read_then_return_records(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _external_accounts_request().with_object(_OBJECT).with_limit(100).build(), + _external_accounts_card_response().with_pagination().with_record(_an_external_account_card().with_id("last_record_id_from_first_page")).build(), + ) + http_mocker.get( + _external_accounts_request().with_starting_after("last_record_id_from_first_page").with_object(_OBJECT).with_limit(100).build(), + _external_accounts_card_response().with_record(_an_external_account_card()).with_record(_an_external_account_card()).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE)) + + assert len(output.records) == 3 + + @HttpMocker() + def test_when_read_then_add_cursor_field(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _external_accounts_request().with_object(_OBJECT).with_limit(100).build(), + _external_accounts_card_response().with_record(_an_external_account_card()).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE).with_lookback_window_in_days(10)) + + assert output.records[0].record.data["updated"] == int(_NOW.timestamp()) + + @HttpMocker() + def test_given_http_status_400_when_read_then_stream_is_ignored(self, http_mocker: HttpMocker) -> None: + http_mocker.get( + _external_accounts_request().with_any_query_params().build(), + a_response_with_status(400), + ) + output = self._read(_config()) + assert len(output.get_stream_statuses(_STREAM_NAME)) == 0 + + @HttpMocker() + def test_given_http_status_401_when_read_then_system_error(self, http_mocker: HttpMocker) -> None: + http_mocker.get( + _external_accounts_request().with_any_query_params().build(), + a_response_with_status(401), + ) + output = self._read(_config(), expecting_exception=True) + assert output.errors[-1].trace.error.failure_type == FailureType.system_error + + @HttpMocker() + def test_given_rate_limited_when_read_then_retry_and_return_records(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _external_accounts_request().with_any_query_params().build(), + [ + a_response_with_status(429), + _external_accounts_card_response().with_record(_an_external_account_card()).build(), + ], + ) + output = self._read(_config().with_start_date(_A_START_DATE)) + assert len(output.records) == 1 + + @HttpMocker() + def test_given_http_status_500_once_before_200_when_read_then_retry_and_return_records(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _external_accounts_request().with_any_query_params().build(), + [a_response_with_status(500), _external_accounts_card_response().with_record(_an_external_account_card()).build()], + ) + output = self._read(_config()) + assert len(output.records) == 1 + + @HttpMocker() + def test_given_http_status_500_on_availability_when_read_then_raise_system_error(self, http_mocker: HttpMocker) -> None: + http_mocker.get( + _external_accounts_request().with_any_query_params().build(), + a_response_with_status(500), + ) + output = self._read(_config(), expecting_exception=True) + assert output.errors[-1].trace.error.failure_type == FailureType.system_error + + @HttpMocker() + def test_given_small_slice_range_when_read_then_availability_check_performs_too_many_queries(self, http_mocker: HttpMocker) -> None: + # see https://github.com/airbytehq/airbyte/issues/33499 + events_requests = StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build() + http_mocker.get( + events_requests, + _events_response().build() # it is important that the event response does not have a record. This is not far fetched as this is what would happend 30 days before now + ) + http_mocker.get( + _external_accounts_request().with_any_query_params().build(), + _external_accounts_card_response().build(), + ) + + self._read(_config().with_start_date(_NOW - timedelta(days=60)).with_slice_range_in_days(1)) + + http_mocker.assert_number_of_calls(events_requests, 30) + + def _read(self, config: ConfigBuilder, expecting_exception: bool = False) -> EntrypointOutput: + return _read(config, SyncMode.full_refresh, expecting_exception=expecting_exception) + + +@freezegun.freeze_time(_NOW.isoformat()) +class IncrementalTest(TestCase): + + @HttpMocker() + def test_given_no_state_when_read_then_use_external_accounts_endpoint(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _external_accounts_request().with_object(_OBJECT).with_limit(100).build(), + _external_accounts_card_response().with_record(_an_external_account_card()).build(), + ) + output = self._read(_config().with_start_date(_A_START_DATE), _NO_STATE) + assert output.most_recent_state == {_STREAM_NAME: {"updated": int(_NOW.timestamp())}} + + @HttpMocker() + def test_given_state_when_read_then_query_events_using_types_and_state_value_plus_1(self, http_mocker: HttpMocker) -> None: + start_date = _NOW - timedelta(days=40) + state_datetime = _NOW - timedelta(days=5) + cursor_value = int(state_datetime.timestamp()) + 1 + + _given_external_accounts_availability_check(http_mocker) + _given_events_availability_check(http_mocker) + http_mocker.get( + _events_request().with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record( + _an_event().with_cursor(cursor_value).with_field(_DATA_FIELD, _an_external_account_card().build()) + ).build(), + ) + + output = self._read( + _config().with_start_date(start_date), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert output.most_recent_state == {_STREAM_NAME: {"updated": cursor_value}} + + @HttpMocker() + def test_given_object_is_not_back_account_when_read_then_filter_out(self, http_mocker: HttpMocker) -> None: + start_date = _NOW - timedelta(days=40) + state_datetime = _NOW - timedelta(days=5) + + _given_external_accounts_availability_check(http_mocker) + http_mocker.get( + StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build(), + _events_response().with_record( + _an_event().with_field(_DATA_FIELD, {"object": "not a card"}) + ).build(), + ) + + output = self._read( + _config().with_start_date(start_date), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert len(output.records) == 0 + + @HttpMocker() + def test_given_state_and_pagination_when_read_then_return_records(self, http_mocker: HttpMocker) -> None: + _given_external_accounts_availability_check(http_mocker) + _given_events_availability_check(http_mocker) + state_datetime = _NOW - timedelta(days=5) + http_mocker.get( + _events_request().with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_pagination().with_record( + _an_event().with_id("last_record_id_from_first_page").with_field(_DATA_FIELD, _an_external_account_card().build()) + ).build(), + ) + http_mocker.get( + _events_request().with_starting_after("last_record_id_from_first_page").with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._an_external_account_event()).build(), + ) + + output = self._read( + _config(), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert len(output.records) == 2 + + @HttpMocker() + def test_given_state_and_small_slice_range_when_read_then_perform_multiple_queries(self, http_mocker: HttpMocker) -> None: + state_datetime = _NOW - timedelta(days=5) + slice_range = timedelta(days=3) + slice_datetime = state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES + slice_range + + _given_external_accounts_availability_check(http_mocker) + _given_events_availability_check(http_mocker) # the availability check does not consider the state so we need to define a generic availability check + http_mocker.get( + _events_request().with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(slice_datetime).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._an_external_account_event()).build(), + ) + http_mocker.get( + _events_request().with_created_gte(slice_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._an_external_account_event()).with_record(self._an_external_account_event()).build(), + ) + + output = self._read( + _config().with_start_date(_NOW - timedelta(days=30)).with_slice_range_in_days(slice_range.days), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert len(output.records) == 3 + + @HttpMocker() + def test_given_state_earlier_than_30_days_when_read_then_query_events_using_types_and_event_lower_boundary(self, http_mocker: HttpMocker) -> None: + # this seems odd as we would miss some data between start_date and events_lower_boundary. In that case, we should hit the + # external_accounts endpoint + _given_external_accounts_availability_check(http_mocker) + start_date = _NOW - timedelta(days=40) + state_value = _NOW - timedelta(days=39) + events_lower_boundary = _NOW - timedelta(days=30) + http_mocker.get( + _events_request().with_created_gte(events_lower_boundary).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._an_external_account_event()).build(), + ) + + self._read( + _config().with_start_date(start_date), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_value.timestamp())}).build(), + ) + + # request matched http_mocker + + def _an_external_account_event(self) -> RecordBuilder: + return _an_event().with_field(_DATA_FIELD, _an_external_account_card().build()) + + def _read(self, config: ConfigBuilder, state: Optional[Dict[str, Any]], expecting_exception: bool = False) -> EntrypointOutput: + return _read(config, SyncMode.incremental, state, expecting_exception) diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_payment_methods.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_payment_methods.py new file mode 100644 index 000000000000..d0b3f6f7b4bd --- /dev/null +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_payment_methods.py @@ -0,0 +1,347 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. + +import json +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, Optional +from unittest import TestCase + +import freezegun +from airbyte_cdk.test.catalog_builder import CatalogBuilder +from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read +from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse +from airbyte_cdk.test.mock_http.response_builder import ( + FieldPath, + HttpResponseBuilder, + NestedPath, + RecordBuilder, + create_record_builder, + create_response_builder, + find_template, +) +from airbyte_cdk.test.state_builder import StateBuilder +from airbyte_protocol.models import ConfiguredAirbyteCatalog, FailureType, SyncMode +from integration.config import ConfigBuilder +from integration.pagination import StripePaginationStrategy +from integration.request_builder import StripeRequestBuilder +from integration.response_builder import a_response_with_status +from source_stripe import SourceStripe + +_EVENT_TYPES = [ + "payment_method.attached", + "payment_method.automatically_updated", + "payment_method.detached", + "payment_method.updated", +] + +_DATA_FIELD = NestedPath(["data", "object"]) +_STREAM_NAME = "payment_methods" +_ENDPOINT_TEMPLATE_NAME = "payment_methods" +_NOW = datetime.now(timezone.utc) +_A_START_DATE = _NOW - timedelta(days=60) +_ACCOUNT_ID = "account_id" +_CLIENT_SECRET = "client_secret" +_NO_STATE = {} +_AVOIDING_INCLUSIVE_BOUNDARIES = timedelta(seconds=1) + + +def _payment_methods_request() -> StripeRequestBuilder: + return StripeRequestBuilder.payment_methods_endpoint(_ACCOUNT_ID, _CLIENT_SECRET) + + +def _events_request() -> StripeRequestBuilder: + return StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET) + + +def _config() -> ConfigBuilder: + return ConfigBuilder().with_start_date(_NOW - timedelta(days=75)).with_account_id(_ACCOUNT_ID).with_client_secret(_CLIENT_SECRET) + + +def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: + return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build() + + +def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe: + return SourceStripe(catalog, config) + + +def _an_event() -> RecordBuilder: + return create_record_builder( + find_template("events", __file__), + FieldPath("data"), + record_id_path=FieldPath("id"), + record_cursor_path=FieldPath("created"), + ) + + +def _events_response() -> HttpResponseBuilder: + return create_response_builder( + find_template("events", __file__), + FieldPath("data"), + pagination_strategy=StripePaginationStrategy() + ) + + +def _a_payment_method() -> RecordBuilder: + return create_record_builder( + find_template(_ENDPOINT_TEMPLATE_NAME, __file__), + FieldPath("data"), + record_id_path=FieldPath("id"), + record_cursor_path=FieldPath("created"), + ) + + +def _payment_methods_response() -> HttpResponseBuilder: + return create_response_builder( + find_template(_ENDPOINT_TEMPLATE_NAME, __file__), + FieldPath("data"), + pagination_strategy=StripePaginationStrategy() + ) + + +def _given_payment_methods_availability_check(http_mocker: HttpMocker) -> None: + http_mocker.get( + StripeRequestBuilder.payment_methods_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build(), + _payment_methods_response().build() + ) + + +def _given_events_availability_check(http_mocker: HttpMocker) -> None: + http_mocker.get( + StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build(), + _events_response().build() + ) + + +def _read( + config_builder: ConfigBuilder, + sync_mode: SyncMode, + state: Optional[Dict[str, Any]] = None, + expecting_exception: bool = False +) -> EntrypointOutput: + catalog = _catalog(sync_mode) + config = config_builder.build() + return read(_source(catalog, config), config, catalog, state, expecting_exception) + + +@freezegun.freeze_time(_NOW.isoformat()) +class FullRefreshTest(TestCase): + + @HttpMocker() + def test_given_one_page_when_read_then_return_record(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _payment_methods_request().with_limit(100).build(), + _payment_methods_response().with_record(_a_payment_method()).with_record(_a_payment_method()).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE)) + + assert len(output.records) == 2 + + @HttpMocker() + def test_given_many_pages_when_read_then_return_records(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _payment_methods_request().with_limit(100).build(), + _payment_methods_response().with_pagination().with_record(_a_payment_method().with_id("last_record_id_from_first_page")).build(), + ) + http_mocker.get( + _payment_methods_request().with_starting_after("last_record_id_from_first_page").with_limit(100).build(), + _payment_methods_response().with_record(_a_payment_method()).with_record(_a_payment_method()).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE)) + + assert len(output.records) == 3 + + @HttpMocker() + def test_when_read_then_add_cursor_field(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _payment_methods_request().with_limit(100).build(), + _payment_methods_response().with_record(_a_payment_method()).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE).with_lookback_window_in_days(10)) + + assert output.records[0].record.data["updated"] == output.records[0].record.data["created"] + + @HttpMocker() + def test_given_http_status_400_when_read_then_stream_is_ignored(self, http_mocker: HttpMocker) -> None: + http_mocker.get( + _payment_methods_request().with_any_query_params().build(), + a_response_with_status(400), + ) + output = self._read(_config()) + assert len(output.get_stream_statuses(_STREAM_NAME)) == 0 + + @HttpMocker() + def test_given_http_status_401_when_read_then_system_error(self, http_mocker: HttpMocker) -> None: + http_mocker.get( + _payment_methods_request().with_any_query_params().build(), + a_response_with_status(401), + ) + output = self._read(_config(), expecting_exception=True) + assert output.errors[-1].trace.error.failure_type == FailureType.system_error + + @HttpMocker() + def test_given_rate_limited_when_read_then_retry_and_return_records(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _payment_methods_request().with_any_query_params().build(), + [ + a_response_with_status(429), + _payment_methods_response().with_record(_a_payment_method()).build(), + ], + ) + output = self._read(_config().with_start_date(_A_START_DATE)) + assert len(output.records) == 1 + + @HttpMocker() + def test_given_http_status_500_once_before_200_when_read_then_retry_and_return_records(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _payment_methods_request().with_any_query_params().build(), + [a_response_with_status(500), _payment_methods_response().with_record(_a_payment_method()).build()], + ) + output = self._read(_config()) + assert len(output.records) == 1 + + @HttpMocker() + def test_given_http_status_500_on_availability_when_read_then_raise_system_error(self, http_mocker: HttpMocker) -> None: + http_mocker.get( + _payment_methods_request().with_any_query_params().build(), + a_response_with_status(500), + ) + output = self._read(_config(), expecting_exception=True) + assert output.errors[-1].trace.error.failure_type == FailureType.system_error + + @HttpMocker() + def test_given_small_slice_range_when_read_then_availability_check_performs_too_many_queries(self, http_mocker: HttpMocker) -> None: + # see https://github.com/airbytehq/airbyte/issues/33499 + events_requests = StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build() + http_mocker.get( + events_requests, + _events_response().build() # it is important that the event response does not have a record. This is not far fetched as this is what would happend 30 days before now + ) + http_mocker.get( + _payment_methods_request().with_any_query_params().build(), + _payment_methods_response().build(), + ) + + self._read(_config().with_start_date(_NOW - timedelta(days=60)).with_slice_range_in_days(1)) + + http_mocker.assert_number_of_calls(events_requests, 30) + + def _read(self, config: ConfigBuilder, expecting_exception: bool = False) -> EntrypointOutput: + return _read(config, SyncMode.full_refresh, expecting_exception=expecting_exception) + + +@freezegun.freeze_time(_NOW.isoformat()) +class IncrementalTest(TestCase): + + @HttpMocker() + def test_given_no_state_when_read_then_use_payment_methods_endpoint(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + cursor_value = int(_A_START_DATE.timestamp()) + 1 + http_mocker.get( + _payment_methods_request().with_limit(100).build(), + _payment_methods_response().with_record(_a_payment_method().with_cursor(cursor_value)).build(), + ) + output = self._read(_config().with_start_date(_A_START_DATE), _NO_STATE) + assert output.most_recent_state == {_STREAM_NAME: {"updated": cursor_value}} + + @HttpMocker() + def test_given_state_when_read_then_query_events_using_types_and_state_value_plus_1(self, http_mocker: HttpMocker) -> None: + start_date = _NOW - timedelta(days=40) + state_datetime = _NOW - timedelta(days=5) + cursor_value = int(state_datetime.timestamp()) + 1 + + _given_payment_methods_availability_check(http_mocker) + _given_events_availability_check(http_mocker) + http_mocker.get( + _events_request().with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record( + _an_event().with_cursor(cursor_value).with_field(_DATA_FIELD, _a_payment_method().build()) + ).build(), + ) + + output = self._read( + _config().with_start_date(start_date), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert output.most_recent_state == {_STREAM_NAME: {"updated": cursor_value}} + + @HttpMocker() + def test_given_state_and_pagination_when_read_then_return_records(self, http_mocker: HttpMocker) -> None: + _given_payment_methods_availability_check(http_mocker) + _given_events_availability_check(http_mocker) + state_datetime = _NOW - timedelta(days=5) + http_mocker.get( + _events_request().with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_pagination().with_record( + _an_event().with_id("last_record_id_from_first_page").with_field(_DATA_FIELD, _a_payment_method().build()) + ).build(), + ) + http_mocker.get( + _events_request().with_starting_after("last_record_id_from_first_page").with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._a_payment_method_event()).build(), + ) + + output = self._read( + _config(), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert len(output.records) == 2 + + @HttpMocker() + def test_given_state_and_small_slice_range_when_read_then_perform_multiple_queries(self, http_mocker: HttpMocker) -> None: + state_datetime = _NOW - timedelta(days=5) + slice_range = timedelta(days=3) + slice_datetime = state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES + slice_range + + _given_payment_methods_availability_check(http_mocker) + _given_events_availability_check(http_mocker) # the availability check does not consider the state so we need to define a generic availability check + http_mocker.get( + _events_request().with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(slice_datetime).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._a_payment_method_event()).build(), + ) + http_mocker.get( + _events_request().with_created_gte(slice_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._a_payment_method_event()).with_record(self._a_payment_method_event()).build(), + ) + + output = self._read( + _config().with_start_date(_NOW - timedelta(days=30)).with_slice_range_in_days(slice_range.days), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert len(output.records) == 3 + + @HttpMocker() + def test_given_state_earlier_than_30_days_when_read_then_query_events_using_types_and_event_lower_boundary(self, http_mocker: HttpMocker) -> None: + # this seems odd as we would miss some data between start_date and events_lower_boundary. In that case, we should hit the + # payment_methods endpoint + _given_payment_methods_availability_check(http_mocker) + start_date = _NOW - timedelta(days=40) + state_value = _NOW - timedelta(days=39) + events_lower_boundary = _NOW - timedelta(days=30) + http_mocker.get( + _events_request().with_created_gte(events_lower_boundary).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._a_payment_method_event()).build(), + ) + + self._read( + _config().with_start_date(start_date), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_value.timestamp())}).build(), + ) + + # request matched http_mocker + + def _a_payment_method_event(self) -> RecordBuilder: + return _an_event().with_field(_DATA_FIELD, _a_payment_method().build()) + + def _read(self, config: ConfigBuilder, state: Optional[Dict[str, Any]], expecting_exception: bool = False) -> EntrypointOutput: + return _read(config, SyncMode.incremental, state, expecting_exception) diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/application_fees_refunds.json b/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/application_fees_refunds.json new file mode 100644 index 000000000000..47eacf5fad9f --- /dev/null +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/application_fees_refunds.json @@ -0,0 +1,17 @@ +{ + "object": "list", + "url": "/v1/application_fees/fr_1MtJRpKbnvuxQXGuM6Ww0D24/refunds", + "has_more": false, + "data": [ + { + "id": "fr_1MtJRpKbnvuxQXGuM6Ww0D24", + "object": "fee_refund", + "amount": 100, + "balance_transaction": null, + "created": 1680651573, + "currency": "usd", + "fee": "fee_1B73DOKbnvuxQXGuhY8Aw0TN", + "metadata": {} + } + ] +} diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/bank_accounts.json b/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/bank_accounts.json new file mode 100644 index 000000000000..bad75c218964 --- /dev/null +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/bank_accounts.json @@ -0,0 +1,23 @@ +{ + "object": "list", + "url": "/v1/customers/cus_9s6XI9OFIdpjIg/bank_accounts", + "has_more": false, + "data": [ + { + "id": "ba_1MvoIJ2eZvKYlo2CO9f0MabO", + "object": "bank_account", + "account_holder_name": "Jane Austen", + "account_holder_type": "company", + "account_type": null, + "bank_name": "STRIPE TEST BANK", + "country": "US", + "currency": "usd", + "customer": "cus_9s6XI9OFIdpjIg", + "fingerprint": "1JWtPxqbdX5Gamtc", + "last4": "6789", + "metadata": {}, + "routing_number": "110000000", + "status": "new" + } + ] +} diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/customers_expand_data_source.json b/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/customers_expand_data_source.json new file mode 100644 index 000000000000..182aa0e44e9f --- /dev/null +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/customers_expand_data_source.json @@ -0,0 +1,43 @@ +{ + "object": "list", + "url": "/v1/customers", + "has_more": false, + "data": [ + { + "id": "cus_NffrFeUfNV2Hib", + "object": "customer", + "address": null, + "balance": 0, + "created": 1680893993, + "currency": null, + "default_source": null, + "delinquent": false, + "description": null, + "discount": null, + "email": "jennyrosen@example.com", + "invoice_prefix": "0759376C", + "invoice_settings": { + "custom_fields": null, + "default_payment_method": null, + "footer": null, + "rendering_options": null + }, + "livemode": false, + "metadata": {}, + "name": "Jenny Rosen", + "next_invoice_sequence": 1, + "phone": null, + "preferred_locales": [], + "shipping": null, + "sources": { + "object": "list", + "data": [], + "has_more": false, + "total_count": 0, + "url": "/v1/customers/cus_NffrFeUfNV2Hib/sources" + }, + "tax_exempt": "none", + "test_clock": null + } + ] +} diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/external_account_cards.json b/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/external_account_cards.json new file mode 100644 index 000000000000..c26bc36461cd --- /dev/null +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/external_account_cards.json @@ -0,0 +1,34 @@ +{ + "object": "list", + "url": "/v1/accounts/acct_1032D82eZvKYlo2C/external_accounts", + "has_more": false, + "data": [ + { + "id": "card_1NAz2x2eZvKYlo2C75wJ1YUs", + "object": "card", + "address_city": null, + "address_country": null, + "address_line1": null, + "address_line1_check": null, + "address_line2": null, + "address_state": null, + "address_zip": null, + "address_zip_check": null, + "brand": "Visa", + "country": "US", + "cvc_check": "pass", + "dynamic_last4": null, + "exp_month": 8, + "exp_year": 2024, + "fingerprint": "Xt5EWLLDS7FJjR1c", + "funding": "credit", + "last4": "4242", + "metadata": {}, + "name": null, + "redaction": null, + "tokenization_method": null, + "wallet": null, + "account": "acct_1032D82eZvKYlo2C" + } + ] +} diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/external_bank_accounts.json b/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/external_bank_accounts.json new file mode 100644 index 000000000000..a8704270de2f --- /dev/null +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/external_bank_accounts.json @@ -0,0 +1,23 @@ +{ + "object": "list", + "url": "/v1/accounts/acct_1032D82eZvKYlo2C/external_accounts", + "has_more": false, + "data": [ + { + "id": "ba_1NB1IV2eZvKYlo2CByiLrMWv", + "object": "bank_account", + "account_holder_name": "Jane Austen", + "account_holder_type": "company", + "account_type": null, + "bank_name": "STRIPE TEST BANK", + "country": "US", + "currency": "usd", + "fingerprint": "1JWtPxqbdX5Gamtc", + "last4": "6789", + "metadata": {}, + "routing_number": "110000000", + "status": "new", + "account": "acct_1032D82eZvKYlo2C" + } + ] +} diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/payment_methods.json b/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/payment_methods.json new file mode 100644 index 000000000000..59ced3939e62 --- /dev/null +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/payment_methods.json @@ -0,0 +1,52 @@ +{ + "object": "list", + "url": "/v1/payment_methods", + "has_more": false, + "data": [ + { + "id": "pm_1NO6mA2eZvKYlo2CEydeHsKT", + "object": "payment_method", + "billing_details": { + "address": { + "city": null, + "country": null, + "line1": null, + "line2": null, + "postal_code": null, + "state": null + }, + "email": null, + "name": null, + "phone": null + }, + "card": { + "brand": "visa", + "checks": { + "address_line1_check": null, + "address_postal_code_check": null, + "cvc_check": "unchecked" + }, + "country": "US", + "exp_month": 8, + "exp_year": 2024, + "fingerprint": "Xt5EWLLDS7FJjR1c", + "funding": "credit", + "generated_from": null, + "last4": "4242", + "networks": { + "available": ["visa"], + "preferred": null + }, + "three_d_secure_usage": { + "supported": true + }, + "wallet": null + }, + "created": 1687991030, + "customer": "cus_9s6XKzkNRiz8i3", + "livemode": false, + "metadata": {}, + "type": "card" + } + ] +} diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/radar_early_fraud_warnings.json b/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/radar_early_fraud_warnings.json new file mode 100644 index 000000000000..8da264f5b475 --- /dev/null +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/radar_early_fraud_warnings.json @@ -0,0 +1,16 @@ +{ + "object": "list", + "url": "/v1/radar/early_fraud_warnings", + "has_more": false, + "data": [ + { + "id": "issfr_1NnrwHBw2dPENLoi9lnhV3RQ", + "object": "radar.early_fraud_warning", + "actionable": true, + "charge": "ch_1234", + "created": 123456789, + "fraud_type": "misc", + "livemode": false + } + ] +} diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/refunds.json b/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/refunds.json new file mode 100644 index 000000000000..af20ee7480d3 --- /dev/null +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/refunds.json @@ -0,0 +1,32 @@ +{ + "object": "list", + "url": "/v1/refunds", + "has_more": false, + "data": [ + { + "id": "re_1Nispe2eZvKYlo2Cd31jOCgZ", + "object": "refund", + "amount": 1000, + "balance_transaction": "txn_1Nispe2eZvKYlo2CYezqFhEx", + "charge": "ch_1NirD82eZvKYlo2CIvbtLWuY", + "created": 1692942318, + "currency": "usd", + "destination_details": { + "card": { + "reference": "123456789012", + "reference_status": "available", + "reference_type": "acquirer_reference_number", + "type": "refund" + }, + "type": "card" + }, + "metadata": {}, + "payment_intent": "pi_1GszsK2eZvKYlo2CfhZyoZLp", + "reason": null, + "receipt_number": null, + "source_transfer_reversal": null, + "status": "succeeded", + "transfer_reversal": null + } + ] +}