From 10b9798c569dd2541ad1c69ab8df16e837833546 Mon Sep 17 00:00:00 2001
From: Rutuja Deshmukh <107538720+rdeshmukh15@users.noreply.github.com>
Date: Wed, 20 Nov 2024 13:32:39 +0530
Subject: [PATCH] Tdl 25859/handle s3 files race condition (#67)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* changes to handle s3 files modified date race condition

* date format change

* bump version changes

* addressed review comments

---------

Co-authored-by: rdeshmukh15 <rutuja.deshmukh@qlik.com>
---
 .circleci/config.yml                |  4 +--
 CHANGELOG.md                        |  6 +++-
 setup.py                            |  2 +-
 tap_s3_csv/__init__.py              | 10 +++++--
 tap_s3_csv/sync.py                  |  8 ++++--
 tests/unittests/test_sync_stream.py | 44 +++++++++++++++++++++++++++++
 6 files changed, 63 insertions(+), 11 deletions(-)
 create mode 100644 tests/unittests/test_sync_stream.py

diff --git a/.circleci/config.yml b/.circleci/config.yml
index 6a02bc5..dcdc725 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -14,7 +14,7 @@ jobs:
             source /usr/local/share/virtualenvs/tap-s3-csv/bin/activate
             pip install .
             pip install pylint
-            pylint tap_s3_csv -d duplicate-code,consider-using-f-string,logging-format-interpolation,missing-docstring,invalid-name,line-too-long,too-many-locals,too-few-public-methods,fixme,stop-iteration-return,broad-except,bare-except,unused-variable,unnecessary-comprehension,no-member,deprecated-method,protected-access,broad-exception-raised
+            pylint tap_s3_csv -d duplicate-code,consider-using-f-string,logging-format-interpolation,missing-docstring,invalid-name,line-too-long,too-many-locals,too-few-public-methods,fixme,stop-iteration-return,broad-except,bare-except,unused-variable,unnecessary-comprehension,no-member,deprecated-method,protected-access,broad-exception-raised,too-many-positional-arguments
       - run:
           name: 'Unit Tests'
          command: |
@@ -30,8 +30,6 @@ jobs:
       - run:
          name: 'Integration Tests'
          command: |
-            aws configure set aws_access_key_id "$AWS_ACCESS_KEY_ID"
-            aws configure set aws_secret_access_key "$AWS_SECRET_ACCESS_KEY"
             aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/tap_tester_sandbox dev_env.sh
             source dev_env.sh
             source /usr/local/share/virtualenvs/tap-tester/bin/activate
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b4b79a3..9522a64 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,8 +1,12 @@
 # Changelog
 
+## 1.3.9
+  * Handle S3 files race condition
+  * [#67](https://github.com/singer-io/tap-s3-csv/pull/67)
+
 ## 1.3.8
   * Add Missing Type Information in JSON Schema
-  * [#55](https://github.com/singer-io/tap-s3-csv/pull/62)
+  * [#62](https://github.com/singer-io/tap-s3-csv/pull/62)
 
 ## 1.3.7
   * Remove Backoff for Access Denied errors
diff --git a/setup.py b/setup.py
index 3dd26d6..05c95d1 100644
--- a/setup.py
+++ b/setup.py
@@ -3,7 +3,7 @@ from setuptools import setup
 
 setup(name='tap-s3-csv',
-      version='1.3.8',
+      version='1.3.9',
       description='Singer.io tap for extracting CSV files from S3',
       author='Stitch',
       url='https://singer.io',
diff --git a/tap_s3_csv/__init__.py b/tap_s3_csv/__init__.py
index 97ff4af..8c68eb9 100644
--- a/tap_s3_csv/__init__.py
+++ b/tap_s3_csv/__init__.py
@@ -1,8 +1,10 @@
+from datetime import datetime
 import json
 import sys
 
 import singer
 from singer import metadata
+from singer import utils as singer_utils
 from tap_s3_csv.discover import discover_streams
 from tap_s3_csv import s3
 from tap_s3_csv.sync import sync_stream
@@ -27,7 +29,7 @@ def stream_is_selected(mdata):
     return mdata.get((), {}).get('selected', False)
 
 
-def do_sync(config, catalog, state):
+def do_sync(config, catalog, state, sync_start_time):
     LOGGER.info('Starting sync.')
 
     for stream in catalog['streams']:
@@ -43,7 +45,7 @@ def do_sync(config, catalog, state):
         singer.write_schema(stream_name, stream['schema'], key_properties)
 
         LOGGER.info("%s: Starting sync", stream_name)
-        counter_value = sync_stream(config, state, table_spec, stream)
+        counter_value = sync_stream(config, state, table_spec, stream, sync_start_time)
         LOGGER.info("%s: Completed sync (%s rows)", stream_name, counter_value)
 
     LOGGER.info('Done syncing.')
@@ -73,6 +75,8 @@ def main():
     config = args.config
 
     config['tables'] = validate_table_config(config)
+    now_str = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
+    sync_start_time = singer_utils.strptime_with_tz(now_str)
 
     try:
         for page in s3.list_files_in_bucket(config):
@@ -84,7 +88,7 @@ def main():
     if args.discover:
         do_discover(args.config)
     elif args.properties:
-        do_sync(config, args.properties, args.state)
+        do_sync(config, args.properties, args.state, sync_start_time)
 
 
 if __name__ == '__main__':
diff --git a/tap_s3_csv/sync.py b/tap_s3_csv/sync.py
index 2b9a6f8..13eeaaf 100644
--- a/tap_s3_csv/sync.py
+++ b/tap_s3_csv/sync.py
@@ -20,7 +20,7 @@ LOGGER = singer.get_logger()
 
 
-def sync_stream(config, state, table_spec, stream):
+def sync_stream(config, state, table_spec, stream, sync_start_time):
     table_name = table_spec['table_name']
     modified_since = singer_utils.strptime_with_tz(singer.get_bookmark(state, table_name, 'modified_since') or
                                                    config['start_date'])
 
@@ -40,8 +40,10 @@ def sync_stream(config, state, table_spec, stream):
     for s3_file in sorted(s3_files, key=lambda item: item['last_modified']):
         records_streamed += sync_table_file(
             config, s3_file['key'], table_spec, stream)
-
-        state = singer.write_bookmark(state, table_name, 'modified_since', s3_file['last_modified'].isoformat())
+        if s3_file['last_modified'] < sync_start_time:
+            state = singer.write_bookmark(state, table_name, 'modified_since', s3_file['last_modified'].isoformat())
+        else:
+            state = singer.write_bookmark(state, table_name, 'modified_since', sync_start_time.isoformat())
         singer.write_state(state)
 
     if s3.skipped_files_count:
diff --git a/tests/unittests/test_sync_stream.py b/tests/unittests/test_sync_stream.py
new file mode 100644
index 0000000..65201d1
--- /dev/null
+++ b/tests/unittests/test_sync_stream.py
@@ -0,0 +1,44 @@
+import unittest
+from unittest.mock import patch
+from datetime import datetime
+from tap_s3_csv import sync_stream
+from parameterized import parameterized
+
+class TestSyncStream(unittest.TestCase):
+    @parameterized.expand([
+        # Case when file is older than sync_start_time
+        ("file_older_than_sync_start_time", datetime(2024, 8, 13, 12, 0, 0), datetime(2024, 8, 14, 12, 0, 0), '2024-08-13T12:00:00', 1),
+        # Case when file is newer than sync_start_time
+        ("file_newer_than_sync_start_time", datetime(2024, 8, 15, 12, 0, 0), datetime(2024, 8, 14, 12, 0, 0), '2024-08-14T12:00:00', 1),
+        # Case when file is the same as sync_start_time
+        ("file_same_as_sync_start_time", datetime(2024, 8, 14, 12, 0, 0), datetime(2024, 8, 14, 12, 0, 0), '2024-08-14T12:00:00', 1)
+    ])
+    @patch('tap_s3_csv.s3.get_input_files_for_table')
+    @patch('tap_s3_csv.sync.sync_table_file')
+    @patch('tap_s3_csv.singer.get_bookmark')
+    @patch('tap_s3_csv.singer.write_bookmark')
+    @patch('tap_s3_csv.singer.write_state')
+    @patch('tap_s3_csv.LOGGER')
+    def test_sync_stream(self, name, file_last_modified, sync_start_time, expected_bookmark, expected_records_streamed, mock_logger, mock_write_state,
+                         mock_write_bookmark, mock_get_bookmark, mock_sync_table_file, mock_get_input_files_for_table):
+        """
+        Parameterized test for the sync_stream function with various file modification times.
+        """
+        mock_get_bookmark.return_value = '2024-01-01T00:00:00Z'
+        mock_sync_table_file.return_value = 1
+        mock_write_state.return_value = None
+
+        config = {'start_date': '2024-01-01T00:00:00Z'}
+        state = {}
+        table_spec = {'table_name': 'test_table'}
+        stream = None
+
+        mock_get_input_files_for_table.return_value = [{'key': 'file1.csv', 'last_modified': file_last_modified}]
+        mock_write_bookmark.return_value = expected_bookmark
+
+        records_streamed = sync_stream(config, state, table_spec, stream, sync_start_time)
+
+        self.assertEqual(records_streamed, expected_records_streamed)
+        mock_write_bookmark.assert_called_with(state, 'test_table', 'modified_since', expected_bookmark)
+        mock_write_state.assert_called_once()
+        mock_write_state.reset_mock()
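
Why the cap in sync.py works: a file that lands in S3 while a sync is in flight can carry a last_modified newer than the moment the sync started, and bookmarking that value as modified_since would let the next run skip files written during the same window that this run's listing never saw. Capping the bookmark at sync_start_time is equivalent to taking the minimum of the two timestamps, so anything modified during the sync is re-examined on the next run. A minimal standalone sketch of that rule (illustrative only; capped_bookmark is a hypothetical name, not a helper in the tap):

    from datetime import datetime, timezone

    def capped_bookmark(file_last_modified, sync_start_time):
        # Mirror the if/else added in sync.py: never advance the bookmark
        # past the moment the sync started.
        return min(file_last_modified, sync_start_time)

    sync_start = datetime(2024, 8, 14, 12, 0, 0, tzinfo=timezone.utc)

    # A file modified before the sync started bookmarks its own mtime.
    early = datetime(2024, 8, 13, 12, 0, 0, tzinfo=timezone.utc)
    assert capped_bookmark(early, sync_start) == early

    # A file that appears mid-sync cannot push the bookmark past sync_start.
    late = datetime(2024, 8, 15, 12, 0, 0, tzinfo=timezone.utc)
    assert capped_bookmark(late, sync_start) == sync_start

The strftime/strptime round trip that seeds sync_start_time in __init__.py appears to serve a related purpose: it truncates the start time to whole seconds and attaches a UTC offset, presumably so comparisons against S3's second-precision, timezone-aware LastModified values do not turn on sub-second noise or naive-versus-aware mismatches.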