Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch replication methods #173

Merged
merged 11 commits into from
Nov 16, 2023
49 changes: 48 additions & 1 deletion tests/sfbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,8 @@ def expected_metadata():
'LoginAsEvent': incremental_created_date, # new
'LoginEvent': default_full, # new
'LoginGeo': default, # new
'LoginHistory': {BaseCase.PRIMARY_KEYS: {'Id'}, BaseCase.REPLICATION_KEYS: {'LoginTime'},BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,},
'LoginHistory': {BaseCase.PRIMARY_KEYS: {'Id'}, BaseCase.REPLICATION_KEYS:
{'LoginTime'},BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,},
'LoginIp': incremental_created_date,
'LogoutEvent': default_full, # new
# 'MLField': default, # removed # 6/13/2022 added back 7/10/2022, removed 06/12/2023
Expand Down Expand Up @@ -1130,6 +1131,52 @@ def get_unsupported_by_bulk_api(self):

return unsupported_streams_bulk_only | unsupported_streams_rest

def get_full_table_streams(self):
    """Return the set of stream names that only support FULL_TABLE replication.

    These streams have no replication key and therefore cannot be switched
    to INCREMENTAL; tests that exercise replication-method switching exclude
    them via ``switchable_streams``.
    """
    return {
        'ApexPageInfo',
        'AppDefinition',
        'AuraDefinitionBundleInfo',
        'ContentDocumentSubscription',
        'ContentFolderLink',
        'ContentTagSubscription',
        'ContentUserSubscription',
        'ContentWorkspaceSubscription',
        'CronJobDetail',
        'DashboardComponent',
        'DatacloudAddress',
        'DatacloudCompany',
        'DatacloudContact',
        'DatacloudDandBCompany',
        'EmbeddedServiceDetail',
        'EmbeddedServiceLabel',
        'EventBusSubscriber',
        'FeedAttachment',
        'FormulaFunction',
        'FormulaFunctionAllowedType',
        'FormulaFunctionCategory',
        'IdpEventLog',
        'LightningUriEvent',
        'LoginEvent',
        'LogoutEvent',
        'PlatformEventUsageMetric',
        'Publisher',
        'RecentlyViewed',
        'ReportEvent',
        'SalesStore',
        'TabDefinition',
        'ThirdPartyAccountLink',
        'UriEvent',
        'UserAppMenuItem',
        'UserPermissionAccess',
        'UserSetupEntityAccess',
    }

def switchable_streams(self):
    """Return the streams whose replication method can be switched.

    A stream qualifies when it is an expected stream, it is not restricted
    to full-table-only replication, and it actually has data to sync.
    """
    candidates = self.expected_stream_names() - self.get_full_table_streams()
    return candidates & self.get_streams_with_data()

def is_unsupported_by_rest_api(self, stream):
"""returns True if stream is unsupported by REST API"""

Expand Down
109 changes: 109 additions & 0 deletions tests/test_salesforce_switch_rep_method_ft_incrmntl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
from tap_tester import runner, menagerie, connections
from sfbase import SFBaseTest


