Skip to content

Commit

Permalink
chore: format code
Browse files (browse the repository at this point in the history)
  • Loading branch information
aldogonzalez8 committed Oct 25, 2024
1 parent b9a173a commit 54744ff
Show file tree
Hide file tree
Showing 11 changed files with 32 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from typing import Any, Dict, List, Optional

import dpath

from airbyte_cdk.sources.file_based.config.clients_config.base_sync_config import BaseSyncConfig
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.utils import schema_helpers
Expand Down Expand Up @@ -36,9 +35,7 @@ class AbstractFileBasedSpec(BaseModel):
order=10,
)

sync_config: Optional[BaseSyncConfig] = Field(
title="Sync Configuration", description="Sync configuration"
)
sync_config: Optional[BaseSyncConfig] = Field(title="Sync Configuration", description="Sync configuration")

use_file_transfer: bool = Field(title="File Sync (Experimental)", description="Enable file-based bulk load", default=False)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from typing import Literal

from airbyte_cdk import OneOfOptionConfig
from pydantic.v1 import BaseModel, Field


class BaseSyncConfig(BaseModel):
class Config(OneOfOptionConfig):
title = "Sync file configuration"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@


from typing import Literal
from pydantic.v1 import Field

from airbyte_cdk.sources.file_based.config.clients_config.base_sync_config import BaseSyncConfig
from pydantic.v1 import Field


class LocalSyncConfig(BaseSyncConfig):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from typing import Literal, Optional
from pydantic.v1 import Field

from airbyte_cdk.sources.file_based.config.clients_config.base_sync_config import BaseSyncConfig
from pydantic.v1 import Field


class S3SyncConfig(BaseSyncConfig):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,11 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
raise ValueError(f"Stream {stream} is not a file-based stream.")
try:
parsed_config = self._get_parsed_config(config)
availability_method = stream.availability_strategy.check_availability if parsed_config.use_file_transfer else stream.availability_strategy.check_availability_and_parsability
availability_method = (
stream.availability_strategy.check_availability
if parsed_config.use_file_transfer
else stream.availability_strategy.check_availability_and_parsability
)
(
stream_is_available,
reason,
Expand Down Expand Up @@ -219,7 +223,11 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
CursorField(DefaultFileBasedStream.ab_last_mod_col),
)
stream = FileBasedStreamFacade.create_from_stream(
self._make_default_stream(stream_config, cursor, parsed_config.use_file_transfer), self, self.logger, stream_state, cursor
self._make_default_stream(stream_config, cursor, parsed_config.use_file_transfer),
self,
self.logger,
stream_state,
cursor,
)
else:
cursor = self.cursor_cls(stream_config)
Expand All @@ -244,7 +252,7 @@ def _make_default_stream(
validation_policy=self._validate_and_get_validation_policy(stream_config),
errors_collector=self.errors_collector,
cursor=cursor,
use_file_transfer=use_file_transfer
use_file_transfer=use_file_transfer,
)

def _get_stream_from_catalog(self, stream_config: FileBasedStreamConfig) -> Optional[AirbyteStream]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@
from typing import Any, Dict, Generator, Iterable, Mapping, Optional
from uuid import uuid4


from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, RecordParseError
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
from airbyte_cdk.sources.file_based.writers.file_based_stream_writer import AbstractFileBasedStreamWriter
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.schema_helpers import SchemaType
from airbyte_cdk.sources.file_based.writers.file_based_stream_writer import AbstractFileBasedStreamWriter


