From bbb06b866f90353df3d28513c51f3f5906fc157c Mon Sep 17 00:00:00 2001 From: "Roman Yermilov [GL]" <86300758+roman-yermilov-gl@users.noreply.github.com> Date: Tue, 20 Feb 2024 19:44:45 +0100 Subject: [PATCH] Source S3: add filter by start date (#35392) --- .../connectors/source-s3/metadata.yaml | 2 +- .../connectors/source-s3/pyproject.toml | 2 +- .../source-s3/source_s3/v4/stream_reader.py | 13 ++++++- .../unit_tests/v4/test_stream_reader.py | 37 ++++++++++++++++++- docs/integrations/sources/s3.md | 9 ++++- 5 files changed, 57 insertions(+), 6 deletions(-) diff --git a/airbyte-integrations/connectors/source-s3/metadata.yaml b/airbyte-integrations/connectors/source-s3/metadata.yaml index f74b6ac72f2a..acbed125e654 100644 --- a/airbyte-integrations/connectors/source-s3/metadata.yaml +++ b/airbyte-integrations/connectors/source-s3/metadata.yaml @@ -10,7 +10,7 @@ data: connectorSubtype: file connectorType: source definitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2 - dockerImageTag: 4.5.4 + dockerImageTag: 4.5.5 dockerRepository: airbyte/source-s3 documentationUrl: https://docs.airbyte.com/integrations/sources/s3 githubIssueLabel: source-s3 diff --git a/airbyte-integrations/connectors/source-s3/pyproject.toml b/airbyte-integrations/connectors/source-s3/pyproject.toml index aaa22997c6f0..6f12b067f101 100644 --- a/airbyte-integrations/connectors/source-s3/pyproject.toml +++ b/airbyte-integrations/connectors/source-s3/pyproject.toml @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",] build-backend = "poetry.core.masonry.api" [tool.poetry] -version = "4.5.4" +version = "4.5.5" name = "source-s3" description = "Source implementation for S3." authors = [ "Airbyte ",] diff --git a/airbyte-integrations/connectors/source-s3/source_s3/v4/stream_reader.py b/airbyte-integrations/connectors/source-s3/source_s3/v4/stream_reader.py index d914690ee70c..411142ef71cc 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/v4/stream_reader.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/v4/stream_reader.py @@ -9,6 +9,7 @@ from typing import Iterable, List, Optional, Set import boto3.session +import pendulum import pytz import smart_open from airbyte_cdk.models import FailureType @@ -205,7 +206,11 @@ def _page( continue for remote_file in self._handle_file(file): - if self.file_matches_globs(remote_file, globs) and remote_file.uri not in seen: + if ( + self.file_matches_globs(remote_file, globs) + and self.is_modified_after_start_date(remote_file.last_modified) + and remote_file.uri not in seen + ): seen.add(remote_file.uri) yield remote_file else: @@ -217,6 +222,12 @@ def _page( logger.info(f"Finished listing objects from S3 for prefix={prefix}. Found {total_n_keys_for_prefix} objects.") break + def is_modified_after_start_date(self, last_modified_date: Optional[datetime]) -> bool: + """Returns True if given date higher or equal than start date or something is missing""" + if not (self.config.start_date and last_modified_date): + return True + return last_modified_date >= pendulum.parse(self.config.start_date).naive() + def _handle_file(self, file): if file["Key"].endswith(".zip"): yield from self._handle_zip_file(file) diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/v4/test_stream_reader.py b/airbyte-integrations/connectors/source-s3/unit_tests/v4/test_stream_reader.py index b1bede862d22..01ad2d926380 100644 --- a/airbyte-integrations/connectors/source-s3/unit_tests/v4/test_stream_reader.py +++ b/airbyte-integrations/connectors/source-s3/unit_tests/v4/test_stream_reader.py @@ -5,7 +5,7 @@ import io import logging -from datetime import datetime +from datetime import datetime, timedelta from itertools import product from typing import Any, Dict, List, Optional, Set from unittest.mock import patch @@ -269,3 +269,38 @@ def test_get_iam_s3_client(boto3_client_mock): # Assertions to validate the s3 client assert s3_client is not None + +@pytest.mark.parametrize( + "start_date, last_modified_date, expected_result", + ( + # True when file is new or modified after given start_date + ( + datetime.now() - timedelta(days=180), + datetime.now(), + True + ), + ( + datetime.strptime("2024-01-01T00:00:00Z", "%Y-%m-%dT%H:%M:%SZ"), + datetime.strptime("2024-01-01T00:00:00Z", "%Y-%m-%dT%H:%M:%SZ"), + True + ), + # False when file is older than given start_date + ( + datetime.now(), + datetime.now() - timedelta(days=180), + False + ) + ) +) +def test_filter_file_by_start_date(start_date: datetime, last_modified_date: datetime, expected_result: bool) -> None: + reader = SourceS3StreamReader() + + reader.config = Config( + bucket="test", + aws_access_key_id="test", + aws_secret_access_key="test", + streams=[], + start_date=start_date.strftime("%Y-%m-%dT%H:%M:%SZ") + ) + + assert expected_result == reader.is_modified_after_start_date(last_modified_date) \ No newline at end of file diff --git a/docs/integrations/sources/s3.md b/docs/integrations/sources/s3.md index d225bba5909a..ff8263e911c5 100644 --- a/docs/integrations/sources/s3.md +++ b/docs/integrations/sources/s3.md @@ -88,6 +88,10 @@ The Amazon S3 source connector supports the following [sync modes](https://docs. | Replicate Multiple Streams \(distinct tables\) | Yes | | Namespaces | No | +## Supported streams + +There is no predefined streams. The streams are based on content of your bucket. + ## File Compressions | Compression | Supported? | @@ -260,8 +264,9 @@ To perform the text extraction from PDF and Docx files, the connector uses the [ | Version | Date | Pull Request | Subject | |:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------| -| 4.5.4 | 2024-02-15 | [35055](https://github.com/airbytehq/airbyte/pull/35055) | Temporarily revert concurrency | -| 4.5.3 | 2024-02-12 | [35164](https://github.com/airbytehq/airbyte/pull/35164) | Manage dependencies with Poetry. | +| 4.5.5 | 2024-02-18 | [35392](https://github.com/airbytehq/airbyte/pull/35392) | Add support filtering by start date | +| 4.5.4 | 2024-02-15 | [35055](https://github.com/airbytehq/airbyte/pull/35055) | Temporarily revert concurrency | +| 4.5.3 | 2024-02-12 | [35164](https://github.com/airbytehq/airbyte/pull/35164) | Manage dependencies with Poetry. | | 4.5.2 | 2024-02-06 | [34930](https://github.com/airbytehq/airbyte/pull/34930) | Bump CDK version to fix issue when SyncMode is missing from catalog | | 4.5.1 | 2024-02-02 | [31701](https://github.com/airbytehq/airbyte/pull/31701) | Add `region` support | | 4.5.0 | 2024-02-01 | [34591](https://github.com/airbytehq/airbyte/pull/34591) | Run full refresh syncs concurrently |