Skip to content

Commit

Permalink
Merge branch 'master' into feat/databricks-normalize-brigad
Browse files Browse the repository at this point in the history
  • Loading branch information
shrodingers authored Aug 17, 2023
2 parents 58cf9ca + f05c032 commit 7520701
Show file tree
Hide file tree
Showing 12 changed files with 54 additions and 10 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=3.1.6
LABEL io.airbyte.version=3.1.7
LABEL io.airbyte.name=airbyte/source-s3
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"redshift_result_3.csv": "2022-05-26T09:55:15.000000Z",
"redshift_result_2.csv": "2022-05-26T09:55:16.000000Z"
},
"_ab_source_file_last_modified": "2999-01-01T00:00:00.000000Z"
"_ab_source_file_last_modified": "2999-01-01T00:00:00.000000Z_redshift_result_2.csv"
},
"stream_descriptor": { "name": "test" }
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: file
connectorType: source
definitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
dockerImageTag: 3.1.6
dockerImageTag: 3.1.7
dockerRepository: airbyte/source-s3
githubIssueLabel: source-s3
icon: s3.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from datetime import datetime, timedelta
from typing import Any, MutableMapping

from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.stream.cursor import DefaultFileBasedCursor
from airbyte_cdk.sources.file_based.types import StreamState
Expand All @@ -16,6 +17,11 @@ class Cursor(DefaultFileBasedCursor):
_V4_MIGRATION_BUFFER = timedelta(hours=1)
_V3_MIN_SYNC_DATE_FIELD = "v3_min_sync_date"

def __init__(self, stream_config: FileBasedStreamConfig, **_: Any) -> None:
    """Initialise the cursor and reset the migration bookkeeping attributes.

    Args:
        stream_config: Stream configuration, forwarded unchanged to the
            parent cursor's constructor.
        **_: Any additional keyword arguments; accepted and ignored.
    """
    super().__init__(stream_config)
    # Flipped to True by set_initial_state() when the incoming state is a legacy one.
    self._running_migration = False
    # Starts out unset. NOTE(review): presumably populated while handling a
    # legacy state — confirm in set_initial_state().
    self._v3_migration_start_datetime = None

def set_initial_state(self, value: StreamState) -> None:
if self._is_legacy_state(value):
self._running_migration = True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import Iterable, List, Optional, Set

import boto3.session
import pytz
import smart_open
from airbyte_cdk.sources.file_based.exceptions import ErrorListingFiles, FileBasedSourceError
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
Expand Down Expand Up @@ -131,7 +132,7 @@ def _page(
for file in response["Contents"]:
if self._is_folder(file):
continue
remote_file = RemoteFile(uri=file["Key"], last_modified=file["LastModified"])
remote_file = RemoteFile(uri=file["Key"], last_modified=file["LastModified"].astimezone(pytz.utc).replace(tzinfo=None))
if self.file_matches_globs(remote_file, globs) and remote_file.uri not in seen:
seen.add(remote_file.uri)
yield remote_file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ RUN pip install .

ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=2.1.2
LABEL io.airbyte.version=2.1.3
LABEL io.airbyte.name=airbyte/source-salesforce
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: b117307c-14b6-41aa-9422-947e34922962
dockerImageTag: 2.1.2
dockerImageTag: 2.1.3
dockerRepository: airbyte/source-salesforce
githubIssueLabel: source-salesforce
icon: salesforce.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@

import requests # type: ignore[import]
from airbyte_cdk.models import ConfiguredAirbyteCatalog
from airbyte_cdk.utils import AirbyteTracedException
from airbyte_protocol.models import FailureType
from requests import adapters as request_adapters
from requests.exceptions import HTTPError, RequestException # type: ignore[import]

from .exceptions import TypeSalesforceException
from .exceptions import AUTHENTICATION_ERROR_MESSAGE_MAPPING, TypeSalesforceException
from .rate_limiting import default_backoff_handler
from .utils import filter_streams_by_criteria

Expand Down Expand Up @@ -308,9 +310,13 @@ def login(self):
"client_secret": self.client_secret,
"refresh_token": self.refresh_token,
}

resp = self._make_request("POST", login_url, body=login_body, headers={"Content-Type": "application/x-www-form-urlencoded"})

