Skip to content

Commit

Permalink
Refactor custom error raising
Browse files Browse the repository at this point in the history
  • Loading branch information
tolik0 committed Oct 18, 2023
1 parent 0e2bbb5 commit db3ff06
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,9 @@ def check_availability_and_parsability(
def _check_list_files(self, stream: "AbstractFileBasedStream") -> List[RemoteFile]:
try:
files = stream.list_files()
except CustomFileBasedSourceException as custom_exc:
raise custom_exc
except Exception as exc:
raise CheckAvailabilityError(FileBasedSourceError.ERROR_LISTING_FILES, stream=stream.name) from exc
error_message = exc.error_message if hasattr(exc, "error_message") else FileBasedSourceError.ERROR_LISTING_FILES
raise CheckAvailabilityError(error_message, stream=stream.name) from exc

if not files:
raise CheckAvailabilityError(FileBasedSourceError.EMPTY_STREAM, stream=stream.name)
Expand Down
14 changes: 6 additions & 8 deletions airbyte-cdk/python/airbyte_cdk/sources/file_based/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,13 @@ class FileBasedSourceError(Enum):


class BaseFileBasedSourceError(Exception):
def __init__(self, error: FileBasedSourceError, **kwargs): # type: ignore # noqa
def __init__(self, error: Union[FileBasedSourceError, str], **kwargs): # type: ignore # noqa
is_default_error = isinstance(error, FileBasedSourceError)
if is_default_error:
error = FileBasedSourceError(error).value
self.error_message = error
super().__init__(
f"{FileBasedSourceError(error).value} Contact Support if you need assistance.\n{' '.join([f'{k}={v}' for k, v in kwargs.items()])}"
f"{error} Contact Support if you need assistance.\n{' '.join([f'{k}={v}' for k, v in kwargs.items()])}"
)


Expand Down Expand Up @@ -84,9 +88,3 @@ class StopSyncPerValidationPolicy(BaseFileBasedSourceError):

class ErrorListingFiles(BaseFileBasedSourceError):
pass


class CustomFileBasedSourceException(AirbyteTracedException):
"""Custom exception that can be raised in connectors using file_based to avoid catching in DefaultFileBasedAvailabilityStrategy"""

pass
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@
import logging
from datetime import datetime
from io import IOBase
from typing import Iterable, List, Optional, Set
from typing import Iterable, Union, List, Optional, Set

import boto3.session
import pytz
import smart_open
from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.file_based.exceptions import CustomFileBasedSourceException, ErrorListingFiles, FileBasedSourceError
from airbyte_cdk.sources.file_based.exceptions import ErrorListingFiles, FileBasedSourceError
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from botocore.client import BaseClient
Expand Down Expand Up @@ -68,37 +67,32 @@ def get_matching_files(self, globs: List[str], prefix: Optional[str], logger: lo
prefixes = [prefix] if prefix else self.get_prefixes_from_globs(globs)
seen = set()
total_n_keys = 0
raise_exception = None

try:
if prefixes:
for prefix in prefixes:
for remote_file in self._page(s3, globs, self.config.bucket, prefix, seen, logger):
total_n_keys += 1
yield remote_file
else:
for remote_file in self._page(s3, globs, self.config.bucket, None, seen, logger):
for current_prefix in (prefixes if prefixes else [None]):
for remote_file in self._page(s3, globs, self.config.bucket, current_prefix, seen, logger):
total_n_keys += 1
yield remote_file

logger.info(f"Finished listing objects from S3. Found {total_n_keys} objects total ({len(seen)} unique objects).")
except ClientError as exc:
if exc.response["Error"]["Code"] == "NoSuchBucket":
raise CustomFileBasedSourceException.from_exception(
exc, message=f"The bucket {self.config.bucket} does not exist.", failure_type=FailureType.config_error
) from exc
raise_exception = exc
error = f"The bucket {self.config.bucket} does not exist."
else:
error = FileBasedSourceError.ERROR_LISTING_FILES
self.raise_error_listing_files(error, globs, exc)
except Exception as exc:
raise_exception = exc

if raise_exception:
raise ErrorListingFiles(
FileBasedSourceError.ERROR_LISTING_FILES,
source="s3",
bucket=self.config.bucket,
globs=globs,
endpoint=self.config.endpoint,
) from raise_exception
self.raise_error_listing_files(FileBasedSourceError.ERROR_LISTING_FILES, globs, exc)

def raise_error_listing_files(self, error_message: Union[str, FileBasedSourceError], globs: List[str], exc: Optional[Exception] = None):
"""Helper method to raise the ErrorListingFiles exception."""
raise ErrorListingFiles(
error_message,
source="s3",
bucket=self.config.bucket,
globs=globs,
endpoint=self.config.endpoint,
) from exc

def open_file(self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger: logging.Logger) -> IOBase:
try:
Expand Down

0 comments on commit db3ff06

Please sign in to comment.