Patch touching tests/sfbase.py and six tests/test_salesforce_*.py files.

Review notes for this revision of the patch:
  * Line breaks and indentation were reconstructed from a whitespace-collapsed
    copy. Hunk line counts agree with every @@ header, but the exact leading
    whitespace of context/removed lines should be confirmed against the
    target files before applying.
  * test_non_custom_fields (both variants) now calls self.streams_to_test()
    instead of iterating the bound method object.
  * assertSetEqual in test_salesforce_select_by_default.py now passes msg=
    (it previously passed an unsupported logging= keyword).
  * The custom-fields assertion message now interpolates the set difference
    (it previously was an f-string without placeholders that named a
    nonexistent set.diff method).
  * The "index" lines are kept as they were; the post-image hashes were not
    recomputed for the corrected lines.
  * NOTE(review): in SFSelectByDefault.setUp the "# run initial sync" comment
    is not followed by a visible sync call before the target output is read;
    confirm whether a sync is triggered elsewhere.

diff --git a/tests/sfbase.py b/tests/sfbase.py
index 86753188..7d5cf32e 100644
--- a/tests/sfbase.py
+++ b/tests/sfbase.py
@@ -928,11 +928,9 @@ def setUpClass(cls):
     def get_custom_fields(self, found_catalogs, conn_id):
         """ List all the custom_fields for each stream"""
         custom_fields = {}
-        for stream in self.streams_to_test():
-
-            catalog = [catalog for catalog in found_catalogs
-                       if catalog["stream_name"] == stream][0]
+        for catalog in found_catalogs:
             schema = menagerie.get_annotated_schema(conn_id, catalog['stream_id'])["annotated-schema"]
+            stream = catalog['stream_name']
             custom_fields[stream] = {key for key in schema['properties'].keys()
                                      if key.endswith("__c")}
         return custom_fields
@@ -949,6 +947,18 @@ def get_non_custom_fields(self, found_catalogs, conn_id):
                                          and schema['properties'][key]['inclusion'] != "unsupported"}
         return non_custom_fields
 
+    def get_select_by_default_fields(self, found_catalogs, conn_id):
+        """ List all the selected_by_default fields for each stream"""
+        select_by_default_fields = {}
+        other_fields = {}
+        for catalog in found_catalogs:
+            schema = menagerie.get_annotated_schema(conn_id, catalog['stream_id'])['metadata']
+            stream = catalog['stream_name']
+            select_by_default_fields[stream] = {item['breadcrumb'][-1] for item in schema
+                                                if item['breadcrumb'] != [] and
+                                                item['metadata'].get('selected-by-default') == True}
+        return select_by_default_fields
+
     @staticmethod
     def count_custom_non_custom_fields(fields):
         custom = 0
@@ -1172,6 +1182,17 @@ def get_full_table_streams(self):
         }
         return full_table_streams
 
+    def get_custom_fields_streams(self):
+        custom_field_streams = {
+            'Account',
+            'Case',
+            'Contact',
+            'Lead',
+            'Opportunity',
+            'TapTester__c',
+        }
+        return custom_field_streams
+
     def switchable_streams(self):
         streams = self.expected_stream_names().difference(self.get_full_table_streams())
         final_list = streams.intersection(self.get_streams_with_data())
diff --git a/tests/test_salesforce_all_fields_custom.py b/tests/test_salesforce_all_fields_custom.py
index c0c48d61..1520ddbc 100644
--- a/tests/test_salesforce_all_fields_custom.py
+++ b/tests/test_salesforce_all_fields_custom.py
@@ -12,7 +12,8 @@ class SFCustomFieldsTest(AllFieldsTest, SFBaseTest):
     def name():
         return "tt_sf_all_fields_custom"
 
-    streams_to_test = SFBaseTest.streams_to_test
+    def streams_to_test(self):
+        return self.get_custom_fields_streams()
 
     def streams_to_selected_fields(self):
         found_catalogs = AllFieldsTest.found_catalogs
@@ -21,7 +22,11 @@ def streams_to_selected_fields(self):
         return custom_fields
 
     def test_all_fields_for_streams_are_replicated(self):
