Skip to content

Commit

Permalink
🐛 Source Salesforce: Add retry on REST API (#36885)
Browse files Browse the repository at this point in the history
  • Loading branch information
maxi297 authored Apr 8, 2024
1 parent b753ade commit 0f1eeb1
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: b117307c-14b6-41aa-9422-947e34922962
dockerImageTag: 2.4.2
dockerImageTag: 2.4.3
dockerRepository: airbyte/source-salesforce
documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce
githubIssueLabel: source-salesforce
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.4.2"
version = "2.4.3"
name = "source-salesforce"
description = "Source implementation for Salesforce."
authors = [ "Airbyte <[email protected]>",]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ def _read_pages(
# Always return an empty generator just in case no records were ever yielded
yield from []

@default_backoff_handler(max_tries=5, backoff_method=backoff.constant, backoff_params={"interval": 5})
def _fetch_next_page_for_chunk(
self,
stream_slice: Mapping[str, Any] = None,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from datetime import datetime
from typing import Any, Mapping


class ConfigBuilder:
def __init__(self) -> None:
self._config = {
"client_id": "fake_client_id",
"client_secret": "fake_client_secret",
"refresh_token": "fake_refresh_token",
"start_date": "2010-01-18T21:18:20Z",
"is_sandbox": False,
"wait_timeout": 15,
}

def start_date(self, start_date: datetime) -> "ConfigBuilder":
self._config["start_date"] = start_date.strftime("%Y-%m-%dT%H:%M:%SZ")
return self

def stream_slice_step(self, stream_slice_step: str) -> "ConfigBuilder":
self._config["stream_slice_step"] = stream_slice_step
return self

def client_id(self, client_id: str) -> "ConfigBuilder":
self._config["client_id"] = client_id
return self

def client_secret(self, client_secret: str) -> "ConfigBuilder":
self._config["client_secret"] = client_secret
return self

def refresh_token(self, refresh_token: str) -> "ConfigBuilder":
self._config["refresh_token"] = refresh_token
return self

def build(self) -> Mapping[str, Any]:
return self._config
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

import json
from datetime import datetime, timezone
from typing import Any, Dict, Optional
from unittest import TestCase

import freezegun
from airbyte_cdk.sources.source import TState
from airbyte_cdk.test.catalog_builder import CatalogBuilder
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
from airbyte_cdk.test.mock_http.request import ANY_QUERY_PARAMS
from airbyte_cdk.test.state_builder import StateBuilder
from airbyte_protocol.models import ConfiguredAirbyteCatalog, SyncMode
from config_builder import ConfigBuilder
from source_salesforce import SourceSalesforce
from source_salesforce.api import UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS

_A_FIELD_NAME = "a_field"
_ACCESS_TOKEN = "an_access_token"
_API_VERSION = "v57.0"
_CLIENT_ID = "a_client_id"
_CLIENT_SECRET = "a_client_secret"
_INSTANCE_URL = "https://instance.salesforce.com"
_NOW = datetime.now(timezone.utc)
_REFRESH_TOKEN = "a_refresh_token"
_STREAM_NAME = UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS[0]


def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog:
return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build()


def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceSalesforce:
return SourceSalesforce(catalog, config, state)


def _read(
sync_mode: SyncMode,
config_builder: Optional[ConfigBuilder] = None,
expecting_exception: bool = False
) -> EntrypointOutput:
catalog = _catalog(sync_mode)
config = config_builder.build() if config_builder else ConfigBuilder().build()
state = StateBuilder().build()
return read(_source(catalog, config, state), config, catalog, state, expecting_exception)


def _given_authentication(http_mocker: HttpMocker, client_id: str, client_secret: str, refresh_token: str) -> None:
http_mocker.post(
HttpRequest(
"https://login.salesforce.com/services/oauth2/token",
query_params=ANY_QUERY_PARAMS,
body=f"grant_type=refresh_token&client_id={client_id}&client_secret={client_secret}&refresh_token={refresh_token}"
),
HttpResponse(json.dumps({"access_token": _ACCESS_TOKEN, "instance_url": _INSTANCE_URL})),
)


def _given_stream(http_mocker: HttpMocker, stream_name: str, field_name: str) -> None:
http_mocker.get(
HttpRequest(f"{_INSTANCE_URL}/services/data/{_API_VERSION}/sobjects"),
HttpResponse(json.dumps({"sobjects": [{"name": stream_name, "queryable": True}]})),
)
http_mocker.get(
HttpRequest(f"{_INSTANCE_URL}/services/data/{_API_VERSION}/sobjects/AcceptedEventRelation/describe"),
HttpResponse(json.dumps({"fields": [{"name": field_name, "type": "string"}]})),
)


@freezegun.freeze_time(_NOW.isoformat())
class FullRefreshTest(TestCase):

def setUp(self) -> None:
self._config = ConfigBuilder().client_id(_CLIENT_ID).client_secret(_CLIENT_SECRET).refresh_token(_REFRESH_TOKEN)

@HttpMocker()
def test_given_error_on_fetch_chunk_when_read_then_retry(self, http_mocker: HttpMocker) -> None:
_given_authentication(http_mocker, _CLIENT_ID, _CLIENT_SECRET, _REFRESH_TOKEN)
_given_stream(http_mocker, _STREAM_NAME, _A_FIELD_NAME)
http_mocker.get(
HttpRequest(f"{_INSTANCE_URL}/services/data/{_API_VERSION}/queryAll?q=SELECT+{_A_FIELD_NAME}+FROM+{_STREAM_NAME}+"),
[
HttpResponse("", status_code=406),
HttpResponse(json.dumps({"records": [{"a_field": "a_value"}]})),
]
)

output = _read(SyncMode.full_refresh, self._config)

assert len(output.records) == 1
3 changes: 2 additions & 1 deletion docs/integrations/sources/salesforce.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ Now that you have set up the Salesforce source connector, check out the followin

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------|
| 2.4.2 | 2024-04-05 | [36862](https://github.com/airbytehq/airbyte/pull/36862) | Upgrade CDK for updated error messaging regarding missing streams |
| 2.4.3 | 2024-04-08 | [36885](https://github.com/airbytehq/airbyte/pull/36885) | Add missing retry on REST API |
| 2.4.2 | 2024-04-05 | [36862](https://github.com/airbytehq/airbyte/pull/36862) | Upgrade CDK for updated error messaging regarding missing streams |
| 2.4.1 | 2024-04-03 | [36385](https://github.com/airbytehq/airbyte/pull/36385) | Retry HTTP requests and jobs on various cases |
| 2.4.0 | 2024-03-12 | [35978](https://github.com/airbytehq/airbyte/pull/35978) | Upgrade CDK to start emitting record counts with state and full refresh state |
| 2.3.3 | 2024-03-04 | [35791](https://github.com/airbytehq/airbyte/pull/35791) | Fix memory leak (OOM) |
Expand Down

0 comments on commit 0f1eeb1

Please sign in to comment.