Skip to content

Commit

Permalink
Merge pull request #2 from sendbird/develop
Browse files Browse the repository at this point in the history
add SBOSC_DB as config
  • Loading branch information
jjh-kim authored May 13, 2024
2 parents 88ab350 + a6f93fb commit 322d697
Show file tree
Hide file tree
Showing 17 changed files with 191 additions and 180 deletions.
6 changes: 3 additions & 3 deletions src/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
from config.secret import Secret
from config.env import Env

config = Config() # override by setting env CONFIG_FILE
secret = Secret() # override by setting env SECRET_FILE
env = Env() # override with environment variables
config: Config = Config() # override by setting env CONFIG_FILE
secret: Secret = Secret() # override by setting env SECRET_FILE
env: Env = Env() # override with environment variables
5 changes: 3 additions & 2 deletions src/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,16 @@ class IndexConfig:

class Config:
# Migration plan
SBOSC_DB = 'sbosc'
SOURCE_WRITER_ENDPOINT = ''
SOURCE_READER_ENDPOINT = ''
SOURCE_CLUSTER_ID = None # optional
SOURCE_DB = 'sbosc'
SOURCE_DB = None
SOURCE_TABLE = ''
DESTINATION_WRITER_ENDPOINT = None
DESTINATION_READER_ENDPOINT = None
DESTINATION_CLUSTER_ID = None # optional
DESTINATION_DB = 'sbosc'
DESTINATION_DB = None
DESTINATION_TABLE = ''
MIN_CHUNK_SIZE = 100000
MAX_CHUNK_COUNT = 200
Expand Down
4 changes: 2 additions & 2 deletions src/sbosc/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ def get_migration_id(self):
with self.db.cursor() as cursor:
cursor: Cursor
try:
cursor.execute('''
SELECT id FROM sbosc.migration_plan
cursor.execute(f'''
SELECT id FROM {config.SBOSC_DB}.migration_plan
WHERE source_cluster_id = %s AND source_db = %s AND source_table = %s
AND destination_cluster_id = %s AND destination_db = %s AND destination_table = %s
''', (
Expand Down
27 changes: 14 additions & 13 deletions src/sbosc/controller/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ def create_bulk_import_chunks(self):
# Save chunk info to database
with self.db.cursor() as cursor:
cursor: Cursor
cursor.executemany('''
INSERT INTO sbosc.chunk_info (migration_id, chunk_id, start_pk, end_pk, created_at)
cursor.executemany(f'''
INSERT INTO {config.SBOSC_DB}.chunk_info (migration_id, chunk_id, start_pk, end_pk, created_at)
VALUES (%s, %s, %s, %s, NOW())
''', chunks)

Expand All @@ -128,8 +128,8 @@ def validate_bulk_import(self):
self.logger.info("Restoring missing chunks from database")
with self.db.cursor() as cursor:
cursor: Cursor
cursor.execute('''
SELECT chunk_id FROM sbosc.chunk_info
cursor.execute(f'''
SELECT chunk_id FROM {config.SBOSC_DB}.chunk_info
WHERE migration_id = %s
''', [self.migration_id])
for chunk_id, in cursor.fetchall():
Expand Down Expand Up @@ -214,8 +214,8 @@ def add_index(self):
cursor: Cursor

index_info = None
cursor.execute('''
SELECT index_name FROM sbosc.index_creation_status
cursor.execute(f'''
SELECT index_name FROM {config.SBOSC_DB}.index_creation_status
WHERE migration_id = %s AND ended_at IS NULL AND started_at IS NOT NULL
''', (self.migration_id,))

Expand All @@ -240,7 +240,7 @@ def add_index(self):

else:
cursor.execute(f'''
SELECT index_name, index_columns, is_unique FROM sbosc.index_creation_status
SELECT index_name, index_columns, is_unique FROM {config.SBOSC_DB}.index_creation_status
WHERE migration_id = %s AND ended_at IS NULL LIMIT {config.INDEX_CREATED_PER_QUERY}
''', (self.migration_id,))

Expand All @@ -259,8 +259,8 @@ def add_index(self):
started_at = datetime.now()
with self.db.cursor() as cursor:
cursor: Cursor
cursor.executemany('''
UPDATE sbosc.index_creation_status SET started_at = %s
cursor.executemany(f'''
UPDATE {config.SBOSC_DB}.index_creation_status SET started_at = %s
WHERE migration_id = %s AND index_name = %s
''', [(started_at, self.migration_id, index_name) for index_name in index_names])

Expand All @@ -282,8 +282,8 @@ def add_index(self):
ended_at = datetime.now()
with self.db.cursor() as cursor:
cursor: Cursor
cursor.executemany('''
UPDATE sbosc.index_creation_status SET ended_at = %s
cursor.executemany(f'''
UPDATE {config.SBOSC_DB}.index_creation_status SET ended_at = %s
WHERE migration_id = %s AND index_name = %s
''', [(ended_at, self.migration_id, index_name) for index_name in index_names])

Expand Down Expand Up @@ -362,8 +362,9 @@ def swap_tables(self):
else:
cursor.execute(f"RENAME TABLE {destination_table} TO {source_table}")
self.redis_data.set_current_stage(Stage.DONE)
cursor.execute('''
UPDATE sbosc.migration_plan SET ended_at = FROM_UNIXTIME(%s), final_max_id = %s WHERE id = %s
cursor.execute(f'''
UPDATE {config.SBOSC_DB}.migration_plan
SET ended_at = FROM_UNIXTIME(%s), final_max_id = %s WHERE id = %s
''', (after_rename_table_timestamp, final_max_id, self.migration_id))
self.logger.info("Tables swapped")
self.slack.send_message("Tables swapped", color="good")
Expand Down
20 changes: 10 additions & 10 deletions src/sbosc/controller/initializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ def __init__(self):
def check_database_setup(self):
with self.db.cursor() as cursor:
cursor: Cursor
cursor.execute("SELECT 1 FROM information_schema.SCHEMATA WHERE SCHEMA_NAME = 'sbosc'")
cursor.execute(f"SELECT 1 FROM information_schema.SCHEMATA WHERE SCHEMA_NAME = '{config.SBOSC_DB}'")
if cursor.rowcount == 0:
self.logger.info("SB-OSC database not found")
return False
cursor.execute('''
cursor.execute(f'''
SELECT 1 FROM information_schema.TABLES
WHERE TABLE_SCHEMA = 'sbosc' AND TABLE_NAME IN (%s)
WHERE TABLE_SCHEMA = '{config.SBOSC_DB}' AND TABLE_NAME IN (%s)
''' % ','.join(['%s'] * len(REQUIRED_TABLES)), REQUIRED_TABLES)
if cursor.rowcount != len(REQUIRED_TABLES):
self.logger.info("Required tables not found")
Expand All @@ -43,11 +43,11 @@ def check_database_setup(self):
def setup_database(self):
with self.db.cursor() as cursor:
cursor: Cursor
cursor.execute("CREATE DATABASE IF NOT EXISTS sbosc;")
cursor.execute(f"CREATE DATABASE IF NOT EXISTS {config.SBOSC_DB};")
self.logger.info("Database created")

# Controller tables
cursor.execute("USE sbosc;")
cursor.execute(f"USE {config.SBOSC_DB};")
cursor.execute('''
CREATE TABLE IF NOT EXISTS migration_plan (
id int PRIMARY KEY AUTO_INCREMENT,
Expand Down Expand Up @@ -190,8 +190,8 @@ def init_migration(self):
with self.db.cursor() as cursor:
# Insert migration plan
cursor: Cursor
cursor.execute('''
INSERT INTO sbosc.migration_plan
cursor.execute(f'''
INSERT INTO {config.SBOSC_DB}.migration_plan
(source_cluster_id, source_db, source_table,
destination_cluster_id, destination_db, destination_table, created_at)
VALUES (%s, %s, %s, %s, %s, %s, NOW())
Expand All @@ -208,8 +208,8 @@ def init_migration(self):

# Insert index creation status
for index in config.INDEXES:
cursor.execute('''
INSERT INTO sbosc.index_creation_status
cursor.execute(f'''
INSERT INTO {config.SBOSC_DB}.index_creation_status
(migration_id, index_name, index_columns, is_unique, created_at)
VALUES (%s, %s, %s, %s, NOW())
''', (
Expand All @@ -223,7 +223,7 @@ def init_migration(self):
dml_log_tables = [f'{table}_{migration_id}' for table in ['inserted_pk', 'updated_pk', 'deleted_pk']]
for table in dml_log_tables:
cursor.execute(f'''
CREATE TABLE IF NOT EXISTS sbosc.{table} (
CREATE TABLE IF NOT EXISTS {config.SBOSC_DB}.{table} (
source_pk bigint PRIMARY KEY,
event_timestamp bigint,
KEY `idx_{table}_event_timestamp` (`event_timestamp`)
Expand Down
46 changes: 25 additions & 21 deletions src/sbosc/controller/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,24 +107,27 @@ def get_timestamp_range(self):

# Get last validated event timestamp
cursor.execute(f'''
SELECT last_validated_timestamp FROM sbosc.apply_dml_events_validation_status
SELECT last_validated_timestamp FROM {config.SBOSC_DB}.apply_dml_events_validation_status
WHERE migration_id = {self.migration_id} ORDER BY id DESC LIMIT 1
''')
if cursor.rowcount > 0:
start_timestamp = cursor.fetchone()[0]
else:
cursor.execute(f'''
SELECT MIN(event_timestamps.min_ts) FROM (
SELECT MIN(event_timestamp) AS min_ts FROM sbosc.inserted_pk_{self.migration_id} UNION
SELECT MIN(event_timestamp) AS min_ts FROM sbosc.updated_pk_{self.migration_id} UNION
SELECT MIN(event_timestamp) AS min_ts FROM sbosc.deleted_pk_{self.migration_id}
SELECT MIN(event_timestamp) AS min_ts
FROM {config.SBOSC_DB}.inserted_pk_{self.migration_id} UNION
SELECT MIN(event_timestamp) AS min_ts
FROM {config.SBOSC_DB}.updated_pk_{self.migration_id} UNION
SELECT MIN(event_timestamp) AS min_ts
FROM {config.SBOSC_DB}.deleted_pk_{self.migration_id}
) AS event_timestamps;
''')
if cursor.rowcount > 0:
start_timestamp = cursor.fetchone()[0]

cursor.execute(f'''
SELECT last_event_timestamp FROM sbosc.event_handler_status
SELECT last_event_timestamp FROM {config.SBOSC_DB}.event_handler_status
WHERE migration_id = {self.migration_id} ORDER BY id DESC LIMIT 1
''')
if cursor.rowcount > 0:
Expand All @@ -148,7 +151,7 @@ def execute_apply_dml_events_validation_query(
unmatched_pks.extend([(pk, UnmatchType.NOT_UPDATED) for pk in not_updated_pks])
elif table == 'deleted_pk':
source_cursor.execute(f'''
SELECT source_pk FROM sbosc.deleted_pk_{self.migration_id}
SELECT source_pk FROM {config.SBOSC_DB}.deleted_pk_{self.migration_id}
WHERE event_timestamp BETWEEN {start_timestamp} AND {end_timestamp}
''')
if source_cursor.rowcount > 0:
Expand Down Expand Up @@ -188,7 +191,7 @@ def validate_apply_dml_events_batch(self, table, range_queue: Queue, unmatched_p
dest_cursor.execute("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;")

source_cursor.execute(f'''
SELECT COUNT(1) FROM sbosc.{table}_{self.migration_id}
SELECT COUNT(1) FROM {config.SBOSC_DB}.{table}_{self.migration_id}
WHERE event_timestamp BETWEEN {batch_start_timestamp} AND {batch_end_timestamp}
''')
event_count = source_cursor.fetchone()[0]
Expand Down Expand Up @@ -220,7 +223,7 @@ def validate_unmatched_pks(self):
with self.db.cursor() as cursor:
cursor: Cursor
cursor.execute(f'''
SELECT source_pk, unmatch_type FROM sbosc.unmatched_rows
SELECT source_pk, unmatch_type FROM {config.SBOSC_DB}.unmatched_rows
WHERE migration_id = {self.migration_id} LIMIT {self.apply_dml_events_batch_size}
''')
if cursor.rowcount > 0:
Expand All @@ -237,7 +240,7 @@ def validate_unmatched_pks(self):
not_updated_pks = not_updated_pks - matched_pks
matched_pks_str = ','.join([str(pk) for pk in matched_pks])
cursor.execute(f'''
DELETE FROM sbosc.unmatched_rows WHERE source_pk IN ({matched_pks_str})
DELETE FROM {config.SBOSC_DB}.unmatched_rows WHERE source_pk IN ({matched_pks_str})
AND unmatch_type = '{UnmatchType.NOT_UPDATED}'
''')
if len(not_removed_pks) > 0:
Expand All @@ -246,7 +249,7 @@ def validate_unmatched_pks(self):
not_removed_pks = not_removed_pks - matched_pks
matched_pks_str = ','.join([str(pk) for pk in matched_pks])
cursor.execute(f'''
DELETE FROM sbosc.unmatched_rows WHERE source_pk IN ({matched_pks_str})
DELETE FROM {config.SBOSC_DB}.unmatched_rows WHERE source_pk IN ({matched_pks_str})
AND unmatch_type = '{UnmatchType.NOT_REMOVED}'
''')
self.redis_data.updated_pk_set.add(not_updated_pks - not_removed_pks)
Expand All @@ -262,11 +265,11 @@ def validate_apply_dml_events(self, start_timestamp, end_timestamp):
self.logger.info(f"Start validating DML events from {start_timestamp} to {end_timestamp}")
for table in ['inserted_pk', 'updated_pk', 'deleted_pk']:
cursor.execute(f'''
ANALYZE TABLE sbosc.{table}_{self.migration_id}
ANALYZE TABLE {config.SBOSC_DB}.{table}_{self.migration_id}
''')
cursor.execute(f'''
SELECT TABLE_ROWS FROM information_schema.TABLES
WHERE TABLE_SCHEMA = 'sbosc' AND TABLE_NAME = '{table}_{self.migration_id}'
WHERE TABLE_SCHEMA = '{config.SBOSC_DB}' AND TABLE_NAME = '{table}_{self.migration_id}'
''')
table_rows = cursor.fetchone()[0]

Expand All @@ -289,11 +292,12 @@ def validate_apply_dml_events(self, start_timestamp, end_timestamp):
thread.result()

cursor.executemany(f'''
INSERT IGNORE INTO sbosc.unmatched_rows (source_pk, migration_id, unmatch_type)
INSERT IGNORE INTO {config.SBOSC_DB}.unmatched_rows (source_pk, migration_id, unmatch_type)
VALUES (%s, {self.migration_id}, %s)
''', unmatched_pks)
self.validate_unmatched_pks()
cursor.execute(f"SELECT COUNT(1) FROM sbosc.unmatched_rows WHERE migration_id = {self.migration_id}")
cursor.execute(
f"SELECT COUNT(1) FROM {config.SBOSC_DB}.unmatched_rows WHERE migration_id = {self.migration_id}")
unmatched_rows = cursor.fetchone()[0]

# Even though validation logic is based on data in tables following valid condition can be achieved.
Expand All @@ -316,7 +320,7 @@ def apply_dml_events_validation(self):
with self.db.cursor() as cursor:
cursor: Cursor
cursor.execute(f'''
INSERT INTO sbosc.apply_dml_events_validation_status
INSERT INTO {config.SBOSC_DB}.apply_dml_events_validation_status
(migration_id, last_validated_timestamp, is_valid, created_at)
VALUES ({self.migration_id}, {end_timestamp}, {is_valid}, NOW())
''')
Expand All @@ -332,7 +336,7 @@ def full_dml_event_validation(self):
with self.db.cursor(role='reader') as cursor:
cursor: Cursor
cursor.execute(f'''
SELECT created_at FROM sbosc.full_dml_event_validation_status
SELECT created_at FROM {config.SBOSC_DB}.full_dml_event_validation_status
WHERE migration_id = {self.migration_id} ORDER BY id DESC LIMIT 1
''')

Expand All @@ -345,9 +349,9 @@ def full_dml_event_validation(self):

cursor.execute(f'''
SELECT MIN(event_timestamps.min_ts) FROM (
SELECT MIN(event_timestamp) AS min_ts FROM sbosc.inserted_pk_{self.migration_id} UNION
SELECT MIN(event_timestamp) AS min_ts FROM sbosc.updated_pk_{self.migration_id} UNION
SELECT MIN(event_timestamp) AS min_ts FROM sbosc.deleted_pk_{self.migration_id}
SELECT MIN(event_timestamp) AS min_ts FROM {config.SBOSC_DB}.inserted_pk_{self.migration_id} UNION
SELECT MIN(event_timestamp) AS min_ts FROM {config.SBOSC_DB}.updated_pk_{self.migration_id} UNION
SELECT MIN(event_timestamp) AS min_ts FROM {config.SBOSC_DB}.deleted_pk_{self.migration_id}
) AS event_timestamps;
''')
if cursor.rowcount > 0:
Expand All @@ -357,7 +361,7 @@ def full_dml_event_validation(self):
return

cursor.execute(f'''
SELECT last_event_timestamp FROM sbosc.event_handler_status
SELECT last_event_timestamp FROM {config.SBOSC_DB}.event_handler_status
WHERE migration_id = {self.migration_id} ORDER BY id DESC LIMIT 1
''')
if cursor.rowcount > 0:
Expand All @@ -370,7 +374,7 @@ def full_dml_event_validation(self):

with self.db.cursor() as cursor:
cursor.execute(f'''
INSERT INTO sbosc.full_dml_event_validation_status
INSERT INTO {config.SBOSC_DB}.full_dml_event_validation_status
(migration_id, last_validated_timestamp, is_valid, created_at)
VALUES ({self.migration_id}, {end_timestamp}, {is_valid}, NOW())
''')
Expand Down
Loading

0 comments on commit 322d697

Please sign in to comment.