Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

File-Transfer Project Changes - DO NOT MERGE #46383

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
#

from dataclasses import InitVar, dataclass
from typing import Annotated, Any, Dict, List, Mapping, Optional
from typing import Annotated, Any, Dict, List, Mapping, Optional, Union

from airbyte_cdk.models.file_transfer_record_message import AirbyteFileTransferRecordMessage
from airbyte_protocol_dataclasses.models import *
from serpyco_rs.metadata import Alias

Expand Down Expand Up @@ -76,7 +77,7 @@ class AirbyteMessage:
spec: Optional[ConnectorSpecification] = None # type: ignore [name-defined]
connectionStatus: Optional[AirbyteConnectionStatus] = None # type: ignore [name-defined]
catalog: Optional[AirbyteCatalog] = None # type: ignore [name-defined]
record: Optional[AirbyteRecordMessage] = None # type: ignore [name-defined]
record: Optional[Union[AirbyteFileTransferRecordMessage, AirbyteRecordMessage]] = None # type: ignore [name-defined]
state: Optional[AirbyteStateMessage] = None
trace: Optional[AirbyteTraceMessage] = None # type: ignore [name-defined]
control: Optional[AirbyteControlMessage] = None # type: ignore [name-defined]
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class AirbyteFileTransferRecordMessage:
    """Record message emitted during file-transfer (bulk file) syncs.

    Mirrors the shape of ``AirbyteRecordMessage`` but carries file metadata
    in ``file`` instead of requiring a row-data payload.
    """

    # Name of the stream this record belongs to.
    stream: str
    # File metadata mapping (e.g. paths/size) — exact keys are defined by the
    # emitting connector; not visible from this definition.
    file: Dict[str, Any]
    # Emission timestamp; presumably epoch milliseconds as in the Airbyte
    # protocol — TODO confirm units against protocol docs.
    emitted_at: int
    # Optional namespace qualifying the stream name.
    namespace: Optional[str] = None
    # Optional row data; may be None for pure file transfers.
    data: Optional[Dict[str, Any]] = None
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import Any, Dict, List, Optional

import dpath
from airbyte_cdk.sources.file_based.config.clients_config.base_sync_config import BaseSyncConfig
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.utils import schema_helpers
from pydantic.v1 import AnyUrl, BaseModel, Field
Expand Down Expand Up @@ -34,6 +35,10 @@ class AbstractFileBasedSpec(BaseModel):
order=10,
)

sync_config: Optional[BaseSyncConfig] = Field(title="Sync Configuration", description="Sync configuration")

use_file_transfer: bool = Field(title="File Sync (Experimental)", description="Enable file-based bulk load", default=False)

@classmethod
@abstractmethod
def documentation_url(cls) -> AnyUrl:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.utils.oneof_option_config import OneOfOptionConfig
from pydantic.v1 import BaseModel, Field


