Skip to content

Commit

Permalink
Source S3: add filter by start date (#35392)
Browse files Browse the repository at this point in the history
  • Loading branch information
roman-yermilov-gl authored Feb 20, 2024
1 parent ba38b9d commit bbb06b8
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 6 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: file
connectorType: source
definitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
dockerImageTag: 4.5.4
dockerImageTag: 4.5.5
dockerRepository: airbyte/source-s3
documentationUrl: https://docs.airbyte.com/integrations/sources/s3
githubIssueLabel: source-s3
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "4.5.4"
version = "4.5.5"
name = "source-s3"
description = "Source implementation for S3."
authors = [ "Airbyte <[email protected]>",]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from typing import Iterable, List, Optional, Set

import boto3.session
import pendulum
import pytz
import smart_open
from airbyte_cdk.models import FailureType
Expand Down Expand Up @@ -205,7 +206,11 @@ def _page(
continue

for remote_file in self._handle_file(file):
if self.file_matches_globs(remote_file, globs) and remote_file.uri not in seen:
if (
self.file_matches_globs(remote_file, globs)
and self.is_modified_after_start_date(remote_file.last_modified)
and remote_file.uri not in seen
):
seen.add(remote_file.uri)
yield remote_file
else:
Expand All @@ -217,6 +222,12 @@ def _page(
logger.info(f"Finished listing objects from S3 for prefix={prefix}. Found {total_n_keys_for_prefix} objects.")
break

def is_modified_after_start_date(self, last_modified_date: Optional[datetime]) -> bool:
"""Returns True if given date higher or equal than start date or something is missing"""
if not (self.config.start_date and last_modified_date):
return True
return last_modified_date >= pendulum.parse(self.config.start_date).naive()

def _handle_file(self, file):
if file["Key"].endswith(".zip"):
yield from self._handle_zip_file(file)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import io
import logging
from datetime import datetime
from datetime import datetime, timedelta
from itertools import product
from typing import Any, Dict, List, Optional, Set
from unittest.mock import patch
Expand Down Expand Up @@ -269,3 +269,38 @@ def test_get_iam_s3_client(boto3_client_mock):

# Assertions to validate the s3 client
assert s3_client is not None

@pytest.mark.parametrize(
"start_date, last_modified_date, expected_result",
(
# True when file is new or modified after given start_date
(
datetime.now() - timedelta(days=180),
datetime.now(),
True
),
(
datetime.strptime("2024-01-01T00:00:00Z", "%Y-%m-%dT%H:%M:%SZ"),
datetime.strptime("2024-01-01T00:00:00Z", "%Y-%m-%dT%H:%M:%SZ"),
True
),
# False when file is older than given start_date
(
datetime.now(),
datetime.now() - timedelta(days=180),
False
)
)
)
def test_filter_file_by_start_date(start_date: datetime, last_modified_date: datetime, expected_result: bool) -> None:
reader = SourceS3StreamReader()

reader.config = Config(
bucket="test",
aws_access_key_id="test",
aws_secret_access_key="test",
streams=[],
start_date=start_date.strftime("%Y-%m-%dT%H:%M:%SZ")
)

assert expected_result == reader.is_modified_after_start_date(last_modified_date)
9 changes: 7 additions & 2 deletions docs/integrations/sources/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ The Amazon S3 source connector supports the following [sync modes](https://docs.
| Replicate Multiple Streams \(distinct tables\) | Yes |
| Namespaces | No |

## Supported streams

There is no predefined streams. The streams are based on content of your bucket.

## File Compressions

| Compression | Supported? |
Expand Down Expand Up @@ -260,8 +264,9 @@ To perform the text extraction from PDF and Docx files, the connector uses the [

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------|
| 4.5.4 | 2024-02-15 | [35055](https://github.com/airbytehq/airbyte/pull/35055) | Temporarily revert concurrency |
| 4.5.3 | 2024-02-12 | [35164](https://github.com/airbytehq/airbyte/pull/35164) | Manage dependencies with Poetry. |
| 4.5.5 | 2024-02-18 | [35392](https://github.com/airbytehq/airbyte/pull/35392) | Add support filtering by start date |
| 4.5.4 | 2024-02-15 | [35055](https://github.com/airbytehq/airbyte/pull/35055) | Temporarily revert concurrency |
| 4.5.3 | 2024-02-12 | [35164](https://github.com/airbytehq/airbyte/pull/35164) | Manage dependencies with Poetry. |
| 4.5.2 | 2024-02-06 | [34930](https://github.com/airbytehq/airbyte/pull/34930) | Bump CDK version to fix issue when SyncMode is missing from catalog |
| 4.5.1 | 2024-02-02 | [31701](https://github.com/airbytehq/airbyte/pull/31701) | Add `region` support |
| 4.5.0 | 2024-02-01 | [34591](https://github.com/airbytehq/airbyte/pull/34591) | Run full refresh syncs concurrently |
Expand Down

0 comments on commit bbb06b8

Please sign in to comment.