From b1aae062011a375b73aaed6bf21f4762a136586f Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Fri, 13 Oct 2023 01:25:35 +0200 Subject: [PATCH] Added start date and glob filtering --- .../integration_tests/configured_catalog.json | 9 --- .../connectors/source-gcs/setup.py | 8 +-- .../source-gcs/source_gcs/config.py | 63 ++++++++++++++++--- .../source-gcs/source_gcs/stream_reader.py | 56 +++++++++++------ 4 files changed, 92 insertions(+), 44 deletions(-) diff --git a/airbyte-integrations/connectors/source-gcs/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-gcs/integration_tests/configured_catalog.json index 9836beb15b42..02d3a387dea7 100644 --- a/airbyte-integrations/connectors/source-gcs/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-gcs/integration_tests/configured_catalog.json @@ -8,15 +8,6 @@ }, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite" - }, - { - "stream": { - "name": "example_2", - "json_schema": {}, - "supported_sync_modes": ["full_refresh"] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" } ] } diff --git a/airbyte-integrations/connectors/source-gcs/setup.py b/airbyte-integrations/connectors/source-gcs/setup.py index 5faf371ab5d3..1fefcdf74e20 100644 --- a/airbyte-integrations/connectors/source-gcs/setup.py +++ b/airbyte-integrations/connectors/source-gcs/setup.py @@ -7,15 +7,11 @@ MAIN_REQUIREMENTS = [ "airbyte-cdk>=0.51.17", - "google-cloud-storage==2.5.0", + "google-cloud-storage==2.12.0", "pandas==1.5.3", "pyarrow==12.0.1", - "smart-open[s3]==5.1.0", - "wcmatch==8.4", - "dill==0.3.4", - "pytz", "fastavro==1.4.11", - "python-snappy==0.6.1", + "smart-open[s3]==5.1.0" ] TEST_REQUIREMENTS = [ diff --git a/airbyte-integrations/connectors/source-gcs/source_gcs/config.py b/airbyte-integrations/connectors/source-gcs/source_gcs/config.py index 2e1bbfab44c4..7a63a17088aa 100644 --- a/airbyte-integrations/connectors/source-gcs/source_gcs/config.py +++ b/airbyte-integrations/connectors/source-gcs/source_gcs/config.py @@ -2,10 +2,30 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +from typing import List, Optional + from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec +from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat +from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig from pydantic import AnyUrl, Field +class SourceGCSStreamConfig(FileBasedStreamConfig): + name: str = Field(title="Name", description="The name of the stream.", order=0) + globs: Optional[List[str]] = Field( + title="Globs", + description="The pattern used to specify which files should be selected from the file system. For more information on glob " + 'pattern matching look here.', + order=1, + ) + format: CsvFormat = Field( + title="Format", + description="The configuration options that are used to alter how to read incoming files that deviate from " + "the standard formatting.", + order=2, + ) + + class Config(AbstractFileBasedSpec): """ NOTE: When this Spec is changed, legacy_config_transformer.py must also be @@ -13,21 +33,44 @@ class Config(AbstractFileBasedSpec): legacy GCS configs into file based configs using the File-Based CDK. """ - @classmethod - def documentation_url(cls) -> AnyUrl: - """ - Returns the documentation URL. - """ - return AnyUrl("https://docs.airbyte.com/integrations/sources/gcs", scheme="https") - - bucket: str = Field(title="Bucket", description="Name of the GCS bucket where the file(s) exist.", order=0) - service_account: str = Field( - title="Service Account Information.", + title="Service Account Information", airbyte_secret=True, description=( "Enter your Google Cloud " '' "service account key in JSON format" ), + order=0, ) + + bucket: str = Field(title="Bucket", description="Name of the GCS bucket where the file(s) exist.", order=1) + + streams: List[SourceGCSStreamConfig] = Field( + title="The list of streams to sync", + description=( + "Each instance of this configuration defines a stream. " + "Use this to define which files belong in the stream, their format, and how they should be " + "parsed and validated. When sending data to warehouse destination such as Snowflake or " + "BigQuery, each stream is a separate table." + ), + order=2, + ) + + @classmethod + def documentation_url(cls) -> AnyUrl: + """ + Returns the documentation URL. + """ + return AnyUrl("https://docs.airbyte.com/integrations/sources/gcs", scheme="https") + + @staticmethod + def replace_enum_allOf_and_anyOf(schema): + """ + Replace allOf with anyOf when appropriate in the schema with one value. + """ + objects_to_check = schema["properties"]["streams"]["items"]["properties"]["format"] + if len(objects_to_check.get("allOf", [])) == 1: + objects_to_check["anyOf"] = objects_to_check.pop("allOf") + + return super(Config, Config).replace_enum_allOf_and_anyOf(schema) diff --git a/airbyte-integrations/connectors/source-gcs/source_gcs/stream_reader.py b/airbyte-integrations/connectors/source-gcs/source_gcs/stream_reader.py index 2d23354f4bb0..3552d75980fd 100644 --- a/airbyte-integrations/connectors/source-gcs/source_gcs/stream_reader.py +++ b/airbyte-integrations/connectors/source-gcs/source_gcs/stream_reader.py @@ -2,10 +2,11 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +import itertools import json import logging from contextlib import contextmanager -from datetime import timedelta +from datetime import datetime, timedelta from io import IOBase from typing import Iterable, List, Optional @@ -33,6 +34,7 @@ class SourceGCSStreamReader(AbstractFileBasedStreamReader): def __init__(self): super().__init__() self._gcs_client = None + self._config = None @property def config(self) -> Config: @@ -43,38 +45,54 @@ def config(self, value: Config): assert isinstance(value, Config), "Config must be an instance of the expected Config class." self._config = value - @property - def gcs_client(self) -> storage.Client: + def _initialize_gcs_client(self): if self.config is None: raise ValueError("Source config is missing; cannot create the GCS client.") if self._gcs_client is None: - credentials = service_account.Credentials.from_service_account_info(json.loads(self.config.service_account)) + credentials = self._get_credentials() self._gcs_client = storage.Client(credentials=credentials) return self._gcs_client + def _get_credentials(self): + return service_account.Credentials.from_service_account_info(json.loads(self.config.service_account)) + + @property + def gcs_client(self) -> storage.Client: + return self._initialize_gcs_client() + def get_matching_files(self, globs: List[str], prefix: Optional[str], logger: logging.Logger) -> Iterable[RemoteFile]: """ Retrieve all files matching the specified glob patterns in GCS. """ try: - bucket = self.gcs_client.get_bucket(self.config.bucket) - remote_files = bucket.list_blobs(prefix=prefix) + start_date = ( + datetime.strptime(self.config.start_date, self.DATE_TIME_FORMAT) if self.config and self.config.start_date else None + ) + prefixes = [prefix] if prefix else self.get_prefixes_from_globs(globs or []) + globs = globs or [None] + + for prefix, glob in itertools.product(prefixes, globs): + bucket = self.gcs_client.get_bucket(self.config.bucket) + blobs = bucket.list_blobs(prefix=prefix, match_glob=glob) + for blob in blobs: + last_modified = blob.updated.astimezone(pytz.utc).replace(tzinfo=None) - for remote_file in remote_files: - if FILE_FORMAT in remote_file.name.lower(): - yield RemoteFile( - uri=remote_file.generate_signed_url(expiration=timedelta(hours=1), version="v4"), - last_modified=remote_file.updated.astimezone(pytz.utc).replace(tzinfo=None), - ) + if FILE_FORMAT in blob.name.lower() and (not start_date or last_modified >= start_date): + uri = blob.generate_signed_url(expiration=timedelta(hours=1), version="v4") + + yield RemoteFile(uri=uri, last_modified=last_modified) except Exception as exc: - logger.error(f"Error while listing files: {str(exc)}") - raise ErrorListingFiles( - FileBasedSourceError.ERROR_LISTING_FILES, - source="gcs", - bucket=self.config.bucket, - prefix=prefix, - ) from exc + self._handle_file_listing_error(exc, prefix, logger) + + def _handle_file_listing_error(self, exc: Exception, prefix: str, logger: logging.Logger): + logger.error(f"Error while listing files: {str(exc)}") + raise ErrorListingFiles( + FileBasedSourceError.ERROR_LISTING_FILES, + source="gcs", + bucket=self.config.bucket, + prefix=prefix, + ) from exc @contextmanager def open_file(self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger: logging.Logger) -> IOBase: