From 0f1eeb106dd7edc6a202056bebffb82231b17ba7 Mon Sep 17 00:00:00 2001 From: Maxime Carbonneau-Leclerc <3360483+maxi297@users.noreply.github.com> Date: Mon, 8 Apr 2024 11:45:22 -0400 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Source=20Salesforce:=20Add=20ret?= =?UTF-8?q?ry=20on=20REST=20API=20(#36885)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../source-salesforce/metadata.yaml | 2 +- .../source-salesforce/pyproject.toml | 2 +- .../source_salesforce/streams.py | 1 + .../unit_tests/config_builder.py | 39 ++++++++ .../unit_tests/integration/__init__.py | 0 .../test_rest_salesforce_stream.py | 92 +++++++++++++++++++ docs/integrations/sources/salesforce.md | 3 +- 7 files changed, 136 insertions(+), 3 deletions(-) create mode 100644 airbyte-integrations/connectors/source-salesforce/unit_tests/config_builder.py create mode 100644 airbyte-integrations/connectors/source-salesforce/unit_tests/integration/__init__.py create mode 100644 airbyte-integrations/connectors/source-salesforce/unit_tests/integration/test_rest_salesforce_stream.py diff --git a/airbyte-integrations/connectors/source-salesforce/metadata.yaml b/airbyte-integrations/connectors/source-salesforce/metadata.yaml index a24e9c7b1072..37119483c88c 100644 --- a/airbyte-integrations/connectors/source-salesforce/metadata.yaml +++ b/airbyte-integrations/connectors/source-salesforce/metadata.yaml @@ -10,7 +10,7 @@ data: connectorSubtype: api connectorType: source definitionId: b117307c-14b6-41aa-9422-947e34922962 - dockerImageTag: 2.4.2 + dockerImageTag: 2.4.3 dockerRepository: airbyte/source-salesforce documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce githubIssueLabel: source-salesforce diff --git a/airbyte-integrations/connectors/source-salesforce/pyproject.toml b/airbyte-integrations/connectors/source-salesforce/pyproject.toml index 5613076d9388..61e2ff31b1d8 100644 --- a/airbyte-integrations/connectors/source-salesforce/pyproject.toml +++ b/airbyte-integrations/connectors/source-salesforce/pyproject.toml @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",] build-backend = "poetry.core.masonry.api" [tool.poetry] -version = "2.4.2" +version = "2.4.3" name = "source-salesforce" description = "Source implementation for Salesforce." authors = [ "Airbyte ",] diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index 6d94b9215efd..bf58a95602a1 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -296,6 +296,7 @@ def _read_pages( # Always return an empty generator just in case no records were ever yielded yield from [] + @default_backoff_handler(max_tries=5, backoff_method=backoff.constant, backoff_params={"interval": 5}) def _fetch_next_page_for_chunk( self, stream_slice: Mapping[str, Any] = None, diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/config_builder.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/config_builder.py new file mode 100644 index 000000000000..7807f80744af --- /dev/null +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/config_builder.py @@ -0,0 +1,39 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. + +from datetime import datetime +from typing import Any, Mapping + + +class ConfigBuilder: + def __init__(self) -> None: + self._config = { + "client_id": "fake_client_id", + "client_secret": "fake_client_secret", + "refresh_token": "fake_refresh_token", + "start_date": "2010-01-18T21:18:20Z", + "is_sandbox": False, + "wait_timeout": 15, + } + + def start_date(self, start_date: datetime) -> "ConfigBuilder": + self._config["start_date"] = start_date.strftime("%Y-%m-%dT%H:%M:%SZ") + return self + + def stream_slice_step(self, stream_slice_step: str) -> "ConfigBuilder": + self._config["stream_slice_step"] = stream_slice_step + return self + + def client_id(self, client_id: str) -> "ConfigBuilder": + self._config["client_id"] = client_id + return self + + def client_secret(self, client_secret: str) -> "ConfigBuilder": + self._config["client_secret"] = client_secret + return self + + def refresh_token(self, refresh_token: str) -> "ConfigBuilder": + self._config["refresh_token"] = refresh_token + return self + + def build(self) -> Mapping[str, Any]: + return self._config diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/integration/__init__.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/integration/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/integration/test_rest_salesforce_stream.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/integration/test_rest_salesforce_stream.py new file mode 100644 index 000000000000..fe2c4e6db156 --- /dev/null +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/integration/test_rest_salesforce_stream.py @@ -0,0 +1,92 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. + +import json +from datetime import datetime, timezone +from typing import Any, Dict, Optional +from unittest import TestCase + +import freezegun +from airbyte_cdk.sources.source import TState +from airbyte_cdk.test.catalog_builder import CatalogBuilder +from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read +from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse +from airbyte_cdk.test.mock_http.request import ANY_QUERY_PARAMS +from airbyte_cdk.test.state_builder import StateBuilder +from airbyte_protocol.models import ConfiguredAirbyteCatalog, SyncMode +from config_builder import ConfigBuilder +from source_salesforce import SourceSalesforce +from source_salesforce.api import UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS + +_A_FIELD_NAME = "a_field" +_ACCESS_TOKEN = "an_access_token" +_API_VERSION = "v57.0" +_CLIENT_ID = "a_client_id" +_CLIENT_SECRET = "a_client_secret" +_INSTANCE_URL = "https://instance.salesforce.com" +_NOW = datetime.now(timezone.utc) +_REFRESH_TOKEN = "a_refresh_token" +_STREAM_NAME = UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS[0] + + +def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: + return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build() + + +def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceSalesforce: + return SourceSalesforce(catalog, config, state) + + +def _read( + sync_mode: SyncMode, + config_builder: Optional[ConfigBuilder] = None, + expecting_exception: bool = False +) -> EntrypointOutput: + catalog = _catalog(sync_mode) + config = config_builder.build() if config_builder else ConfigBuilder().build() + state = StateBuilder().build() + return read(_source(catalog, config, state), config, catalog, state, expecting_exception) + + +def _given_authentication(http_mocker: HttpMocker, client_id: str, client_secret: str, refresh_token: str) -> None: + http_mocker.post( + HttpRequest( + "https://login.salesforce.com/services/oauth2/token", + query_params=ANY_QUERY_PARAMS, + body=f"grant_type=refresh_token&client_id={client_id}&client_secret={client_secret}&refresh_token={refresh_token}" + ), + HttpResponse(json.dumps({"access_token": _ACCESS_TOKEN, "instance_url": _INSTANCE_URL})), + ) + + +def _given_stream(http_mocker: HttpMocker, stream_name: str, field_name: str) -> None: + http_mocker.get( + HttpRequest(f"{_INSTANCE_URL}/services/data/{_API_VERSION}/sobjects"), + HttpResponse(json.dumps({"sobjects": [{"name": stream_name, "queryable": True}]})), + ) + http_mocker.get( + HttpRequest(f"{_INSTANCE_URL}/services/data/{_API_VERSION}/sobjects/AcceptedEventRelation/describe"), + HttpResponse(json.dumps({"fields": [{"name": field_name, "type": "string"}]})), + ) + + +@freezegun.freeze_time(_NOW.isoformat()) +class FullRefreshTest(TestCase): + + def setUp(self) -> None: + self._config = ConfigBuilder().client_id(_CLIENT_ID).client_secret(_CLIENT_SECRET).refresh_token(_REFRESH_TOKEN) + + @HttpMocker() + def test_given_error_on_fetch_chunk_when_read_then_retry(self, http_mocker: HttpMocker) -> None: + _given_authentication(http_mocker, _CLIENT_ID, _CLIENT_SECRET, _REFRESH_TOKEN) + _given_stream(http_mocker, _STREAM_NAME, _A_FIELD_NAME) + http_mocker.get( + HttpRequest(f"{_INSTANCE_URL}/services/data/{_API_VERSION}/queryAll?q=SELECT+{_A_FIELD_NAME}+FROM+{_STREAM_NAME}+"), + [ + HttpResponse("", status_code=406), + HttpResponse(json.dumps({"records": [{"a_field": "a_value"}]})), + ] + ) + + output = _read(SyncMode.full_refresh, self._config) + + assert len(output.records) == 1 diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md index 60c2229f09b3..7a417d9c2647 100644 --- a/docs/integrations/sources/salesforce.md +++ b/docs/integrations/sources/salesforce.md @@ -193,7 +193,8 @@ Now that you have set up the Salesforce source connector, check out the followin | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------| -| 2.4.2 | 2024-04-05 | [36862](https://github.com/airbytehq/airbyte/pull/36862) | Upgrade CDK for updated error messaging regarding missing streams | +| 2.4.3 | 2024-04-08 | [36885](https://github.com/airbytehq/airbyte/pull/36885) | Add missing retry on REST API | +| 2.4.2 | 2024-04-05 | [36862](https://github.com/airbytehq/airbyte/pull/36862) | Upgrade CDK for updated error messaging regarding missing streams | | 2.4.1 | 2024-04-03 | [36385](https://github.com/airbytehq/airbyte/pull/36385) | Retry HTTP requests and jobs on various cases | | 2.4.0 | 2024-03-12 | [35978](https://github.com/airbytehq/airbyte/pull/35978) | Upgrade CDK to start emitting record counts with state and full refresh state | | 2.3.3 | 2024-03-04 | [35791](https://github.com/airbytehq/airbyte/pull/35791) | Fix memory leak (OOM) |