Tdl 25859/handle s3 files race condition (#67)
* Changes to handle the S3 file modified-date race condition

* Date format change

* Bump version

* Addressed review comments

---------

Co-authored-by: rdeshmukh15 <[email protected]>
rdeshmukh15 authored Nov 20, 2024
1 parent 1461a53 commit 10b9798
Showing 6 changed files with 63 additions and 11 deletions.
4 changes: 1 addition & 3 deletions .circleci/config.yml
@@ -14,7 +14,7 @@ jobs:
             source /usr/local/share/virtualenvs/tap-s3-csv/bin/activate
             pip install .
             pip install pylint
-            pylint tap_s3_csv -d duplicate-code,consider-using-f-string,logging-format-interpolation,missing-docstring,invalid-name,line-too-long,too-many-locals,too-few-public-methods,fixme,stop-iteration-return,broad-except,bare-except,unused-variable,unnecessary-comprehension,no-member,deprecated-method,protected-access,broad-exception-raised
+            pylint tap_s3_csv -d duplicate-code,consider-using-f-string,logging-format-interpolation,missing-docstring,invalid-name,line-too-long,too-many-locals,too-few-public-methods,fixme,stop-iteration-return,broad-except,bare-except,unused-variable,unnecessary-comprehension,no-member,deprecated-method,protected-access,broad-exception-raised,too-many-positional-arguments
       - run:
           name: 'Unit Tests'
           command: |
@@ -30,8 +30,6 @@ jobs:
       - run:
           name: 'Integration Tests'
           command: |
-            aws configure set aws_access_key_id "$AWS_ACCESS_KEY_ID"
-            aws configure set aws_secret_access_key "$AWS_SECRET_ACCESS_KEY"
             aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/tap_tester_sandbox dev_env.sh
             source dev_env.sh
             source /usr/local/share/virtualenvs/tap-tester/bin/activate
6 changes: 5 additions & 1 deletion CHANGELOG.md
@@ -1,8 +1,12 @@
 # Changelog
 
+## 1.3.9
+* Handle S3 files race condition
+* [#67](https://github.com/singer-io/tap-s3-csv/pull/67)
+
 ## 1.3.8
 * Add Missing Type Information in JSON Schema
-* [#55](https://github.com/singer-io/tap-s3-csv/pull/62)
+* [#62](https://github.com/singer-io/tap-s3-csv/pull/62)
 
 ## 1.3.7
 * Remove Backoff for Access Denied errors
2 changes: 1 addition & 1 deletion setup.py
@@ -3,7 +3,7 @@
 from setuptools import setup
 
 setup(name='tap-s3-csv',
-      version='1.3.8',
+      version='1.3.9',
       description='Singer.io tap for extracting CSV files from S3',
       author='Stitch',
       url='https://singer.io',
10 changes: 7 additions & 3 deletions tap_s3_csv/__init__.py
@@ -1,8 +1,10 @@
+from datetime import datetime
 import json
 import sys
 import singer
 
 from singer import metadata
+from singer import utils as singer_utils
 from tap_s3_csv.discover import discover_streams
 from tap_s3_csv import s3
 from tap_s3_csv.sync import sync_stream
@@ -27,7 +29,7 @@ def stream_is_selected(mdata):
     return mdata.get((), {}).get('selected', False)
 
 
-def do_sync(config, catalog, state):
+def do_sync(config, catalog, state, sync_start_time):
     LOGGER.info('Starting sync.')
 
     for stream in catalog['streams']:
@@ -43,7 +45,7 @@ def do_sync(config, catalog, state):
         singer.write_schema(stream_name, stream['schema'], key_properties)
 
         LOGGER.info("%s: Starting sync", stream_name)
-        counter_value = sync_stream(config, state, table_spec, stream)
+        counter_value = sync_stream(config, state, table_spec, stream, sync_start_time)
         LOGGER.info("%s: Completed sync (%s rows)", stream_name, counter_value)
 
     LOGGER.info('Done syncing.')
@@ -73,6 +75,8 @@ def main():
     config = args.config
 
     config['tables'] = validate_table_config(config)
+    now_str = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
+    sync_start_time = singer_utils.strptime_with_tz(now_str)
 
     try:
         for page in s3.list_files_in_bucket(config):
@@ -84,7 +88,7 @@ def main():
     if args.discover:
         do_discover(args.config)
     elif args.properties:
-        do_sync(config, args.properties, args.state)
+        do_sync(config, args.properties, args.state, sync_start_time)
 
 
 if __name__ == '__main__':
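A note on the timestamp handling added to `main()`: the current time makes a round trip through a string so that the resulting `sync_start_time` is timezone-aware and truncated to whole seconds, matching the tz-aware `last_modified` datetimes boto3 reports for S3 objects (Python refuses to compare naive and aware datetimes). A minimal sketch of that behavior, assuming singer-python's `singer.utils.strptime_with_tz`, which attaches UTC when the parsed string carries no other offset:

```python
from datetime import datetime

from singer import utils as singer_utils

# Format first, then re-parse. datetime.now() is local time, but the
# literal "Z" suffix makes strptime_with_tz treat the string as UTC and
# return a timezone-aware datetime with second-level precision.
now_str = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
sync_start_time = singer_utils.strptime_with_tz(now_str)

assert sync_start_time.tzinfo is not None  # comparable with boto3's last_modified
assert sync_start_time.microsecond == 0    # sub-second precision dropped
```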
8 changes: 5 additions & 3 deletions tap_s3_csv/sync.py
@@ -20,7 +20,7 @@
 
 LOGGER = singer.get_logger()
 
-def sync_stream(config, state, table_spec, stream):
+def sync_stream(config, state, table_spec, stream, sync_start_time):
     table_name = table_spec['table_name']
     modified_since = singer_utils.strptime_with_tz(singer.get_bookmark(state, table_name, 'modified_since') or
                                                    config['start_date'])
@@ -40,8 +40,10 @@ def sync_stream(config, state, table_spec, stream):
     for s3_file in sorted(s3_files, key=lambda item: item['last_modified']):
         records_streamed += sync_table_file(
             config, s3_file['key'], table_spec, stream)
-
-        state = singer.write_bookmark(state, table_name, 'modified_since', s3_file['last_modified'].isoformat())
+        if s3_file['last_modified'] < sync_start_time:
+            state = singer.write_bookmark(state, table_name, 'modified_since', s3_file['last_modified'].isoformat())
+        else:
+            state = singer.write_bookmark(state, table_name, 'modified_since', sync_start_time.isoformat())
         singer.write_state(state)
 
     if s3.skipped_files_count:
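This hunk is the heart of the fix: the `modified_since` bookmark is never advanced past the moment the sync started. A file whose `last_modified` lands at or after `sync_start_time` may have neighbors written during the run that the earlier bucket listing missed; capping the bookmark means the next sync re-scans that window instead of skipping those files. A minimal sketch of the rule, using a hypothetical `capped_bookmark` helper (the tap itself writes the bookmark inline, as above):

```python
from datetime import datetime, timezone

def capped_bookmark(last_modified: datetime, sync_start_time: datetime) -> str:
    # Hypothetical helper: take whichever timestamp is earlier, so the
    # bookmark never moves past the start of the current sync.
    return min(last_modified, sync_start_time).isoformat()

start = datetime(2024, 8, 14, 12, 0, 0, tzinfo=timezone.utc)

# A file modified before the sync started advances the bookmark normally.
early = datetime(2024, 8, 13, 12, 0, 0, tzinfo=timezone.utc)
assert capped_bookmark(early, start) == early.isoformat()

# A file modified mid-sync does not push the bookmark past the start time,
# so anything else written during the run is picked up on the next sync.
late = datetime(2024, 8, 14, 12, 5, 0, tzinfo=timezone.utc)
assert capped_bookmark(late, start) == start.isoformat()
```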
44 changes: 44 additions & 0 deletions tests/unittests/test_sync_stream.py
@@ -0,0 +1,44 @@
import unittest
from unittest.mock import patch, MagicMock
from datetime import datetime
from tap_s3_csv import sync_stream
from parameterized import parameterized

class TestSyncStream(unittest.TestCase):

    @parameterized.expand([
        # Case when file is older than sync_start_time
        ("file_older_than_sync_start_time", datetime(2024, 8, 13, 12, 0, 0), datetime(2024, 8, 14, 12, 0, 0), '2024-08-13T12:00:00', 1),
        # Case when file is newer than sync_start_time
        ("file_newer_than_sync_start_time", datetime(2024, 8, 15, 12, 0, 0), datetime(2024, 8, 14, 12, 0, 0), '2024-08-14T12:00:00', 1),
        # Case when file is the same as sync_start_time
        ("file_same_as_sync_start_time", datetime(2024, 8, 14, 12, 0, 0), datetime(2024, 8, 14, 12, 0, 0), '2024-08-14T12:00:00', 1)
    ])
    @patch('tap_s3_csv.s3.get_input_files_for_table')
    @patch('tap_s3_csv.sync.sync_table_file')
    @patch('tap_s3_csv.singer.get_bookmark')
    @patch('tap_s3_csv.singer.write_bookmark')
    @patch('tap_s3_csv.singer.write_state')
    @patch('tap_s3_csv.LOGGER')
    def test_sync_stream(self, name, file_last_modified, sync_start_time, expected_bookmark, expected_records_streamed, mock_logger, mock_write_state, mock_write_bookmark, mock_get_bookmark, mock_sync_table_file, mock_get_input_files_for_table):
        """
        Parameterized test for the sync_stream function with various file modification times.
        """
        mock_get_bookmark.return_value = '2024-01-01T00:00:00Z'
        mock_sync_table_file.return_value = 1
        mock_write_state.return_value = None

        config = {'start_date': '2024-01-01T00:00:00Z'}
        state = {}
        table_spec = {'table_name': 'test_table'}
        stream = None

        mock_get_input_files_for_table.return_value = [{'key': 'file1.csv', 'last_modified': file_last_modified}]
        mock_write_bookmark.return_value = expected_bookmark

        records_streamed = sync_stream(config, state, table_spec, stream, sync_start_time)

        self.assertEqual(records_streamed, expected_records_streamed)
        mock_write_bookmark.assert_called_with(state, 'test_table', 'modified_since', expected_bookmark)
        mock_write_state.assert_called_once()
        mock_write_state.reset_mock()