-        for stream in self.streams_to_test():
+        selected_streams = self.streams_to_test()
+        actual_custom_field_streams = {key for key in self.selected_fields.keys() if self.selected_fields.get(key,set())}
+        self.assertSetEqual( selected_streams, actual_custom_field_streams,
+                             msg = f"More streams have custom fields {actual_custom_field_streams.difference(selected_streams)}")
+        for stream in selected_streams:
             with self.subTest(stream=stream):
                 automatic_fields = self.expected_automatic_fields(stream)
                 expected_custom_fields = self.selected_fields.get(stream, set()).union(automatic_fields)
diff --git a/tests/test_salesforce_all_fields_custom_rest.py b/tests/test_salesforce_all_fields_custom_rest.py
index 74f017ef..e4877dad 100644
--- a/tests/test_salesforce_all_fields_custom_rest.py
+++ b/tests/test_salesforce_all_fields_custom_rest.py
@@ -12,7 +12,8 @@ class SFCustomFieldsTestRest(AllFieldsTest, SFBaseTest):
     def name():
         return "tt_sf_all_fields_custom_rest"
 
-    streams_to_test = SFBaseTest.streams_to_test
+    def streams_to_test(self):
+        return self.get_custom_fields_streams()
 
     def streams_to_selected_fields(self):
         found_catalogs = AllFieldsTest.found_catalogs
@@ -21,7 +22,12 @@ def streams_to_selected_fields(self):
         return custom_fields
 
     def test_all_fields_for_streams_are_replicated(self):
-        for stream in self.streams_to_test():
+
+        selected_streams = self.streams_to_test()
+        actual_custom_field_streams = {key for key in self.selected_fields.keys() if self.selected_fields.get(key,set())}
+        self.assertSetEqual( selected_streams, actual_custom_field_streams,
+                             msg = f"More streams have custom fields {actual_custom_field_streams.difference(selected_streams)}")
+        for stream in selected_streams:
             with self.subTest(stream=stream):
                 automatic_fields = self.expected_automatic_fields(stream)
                 expected_custom_fields = self.selected_fields.get(stream, set()).union(automatic_fields)
diff --git a/tests/test_salesforce_all_fields_non_custom.py b/tests/test_salesforce_all_fields_non_custom.py
index 51f6ce6b..e1d7bb48 100644
--- a/tests/test_salesforce_all_fields_non_custom.py
+++ b/tests/test_salesforce_all_fields_non_custom.py
@@ -13,7 +13,16 @@ class SFNonCustomFieldsTest(AllFieldsTest, SFBaseTest):
     def name():
         return "tt_sf_all_fields_non_custom"
 
-    streams_to_test = SFBaseTest.streams_to_test
+    def streams_to_test(self):
+        return {
+            'Account',
+            'ActiveProfileMetric',
+            'Calendar',
+            'ContentWorkspacePermission',
+            'CampaignMemberStatus',
+            'Community',
+        }
+
 
     def streams_to_selected_fields(self):
         found_catalogs = AllFieldsTest.found_catalogs
@@ -22,7 +31,7 @@ def streams_to_selected_fields(self):
         return non_custom_fields
 
     def test_non_custom_fields(self):
-        for stream in self.streams_to_selected_fields():
+        for stream in self.streams_to_test():
             with self.subTest(stream=stream):
                 expected_non_custom_fields = self.selected_fields.get(stream,set())
                 replicated_non_custom_fields = self.actual_fields.get(stream, set())
diff --git a/tests/test_salesforce_all_fields_non_custom_rest.py b/tests/test_salesforce_all_fields_non_custom_rest.py
index aa9aad0e..d529da43 100644
--- a/tests/test_salesforce_all_fields_non_custom_rest.py
+++ b/tests/test_salesforce_all_fields_non_custom_rest.py
@@ -14,7 +14,15 @@ class SFNonCustomFieldsTestRest(AllFieldsTest, SFBaseTest):
     def name():
         return "tt_sf_all_fields_non_custom_rest"
 