class BlobFormat(BaseModel):
    """Pydantic (v1) spec model for the raw-file ("blob") format option.

    Intended as one branch of a ``oneOf`` format selector; the constant
    ``filetype`` field serves as the discriminator value.
    """

    class Config(OneOfOptionConfig):
        title = "Blob Format"
        description = "File-based sync."
        discriminator = "filetype"

    # Constant discriminator: always "blob" for this format variant.
    filetype: str = Field(
        "blob",
        const=True,
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Package entry point for the sync-config models: re-export the public
# configuration classes so callers can import them from the package root.
from .base_sync_config import BaseSyncConfig
from .s3_sync_config import S3SyncConfig

# Explicit public API of this package (PEP 8 spacing after commas).
__all__ = ["BaseSyncConfig", "S3SyncConfig"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from typing import Literal

from airbyte_cdk import OneOfOptionConfig
from pydantic.v1 import BaseModel, Field


class BaseSyncConfig(BaseModel):
    """Base pydantic (v1) model for file-sync destination configuration.

    Subclasses override ``sync_type`` with their own literal value
    (e.g. "s3", "local") so the field can act as a ``oneOf`` discriminator.
    """

    class Config(OneOfOptionConfig):
        title = "Sync file configuration"
        discriminator = "sync_type"

    # Constant discriminator identifying this config variant.
    sync_type: Literal["base"] = Field("base", const=True)
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.


from typing import Literal

from airbyte_cdk.sources.file_based.config.clients_config.base_sync_config import BaseSyncConfig
from pydantic.v1 import Field


class LocalSyncConfig(BaseSyncConfig):
    """Sync configuration variant for local-storage file transfer.

    Only overrides the ``sync_type`` discriminator; adds no extra options.
    """

    # Constant discriminator selecting the local-storage variant.
    sync_type: Literal["local"] = Field("local", const=True)
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from typing import Literal, Optional

from airbyte_cdk.sources.file_based.config.clients_config.base_sync_config import BaseSyncConfig
from pydantic.v1 import Field


class S3SyncConfig(BaseSyncConfig):
    """Sync configuration variant for files stored on AWS S3.

    Credentials are optional: public buckets can be read without them.
    ``order`` values position the fields in the rendered connector form.
    """

    # Constant discriminator selecting the S3 variant.
    sync_type: Literal["s3"] = Field("s3", const=True)
    bucket: str = Field(description="Name of the S3 bucket where the file(s) exist.", order=7)
    aws_access_key_id: Optional[str] = Field(
        title="AWS Access Key ID",
        default=None,
        description="In order to access private Buckets stored on AWS S3, this connector requires credentials with the proper "
        "permissions. If accessing publicly available data, this field is not necessary.",
        airbyte_secret=True,
        always_show=True,
        order=8,
    )
    aws_secret_access_key: Optional[str] = Field(
        title="AWS Secret Access Key",
        default=None,
        description="In order to access private Buckets stored on AWS S3, this connector requires credentials with the proper "
        "permissions. If accessing publicly available data, this field is not necessary.",
        airbyte_secret=True,
        always_show=True,
        order=9,
    )
    # Empty default means "scan the whole bucket".
    path_prefix: str = Field(
        default="",
        description="By providing a path-like prefix (e.g. myFolder/thisTable/) under which all the relevant files sit, "
        "we can optimize finding these in S3. This is optional but recommended if your bucket contains many "
        "folders/files which you don't need to replicate.",
        order=10,
    )
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,16 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
if not isinstance(stream, AbstractFileBasedStream):
raise ValueError(f"Stream {stream} is not a file-based stream.")
try:
parsed_config = self._get_parsed_config(config)
availability_method = (
stream.availability_strategy.check_availability
if parsed_config.use_file_transfer
else stream.availability_strategy.check_availability_and_parsability
)
(
stream_is_available,
reason,
) = stream.availability_strategy.check_availability_and_parsability(stream, logger, self)
) = availability_method(stream, logger, self)
except AirbyteTracedException as ate:
errors.append(f"Unable to connect to stream {stream.name} - {ate.message}")
tracebacks.append(traceback.format_exc())
Expand Down Expand Up @@ -217,11 +223,15 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
CursorField(DefaultFileBasedStream.ab_last_mod_col),
)
stream = FileBasedStreamFacade.create_from_stream(
self._make_default_stream(stream_config, cursor), self, self.logger, stream_state, cursor
self._make_default_stream(stream_config, cursor, parsed_config.use_file_transfer),
self,
self.logger,
stream_state,
cursor,
)
else:
cursor = self.cursor_cls(stream_config)
stream = self._make_default_stream(stream_config, cursor)
stream = self._make_default_stream(stream_config, cursor, parsed_config.use_file_transfer)

streams.append(stream)
return streams
Expand All @@ -230,7 +240,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
raise ConfigValidationError(FileBasedSourceError.CONFIG_VALIDATION_ERROR) from exc

