Skip to content

Commit

Permalink
Source GCS: Migrated to file based CDK (#31212)
Browse files Browse the repository at this point in the history
Co-authored-by: lazebnyi <[email protected]>
  • Loading branch information
lazebnyi and lazebnyi authored Oct 24, 2023
1 parent ce2342d commit 2d552a9
Show file tree
Hide file tree
Showing 19 changed files with 704 additions and 113 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-gcs/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ RUN pip install .

ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.3.0
LABEL io.airbyte.name=airbyte/source-gcs
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-gcs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integrat
Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named.
First install test dependencies into your virtual environment:
```
pip install .[tests]
pip install '.[tests]'
```
### Unit Tests
To run unit tests locally, from the connector directory run:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,43 @@ connector_image: airbyte/source-gcs:dev
acceptance_tests:
spec:
tests:
- spec_path: "source_gcs/spec.yaml"
- spec_path: integration_tests/spec.json
backward_compatibility_tests_config:
disable_for_version: 0.2.0
connection:
tests:
- config_path: "secrets/config.json"
status: "succeed"
status: succeed
- config_path: "secrets/old_config.json"
status: succeed
- config_path: "integration_tests/invalid_config.json"
status: "failed"
status: exception
discovery:
tests:
- config_path: "secrets/config.json"
timeout_seconds: 2400
basic_read:
tests:
- config_path: "secrets/old_config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
expect_trace_message_on_failure: false
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
empty_streams: []
expect_trace_message_on_failure: false
incremental:
bypass_reason: "This connector does not implement incremental sync"
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
future_state:
future_state_path: "integration_tests/abnormal_state.json"
full_refresh:
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
ignored_fields:
example_1:
- name: _ab_source_file_url
bypass_reason: "Uri has autogenerated token in query params"
example_2:
- name: _ab_source_file_url
bypass_reason: "Uri has autogenerated token in query params"
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
[
{
"type": "STREAM",
"stream": {
"stream_state": {
"_ab_source_file_last_modified": "2023-02-27T10:34:32.664000Z_https://storage.googleapis.com/airbyte-integration-test-source-gcs/test_folder/example_1.csv",
"history": {
"https://storage.googleapis.com/airbyte-integration-test-source-gcs/test_folder/example_1.csv": "2023-02-27T10:34:32.664000Z"
}
},
"stream_descriptor": {
"name": "example_1"
}
}
},
{
"type": "STREAM",
"stream": {
"stream_state": {
"_ab_source_file_last_modified": "2023-02-27T10:34:32.680000Z_https://storage.googleapis.com/airbyte-integration-test-source-gcs/test_folder/example_2.csv",
"history": {
"https://storage.googleapis.com/airbyte-integration-test-source-gcs/test_folder/example_2.csv": "2023-02-27T10:34:32.680000Z"
}
},
"stream_descriptor": {
"name": "example_2"
}
}
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@
"stream": {
"name": "example_1",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
"supported_sync_modes": ["full_refresh", "incremental"]
},
"sync_mode": "full_refresh",
"sync_mode": "incremental",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "example_2",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
"supported_sync_modes": ["full_refresh", "incremental"]
},
"sync_mode": "full_refresh",
"sync_mode": "incremental",
"destination_sync_mode": "overwrite"
}
]
Expand Down
255 changes: 255 additions & 0 deletions airbyte-integrations/connectors/source-gcs/integration_tests/spec.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
{
"documentationUrl": "https://docs.airbyte.com/integrations/sources/gcs",
"connectionSpecification": {
"title": "Config",
"description": "NOTE: When this Spec is changed, legacy_config_transformer.py must also be\nmodified to uptake the changes because it is responsible for converting\nlegacy GCS configs into file based configs using the File-Based CDK.",
"type": "object",
"properties": {
"start_date": {
"title": "Start Date",
"description": "UTC date and time in the format 2017-01-25T00:00:00.000000Z. Any file modified before this date will not be replicated.",
"examples": ["2021-01-01T00:00:00.000000Z"],
"format": "date-time",
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{6}Z$",
"pattern_descriptor": "YYYY-MM-DDTHH:mm:ss.SSSSSSZ",
"order": 1,
"type": "string"
},
"streams": {
"title": "The list of streams to sync",
"description": "Each instance of this configuration defines a <a href=https://docs.airbyte.com/cloud/core-concepts#stream>stream</a>. Use this to define which files belong in the stream, their format, and how they should be parsed and validated. When sending data to warehouse destination such as Snowflake or BigQuery, each stream is a separate table.",
"order": 3,
"type": "array",
"items": {
"title": "SourceGCSStreamConfig",
"type": "object",
"properties": {
"name": {
"title": "Name",
"description": "The name of the stream.",
"order": 0,
"type": "string"
},
"globs": {
"title": "Globs",
"description": "The pattern used to specify which files should be selected from the file system. For more information on glob pattern matching look <a href=\"https://en.wikipedia.org/wiki/Glob_(programming)\">here</a>.",
"order": 1,
"type": "array",
"items": {
"type": "string"
}
},
"legacy_prefix": {
"title": "Legacy Prefix",
"description": "The path prefix configured in previous versions of the GCS connector. This option is deprecated in favor of a single glob.",
"airbyte_hidden": true,
"type": "string"
},
"validation_policy": {
"title": "Validation Policy",
"description": "The name of the validation policy that dictates sync behavior when a record does not adhere to the stream schema.",
"default": "Emit Record",
"enum": ["Emit Record", "Skip Record", "Wait for Discover"]
},
"input_schema": {
"title": "Input Schema",
"description": "The schema that will be used to validate records extracted from the file. This will override the stream schema that is auto-detected from incoming files.",
"type": "string"
},
"primary_key": {
"title": "Primary Key",
"description": "The column or columns (for a composite key) that serves as the unique identifier of a record.",
"type": "string"
},
"days_to_sync_if_history_is_full": {
"title": "Days To Sync If History Is Full",
"description": "When the state history of the file store is full, syncs will only read files that were last modified in the provided day range.",
"default": 3,
"type": "integer"
},
"format": {
"title": "Format",
"description": "The configuration options that are used to alter how to read incoming files that deviate from the standard formatting.",
"order": 2,
"type": "object",
"oneOf": [
{
"title": "CSV Format",
"type": "object",
"properties": {
"filetype": {
"title": "Filetype",
"default": "csv",
"const": "csv",
"type": "string"
},
"delimiter": {
"title": "Delimiter",
"description": "The character delimiting individual cells in the CSV data. This may only be a 1-character string. For tab-delimited data enter '\\t'.",
"default": ",",
"type": "string"
},
"quote_char": {
"title": "Quote Character",
"description": "The character used for quoting CSV values. To disallow quoting, make this field blank.",
"default": "\"",
"type": "string"
},
"escape_char": {
"title": "Escape Character",
"description": "The character used for escaping special characters. To disallow escaping, leave this field blank.",
"type": "string"
},
"encoding": {
"title": "Encoding",
"description": "The character encoding of the CSV data. Leave blank to default to <strong>UTF8</strong>. See <a href=\"https://docs.python.org/3/library/codecs.html#standard-encodings\" target=\"_blank\">list of python encodings</a> for allowable options.",
"default": "utf8",
"type": "string"
},
"double_quote": {
"title": "Double Quote",
"description": "Whether two quotes in a quoted CSV value denote a single quote in the data.",
"default": true,
"type": "boolean"
},
"null_values": {
"title": "Null Values",
"description": "A set of case-sensitive strings that should be interpreted as null values. For example, if the value 'NA' should be interpreted as null, enter 'NA' in this field.",
"default": [],
"type": "array",
"items": {
"type": "string"
},
"uniqueItems": true
},
"strings_can_be_null": {
"title": "Strings Can Be Null",
"description": "Whether strings can be interpreted as null values. If true, strings that match the null_values set will be interpreted as null. If false, strings that match the null_values set will be interpreted as the string itself.",
"default": true,
"type": "boolean"
},
"skip_rows_before_header": {
"title": "Skip Rows Before Header",
"description": "The number of rows to skip before the header row. For example, if the header row is on the 3rd row, enter 2 in this field.",
"default": 0,
"type": "integer"
},
"skip_rows_after_header": {
"title": "Skip Rows After Header",
"description": "The number of rows to skip after the header row.",
"default": 0,
"type": "integer"
},
"header_definition": {
"title": "CSV Header Definition",
"description": "How headers will be defined. `User Provided` assumes the CSV does not have a header row and uses the headers provided and `Autogenerated` assumes the CSV does not have a header row and the CDK will generate headers using for `f{i}` where `i` is the index starting from 0. Else, the default behavior is to use the header from the CSV file. If a user wants to autogenerate or provide column names for a CSV having headers, they can skip rows.",
"default": {
"header_definition_type": "From CSV"
},
"oneOf": [
{
"title": "From CSV",
"type": "object",
"properties": {
"header_definition_type": {
"title": "Header Definition Type",
"default": "From CSV",
"const": "From CSV",
"type": "string"
}
}
},
{
"title": "Autogenerated",
"type": "object",
"properties": {
"header_definition_type": {
"title": "Header Definition Type",
"default": "Autogenerated",
"const": "Autogenerated",
"type": "string"
}
}
},
{
"title": "User Provided",
"type": "object",
"properties": {
"header_definition_type": {
"title": "Header Definition Type",
"default": "User Provided",
"const": "User Provided",
"type": "string"
},
"column_names": {
"title": "Column Names",
"description": "The column names that will be used while emitting the CSV records",
"type": "array",
"items": {
"type": "string"
}
}
},
"required": ["column_names"]
}
],
"type": "object"
},
"true_values": {
"title": "True Values",
"description": "A set of case-sensitive strings that should be interpreted as true values.",
"default": ["y", "yes", "t", "true", "on", "1"],
"type": "array",
"items": {
"type": "string"
},
"uniqueItems": true
},
"false_values": {
"title": "False Values",
"description": "A set of case-sensitive strings that should be interpreted as false values.",
"default": ["n", "no", "f", "false", "off", "0"],
"type": "array",
"items": {
"type": "string"
},
"uniqueItems": true
},
"inference_type": {
"title": "Inference Type",
"description": "How to infer the types of the columns. If none, inference default to strings.",
"default": "None",
"airbyte_hidden": true,
"enum": ["None", "Primitive Types Only"]
}
}
}
]
},
"schemaless": {
"title": "Schemaless",
"description": "When enabled, syncs will not validate or structure records against the stream's schema.",
"default": false,
"type": "boolean"
}
},
"required": ["name", "format"]
}
},
"service_account": {
"title": "Service Account Information",
"description": "Enter your Google Cloud <a href=\"https://cloud.google.com/iam/docs/creating-managing-service-account-keys#creating_service_account_keys\">service account key</a> in JSON format",
"airbyte_secret": true,
"order": 0,
"type": "string"
},
"bucket": {
"title": "Bucket",
"description": "Name of the GCS bucket where the file(s) exist.",
"order": 2,
"type": "string"
}
},
"required": ["streams", "service_account", "bucket"]
}
}
8 changes: 5 additions & 3 deletions airbyte-integrations/connectors/source-gcs/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@

import sys

from airbyte_cdk.entrypoint import launch
from source_gcs import SourceGCS
from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch
from source_gcs import Config, Cursor, SourceGCS, SourceGCSStreamReader

if __name__ == "__main__":
source = SourceGCS()
_args = sys.argv[1:]
catalog_path = AirbyteEntrypoint.extract_catalog(_args)
source = SourceGCS(SourceGCSStreamReader(), Config, catalog_path, cursor_cls=Cursor)
launch(source, sys.argv[1:])
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-gcs/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: file
connectorType: source
definitionId: 2a8c41ae-8c23-4be0-a73f-2ab10ca1a820
dockerImageTag: 0.2.0
dockerImageTag: 0.3.0
dockerRepository: airbyte/source-gcs
documentationUrl: https://docs.airbyte.com/integrations/sources/gcs
githubIssueLabel: source-gcs
Expand Down
Loading

0 comments on commit 2d552a9

Please sign in to comment.