-    streams_to_test = SFBaseTest.streams_to_test
+    def streams_to_test(self):
+        return {
+            'Case',
+            'PricebookEntry',
+            'Profile',
+            'PermissionSet',
+            'Product2',
+            'PromptAction',
+        }
 
     def streams_to_selected_fields(self):
         found_catalogs = AllFieldsTest.found_catalogs
@@ -23,7 +31,7 @@ def streams_to_selected_fields(self):
         return non_custom_fields
 
     def test_non_custom_fields(self):
-        for stream in self.streams_to_selected_fields():
+        for stream in self.streams_to_test():
             with self.subTest(stream=stream):
                 expected_non_custom_fields = self.selected_fields.get(stream,set())
                 replicated_non_custom_fields = self.actual_fields.get(stream, set())
diff --git a/tests/test_salesforce_bookmarks.py b/tests/test_salesforce_bookmarks.py
index 7df3fde8..558d707f 100644
--- a/tests/test_salesforce_bookmarks.py
+++ b/tests/test_salesforce_bookmarks.py
@@ -1,165 +1,41 @@
-import unittest
-import datetime
-import dateutil.parser
-import pytz
+from tap_tester.base_suite_tests.bookmark_test import BookmarkTest
+from sfbase import SFBaseTest
 
-from tap_tester import runner, menagerie, connections
-from base import SalesforceBaseTest
+class SFBookmarkTest(BookmarkTest, SFBaseTest):
 
-
-class SalesforceBookmarks(SalesforceBaseTest):
+    salesforce_api = 'BULK'
 
     @staticmethod
     def name():
-        return "tap_tester_salesforce_bookmarks"
+        return "tt_sf_bookmarks"
 
     @staticmethod
-    def expected_sync_streams():
+    def streams_to_test():
         return {
-            'Account',
-            'Contact',
-            # 'Lead', # TODO grab the dates that exist
-            # 'Opportunity', # cannot test, dates are 1 s apart
             'User',
+            'Publisher',
+            'AppDefinition',
         }
 