class SFSwitchRepMethodIncrmntl(SFBaseTest):
    """Verify tap behavior when streams are switched from FULL_TABLE to
    INCREMENTAL replication between two consecutive syncs."""

    # Very early start date so the maximum amount of data is available for testing.
    start_date = '2000-11-11T00:00:00Z'

    @staticmethod
    def name():
        return "tt_sf_table_switch_rep_method_ft_incrmntl"

    def expected_sync_streams(self):
        """Streams under test: the switchable streams, minus
        FlowDefinitionView and EntityDefinition (excluded due to bug
        TDL-24514), partitioned for this run."""
        streams = self.switchable_streams() - {'FlowDefinitionView', 'EntityDefinition'}
        return self.partition_streams(streams)

    def test_run(self):
        """Run a FULL_TABLE sync, switch every stream to INCREMENTAL, sync
        again, and verify records, messages, table versions and bookmarks."""
        self.salesforce_api = 'BULK'

        replication_keys = self.expected_replication_keys()
        primary_keys = self.expected_primary_keys()

        # SYNC 1 (FULL_TABLE)
        conn_id = connections.ensure_connection(self)

        # Run in check mode
        found_catalogs = self.run_and_verify_check_mode(conn_id)

        # Select only the expected streams tables
        expected_streams = self.expected_sync_streams()
        catalog_entries = [ce for ce in found_catalogs if ce['tap_stream_id'] in expected_streams]
        self.select_all_streams_and_fields(conn_id, catalog_entries)
        streams_replication_methods = {stream: self.FULL_TABLE
                                       for stream in expected_streams}
        self.set_replication_methods(conn_id, catalog_entries, streams_replication_methods)

        # Run a sync job using orchestrator.
        # NOTE(review): the full-table sync's record counts and state were
        # previously captured but never asserted on; only the record
        # messages from this sync are needed below.
        self.run_and_verify_sync_mode(conn_id)
        fulltbl_sync_records = runner.get_records_from_target_output()

        # Switch the replication method from full table to incremental
        streams_replication_methods = {stream: self.INCREMENTAL
                                       for stream in expected_streams}
        self.set_replication_methods(conn_id, catalog_entries, streams_replication_methods)

        # SYNC 2 (INCREMENTAL)
        incrmntl_sync_record_count = self.run_and_verify_sync_mode(conn_id)
        incrmntl_sync_records = runner.get_records_from_target_output()
        incrmntl_sync_bookmarks = menagerie.get_state(conn_id)

        # Test by stream
        for stream in expected_streams:
            with self.subTest(stream=stream):
                incrmntl_sync_count = incrmntl_sync_record_count.get(stream, 0)
                replication_key = list(replication_keys[stream])[0]

                # Verify at least 1 record was replicated in the incremental sync
                self.assertGreater(incrmntl_sync_count, 0,
                                   msg="We are not fully testing bookmarking for {}".format(stream))

                # If implementing in the tap-tester framework, the primary key
                # handling should account for compound primary keys.
                self.assertEqual(1, len(list(primary_keys[stream])),
                                 msg="Compound primary keys require a change to test expectations")
                primary_key = list(primary_keys[stream])[0]

                fulltbl_sync_messages = [record['data'] for record in
                                         fulltbl_sync_records.get(stream, {}).get('messages')
                                         if record.get('action') == 'upsert']
                # Only records on/after the configured start date can be
                # expected to appear again in the incremental sync.
                filtered_fulltbl_sync_messages = [message for message in fulltbl_sync_messages
                                                  if message[replication_key] >= self.start_date]
                fulltbl_primary_keys = {message[primary_key] for message in filtered_fulltbl_sync_messages}
                incrmntl_sync_messages = [record['data'] for record in
                                          incrmntl_sync_records.get(stream, {}).get('messages')
                                          if record.get('action') == 'upsert']
                incrmntl_primary_keys = {message[primary_key] for message in incrmntl_sync_messages}

                # Verify all records are synced in the second sync
                self.assertTrue(fulltbl_primary_keys.issubset(incrmntl_primary_keys))

                # Modify the activate version assertion accordingly based on
                # the outcome of BUG #TDL-24467, if needed.
                # Verify that the last message is not an activate version
                # message for the incremental sync.
                self.assertNotEqual('activate_version',
                                    incrmntl_sync_records[stream]['messages'][-1]['action'])

                # Verify that the table version incremented after every sync
                self.assertGreater(incrmntl_sync_records[stream]['table_version'],
                                   fulltbl_sync_records[stream]['table_version'],
                                   msg="Table version is not incremented after a successful sync")

                # Bookmarked state (top level object for this stream)
                incrmntl_bookmark_key_value = incrmntl_sync_bookmarks.get('bookmarks', {}).get(stream)

                # Verify the incremental sync sets a bookmark of the expected
                # form. Checked BEFORE reading the value so that a missing
                # bookmark fails as a clear assertion rather than an
                # AttributeError on None.
                self.assertIsNotNone(incrmntl_bookmark_key_value)

                # Verify that bookmarks are present after switching to the
                # INCREMENTAL replication method.
                incrmntl_bookmark_value = incrmntl_bookmark_key_value.get(replication_key)
                self.assertIsNotNone(incrmntl_bookmark_value)
109 changes: 109 additions & 0 deletions tests/test_salesforce_switch_rep_method_incrmntl_ft.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
from tap_tester import runner, menagerie, connections
from sfbase import SFBaseTest