def _make_default_stream(
self, stream_config: FileBasedStreamConfig, cursor: Optional[AbstractFileBasedCursor]
self, stream_config: FileBasedStreamConfig, cursor: Optional[AbstractFileBasedCursor], use_file_transfer=False
) -> AbstractFileBasedStream:
return DefaultFileBasedStream(
config=stream_config,
Expand All @@ -242,6 +252,7 @@ def _make_default_stream(
validation_policy=self._validate_and_get_validation_policy(stream_config),
errors_collector=self.errors_collector,
cursor=cursor,
use_file_transfer=use_file_transfer,
)

def _get_stream_from_catalog(self, stream_config: FileBasedStreamConfig) -> Optional[AirbyteStream]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ def filter_files_by_globs_and_start_date(self, files: List[RemoteFile], globs: L
seen.add(file.uri)
yield file

def file_size(self, file: "RemoteFile") -> int:
    """Return the size of *file* in bytes.

    Required by the file-transfer stream writer. The original stub body
    (``...``) silently returned ``None`` despite the ``-> int`` annotation;
    raising makes a missing override fail loudly instead of corrupting
    downstream size bookkeeping. Subclasses must override this method.
    """
    raise NotImplementedError(f"{type(self).__name__} does not implement file_size()")

@staticmethod
def file_matches_globs(file: RemoteFile, globs: List[str]) -> bool:
# Use the GLOBSTAR flag to enable recursive ** matching
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from .jsonl_parser import JsonlParser
from .parquet_parser import ParquetParser
from .unstructured_parser import UnstructuredParser
from .blob_transfer import BlobTransfer

default_parsers: Mapping[Type[Any], FileTypeParser] = {
AvroFormat: AvroParser(),
Expand All @@ -24,4 +25,4 @@
UnstructuredFormat: UnstructuredParser(),
}

__all__ = ["AvroParser", "CsvParser", "ExcelParser", "JsonlParser", "ParquetParser", "UnstructuredParser", "default_parsers"]
__all__ = ["AvroParser", "CsvParser", "ExcelParser", "JsonlParser", "ParquetParser", "UnstructuredParser", "BlobTransfer", "default_parsers"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import logging
from typing import IO, Any, Dict, Generator, Iterable, Mapping, Optional, Tuple
from uuid import uuid4

from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, RecordParseError
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.schema_helpers import SchemaType
from airbyte_cdk.sources.file_based.writers.file_based_stream_writer import AbstractFileBasedStreamWriter


class _FileReader:
    """Opens a remote file and reports its size, for whole-file (blob) transfer."""

    def read_data(
        self,
        config: "FileBasedStreamConfig",
        file: "RemoteFile",
        stream_reader: "AbstractFileBasedStreamReader",
        logger: logging.Logger,
        file_read_mode: "FileReadMode",
    ) -> Generator[Tuple[IO[Any], int], None, None]:
        """Yield a single ``(open_file_handle, size_in_bytes)`` pair.

        Fixed the return annotation: the body yields tuples, not
        ``Dict[str, Any]`` as previously declared.

        ``config`` is currently unused but kept for signature symmetry with
        the parser methods. Errors are deliberately best-effort: they are
        logged and the generator simply ends without yielding.
        """
        try:
            file_size = stream_reader.file_size(file)
            # The handle is only valid inside the ``with`` block, so the
            # consumer must finish using it before advancing the generator.
            with stream_reader.open_file(file, file_read_mode, "UTF-8", logger) as fp:
                yield fp, file_size

        except Exception as ex:
            # Best-effort by design: log and stop rather than fail the sync.
            logger.error("An error has occurred while reading file: %s", str(ex))


class BlobTransfer(FileTypeParser):
    """'Parser' that streams whole files to a stream writer instead of parsing records.

    The schema/primary-key/parse hooks required by ``FileTypeParser`` are
    intentionally no-ops: blob transfer has no per-record schema.
    """

    def __init__(self, file_reader: Optional[_FileReader] = None):
        # Allow injecting a reader (e.g. for tests); default to the real one.
        self._file_reader = file_reader if file_reader else _FileReader()

    def write_streams(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
        # Fixed implicit-Optional annotation: default was None but the type
        # was declared non-Optional (deprecated by PEP 484).
        stream_writer: Optional[AbstractFileBasedStreamWriter] = None,
    ) -> Iterable[Dict[str, Any]]:
        """Copy *file* via *stream_writer*, yielding the writer's record dicts.

        ``file_no`` counts fully-written opened-file chunks and is reported
        as ``lineno`` if a ``RecordParseError`` is raised mid-transfer.
        """
        file_no = 0
        # Created before the try so the finally can never see an unbound name
        # (calling a generator function does not execute its body, so this
        # line cannot raise).
        data_generator = self._file_reader.read_data(config, file, stream_reader, logger, self.file_read_mode)
        try:
            for file_opened, file_size in data_generator:
                yield from stream_writer.write(file.uri, file_opened, file_size, logger)
                file_no += 1
        except RecordParseError as parse_err:
            raise RecordParseError(FileBasedSourceError.ERROR_PARSING_RECORD, filename=file.uri, lineno=file_no) from parse_err
        finally:
            # Ensure the underlying file handle is released even on early exit.
            data_generator.close()

    @property
    def file_read_mode(self) -> FileReadMode:
        # Blob transfer always reads the raw stream.
        return FileReadMode.READ

    def get_parser_defined_primary_key(self, config: FileBasedStreamConfig) -> Optional[str]:
        """Blob transfer defines no primary key."""
        ...

    async def infer_schema(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
    ):
        """No-op: blob transfer has no inferable record schema."""
        ...

    def parse_records(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
        discovered_schema: Optional[Mapping[str, SchemaType]],
    ):
        """No-op: records are not parsed, files are transferred verbatim."""
        ...

    def check_config(self, config: FileBasedStreamConfig):
        """No-op: no format-specific config validation for blob transfer."""
        ...
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
SchemaType = Mapping[str, Mapping[str, JsonSchemaSupportedType]]

# Fallback schema for schemaless streams: a single untyped "data" object.
schemaless_schema = {"type": "object", "properties": {"data": {"type": "object"}}}
# Schema for file-transfer streams: untyped "data" plus a "file" metadata object.
file_transfer_schema = {"type": "object", "properties": {"data": {"type": "object"}, "file": {"type": "object"}}}


@total_ordering
Expand Down
Loading
Loading