-    @staticmethod
-    def convert_state_to_utc(date_str):
-        """
-        Convert a saved bookmark value of the form '2020-08-25T13:17:36-07:00' to
-        a string formatted utc datetime,
-        in order to compare aginast json formatted datetime values
-        """
-        date_object = dateutil.parser.parse(date_str)
-        date_object_utc = date_object.astimezone(tz=pytz.UTC)
-        return datetime.datetime.strftime(date_object_utc, "%Y-%m-%dT%H:%M:%SZ")
-
-    def calculated_states_by_stream(self, current_state):
-        """
-        Look at the bookmarks from a previous sync and set a new bookmark
-        value that is 1 day prior. This ensures the subsequent sync will replicate
-        at least 1 record but, fewer records than the previous sync.
-        """
-
-        stream_to_current_state = {stream : bookmark.get(self.expected_replication_keys()[stream].pop())
-                                   for stream, bookmark in current_state['bookmarks'].items()}
-        stream_to_calculated_state = {stream: "" for stream in self.expected_sync_streams()}
-
-        timedelta_by_stream = {stream: [1,0,0] # {stream_name: [days, hours, minutes], ...}
-                               for stream in self.expected_streams()}
-        timedelta_by_stream['Account'] = [0, 0, 2]
-
-        for stream, state in stream_to_current_state.items():
-            days, hours, minutes = timedelta_by_stream[stream]
-
-            # convert state from string to datetime object
-            state_as_datetime = dateutil.parser.parse(state)
-            calculated_state_as_datetime = state_as_datetime - datetime.timedelta(days=days, hours=hours, minutes=minutes)
-            # convert back to string and format
-            calculated_state = datetime.datetime.strftime(calculated_state_as_datetime, "%Y-%m-%dT%H:%M:%S.000000Z")
-            stream_to_calculated_state[stream] = calculated_state
-
-        return stream_to_calculated_state
-
-    def test_run(self):
-        self.salesforce_api = 'BULK'
-
-        replication_keys = self.expected_replication_keys()
-
-        # SYNC 1
-        conn_id = connections.ensure_connection(self)
-
-        # Run in check mode
-        found_catalogs = self.run_and_verify_check_mode(conn_id)
-
-        # Select only the expected streams tables
-        expected_streams = self.expected_sync_streams()
-        catalog_entries = [ce for ce in found_catalogs if ce['tap_stream_id'] in expected_streams]
-        self.select_all_streams_and_fields(conn_id, catalog_entries)
-        streams_replication_methods = {stream: self.INCREMENTAL
-                                       for stream in expected_streams}
-        self.set_replication_methods(conn_id, catalog_entries, streams_replication_methods)
-
-        # Run a sync job using orchestrator
-        first_sync_record_count = self.run_and_verify_sync(conn_id)
-        first_sync_records = runner.get_records_from_target_output()
-        first_sync_bookmarks = menagerie.get_state(conn_id)
-
-        # UPDATE STATE BETWEEN SYNCS
-        new_states = {'bookmarks': dict()}
-        for stream, new_state in self.calculated_states_by_stream(first_sync_bookmarks).items():
-            replication_key = list(replication_keys[stream])[0]
-            new_states['bookmarks'][stream] = {replication_key: new_state}
-        menagerie.set_state(conn_id, new_states)
-
-        # SYNC 2
-        second_sync_record_count = self.run_and_verify_sync(conn_id)
-        second_sync_records = runner.get_records_from_target_output()
-        second_sync_bookmarks = menagerie.get_state(conn_id)
-
-        # Test by stream
-        for stream in expected_streams:
-            with self.subTest(stream=stream):
-                # record counts
-                first_sync_count = first_sync_record_count.get(stream, 0)
-                second_sync_count = second_sync_record_count.get(stream, 0)
-
-                # data from record messages
-                first_sync_messages = [record.get('data') for record in
-                                       first_sync_records.get(stream).get('messages')
-                                       if record.get('action') == 'upsert']
-                second_sync_messages = [record.get('data') for record in
-                                        second_sync_records.get(stream).get('messages')
-                                        if record.get('action') == 'upsert']
-
-                # replication key for comparing data
-                self.assertEqual(1, len(list(replication_keys[stream])),
-                                 msg="Compound primary keys require a change to test expectations")
-                replication_key = list(replication_keys[stream])[0]
-
-                # bookmarked states (top level objects)
-                first_bookmark_key_value = first_sync_bookmarks.get('bookmarks').get(stream)
-                second_bookmark_key_value = second_sync_bookmarks.get('bookmarks').get(stream)
-
-                # Verify the first sync sets a bookmark of the expected form
-                self.assertIsNotNone(first_bookmark_key_value)
-                self.assertIsNotNone(first_bookmark_key_value.get(replication_key))
-
-                # Verify the second sync sets a bookmark of the expected form
-                self.assertIsNotNone(second_bookmark_key_value)
-                self.assertIsNotNone(second_bookmark_key_value.get(replication_key))
-
-                # bookmarked states (actual values)
-                first_bookmark_value = first_bookmark_key_value.get(replication_key)
-                second_bookmark_value = second_bookmark_key_value.get(replication_key)
-                # bookmarked values as utc for comparing against records
-                first_bookmark_value_utc = self.convert_state_to_utc(first_bookmark_value)
-                second_bookmark_value_utc = self.convert_state_to_utc(second_bookmark_value)
-
-                # Verify the second sync bookmark is Equal to the first sync bookmark
-                self.assertEqual(second_bookmark_value, first_bookmark_value) # assumes no changes to data during test
-
-                # Verify the second sync records respect the previous (simulated) bookmark value
-                simulated_bookmark_value = new_states['bookmarks'][stream][replication_key]
-                for record in second_sync_messages:
-                    replication_key_value = record.get(replication_key)
-                    self.assertGreaterEqual(replication_key_value, simulated_bookmark_value,
-                                            msg="Second sync records do not repect the previous bookmark.")
-
-                # Verify the first sync bookmark value is the max replication key value for a given stream
-                for record in first_sync_messages:
-                    replication_key_value = record.get(replication_key)
-                    self.assertLessEqual(replication_key_value, first_bookmark_value_utc,
-                                         msg="First sync bookmark was set incorrectly, a record with a greater rep key value was synced")
-
-                # Verify the second sync bookmark value is the max replication key value for a given stream
-                for record in second_sync_messages:
-                    replication_key_value = record.get(replication_key)
-                    self.assertLessEqual(replication_key_value, second_bookmark_value_utc,
-                                         msg="Second sync bookmark was set incorrectly, a record with a greater rep key value was synced")
-
-                # Verify the number of records in the 2nd sync is less then the first
-                self.assertLess(second_sync_count, first_sync_count)
+    bookmark_format ="%Y-%m-%dT%H:%M:%S.%fZ"
+
+    initial_bookmarks = {}
+    streams_replication_method = {}
+    def streams_replication_methods(self):
+        streams_to_set_rep_method = [catalog['tap_stream_id'] for catalog in BookmarkTest.test_catalogs
+                                     if 'forced-replication-method' not in catalog['metadata'].keys()]
+        if len(streams_to_set_rep_method) > 0:
+            self.streams_replication_method = {stream: 'INCREMENTAL'
+                                               for stream in streams_to_set_rep_method}
+        return self.streams_replication_method
+
+    def adjusted_expected_replication_method(self):
+        streams_to_set_rep_method = [catalog['tap_stream_id'] for catalog in BookmarkTest.test_catalogs
+                                     if 'forced-replication-method' not in catalog['metadata'].keys()]
+        expected_replication_methods = self.expected_replication_method()
+        if self.streams_replication_method:
+            for stream in streams_to_set_rep_method :
+                expected_replication_methods[stream] = self.streams_replication_method[stream]
+            return expected_replication_methods
+        return expected_replication_methods
 
