Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source GCS: Migrated to file based CDK #31212

Merged
merged 24 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-gcs/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ RUN pip install .

ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.3.0
LABEL io.airbyte.name=airbyte/source-gcs
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-gcs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integrat
Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named.
First install test dependencies into your virtual environment:
```
pip install .[tests]
pip install '.[tests]'
```
### Unit Tests
To run unit tests locally, from the connector directory run:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,34 @@ connector_image: airbyte/source-gcs:dev
acceptance_tests:
spec:
tests:
- spec_path: "source_gcs/spec.yaml"
- spec_path: integration_tests/spec.json
backward_compatibility_tests_config:
disable_for_version: 0.2.0
connection:
tests:
- config_path: "secrets/config.json"
status: "succeed"
status: succeed
- config_path: "integration_tests/invalid_config.json"
status: "failed"
status: exception
discovery:
tests:
- config_path: "secrets/config.json"
timeout_seconds: 2400
basic_read:
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
empty_streams: []
expect_trace_message_on_failure: false
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
incremental:
bypass_reason: "This connector does not implement incremental sync"
full_refresh:
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
ignored_fields:
example_1:
- name: _ab_source_file_url
bypass_reason: "Uri has autogenerated token in query params"
example_2:
- name: _ab_source_file_url
bypass_reason: "Uri has autogenerated token in query params"
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,6 @@
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
{
"stream": {
"name": "example_2",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}
255 changes: 255 additions & 0 deletions airbyte-integrations/connectors/source-gcs/integration_tests/spec.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
{
"documentationUrl": "https://docs.airbyte.com/integrations/sources/gcs",
"connectionSpecification": {
"title": "Config",
"description": "NOTE: When this Spec is changed, legacy_config_transformer.py must also be\nmodified to uptake the changes because it is responsible for converting\nlegacy GCS configs into file based configs using the File-Based CDK.",
"type": "object",
"properties": {
"start_date": {
"title": "Start Date",
"description": "UTC date and time in the format 2017-01-25T00:00:00.000000Z. Any file modified before this date will not be replicated.",
"examples": ["2021-01-01T00:00:00.000000Z"],
"format": "date-time",
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{6}Z$",
"pattern_descriptor": "YYYY-MM-DDTHH:mm:ss.SSSSSSZ",
"order": 1,
"type": "string"
},
"streams": {
"title": "The list of streams to sync",
"description": "Each instance of this configuration defines a <a href=https://docs.airbyte.com/cloud/core-concepts#stream>stream</a>. Use this to define which files belong in the stream, their format, and how they should be parsed and validated. When sending data to warehouse destination such as Snowflake or BigQuery, each stream is a separate table.",
"order": 3,
"type": "array",
"items": {
"title": "SourceGCSStreamConfig",
"type": "object",
"properties": {
"name": {
"title": "Name",
"description": "The name of the stream.",
"order": 0,
"type": "string"
},
"globs": {
"title": "Globs",
"description": "The pattern used to specify which files should be selected from the file system. For more information on glob pattern matching look <a href=\"https://en.wikipedia.org/wiki/Glob_(programming)\">here</a>.",
"order": 1,
"type": "array",
"items": {
"type": "string"
}
},
"legacy_prefix": {
"title": "Legacy Prefix",
"description": "The path prefix configured in v3 versions of the S3 connector. This option is deprecated in favor of a single glob.",
"airbyte_hidden": true,
"type": "string"
},
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
"validation_policy": {
"title": "Validation Policy",
"description": "The name of the validation policy that dictates sync behavior when a record does not adhere to the stream schema.",
"default": "Emit Record",
"enum": ["Emit Record", "Skip Record", "Wait for Discover"]
},
"input_schema": {
"title": "Input Schema",
"description": "The schema that will be used to validate records extracted from the file. This will override the stream schema that is auto-detected from incoming files.",
"type": "string"
},
"primary_key": {
"title": "Primary Key",
"description": "The column or columns (for a composite key) that serves as the unique identifier of a record.",
"type": "string"
},
"days_to_sync_if_history_is_full": {
"title": "Days To Sync If History Is Full",
"description": "When the state history of the file store is full, syncs will only read files that were last modified in the provided day range.",
"default": 3,
"type": "integer"
},
"format": {
"title": "Format",
"description": "The configuration options that are used to alter how to read incoming files that deviate from the standard formatting.",
"order": 2,
"type": "object",
"oneOf": [
{
"title": "CSV Format",
"type": "object",
"properties": {
"filetype": {
"title": "Filetype",
"default": "csv",
"const": "csv",
"type": "string"
},
"delimiter": {
"title": "Delimiter",
"description": "The character delimiting individual cells in the CSV data. This may only be a 1-character string. For tab-delimited data enter '\\t'.",
"default": ",",
"type": "string"
},
"quote_char": {
"title": "Quote Character",
"description": "The character used for quoting CSV values. To disallow quoting, make this field blank.",
"default": "\"",
"type": "string"
},
"escape_char": {
"title": "Escape Character",
"description": "The character used for escaping special characters. To disallow escaping, leave this field blank.",
"type": "string"
},
"encoding": {
"title": "Encoding",
"description": "The character encoding of the CSV data. Leave blank to default to <strong>UTF8</strong>. See <a href=\"https://docs.python.org/3/library/codecs.html#standard-encodings\" target=\"_blank\">list of python encodings</a> for allowable options.",
"default": "utf8",
"type": "string"
},
"double_quote": {
"title": "Double Quote",
"description": "Whether two quotes in a quoted CSV value denote a single quote in the data.",
"default": true,
"type": "boolean"
},
"null_values": {
"title": "Null Values",
"description": "A set of case-sensitive strings that should be interpreted as null values. For example, if the value 'NA' should be interpreted as null, enter 'NA' in this field.",
"default": [],
"type": "array",
"items": {
"type": "string"
},
"uniqueItems": true
},
"strings_can_be_null": {
"title": "Strings Can Be Null",
"description": "Whether strings can be interpreted as null values. If true, strings that match the null_values set will be interpreted as null. If false, strings that match the null_values set will be interpreted as the string itself.",
"default": true,
"type": "boolean"
},
"skip_rows_before_header": {
"title": "Skip Rows Before Header",
"description": "The number of rows to skip before the header row. For example, if the header row is on the 3rd row, enter 2 in this field.",
"default": 0,
"type": "integer"
},
"skip_rows_after_header": {
"title": "Skip Rows After Header",
"description": "The number of rows to skip after the header row.",
"default": 0,
"type": "integer"
},
"header_definition": {
"title": "CSV Header Definition",
"description": "How headers will be defined. `User Provided` assumes the CSV does not have a header row and uses the headers provided and `Autogenerated` assumes the CSV does not have a header row and the CDK will generate headers using for `f{i}` where `i` is the index starting from 0. Else, the default behavior is to use the header from the CSV file. If a user wants to autogenerate or provide column names for a CSV having headers, they can skip rows.",
"default": {
"header_definition_type": "From CSV"
},
"oneOf": [
{
"title": "From CSV",
"type": "object",
"properties": {
"header_definition_type": {
"title": "Header Definition Type",
"default": "From CSV",
"const": "From CSV",
"type": "string"
}
}
},
{
"title": "Autogenerated",
"type": "object",
"properties": {
"header_definition_type": {
"title": "Header Definition Type",
"default": "Autogenerated",
"const": "Autogenerated",
"type": "string"
}
}
},
{
"title": "User Provided",
"type": "object",
"properties": {
"header_definition_type": {
"title": "Header Definition Type",
"default": "User Provided",
"const": "User Provided",
"type": "string"
},
"column_names": {
"title": "Column Names",
"description": "The column names that will be used while emitting the CSV records",
"type": "array",
"items": {
"type": "string"
}
}
},
"required": ["column_names"]
}
],
"type": "object"
},
"true_values": {
"title": "True Values",
"description": "A set of case-sensitive strings that should be interpreted as true values.",
"default": ["y", "yes", "t", "true", "on", "1"],
"type": "array",
"items": {
"type": "string"
},
"uniqueItems": true
},
"false_values": {
"title": "False Values",
"description": "A set of case-sensitive strings that should be interpreted as false values.",
"default": ["n", "no", "f", "false", "off", "0"],
"type": "array",
"items": {
"type": "string"
},
"uniqueItems": true
},
"inference_type": {
"title": "Inference Type",
"description": "How to infer the types of the columns. If none, inference default to strings.",
"default": "None",
"airbyte_hidden": true,
"enum": ["None", "Primitive Types Only"]
}
}
}
]
},
"schemaless": {
"title": "Schemaless",
"description": "When enabled, syncs will not validate or structure records against the stream's schema.",
"default": false,
"type": "boolean"
}
},
"required": ["name", "format"]
}
},
"service_account": {
"title": "Service Account Information",
"description": "Enter your Google Cloud <a href=\"https://cloud.google.com/iam/docs/creating-managing-service-account-keys#creating_service_account_keys\">service account key</a> in JSON format",
"airbyte_secret": true,
"order": 0,
"type": "string"
},
"bucket": {
"title": "Bucket",
"description": "Name of the GCS bucket where the file(s) exist.",
"order": 2,
"type": "string"
}
},
"required": ["streams", "service_account", "bucket"]
}
}
12 changes: 9 additions & 3 deletions airbyte-integrations/connectors/source-gcs/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@

import sys

from airbyte_cdk.entrypoint import launch
from source_gcs import SourceGCS
from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch
from source_gcs import Config, SourceGCS, SourceGCSStreamReader

if __name__ == "__main__":
source = SourceGCS()
_args = sys.argv[1:]
catalog_path = AirbyteEntrypoint.extract_catalog(_args)
source = SourceGCS(
SourceGCSStreamReader(),
Config,
catalog_path,
)
launch(source, sys.argv[1:])
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-gcs/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: file
connectorType: source
definitionId: 2a8c41ae-8c23-4be0-a73f-2ab10ca1a820
dockerImageTag: 0.2.0
dockerImageTag: 0.3.0
dockerRepository: airbyte/source-gcs
documentationUrl: https://docs.airbyte.com/integrations/sources/gcs
githubIssueLabel: source-gcs
Expand Down
9 changes: 8 additions & 1 deletion airbyte-integrations/connectors/source-gcs/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,14 @@

from setuptools import find_packages, setup

MAIN_REQUIREMENTS = ["airbyte-cdk~=0.2", "google-cloud-storage==2.5.0", "pandas==1.5.3"]
MAIN_REQUIREMENTS = [
"airbyte-cdk>=0.51.17",
"google-cloud-storage==2.12.0",
"pandas==1.5.3",
"pyarrow==12.0.1",
"fastavro==1.4.11",
"smart-open[s3]==5.1.0",
]

TEST_REQUIREMENTS = [
"requests-mock~=1.9.3",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


from .config import Config
from .legacy_config_transformer import LegacyConfigTransformer
from .source import SourceGCS
from .stream_reader import SourceGCSStreamReader

__all__ = ["SourceGCS"]
__all__ = [
"Config",
"LegacyConfigTransformer",
"SourceGCS",
"SourceGCSStreamReader",
]
Loading
Loading