Skip to content

Commit

Permalink
🐛 Source File: handle parse errors when running discovery (#30579)
Browse files Browse the repository at this point in the history
  • Loading branch information
davydov-d authored Sep 25, 2023
1 parent 29addb8 commit 809d86b
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 117 deletions.
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-file-secure/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

# If you need to add a custom logic to build your connector image, you can do it by adding a finalize_build.sh or finalize_build.py script in the connector folder.
# Please reach out to the Connectors Operations team if you have any question.
FROM airbyte/source-file:0.3.11
FROM airbyte/source-file:0.3.12

WORKDIR /airbyte/integration_code
COPY source_file_secure ./source_file_secure
Expand All @@ -18,5 +18,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.3.11
LABEL io.airbyte.version=0.3.12
LABEL io.airbyte.name=airbyte/source-file-secure
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: file
connectorType: source
definitionId: 778daa7c-feaf-4db6-96f3-70fd645acc77
dockerImageTag: 0.3.11
dockerImageTag: 0.3.12
dockerRepository: airbyte/source-file-secure
githubIssueLabel: source-file
icon: file.svg
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-file/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ COPY source_file ./source_file
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.3.11
LABEL io.airbyte.version=0.3.12
LABEL io.airbyte.name=airbyte/source-file
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@

import pytest
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.utils import AirbyteTracedException
from source_file import SourceFile
from source_file.client import Client, ConfigurationError
from source_file.client import Client

SAMPLE_DIRECTORY = Path(__file__).resolve().parent.joinpath("sample_files/formats")

Expand Down Expand Up @@ -59,7 +60,7 @@ def test_raises_file_wrong_format(file_format, extension, wrong_format, filename
file_path = str(file_directory.joinpath(f"{filename}.{extension}"))
configs = {"dataset_name": "test", "format": wrong_format, "url": file_path, "provider": {"storage": "local"}}
client = Client(**configs)
with pytest.raises((TypeError, ValueError, ConfigurationError)):
with pytest.raises((TypeError, ValueError, AirbyteTracedException)):
list(client.read())


Expand Down
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-file/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: file
connectorType: source
definitionId: 778daa7c-feaf-4db6-96f3-70fd645acc77
dockerImageTag: 0.3.11
dockerImageTag: 0.3.12
dockerRepository: airbyte/source-file
githubIssueLabel: source-file
icon: file.svg
Expand All @@ -14,7 +14,7 @@ data:
registries:
cloud:
dockerRepository: airbyte/source-file-secure
dockerImageTag: 0.3.11 # Dont forget to publish source-file-secure as well when updating this.
dockerImageTag: 0.3.12 # Dont forget to publish source-file-secure as well when updating this.
enabled: true
oss:
enabled: true
Expand Down
24 changes: 8 additions & 16 deletions airbyte-integrations/connectors/source-file/source_file/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,6 @@
logging.getLogger("smart_open").setLevel(logging.ERROR)


class ConfigurationError(Exception):
"""Client mis-configured"""


class PermissionsError(Exception):
"""User don't have enough permissions"""


class URLFile:
"""Class to manage read from file located at different providers
Expand Down Expand Up @@ -211,7 +203,7 @@ def _open_gcs_url(self) -> object:
except json.decoder.JSONDecodeError as err:
error_msg = f"Failed to parse gcs service account json: {repr(err)}"
logger.error(f"{error_msg}\n{traceback.format_exc()}")
raise ConfigurationError(error_msg) from err
raise AirbyteTracedException(message=error_msg, internal_message=error_msg, failure_type=FailureType.config_error) from err

if credentials:
credentials = service_account.Credentials.from_service_account_info(credentials)
Expand Down Expand Up @@ -341,7 +333,7 @@ def load_dataframes(self, fp, skip_data=False, read_sample_chunk: bool = False)
except KeyError as err:
error_msg = f"Reader {self._reader_format} is not supported."
logger.error(f"{error_msg}\n{traceback.format_exc()}")
raise ConfigurationError(error_msg) from err
raise AirbyteTracedException(message=error_msg, internal_message=error_msg, failure_type=FailureType.config_error) from err

reader_options = {**self._reader_options}
try:
Expand All @@ -367,13 +359,17 @@ def load_dataframes(self, fp, skip_data=False, read_sample_chunk: bool = False)
yield reader(fp, **reader_options)
else:
yield reader(fp, **reader_options)
except ParserError as err:
error_msg = f"File {fp} can not be parsed. Please check your reader_options. https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html"
logger.error(f"{error_msg}\n{traceback.format_exc()}")
raise AirbyteTracedException(message=error_msg, internal_message=error_msg, failure_type=FailureType.config_error) from err
except UnicodeDecodeError as err:
error_msg = (
f"File {fp} can't be parsed with reader of chosen type ({self._reader_format}). "
f"Please check provided Format and Reader Options. {repr(err)}."
)
logger.error(f"{error_msg}\n{traceback.format_exc()}")
raise ConfigurationError(error_msg) from err
raise AirbyteTracedException(message=error_msg, internal_message=error_msg, failure_type=FailureType.config_error) from err

@staticmethod
def dtype_to_json_type(current_type: str, dtype) -> str:
Expand Down Expand Up @@ -430,11 +426,7 @@ def read(self, fields: Iterable = None) -> Iterable[dict]:
f"File {fp} can not be opened due to connection issues on provider side. Please check provided links and options"
)
logger.error(f"{error_msg}\n{traceback.format_exc()}")
raise ConfigurationError(error_msg) from err
except ParserError as err:
error_msg = f"File {fp} can not be parsed. Please check your reader_options. https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html"
logger.error(f"{error_msg}\n{traceback.format_exc()}")
raise ConfigurationError(error_msg) from err
raise AirbyteTracedException(message=error_msg, internal_message=error_msg, failure_type=FailureType.config_error) from err

def _cache_stream(self, fp):
"""cache stream to file"""
Expand Down
24 changes: 12 additions & 12 deletions airbyte-integrations/connectors/source-file/source_file/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
AirbyteMessage,
AirbyteRecordMessage,
ConfiguredAirbyteCatalog,
FailureType,
Status,
Type,
)
from airbyte_cdk.sources import Source
from airbyte_cdk.utils import AirbyteTracedException

from .client import Client, ConfigurationError
from .client import Client
from .utils import dropbox_force_download


Expand Down Expand Up @@ -85,41 +87,39 @@ def _validate_and_transform(config: Mapping[str, Any]):
try:
config["reader_options"] = json.loads(config["reader_options"])
if not isinstance(config["reader_options"], dict):
raise ConfigurationError(
message = (
"Field 'reader_options' is not a valid JSON object. "
"Please provide key-value pairs, See field description for examples."
)
raise AirbyteTracedException(message=message, internal_message=message, failure_type=FailureType.config_error)
except ValueError:
raise ConfigurationError("Field 'reader_options' is not valid JSON object. https://www.json.org/")
message = "Field 'reader_options' is not valid JSON object. https://www.json.org/"
raise AirbyteTracedException(message=message, internal_message=message, failure_type=FailureType.config_error)
else:
config["reader_options"] = {}
config["url"] = dropbox_force_download(config["url"])

parse_result = urlparse(config["url"])
if parse_result.netloc == "docs.google.com" and parse_result.path.lower().startswith("/spreadsheets/"):
raise ConfigurationError(f'Failed to load {config["url"]}: please use the Official Google Sheets Source connector')
message = f'Failed to load {config["url"]}: please use the Official Google Sheets Source connector'
raise AirbyteTracedException(message=message, internal_message=message, failure_type=FailureType.config_error)
return config

def check(self, logger, config: Mapping) -> AirbyteConnectionStatus:
"""
Check involves verifying that the specified file is reachable with
our credentials.
"""
try:
config = self._validate_and_transform(config)
except ConfigurationError as e:
logger.error(str(e))
return AirbyteConnectionStatus(status=Status.FAILED, message=str(e))

config = self._validate_and_transform(config)
client = self._get_client(config)
source_url = client.reader.full_url
try:
list(client.streams(empty_schema=True))
return AirbyteConnectionStatus(status=Status.SUCCEEDED)
except (TypeError, ValueError, ConfigurationError) as err:
except (TypeError, ValueError, AirbyteTracedException) as err:
reason = f"Failed to load {source_url}. Please check File Format and Reader Options are set correctly."
logger.error(f"{reason}\n{repr(err)}")
return AirbyteConnectionStatus(status=Status.FAILED, message=reason)
raise AirbyteTracedException(message=reason, internal_message=reason, failure_type=FailureType.config_error)
except Exception as err:
reason = f"Failed to load {source_url}. You could have provided an invalid URL, please verify it: {repr(err)}."
logger.error(reason)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
from unittest.mock import patch, sentinel

import pytest
from airbyte_cdk.utils import AirbyteTracedException
from pandas import read_csv, read_excel
from paramiko import SSHException
from source_file.client import Client, ConfigurationError, URLFile
from source_file.client import Client, URLFile
from urllib3.exceptions import ProtocolError


Expand Down Expand Up @@ -57,7 +58,7 @@ def test_load_dataframes(client, wrong_format_client, absolute_path, test_files)
expected = read_csv(f)
assert read_file.equals(expected)

with pytest.raises(ConfigurationError):
with pytest.raises(AirbyteTracedException):
next(wrong_format_client.load_dataframes(fp=f))

with pytest.raises(StopIteration):
Expand All @@ -66,7 +67,7 @@ def test_load_dataframes(client, wrong_format_client, absolute_path, test_files)

def test_raises_configuration_error_with_incorrect_file_type(csv_format_client, absolute_path, test_files):
f = f"{absolute_path}/{test_files}/archive_with_test_xlsx.zip"
with pytest.raises(ConfigurationError):
with pytest.raises(AirbyteTracedException):
next(csv_format_client.load_dataframes(fp=f))


Expand Down Expand Up @@ -139,7 +140,7 @@ def test_open_gcs_url():
assert URLFile(url="", provider=provider)._open_gcs_url()

provider.update({"service_account_json": '{service_account_json": "service_account_json"}'})
with pytest.raises(ConfigurationError):
with pytest.raises(AirbyteTracedException):
assert URLFile(url="", provider=provider)._open_gcs_url()


Expand All @@ -158,7 +159,7 @@ def test_read_network_issues(test_read_config):
test_read_config.update(format='excel')
client = Client(**test_read_config)
client.sleep_on_retry_sec = 0 # just for test
with patch.object(client, "_cache_stream", side_effect=ProtocolError), pytest.raises(ConfigurationError):
with patch.object(client, "_cache_stream", side_effect=ProtocolError), pytest.raises(AirbyteTracedException):
next(client.read(["date", "key"]))


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import json
import logging
from copy import deepcopy
from unittest.mock import PropertyMock

import jsonschema
import pytest
Expand All @@ -21,7 +20,7 @@
SyncMode,
Type,
)
from source_file.client import ConfigurationError
from airbyte_cdk.utils import AirbyteTracedException
from source_file.source import SourceFile

logger = logging.getLogger("airbyte")
Expand Down Expand Up @@ -133,9 +132,8 @@ def test_check_invalid_config(source, invalid_config):


def test_check_invalid_reader_options(source, invalid_reader_options_config):
expected = AirbyteConnectionStatus(status=Status.FAILED)
actual = source.check(logger=logger, config=invalid_reader_options_config)
assert actual.status == expected.status
with pytest.raises(AirbyteTracedException, match="Field 'reader_options' is not a valid JSON object. Please provide key-value pairs"):
source.check(logger=logger, config=invalid_reader_options_config)


def test_discover_dropbox_link(source, config_dropbox_link):
Expand All @@ -149,25 +147,17 @@ def test_discover(source, config, client):
for schema in schemas:
jsonschema.Draft7Validator.check_schema(schema)

type(client).streams = PropertyMock(side_effect=Exception)

with pytest.raises(Exception):
source.discover(logger=logger, config=config)


def test_check_wrong_reader_options(source, config):
config["reader_options"] = '{encoding":"utf_16"}'
assert source.check(logger=logger, config=config) == AirbyteConnectionStatus(
status=Status.FAILED, message="Field 'reader_options' is not valid JSON object. https://www.json.org/"
)
with pytest.raises(AirbyteTracedException, match="Field 'reader_options' is not valid JSON object. https://www.json.org/"):
source.check(logger=logger, config=config)


def test_check_google_spreadsheets_url(source, config):
config["url"] = "https://docs.google.com/spreadsheets/d/"
assert source.check(logger=logger, config=config) == AirbyteConnectionStatus(
status=Status.FAILED,
message="Failed to load https://docs.google.com/spreadsheets/d/: please use the Official Google Sheets Source connector",
)
with pytest.raises(AirbyteTracedException, match="Failed to load https://docs.google.com/spreadsheets/d/: please use the Official Google Sheets Source connector"):
source.check(logger=logger, config=config)


def test_pandas_header_not_none(absolute_path, test_files):
Expand Down Expand Up @@ -218,9 +208,11 @@ def test_incorrect_reader_options(absolute_path, test_files):
"provider": {"storage": "local"},
}

catalog = get_catalog({"0": {"type": ["string", "null"]}, "1": {"type": ["string", "null"]}})
source = SourceFile()
with pytest.raises(ConfigurationError) as e:
with pytest.raises(AirbyteTracedException, match="can not be parsed. Please check your reader_options. https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html"):
_ = source.discover(logger=logger, config=deepcopy(config))

with pytest.raises(AirbyteTracedException, match="can not be parsed. Please check your reader_options. https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html"):
catalog = get_catalog({"0": {"type": ["string", "null"]}, "1": {"type": ["string", "null"]}})
records = source.read(logger=logger, config=deepcopy(config), catalog=catalog)
records = [r.record.data for r in records]
assert "can not be parsed. Please check your reader_options. https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html" in str(e.value)
Loading

0 comments on commit 809d86b

Please sign in to comment.