From b9435ce91b4492215680d3fe3ee7c0be8fc39d8d Mon Sep 17 00:00:00 2001
From: Jimmy Kim
Date: Fri, 10 May 2024 11:20:45 +0900
Subject: [PATCH 1/2] add SBOSC_DB to config

---
 src/config/__init__.py                 |  6 +-
 src/config/config.py                   |  1 +
 src/sbosc/component.py                 |  4 +-
 src/sbosc/controller/controller.py     | 27 ++++----
 src/sbosc/controller/initializer.py    | 20 +++---
 src/sbosc/controller/validator.py      | 46 +++++++------
 src/sbosc/eventhandler/eventhandler.py | 16 ++---
 src/sbosc/eventhandler/eventloader.py  | 39 +++++------
 src/sbosc/monitor/monitor.py           | 15 +++--
 src/sbosc/operations/base.py           |  5 +-
 src/sbosc/operations/operation.py      |  3 +-
 tests/configs/config.yaml              |  5 +-
 tests/conftest.py                      | 17 ++---
 tests/test_controller.py               | 89 +++++++++++++-------------
 tests/test_eventhandler.py             | 34 +++++-----
 tests/test_monitor.py                  |  8 +--
 tests/test_worker.py                   | 32 ++++-----
 17 files changed, 189 insertions(+), 178 deletions(-)

diff --git a/src/config/__init__.py b/src/config/__init__.py
index f833ee4..caf507a 100644
--- a/src/config/__init__.py
+++ b/src/config/__init__.py
@@ -2,6 +2,6 @@
 from config.secret import Secret
 from config.env import Env
 
-config = Config()  # override by setting env CONFIG_FILE
-secret = Secret()  # override by setting env SECRET_FILE
-env = Env()  # override with environment variables
+config: Config = Config()  # override by setting env CONFIG_FILE
+secret: Secret = Secret()  # override by setting env SECRET_FILE
+env: Env = Env()  # override with environment variables
diff --git a/src/config/config.py b/src/config/config.py
index 198168d..c430d0e 100644
--- a/src/config/config.py
+++ b/src/config/config.py
@@ -41,6 +41,7 @@ class IndexConfig:
 
 class Config:
     # Migration plan
+    SBOSC_DB = 'sbosc'
     SOURCE_WRITER_ENDPOINT = ''
     SOURCE_READER_ENDPOINT = ''
     SOURCE_CLUSTER_ID = None  # optional
diff --git a/src/sbosc/component.py b/src/sbosc/component.py
index 20b0ab6..84090ea 100644
--- a/src/sbosc/component.py
+++ b/src/sbosc/component.py
@@ -50,8 +50,8 @@ def get_migration_id(self):
         with self.db.cursor() as cursor:
             cursor: Cursor
             try:
-                cursor.execute('''
-                    SELECT id FROM sbosc.migration_plan
+                cursor.execute(f'''
+                    SELECT id FROM {config.SBOSC_DB}.migration_plan
                     WHERE source_cluster_id = %s AND source_db = %s AND source_table = %s
                     AND destination_cluster_id = %s AND destination_db = %s AND destination_table = %s
                 ''', (
diff --git a/src/sbosc/controller/controller.py b/src/sbosc/controller/controller.py
index 25c9d7a..84ec8bb 100644
--- a/src/sbosc/controller/controller.py
+++ b/src/sbosc/controller/controller.py
@@ -104,8 +104,8 @@ def create_bulk_import_chunks(self):
         # Save chunk info to database
         with self.db.cursor() as cursor:
             cursor: Cursor
-            cursor.executemany('''
-                INSERT INTO sbosc.chunk_info (migration_id, chunk_id, start_pk, end_pk, created_at)
+            cursor.executemany(f'''
+                INSERT INTO {config.SBOSC_DB}.chunk_info (migration_id, chunk_id, start_pk, end_pk, created_at)
                 VALUES (%s, %s, %s, %s, NOW())
             ''', chunks)
@@ -128,8 +128,8 @@ def validate_bulk_import(self):
             self.logger.info("Restoring missing chunks from database")
             with self.db.cursor() as cursor:
                 cursor: Cursor
-                cursor.execute('''
-                    SELECT chunk_id FROM sbosc.chunk_info
+                cursor.execute(f'''
+                    SELECT chunk_id FROM {config.SBOSC_DB}.chunk_info
                     WHERE migration_id = %s
                 ''', [self.migration_id])
                 for chunk_id, in cursor.fetchall():
@@ -214,8 +214,8 @@ def add_index(self):
             cursor: Cursor
 
             index_info = None
-            cursor.execute('''
-                SELECT index_name FROM sbosc.index_creation_status
+            cursor.execute(f'''
+                SELECT index_name FROM {config.SBOSC_DB}.index_creation_status
                 WHERE migration_id = %s AND ended_at IS NULL
                 AND started_at IS NOT NULL
             ''', (self.migration_id,))
@@ -240,7 +240,7 @@ def add_index(self):
 
                 else:
                     cursor.execute(f'''
-                        SELECT index_name, index_columns, is_unique FROM sbosc.index_creation_status
+                        SELECT index_name, index_columns, is_unique FROM {config.SBOSC_DB}.index_creation_status
                         WHERE migration_id = %s AND ended_at IS NULL
                         LIMIT {config.INDEX_CREATED_PER_QUERY}
                     ''', (self.migration_id,))
@@ -259,8 +259,8 @@ def add_index(self):
             started_at = datetime.now()
             with self.db.cursor() as cursor:
                 cursor: Cursor
-                cursor.executemany('''
-                    UPDATE sbosc.index_creation_status SET started_at = %s
+                cursor.executemany(f'''
+                    UPDATE {config.SBOSC_DB}.index_creation_status SET started_at = %s
                     WHERE migration_id = %s AND index_name = %s
                 ''', [(started_at, self.migration_id, index_name) for index_name in index_names])
@@ -282,8 +282,8 @@ def add_index(self):
             ended_at = datetime.now()
             with self.db.cursor() as cursor:
                 cursor: Cursor
-                cursor.executemany('''
-                    UPDATE sbosc.index_creation_status SET ended_at = %s
+                cursor.executemany(f'''
+                    UPDATE {config.SBOSC_DB}.index_creation_status SET ended_at = %s
                     WHERE migration_id = %s AND index_name = %s
                 ''', [(ended_at, self.migration_id, index_name) for index_name in index_names])
@@ -362,8 +362,9 @@ def swap_tables(self):
             else:
                 cursor.execute(f"RENAME TABLE {destination_table} TO {source_table}")
                 self.redis_data.set_current_stage(Stage.DONE)
-            cursor.execute('''
-                UPDATE sbosc.migration_plan SET ended_at = FROM_UNIXTIME(%s), final_max_id = %s WHERE id = %s
+            cursor.execute(f'''
+                UPDATE {config.SBOSC_DB}.migration_plan
+                SET ended_at = FROM_UNIXTIME(%s), final_max_id = %s WHERE id = %s
             ''', (after_rename_table_timestamp, final_max_id, self.migration_id))
             self.logger.info("Tables swapped")
             self.slack.send_message("Tables swapped", color="good")
diff --git a/src/sbosc/controller/initializer.py b/src/sbosc/controller/initializer.py
index 84179f2..7873e1d 100644
--- a/src/sbosc/controller/initializer.py
+++ b/src/sbosc/controller/initializer.py
@@ -27,13 +27,13 @@ def __init__(self):
     def check_database_setup(self):
         with self.db.cursor() as cursor:
             cursor: Cursor
-            cursor.execute("SELECT 1 FROM information_schema.SCHEMATA WHERE SCHEMA_NAME = 'sbosc'")
+            cursor.execute(f"SELECT 1 FROM information_schema.SCHEMATA WHERE SCHEMA_NAME = '{config.SBOSC_DB}'")
             if cursor.rowcount == 0:
                 self.logger.info("SB-OSC database not found")
                 return False
-            cursor.execute('''
+            cursor.execute(f'''
                 SELECT 1 FROM information_schema.TABLES
-                WHERE TABLE_SCHEMA = 'sbosc' AND TABLE_NAME IN (%s)
+                WHERE TABLE_SCHEMA = '{config.SBOSC_DB}' AND TABLE_NAME IN (%s)
             ''' % ','.join(['%s'] * len(REQUIRED_TABLES)), REQUIRED_TABLES)
             if cursor.rowcount != len(REQUIRED_TABLES):
                 self.logger.info("Required tables not found")
@@ -43,11 +43,11 @@ def setup_database(self):
         with self.db.cursor() as cursor:
             cursor: Cursor
-            cursor.execute("CREATE DATABASE IF NOT EXISTS sbosc;")
+            cursor.execute(f"CREATE DATABASE IF NOT EXISTS {config.SBOSC_DB};")
             self.logger.info("Database created")
 
             # Controller tables
-            cursor.execute("USE sbosc;")
+            cursor.execute(f"USE {config.SBOSC_DB};")
             cursor.execute('''
                 CREATE TABLE IF NOT EXISTS migration_plan (
                     id int PRIMARY KEY AUTO_INCREMENT,
@@ -190,8 +190,8 @@ def init_migration(self):
         with self.db.cursor() as cursor:
             # Insert migration plan
             cursor: Cursor
-            cursor.execute('''
-                INSERT INTO sbosc.migration_plan
+            cursor.execute(f'''
+                INSERT INTO {config.SBOSC_DB}.migration_plan
                 (source_cluster_id, source_db, source_table,
                 destination_cluster_id, destination_db, destination_table, created_at)
                 VALUES (%s, %s, %s, %s, %s, %s, NOW())
@@ -208,8 +208,8 @@ def init_migration(self):
 
             # Insert index creation status
             for index in config.INDEXES:
-                cursor.execute('''
-                    INSERT INTO sbosc.index_creation_status
+                cursor.execute(f'''
+                    INSERT INTO {config.SBOSC_DB}.index_creation_status
                     (migration_id, index_name, index_columns, is_unique, created_at)
                     VALUES (%s, %s, %s, %s, NOW())
                 ''', (
@@ -223,7 +223,7 @@ def init_migration(self):
         dml_log_tables = [f'{table}_{migration_id}' for table in ['inserted_pk', 'updated_pk', 'deleted_pk']]
         for table in dml_log_tables:
             cursor.execute(f'''
-                CREATE TABLE IF NOT EXISTS sbosc.{table} (
+                CREATE TABLE IF NOT EXISTS {config.SBOSC_DB}.{table} (
                     source_pk bigint PRIMARY KEY,
                     event_timestamp bigint,
                     KEY `idx_{table}_event_timestamp` (`event_timestamp`)
diff --git a/src/sbosc/controller/validator.py b/src/sbosc/controller/validator.py
index 2881481..f7683d6 100644
--- a/src/sbosc/controller/validator.py
+++ b/src/sbosc/controller/validator.py
@@ -107,7 +107,7 @@ def get_timestamp_range(self):
 
             # Get last validated event timestamp
             cursor.execute(f'''
-                SELECT last_validated_timestamp FROM sbosc.apply_dml_events_validation_status
+                SELECT last_validated_timestamp FROM {config.SBOSC_DB}.apply_dml_events_validation_status
                 WHERE migration_id = {self.migration_id} ORDER BY id DESC LIMIT 1
             ''')
             if cursor.rowcount > 0:
@@ -115,16 +115,19 @@ def get_timestamp_range(self):
             else:
                 cursor.execute(f'''
                     SELECT MIN(event_timestamps.min_ts) FROM (
-                        SELECT MIN(event_timestamp) AS min_ts FROM sbosc.inserted_pk_{self.migration_id} UNION
-                        SELECT MIN(event_timestamp) AS min_ts FROM sbosc.updated_pk_{self.migration_id} UNION
-                        SELECT MIN(event_timestamp) AS min_ts FROM sbosc.deleted_pk_{self.migration_id}
+                        SELECT MIN(event_timestamp) AS min_ts
+                        FROM {config.SBOSC_DB}.inserted_pk_{self.migration_id} UNION
+                        SELECT MIN(event_timestamp) AS min_ts
+                        FROM {config.SBOSC_DB}.updated_pk_{self.migration_id} UNION
+                        SELECT MIN(event_timestamp) AS min_ts
+                        FROM {config.SBOSC_DB}.deleted_pk_{self.migration_id}
                     ) AS event_timestamps;
                 ''')
                 if cursor.rowcount > 0:
                     start_timestamp = cursor.fetchone()[0]
 
             cursor.execute(f'''
-                SELECT last_event_timestamp FROM sbosc.event_handler_status
+                SELECT last_event_timestamp FROM {config.SBOSC_DB}.event_handler_status
                 WHERE migration_id = {self.migration_id} ORDER BY id DESC LIMIT 1
             ''')
             if cursor.rowcount > 0:
@@ -148,7 +151,7 @@ def execute_apply_dml_events_validation_query(
                 unmatched_pks.extend([(pk, UnmatchType.NOT_UPDATED) for pk in not_updated_pks])
             elif table == 'deleted_pk':
                 source_cursor.execute(f'''
-                    SELECT source_pk FROM sbosc.deleted_pk_{self.migration_id}
+                    SELECT source_pk FROM {config.SBOSC_DB}.deleted_pk_{self.migration_id}
                     WHERE event_timestamp BETWEEN {start_timestamp} AND {end_timestamp}
                 ''')
                 if source_cursor.rowcount > 0:
@@ -188,7 +191,7 @@ def validate_apply_dml_events_batch(self, table, range_queue: Queue, unmatched_p
                 dest_cursor.execute("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;")
 
                 source_cursor.execute(f'''
-                    SELECT COUNT(1) FROM sbosc.{table}_{self.migration_id}
+                    SELECT COUNT(1) FROM {config.SBOSC_DB}.{table}_{self.migration_id}
                     WHERE event_timestamp BETWEEN {batch_start_timestamp} AND {batch_end_timestamp}
                 ''')
                 event_count = source_cursor.fetchone()[0]
@@ -220,7 +223,7 @@ def validate_unmatched_pks(self):
         with self.db.cursor() as cursor:
             cursor: Cursor
             cursor.execute(f'''
-                SELECT source_pk, unmatch_type FROM sbosc.unmatched_rows
+                SELECT source_pk, unmatch_type FROM {config.SBOSC_DB}.unmatched_rows
                 WHERE migration_id = {self.migration_id} LIMIT {self.apply_dml_events_batch_size}
             ''')
             if cursor.rowcount > 0:
@@ -237,7 +240,7 @@ def validate_unmatched_pks(self):
                     not_updated_pks = not_updated_pks - matched_pks
                     matched_pks_str = ','.join([str(pk) for pk in matched_pks])
                     cursor.execute(f'''
-                        DELETE FROM sbosc.unmatched_rows WHERE source_pk IN ({matched_pks_str})
+                        DELETE FROM {config.SBOSC_DB}.unmatched_rows WHERE source_pk IN ({matched_pks_str})
                         AND unmatch_type = '{UnmatchType.NOT_UPDATED}'
                     ''')
                 if len(not_removed_pks) > 0:
@@ -246,7 +249,7 @@ def validate_unmatched_pks(self):
                     not_removed_pks = not_removed_pks - matched_pks
                     matched_pks_str = ','.join([str(pk) for pk in matched_pks])
                     cursor.execute(f'''
-                        DELETE FROM sbosc.unmatched_rows WHERE source_pk IN ({matched_pks_str})
+                        DELETE FROM {config.SBOSC_DB}.unmatched_rows WHERE source_pk IN ({matched_pks_str})
                         AND unmatch_type = '{UnmatchType.NOT_REMOVED}'
                     ''')
                 self.redis_data.updated_pk_set.add(not_updated_pks - not_removed_pks)
@@ -262,11 +265,11 @@ def validate_apply_dml_events(self, start_timestamp, end_timestamp):
             self.logger.info(f"Start validating DML events from {start_timestamp} to {end_timestamp}")
             for table in ['inserted_pk', 'updated_pk', 'deleted_pk']:
                 cursor.execute(f'''
-                    ANALYZE TABLE sbosc.{table}_{self.migration_id}
+                    ANALYZE TABLE {config.SBOSC_DB}.{table}_{self.migration_id}
                 ''')
                 cursor.execute(f'''
                     SELECT TABLE_ROWS FROM information_schema.TABLES
-                    WHERE TABLE_SCHEMA = 'sbosc' AND TABLE_NAME = '{table}_{self.migration_id}'
+                    WHERE TABLE_SCHEMA = '{config.SBOSC_DB}' AND TABLE_NAME = '{table}_{self.migration_id}'
                 ''')
                 table_rows = cursor.fetchone()[0]
@@ -289,11 +292,12 @@ def validate_apply_dml_events(self, start_timestamp, end_timestamp):
                     thread.result()
 
             cursor.executemany(f'''
-                INSERT IGNORE INTO sbosc.unmatched_rows (source_pk, migration_id, unmatch_type)
+                INSERT IGNORE INTO {config.SBOSC_DB}.unmatched_rows (source_pk, migration_id, unmatch_type)
                 VALUES (%s, {self.migration_id}, %s)
             ''', unmatched_pks)
             self.validate_unmatched_pks()
-            cursor.execute(f"SELECT COUNT(1) FROM sbosc.unmatched_rows WHERE migration_id = {self.migration_id}")
+            cursor.execute(
+                f"SELECT COUNT(1) FROM {config.SBOSC_DB}.unmatched_rows WHERE migration_id = {self.migration_id}")
             unmatched_rows = cursor.fetchone()[0]
 
             # Even though validation logic is based on data in tables following valid condition can be achieved.
@@ -316,7 +320,7 @@ def apply_dml_events_validation(self):
             with self.db.cursor() as cursor:
                 cursor: Cursor
                 cursor.execute(f'''
-                    INSERT INTO sbosc.apply_dml_events_validation_status
+                    INSERT INTO {config.SBOSC_DB}.apply_dml_events_validation_status
                     (migration_id, last_validated_timestamp, is_valid, created_at)
                     VALUES ({self.migration_id}, {end_timestamp}, {is_valid}, NOW())
                 ''')
@@ -332,7 +336,7 @@ def full_dml_event_validation(self):
         with self.db.cursor(role='reader') as cursor:
             cursor: Cursor
             cursor.execute(f'''
-                SELECT created_at FROM sbosc.full_dml_event_validation_status
+                SELECT created_at FROM {config.SBOSC_DB}.full_dml_event_validation_status
                 WHERE migration_id = {self.migration_id} ORDER BY id DESC LIMIT 1
             ''')
@@ -345,9 +349,9 @@ def full_dml_event_validation(self):
             cursor.execute(f'''
                 SELECT MIN(event_timestamps.min_ts) FROM (
-                    SELECT MIN(event_timestamp) AS min_ts FROM sbosc.inserted_pk_{self.migration_id} UNION
-                    SELECT MIN(event_timestamp) AS min_ts FROM sbosc.updated_pk_{self.migration_id} UNION
-                    SELECT MIN(event_timestamp) AS min_ts FROM sbosc.deleted_pk_{self.migration_id}
+                    SELECT MIN(event_timestamp) AS min_ts FROM {config.SBOSC_DB}.inserted_pk_{self.migration_id} UNION
+                    SELECT MIN(event_timestamp) AS min_ts FROM {config.SBOSC_DB}.updated_pk_{self.migration_id} UNION
+                    SELECT MIN(event_timestamp) AS min_ts FROM {config.SBOSC_DB}.deleted_pk_{self.migration_id}
                 ) AS event_timestamps;
             ''')
             if cursor.rowcount > 0:
@@ -357,7 +361,7 @@ def full_dml_event_validation(self):
                 return
 
             cursor.execute(f'''
-                SELECT last_event_timestamp FROM sbosc.event_handler_status
+                SELECT last_event_timestamp FROM {config.SBOSC_DB}.event_handler_status
                 WHERE migration_id = {self.migration_id} ORDER BY id DESC LIMIT 1
             ''')
             if cursor.rowcount > 0:
@@ -370,7 +374,7 @@ def full_dml_event_validation(self):
         with self.db.cursor() as cursor:
             cursor.execute(f'''
-                INSERT INTO sbosc.full_dml_event_validation_status
+                INSERT INTO {config.SBOSC_DB}.full_dml_event_validation_status
                 (migration_id, last_validated_timestamp, is_valid, created_at)
                 VALUES ({self.migration_id}, {end_timestamp}, {is_valid}, NOW())
             ''')
diff --git a/src/sbosc/eventhandler/eventhandler.py b/src/sbosc/eventhandler/eventhandler.py
index 2030555..4b547b1 100644
--- a/src/sbosc/eventhandler/eventhandler.py
+++ b/src/sbosc/eventhandler/eventhandler.py
@@ -104,7 +104,7 @@ def init_event_handler(self):
         with self.db.cursor(DictCursor) as cursor:
             cursor: DictCursor
             cursor.execute(f'''
-                SELECT log_file, log_pos, last_event_timestamp, created_at FROM sbosc.event_handler_status
+                SELECT log_file, log_pos, last_event_timestamp, created_at FROM {config.SBOSC_DB}.event_handler_status
                 WHERE migration_id = {self.migration_id} ORDER BY id DESC LIMIT 1
             ''')
@@ -155,8 +155,8 @@ def create_binlog_stream(self, log_file, log_pos, thread_id=0) -> BinLogStreamRe
     def save_current_binlog_position(self):
         with self.db.cursor() as cursor:
             cursor: Cursor
-            cursor.execute('''
-                INSERT INTO sbosc.event_handler_status
+            cursor.execute(f'''
+                INSERT INTO {config.SBOSC_DB}.event_handler_status
                 (migration_id, log_file, log_pos, last_event_timestamp, created_at)
                 VALUES (%s, %s, %s, %s, NOW())
             ''', (self.migration_id, self.log_file, self.log_pos, self.event_store.last_event_timestamp))
@@ -171,7 +171,7 @@ def save_events_to_db(self):
                 (f'deleted_pk_{self.migration_id}', self.event_store.delete_event_timestamp.items())
             ]:
                 cursor.executemany(f'''
-                    INSERT INTO sbosc.{table_name} (source_pk, event_timestamp)
+                    INSERT INTO {config.SBOSC_DB}.{table_name} (source_pk, event_timestamp)
                     VALUES (%s, %s) ON DUPLICATE KEY UPDATE event_timestamp = VALUES(event_timestamp)
                 ''', list(events))
             self.event_store.clear()
@@ -214,7 +214,7 @@ def are_indexes_created(self):
         with self.db.cursor() as cursor:
             cursor: Cursor
             cursor.execute(f'''
-                SELECT COUNT(1) FROM sbosc.index_creation_status
+                SELECT COUNT(1) FROM {config.SBOSC_DB}.index_creation_status
                 WHERE migration_id = {self.migration_id} AND ended_at IS NULL
             ''')
             return cursor.fetchone()[0] == 0
@@ -240,11 +240,11 @@ def apply_dml_events_pre_validation(self):
             self.save()
             with self.db.cursor() as cursor:
                 cursor: Cursor
-                cursor.execute(f"SELECT COUNT(1) FROM sbosc.inserted_pk_{self.migration_id}")
+                cursor.execute(f"SELECT COUNT(1) FROM {config.SBOSC_DB}.inserted_pk_{self.migration_id}")
                 inserted_count = cursor.fetchone()[0]
-                cursor.execute(f"SELECT COUNT(1) FROM sbosc.updated_pk_{self.migration_id}")
+                cursor.execute(f"SELECT COUNT(1) FROM {config.SBOSC_DB}.updated_pk_{self.migration_id}")
                 updated_count = cursor.fetchone()[0]
-                cursor.execute(f"SELECT COUNT(1) FROM sbosc.deleted_pk_{self.migration_id}")
+                cursor.execute(f"SELECT COUNT(1) FROM {config.SBOSC_DB}.deleted_pk_{self.migration_id}")
                 deleted_count = cursor.fetchone()[0]
             if inserted_count + updated_count + deleted_count > 0:
                 while self.event_store.last_event_timestamp != self.event_loader.last_loaded_timestamp:
diff --git a/src/sbosc/eventhandler/eventloader.py b/src/sbosc/eventhandler/eventloader.py
index ae53139..454e85a 100644
--- a/src/sbosc/eventhandler/eventloader.py
+++ b/src/sbosc/eventhandler/eventloader.py
@@ -57,7 +57,7 @@ def get_start_timestamp(self):
 
             # Get last loaded event timestamp
             cursor.execute(f'''
-                SELECT last_loaded_timestamp FROM sbosc.apply_dml_events_status
+                SELECT last_loaded_timestamp FROM {config.SBOSC_DB}.apply_dml_events_status
                 WHERE migration_id = {self.migration_id} ORDER BY id DESC LIMIT 1
             ''')
             if cursor.rowcount > 0:
@@ -65,9 +65,12 @@ def get_start_timestamp(self):
             else:
                 cursor.execute(f'''
                     SELECT MIN(event_timestamps.min_ts) FROM (
-                        SELECT MIN(event_timestamp) AS min_ts FROM sbosc.inserted_pk_{self.migration_id} UNION
-                        SELECT MIN(event_timestamp) AS min_ts FROM sbosc.updated_pk_{self.migration_id} UNION
-                        SELECT MIN(event_timestamp) AS min_ts FROM sbosc.deleted_pk_{self.migration_id}
+                        SELECT MIN(event_timestamp) AS min_ts
+                        FROM {config.SBOSC_DB}.inserted_pk_{self.migration_id} UNION
+                        SELECT MIN(event_timestamp) AS min_ts
+                        FROM {config.SBOSC_DB}.updated_pk_{self.migration_id} UNION
+                        SELECT MIN(event_timestamp) AS min_ts
+                        FROM {config.SBOSC_DB}.deleted_pk_{self.migration_id}
                     ) AS event_timestamps;
                 ''')
                 if cursor.rowcount > 0:
@@ -82,9 +85,9 @@ def get_max_timestamp(self):
             cursor.execute(f'''
                 SELECT MAX(event_timestamps.max_ts) FROM (
-                    SELECT MAX(event_timestamp) AS max_ts FROM sbosc.inserted_pk_{self.migration_id} UNION
-                    SELECT MAX(event_timestamp) AS max_ts FROM sbosc.updated_pk_{self.migration_id} UNION
-                    SELECT MAX(event_timestamp) AS max_ts FROM sbosc.deleted_pk_{self.migration_id}
+                    SELECT MAX(event_timestamp) AS max_ts FROM {config.SBOSC_DB}.inserted_pk_{self.migration_id} UNION
+                    SELECT MAX(event_timestamp) AS max_ts FROM {config.SBOSC_DB}.updated_pk_{self.migration_id} UNION
+                    SELECT MAX(event_timestamp) AS max_ts FROM {config.SBOSC_DB}.deleted_pk_{self.migration_id}
                 ) AS event_timestamps;
             ''')
             if cursor.rowcount > 0:
@@ -100,17 +103,17 @@ def get_end_timestamp(self, start_timestamp):
             while not found_end_timestamp and self.batch_duration > 0 and not self.stop_flag:
                 cursor.execute(f'''
-                    SELECT COUNT(1) FROM sbosc.inserted_pk_{self.migration_id}
+                    SELECT COUNT(1) FROM {config.SBOSC_DB}.inserted_pk_{self.migration_id}
                     WHERE event_timestamp BETWEEN {start_timestamp} AND {start_timestamp + self.batch_duration}
                 ''')
                 inserted_count = cursor.fetchone()[0]
                 cursor.execute(f'''
-                    SELECT COUNT(1) FROM sbosc.updated_pk_{self.migration_id}
+                    SELECT COUNT(1) FROM {config.SBOSC_DB}.updated_pk_{self.migration_id}
                    WHERE event_timestamp BETWEEN {start_timestamp} AND {start_timestamp + self.batch_duration}
                 ''')
                 updated_count = cursor.fetchone()[0]
                 cursor.execute(f'''
-                    SELECT COUNT(1) FROM sbosc.deleted_pk_{self.migration_id}
+                    SELECT COUNT(1) FROM {config.SBOSC_DB}.deleted_pk_{self.migration_id}
                     WHERE event_timestamp BETWEEN {start_timestamp} AND {start_timestamp + self.batch_duration}
                 ''')
                 deleted_count = cursor.fetchone()[0]
@@ -140,10 +143,10 @@ def get_pk_batch(self, start_timestamp, end_timestamp) -> (set, set, int):
             # Updated pks
             cursor.execute(f'''
                 SELECT updated_pks.source_pk, updated_pks.event_timestamp FROM (
-                    SELECT source_pk, event_timestamp FROM sbosc.inserted_pk_{self.migration_id}
+                    SELECT source_pk, event_timestamp FROM {config.SBOSC_DB}.inserted_pk_{self.migration_id}
                     WHERE event_timestamp BETWEEN {start_timestamp} AND {end_timestamp} UNION
-                    SELECT source_pk, event_timestamp FROM sbosc.updated_pk_{self.migration_id}
+                    SELECT source_pk, event_timestamp FROM {config.SBOSC_DB}.updated_pk_{self.migration_id}
                     WHERE event_timestamp BETWEEN {start_timestamp} AND {end_timestamp}
                 ) AS updated_pks
             ''')
@@ -155,7 +158,7 @@ def get_pk_batch(self, start_timestamp, end_timestamp) -> (set, set, int):
 
             # Removed pks
             cursor.execute(f'''
-                SELECT source_pk, event_timestamp FROM sbosc.deleted_pk_{self.migration_id}
+                SELECT source_pk, event_timestamp FROM {config.SBOSC_DB}.deleted_pk_{self.migration_id}
                 WHERE event_timestamp BETWEEN {start_timestamp} AND {end_timestamp}
             ''')
             for source_pk, event_timestamp in cursor.fetchall():
@@ -181,8 +184,8 @@ def load_events_from_db(self):
         # Also it will prevent eventhandler from moving to next stage to early even before loading events
         with self.db.cursor(role='reader') as cursor:
             cursor: Cursor
-            cursor.execute('''
-                SELECT last_event_timestamp FROM sbosc.event_handler_status
+            cursor.execute(f'''
+                SELECT last_event_timestamp FROM {config.SBOSC_DB}.event_handler_status
                 WHERE migration_id = %s ORDER BY id LIMIT 1
             ''', (self.migration_id,))
             if cursor.rowcount > 0:
@@ -204,9 +207,9 @@ def load_events_from_db(self):
             # Save last loaded event timestamp
             with self.db.cursor() as cursor:
                 cursor: Cursor
-                cursor.execute('''
-                    INSERT INTO sbosc.apply_dml_events_status (migration_id, last_loaded_timestamp, created_at)
-                    VALUES (%s, %s, NOW())
+                cursor.execute(f'''
+                    INSERT INTO {config.SBOSC_DB}.apply_dml_events_status
+                    (migration_id, last_loaded_timestamp, created_at) VALUES (%s, %s, NOW())
                 ''', (self.migration_id, max_timestamp))
             self.last_loaded_timestamp = max_timestamp
             self.logger.info(f"Loaded events from database. Last loaded timestamp: {self.last_loaded_timestamp}")
diff --git a/src/sbosc/monitor/monitor.py b/src/sbosc/monitor/monitor.py
index d0e577d..21794b9 100644
--- a/src/sbosc/monitor/monitor.py
+++ b/src/sbosc/monitor/monitor.py
@@ -274,17 +274,17 @@ def submit_event_handler_timestamps(self):
         # last_event_timestamp, last_loaded_timestamp, last_catchup_timestamp
         with self.db.cursor() as cursor:
             cursor: Cursor
-            cursor.execute('''
+            cursor.execute(f'''
                 SELECT MIN(last_event_timestamp), MAX(last_event_timestamp)
-                FROM sbosc.event_handler_status WHERE migration_id = %s AND last_event_timestamp > 1
+                FROM {config.SBOSC_DB}.event_handler_status WHERE migration_id = %s AND last_event_timestamp > 1
             ''' % self.migration_id)
             start_timestamp, last_timestamp = cursor.fetchone()
             if start_timestamp:
                 last_event_timestamp = last_timestamp - start_timestamp
                 self.metric_sender.submit('sb_osc_last_event_timestamp', last_event_timestamp)
 
-            cursor.execute('''
-                SELECT last_loaded_timestamp FROM sbosc.apply_dml_events_status
+            cursor.execute(f'''
+                SELECT last_loaded_timestamp FROM {config.SBOSC_DB}.apply_dml_events_status
                 WHERE migration_id = %s ORDER BY id DESC LIMIT 1
             ''' % self.migration_id)
             if cursor.rowcount > 0:
@@ -316,8 +316,8 @@ def check_migration_status(self):
         remaining_binlog_size = 0
         if time.time() - self.redis_data.last_catchup_timestamp > 2:
             with self.db.cursor() as cursor:
-                cursor.execute('''
-                    SELECT log_file, log_pos FROM sbosc.event_handler_status
+                cursor.execute(f'''
+                    SELECT log_file, log_pos FROM {config.SBOSC_DB}.event_handler_status
                     WHERE migration_id = %s ORDER BY id DESC LIMIT 1
                 ''' % self.migration_id)
                 if cursor.rowcount > 0:
@@ -338,6 +338,7 @@ def check_migration_status(self):
 
         # unmatched_pks
         with self.db.cursor() as cursor:
-            cursor.execute("SELECT COUNT(1) FROM sbosc.unmatched_rows WHERE migration_id = %s" % self.migration_id)
+            cursor.execute(
+                f"SELECT COUNT(1) FROM {config.SBOSC_DB}.unmatched_rows WHERE migration_id = %s" % self.migration_id)
             unmatched_pks = cursor.fetchone()[0]
         self.metric_sender.submit('sb_osc_unmatched_rows', unmatched_pks)
diff --git a/src/sbosc/operations/base.py b/src/sbosc/operations/base.py
index c77ca3b..c981d24 100644
--- a/src/sbosc/operations/base.py
+++ b/src/sbosc/operations/base.py
@@ -1,6 +1,7 @@
 import pandas as pd
 from MySQLdb.cursors import Cursor
 
+from config import config
 from sbosc.operations.operation import MigrationOperation
 import sbosc.operations.utils as operation_utils
@@ -113,7 +114,7 @@ def get_rematched_removed_pks(self, db, not_removed_pks):
         with db.cursor() as cursor:
             cursor: Cursor
             cursor.execute(f'''
-                SELECT source_pk FROM sbosc.unmatched_rows WHERE source_pk NOT IN (
+                SELECT source_pk FROM {config.SBOSC_DB}.unmatched_rows WHERE source_pk NOT IN (
                     SELECT id FROM {self.destination_db}.{self.destination_table}
                     WHERE id IN ({not_removed_pks_str})
                 ) AND source_pk IN ({not_removed_pks_str})
@@ -266,7 +267,7 @@ def get_rematched_removed_pks(self, db, not_removed_pks):
         with db.cursor(host='source', role='reader') as cursor:
             cursor: Cursor
             query = f'''
-                SELECT source_pk FROM sbosc.unmatched_rows
+                SELECT source_pk FROM {config.SBOSC_DB}.unmatched_rows
                 WHERE source_pk IN ({not_removed_pks_str})
             '''
             if still_not_removed_pks_str:
diff --git a/src/sbosc/operations/operation.py b/src/sbosc/operations/operation.py
index 1717440..7c0d21b 100644
--- a/src/sbosc/operations/operation.py
+++ b/src/sbosc/operations/operation.py
@@ -4,6 +4,7 @@
 
 from MySQLdb.cursors import Cursor
 
+from config import config
 from modules.db import Database
 from modules.redis import RedisData
@@ -54,7 +55,7 @@ def _get_event_pks(
             'update': f'updated_pk_{self.migration_id}'
         }
         cursor.execute(f'''
-            SELECT source_pk FROM sbosc.{table_names[event_type]}
+            SELECT source_pk FROM {config.SBOSC_DB}.{table_names[event_type]}
            WHERE event_timestamp BETWEEN {start_timestamp} AND {end_timestamp}
         ''')
         return ','.join([str(row[0]) for row in cursor.fetchall()])
diff --git a/tests/configs/config.yaml b/tests/configs/config.yaml
index c33771e..76c37b8 100644
--- a/tests/configs/config.yaml
+++ b/tests/configs/config.yaml
@@ -1,10 +1,11 @@
+sbosc_db: "sbosc"
 source_writer_endpoint: "127.0.0.1"
 source_reader_endpoint: "127.0.0.1"
 source_cluster_id: "test"
-source_db: "sbosc"
+source_db: "source"
 source_table: "source_table"
 destination_cluster_id: "test"
-destination_db: "sbosc"
+destination_db: "dest"
 destination_table: "destination_table"
 min_chunk_size: 1000
 max_chunk_count: 10
diff --git a/tests/conftest.py b/tests/conftest.py
index 126a2bf..a17cfc0 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -47,7 +47,6 @@ def cursor(config, secret):
         port=secret.PORT,
         user=secret.USERNAME,
         password=secret.PASSWORD,
-        db=config.SOURCE_DB,
         autocommit=True
     )
     with connection.cursor() as cursor:
@@ -56,7 +55,7 @@ def cursor(config, secret):
 
 @pytest.fixture
 def sqlalchemy_engine(config, secret):
-    return create_engine(f'mysql+mysqldb://{secret.USERNAME}:@{config.SOURCE_WRITER_ENDPOINT}:{secret.PORT}/sbosc')
+    return create_engine(f'mysql+mysqldb://{secret.USERNAME}:@{config.SOURCE_WRITER_ENDPOINT}:{secret.PORT}/{config.SOURCE_DB}')
 
 
 @pytest.fixture(autouse=True)
@@ -80,15 +79,13 @@ def init_migration(config, cursor, redis_data):
     from sbosc.const import Stage
     from sbosc.controller.initializer import Initializer
 
-    cursor.execute(f'''
-        SELECT table_name FROM information_schema.tables
-        WHERE table_schema = '{config.SOURCE_DB}'
-    ''')
-    for table, in cursor.fetchall():
-        cursor.execute(f'DROP TABLE {table}')
+    for db in [config.SOURCE_DB, config.DESTINATION_DB, config.SBOSC_DB]:
+        cursor.execute(f'DROP DATABASE IF EXISTS {db}')
+        cursor.execute(f'CREATE DATABASE {db}')
+
+    cursor.execute(f'CREATE TABLE {config.SOURCE_DB}.{config.SOURCE_TABLE} (id int)')
+    cursor.execute(f'CREATE TABLE {config.DESTINATION_DB}.{config.DESTINATION_TABLE} (id int)')
 
-    for table in [config.SOURCE_TABLE, config.DESTINATION_TABLE]:
-        cursor.execute(f"CREATE TABLE {table} (id int)")
     migration_id = Initializer().init_migration()
 
     # Validate Initializer.init_migration
diff --git a/tests/test_controller.py b/tests/test_controller.py
index ed92c36..df1601a 100644
--- a/tests/test_controller.py
+++ b/tests/test_controller.py
@@ -24,22 +24,22 @@
 @pytest.fixture
 def setup_table(sqlalchemy_engine, cursor, request):
     param = request.param if hasattr(request, 'param') else ''
-    cursor.execute(f"DROP TABLE IF EXISTS {config.SOURCE_TABLE}")
+    cursor.execute(f"DROP TABLE IF EXISTS {config.SOURCE_DB}.{config.SOURCE_TABLE}")
     if param == 'with_data':
         df = pd.DataFrame(np.random.choice(['a', 'b', 'c'], size=(TABLE_SIZE, 3)), columns=['A', 'B', 'C'])
         df['id'] = range(1, 1 + len(df))
-        df.to_sql(config.SOURCE_TABLE, sqlalchemy_engine, if_exists='replace', index=False)
-        df.to_sql(config.DESTINATION_TABLE, sqlalchemy_engine, if_exists='replace', index=False)
-        cursor.execute(f"ALTER TABLE {config.SOURCE_TABLE} MODIFY COLUMN id int AUTO_INCREMENT PRIMARY KEY")
-        cursor.execute(f"ALTER TABLE {config.DESTINATION_TABLE} MODIFY COLUMN id int AUTO_INCREMENT PRIMARY KEY")
+        df.to_sql(config.SOURCE_TABLE, sqlalchemy_engine, if_exists='replace', index=False, schema=config.SOURCE_DB)
+        df.to_sql(config.DESTINATION_TABLE, sqlalchemy_engine, if_exists='replace', index=False, schema=config.DESTINATION_DB)
+        cursor.execute(f"ALTER TABLE {config.SOURCE_DB}.{config.SOURCE_TABLE} MODIFY COLUMN id int AUTO_INCREMENT PRIMARY KEY")
+        cursor.execute(f"ALTER TABLE {config.DESTINATION_DB}.{config.DESTINATION_TABLE} MODIFY COLUMN id int AUTO_INCREMENT PRIMARY KEY")
     else:
         cursor.execute(f'''
-            CREATE TABLE {config.SOURCE_TABLE} (
+            CREATE TABLE {config.SOURCE_DB}.{config.SOURCE_TABLE} (
                 id INT PRIMARY KEY AUTO_INCREMENT,
                 A CHAR(1), B CHAR(1), C CHAR(1)
             )
        ''')
-        cursor.execute(f"INSERT INTO {config.SOURCE_TABLE} (id, A, B, C) VALUES ({TABLE_SIZE}, 'a', 'b', 'c')")
+        cursor.execute(f"INSERT INTO {config.SOURCE_DB}.{config.SOURCE_TABLE} (id, A, B, C) VALUES ({TABLE_SIZE}, 'a', 'b', 'c')")
 
 
 @pytest.fixture
@@ -108,13 +108,13 @@ def test_bulk_import_validation(controller: Controller, setup_table, cursor, ove
     assert controller.validator.bulk_import_validation()
     delete_pks = random.sample(range(1, TABLE_SIZE), 10)
     cursor.execute(f'''
-        DELETE FROM {config.SOURCE_TABLE}
+        DELETE FROM {config.SOURCE_DB}.{config.SOURCE_TABLE}
         WHERE id IN ({','.join([str(i) for i in delete_pks[:5]])})
     ''')
     assert controller.validator.bulk_import_validation()
     controller.validator.stop_flag = False
     cursor.execute(f'''
-        DELETE FROM {config.DESTINATION_TABLE}
+        DELETE FROM {config.DESTINATION_DB}.{config.DESTINATION_TABLE}
        WHERE id IN ({','.join([str(i) for i in delete_pks])})
     ''')
     assert not controller.validator.bulk_import_validation()
@@ -125,7 +125,7 @@
 @pytest.mark.parametrize('case', ['normal', 'on_restart'])
 def test_add_index(controller: Controller, setup_table, cursor, case):
     cursor.execute(f'''
-        ALTER TABLE {config.DESTINATION_TABLE}
+        ALTER TABLE {config.DESTINATION_DB}.{config.DESTINATION_TABLE}
         MODIFY COLUMN A VARCHAR(128),
         MODIFY COLUMN B VARCHAR(128),
         MODIFY COLUMN C VARCHAR(128)
@@ -138,20 +138,20 @@ def test_add_index(controller: Controller, setup_table, cursor, case):
         IndexConfig('idx_6', 'B,C')
     ]
 
-    cursor.execute("TRUNCATE TABLE index_creation_status")
+    cursor.execute(f"TRUNCATE TABLE {config.SBOSC_DB}.index_creation_status")
     cursor.executemany(f'''
-        INSERT INTO index_creation_status
+        INSERT INTO {config.SBOSC_DB}.index_creation_status
         (migration_id, index_name, index_columns, is_unique)
         VALUES (%s, %s, %s, %s)
     ''', [(1, index.name, index.columns, index.unique) for index in config.INDEXES])
 
     if case == 'on_restart':
-        cursor.execute("UPDATE index_creation_status SET started_at = NOW() WHERE id = 1")
+        cursor.execute(f"UPDATE {config.SBOSC_DB}.index_creation_status SET started_at = NOW() WHERE id = 1")
 
         def delayed_add_index():
             time.sleep(100)
             cursor.execute(f'''
-                ALTER TABLE {config.DESTINATION_TABLE} ADD INDEX idx_1 (A)
+                ALTER TABLE {config.DESTINATION_DB}.{config.DESTINATION_TABLE} ADD INDEX idx_1 (A)
             ''')
 
         threading.Thread(target=delayed_add_index).start()
@@ -177,31 +177,31 @@ def test_apply_dml_events_validation(controller: Controller, setup_table, redis_
     insert_events = [(random.randint(1, TABLE_SIZE), random.randint(*timestamp_range)) for _ in range(500)]
     update_events = [(random.randint(1, TABLE_SIZE), random.randint(*timestamp_range)) for _ in range(500)]
     delete_events = [(random.randint(1, TABLE_SIZE), random.randint(*timestamp_range)) for _ in range(500)]
-    cursor.executemany('''
-        INSERT IGNORE INTO inserted_pk_1 (source_pk, event_timestamp) VALUES (%s, %s)
+    cursor.executemany(f'''
+        INSERT IGNORE INTO {config.SBOSC_DB}.inserted_pk_1 (source_pk, event_timestamp) VALUES (%s, %s)
     ''', insert_events)
-    cursor.executemany('''
-        INSERT IGNORE INTO updated_pk_1 (source_pk, event_timestamp) VALUES (%s, %s)
+    cursor.executemany(f'''
+        INSERT IGNORE INTO {config.SBOSC_DB}.updated_pk_1 (source_pk, event_timestamp) VALUES (%s, %s)
     ''', update_events)
-    cursor.executemany('''
-        INSERT IGNORE INTO deleted_pk_1 (source_pk, event_timestamp) VALUES (%s, %s)
+    cursor.executemany(f'''
+        INSERT IGNORE INTO {config.SBOSC_DB}.deleted_pk_1 (source_pk, event_timestamp) VALUES (%s, %s)
     ''', delete_events)
 
-    cursor.execute("TRUNCATE TABLE event_handler_status")
-    cursor.execute("TRUNCATE TABLE apply_dml_events_status")
+    cursor.execute(f"TRUNCATE TABLE {config.SBOSC_DB}.event_handler_status")
+    cursor.execute(f"TRUNCATE TABLE {config.SBOSC_DB}.apply_dml_events_status")
 
     # Event handler status doesn't have any row
     assert not controller.validator.apply_dml_events_validation()
 
     # Insert row to event handler status and validate
     cursor.execute(f'''
-        INSERT INTO event_handler_status (migration_id, log_file, log_pos, last_event_timestamp, created_at)
+        INSERT INTO {config.SBOSC_DB}.event_handler_status (migration_id, log_file, log_pos, last_event_timestamp, created_at)
         VALUES (1, 'mysql-bin.000001', 4, {timestamp_range[1]}, NOW())
     ''')
     controller.validator.apply_dml_events_validation()
 
     # Check if the validation is correct
-    cursor.execute('''
-        SELECT COUNT(1) FROM unmatched_rows WHERE migration_id = 1 AND unmatch_type = 'not_removed'
+    cursor.execute(f'''
+        SELECT COUNT(1) FROM {config.SBOSC_DB}.unmatched_rows WHERE migration_id = 1 AND unmatch_type = 'not_removed'
     ''')
     not_removed_event_count = cursor.fetchone()[0]
@@ -211,33 +211,33 @@ def test_apply_dml_events_validation(controller: Controller, setup_table, redis_
 
     # Delete rows from source table and validate
     cursor.execute(f'''
-        DELETE FROM apply_dml_events_validation_status WHERE migration_id = 1
+        DELETE FROM {config.SBOSC_DB}.apply_dml_events_validation_status WHERE migration_id = 1
     ''')
     cursor.execute(f'''
-        DELETE FROM {config.SOURCE_TABLE} WHERE id IN (SELECT source_pk FROM deleted_pk_1)
+        DELETE FROM {config.SOURCE_DB}.{config.SOURCE_TABLE} WHERE id IN (SELECT source_pk FROM {config.SBOSC_DB}.deleted_pk_1)
     ''')
     controller.validator.apply_dml_events_validation()
 
     # Check if the validation is correct
-    cursor.execute('''
-        SELECT COUNT(1) FROM unmatched_rows WHERE migration_id = 1 AND unmatch_type = 'not_removed'
+    cursor.execute(f'''
+        SELECT COUNT(1) FROM {config.SBOSC_DB}.unmatched_rows WHERE migration_id = 1 AND unmatch_type = 'not_removed'
     ''')
     not_removed_event_count = cursor.fetchone()[0]
-    cursor.execute("SELECT COUNT(1) FROM deleted_pk_1")
+    cursor.execute(f"SELECT COUNT(1) FROM {config.SBOSC_DB}.deleted_pk_1")
     deleted_pk_count = cursor.fetchone()[0]
     assert not_removed_event_count == deleted_pk_count
 
     # Delete rows from destination table and validate
     cursor.execute(f'''
-        DELETE FROM {config.SOURCE_TABLE}
+        DELETE FROM {config.SOURCE_DB}.{config.SOURCE_TABLE}
         WHERE id IN ({','.join([str(i) for i in [i[0] for i in delete_events]])})
     ''')
     cursor.execute(f'''
-        DELETE FROM {config.DESTINATION_TABLE}
+        DELETE FROM {config.DESTINATION_DB}.{config.DESTINATION_TABLE}
         WHERE id IN ({','.join([str(i) for i in [i[0] for i in delete_events]])})
     ''')
     assert controller.validator.apply_dml_events_validation()
-    cursor.execute("SELECT COUNT(1) FROM unmatched_rows")
+    cursor.execute(f"SELECT COUNT(1) FROM {config.SBOSC_DB}.unmatched_rows")
     assert cursor.fetchone()[0] == 0
 
     # Add new insert, update event
@@ -246,22 +246,22 @@ def test_apply_dml_events_validation(controller: Controller, setup_table, redis_
         (random.randint(TABLE_SIZE, TABLE_SIZE * 2), random.randint(*new_timestamp_range)) for _ in range(500)]
     new_update_events = [(random.randint(1, TABLE_SIZE), random.randint(*new_timestamp_range)) for _ in range(500)]
 
-    cursor.executemany('''
-        INSERT IGNORE INTO inserted_pk_1 (source_pk, event_timestamp) VALUES (%s, %s)
+    cursor.executemany(f'''
+        INSERT IGNORE INTO {config.SBOSC_DB}.inserted_pk_1 (source_pk, event_timestamp) VALUES (%s, %s)
     ''', new_insert_events)
-    cursor.executemany('''
-        INSERT IGNORE INTO updated_pk_1 (source_pk, event_timestamp) VALUES (%s, %s)
+    cursor.executemany(f'''
+        INSERT IGNORE INTO {config.SBOSC_DB}.updated_pk_1 (source_pk, event_timestamp) VALUES (%s, %s)
     ''', new_update_events)
     cursor.executemany(f'''
-        INSERT IGNORE INTO {config.SOURCE_TABLE} (id, A, B, C) VALUES (%s, %s, %s, %s)
+        INSERT IGNORE INTO {config.SOURCE_DB}.{config.SOURCE_TABLE} (id, A, B, C) VALUES (%s, %s, %s, %s)
     ''', [(i[0], 'a', 'b', 'c') for i in new_insert_events])
     cursor.execute(f'''
-        UPDATE {config.SOURCE_TABLE} SET A = 'x' WHERE id IN ({','.join([str(i) for i in [i[0] for i in new_update_events]])})
+        UPDATE {config.SOURCE_DB}.{config.SOURCE_TABLE} SET A = 'x' WHERE id IN ({','.join([str(i) for i in [i[0] for i in new_update_events]])})
     ''')
 
     cursor.execute(f'''
-        INSERT INTO event_handler_status (migration_id, log_file, log_pos, last_event_timestamp, created_at)
+        INSERT INTO {config.SBOSC_DB}.event_handler_status (migration_id, log_file, log_pos, last_event_timestamp, created_at)
         VALUES (1, 'mysql-bin.000001', 4, {new_timestamp_range[1]}, NOW())
     ''')
@@ -269,18 +269,19 @@ def test_apply_dml_events_validation(controller: Controller, setup_table, redis_
 
     # Apply changes to destination table
     cursor.executemany(f'''
-        INSERT IGNORE INTO {config.DESTINATION_TABLE} (id, A, B, C) VALUES (%s, %s, %s, %s)
+        INSERT IGNORE INTO {config.DESTINATION_DB}.{config.DESTINATION_TABLE} (id, A, B, C) VALUES (%s, %s, %s, %s)
     ''', [(i[0], 'a', 'b', 'c') for i in new_insert_events])
     cursor.execute(f'''
-        UPDATE {config.DESTINATION_TABLE} SET A = 'x' WHERE id IN ({','.join([str(i) for i in [i[0] for i in new_update_events]])})
+        UPDATE {config.DESTINATION_DB}.{config.DESTINATION_TABLE} SET A = 'x' WHERE id IN ({','.join([str(i) for i in [i[0] for i in new_update_events]])})
     ''')
     assert controller.validator.apply_dml_events_validation()
-    cursor.execute("SELECT COUNT(1) FROM unmatched_rows")
+    cursor.execute(f"SELECT COUNT(1) FROM {config.SBOSC_DB}.unmatched_rows")
     assert cursor.fetchone()[0] == 0
 
     # Test full validation
     assert controller.validator.full_dml_event_validation()
+    cursor.execute(f"USE {config.SBOSC_DB}")
     cursor.execute("TRUNCATE TABLE event_handler_status")
     cursor.execute("TRUNCATE TABLE inserted_pk_1")
     cursor.execute("TRUNCATE TABLE updated_pk_1")
@@ -333,4 +334,4 @@ def test_swap_tables(controller: Controller, setup_table, cursor, redis_data: Re
     assert cursor.fetchone()[0] == 1
 
     # Clean up
-    cursor.execute(f"DROP TABLE _{config.SOURCE_TABLE}_old_{datetime.now().strftime('%Y%m%d')}")
+    cursor.execute(f"DROP TABLE {config.SOURCE_DB}._{config.SOURCE_TABLE}_old_{datetime.now().strftime('%Y%m%d')}")
diff --git a/tests/test_eventhandler.py b/tests/test_eventhandler.py
index efd3661..5fce90b 100644
--- a/tests/test_eventhandler.py
+++ b/tests/test_eventhandler.py
@@ -23,23 +23,23 @@ def ignore_deprecation_warning():
 @pytest.fixture
 def setup_table(cursor):
     # Truncate table
-    cursor.execute("TRUNCATE TABLE event_handler_status")
-    cursor.execute("TRUNCATE TABLE apply_dml_events_status")
+    cursor.execute(f"TRUNCATE TABLE {config.SBOSC_DB}.event_handler_status")
+    cursor.execute(f"TRUNCATE TABLE {config.SBOSC_DB}.apply_dml_events_status")
 
     # Source table
-    cursor.execute(f"DROP TABLE IF EXISTS {config.SOURCE_TABLE}")
+    cursor.execute(f"DROP TABLE IF EXISTS {config.SOURCE_DB}.{config.SOURCE_TABLE}")
     cursor.execute(f'''
-        CREATE TABLE {config.SOURCE_TABLE} (
+        CREATE TABLE {config.SOURCE_DB}.{config.SOURCE_TABLE} (
             id INT PRIMARY KEY AUTO_INCREMENT,
             A CHAR(10), B CHAR(10), C CHAR(10)
         )
     ''')
-    print(f"Created table {config.SOURCE_TABLE}")
+    print(f"Created table {config.SOURCE_DB}.{config.SOURCE_TABLE}")
 
 
 @pytest.fixture(autouse=True)
 def init_redis(redis_data):
     redis_data.set_current_stage(Stage.APPLY_DML_EVENTS_VALIDATION)
-    redis_data.metadata.source_db = 'sbosc'
+    redis_data.metadata.source_db = config.SOURCE_DB
     redis_data.metadata.source_table = config.SOURCE_TABLE
     redis_data.updated_pk_set.delete()
     redis_data.removed_pk_set.delete()
@@ -68,7 +68,7 @@ def test_event_handler(event_handler, cursor, redis_data: RedisData):
 
     # test insert event
     cursor.executemany(
-        f'INSERT INTO {config.SOURCE_TABLE} (A, B, C) VALUES (%s, %s, %s)',
+        f'INSERT INTO {config.SOURCE_DB}.{config.SOURCE_TABLE} (A, B, C) VALUES (%s, %s, %s)',
         [('a', 'b', 'c'), ('d', 'e', 'f')]
     )
     time.sleep(100)
@@ -78,7 +78,7 @@ def test_event_handler(event_handler, cursor, redis_data: RedisData):
 
     # test update event on same pk
     cursor.executemany(
-        f'UPDATE {config.SOURCE_TABLE} SET A=%s, B=%s, C=%s WHERE id=%s',
+        f'UPDATE {config.SOURCE_DB}.{config.SOURCE_TABLE} SET A=%s, B=%s, C=%s WHERE id=%s',
         [('a', 'b', 'c', 1), ('d', 'e', 'f', 1)]
     )
     time.sleep(100)
@@ -86,7 +86,7 @@ def test_event_handler(event_handler, cursor, redis_data: RedisData):
     assert set(redis_data.updated_pk_set.get(1)) == {'1'}
 
     # test delete event
-    cursor.execute(f'DELETE FROM {config.SOURCE_TABLE} WHERE id=1')
+    cursor.execute(f'DELETE FROM {config.SOURCE_DB}.{config.SOURCE_TABLE} WHERE id=1')
     time.sleep(100)
     assert len(redis_data.updated_pk_set) == 0
     assert len(redis_data.removed_pk_set) == 1
@@ -104,7 +104,7 @@ def test_event_handler_save_to_database(event_handler, cursor, redis_data):
 
     # Remove previous data
     for table in ['inserted_pk_1', 'updated_pk_1', 'deleted_pk_1']:
-        cursor.execute(f'TRUNCATE TABLE {table}')
+        cursor.execute(f'TRUNCATE TABLE {config.SBOSC_DB}.{table}')
     redis_data.updated_pk_set.delete()
     redis_data.removed_pk_set.delete()
@@ -115,7 +115,7 @@ def test_event_handler_save_to_database(event_handler, cursor, redis_data):
 
     # Insert events
     for _ in range(insert_events):
-        cursor.execute(f'INSERT INTO {config.SOURCE_TABLE} (A, B, C) VALUES (%s, %s, %s)', ('a', 'b', 'c'))
+        cursor.execute(f'INSERT INTO {config.SOURCE_DB}.{config.SOURCE_TABLE} (A, B, C) VALUES (%s, %s, %s)', ('a', 'b', 'c'))
     after_insert = time.time()
     while redis_data.last_catchup_timestamp < after_insert:
         print("Waiting for INSERT events to be processed...")
@@ -126,7 +126,7 @@ def test_event_handler_save_to_database(event_handler, cursor, redis_data):
         target_id = random.choice(range(1, insert_events + 1))
         # Update events are only created when data is changed
         cursor.execute(
-            f'UPDATE {config.SOURCE_TABLE} SET A=%s, B=%s, C=%s WHERE id=%s',
+            f'UPDATE {config.SOURCE_DB}.{config.SOURCE_TABLE} SET A=%s, B=%s, C=%s WHERE id=%s',
             (f'a{i}', f'b{i}', f'c{i}', target_id)
         )
     after_update = time.time()
@@ -137,21 +137,21 @@ def test_event_handler_save_to_database(event_handler, cursor, redis_data):
     # Delete events
     deleted_ids = random.sample(range(1, insert_events + 1), delete_events)
     for target_id in deleted_ids:
-        cursor.execute(f'DELETE FROM {config.SOURCE_TABLE} WHERE id=%s', (target_id,))
+        cursor.execute(f'DELETE FROM {config.SOURCE_DB}.{config.SOURCE_TABLE} WHERE id=%s', (target_id,))
     after_delete = time.time()
     while redis_data.last_catchup_timestamp < after_delete:
         print("Waiting for DELETE events to be processed...")
         time.sleep(100)
 
     # Check if the events have been saved to the database
-    cursor.execute(f'SELECT COUNT(*) FROM inserted_pk_1')
+    cursor.execute(f'SELECT COUNT(*) FROM {config.SBOSC_DB}.inserted_pk_1')
     assert cursor.fetchone()[0] == insert_events
 
-    cursor.execute(f'SELECT COUNT(*) FROM {config.SOURCE_TABLE} WHERE A != %s', ('a',))
+    cursor.execute(f'SELECT COUNT(*) FROM {config.SOURCE_DB}.{config.SOURCE_TABLE} WHERE A != %s', ('a',))
     updated_source_rows = cursor.fetchone()[0]
     cursor.execute(
-        f'SELECT COUNT(*) FROM updated_pk_1 WHERE source_pk NOT IN (SELECT source_pk FROM deleted_pk_1)')
+        f'SELECT COUNT(*) FROM {config.SBOSC_DB}.updated_pk_1 WHERE source_pk NOT IN (SELECT source_pk FROM {config.SBOSC_DB}.deleted_pk_1)')
     assert cursor.fetchone()[0] == updated_source_rows
 
-    cursor.execute(f'SELECT COUNT(*) FROM deleted_pk_1')
+    cursor.execute(f'SELECT COUNT(*) FROM {config.SBOSC_DB}.deleted_pk_1')
     assert cursor.fetchone()[0] == delete_events
 
     # Set current stage to APPLY_DML_EVENTS
diff --git a/tests/test_monitor.py b/tests/test_monitor.py
index fd15bd0..a9a566d 100644
--- a/tests/test_monitor.py
+++ b/tests/test_monitor.py
@@ -111,8 +111,8 @@ def test_update_worker_config(monitor, redis_data):
 
 
 def test_check_migration_status(monitor, cursor, redis_data):
-    cursor.execute("TRUNCATE TABLE sbosc.event_handler_status")
-    cursor.execute("TRUNCATE TABLE sbosc.apply_dml_events_status")
+    cursor.execute(f"TRUNCATE TABLE {config.SBOSC_DB}.event_handler_status")
+    cursor.execute(f"TRUNCATE TABLE {config.SBOSC_DB}.apply_dml_events_status")
     monitor.redis_data.metadata.max_id = 0
     monitor.check_migration_status()
     metric_set = get_metric_names(monitor)
@@ -131,7 +131,7 @@ def test_check_migration_status(monitor, cursor, redis_data):
     assert metric_set == expected_metrics
 
     cursor.execute(f'''
-        INSERT INTO sbosc.event_handler_status (migration_id, log_file, log_pos, last_event_timestamp, created_at)
+        INSERT INTO {config.SBOSC_DB}.event_handler_status (migration_id, log_file, log_pos, last_event_timestamp, created_at)
         VALUES (1, 'mysql-bin.000001', 4, 2, NOW())
     ''')
     redis_data.set_last_catchup_timestamp(time.time())
@@ -144,7 +144,7 @@ def test_check_migration_status(monitor, cursor, redis_data):
     assert metric_set == expected_metrics
 
     cursor.execute(f'''
-        INSERT INTO sbosc.apply_dml_events_status (migration_id, last_loaded_timestamp, created_at)
+        INSERT INTO {config.SBOSC_DB}.apply_dml_events_status (migration_id, last_loaded_timestamp, created_at)
         VALUES (1, NOW(), NOW())
     ''')
     monitor.check_migration_status()
diff --git a/tests/test_worker.py b/tests/test_worker.py
index a0a4497..4318ae3 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -40,14 +40,14 @@ def setup_table(sqlalchemy_engine, cursor, request):
     else:
         dest_df = pd.DataFrame(columns=SOURCE_COLUMNS)
 
-    source_df.to_sql(config.SOURCE_TABLE, sqlalchemy_engine, if_exists='replace', index=False)
-    dest_df.to_sql(config.DESTINATION_TABLE, sqlalchemy_engine, if_exists='replace', index=False)
+    source_df.to_sql(config.SOURCE_TABLE, sqlalchemy_engine, if_exists='replace', index=False, schema=config.SOURCE_DB)
+    dest_df.to_sql(config.DESTINATION_TABLE, sqlalchemy_engine, if_exists='replace', index=False, schema=config.DESTINATION_DB)
 
     if "sparse" in param:
-        cursor.execute(f'alter table {config.SOURCE_TABLE} modify column id int primary KEY AUTO_INCREMENT;')
+        cursor.execute(f'alter table {config.SOURCE_DB}.{config.SOURCE_TABLE} modify column id int primary KEY AUTO_INCREMENT;')
     else:
-        cursor.execute(f'alter table {config.SOURCE_TABLE} add column id int primary KEY AUTO_INCREMENT;')
-        cursor.execute(f'alter table {config.DESTINATION_TABLE} add column id int primary KEY AUTO_INCREMENT;')
+        cursor.execute(f'alter table {config.SOURCE_DB}.{config.SOURCE_TABLE} add column id int primary KEY AUTO_INCREMENT;')
+        cursor.execute(f'alter table {config.DESTINATION_DB}.{config.DESTINATION_TABLE} add column id int primary KEY AUTO_INCREMENT;')
 
 
 @pytest.fixture(autouse=True)
@@ -77,7 +77,7 @@ def worker_manager():
 # Test #
 ########
 def test_setup_table(cursor, setup_table):
-    cursor.execute(f"SELECT * FROM {config.SOURCE_TABLE} LIMIT 1")
+    cursor.execute(f"SELECT * FROM {config.SOURCE_DB}.{config.SOURCE_TABLE} LIMIT 1")
     result = cursor.fetchall()
     assert result[0][0] in TEST_TABLE_VALUES
@@ -161,10 +161,10 @@ def test_bulk_import(setup_table, cursor, worker_manager, request_id, redis_data
     })
 
     # Check tables
-    cursor.execute(f"SELECT COUNT(1) FROM {config.SOURCE_TABLE}")
+    cursor.execute(f"SELECT COUNT(1) FROM {config.SOURCE_DB}.{config.SOURCE_TABLE}")
     assert cursor.fetchone()[0] == TABLE_SIZE
 
-    cursor.execute(f"SELECT MAX(id) FROM {config.DESTINATION_TABLE}")
+    cursor.execute(f"SELECT MAX(id) FROM {config.DESTINATION_DB}.{config.DESTINATION_TABLE}")
     if "duplicate_key" in request_id:
         assert cursor.fetchone()[0] == TABLE_SIZE // 20
     else:
@@ -198,7 +198,7 @@ def test_bulk_import(setup_table, cursor, worker_manager, request_id, redis_data
     t2 = time.time()
     print(f"Execution time: {t2 - t1}")
 
-    cursor.execute(f"SELECT COUNT(1) FROM {config.DESTINATION_TABLE}")
+    cursor.execute(f"SELECT COUNT(1) FROM {config.DESTINATION_DB}.{config.DESTINATION_TABLE}")
     assert cursor.fetchone()[0] == TABLE_SIZE
 
     # test worker metrics
@@ -232,7 +232,7 @@ def test_apply_dml_events(setup_table, worker_manager, cursor, redis_data, overr
     # update rows
     modified_column = SOURCE_COLUMNS[0]
     cursor.execute(f'''
-        UPDATE {config.SOURCE_TABLE} SET {modified_column} = 'x' WHERE id IN ({','.join(str(pk) for pk in updated_pks)})
+        UPDATE {config.SOURCE_DB}.{config.SOURCE_TABLE} SET {modified_column} = 'x' WHERE id IN ({','.join(str(pk) for pk in updated_pks)})
     ''')
 
     redis_data.set_current_stage(Stage.APPLY_DML_EVENTS)
@@ -242,13 +242,13 @@ def test_apply_dml_events(setup_table, worker_manager, cursor, redis_data, overr
 
     # check updated rows
     cursor.execute(f'''
-        SELECT count(*) FROM {config.DESTINATION_TABLE} WHERE {modified_column} = 'x'
+        SELECT count(*) FROM {config.DESTINATION_DB}.{config.DESTINATION_TABLE} WHERE {modified_column} = 'x'
     ''')
     assert cursor.fetchone()[0] == len(updated_pks)
 
     # check removed rows
     cursor.execute(f'''
-        SELECT count(*) FROM {config.DESTINATION_TABLE} WHERE id IN ({','.join(str(pk) for pk in removed_pks)})
+        SELECT count(*) FROM {config.DESTINATION_DB}.{config.DESTINATION_TABLE} WHERE id IN ({','.join(str(pk) for pk in removed_pks)})
     ''')
     assert cursor.fetchone()[0] == 0
@@ -262,11 +262,11 @@ def test_swap_tables(setup_table, worker_manager, cursor, redis_data, request_id
     redis_data.worker_config.thread_count = 1
 
     def insert_and_check(source_table):
-        cursor.execute(f"INSERT INTO {source_table} (A, B, C) VALUES (1, 2, 3)")
+        cursor.execute(f"INSERT INTO {config.SOURCE_DB}.{source_table} (A, B, C) VALUES (1, 2, 3)")
         last_inserted_id = cursor.lastrowid
         redis_data.updated_pk_set.add([last_inserted_id])
         for _ in range(10):
-            cursor.execute(f"SELECT * FROM {config.DESTINATION_TABLE} WHERE id = {last_inserted_id}")
+            cursor.execute(f"SELECT * FROM {config.DESTINATION_DB}.{config.DESTINATION_TABLE} WHERE id = {last_inserted_id}")
             if cursor.rowcount > 0:
                 break
             time.sleep(100)
@@ -274,13 +274,13 @@ def insert_and_check(source_table):
 
     # swap table stage
     old_source_table = f"_{config.SOURCE_TABLE}_old"
-    cursor.execute(f"RENAME TABLE {config.SOURCE_TABLE} TO {old_source_table}")
+    cursor.execute(f"RENAME TABLE {config.SOURCE_DB}.{config.SOURCE_TABLE} TO {config.SOURCE_DB}.{old_source_table}")
     redis_data.set_old_source_table(old_source_table)
     redis_data.set_current_stage(Stage.SWAP_TABLES)
 
     insert_and_check(old_source_table)
 
-    cursor.execute(f"RENAME TABLE {old_source_table} TO {config.SOURCE_TABLE}")
+    cursor.execute(f"RENAME TABLE {config.SOURCE_DB}.{old_source_table} TO {config.SOURCE_DB}.{config.SOURCE_TABLE}")
     redis_data.set_old_source_table(None)
     time.sleep(100)

From a6f93fb64c3f0f4a04244c9f1587f4a6f86585b5 Mon Sep 17 00:00:00 2001
From: Jimmy Kim
Date: Mon, 13 May 2024 10:17:02 +0900
Subject: [PATCH 2/2] set default db to None (#4)

---
 src/config/config.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/config/config.py b/src/config/config.py
index c430d0e..6562e1f 100644
--- a/src/config/config.py
+++ b/src/config/config.py
@@ -45,12 +45,12 @@ class Config:
     SOURCE_WRITER_ENDPOINT = ''
     SOURCE_READER_ENDPOINT = ''
     SOURCE_CLUSTER_ID = None  # optional
-    SOURCE_DB = 'sbosc'
+    SOURCE_DB = None
     SOURCE_TABLE = ''
     DESTINATION_WRITER_ENDPOINT = None
     DESTINATION_READER_ENDPOINT = None
     DESTINATION_CLUSTER_ID = None  # optional
-    DESTINATION_DB = 'sbosc'
+    DESTINATION_DB = None
     DESTINATION_TABLE = ''
     MIN_CHUNK_SIZE = 100000
     MAX_CHUNK_COUNT = 200
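
Note (not part of the patches): after [PATCH 2/2], SOURCE_DB and DESTINATION_DB no longer default to 'sbosc' and must be set explicitly, while the new SBOSC_DB keeps 'sbosc' as the default schema for SB-OSC's own metadata tables. A minimal usage sketch follows. It assumes Config maps lower-case keys from the YAML file named by the CONFIG_FILE env var onto its upper-case attributes, as the comments in src/config/__init__.py suggest (the loader itself is not shown here); 'my_config.yaml' and 'sbosc_meta' are hypothetical names.

    # Hypothetical sketch: point CONFIG_FILE at a YAML file containing
    #   sbosc_db: "sbosc_meta"
    # before the config module is imported.
    import os
    os.environ['CONFIG_FILE'] = 'my_config.yaml'

    from config import config

    # Metadata queries now resolve against the configurable schema instead of
    # a hard-coded 'sbosc' database.
    query = f"SELECT id FROM {config.SBOSC_DB}.migration_plan WHERE id = %s"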