-                # Verify at least 1 record was replicated in the second sync
-                self.assertGreater(second_sync_count, 0, msg="We are not fully testing bookmarking for {}".format(stream))
diff --git a/tests/test_salesforce_select_by_default.py b/tests/test_salesforce_select_by_default.py
new file mode 100644
index 00000000..4780a016
--- /dev/null
+++ b/tests/test_salesforce_select_by_default.py
@@ -0,0 +1,51 @@
+from tap_tester import connections, runner, menagerie
+
+from sfbase import SFBaseTest
+
+
+class SFSelectByDefault(SFBaseTest):
+    @staticmethod
+    def name():
+        return "tt_sf_select_by_default"
+
+    @staticmethod
+    def streams_to_test():
+        return {
+            'Account', # "2021-11-11T03:50:52.000000Z"
+        }
+
+    def setUp(self):
+        self.salesforce_api = 'BULK'
+        self.start_date = "2021-11-11T00:00:00Z"
+        # instantiate connection
+        SFSelectByDefault.conn_id = connections.ensure_connection(self)
+
+        # run check mode
+        SFSelectByDefault.found_catalogs = self.run_and_verify_check_mode(self.conn_id)
+
+        # table and field selection
+        test_catalogs = [catalog for catalog in self.found_catalogs
+                         if catalog.get('tap_stream_id') in self.streams_to_test()]
+
+        SFSelectByDefault.test_streams = self.streams_to_test()
+
+        self.perform_and_verify_table_selection(self.conn_id, test_catalogs)
+        # run initial sync
+        SFSelectByDefault.synced_records = runner.get_records_from_target_output()
+        SFSelectByDefault.actual_fields = runner.examine_target_output_for_fields()
+
+    def test_no_unexpected_streams_replicated(self):
+        # gather results
+        synced_stream_names = set(self.synced_records.keys())
+        self.assertSetEqual(synced_stream_names, self.test_streams)
+
+    def test_default_fields_for_streams_are_replicated(self):
+        expected_rep_keys = self.get_select_by_default_fields(self.found_catalogs, self.conn_id)
+        for stream in self.test_streams:
+            with self.subTest(stream=stream):
+                # gather results
+                fields_replicated = self.actual_fields.get(stream, set())
+                # verify that all fields are sent to the target
+                # test the combination of all records
+                self.assertSetEqual(fields_replicated, expected_rep_keys[stream],
+                                    msg=f"verify all fields are replicated for stream {stream}")