class _FileReader:
Expand All @@ -33,6 +32,7 @@ def read_data(
except Exception as ex:
logger.error("An error has occurred while reading file: %s", str(ex))


class BlobTransfer(FileTypeParser):
def __init__(self, file_reader: Optional[_FileReader] = None):
self._file_reader = file_reader if file_reader else _FileReader()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@
)
from airbyte_cdk.sources.file_based.file_types import BlobTransfer
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.schema_helpers import SchemaType, merge_schemas, schemaless_schema, file_transfer_schema
from airbyte_cdk.sources.file_based.schema_helpers import SchemaType, file_transfer_schema, merge_schemas, schemaless_schema
from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream
from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor
from airbyte_cdk.sources.file_based.types import StreamSlice
from airbyte_cdk.sources.file_based.writers.stream_writer import FileTransferStreamWriter
from airbyte_cdk.sources.streams import IncrementalMixin
from airbyte_cdk.sources.streams.core import JsonSchema
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from airbyte_cdk.sources.file_based.writers.stream_writer import FileTransferStreamWriter


class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):
Expand All @@ -48,7 +48,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):

def __init__(self, **kwargs: Any):
if self.file_transfer_flag in kwargs:
self.use_file_transfer = kwargs.pop(self.file_transfer_flag, False)
self.use_file_transfer = kwargs.pop(self.file_transfer_flag, False)
super().__init__(**kwargs)

@property
Expand Down Expand Up @@ -127,9 +127,7 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Airbyte
# TODO: complete the code here so that it does not rely on the local parser
writer = FileTransferStreamWriter()
blob_transfer = BlobTransfer()
for record in blob_transfer.write_streams(
self.config, file, self.stream_reader, self.logger, writer
):
for record in blob_transfer.write_streams(self.config, file, self.stream_reader, self.logger, writer):
line_no += 1
if not self.record_passes_validation_policy(record):
n_skipped += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@

import logging
import os
import psutil
import time
from typing import Optional, Union, Any, Mapping
from typing import Any, Mapping, Optional, Union

import psutil
from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec
from airbyte_cdk.sources.file_based.config.clients_config.local_sync_config import LocalSyncConfig

AIRBYTE_STAGING_DIRECTORY = os.getenv("AIRBYTE_STAGING_DIRECTORY", "/staging/files")
DEFAULT_LOCAL_DIRECTORY = "/tmp/airbyte-file-transfer"


class LocalFileTransferClient:
def __init__(self, config: Optional[Union[LocalSyncConfig, Mapping[str, Any]]] = None):
"""
Expand Down Expand Up @@ -49,7 +50,7 @@ def write(self, file_uri: str, fp, file_size: int, logger: logging.Logger):
absolute_file_path = os.path.abspath(local_file_path)

# Get available disk space
disk_usage = psutil.disk_usage('/')
disk_usage = psutil.disk_usage("/")
available_disk_space = disk_usage.free

# Get available memory
Expand All @@ -65,7 +66,6 @@ def write(self, file_uri: str, fp, file_size: int, logger: logging.Logger):
f"available memory: {available_memory / (1024 * 1024):,.2f} MB ({available_memory / (1024 * 1024 * 1024):.2f} GB)."
)


with open(local_file_path, "wb") as f:
# Measure the time for reading
logger.info(f"Starting to read the file")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
import logging

import boto3
from botocore.session import Session

from airbyte_cdk.sources.file_based.config.clients_config import S3SyncConfig
from botocore.session import Session

MIN_CHUNK_SIZE = 5000000

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@

import logging
from typing import Any, Mapping, Union

from airbyte_cdk.sources.file_based.config.clients_config.base_sync_config import BaseSyncConfig
from airbyte_cdk.sources.file_based.writers.file_based_stream_writer import AbstractFileBasedStreamWriter
from airbyte_cdk.sources.file_based.writers.local_file_client import LocalFileTransferClient
from airbyte_cdk.sources.file_based.config.clients_config.base_sync_config import BaseSyncConfig


class FileTransferStreamWriter(AbstractFileBasedStreamWriter):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,3 @@ def open_file(self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str
def file_size(self, file: RemoteFile):
file_size = self.sftp_client.sftp_connection.stat(file.uri).st_size
return file_size

0 comments on commit 54744ff

Please sign in to comment.