diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/availability_strategy/default_file_based_availability_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/availability_strategy/default_file_based_availability_strategy.py index 8096f72448c5..d852a7297495 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/availability_strategy/default_file_based_availability_strategy.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/availability_strategy/default_file_based_availability_strategy.py @@ -70,10 +70,9 @@ def check_availability_and_parsability( def _check_list_files(self, stream: "AbstractFileBasedStream") -> List[RemoteFile]: try: files = stream.list_files() - except CustomFileBasedSourceException as custom_exc: - raise custom_exc except Exception as exc: - raise CheckAvailabilityError(FileBasedSourceError.ERROR_LISTING_FILES, stream=stream.name) from exc + error_message = exc.error_message if hasattr(exc, "error_message") else FileBasedSourceError.ERROR_LISTING_FILES + raise CheckAvailabilityError(error_message, stream=stream.name) from exc if not files: raise CheckAvailabilityError(FileBasedSourceError.EMPTY_STREAM, stream=stream.name) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/exceptions.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/exceptions.py index cb2a6a6b2891..d7277592878d 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/exceptions.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/exceptions.py @@ -40,9 +40,13 @@ class FileBasedSourceError(Enum): class BaseFileBasedSourceError(Exception): - def __init__(self, error: FileBasedSourceError, **kwargs): # type: ignore # noqa + def __init__(self, error: Union[FileBasedSourceError, str], **kwargs): # type: ignore # noqa + is_default_error = isinstance(error, FileBasedSourceError) + if is_default_error: + error = FileBasedSourceError(error).value + self.error_message = error super().__init__( - f"{FileBasedSourceError(error).value} Contact Support if you need assistance.\n{' '.join([f'{k}={v}' for k, v in kwargs.items()])}" + f"{error} Contact Support if you need assistance.\n{' '.join([f'{k}={v}' for k, v in kwargs.items()])}" ) @@ -84,9 +88,3 @@ class StopSyncPerValidationPolicy(BaseFileBasedSourceError): class ErrorListingFiles(BaseFileBasedSourceError): pass - - -class CustomFileBasedSourceException(AirbyteTracedException): - """Custom exception that can be raised in connectors using file_based to avoid catching in DefaultFileBasedAvailabilityStrategy""" - - pass diff --git a/airbyte-integrations/connectors/source-s3/source_s3/v4/stream_reader.py b/airbyte-integrations/connectors/source-s3/source_s3/v4/stream_reader.py index 04947c5ab621..ed866515a340 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/v4/stream_reader.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/v4/stream_reader.py @@ -5,13 +5,12 @@ import logging from datetime import datetime from io import IOBase -from typing import Iterable, List, Optional, Set +from typing import Iterable, Union, List, Optional, Set import boto3.session import pytz import smart_open -from airbyte_cdk.models import FailureType -from airbyte_cdk.sources.file_based.exceptions import CustomFileBasedSourceException, ErrorListingFiles, FileBasedSourceError +from airbyte_cdk.sources.file_based.exceptions import ErrorListingFiles, FileBasedSourceError from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode from airbyte_cdk.sources.file_based.remote_file import RemoteFile from botocore.client import BaseClient @@ -68,37 +67,32 @@ def get_matching_files(self, globs: List[str], prefix: Optional[str], logger: lo prefixes = [prefix] if prefix else self.get_prefixes_from_globs(globs) seen = set() total_n_keys = 0 - raise_exception = None try: - if prefixes: - for prefix in prefixes: - for remote_file in self._page(s3, globs, self.config.bucket, prefix, seen, logger): - total_n_keys += 1 - yield remote_file - else: - for remote_file in self._page(s3, globs, self.config.bucket, None, seen, logger): + for current_prefix in (prefixes if prefixes else [None]): + for remote_file in self._page(s3, globs, self.config.bucket, current_prefix, seen, logger): total_n_keys += 1 yield remote_file logger.info(f"Finished listing objects from S3. Found {total_n_keys} objects total ({len(seen)} unique objects).") except ClientError as exc: if exc.response["Error"]["Code"] == "NoSuchBucket": - raise CustomFileBasedSourceException.from_exception( - exc, message=f"The bucket {self.config.bucket} does not exist.", failure_type=FailureType.config_error - ) from exc - raise_exception = exc + error = f"The bucket {self.config.bucket} does not exist." + else: + error = FileBasedSourceError.ERROR_LISTING_FILES + self.raise_error_listing_files(error, globs, exc) except Exception as exc: - raise_exception = exc - - if raise_exception: - raise ErrorListingFiles( - FileBasedSourceError.ERROR_LISTING_FILES, - source="s3", - bucket=self.config.bucket, - globs=globs, - endpoint=self.config.endpoint, - ) from raise_exception + self.raise_error_listing_files(FileBasedSourceError.ERROR_LISTING_FILES, globs, exc) + + def raise_error_listing_files(self, error_message: Union[str, FileBasedSourceError], globs: List[str], exc: Optional[Exception] = None): + """Helper method to raise the ErrorListingFiles exception.""" + raise ErrorListingFiles( + error_message, + source="s3", + bucket=self.config.bucket, + globs=globs, + endpoint=self.config.endpoint, + ) from exc def open_file(self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger: logging.Logger) -> IOBase: try: