Skip to content

Commit

Permalink
✨Feat(source-sftp-bulk): file transfer changes (#47703)
Browse files Browse the repository at this point in the history
  • Loading branch information
aldogonzalez8 authored Oct 31, 2024
1 parent 705a0e8 commit f60e149
Show file tree
Hide file tree
Showing 10 changed files with 727 additions and 528 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
{
"delivery_method": { "delivery_type": "use_file_transfer" },
"host": "localhost",
"port": 2222,
"username": "foo",
"credentials": {
"auth_type": "password",
"password": "pass"
},
"file_type": "json",
"start_date": "2021-01-01T00:00:00.000000Z",
"folder_path": "/files",
"streams": [
{
"name": "test_stream",
"file_type": "csv",
"globs": ["**/test_1.csv"],
"legacy_prefix": "",
"validation_policy": "Emit Record",
"format": {
"filetype": "csv",
"delimiter": ",",
"quote_char": "\"",
"double_quote": true,
"null_values": [
"",
"#N/A",
"#N/A N/A",
"#NA",
"-1.#IND",
"-1.#QNAN",
"-NaN",
"-nan",
"1.#IND",
"1.#QNAN",
"N/A",
"NA",
"NULL",
"NaN",
"n/a",
"nan",
"null"
],
"true_values": ["1", "True", "TRUE", "true"],
"false_values": ["0", "False", "FALSE", "false"],
"inference_type": "Primitive Types Only",
"strings_can_be_null": false,
"encoding": "utf8",
"header_definition": {
"header_definition_type": "From CSV"
}
}
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ def config_fixture(docker_client) -> Mapping[str, Any]:
yield config


@pytest.fixture(name="config_fixture_use_file_transfer", scope="session")
def config_fixture_use_file_transfer(docker_client) -> Mapping[str, Any]:
    """Yield the session-scoped connector config stored in ``config_use_file_transfer.json``.

    The ``host`` entry of the loaded config is replaced with the value
    returned by ``get_docker_ip()`` before the config is yielded.

    ``docker_client`` is not referenced in the body.
    NOTE(review): presumably it is requested only so the Docker setup fixture
    runs first -- confirm against the fixture's definition.
    """
    transfer_config = load_config("config_use_file_transfer.json")
    docker_host = get_docker_ip()
    transfer_config["host"] = docker_host
    yield transfer_config


@pytest.fixture(name="config_private_key", scope="session")
def config_fixture_private_key(docker_client) -> Mapping[str, Any]:
config = load_config("config_private_key.json") | {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@


import logging
import os
from copy import deepcopy
from typing import Any, Mapping
from unittest.mock import ANY

import pytest
from airbyte_cdk import AirbyteTracedException, ConfiguredAirbyteCatalog, Status
Expand Down Expand Up @@ -90,3 +92,14 @@ def test_get_files_empty_files(configured_catalog: ConfiguredAirbyteCatalog, con
source = SourceSFTPBulk(catalog=configured_catalog, config=config_with_wrong_glob_pattern, state=None)
output = read(source=source, config=config_with_wrong_glob_pattern, catalog=configured_catalog)
assert len(output.records) == 0

def test_get_file_csv_file_transfer(configured_catalog: ConfiguredAirbyteCatalog, config_fixture_use_file_transfer: Mapping[str, Any]):
    """Read using the file-transfer config and verify the emitted file reference.

    Asserts that exactly one record is produced, that its ``file`` payload
    equals the expected metadata (``modified`` is matched with ``ANY``), and
    that a file exists locally at the expected ``file_url`` path.
    """
    sftp_source = SourceSFTPBulk(catalog=configured_catalog, config=config_fixture_use_file_transfer, state=None)
    read_output = read(source=sftp_source, config=config_fixture_use_file_transfer, catalog=configured_catalog)

    expected_file_data = {
        "bytes": 37,
        "file_relative_path": "files/csv/test_1.csv",
        "file_url": "/tmp/airbyte-file-transfer/files/csv/test_1.csv",
        "modified": ANY,  # the modification value is not pinned by this test
        "source_file_url": "/files/csv/test_1.csv",
    }

    assert len(read_output.records) == 1
    emitted_files = [message.record.file for message in read_output.records]
    assert emitted_files == [expected_file_data]

    # The expected file_url must also point at a file that exists on disk.
    local_copy_path = expected_file_data["file_url"]
    assert os.path.exists(local_copy_path), f"File not found at path: {local_copy_path}"
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,46 @@
"required": ["name", "format"]
}
},
"delivery_method": {
"title": "Delivery Method",
"default": "use_records_transfer",
"type": "object",
"order": 7,
"display_type": "radio",
"group": "advanced",
"oneOf": [
{
"title": "Replicate Records",
"type": "object",
"properties": {
"delivery_type": {
"title": "Delivery Type",
"default": "use_records_transfer",
"const": "use_records_transfer",
"enum": ["use_records_transfer"],
"type": "string"
}
},
"description": "Recommended - Extract and load structured records into your destination of choice. This is the classic method of moving data in Airbyte. It allows for blocking and hashing individual fields or files from a structured schema. Data can be flattened, typed and deduped depending on the destination.",
"required": ["delivery_type"]
},
{
"title": "Copy Raw Files",
"type": "object",
"properties": {
"delivery_type": {
"title": "Delivery Type",
"default": "use_file_transfer",
"const": "use_file_transfer",
"enum": ["use_file_transfer"],
"type": "string"
}
},
"description": "Copy raw files without parsing their contents. Bits are copied into the destination exactly as they appeared in the source. Recommended for use with unstructured text data, non-text and compressed files.",
"required": ["delivery_type"]
}
]
},
"host": {
"title": "Host Address",
"description": "The server host address",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ data:
connectorSubtype: file
connectorType: source
definitionId: 31e3242f-dee7-4cdc-a4b8-8e06c5458517
dockerImageTag: 1.2.0
dockerImageTag: 1.3.0
dockerRepository: airbyte/source-sftp-bulk
documentationUrl: https://docs.airbyte.com/integrations/sources/sftp-bulk
githubIssueLabel: source-sftp-bulk
Expand Down
Loading

0 comments on commit f60e149

Please sign in to comment.