From f53ecac813c84a91351c970e57e3fc84169e71f4 Mon Sep 17 00:00:00 2001 From: maxi297 Date: Mon, 18 Dec 2023 13:12:55 -0500 Subject: [PATCH] [ISSUE #32871] adding integration tests for UpdatedCursorIncrementalStripeStream empty streams --- .../unit_tests/integration/request_builder.py | 19 + .../integration/test_early_fraud_warnings.py | 342 +++++++++++++++++ .../test_external_account_bank_accounts.py | 361 ++++++++++++++++++ .../test_external_account_cards.py | 361 ++++++++++++++++++ .../integration/test_payment_methods.py | 347 +++++++++++++++++ .../http/response/external_account_cards.json | 34 ++ .../http/response/external_bank_accounts.json | 23 ++ .../http/response/payment_methods.json | 52 +++ .../response/radar_early_fraud_warnings.json | 16 + 9 files changed, 1555 insertions(+) create mode 100644 airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_early_fraud_warnings.py create mode 100644 airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_bank_accounts.py create mode 100644 airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_cards.py create mode 100644 airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_payment_methods.py create mode 100644 airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/external_account_cards.json create mode 100644 airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/external_bank_accounts.json create mode 100644 airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/payment_methods.json create mode 100644 airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/radar_early_fraud_warnings.json diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/request_builder.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/request_builder.py index 
56b475f8907b..d221337fdd4a 100644 --- a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/request_builder.py +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/request_builder.py @@ -17,6 +17,10 @@ def application_fees_endpoint(cls, account_id: str, client_secret: str) -> "Stri def events_endpoint(cls, account_id: str, client_secret: str) -> "StripeRequestBuilder": return cls("events", account_id, client_secret) + @classmethod + def external_accounts_endpoint(cls, account_id: str, client_secret: str) -> "StripeRequestBuilder": + return cls(f"accounts/{account_id}/external_accounts", account_id, client_secret) + @classmethod def issuing_authorizations_endpoint(cls, account_id: str, client_secret: str) -> "StripeRequestBuilder": return cls("issuing/authorizations", account_id, client_secret) @@ -29,6 +33,14 @@ def issuing_cards_endpoint(cls, account_id: str, client_secret: str) -> "StripeR def issuing_transactions_endpoint(cls, account_id: str, client_secret: str) -> "StripeRequestBuilder": return cls("issuing/transactions", account_id, client_secret) + @classmethod + def payment_methods_endpoint(cls, account_id: str, client_secret: str) -> "StripeRequestBuilder": + return cls("payment_methods", account_id, client_secret) + + @classmethod + def radar_early_fraud_warnings_endpoint(cls, account_id: str, client_secret: str) -> "StripeRequestBuilder": + return cls("radar/early_fraud_warnings", account_id, client_secret) + @classmethod def reviews_endpoint(cls, account_id: str, client_secret: str) -> "StripeRequestBuilder": return cls("reviews", account_id, client_secret) @@ -45,6 +57,7 @@ def __init__(self, resource: str, account_id: str, client_secret: str) -> None: self._created_gte: Optional[datetime] = None self._created_lte: Optional[datetime] = None self._limit: Optional[int] = None + self._object: Optional[str] = None self._starting_after_id: Optional[str] = None self._types: List[str] = [] @@ -60,6 +73,10 @@ def 
with_limit(self, limit: int) -> "StripeRequestBuilder": self._limit = limit return self + def with_object(self, object_name: str) -> "StripeRequestBuilder": + self._object = object_name + return self + def with_starting_after(self, starting_after_id: str) -> "StripeRequestBuilder": self._starting_after_id = starting_after_id return self @@ -84,6 +101,8 @@ def build(self) -> HttpRequest: query_params["starting_after"] = self._starting_after_id if self._types: query_params["types[]"] = self._types + if self._object: + query_params["object"] = self._object if self._any_query_params: if query_params: diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_early_fraud_warnings.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_early_fraud_warnings.py new file mode 100644 index 000000000000..7673ee72b14c --- /dev/null +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_early_fraud_warnings.py @@ -0,0 +1,342 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+ +import json +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, Optional +from unittest import TestCase + +import freezegun +from airbyte_cdk.test.catalog_builder import CatalogBuilder +from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read +from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse +from airbyte_cdk.test.mock_http.response_builder import ( + FieldPath, + HttpResponseBuilder, + NestedPath, + RecordBuilder, + create_record_builder, + create_response_builder, + find_template, +) +from airbyte_cdk.test.state_builder import StateBuilder +from airbyte_protocol.models import ConfiguredAirbyteCatalog, FailureType, SyncMode +from integration.config import ConfigBuilder +from integration.pagination import StripePaginationStrategy +from integration.request_builder import StripeRequestBuilder +from integration.response_builder import a_response_with_status +from source_stripe import SourceStripe + +_EVENT_TYPES = ["radar.early_fraud_warning.created", "radar.early_fraud_warning.updated"] + +_DATA_FIELD = NestedPath(["data", "object"]) +_STREAM_NAME = "early_fraud_warnings" +_ENDPOINT_TEMPLATE_NAME = "radar_early_fraud_warnings" +_NOW = datetime.now(timezone.utc) +_A_START_DATE = _NOW - timedelta(days=60) +_ACCOUNT_ID = "account_id" +_CLIENT_SECRET = "client_secret" +_NO_STATE = {} +_AVOIDING_INCLUSIVE_BOUNDARIES = timedelta(seconds=1) + + +def _early_fraud_warnings_request() -> StripeRequestBuilder: + return StripeRequestBuilder.radar_early_fraud_warnings_endpoint(_ACCOUNT_ID, _CLIENT_SECRET) + + +def _events_request() -> StripeRequestBuilder: + return StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET) + + +def _config() -> ConfigBuilder: + return ConfigBuilder().with_start_date(_NOW - timedelta(days=75)).with_account_id(_ACCOUNT_ID).with_client_secret(_CLIENT_SECRET) + + +def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: + return 
CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build() + + +def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe: + return SourceStripe(catalog, config) + + +def _an_event() -> RecordBuilder: + return create_record_builder( + find_template("events", __file__), + FieldPath("data"), + record_id_path=FieldPath("id"), + record_cursor_path=FieldPath("created"), + ) + + +def _events_response() -> HttpResponseBuilder: + return create_response_builder( + find_template("events", __file__), + FieldPath("data"), + pagination_strategy=StripePaginationStrategy() + ) + + +def _an_early_fraud_warning() -> RecordBuilder: + return create_record_builder( + find_template(_ENDPOINT_TEMPLATE_NAME, __file__), + FieldPath("data"), + record_id_path=FieldPath("id"), + record_cursor_path=FieldPath("created"), + ) + + +def _early_fraud_warnings_response() -> HttpResponseBuilder: + return create_response_builder( + find_template(_ENDPOINT_TEMPLATE_NAME, __file__), + FieldPath("data"), + pagination_strategy=StripePaginationStrategy() + ) + + +def _given_early_fraud_warnings_availability_check(http_mocker: HttpMocker) -> None: + http_mocker.get( + StripeRequestBuilder.radar_early_fraud_warnings_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build(), + _early_fraud_warnings_response().build() + ) + + +def _given_events_availability_check(http_mocker: HttpMocker) -> None: + http_mocker.get( + StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build(), + _events_response().build() + ) + + +def _read( + config_builder: ConfigBuilder, + sync_mode: SyncMode, + state: Optional[Dict[str, Any]] = None, + expecting_exception: bool = False +) -> EntrypointOutput: + catalog = _catalog(sync_mode) + config = config_builder.build() + return read(_source(catalog, config), config, catalog, state, expecting_exception) + + +@freezegun.freeze_time(_NOW.isoformat()) +class FullRefreshTest(TestCase): + + @HttpMocker() + 
def test_given_one_page_when_read_then_return_record(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _early_fraud_warnings_request().with_limit(100).build(), + _early_fraud_warnings_response().with_record(_an_early_fraud_warning()).with_record(_an_early_fraud_warning()).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE)) + + assert len(output.records) == 2 + + @HttpMocker() + def test_given_many_pages_when_read_then_return_records(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _early_fraud_warnings_request().with_limit(100).build(), + _early_fraud_warnings_response().with_pagination().with_record(_an_early_fraud_warning().with_id("last_record_id_from_first_page")).build(), + ) + http_mocker.get( + _early_fraud_warnings_request().with_starting_after("last_record_id_from_first_page").with_limit(100).build(), + _early_fraud_warnings_response().with_record(_an_early_fraud_warning()).with_record(_an_early_fraud_warning()).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE)) + + assert len(output.records) == 3 + + @HttpMocker() + def test_when_read_then_add_cursor_field(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _early_fraud_warnings_request().with_limit(100).build(), + _early_fraud_warnings_response().with_record(_an_early_fraud_warning()).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE).with_lookback_window_in_days(10)) + + assert output.records[0].record.data["updated"] == output.records[0].record.data["created"] + + @HttpMocker() + def test_given_http_status_400_when_read_then_stream_is_ignored(self, http_mocker: HttpMocker) -> None: + http_mocker.get( + _early_fraud_warnings_request().with_any_query_params().build(), + a_response_with_status(400), + ) + output = self._read(_config()) + assert 
len(output.get_stream_statuses(_STREAM_NAME)) == 0 + + @HttpMocker() + def test_given_http_status_401_when_read_then_system_error(self, http_mocker: HttpMocker) -> None: + http_mocker.get( + _early_fraud_warnings_request().with_any_query_params().build(), + a_response_with_status(401), + ) + output = self._read(_config(), expecting_exception=True) + assert output.errors[-1].trace.error.failure_type == FailureType.system_error + + @HttpMocker() + def test_given_rate_limited_when_read_then_retry_and_return_records(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _early_fraud_warnings_request().with_any_query_params().build(), + [ + a_response_with_status(429), + _early_fraud_warnings_response().with_record(_an_early_fraud_warning()).build(), + ], + ) + output = self._read(_config().with_start_date(_A_START_DATE)) + assert len(output.records) == 1 + + @HttpMocker() + def test_given_http_status_500_once_before_200_when_read_then_retry_and_return_records(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _early_fraud_warnings_request().with_any_query_params().build(), + [a_response_with_status(500), _early_fraud_warnings_response().with_record(_an_early_fraud_warning()).build()], + ) + output = self._read(_config()) + assert len(output.records) == 1 + + @HttpMocker() + def test_given_http_status_500_on_availability_when_read_then_raise_system_error(self, http_mocker: HttpMocker) -> None: + http_mocker.get( + _early_fraud_warnings_request().with_any_query_params().build(), + a_response_with_status(500), + ) + output = self._read(_config(), expecting_exception=True) + assert output.errors[-1].trace.error.failure_type == FailureType.system_error + + @HttpMocker() + def test_given_small_slice_range_when_read_then_availability_check_performs_too_many_queries(self, http_mocker: HttpMocker) -> None: + # see https://github.com/airbytehq/airbyte/issues/33499 + 
events_requests = StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build() + http_mocker.get( + events_requests, + _events_response().build() # it is important that the event response does not have a record. This is not far-fetched as this is what would happen 30 days before now + ) + http_mocker.get( + _early_fraud_warnings_request().with_any_query_params().build(), + _early_fraud_warnings_response().build(), + ) + + self._read(_config().with_start_date(_NOW - timedelta(days=60)).with_slice_range_in_days(1)) + + http_mocker.assert_number_of_calls(events_requests, 30) + + def _read(self, config: ConfigBuilder, expecting_exception: bool = False) -> EntrypointOutput: + return _read(config, SyncMode.full_refresh, expecting_exception=expecting_exception) + + +@freezegun.freeze_time(_NOW.isoformat()) +class IncrementalTest(TestCase): + + @HttpMocker() + def test_given_no_state_when_read_then_use_early_fraud_warnings_endpoint(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + cursor_value = int(_A_START_DATE.timestamp()) + 1 + http_mocker.get( + _early_fraud_warnings_request().with_limit(100).build(), + _early_fraud_warnings_response().with_record(_an_early_fraud_warning().with_cursor(cursor_value)).build(), + ) + output = self._read(_config().with_start_date(_A_START_DATE), _NO_STATE) + assert output.most_recent_state == {_STREAM_NAME: {"updated": cursor_value}} + + @HttpMocker() + def test_given_state_when_read_then_query_events_using_types_and_state_value_plus_1(self, http_mocker: HttpMocker) -> None: + start_date = _NOW - timedelta(days=40) + state_datetime = _NOW - timedelta(days=5) + cursor_value = int(state_datetime.timestamp()) + 1 + + _given_early_fraud_warnings_availability_check(http_mocker) + _given_events_availability_check(http_mocker) + http_mocker.get( + _events_request().with_created_gte(state_datetime + 
_AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record( + _an_event().with_cursor(cursor_value).with_field(_DATA_FIELD, _an_early_fraud_warning().build()) + ).build(), + ) + + output = self._read( + _config().with_start_date(start_date), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert output.most_recent_state == {_STREAM_NAME: {"updated": cursor_value}} + + @HttpMocker() + def test_given_state_and_pagination_when_read_then_return_records(self, http_mocker: HttpMocker) -> None: + _given_early_fraud_warnings_availability_check(http_mocker) + _given_events_availability_check(http_mocker) + state_datetime = _NOW - timedelta(days=5) + http_mocker.get( + _events_request().with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_pagination().with_record( + _an_event().with_id("last_record_id_from_first_page").with_field(_DATA_FIELD, _an_early_fraud_warning().build()) + ).build(), + ) + http_mocker.get( + _events_request().with_starting_after("last_record_id_from_first_page").with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._an_early_fraud_warning_event()).build(), + ) + + output = self._read( + _config(), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert len(output.records) == 2 + + @HttpMocker() + def test_given_state_and_small_slice_range_when_read_then_perform_multiple_queries(self, http_mocker: HttpMocker) -> None: + state_datetime = _NOW - timedelta(days=5) + slice_range = timedelta(days=3) + slice_datetime = state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES + slice_range + + 
_given_early_fraud_warnings_availability_check(http_mocker) + _given_events_availability_check(http_mocker) # the availability check does not consider the state so we need to define a generic availability check + http_mocker.get( + _events_request().with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(slice_datetime).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._an_early_fraud_warning_event()).build(), + ) + http_mocker.get( + _events_request().with_created_gte(slice_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._an_early_fraud_warning_event()).with_record(self._an_early_fraud_warning_event()).build(), + ) + + output = self._read( + _config().with_start_date(_NOW - timedelta(days=30)).with_slice_range_in_days(slice_range.days), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert len(output.records) == 3 + + @HttpMocker() + def test_given_state_earlier_than_30_days_when_read_then_query_events_using_types_and_event_lower_boundary(self, http_mocker: HttpMocker) -> None: + # this seems odd as we would miss some data between start_date and events_lower_boundary. 
In that case, we should hit the + # radar/early_fraud_warnings endpoint + _given_early_fraud_warnings_availability_check(http_mocker) + start_date = _NOW - timedelta(days=40) + state_value = _NOW - timedelta(days=39) + events_lower_boundary = _NOW - timedelta(days=30) + http_mocker.get( + _events_request().with_created_gte(events_lower_boundary).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._an_early_fraud_warning_event()).build(), + ) + + self._read( + _config().with_start_date(start_date), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_value.timestamp())}).build(), + ) + + # request matched http_mocker + + def _an_early_fraud_warning_event(self) -> RecordBuilder: + return _an_event().with_field(_DATA_FIELD, _an_early_fraud_warning().build()) + + def _read(self, config: ConfigBuilder, state: Optional[Dict[str, Any]], expecting_exception: bool = False) -> EntrypointOutput: + return _read(config, SyncMode.incremental, state, expecting_exception) diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_bank_accounts.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_bank_accounts.py new file mode 100644 index 000000000000..e872320e5f21 --- /dev/null +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_bank_accounts.py @@ -0,0 +1,361 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+ +import json +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, Optional +from unittest import TestCase + +import freezegun +from airbyte_cdk.test.catalog_builder import CatalogBuilder +from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read +from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse +from airbyte_cdk.test.mock_http.response_builder import ( + FieldPath, + HttpResponseBuilder, + NestedPath, + RecordBuilder, + create_record_builder, + create_response_builder, + find_template, +) +from airbyte_cdk.test.state_builder import StateBuilder +from airbyte_protocol.models import ConfiguredAirbyteCatalog, FailureType, SyncMode +from integration.config import ConfigBuilder +from integration.pagination import StripePaginationStrategy +from integration.request_builder import StripeRequestBuilder +from integration.response_builder import a_response_with_status +from source_stripe import SourceStripe + +_EVENT_TYPES = ["account.external_account.created", "account.external_account.updated", "account.external_account.deleted"] + +_DATA_FIELD = NestedPath(["data", "object"]) +_OBJECT = "bank_account" +_STREAM_NAME = "external_account_bank_accounts" +_ENDPOINT_TEMPLATE_NAME = "external_bank_accounts" +_NOW = datetime.now(timezone.utc) +_A_START_DATE = _NOW - timedelta(days=60) +_ACCOUNT_ID = "account_id" +_CLIENT_SECRET = "client_secret" +_NO_STATE = {} +_AVOIDING_INCLUSIVE_BOUNDARIES = timedelta(seconds=1) + + +def _external_accounts_request() -> StripeRequestBuilder: + return StripeRequestBuilder.external_accounts_endpoint(_ACCOUNT_ID, _CLIENT_SECRET) + + +def _events_request() -> StripeRequestBuilder: + return StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET) + + +def _config() -> ConfigBuilder: + return ConfigBuilder().with_start_date(_NOW - timedelta(days=75)).with_account_id(_ACCOUNT_ID).with_client_secret(_CLIENT_SECRET) + + +def _catalog(sync_mode: SyncMode) -> 
ConfiguredAirbyteCatalog: + return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build() + + +def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe: + return SourceStripe(catalog, config) + + +def _an_event() -> RecordBuilder: + return create_record_builder( + find_template("events", __file__), + FieldPath("data"), + record_id_path=FieldPath("id"), + record_cursor_path=FieldPath("created"), + ) + + +def _events_response() -> HttpResponseBuilder: + return create_response_builder( + find_template("events", __file__), + FieldPath("data"), + pagination_strategy=StripePaginationStrategy() + ) + + +def _an_external_bank_account() -> RecordBuilder: + return create_record_builder( + find_template(_ENDPOINT_TEMPLATE_NAME, __file__), + FieldPath("data"), + record_id_path=FieldPath("id"), + ) + + +def _external_bank_accounts_response() -> HttpResponseBuilder: + return create_response_builder( + find_template(_ENDPOINT_TEMPLATE_NAME, __file__), + FieldPath("data"), + pagination_strategy=StripePaginationStrategy() + ) + + +def _given_external_accounts_availability_check(http_mocker: HttpMocker) -> None: + http_mocker.get( + StripeRequestBuilder.external_accounts_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build(), + _external_bank_accounts_response().build() + ) + + +def _given_events_availability_check(http_mocker: HttpMocker) -> None: + http_mocker.get( + StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build(), + _events_response().build() + ) + + +def _read( + config_builder: ConfigBuilder, + sync_mode: SyncMode, + state: Optional[Dict[str, Any]] = None, + expecting_exception: bool = False +) -> EntrypointOutput: + catalog = _catalog(sync_mode) + config = config_builder.build() + return read(_source(catalog, config), config, catalog, state, expecting_exception) + + +@freezegun.freeze_time(_NOW.isoformat()) +class FullRefreshTest(TestCase): + + @HttpMocker() + def 
test_given_one_page_when_read_then_return_record(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _external_accounts_request().with_object(_OBJECT).with_limit(100).build(), + _external_bank_accounts_response().with_record(_an_external_bank_account()).with_record(_an_external_bank_account()).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE)) + + assert len(output.records) == 2 + + @HttpMocker() + def test_given_many_pages_when_read_then_return_records(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _external_accounts_request().with_object(_OBJECT).with_limit(100).build(), + _external_bank_accounts_response().with_pagination().with_record(_an_external_bank_account().with_id("last_record_id_from_first_page")).build(), + ) + http_mocker.get( + _external_accounts_request().with_starting_after("last_record_id_from_first_page").with_object(_OBJECT).with_limit(100).build(), + _external_bank_accounts_response().with_record(_an_external_bank_account()).with_record(_an_external_bank_account()).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE)) + + assert len(output.records) == 3 + + @HttpMocker() + def test_when_read_then_add_cursor_field(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _external_accounts_request().with_object(_OBJECT).with_limit(100).build(), + _external_bank_accounts_response().with_record(_an_external_bank_account()).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE).with_lookback_window_in_days(10)) + + assert output.records[0].record.data["updated"] == int(_NOW.timestamp()) + + @HttpMocker() + def test_given_http_status_400_when_read_then_stream_is_ignored(self, http_mocker: HttpMocker) -> None: + http_mocker.get( + _external_accounts_request().with_any_query_params().build(), + 
a_response_with_status(400), + ) + output = self._read(_config()) + assert len(output.get_stream_statuses(_STREAM_NAME)) == 0 + + @HttpMocker() + def test_given_http_status_401_when_read_then_system_error(self, http_mocker: HttpMocker) -> None: + http_mocker.get( + _external_accounts_request().with_any_query_params().build(), + a_response_with_status(401), + ) + output = self._read(_config(), expecting_exception=True) + assert output.errors[-1].trace.error.failure_type == FailureType.system_error + + @HttpMocker() + def test_given_rate_limited_when_read_then_retry_and_return_records(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _external_accounts_request().with_any_query_params().build(), + [ + a_response_with_status(429), + _external_bank_accounts_response().with_record(_an_external_bank_account()).build(), + ], + ) + output = self._read(_config().with_start_date(_A_START_DATE)) + assert len(output.records) == 1 + + @HttpMocker() + def test_given_http_status_500_once_before_200_when_read_then_retry_and_return_records(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _external_accounts_request().with_any_query_params().build(), + [a_response_with_status(500), _external_bank_accounts_response().with_record(_an_external_bank_account()).build()], + ) + output = self._read(_config()) + assert len(output.records) == 1 + + @HttpMocker() + def test_given_http_status_500_on_availability_when_read_then_raise_system_error(self, http_mocker: HttpMocker) -> None: + http_mocker.get( + _external_accounts_request().with_any_query_params().build(), + a_response_with_status(500), + ) + output = self._read(_config(), expecting_exception=True) + assert output.errors[-1].trace.error.failure_type == FailureType.system_error + + @HttpMocker() + def test_given_small_slice_range_when_read_then_availability_check_performs_too_many_queries(self, http_mocker: HttpMocker) 
-> None: + # see https://github.com/airbytehq/airbyte/issues/33499 + events_requests = StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build() + http_mocker.get( + events_requests, + _events_response().build() # it is important that the event response does not have a record. This is not far-fetched as this is what would happen 30 days before now + ) + http_mocker.get( + _external_accounts_request().with_any_query_params().build(), + _external_bank_accounts_response().build(), + ) + + self._read(_config().with_start_date(_NOW - timedelta(days=60)).with_slice_range_in_days(1)) + + http_mocker.assert_number_of_calls(events_requests, 30) + + def _read(self, config: ConfigBuilder, expecting_exception: bool = False) -> EntrypointOutput: + return _read(config, SyncMode.full_refresh, expecting_exception=expecting_exception) + + +@freezegun.freeze_time(_NOW.isoformat()) +class IncrementalTest(TestCase): + + @HttpMocker() + def test_given_no_state_when_read_then_use_external_accounts_endpoint(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _external_accounts_request().with_object(_OBJECT).with_limit(100).build(), + _external_bank_accounts_response().with_record(_an_external_bank_account()).build(), + ) + output = self._read(_config().with_start_date(_A_START_DATE), _NO_STATE) + assert output.most_recent_state == {_STREAM_NAME: {"updated": int(_NOW.timestamp())}} + + @HttpMocker() + def test_given_state_when_read_then_query_events_using_types_and_state_value_plus_1(self, http_mocker: HttpMocker) -> None: + start_date = _NOW - timedelta(days=40) + state_datetime = _NOW - timedelta(days=5) + cursor_value = int(state_datetime.timestamp()) + 1 + + _given_external_accounts_availability_check(http_mocker) + _given_events_availability_check(http_mocker) + http_mocker.get( + _events_request().with_created_gte(state_datetime + 
_AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record( + _an_event().with_cursor(cursor_value).with_field(_DATA_FIELD, _an_external_bank_account().build()) + ).build(), + ) + + output = self._read( + _config().with_start_date(start_date), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert output.most_recent_state == {_STREAM_NAME: {"updated": cursor_value}} + + @HttpMocker() + def test_given_object_is_not_back_account_when_read_then_filter_out(self, http_mocker: HttpMocker) -> None: + start_date = _NOW - timedelta(days=40) + state_datetime = _NOW - timedelta(days=5) + + _given_external_accounts_availability_check(http_mocker) + http_mocker.get( + StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build(), + _events_response().with_record( + _an_event().with_field(_DATA_FIELD, {"object": "not a bank account"}) + ).build(), + ) + + output = self._read( + _config().with_start_date(start_date), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert len(output.records) == 0 + + @HttpMocker() + def test_given_state_and_pagination_when_read_then_return_records(self, http_mocker: HttpMocker) -> None: + _given_external_accounts_availability_check(http_mocker) + _given_events_availability_check(http_mocker) + state_datetime = _NOW - timedelta(days=5) + http_mocker.get( + _events_request().with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_pagination().with_record( + _an_event().with_id("last_record_id_from_first_page").with_field(_DATA_FIELD, _an_external_bank_account().build()) + ).build(), + ) + http_mocker.get( + 
_events_request().with_starting_after("last_record_id_from_first_page").with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._an_external_account_event()).build(), + ) + + output = self._read( + _config(), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert len(output.records) == 2 + + @HttpMocker() + def test_given_state_and_small_slice_range_when_read_then_perform_multiple_queries(self, http_mocker: HttpMocker) -> None: + state_datetime = _NOW - timedelta(days=5) + slice_range = timedelta(days=3) + slice_datetime = state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES + slice_range + + _given_external_accounts_availability_check(http_mocker) + _given_events_availability_check(http_mocker) # the availability check does not consider the state so we need to define a generic availability check + http_mocker.get( + _events_request().with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(slice_datetime).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._an_external_account_event()).build(), + ) + http_mocker.get( + _events_request().with_created_gte(slice_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._an_external_account_event()).with_record(self._an_external_account_event()).build(), + ) + + output = self._read( + _config().with_start_date(_NOW - timedelta(days=30)).with_slice_range_in_days(slice_range.days), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert len(output.records) == 3 + + @HttpMocker() + def test_given_state_earlier_than_30_days_when_read_then_query_events_using_types_and_event_lower_boundary(self, http_mocker: HttpMocker) -> None: + 
# this seems odd as we would miss some data between start_date and events_lower_boundary. In that case, we should hit the + # external_accounts endpoint + _given_external_accounts_availability_check(http_mocker) + start_date = _NOW - timedelta(days=40) + state_value = _NOW - timedelta(days=39) + events_lower_boundary = _NOW - timedelta(days=30) + http_mocker.get( + _events_request().with_created_gte(events_lower_boundary).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._an_external_account_event()).build(), + ) + + self._read( + _config().with_start_date(start_date), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_value.timestamp())}).build(), + ) + + # request matched http_mocker + + def _an_external_account_event(self) -> RecordBuilder: + return _an_event().with_field(_DATA_FIELD, _an_external_bank_account().build()) + + def _read(self, config: ConfigBuilder, state: Optional[Dict[str, Any]], expecting_exception: bool = False) -> EntrypointOutput: + return _read(config, SyncMode.incremental, state, expecting_exception) diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_cards.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_cards.py new file mode 100644 index 000000000000..dfa0cc2946f2 --- /dev/null +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_external_account_cards.py @@ -0,0 +1,361 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+ +import json +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, Optional +from unittest import TestCase + +import freezegun +from airbyte_cdk.test.catalog_builder import CatalogBuilder +from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read +from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse +from airbyte_cdk.test.mock_http.response_builder import ( + FieldPath, + HttpResponseBuilder, + NestedPath, + RecordBuilder, + create_record_builder, + create_response_builder, + find_template, +) +from airbyte_cdk.test.state_builder import StateBuilder +from airbyte_protocol.models import ConfiguredAirbyteCatalog, FailureType, SyncMode +from integration.config import ConfigBuilder +from integration.pagination import StripePaginationStrategy +from integration.request_builder import StripeRequestBuilder +from integration.response_builder import a_response_with_status +from source_stripe import SourceStripe + +_EVENT_TYPES = ["account.external_account.created", "account.external_account.updated", "account.external_account.deleted"] + +_DATA_FIELD = NestedPath(["data", "object"]) +_OBJECT = "card" +_STREAM_NAME = "external_account_cards" +_ENDPOINT_TEMPLATE_NAME = "external_account_cards" +_NOW = datetime.now(timezone.utc) +_A_START_DATE = _NOW - timedelta(days=60) +_ACCOUNT_ID = "account_id" +_CLIENT_SECRET = "client_secret" +_NO_STATE = {} +_AVOIDING_INCLUSIVE_BOUNDARIES = timedelta(seconds=1) + + +def _external_accounts_request() -> StripeRequestBuilder: + return StripeRequestBuilder.external_accounts_endpoint(_ACCOUNT_ID, _CLIENT_SECRET) + + +def _events_request() -> StripeRequestBuilder: + return StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET) + + +def _config() -> ConfigBuilder: + return ConfigBuilder().with_start_date(_NOW - timedelta(days=75)).with_account_id(_ACCOUNT_ID).with_client_secret(_CLIENT_SECRET) + + +def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: + 
return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build() + + +def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe: + return SourceStripe(catalog, config) + + +def _an_event() -> RecordBuilder: + return create_record_builder( + find_template("events", __file__), + FieldPath("data"), + record_id_path=FieldPath("id"), + record_cursor_path=FieldPath("created"), + ) + + +def _events_response() -> HttpResponseBuilder: + return create_response_builder( + find_template("events", __file__), + FieldPath("data"), + pagination_strategy=StripePaginationStrategy() + ) + + +def _an_external_account_card() -> RecordBuilder: + return create_record_builder( + find_template(_ENDPOINT_TEMPLATE_NAME, __file__), + FieldPath("data"), + record_id_path=FieldPath("id"), + ) + + +def _external_accounts_card_response() -> HttpResponseBuilder: + return create_response_builder( + find_template(_ENDPOINT_TEMPLATE_NAME, __file__), + FieldPath("data"), + pagination_strategy=StripePaginationStrategy() + ) + + +def _given_external_accounts_availability_check(http_mocker: HttpMocker) -> None: + http_mocker.get( + StripeRequestBuilder.external_accounts_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build(), + _external_accounts_card_response().build() + ) + + +def _given_events_availability_check(http_mocker: HttpMocker) -> None: + http_mocker.get( + StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build(), + _events_response().build() + ) + + +def _read( + config_builder: ConfigBuilder, + sync_mode: SyncMode, + state: Optional[Dict[str, Any]] = None, + expecting_exception: bool = False +) -> EntrypointOutput: + catalog = _catalog(sync_mode) + config = config_builder.build() + return read(_source(catalog, config), config, catalog, state, expecting_exception) + + +@freezegun.freeze_time(_NOW.isoformat()) +class FullRefreshTest(TestCase): + + @HttpMocker() + def 
test_given_one_page_when_read_then_return_record(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _external_accounts_request().with_object(_OBJECT).with_limit(100).build(), + _external_accounts_card_response().with_record(_an_external_account_card()).with_record(_an_external_account_card()).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE)) + + assert len(output.records) == 2 + + @HttpMocker() + def test_given_many_pages_when_read_then_return_records(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _external_accounts_request().with_object(_OBJECT).with_limit(100).build(), + _external_accounts_card_response().with_pagination().with_record(_an_external_account_card().with_id("last_record_id_from_first_page")).build(), + ) + http_mocker.get( + _external_accounts_request().with_starting_after("last_record_id_from_first_page").with_object(_OBJECT).with_limit(100).build(), + _external_accounts_card_response().with_record(_an_external_account_card()).with_record(_an_external_account_card()).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE)) + + assert len(output.records) == 3 + + @HttpMocker() + def test_when_read_then_add_cursor_field(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _external_accounts_request().with_object(_OBJECT).with_limit(100).build(), + _external_accounts_card_response().with_record(_an_external_account_card()).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE).with_lookback_window_in_days(10)) + + assert output.records[0].record.data["updated"] == int(_NOW.timestamp()) + + @HttpMocker() + def test_given_http_status_400_when_read_then_stream_is_ignored(self, http_mocker: HttpMocker) -> None: + http_mocker.get( + _external_accounts_request().with_any_query_params().build(), + 
a_response_with_status(400), + ) + output = self._read(_config()) + assert len(output.get_stream_statuses(_STREAM_NAME)) == 0 + + @HttpMocker() + def test_given_http_status_401_when_read_then_system_error(self, http_mocker: HttpMocker) -> None: + http_mocker.get( + _external_accounts_request().with_any_query_params().build(), + a_response_with_status(401), + ) + output = self._read(_config(), expecting_exception=True) + assert output.errors[-1].trace.error.failure_type == FailureType.system_error + + @HttpMocker() + def test_given_rate_limited_when_read_then_retry_and_return_records(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _external_accounts_request().with_any_query_params().build(), + [ + a_response_with_status(429), + _external_accounts_card_response().with_record(_an_external_account_card()).build(), + ], + ) + output = self._read(_config().with_start_date(_A_START_DATE)) + assert len(output.records) == 1 + + @HttpMocker() + def test_given_http_status_500_once_before_200_when_read_then_retry_and_return_records(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _external_accounts_request().with_any_query_params().build(), + [a_response_with_status(500), _external_accounts_card_response().with_record(_an_external_account_card()).build()], + ) + output = self._read(_config()) + assert len(output.records) == 1 + + @HttpMocker() + def test_given_http_status_500_on_availability_when_read_then_raise_system_error(self, http_mocker: HttpMocker) -> None: + http_mocker.get( + _external_accounts_request().with_any_query_params().build(), + a_response_with_status(500), + ) + output = self._read(_config(), expecting_exception=True) + assert output.errors[-1].trace.error.failure_type == FailureType.system_error + + @HttpMocker() + def test_given_small_slice_range_when_read_then_availability_check_performs_too_many_queries(self, http_mocker: HttpMocker) 
-> None: + # see https://github.com/airbytehq/airbyte/issues/33499 + events_requests = StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build() + http_mocker.get( + events_requests, + _events_response().build() # it is important that the event response does not have a record. This is not far-fetched as this is what would happened 30 days before now + ) + http_mocker.get( + _external_accounts_request().with_any_query_params().build(), + _external_accounts_card_response().build(), + ) + + self._read(_config().with_start_date(_NOW - timedelta(days=60)).with_slice_range_in_days(1)) + + http_mocker.assert_number_of_calls(events_requests, 30) + + def _read(self, config: ConfigBuilder, expecting_exception: bool = False) -> EntrypointOutput: + return _read(config, SyncMode.full_refresh, expecting_exception=expecting_exception) + + +@freezegun.freeze_time(_NOW.isoformat()) +class IncrementalTest(TestCase): + + @HttpMocker() + def test_given_no_state_when_read_then_use_external_accounts_endpoint(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _external_accounts_request().with_object(_OBJECT).with_limit(100).build(), + _external_accounts_card_response().with_record(_an_external_account_card()).build(), + ) + output = self._read(_config().with_start_date(_A_START_DATE), _NO_STATE) + assert output.most_recent_state == {_STREAM_NAME: {"updated": int(_NOW.timestamp())}} + + @HttpMocker() + def test_given_state_when_read_then_query_events_using_types_and_state_value_plus_1(self, http_mocker: HttpMocker) -> None: + start_date = _NOW - timedelta(days=40) + state_datetime = _NOW - timedelta(days=5) + cursor_value = int(state_datetime.timestamp()) + 1 + + _given_external_accounts_availability_check(http_mocker) + _given_events_availability_check(http_mocker) + http_mocker.get( + _events_request().with_created_gte(state_datetime + 
_AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record( + _an_event().with_cursor(cursor_value).with_field(_DATA_FIELD, _an_external_account_card().build()) + ).build(), + ) + + output = self._read( + _config().with_start_date(start_date), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert output.most_recent_state == {_STREAM_NAME: {"updated": cursor_value}} + + @HttpMocker() + def test_given_object_is_not_back_account_when_read_then_filter_out(self, http_mocker: HttpMocker) -> None: + start_date = _NOW - timedelta(days=40) + state_datetime = _NOW - timedelta(days=5) + + _given_external_accounts_availability_check(http_mocker) + http_mocker.get( + StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build(), + _events_response().with_record( + _an_event().with_field(_DATA_FIELD, {"object": "not a card"}) + ).build(), + ) + + output = self._read( + _config().with_start_date(start_date), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert len(output.records) == 0 + + @HttpMocker() + def test_given_state_and_pagination_when_read_then_return_records(self, http_mocker: HttpMocker) -> None: + _given_external_accounts_availability_check(http_mocker) + _given_events_availability_check(http_mocker) + state_datetime = _NOW - timedelta(days=5) + http_mocker.get( + _events_request().with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_pagination().with_record( + _an_event().with_id("last_record_id_from_first_page").with_field(_DATA_FIELD, _an_external_account_card().build()) + ).build(), + ) + http_mocker.get( + _events_request().with_starting_after("last_record_id_from_first_page").with_created_gte(state_datetime + 
_AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._an_external_account_event()).build(), + ) + + output = self._read( + _config(), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert len(output.records) == 2 + + @HttpMocker() + def test_given_state_and_small_slice_range_when_read_then_perform_multiple_queries(self, http_mocker: HttpMocker) -> None: + state_datetime = _NOW - timedelta(days=5) + slice_range = timedelta(days=3) + slice_datetime = state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES + slice_range + + _given_external_accounts_availability_check(http_mocker) + _given_events_availability_check(http_mocker) # the availability check does not consider the state so we need to define a generic availability check + http_mocker.get( + _events_request().with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(slice_datetime).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._an_external_account_event()).build(), + ) + http_mocker.get( + _events_request().with_created_gte(slice_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._an_external_account_event()).with_record(self._an_external_account_event()).build(), + ) + + output = self._read( + _config().with_start_date(_NOW - timedelta(days=30)).with_slice_range_in_days(slice_range.days), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert len(output.records) == 3 + + @HttpMocker() + def test_given_state_earlier_than_30_days_when_read_then_query_events_using_types_and_event_lower_boundary(self, http_mocker: HttpMocker) -> None: + # this seems odd as we would miss some data between start_date and events_lower_boundary. 
In that case, we should hit the + # external_accounts endpoint + _given_external_accounts_availability_check(http_mocker) + start_date = _NOW - timedelta(days=40) + state_value = _NOW - timedelta(days=39) + events_lower_boundary = _NOW - timedelta(days=30) + http_mocker.get( + _events_request().with_created_gte(events_lower_boundary).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._an_external_account_event()).build(), + ) + + self._read( + _config().with_start_date(start_date), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_value.timestamp())}).build(), + ) + + # request matched http_mocker + + def _an_external_account_event(self) -> RecordBuilder: + return _an_event().with_field(_DATA_FIELD, _an_external_account_card().build()) + + def _read(self, config: ConfigBuilder, state: Optional[Dict[str, Any]], expecting_exception: bool = False) -> EntrypointOutput: + return _read(config, SyncMode.incremental, state, expecting_exception) diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_payment_methods.py b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_payment_methods.py new file mode 100644 index 000000000000..d0b3f6f7b4bd --- /dev/null +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/integration/test_payment_methods.py @@ -0,0 +1,347 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+ +import json +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, Optional +from unittest import TestCase + +import freezegun +from airbyte_cdk.test.catalog_builder import CatalogBuilder +from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read +from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse +from airbyte_cdk.test.mock_http.response_builder import ( + FieldPath, + HttpResponseBuilder, + NestedPath, + RecordBuilder, + create_record_builder, + create_response_builder, + find_template, +) +from airbyte_cdk.test.state_builder import StateBuilder +from airbyte_protocol.models import ConfiguredAirbyteCatalog, FailureType, SyncMode +from integration.config import ConfigBuilder +from integration.pagination import StripePaginationStrategy +from integration.request_builder import StripeRequestBuilder +from integration.response_builder import a_response_with_status +from source_stripe import SourceStripe + +_EVENT_TYPES = [ + "payment_method.attached", + "payment_method.automatically_updated", + "payment_method.detached", + "payment_method.updated", +] + +_DATA_FIELD = NestedPath(["data", "object"]) +_STREAM_NAME = "payment_methods" +_ENDPOINT_TEMPLATE_NAME = "payment_methods" +_NOW = datetime.now(timezone.utc) +_A_START_DATE = _NOW - timedelta(days=60) +_ACCOUNT_ID = "account_id" +_CLIENT_SECRET = "client_secret" +_NO_STATE = {} +_AVOIDING_INCLUSIVE_BOUNDARIES = timedelta(seconds=1) + + +def _payment_methods_request() -> StripeRequestBuilder: + return StripeRequestBuilder.payment_methods_endpoint(_ACCOUNT_ID, _CLIENT_SECRET) + + +def _events_request() -> StripeRequestBuilder: + return StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET) + + +def _config() -> ConfigBuilder: + return ConfigBuilder().with_start_date(_NOW - timedelta(days=75)).with_account_id(_ACCOUNT_ID).with_client_secret(_CLIENT_SECRET) + + +def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: + return 
CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build() + + +def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any]) -> SourceStripe: + return SourceStripe(catalog, config) + + +def _an_event() -> RecordBuilder: + return create_record_builder( + find_template("events", __file__), + FieldPath("data"), + record_id_path=FieldPath("id"), + record_cursor_path=FieldPath("created"), + ) + + +def _events_response() -> HttpResponseBuilder: + return create_response_builder( + find_template("events", __file__), + FieldPath("data"), + pagination_strategy=StripePaginationStrategy() + ) + + +def _a_payment_method() -> RecordBuilder: + return create_record_builder( + find_template(_ENDPOINT_TEMPLATE_NAME, __file__), + FieldPath("data"), + record_id_path=FieldPath("id"), + record_cursor_path=FieldPath("created"), + ) + + +def _payment_methods_response() -> HttpResponseBuilder: + return create_response_builder( + find_template(_ENDPOINT_TEMPLATE_NAME, __file__), + FieldPath("data"), + pagination_strategy=StripePaginationStrategy() + ) + + +def _given_payment_methods_availability_check(http_mocker: HttpMocker) -> None: + http_mocker.get( + StripeRequestBuilder.payment_methods_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build(), + _payment_methods_response().build() + ) + + +def _given_events_availability_check(http_mocker: HttpMocker) -> None: + http_mocker.get( + StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build(), + _events_response().build() + ) + + +def _read( + config_builder: ConfigBuilder, + sync_mode: SyncMode, + state: Optional[Dict[str, Any]] = None, + expecting_exception: bool = False +) -> EntrypointOutput: + catalog = _catalog(sync_mode) + config = config_builder.build() + return read(_source(catalog, config), config, catalog, state, expecting_exception) + + +@freezegun.freeze_time(_NOW.isoformat()) +class FullRefreshTest(TestCase): + + @HttpMocker() + def 
test_given_one_page_when_read_then_return_record(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _payment_methods_request().with_limit(100).build(), + _payment_methods_response().with_record(_a_payment_method()).with_record(_a_payment_method()).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE)) + + assert len(output.records) == 2 + + @HttpMocker() + def test_given_many_pages_when_read_then_return_records(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _payment_methods_request().with_limit(100).build(), + _payment_methods_response().with_pagination().with_record(_a_payment_method().with_id("last_record_id_from_first_page")).build(), + ) + http_mocker.get( + _payment_methods_request().with_starting_after("last_record_id_from_first_page").with_limit(100).build(), + _payment_methods_response().with_record(_a_payment_method()).with_record(_a_payment_method()).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE)) + + assert len(output.records) == 3 + + @HttpMocker() + def test_when_read_then_add_cursor_field(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _payment_methods_request().with_limit(100).build(), + _payment_methods_response().with_record(_a_payment_method()).build(), + ) + + output = self._read(_config().with_start_date(_A_START_DATE).with_lookback_window_in_days(10)) + + assert output.records[0].record.data["updated"] == output.records[0].record.data["created"] + + @HttpMocker() + def test_given_http_status_400_when_read_then_stream_is_ignored(self, http_mocker: HttpMocker) -> None: + http_mocker.get( + _payment_methods_request().with_any_query_params().build(), + a_response_with_status(400), + ) + output = self._read(_config()) + assert len(output.get_stream_statuses(_STREAM_NAME)) == 0 + + @HttpMocker() + def 
test_given_http_status_401_when_read_then_system_error(self, http_mocker: HttpMocker) -> None: + http_mocker.get( + _payment_methods_request().with_any_query_params().build(), + a_response_with_status(401), + ) + output = self._read(_config(), expecting_exception=True) + assert output.errors[-1].trace.error.failure_type == FailureType.system_error + + @HttpMocker() + def test_given_rate_limited_when_read_then_retry_and_return_records(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _payment_methods_request().with_any_query_params().build(), + [ + a_response_with_status(429), + _payment_methods_response().with_record(_a_payment_method()).build(), + ], + ) + output = self._read(_config().with_start_date(_A_START_DATE)) + assert len(output.records) == 1 + + @HttpMocker() + def test_given_http_status_500_once_before_200_when_read_then_retry_and_return_records(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + http_mocker.get( + _payment_methods_request().with_any_query_params().build(), + [a_response_with_status(500), _payment_methods_response().with_record(_a_payment_method()).build()], + ) + output = self._read(_config()) + assert len(output.records) == 1 + + @HttpMocker() + def test_given_http_status_500_on_availability_when_read_then_raise_system_error(self, http_mocker: HttpMocker) -> None: + http_mocker.get( + _payment_methods_request().with_any_query_params().build(), + a_response_with_status(500), + ) + output = self._read(_config(), expecting_exception=True) + assert output.errors[-1].trace.error.failure_type == FailureType.system_error + + @HttpMocker() + def test_given_small_slice_range_when_read_then_availability_check_performs_too_many_queries(self, http_mocker: HttpMocker) -> None: + # see https://github.com/airbytehq/airbyte/issues/33499 + events_requests = StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_any_query_params().build() + 
http_mocker.get( + events_requests, + _events_response().build() # it is important that the event response does not have a record. This is not far-fetched as this is what would happened 30 days before now + ) + http_mocker.get( + _payment_methods_request().with_any_query_params().build(), + _payment_methods_response().build(), + ) + + self._read(_config().with_start_date(_NOW - timedelta(days=60)).with_slice_range_in_days(1)) + + http_mocker.assert_number_of_calls(events_requests, 30) + + def _read(self, config: ConfigBuilder, expecting_exception: bool = False) -> EntrypointOutput: + return _read(config, SyncMode.full_refresh, expecting_exception=expecting_exception) + + +@freezegun.freeze_time(_NOW.isoformat()) +class IncrementalTest(TestCase): + + @HttpMocker() + def test_given_no_state_when_read_then_use_payment_methods_endpoint(self, http_mocker: HttpMocker) -> None: + _given_events_availability_check(http_mocker) + cursor_value = int(_A_START_DATE.timestamp()) + 1 + http_mocker.get( + _payment_methods_request().with_limit(100).build(), + _payment_methods_response().with_record(_a_payment_method().with_cursor(cursor_value)).build(), + ) + output = self._read(_config().with_start_date(_A_START_DATE), _NO_STATE) + assert output.most_recent_state == {_STREAM_NAME: {"updated": cursor_value}} + + @HttpMocker() + def test_given_state_when_read_then_query_events_using_types_and_state_value_plus_1(self, http_mocker: HttpMocker) -> None: + start_date = _NOW - timedelta(days=40) + state_datetime = _NOW - timedelta(days=5) + cursor_value = int(state_datetime.timestamp()) + 1 + + _given_payment_methods_availability_check(http_mocker) + _given_events_availability_check(http_mocker) + http_mocker.get( + _events_request().with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record( + _an_event().with_cursor(cursor_value).with_field(_DATA_FIELD, 
_a_payment_method().build()) + ).build(), + ) + + output = self._read( + _config().with_start_date(start_date), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert output.most_recent_state == {_STREAM_NAME: {"updated": cursor_value}} + + @HttpMocker() + def test_given_state_and_pagination_when_read_then_return_records(self, http_mocker: HttpMocker) -> None: + _given_payment_methods_availability_check(http_mocker) + _given_events_availability_check(http_mocker) + state_datetime = _NOW - timedelta(days=5) + http_mocker.get( + _events_request().with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_pagination().with_record( + _an_event().with_id("last_record_id_from_first_page").with_field(_DATA_FIELD, _a_payment_method().build()) + ).build(), + ) + http_mocker.get( + _events_request().with_starting_after("last_record_id_from_first_page").with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._a_payment_method_event()).build(), + ) + + output = self._read( + _config(), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert len(output.records) == 2 + + @HttpMocker() + def test_given_state_and_small_slice_range_when_read_then_perform_multiple_queries(self, http_mocker: HttpMocker) -> None: + state_datetime = _NOW - timedelta(days=5) + slice_range = timedelta(days=3) + slice_datetime = state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES + slice_range + + _given_payment_methods_availability_check(http_mocker) + _given_events_availability_check(http_mocker) # the availability check does not consider the state so we need to define a generic availability check + http_mocker.get( + 
_events_request().with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(slice_datetime).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._a_payment_method_event()).build(), + ) + http_mocker.get( + _events_request().with_created_gte(slice_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._a_payment_method_event()).with_record(self._a_payment_method_event()).build(), + ) + + output = self._read( + _config().with_start_date(_NOW - timedelta(days=30)).with_slice_range_in_days(slice_range.days), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_datetime.timestamp())}).build(), + ) + + assert len(output.records) == 3 + + @HttpMocker() + def test_given_state_earlier_than_30_days_when_read_then_query_events_using_types_and_event_lower_boundary(self, http_mocker: HttpMocker) -> None: + # this seems odd as we would miss some data between start_date and events_lower_boundary. 
In that case, we should hit the + # payment_methods endpoint + _given_payment_methods_availability_check(http_mocker) + start_date = _NOW - timedelta(days=40) + state_value = _NOW - timedelta(days=39) + events_lower_boundary = _NOW - timedelta(days=30) + http_mocker.get( + _events_request().with_created_gte(events_lower_boundary).with_created_lte(_NOW).with_limit(100).with_types(_EVENT_TYPES).build(), + _events_response().with_record(self._a_payment_method_event()).build(), + ) + + self._read( + _config().with_start_date(start_date), + StateBuilder().with_stream_state(_STREAM_NAME, {"updated": int(state_value.timestamp())}).build(), + ) + + # request matched http_mocker + + def _a_payment_method_event(self) -> RecordBuilder: + return _an_event().with_field(_DATA_FIELD, _a_payment_method().build()) + + def _read(self, config: ConfigBuilder, state: Optional[Dict[str, Any]], expecting_exception: bool = False) -> EntrypointOutput: + return _read(config, SyncMode.incremental, state, expecting_exception) diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/external_account_cards.json b/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/external_account_cards.json new file mode 100644 index 000000000000..c26bc36461cd --- /dev/null +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/external_account_cards.json @@ -0,0 +1,34 @@ +{ + "object": "list", + "url": "/v1/accounts/acct_1032D82eZvKYlo2C/external_accounts", + "has_more": false, + "data": [ + { + "id": "card_1NAz2x2eZvKYlo2C75wJ1YUs", + "object": "card", + "address_city": null, + "address_country": null, + "address_line1": null, + "address_line1_check": null, + "address_line2": null, + "address_state": null, + "address_zip": null, + "address_zip_check": null, + "brand": "Visa", + "country": "US", + "cvc_check": "pass", + "dynamic_last4": null, + "exp_month": 8, + "exp_year": 2024, + "fingerprint": "Xt5EWLLDS7FJjR1c", 
+ "funding": "credit", + "last4": "4242", + "metadata": {}, + "name": null, + "redaction": null, + "tokenization_method": null, + "wallet": null, + "account": "acct_1032D82eZvKYlo2C" + } + ] +} diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/external_bank_accounts.json b/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/external_bank_accounts.json new file mode 100644 index 000000000000..a8704270de2f --- /dev/null +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/external_bank_accounts.json @@ -0,0 +1,23 @@ +{ + "object": "list", + "url": "/v1/accounts/acct_1032D82eZvKYlo2C/external_accounts", + "has_more": false, + "data": [ + { + "id": "ba_1NB1IV2eZvKYlo2CByiLrMWv", + "object": "bank_account", + "account_holder_name": "Jane Austen", + "account_holder_type": "company", + "account_type": null, + "bank_name": "STRIPE TEST BANK", + "country": "US", + "currency": "usd", + "fingerprint": "1JWtPxqbdX5Gamtc", + "last4": "6789", + "metadata": {}, + "routing_number": "110000000", + "status": "new", + "account": "acct_1032D82eZvKYlo2C" + } + ] +} diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/payment_methods.json b/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/payment_methods.json new file mode 100644 index 000000000000..59ced3939e62 --- /dev/null +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/payment_methods.json @@ -0,0 +1,52 @@ +{ + "object": "list", + "url": "/v1/payment_methods", + "has_more": false, + "data": [ + { + "id": "pm_1NO6mA2eZvKYlo2CEydeHsKT", + "object": "payment_method", + "billing_details": { + "address": { + "city": null, + "country": null, + "line1": null, + "line2": null, + "postal_code": null, + "state": null + }, + "email": null, + "name": null, + "phone": null + }, + "card": { + "brand": "visa", + "checks": { + 
"address_line1_check": null, + "address_postal_code_check": null, + "cvc_check": "unchecked" + }, + "country": "US", + "exp_month": 8, + "exp_year": 2024, + "fingerprint": "Xt5EWLLDS7FJjR1c", + "funding": "credit", + "generated_from": null, + "last4": "4242", + "networks": { + "available": ["visa"], + "preferred": null + }, + "three_d_secure_usage": { + "supported": true + }, + "wallet": null + }, + "created": 1687991030, + "customer": "cus_9s6XKzkNRiz8i3", + "livemode": false, + "metadata": {}, + "type": "card" + } + ] +} diff --git a/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/radar_early_fraud_warnings.json b/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/radar_early_fraud_warnings.json new file mode 100644 index 000000000000..8da264f5b475 --- /dev/null +++ b/airbyte-integrations/connectors/source-stripe/unit_tests/resource/http/response/radar_early_fraud_warnings.json @@ -0,0 +1,16 @@ +{ + "object": "list", + "url": "/v1/radar/early_fraud_warnings", + "has_more": false, + "data": [ + { + "id": "issfr_1NnrwHBw2dPENLoi9lnhV3RQ", + "object": "radar.early_fraud_warning", + "actionable": true, + "charge": "ch_1234", + "created": 123456789, + "fraud_type": "misc", + "livemode": false + } + ] +}