class SFSwitchRepMethodFulltable(SFBaseTest):
    """Verify tap behavior when streams are switched from INCREMENTAL to
    FULL_TABLE replication between two consecutive syncs."""

    start_date = '2000-01-23T00:00:00Z'

    @staticmethod
    def name():
        return "tt_sf_table_switch_rep_method_incrmntl_ft"

    def expected_sync_streams(self):
        """Streams under test: the switchable streams, minus
        FlowDefinitionView and EntityDefinition (excluded due to bug
        TDL-24514), partitioned for this run."""
        streams = self.switchable_streams() - {'FlowDefinitionView', 'EntityDefinition'}
        return self.partition_streams(streams)

    def test_run(self):
        """Run an INCREMENTAL sync, switch every stream to FULL_TABLE, sync
        again, and verify records, messages, table versions and bookmarks."""
        self.salesforce_api = 'REST'
        replication_keys = self.expected_replication_keys()
        primary_keys = self.expected_primary_keys()

        # SYNC 1 (INCREMENTAL)
        conn_id = connections.ensure_connection(self)

        # Run in check mode
        found_catalogs = self.run_and_verify_check_mode(conn_id)

        # Select only the expected streams tables
        expected_streams = self.expected_sync_streams()
        catalog_entries = [ce for ce in found_catalogs if ce['tap_stream_id'] in expected_streams]
        self.select_all_streams_and_fields(conn_id, catalog_entries)
        streams_replication_methods = {stream: self.INCREMENTAL
                                       for stream in expected_streams}
        self.set_replication_methods(conn_id, catalog_entries, streams_replication_methods)

        # Run a sync job using orchestrator.
        # NOTE(review): the incremental sync's state was previously captured
        # here but never asserted on, so it is not kept.
        incrmntl_sync_record_count = self.run_and_verify_sync_mode(conn_id)
        incrmntl_sync_records = runner.get_records_from_target_output()

        # Switch the replication method from incremental to full table
        streams_replication_methods = {stream: self.FULL_TABLE
                                       for stream in expected_streams}
        self.set_replication_methods(conn_id, catalog_entries, streams_replication_methods)

        # SYNC 2 (FULL_TABLE)
        fulltbl_sync_record_count = self.run_and_verify_sync_mode(conn_id)
        fulltbl_sync_records = runner.get_records_from_target_output()
        fulltbl_sync_bookmarks = menagerie.get_state(conn_id)

        # Test by stream
        for stream in expected_streams:
            with self.subTest(stream=stream):
                incrmntl_sync_count = incrmntl_sync_record_count.get(stream, 0)
                fulltbl_sync_count = fulltbl_sync_record_count.get(stream, 0)
                replication_key = list(replication_keys[stream])[0]

                # Verify at least 1 record was replicated in the full-table sync
                self.assertGreater(fulltbl_sync_count, 0,
                                   msg="We are not fully testing bookmarking for {}".format(stream))

                # If implementing in the tap-tester framework, the primary key
                # handling should account for compound primary keys.
                self.assertEqual(1, len(list(primary_keys[stream])),
                                 msg="Compound primary keys require a change to test expectations")
                primary_key = list(primary_keys[stream])[0]

                incrmntl_sync_messages = [record['data'] for record in
                                          incrmntl_sync_records.get(stream, {}).get('messages')
                                          if record.get('action') == 'upsert']
                incrmntl_primary_keys = {message[primary_key] for message in incrmntl_sync_messages}
                fulltbl_sync_messages = [record['data'] for record in
                                         fulltbl_sync_records.get(stream, {}).get('messages')
                                         if record.get('action') == 'upsert']
                # Only records on/after the configured start date are compared;
                # the full-table sync can return older records too.
                filtered_fulltbl_sync_messages = [message for message in fulltbl_sync_messages
                                                  if message[replication_key] >= self.start_date]
                fulltbl_primary_keys = {message[primary_key] for message in filtered_fulltbl_sync_messages}

                # Verify all records are synced in the second sync
                self.assertTrue(incrmntl_primary_keys.issubset(fulltbl_primary_keys))

                # Verify that the full-table sync count is greater than or
                # equal to the incremental sync count.
                self.assertGreaterEqual(fulltbl_sync_count, incrmntl_sync_count,
                                        msg="Full table sync didn't fetch all the records")

                # Modify the activate version assertion accordingly based on
                # the outcome of BUG #TDL-24467, if needed.
                # Verify that the last message of every stream is the activate
                # version message.
                self.assertEqual('activate_version',
                                 fulltbl_sync_records[stream]['messages'][-1]['action'])

                # Verify that a table version is present for a full-table sync
                self.assertIsNotNone(fulltbl_sync_records[stream]['table_version'])

                # Verify that the table version is incremented after every sync
                self.assertGreater(fulltbl_sync_records[stream]['table_version'],
                                   incrmntl_sync_records[stream]['table_version'])

                # Bookmarked state (top level object for this stream). After
                # switching to FULL_TABLE the stream may have no bookmark entry
                # at all, so guard against None before reading the replication
                # key — the original ``.get`` on None raised AttributeError
                # instead of exercising the assertIsNone below.
                fulltbl_bookmark_key_value = fulltbl_sync_bookmarks.get('bookmarks', {}).get(stream)
                fulltbl_bookmark_value = (fulltbl_bookmark_key_value.get(replication_key)
                                          if fulltbl_bookmark_key_value else None)

                # Verify no replication-key bookmark is present after the
                # full-table sync.
                self.assertIsNone(fulltbl_bookmark_value)