From 54744ff04bbcbe0b909399f38a922bdf1f70bcb9 Mon Sep 17 00:00:00 2001 From: Aldo Gonzalez Date: Fri, 25 Oct 2024 09:19:31 -0700 Subject: [PATCH] chore: format code --- .../file_based/config/abstract_file_based_spec.py | 5 +---- .../config/clients_config/base_sync_config.py | 3 +++ .../config/clients_config/local_sync_config.py | 3 ++- .../config/clients_config/s3_sync_config.py | 3 ++- .../sources/file_based/file_based_source.py | 14 +++++++++++--- .../sources/file_based/file_types/blob_transfer.py | 4 ++-- .../file_based/stream/default_file_based_stream.py | 10 ++++------ .../file_based/writers/local_file_client.py | 8 ++++---- .../sources/file_based/writers/s3_client.py | 3 +-- .../sources/file_based/writers/stream_writer.py | 3 ++- .../source_sftp_bulk/stream_reader.py | 1 - 11 files changed, 32 insertions(+), 25 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py index e3f42a4b36c1..18233f3a2d57 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py @@ -7,7 +7,6 @@ from typing import Any, Dict, List, Optional import dpath - from airbyte_cdk.sources.file_based.config.clients_config.base_sync_config import BaseSyncConfig from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig from airbyte_cdk.sources.utils import schema_helpers @@ -36,9 +35,7 @@ class AbstractFileBasedSpec(BaseModel): order=10, ) - sync_config: Optional[BaseSyncConfig] = Field( - title="Sync Configuration", description="Sync configuration" - ) + sync_config: Optional[BaseSyncConfig] = Field(title="Sync Configuration", description="Sync configuration") use_file_transfer: bool = Field(title="File Sync (Experimental)", description="Enable file-based bulk load", default=False) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/config/clients_config/base_sync_config.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/config/clients_config/base_sync_config.py index 12306cea13bf..c15c640cfeed 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/config/clients_config/base_sync_config.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/config/clients_config/base_sync_config.py @@ -1,8 +1,11 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. + from typing import Literal from airbyte_cdk import OneOfOptionConfig from pydantic.v1 import BaseModel, Field + class BaseSyncConfig(BaseModel): class Config(OneOfOptionConfig): title = "Sync file configuration" diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/config/clients_config/local_sync_config.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/config/clients_config/local_sync_config.py index 9cf3689ff770..6a0b79a13702 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/config/clients_config/local_sync_config.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/config/clients_config/local_sync_config.py @@ -2,8 +2,9 @@ from typing import Literal -from pydantic.v1 import Field + from airbyte_cdk.sources.file_based.config.clients_config.base_sync_config import BaseSyncConfig +from pydantic.v1 import Field class LocalSyncConfig(BaseSyncConfig): diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/config/clients_config/s3_sync_config.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/config/clients_config/s3_sync_config.py index 0182401bd8ad..820570c30bf0 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/config/clients_config/s3_sync_config.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/config/clients_config/s3_sync_config.py @@ -1,8 +1,9 @@ # Copyright (c) 2024 Airbyte, Inc., all rights reserved. from typing import Literal, Optional -from pydantic.v1 import Field + from airbyte_cdk.sources.file_based.config.clients_config.base_sync_config import BaseSyncConfig +from pydantic.v1 import Field class S3SyncConfig(BaseSyncConfig): diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_based_source.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_based_source.py index bef25555caef..3001b4798df3 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_based_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_based_source.py @@ -128,7 +128,11 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> raise ValueError(f"Stream {stream} is not a file-based stream.") try: parsed_config = self._get_parsed_config(config) - availability_method = stream.availability_strategy.check_availability if parsed_config.use_file_transfer else stream.availability_strategy.check_availability_and_parsability + availability_method = ( + stream.availability_strategy.check_availability + if parsed_config.use_file_transfer + else stream.availability_strategy.check_availability_and_parsability + ) ( stream_is_available, reason, @@ -219,7 +223,11 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: CursorField(DefaultFileBasedStream.ab_last_mod_col), ) stream = FileBasedStreamFacade.create_from_stream( - self._make_default_stream(stream_config, cursor, parsed_config.use_file_transfer), self, self.logger, stream_state, cursor + self._make_default_stream(stream_config, cursor, parsed_config.use_file_transfer), + self, + self.logger, + stream_state, + cursor, ) else: cursor = self.cursor_cls(stream_config) @@ -244,7 +252,7 @@ def _make_default_stream( validation_policy=self._validate_and_get_validation_policy(stream_config), errors_collector=self.errors_collector, cursor=cursor, - use_file_transfer=use_file_transfer + use_file_transfer=use_file_transfer, ) def _get_stream_from_catalog(self, stream_config: FileBasedStreamConfig) -> Optional[AirbyteStream]: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/blob_transfer.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/blob_transfer.py index 18014a60f4e2..580329f5877a 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/blob_transfer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/blob_transfer.py @@ -5,14 +5,13 @@ from typing import Any, Dict, Generator, Iterable, Mapping, Optional from uuid import uuid4 - from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, RecordParseError from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode -from airbyte_cdk.sources.file_based.writers.file_based_stream_writer import AbstractFileBasedStreamWriter from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser from airbyte_cdk.sources.file_based.remote_file import RemoteFile from airbyte_cdk.sources.file_based.schema_helpers import SchemaType +from airbyte_cdk.sources.file_based.writers.file_based_stream_writer import AbstractFileBasedStreamWriter class _FileReader: @@ -33,6 +32,7 @@ def read_data( except Exception as ex: logger.error("An error has occurred while reading file: %s", str(ex)) + class BlobTransfer(FileTypeParser): def __init__(self, file_reader: Optional[_FileReader] = None): self._file_reader = file_reader if file_reader else _FileReader() diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py index dbaf1951af00..5575d2f3b82e 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py @@ -22,15 +22,15 @@ ) from airbyte_cdk.sources.file_based.file_types import BlobTransfer from airbyte_cdk.sources.file_based.remote_file import RemoteFile -from airbyte_cdk.sources.file_based.schema_helpers import SchemaType, merge_schemas, schemaless_schema, file_transfer_schema +from airbyte_cdk.sources.file_based.schema_helpers import SchemaType, file_transfer_schema, merge_schemas, schemaless_schema from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor from airbyte_cdk.sources.file_based.types import StreamSlice +from airbyte_cdk.sources.file_based.writers.stream_writer import FileTransferStreamWriter from airbyte_cdk.sources.streams import IncrementalMixin from airbyte_cdk.sources.streams.core import JsonSchema from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message from airbyte_cdk.utils.traced_exception import AirbyteTracedException -from airbyte_cdk.sources.file_based.writers.stream_writer import FileTransferStreamWriter class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin): @@ -48,7 +48,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin): def __init__(self, **kwargs: Any): if self.file_transfer_flag in kwargs: - self.use_file_transfer = kwargs.pop(self.file_transfer_flag, False) + self.use_file_transfer = kwargs.pop(self.file_transfer_flag, False) super().__init__(**kwargs) @property @@ -127,9 +127,7 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Airbyte # todo: complete here the code to not rely on local parser writer = FileTransferStreamWriter() blob_transfer = BlobTransfer() - for record in blob_transfer.write_streams( - self.config, file, self.stream_reader, self.logger, writer - ): + for record in blob_transfer.write_streams(self.config, file, self.stream_reader, self.logger, writer): line_no += 1 if not self.record_passes_validation_policy(record): n_skipped += 1 diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/writers/local_file_client.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/writers/local_file_client.py index d01eee6cc7ab..424204140779 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/writers/local_file_client.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/writers/local_file_client.py @@ -2,16 +2,17 @@ import logging import os -import psutil import time -from typing import Optional, Union, Any, Mapping +from typing import Any, Mapping, Optional, Union +import psutil from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec from airbyte_cdk.sources.file_based.config.clients_config.local_sync_config import LocalSyncConfig AIRBYTE_STAGING_DIRECTORY = os.getenv("AIRBYTE_STAGING_DIRECTORY", "/staging/files") DEFAULT_LOCAL_DIRECTORY = "/tmp/airbyte-file-transfer" + class LocalFileTransferClient: def __init__(self, config: Optional[Union[LocalSyncConfig, Mapping[str, Any]]] = None): """ @@ -49,7 +50,7 @@ def write(self, file_uri: str, fp, file_size: int, logger: logging.Logger): absolute_file_path = os.path.abspath(local_file_path) # Get available disk space - disk_usage = psutil.disk_usage('/') + disk_usage = psutil.disk_usage("/") available_disk_space = disk_usage.free # Get available memory @@ -65,7 +66,6 @@ def write(self, file_uri: str, fp, file_size: int, logger: logging.Logger): f"available memory: {available_memory / (1024 * 1024):,.2f} MB ({available_memory / (1024 * 1024 * 1024):.2f} GB)." ) - with open(local_file_path, "wb") as f: # Measure the time for reading logger.info(f"Starting to read the file") diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/writers/s3_client.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/writers/s3_client.py index acd0b93af3c0..e8665c6a5bf5 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/writers/s3_client.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/writers/s3_client.py @@ -3,9 +3,8 @@ import logging import boto3 -from botocore.session import Session - from airbyte_cdk.sources.file_based.config.clients_config import S3SyncConfig +from botocore.session import Session MIN_CHUNK_SIZE = 5000000 diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/writers/stream_writer.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/writers/stream_writer.py index 924ed73b2799..02985c20a4a0 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/writers/stream_writer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/writers/stream_writer.py @@ -4,9 +4,10 @@ import logging from typing import Any, Mapping, Union + +from airbyte_cdk.sources.file_based.config.clients_config.base_sync_config import BaseSyncConfig from airbyte_cdk.sources.file_based.writers.file_based_stream_writer import AbstractFileBasedStreamWriter from airbyte_cdk.sources.file_based.writers.local_file_client import LocalFileTransferClient -from airbyte_cdk.sources.file_based.config.clients_config.base_sync_config import BaseSyncConfig class FileTransferStreamWriter(AbstractFileBasedStreamWriter): diff --git a/airbyte-integrations/connectors/source-sftp-bulk/source_sftp_bulk/stream_reader.py b/airbyte-integrations/connectors/source-sftp-bulk/source_sftp_bulk/stream_reader.py index cd15071dab1b..b92bd2cce3f7 100644 --- a/airbyte-integrations/connectors/source-sftp-bulk/source_sftp_bulk/stream_reader.py +++ b/airbyte-integrations/connectors/source-sftp-bulk/source_sftp_bulk/stream_reader.py @@ -88,4 +88,3 @@ def open_file(self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str def file_size(self, file: RemoteFile): file_size = self.sftp_client.sftp_connection.stat(file.uri).st_size return file_size -