try:
resp = self._make_request("POST", login_url, body=login_body, headers={"Content-Type": "application/x-www-form-urlencoded"})
except HTTPError as err:
if err.response.status_code == requests.codes.BAD_REQUEST:
if error_message := AUTHENTICATION_ERROR_MESSAGE_MAPPING.get(err.response.json().get("error_description")):
raise AirbyteTracedException(message=error_message, failure_type=FailureType.config_error)
raise
auth = resp.json()
self.access_token = auth["access_token"]
self.instance_url = auth["instance_url"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,8 @@ class TypeSalesforceException(SalesforceException):
class TmpFileIOError(Error):
    """Exception that writes a fatal log entry as soon as it is instantiated."""

    def __init__(self, msg: str, err: str = None):
        """Log ``msg`` together with ``err`` at fatal level.

        Args:
            msg: Description of what failed.
            err: Text of the underlying error. Defaults to ``None``, in which
                case the log line ends with ``Error: None``.
        """
        # NOTE(review): `self.logger` is not defined in this class; presumably it
        # comes from the `Error` base class — confirm.
        # NOTE(review): the base initializer is not called, so `msg` is only
        # logged and not stored on the exception — confirm this is intended.
        self.logger.fatal(f"{msg}. Error: {err}")


# Maps the `error_description` value of a failed Salesforce login response to
# the message shown to the user. Callers look descriptions up with `.get()`,
# so a description without an entry here yields no replacement message.
AUTHENTICATION_ERROR_MESSAGE_MAPPING = {
    "expired access/refresh token": "The authentication to SalesForce has expired. Re-authenticate to restore access to SalesForce."
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from conftest import encoding_symbols_parameters, generate_stream
from requests.exceptions import HTTPError
from source_salesforce.api import Salesforce
from source_salesforce.exceptions import AUTHENTICATION_ERROR_MESSAGE_MAPPING
from source_salesforce.source import SourceSalesforce
from source_salesforce.streams import (
CSV_FIELD_SIZE_LIMIT,
Expand All @@ -29,6 +30,29 @@
)


@pytest.mark.parametrize(
    "login_status_code, login_json_resp, expected_error_msg, is_config_error",
    [
        # 400 whose error_description has an entry in the mapping: the mapped message is expected.
        (400, {"error": "invalid_grant", "error_description": "expired access/refresh token"}, AUTHENTICATION_ERROR_MESSAGE_MAPPING.get("expired access/refresh token"), True),
        # 400 whose error_description has no mapping entry: reported as a generic failed check.
        (400, {"error": "invalid_grant", "error_description": "Authentication failure."}, 'An error occurred: {"error": "invalid_grant", "error_description": "Authentication failure."}', False),
        # Status other than 400: reported as a generic failed check.
        (401, {"error": "Unauthorized", "error_description": "Unautorized"}, 'An error occurred: {"error": "Unauthorized", "error_description": "Unautorized"}', False),
    ]
)
def test_login_authentication_error_handler(stream_config, requests_mock, login_status_code, login_json_resp, expected_error_msg, is_config_error):
    """Check how `check_connection` reports a failed OAuth token request.

    The Salesforce token endpoint is mocked to answer with the parametrized
    status code and JSON body. When `is_config_error` is true the call must
    raise `AirbyteTracedException` carrying `expected_error_msg`; otherwise it
    must return `(False, expected_error_msg)`.

    NOTE(review): the imports of `logging` and `AirbyteTracedException` are not
    visible in this excerpt — confirm they exist at the top of the module.
    """
    source = SourceSalesforce()
    logger = logging.getLogger("airbyte")
    # Every login attempt gets the parametrized failure response.
    requests_mock.register_uri("POST", "https://login.salesforce.com/services/oauth2/token", json=login_json_resp, status_code=login_status_code)

    if is_config_error:
        with pytest.raises(AirbyteTracedException) as err:
            source.check_connection(logger, stream_config)
        # Checked after the `with` block so that the assertion actually runs.
        assert err.value.message == expected_error_msg
    else:
        result, msg = source.check_connection(logger, stream_config)
        assert result is False
        assert msg == expected_error_msg


def test_bulk_sync_creation_failed(stream_config, stream_api):
stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api)
with requests_mock.Mocker() as m:
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ Be cautious when raising this value too high, as it may result in Out Of Memory

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------|
| 3.1.7 | 2023-08-17 | [29505](https://github.com/airbytehq/airbyte/pull/29505) | v4 StreamReader and Cursor fixes |
| 3.1.6 | 2023-08-16 | [29480](https://github.com/airbytehq/airbyte/pull/29480) | Update PyArrow to version 12.0.1 |
| 3.1.5 | 2023-08-15 | [29418](https://github.com/airbytehq/airbyte/pull/29418) | Avoid duplicate syncs when migrating from v3 to v4 |
| 3.1.4 | 2023-08-15 | [29382](https://github.com/airbytehq/airbyte/pull/29382) | Handle legacy path prefix & path pattern |
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/salesforce.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ Now that you have set up the Salesforce source connector, check out the followin

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------|
| 2.1.3 | 2023-08-17 | [29500](https://github.com/airbytehq/airbyte/pull/29500) | Handle expired refresh token error |
| 2.1.2 | 2023-08-10 | [28781](https://github.com/airbytehq/airbyte/pull/28781) | Fix pagination for BULK API jobs; Add option to force use BULK API |
| 2.1.1 | 2023-07-06 | [28021](https://github.com/airbytehq/airbyte/pull/28021) | Several Vulnerabilities Fixes; switched to use alpine instead of slim, CVE-2022-40897, CVE-2023-29383, CVE-2023-31484, CVE-2016-2781 |
| 2.1.0 | 2023-06-26 | [27726](https://github.com/airbytehq/airbyte/pull/27726) | License Update: ELv2 |
Expand Down

0 comments on commit 7520701

Please sign in to comment.