diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/availability_strategy/default_file_based_availability_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/availability_strategy/default_file_based_availability_strategy.py index d852a7297495..fbd8fc917ab2 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/availability_strategy/default_file_based_availability_strategy.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/availability_strategy/default_file_based_availability_strategy.py @@ -8,7 +8,7 @@ from airbyte_cdk.sources import Source from airbyte_cdk.sources.file_based.availability_strategy import AbstractFileBasedAvailabilityStrategy -from airbyte_cdk.sources.file_based.exceptions import CheckAvailabilityError, CustomFileBasedSourceException, FileBasedSourceError +from airbyte_cdk.sources.file_based.exceptions import CheckAvailabilityError, CustomFileBasedException, FileBasedSourceError from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader from airbyte_cdk.sources.file_based.remote_file import RemoteFile from airbyte_cdk.sources.file_based.schema_helpers import conforms_to_schema @@ -70,9 +70,10 @@ def check_availability_and_parsability( def _check_list_files(self, stream: "AbstractFileBasedStream") -> List[RemoteFile]: try: files = stream.list_files() + except CustomFileBasedException as exc: + raise CheckAvailabilityError(str(exc), stream=stream.name) from exc except Exception as exc: - error_message = exc.error_message if hasattr(exc, "error_message") else FileBasedSourceError.ERROR_LISTING_FILES - raise CheckAvailabilityError(error_message, stream=stream.name) from exc + raise CheckAvailabilityError(FileBasedSourceError.ERROR_LISTING_FILES, stream=stream.name) from exc if not files: raise CheckAvailabilityError(FileBasedSourceError.EMPTY_STREAM, stream=stream.name) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/exceptions.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/exceptions.py index d7277592878d..36e837be11dd 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/exceptions.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/exceptions.py @@ -41,10 +41,8 @@ class FileBasedSourceError(Enum): class BaseFileBasedSourceError(Exception): def __init__(self, error: Union[FileBasedSourceError, str], **kwargs): # type: ignore # noqa - is_default_error = isinstance(error, FileBasedSourceError) - if is_default_error: + if isinstance(error, FileBasedSourceError): error = FileBasedSourceError(error).value - self.error_message = error super().__init__( f"{error} Contact Support if you need assistance.\n{' '.join([f'{k}={v}' for k, v in kwargs.items()])}" ) @@ -88,3 +86,12 @@ class StopSyncPerValidationPolicy(BaseFileBasedSourceError): class ErrorListingFiles(BaseFileBasedSourceError): pass + + +class CustomFileBasedException(AirbyteTracedException): + """ + A specialized exception for file-based connectors. + + This exception is designed to bypass the default error handling in the file-based CDK, allowing the use of custom error messages. + """ + pass diff --git a/airbyte-integrations/connectors/source-s3/source_s3/v4/stream_reader.py b/airbyte-integrations/connectors/source-s3/source_s3/v4/stream_reader.py index ed866515a340..c0f7f7858bb6 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/v4/stream_reader.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/v4/stream_reader.py @@ -5,12 +5,13 @@ import logging from datetime import datetime from io import IOBase -from typing import Iterable, Union, List, Optional, Set +from typing import Iterable, List, Optional, Set import boto3.session import pytz import smart_open -from airbyte_cdk.sources.file_based.exceptions import ErrorListingFiles, FileBasedSourceError +from airbyte_cdk.models import FailureType +from airbyte_cdk.sources.file_based.exceptions import CustomFileBasedException, ErrorListingFiles, FileBasedSourceError from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode from airbyte_cdk.sources.file_based.remote_file import RemoteFile from botocore.client import BaseClient @@ -77,17 +78,17 @@ def get_matching_files(self, globs: List[str], prefix: Optional[str], logger: lo logger.info(f"Finished listing objects from S3. Found {total_n_keys} objects total ({len(seen)} unique objects).") except ClientError as exc: if exc.response["Error"]["Code"] == "NoSuchBucket": - error = f"The bucket {self.config.bucket} does not exist." - else: - error = FileBasedSourceError.ERROR_LISTING_FILES - self.raise_error_listing_files(error, globs, exc) + raise CustomFileBasedException( + f"The bucket {self.config.bucket} does not exist.", failure_type=FailureType.config_error, exception=exc + ) + self._raise_error_listing_files(globs, exc) except Exception as exc: - self.raise_error_listing_files(FileBasedSourceError.ERROR_LISTING_FILES, globs, exc) + self._raise_error_listing_files(globs, exc) - def raise_error_listing_files(self, error_message: Union[str, FileBasedSourceError], globs: List[str], exc: Optional[Exception] = None): + def _raise_error_listing_files(self, globs: List[str], exc: Optional[Exception] = None): """Helper method to raise the ErrorListingFiles exception.""" raise ErrorListingFiles( - error_message, + FileBasedSourceError.ERROR_LISTING_FILES, source="s3", bucket=self.config.bucket, globs=globs,