From 2d552a99e457ce8a0117a6f73225f0433a59e38f Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi <53845333+lazebnyi@users.noreply.github.com> Date: Tue, 24 Oct 2023 14:35:05 +0200 Subject: [PATCH] Source GCS: Migrated to file based CDK (#31212) Co-authored-by: lazebnyi --- .../connectors/source-gcs/Dockerfile | 2 +- .../connectors/source-gcs/README.md | 2 +- .../source-gcs/acceptance-test-config.yml | 29 +- .../integration_tests/abnormal_state.json | 30 +++ .../integration_tests/configured_catalog.json | 8 +- .../source-gcs/integration_tests/spec.json | 255 ++++++++++++++++++ .../connectors/source-gcs/main.py | 8 +- .../connectors/source-gcs/metadata.yaml | 2 +- .../connectors/source-gcs/setup.py | 9 +- .../source-gcs/source_gcs/__init__.py | 13 +- .../source-gcs/source_gcs/config.py | 82 ++++++ .../source-gcs/source_gcs/cursor.py | 58 ++++ .../source-gcs/source_gcs/helpers.py | 8 +- .../source_gcs/legacy_config_transformer.py | 45 ++++ .../source-gcs/source_gcs/source.py | 82 ++---- .../connectors/source-gcs/source_gcs/spec.py | 39 +++ .../source-gcs/source_gcs/spec.yaml | 25 -- .../source-gcs/source_gcs/stream_reader.py | 111 ++++++++ docs/integrations/sources/gcs.md | 9 +- 19 files changed, 704 insertions(+), 113 deletions(-) create mode 100644 airbyte-integrations/connectors/source-gcs/integration_tests/abnormal_state.json create mode 100644 airbyte-integrations/connectors/source-gcs/integration_tests/spec.json create mode 100644 airbyte-integrations/connectors/source-gcs/source_gcs/config.py create mode 100644 airbyte-integrations/connectors/source-gcs/source_gcs/cursor.py create mode 100644 airbyte-integrations/connectors/source-gcs/source_gcs/legacy_config_transformer.py create mode 100644 airbyte-integrations/connectors/source-gcs/source_gcs/spec.py delete mode 100644 airbyte-integrations/connectors/source-gcs/source_gcs/spec.yaml create mode 100644 airbyte-integrations/connectors/source-gcs/source_gcs/stream_reader.py diff --git a/airbyte-integrations/connectors/source-gcs/Dockerfile b/airbyte-integrations/connectors/source-gcs/Dockerfile index 0d9dda5d897c..e620ea6148df 100644 --- a/airbyte-integrations/connectors/source-gcs/Dockerfile +++ b/airbyte-integrations/connectors/source-gcs/Dockerfile @@ -13,5 +13,5 @@ RUN pip install . ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.2.0 +LABEL io.airbyte.version=0.3.0 LABEL io.airbyte.name=airbyte/source-gcs diff --git a/airbyte-integrations/connectors/source-gcs/README.md b/airbyte-integrations/connectors/source-gcs/README.md index 3695276558a2..b304da069db0 100644 --- a/airbyte-integrations/connectors/source-gcs/README.md +++ b/airbyte-integrations/connectors/source-gcs/README.md @@ -79,7 +79,7 @@ docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integrat Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named. First install test dependencies into your virtual environment: ``` -pip install .[tests] +pip install '.[tests]' ``` ### Unit Tests To run unit tests locally, from the connector directory run: diff --git a/airbyte-integrations/connectors/source-gcs/acceptance-test-config.yml b/airbyte-integrations/connectors/source-gcs/acceptance-test-config.yml index 3afe21a1ee3d..2114c72238d1 100644 --- a/airbyte-integrations/connectors/source-gcs/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-gcs/acceptance-test-config.yml @@ -4,24 +4,43 @@ connector_image: airbyte/source-gcs:dev acceptance_tests: spec: tests: - - spec_path: "source_gcs/spec.yaml" + - spec_path: integration_tests/spec.json + backward_compatibility_tests_config: + disable_for_version: 0.2.0 connection: tests: - config_path: "secrets/config.json" - status: "succeed" + status: succeed + - config_path: "secrets/old_config.json" + status: succeed - config_path: "integration_tests/invalid_config.json" - status: "failed" + status: exception discovery: tests: - config_path: "secrets/config.json" + timeout_seconds: 2400 basic_read: tests: + - config_path: "secrets/old_config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + expect_trace_message_on_failure: false - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/configured_catalog.json" - empty_streams: [] + expect_trace_message_on_failure: false incremental: - bypass_reason: "This connector does not implement incremental sync" + tests: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + future_state: + future_state_path: "integration_tests/abnormal_state.json" full_refresh: tests: - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/configured_catalog.json" + ignored_fields: + example_1: + - name: _ab_source_file_url + bypass_reason: "Uri has autogenerated token in query params" + example_2: + - name: _ab_source_file_url + bypass_reason: "Uri has autogenerated token in query params" diff --git a/airbyte-integrations/connectors/source-gcs/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-gcs/integration_tests/abnormal_state.json new file mode 100644 index 000000000000..18702dfa6966 --- /dev/null +++ b/airbyte-integrations/connectors/source-gcs/integration_tests/abnormal_state.json @@ -0,0 +1,30 @@ +[ + { + "type": "STREAM", + "stream": { + "stream_state": { + "_ab_source_file_last_modified": "2023-02-27T10:34:32.664000Z_https://storage.googleapis.com/airbyte-integration-test-source-gcs/test_folder/example_1.csv", + "history": { + "https://storage.googleapis.com/airbyte-integration-test-source-gcs/test_folder/example_1.csv": "2023-02-27T10:34:32.664000Z" + } + }, + "stream_descriptor": { + "name": "example_1" + } + } + }, + { + "type": "STREAM", + "stream": { + "stream_state": { + "_ab_source_file_last_modified": "2023-02-27T10:34:32.680000Z_https://storage.googleapis.com/airbyte-integration-test-source-gcs/test_folder/example_2.csv", + "history": { + "https://storage.googleapis.com/airbyte-integration-test-source-gcs/test_folder/example_2.csv": "2023-02-27T10:34:32.680000Z" + } + }, + "stream_descriptor": { + "name": "example_2" + } + } + } +] diff --git a/airbyte-integrations/connectors/source-gcs/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-gcs/integration_tests/configured_catalog.json index 9836beb15b42..f6dd1106608b 100644 --- a/airbyte-integrations/connectors/source-gcs/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-gcs/integration_tests/configured_catalog.json @@ -4,18 +4,18 @@ "stream": { "name": "example_1", "json_schema": {}, - "supported_sync_modes": ["full_refresh"] + "supported_sync_modes": ["full_refresh", "incremental"] }, - "sync_mode": "full_refresh", + "sync_mode": "incremental", "destination_sync_mode": "overwrite" }, { "stream": { "name": "example_2", "json_schema": {}, - "supported_sync_modes": ["full_refresh"] + "supported_sync_modes": ["full_refresh", "incremental"] }, - "sync_mode": "full_refresh", + "sync_mode": "incremental", "destination_sync_mode": "overwrite" } ] diff --git a/airbyte-integrations/connectors/source-gcs/integration_tests/spec.json b/airbyte-integrations/connectors/source-gcs/integration_tests/spec.json new file mode 100644 index 000000000000..49c035192ebc --- /dev/null +++ b/airbyte-integrations/connectors/source-gcs/integration_tests/spec.json @@ -0,0 +1,255 @@ +{ + "documentationUrl": "https://docs.airbyte.com/integrations/sources/gcs", + "connectionSpecification": { + "title": "Config", + "description": "NOTE: When this Spec is changed, legacy_config_transformer.py must also be\nmodified to uptake the changes because it is responsible for converting\nlegacy GCS configs into file based configs using the File-Based CDK.", + "type": "object", + "properties": { + "start_date": { + "title": "Start Date", + "description": "UTC date and time in the format 2017-01-25T00:00:00.000000Z. Any file modified before this date will not be replicated.", + "examples": ["2021-01-01T00:00:00.000000Z"], + "format": "date-time", + "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{6}Z$", + "pattern_descriptor": "YYYY-MM-DDTHH:mm:ss.SSSSSSZ", + "order": 1, + "type": "string" + }, + "streams": { + "title": "The list of streams to sync", + "description": "Each instance of this configuration defines a stream. Use this to define which files belong in the stream, their format, and how they should be parsed and validated. When sending data to warehouse destination such as Snowflake or BigQuery, each stream is a separate table.", + "order": 3, + "type": "array", + "items": { + "title": "SourceGCSStreamConfig", + "type": "object", + "properties": { + "name": { + "title": "Name", + "description": "The name of the stream.", + "order": 0, + "type": "string" + }, + "globs": { + "title": "Globs", + "description": "The pattern used to specify which files should be selected from the file system. For more information on glob pattern matching look here.", + "order": 1, + "type": "array", + "items": { + "type": "string" + } + }, + "legacy_prefix": { + "title": "Legacy Prefix", + "description": "The path prefix configured in previous versions of the GCS connector. This option is deprecated in favor of a single glob.", + "airbyte_hidden": true, + "type": "string" + }, + "validation_policy": { + "title": "Validation Policy", + "description": "The name of the validation policy that dictates sync behavior when a record does not adhere to the stream schema.", + "default": "Emit Record", + "enum": ["Emit Record", "Skip Record", "Wait for Discover"] + }, + "input_schema": { + "title": "Input Schema", + "description": "The schema that will be used to validate records extracted from the file. This will override the stream schema that is auto-detected from incoming files.", + "type": "string" + }, + "primary_key": { + "title": "Primary Key", + "description": "The column or columns (for a composite key) that serves as the unique identifier of a record.", + "type": "string" + }, + "days_to_sync_if_history_is_full": { + "title": "Days To Sync If History Is Full", + "description": "When the state history of the file store is full, syncs will only read files that were last modified in the provided day range.", + "default": 3, + "type": "integer" + }, + "format": { + "title": "Format", + "description": "The configuration options that are used to alter how to read incoming files that deviate from the standard formatting.", + "order": 2, + "type": "object", + "oneOf": [ + { + "title": "CSV Format", + "type": "object", + "properties": { + "filetype": { + "title": "Filetype", + "default": "csv", + "const": "csv", + "type": "string" + }, + "delimiter": { + "title": "Delimiter", + "description": "The character delimiting individual cells in the CSV data. This may only be a 1-character string. For tab-delimited data enter '\\t'.", + "default": ",", + "type": "string" + }, + "quote_char": { + "title": "Quote Character", + "description": "The character used for quoting CSV values. To disallow quoting, make this field blank.", + "default": "\"", + "type": "string" + }, + "escape_char": { + "title": "Escape Character", + "description": "The character used for escaping special characters. To disallow escaping, leave this field blank.", + "type": "string" + }, + "encoding": { + "title": "Encoding", + "description": "The character encoding of the CSV data. Leave blank to default to UTF8. See list of python encodings for allowable options.", + "default": "utf8", + "type": "string" + }, + "double_quote": { + "title": "Double Quote", + "description": "Whether two quotes in a quoted CSV value denote a single quote in the data.", + "default": true, + "type": "boolean" + }, + "null_values": { + "title": "Null Values", + "description": "A set of case-sensitive strings that should be interpreted as null values. For example, if the value 'NA' should be interpreted as null, enter 'NA' in this field.", + "default": [], + "type": "array", + "items": { + "type": "string" + }, + "uniqueItems": true + }, + "strings_can_be_null": { + "title": "Strings Can Be Null", + "description": "Whether strings can be interpreted as null values. If true, strings that match the null_values set will be interpreted as null. If false, strings that match the null_values set will be interpreted as the string itself.", + "default": true, + "type": "boolean" + }, + "skip_rows_before_header": { + "title": "Skip Rows Before Header", + "description": "The number of rows to skip before the header row. For example, if the header row is on the 3rd row, enter 2 in this field.", + "default": 0, + "type": "integer" + }, + "skip_rows_after_header": { + "title": "Skip Rows After Header", + "description": "The number of rows to skip after the header row.", + "default": 0, + "type": "integer" + }, + "header_definition": { + "title": "CSV Header Definition", + "description": "How headers will be defined. `User Provided` assumes the CSV does not have a header row and uses the headers provided and `Autogenerated` assumes the CSV does not have a header row and the CDK will generate headers using for `f{i}` where `i` is the index starting from 0. Else, the default behavior is to use the header from the CSV file. If a user wants to autogenerate or provide column names for a CSV having headers, they can skip rows.", + "default": { + "header_definition_type": "From CSV" + }, + "oneOf": [ + { + "title": "From CSV", + "type": "object", + "properties": { + "header_definition_type": { + "title": "Header Definition Type", + "default": "From CSV", + "const": "From CSV", + "type": "string" + } + } + }, + { + "title": "Autogenerated", + "type": "object", + "properties": { + "header_definition_type": { + "title": "Header Definition Type", + "default": "Autogenerated", + "const": "Autogenerated", + "type": "string" + } + } + }, + { + "title": "User Provided", + "type": "object", + "properties": { + "header_definition_type": { + "title": "Header Definition Type", + "default": "User Provided", + "const": "User Provided", + "type": "string" + }, + "column_names": { + "title": "Column Names", + "description": "The column names that will be used while emitting the CSV records", + "type": "array", + "items": { + "type": "string" + } + } + }, + "required": ["column_names"] + } + ], + "type": "object" + }, + "true_values": { + "title": "True Values", + "description": "A set of case-sensitive strings that should be interpreted as true values.", + "default": ["y", "yes", "t", "true", "on", "1"], + "type": "array", + "items": { + "type": "string" + }, + "uniqueItems": true + }, + "false_values": { + "title": "False Values", + "description": "A set of case-sensitive strings that should be interpreted as false values.", + "default": ["n", "no", "f", "false", "off", "0"], + "type": "array", + "items": { + "type": "string" + }, + "uniqueItems": true + }, + "inference_type": { + "title": "Inference Type", + "description": "How to infer the types of the columns. If none, inference default to strings.", + "default": "None", + "airbyte_hidden": true, + "enum": ["None", "Primitive Types Only"] + } + } + } + ] + }, + "schemaless": { + "title": "Schemaless", + "description": "When enabled, syncs will not validate or structure records against the stream's schema.", + "default": false, + "type": "boolean" + } + }, + "required": ["name", "format"] + } + }, + "service_account": { + "title": "Service Account Information", + "description": "Enter your Google Cloud service account key in JSON format", + "airbyte_secret": true, + "order": 0, + "type": "string" + }, + "bucket": { + "title": "Bucket", + "description": "Name of the GCS bucket where the file(s) exist.", + "order": 2, + "type": "string" + } + }, + "required": ["streams", "service_account", "bucket"] + } +} diff --git a/airbyte-integrations/connectors/source-gcs/main.py b/airbyte-integrations/connectors/source-gcs/main.py index 74e5bf63ab7e..c98b5b943cc7 100644 --- a/airbyte-integrations/connectors/source-gcs/main.py +++ b/airbyte-integrations/connectors/source-gcs/main.py @@ -5,9 +5,11 @@ import sys -from airbyte_cdk.entrypoint import launch -from source_gcs import SourceGCS +from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch +from source_gcs import Config, Cursor, SourceGCS, SourceGCSStreamReader if __name__ == "__main__": - source = SourceGCS() + _args = sys.argv[1:] + catalog_path = AirbyteEntrypoint.extract_catalog(_args) + source = SourceGCS(SourceGCSStreamReader(), Config, catalog_path, cursor_cls=Cursor) launch(source, sys.argv[1:]) diff --git a/airbyte-integrations/connectors/source-gcs/metadata.yaml b/airbyte-integrations/connectors/source-gcs/metadata.yaml index 927b6b006180..5b93da60a465 100644 --- a/airbyte-integrations/connectors/source-gcs/metadata.yaml +++ b/airbyte-integrations/connectors/source-gcs/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: file connectorType: source definitionId: 2a8c41ae-8c23-4be0-a73f-2ab10ca1a820 - dockerImageTag: 0.2.0 + dockerImageTag: 0.3.0 dockerRepository: airbyte/source-gcs documentationUrl: https://docs.airbyte.com/integrations/sources/gcs githubIssueLabel: source-gcs diff --git a/airbyte-integrations/connectors/source-gcs/setup.py b/airbyte-integrations/connectors/source-gcs/setup.py index 73669bd2ee21..28a0e40a874a 100644 --- a/airbyte-integrations/connectors/source-gcs/setup.py +++ b/airbyte-integrations/connectors/source-gcs/setup.py @@ -5,7 +5,14 @@ from setuptools import find_packages, setup -MAIN_REQUIREMENTS = ["airbyte-cdk~=0.2", "google-cloud-storage==2.5.0", "pandas==1.5.3"] +MAIN_REQUIREMENTS = [ + "airbyte-cdk>=0.51.17", + "google-cloud-storage==2.12.0", + "pandas==1.5.3", + "pyarrow==12.0.1", + "fastavro==1.4.11", + "smart-open[s3]==5.1.0", +] TEST_REQUIREMENTS = [ "requests-mock~=1.9.3", diff --git a/airbyte-integrations/connectors/source-gcs/source_gcs/__init__.py b/airbyte-integrations/connectors/source-gcs/source_gcs/__init__.py index 917e71deaf26..852a5b50e206 100644 --- a/airbyte-integrations/connectors/source-gcs/source_gcs/__init__.py +++ b/airbyte-integrations/connectors/source-gcs/source_gcs/__init__.py @@ -2,7 +2,16 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # - +from .config import Config +from .cursor import Cursor +from .legacy_config_transformer import LegacyConfigTransformer from .source import SourceGCS +from .stream_reader import SourceGCSStreamReader -__all__ = ["SourceGCS"] +__all__ = [ + "Config", + "Cursor", + "LegacyConfigTransformer", + "SourceGCS", + "SourceGCSStreamReader", +] diff --git a/airbyte-integrations/connectors/source-gcs/source_gcs/config.py b/airbyte-integrations/connectors/source-gcs/source_gcs/config.py new file mode 100644 index 000000000000..ebd1117841e1 --- /dev/null +++ b/airbyte-integrations/connectors/source-gcs/source_gcs/config.py @@ -0,0 +1,82 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from typing import List, Optional + +from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec +from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat +from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig +from pydantic import AnyUrl, Field + + +class SourceGCSStreamConfig(FileBasedStreamConfig): + name: str = Field(title="Name", description="The name of the stream.", order=0) + globs: Optional[List[str]] = Field( + title="Globs", + description="The pattern used to specify which files should be selected from the file system. For more information on glob " + 'pattern matching look here.', + order=1, + ) + format: CsvFormat = Field( + title="Format", + description="The configuration options that are used to alter how to read incoming files that deviate from " + "the standard formatting.", + order=2, + ) + legacy_prefix: Optional[str] = Field( + title="Legacy Prefix", + description="The path prefix configured in previous versions of the GCS connector. " + "This option is deprecated in favor of a single glob.", + airbyte_hidden=True, + ) + + +class Config(AbstractFileBasedSpec): + """ + NOTE: When this Spec is changed, legacy_config_transformer.py must also be + modified to uptake the changes because it is responsible for converting + legacy GCS configs into file based configs using the File-Based CDK. + """ + + service_account: str = Field( + title="Service Account Information", + airbyte_secret=True, + description=( + "Enter your Google Cloud " + '' + "service account key in JSON format" + ), + order=0, + ) + + bucket: str = Field(title="Bucket", description="Name of the GCS bucket where the file(s) exist.", order=2) + + streams: List[SourceGCSStreamConfig] = Field( + title="The list of streams to sync", + description=( + "Each instance of this configuration defines a stream. " + "Use this to define which files belong in the stream, their format, and how they should be " + "parsed and validated. When sending data to warehouse destination such as Snowflake or " + "BigQuery, each stream is a separate table." + ), + order=3, + ) + + @classmethod + def documentation_url(cls) -> AnyUrl: + """ + Returns the documentation URL. + """ + return AnyUrl("https://docs.airbyte.com/integrations/sources/gcs", scheme="https") + + @staticmethod + def replace_enum_allOf_and_anyOf(schema): + """ + Replace allOf with anyOf when appropriate in the schema with one value. + """ + objects_to_check = schema["properties"]["streams"]["items"]["properties"]["format"] + if len(objects_to_check.get("allOf", [])) == 1: + objects_to_check["anyOf"] = objects_to_check.pop("allOf") + + return super(Config, Config).replace_enum_allOf_and_anyOf(schema) diff --git a/airbyte-integrations/connectors/source-gcs/source_gcs/cursor.py b/airbyte-integrations/connectors/source-gcs/source_gcs/cursor.py new file mode 100644 index 000000000000..e55bbd20e29c --- /dev/null +++ b/airbyte-integrations/connectors/source-gcs/source_gcs/cursor.py @@ -0,0 +1,58 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +import logging +from datetime import datetime + +from airbyte_cdk.sources.file_based.remote_file import RemoteFile +from airbyte_cdk.sources.file_based.stream.cursor import DefaultFileBasedCursor + + +class Cursor(DefaultFileBasedCursor): + @staticmethod + def get_file_uri(file: RemoteFile) -> str: + return file.uri.split("?")[0] + + def add_file(self, file: RemoteFile) -> None: + uri = self.get_file_uri(file) + self._file_to_datetime_history[uri] = file.last_modified.strftime(self.DATE_TIME_FORMAT) + if len(self._file_to_datetime_history) > self.DEFAULT_MAX_HISTORY_SIZE: + # Get the earliest file based on its last modified date and its uri + oldest_file = self._compute_earliest_file_in_history() + if oldest_file: + del self._file_to_datetime_history[oldest_file.uri] + else: + raise Exception( + "The history is full but there is no files in the history. This should never happen and might be indicative of a bug in the CDK." + ) + + def _should_sync_file(self, file: RemoteFile, logger: logging.Logger) -> bool: + uri = self.get_file_uri(file) + if uri in self._file_to_datetime_history: + # If the file's uri is in the history, we should sync the file if it has been modified since it was synced + updated_at_from_history = datetime.strptime(self._file_to_datetime_history[uri], self.DATE_TIME_FORMAT) + if file.last_modified < updated_at_from_history: + logger.warning( + f"The file {uri}'s last modified date is older than the last time it was synced. This is unexpected. Skipping the file." + ) + else: + return file.last_modified > updated_at_from_history + return file.last_modified > updated_at_from_history + if self._is_history_full(): + if self._initial_earliest_file_in_history is None: + return True + if file.last_modified > self._initial_earliest_file_in_history.last_modified: + # If the history is partial and the file's datetime is strictly greater than the earliest file in the history, + # we should sync it + return True + elif file.last_modified == self._initial_earliest_file_in_history.last_modified: + # If the history is partial and the file's datetime is equal to the earliest file in the history, + # we should sync it if its uri is strictly greater than the earliest file in the history + return uri > self._initial_earliest_file_in_history.uri + else: + # Otherwise, only sync the file if it has been modified since the start of the time window + return file.last_modified >= self.get_start_time() + else: + # The file is not in the history and the history is complete. We know we need to sync the file + return True diff --git a/airbyte-integrations/connectors/source-gcs/source_gcs/helpers.py b/airbyte-integrations/connectors/source-gcs/source_gcs/helpers.py index 164e6852c65f..3962cbef5458 100644 --- a/airbyte-integrations/connectors/source-gcs/source_gcs/helpers.py +++ b/airbyte-integrations/connectors/source-gcs/source_gcs/helpers.py @@ -12,16 +12,16 @@ def get_gcs_client(config): - credentials = service_account.Credentials.from_service_account_info(json.loads(config.get("service_account"))) + credentials = service_account.Credentials.from_service_account_info(json.loads(config.service_account)) client = storage.Client(credentials=credentials) return client def get_gcs_blobs(config): client = get_gcs_client(config) - bucket = client.get_bucket(config.get("gcs_bucket")) - blobs = bucket.list_blobs(prefix=config.get("gcs_path")) - # TODO: only support CSV intially. Change this check if implementing other file formats. + bucket = client.get_bucket(config.gcs_bucket) + blobs = bucket.list_blobs(prefix=config.gcs_path) + # TODO: only support CSV initially. Change this check if implementing other file formats. blobs = [blob for blob in blobs if "csv" in blob.name.lower()] return blobs diff --git a/airbyte-integrations/connectors/source-gcs/source_gcs/legacy_config_transformer.py b/airbyte-integrations/connectors/source-gcs/source_gcs/legacy_config_transformer.py new file mode 100644 index 000000000000..2a06e386370d --- /dev/null +++ b/airbyte-integrations/connectors/source-gcs/source_gcs/legacy_config_transformer.py @@ -0,0 +1,45 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from typing import Any, Dict, Mapping + +from source_gcs.spec import SourceGCSSpec + +from .helpers import get_gcs_blobs, get_stream_name + + +class LegacyConfigTransformer: + """ + Transforms GCS source configs from legacy format to be compatible + with the new GCS source built with the file-based CDK. + """ + + @staticmethod + def _create_stream(blob: Any, legacy_prefix: str) -> Dict[str, Any]: + """ + Create a stream dict from a blob. + + :param blob: The blob from which to create the stream. + :param legacy_prefix: The legacy prefix path on GCS. + :return: A dictionary representing the stream. + """ + return { + "name": get_stream_name(blob), + "legacy_prefix": f"{legacy_prefix}/{blob.name.split('/')[-1]}", + "validation_policy": "Emit Record", + "format": {"filetype": "csv"}, + } + + @classmethod + def convert(cls, legacy_config: SourceGCSSpec) -> Mapping[str, Any]: + """ + Convert a legacy configuration to a transformed configuration. + + :param legacy_config: Legacy configuration of type SourceGCSSpec. + :return: Transformed configuration as a dictionary. + """ + blobs = get_gcs_blobs(legacy_config) + streams = [cls._create_stream(blob, legacy_config.gcs_path) for blob in blobs] + + return {"bucket": legacy_config.gcs_bucket, "service_account": legacy_config.service_account, "streams": streams} diff --git a/airbyte-integrations/connectors/source-gcs/source_gcs/source.py b/airbyte-integrations/connectors/source-gcs/source_gcs/source.py index 88975ef6b68c..b33ff2b87eb4 100644 --- a/airbyte-integrations/connectors/source-gcs/source_gcs/source.py +++ b/airbyte-integrations/connectors/source-gcs/source_gcs/source.py @@ -3,70 +3,28 @@ # -import json -from datetime import datetime -from typing import Dict, Generator +from typing import Any, Mapping -from airbyte_cdk.logger import AirbyteLogger -from airbyte_cdk.models import ( - AirbyteCatalog, - AirbyteConnectionStatus, - AirbyteMessage, - AirbyteRecordMessage, - AirbyteStream, - ConfiguredAirbyteCatalog, - Status, - Type, -) -from airbyte_cdk.sources import Source +from airbyte_cdk.config_observation import emit_configuration_as_airbyte_control_message +from airbyte_cdk.sources.file_based.file_based_source import FileBasedSource +from source_gcs.legacy_config_transformer import LegacyConfigTransformer +from source_gcs.spec import SourceGCSSpec -from .helpers import construct_file_schema, get_gcs_blobs, get_stream_name, read_csv_file - -class SourceGCS(Source): - def check(self, logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus: +class SourceGCS(FileBasedSource): + def read_config(self, config_path: str) -> Mapping[str, Any]: """ - Check to see if a client can be created and list the files in the bucket. + Override the default read_config to transform the legacy config format + into the new one before validating it against the new spec. """ - try: - blobs = get_gcs_blobs(config) - if not blobs: - return AirbyteConnectionStatus(status=Status.FAILED, message="No compatible file found in bucket") - return AirbyteConnectionStatus(status=Status.SUCCEEDED) - except Exception as e: - return AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred: {str(e)}") - - def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog: - streams = [] - - blobs = get_gcs_blobs(config) - for blob in blobs: - # Read the first 0.1MB of the file to determine schema - df = read_csv_file(blob, read_header_only=True) - stream_name = get_stream_name(blob) - json_schema = construct_file_schema(df) - streams.append(AirbyteStream(name=stream_name, json_schema=json_schema, supported_sync_modes=["full_refresh"])) - - return AirbyteCatalog(streams=streams) - - def read( - self, logger: AirbyteLogger, config: json, catalog: ConfiguredAirbyteCatalog, state: Dict[str, any] - ) -> Generator[AirbyteMessage, None, None]: - logger.info("Start reading") - blobs = get_gcs_blobs(config) - - # Read only selected stream(s) - selected_streams = [configged_stream.stream.name for configged_stream in catalog.streams] - selected_blobs = [blob for blob in blobs if get_stream_name(blob) in selected_streams] - - for blob in selected_blobs: - logger.info(blob.name) - df = read_csv_file(blob) - stream_name = get_stream_name(blob) - for _, row in df.iterrows(): - row_dict = row.to_dict() - row_dict = {k: str(v) for k, v in row_dict.items()} - yield AirbyteMessage( - type=Type.RECORD, - record=AirbyteRecordMessage(stream=stream_name, data=row_dict, emitted_at=int(datetime.now().timestamp()) * 1000), - ) + config = super().read_config(config_path) + if not self._is_file_based_config(config): + parsed_legacy_config = SourceGCSSpec(**config) + converted_config = LegacyConfigTransformer.convert(parsed_legacy_config) + emit_configuration_as_airbyte_control_message(converted_config) + return converted_config + return config + + @staticmethod + def _is_file_based_config(config: Mapping[str, Any]) -> bool: + return "streams" in config diff --git a/airbyte-integrations/connectors/source-gcs/source_gcs/spec.py b/airbyte-integrations/connectors/source-gcs/source_gcs/spec.py new file mode 100644 index 000000000000..790554aeef61 --- /dev/null +++ b/airbyte-integrations/connectors/source-gcs/source_gcs/spec.py @@ -0,0 +1,39 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from pydantic import BaseModel, Field + + +class SourceGCSSpec(BaseModel): + """ + The SourceGCSSpec class defines the expected input configuration + for the Google Cloud Storage (GCS) source. It uses Pydantic for data + validation through the defined data models. + + Note: When this Spec is changed, ensure that the legacy_config_transformer.py + is also modified to accommodate the changes, as it is responsible for + converting legacy GCS configs into file based configs using the File-Based CDK. + """ + + gcs_bucket: str = Field( + title="GCS bucket", + description="GCS bucket name", + order=0, + ) + + gcs_path: str = Field( + title="GCS Path", + description="GCS path to data", + order=1, + ) + + service_account: str = Field( + title="Service Account Information.", + airbyte_secret=True, + description=( + 'Enter your Google Cloud ' + "service account key in JSON format" + ), + ) diff --git a/airbyte-integrations/connectors/source-gcs/source_gcs/spec.yaml b/airbyte-integrations/connectors/source-gcs/source_gcs/spec.yaml deleted file mode 100644 index 6b042e975071..000000000000 --- a/airbyte-integrations/connectors/source-gcs/source_gcs/spec.yaml +++ /dev/null @@ -1,25 +0,0 @@ -documentationUrl: https://docsurl.com -connectionSpecification: - $schema: http://json-schema.org/draft-07/schema# - title: Gcs Spec - type: object - required: - - gcs_bucket - - gcs_path - - service_account - properties: - gcs_bucket: - type: string - title: GCS bucket - description: GCS bucket name - gcs_path: - type: string - title: GCS Path - description: GCS path to data - service_account: - type: string - title: Service Account Information. - description: 'Enter your Google Cloud service account key in JSON format' - airbyte_secret: true - examples: - - '{ "type": "service_account", "project_id": YOUR_PROJECT_ID, "private_key_id": YOUR_PRIVATE_KEY, ... }' diff --git a/airbyte-integrations/connectors/source-gcs/source_gcs/stream_reader.py b/airbyte-integrations/connectors/source-gcs/source_gcs/stream_reader.py new file mode 100644 index 000000000000..3552d75980fd --- /dev/null +++ b/airbyte-integrations/connectors/source-gcs/source_gcs/stream_reader.py @@ -0,0 +1,111 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +import itertools +import json +import logging +from contextlib import contextmanager +from datetime import datetime, timedelta +from io import IOBase +from typing import Iterable, List, Optional + +import pytz +import smart_open +from airbyte_cdk.sources.file_based.exceptions import ErrorListingFiles, FileBasedSourceError +from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode +from airbyte_cdk.sources.file_based.remote_file import RemoteFile +from google.cloud import storage +from google.oauth2 import service_account +from source_gcs.config import Config + +ERROR_MESSAGE_ACCESS = ( + "We don't have access to {uri}. The file appears to have become unreachable during sync." + "Check whether key {uri} exists in `{bucket}` bucket and/or has proper ACL permissions" +) +FILE_FORMAT = "csv" # TODO: Change if other file formats are implemented + + +class SourceGCSStreamReader(AbstractFileBasedStreamReader): + """ + Stream reader for Google Cloud Storage (GCS). + """ + + def __init__(self): + super().__init__() + self._gcs_client = None + self._config = None + + @property + def config(self) -> Config: + return self._config + + @config.setter + def config(self, value: Config): + assert isinstance(value, Config), "Config must be an instance of the expected Config class." + self._config = value + + def _initialize_gcs_client(self): + if self.config is None: + raise ValueError("Source config is missing; cannot create the GCS client.") + if self._gcs_client is None: + credentials = self._get_credentials() + self._gcs_client = storage.Client(credentials=credentials) + return self._gcs_client + + def _get_credentials(self): + return service_account.Credentials.from_service_account_info(json.loads(self.config.service_account)) + + @property + def gcs_client(self) -> storage.Client: + return self._initialize_gcs_client() + + def get_matching_files(self, globs: List[str], prefix: Optional[str], logger: logging.Logger) -> Iterable[RemoteFile]: + """ + Retrieve all files matching the specified glob patterns in GCS. + """ + try: + start_date = ( + datetime.strptime(self.config.start_date, self.DATE_TIME_FORMAT) if self.config and self.config.start_date else None + ) + prefixes = [prefix] if prefix else self.get_prefixes_from_globs(globs or []) + globs = globs or [None] + + for prefix, glob in itertools.product(prefixes, globs): + bucket = self.gcs_client.get_bucket(self.config.bucket) + blobs = bucket.list_blobs(prefix=prefix, match_glob=glob) + for blob in blobs: + last_modified = blob.updated.astimezone(pytz.utc).replace(tzinfo=None) + + if FILE_FORMAT in blob.name.lower() and (not start_date or last_modified >= start_date): + uri = blob.generate_signed_url(expiration=timedelta(hours=1), version="v4") + + yield RemoteFile(uri=uri, last_modified=last_modified) + + except Exception as exc: + self._handle_file_listing_error(exc, prefix, logger) + + def _handle_file_listing_error(self, exc: Exception, prefix: str, logger: logging.Logger): + logger.error(f"Error while listing files: {str(exc)}") + raise ErrorListingFiles( + FileBasedSourceError.ERROR_LISTING_FILES, + source="gcs", + bucket=self.config.bucket, + prefix=prefix, + ) from exc + + @contextmanager + def open_file(self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger: logging.Logger) -> IOBase: + """ + Open and yield a remote file from GCS for reading. + """ + logger.debug(f"Trying to open {file.uri}") + try: + result = smart_open.open(file.uri, mode=mode.value, encoding=encoding) + except OSError as oe: + logger.warning(ERROR_MESSAGE_ACCESS.format(uri=file.uri, bucket=self.config.bucket)) + logger.exception(oe) + try: + yield result + finally: + result.close() diff --git a/docs/integrations/sources/gcs.md b/docs/integrations/sources/gcs.md index 8b309e911197..e127c9e44cec 100644 --- a/docs/integrations/sources/gcs.md +++ b/docs/integrations/sources/gcs.md @@ -35,7 +35,8 @@ Use the service account ID from above, grant read access to your target bucket. ## Changelog -| Version | Date | Pull Request | Subject | -| :------ | :--------- | :------------------------------------------------------- | :------------------- | -| 0.2.0 | 2023-06-26 | [27725](https://github.com/airbytehq/airbyte/pull/27725) | License Update: Elv2 | -| 0.1.0 | 2023-02-16 | [23186](https://github.com/airbytehq/airbyte/pull/23186) | New Source: GCS | +| Version | Date | Pull Request | Subject | +|:--------|:-----------|:---------------------------------------------------------|:---------------------------| +| 0.3.0 | 2023-10-11 | [31212](https://github.com/airbytehq/airbyte/pull/31212) | Migrated to file based CDK | +| 0.2.0 | 2023-06-26 | [27725](https://github.com/airbytehq/airbyte/pull/27725) | License Update: Elv2 | +| 0.1.0 | 2023-02-16 | [23186](https://github.com/airbytehq/airbyte/pull/23186) | New Source: GCS |