Skip to content

Commit

Permalink
Add CustomFileBasedException
Browse files Browse the repository at this point in the history
  • Loading branch information
tolik0 committed Oct 18, 2023
1 parent db3ff06 commit 3c18702
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from airbyte_cdk.sources import Source
from airbyte_cdk.sources.file_based.availability_strategy import AbstractFileBasedAvailabilityStrategy
from airbyte_cdk.sources.file_based.exceptions import CheckAvailabilityError, CustomFileBasedSourceException, FileBasedSourceError
from airbyte_cdk.sources.file_based.exceptions import CheckAvailabilityError, CustomFileBasedException, FileBasedSourceError
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.schema_helpers import conforms_to_schema
Expand Down Expand Up @@ -70,9 +70,10 @@ def check_availability_and_parsability(
def _check_list_files(self, stream: "AbstractFileBasedStream") -> List[RemoteFile]:
try:
files = stream.list_files()
except CustomFileBasedException as exc:
raise CheckAvailabilityError(str(exc), stream=stream.name) from exc
except Exception as exc:
error_message = exc.error_message if hasattr(exc, "error_message") else FileBasedSourceError.ERROR_LISTING_FILES
raise CheckAvailabilityError(error_message, stream=stream.name) from exc
raise CheckAvailabilityError(FileBasedSourceError.ERROR_LISTING_FILES, stream=stream.name) from exc

if not files:
raise CheckAvailabilityError(FileBasedSourceError.EMPTY_STREAM, stream=stream.name)
Expand Down
13 changes: 10 additions & 3 deletions airbyte-cdk/python/airbyte_cdk/sources/file_based/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@ class FileBasedSourceError(Enum):

class BaseFileBasedSourceError(Exception):
def __init__(self, error: Union[FileBasedSourceError, str], **kwargs): # type: ignore # noqa
is_default_error = isinstance(error, FileBasedSourceError)
if is_default_error:
if isinstance(error, FileBasedSourceError):
error = FileBasedSourceError(error).value
self.error_message = error
super().__init__(
f"{error} Contact Support if you need assistance.\n{' '.join([f'{k}={v}' for k, v in kwargs.items()])}"
)
Expand Down Expand Up @@ -88,3 +86,12 @@ class StopSyncPerValidationPolicy(BaseFileBasedSourceError):

class ErrorListingFiles(BaseFileBasedSourceError):
    """Error raised when listing the files of a file-based source fails."""


class CustomFileBasedException(AirbyteTracedException):
    """Exception that lets a file-based connector report its own error message.

    Raising this type bypasses the default error handling of the file-based
    CDK, so the custom message supplied by the connector is used instead.
    """
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
import logging
from datetime import datetime
from io import IOBase
from typing import Iterable, Union, List, Optional, Set
from typing import Iterable, List, Optional, Set

import boto3.session
import pytz
import smart_open
from airbyte_cdk.sources.file_based.exceptions import ErrorListingFiles, FileBasedSourceError
from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.file_based.exceptions import CustomFileBasedException, ErrorListingFiles, FileBasedSourceError
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from botocore.client import BaseClient
Expand Down Expand Up @@ -77,17 +78,17 @@ def get_matching_files(self, globs: List[str], prefix: Optional[str], logger: lo
logger.info(f"Finished listing objects from S3. Found {total_n_keys} objects total ({len(seen)} unique objects).")
except ClientError as exc:
if exc.response["Error"]["Code"] == "NoSuchBucket":
error = f"The bucket {self.config.bucket} does not exist."
else:
error = FileBasedSourceError.ERROR_LISTING_FILES
self.raise_error_listing_files(error, globs, exc)
raise CustomFileBasedException(
f"The bucket {self.config.bucket} does not exist.", failure_type=FailureType.config_error, exception=exc
)
self._raise_error_listing_files(globs, exc)
except Exception as exc:
self.raise_error_listing_files(FileBasedSourceError.ERROR_LISTING_FILES, globs, exc)
self._raise_error_listing_files(globs, exc)

def raise_error_listing_files(self, error_message: Union[str, FileBasedSourceError], globs: List[str], exc: Optional[Exception] = None):
def _raise_error_listing_files(self, globs: List[str], exc: Optional[Exception] = None):
"""Helper method to raise the ErrorListingFiles exception."""
raise ErrorListingFiles(
error_message,
FileBasedSourceError.ERROR_LISTING_FILES,
source="s3",
bucket=self.config.bucket,
globs=globs,
Expand Down

0 comments on commit 3c18702

Please sign in to comment.