Skip to content

Commit

Permalink
Merge pull request #19 from sendbird/release-1.0.2
Browse files Browse the repository at this point in the history
Release v1.0.2
  • Loading branch information
jjh-kim authored Jul 26, 2024
2 parents dd2fcde + c531ba0 commit 6bef254
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 108 deletions.
4 changes: 3 additions & 1 deletion doc/troubleshooting.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
This section provides a list of possible issues, and their solutions, that may occur when using SB-OSC.

### apply_dml_events_validation_batch_size
When setting `apply_dml_events_validation_batch_size` there are two factors to consider. Since the binlog resolution is in seconds, if the number of DML events in a second is greater than the batch size, the validation process can hang indefinitely. In this case, it is recommended to increase the batch size.
~~When setting `apply_dml_events_validation_batch_size` there are two factors to consider. Since the binlog resolution is in seconds, if the number of DML events in a second is greater than the batch size, the validation process can hang indefinitely. In this case, it is recommended to increase the batch size.~~
-> This issue was fixed by [#10](https://github.com/sendbird/sb-osc/pull/10)


Another factor is MySQL's `max_allowed_packet`. The apply DML events stage uses a query with an IN clause containing `apply_dml_events_validation_batch_size` PKs. If the size of this query exceeds `max_allowed_packet`, the query will not return properly. In this case, it is recommended to decrease the batch size. You might also need to kill the running queries, since they may hang indefinitely in this case.

Expand Down
21 changes: 12 additions & 9 deletions src/sbosc/controller/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,12 @@ def apply_dml_events_validation(self):
is_valid = self.validator.apply_dml_events_validation()

if is_valid:
# Analyze table
with self.db.cursor(host='dest') as cursor:
cursor: Cursor
metadata = self.redis_data.metadata
cursor.execute(f"ANALYZE TABLE {metadata.destination_db}.{metadata.destination_table}")
self.logger.info("Finished ANALYZE TABLE on destination table")
full_dml_event_validation_executed = self.validator.full_dml_event_validation()
if full_dml_event_validation_executed: # Validation did not skip
# Returning will call apply_dml_events_validation again
# full_dml_event_validation may take a long time
# So, apply_dml_events_validation needs to be called again to validate the latest DML events
return

if not self.is_preferred_window():
self.logger.info("Waiting for preferred window")
Expand All @@ -194,9 +194,12 @@ def apply_dml_events_validation(self):
time.sleep(config.WAIT_INTERVAL_UNTIL_AUTO_SWAP_IN_SECONDS)
return

is_valid = self.validator.full_dml_event_validation()
if is_valid is not None: # Validation did not skip
return
# Analyze table
with self.db.cursor(host='dest') as cursor:
cursor: Cursor
metadata = self.redis_data.metadata
cursor.execute(f"ANALYZE TABLE {metadata.destination_db}.{metadata.destination_table}")
self.logger.info("Finished ANALYZE TABLE on destination table")

self.redis_data.set_current_stage(Stage.SWAP_TABLES)
self.interval = 1
Expand Down
108 changes: 62 additions & 46 deletions src/sbosc/controller/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import MySQLdb
from MySQLdb.cursors import Cursor

from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Generator, List

from modules.db import Database
from sbosc.exceptions import StopFlagSet
Expand Down Expand Up @@ -39,7 +39,7 @@ def __init__(self, controller: 'Controller'):
def set_stop_flag(self):
    """Ask running validation work to stop.

    Sets ``self.stop_flag`` to ``True``. The DML-event batch validation
    loop checks this flag before taking the next range from its queue and
    raises ``StopFlagSet`` when it is set.
    """
    self.stop_flag = True

def handle_operational_error(self, e, range_queue, start_range, end_range):
def __handle_operational_error(self, e, range_queue, start_range, end_range):
if e.args[0] == 2013:
self.logger.warning("Query timeout. Retry with smaller batch size")
range_queue.put((start_range, start_range + (end_range - start_range) // 2))
Expand All @@ -50,7 +50,7 @@ def handle_operational_error(self, e, range_queue, start_range, end_range):
range_queue.put((start_range, end_range))
time.sleep(3)

def validate_bulk_import_batch(self, range_queue: Queue, failed_pks):
def __validate_bulk_import_batch(self, range_queue: Queue, failed_pks):
with self.source_conn_pool.get_connection() as source_conn, self.dest_conn_pool.get_connection() as dest_conn:
while not range_queue.empty():
if len(failed_pks) > 0:
Expand All @@ -68,7 +68,7 @@ def validate_bulk_import_batch(self, range_queue: Queue, failed_pks):
failed_pks.extend(not_imported_pks)
return False
except MySQLdb.OperationalError as e:
self.handle_operational_error(e, range_queue, batch_start_pk, batch_end_pk)
self.__handle_operational_error(e, range_queue, batch_start_pk, batch_end_pk)
source_conn.ping(True)
dest_conn.ping(True)
continue
Expand All @@ -83,23 +83,23 @@ def bulk_import_validation(self):
metadata = self.redis_data.metadata
range_queue = Queue()
start_pk = 0
while start_pk < metadata.max_id:
while start_pk <= metadata.max_id:
range_queue.put((start_pk, min(start_pk + self.bulk_import_batch_size, metadata.max_id)))
start_pk += self.bulk_import_batch_size + 1
failed_pks = []

with concurrent.futures.ThreadPoolExecutor(max_workers=self.thread_count) as executor:
threads = []
for _ in range(self.thread_count):
threads.append(executor.submit(self.validate_bulk_import_batch, range_queue, failed_pks))
threads.append(executor.submit(self.__validate_bulk_import_batch, range_queue, failed_pks))
is_valid = all([thread.result() for thread in threads])
if not is_valid:
self.logger.critical(f"Failed to validate bulk import. Failed pks: {failed_pks}")
else:
self.logger.info("Bulk import validation succeeded")
return is_valid

def get_timestamp_range(self):
def __get_timestamp_range(self):
start_timestamp = None
end_timestamp = None
with self.db.cursor() as cursor:
Expand All @@ -126,6 +126,8 @@ def get_timestamp_range(self):
if cursor.rowcount > 0:
start_timestamp = cursor.fetchone()[0]

# This ensures that all events up to the last event timestamp have been saved in the event tables,
# because save_current_binlog_position is called after save_events_to_db
cursor.execute(f'''
SELECT last_event_timestamp FROM {config.SBOSC_DB}.event_handler_status
WHERE migration_id = {self.migration_id} ORDER BY id DESC LIMIT 1
Expand All @@ -134,52 +136,57 @@ def get_timestamp_range(self):
end_timestamp = cursor.fetchone()[0]
return start_timestamp, end_timestamp

def execute_apply_dml_events_validation_query(
self, source_cursor, dest_cursor, table, start_timestamp, end_timestamp, unmatched_pks):
def __execute_apply_dml_events_validation_query(
self, source_cursor, dest_cursor, table, event_pks: list, unmatched_pks: list):
metadata = self.redis_data.metadata
if table == 'inserted_pk':
not_inserted_pks = self.migration_operation.get_not_inserted_pks(
source_cursor, dest_cursor, start_timestamp, end_timestamp)
not_inserted_pks = self.migration_operation.get_not_inserted_pks(source_cursor, dest_cursor, event_pks)
if not_inserted_pks:
self.logger.warning(f"Found {len(not_inserted_pks)} unmatched inserted pks")
self.logger.warning(f"Found {len(not_inserted_pks)} unmatched inserted pks: {not_inserted_pks}")
unmatched_pks.extend([(pk, UnmatchType.NOT_UPDATED) for pk in not_inserted_pks])
elif table == 'updated_pk':
not_updated_pks = self.migration_operation.get_not_updated_pks(
source_cursor, dest_cursor, start_timestamp, end_timestamp)
not_updated_pks = self.migration_operation.get_not_updated_pks(source_cursor, dest_cursor, event_pks)
if not_updated_pks:
self.logger.warning(f"Found {len(not_updated_pks)} unmatched updated pks")
self.logger.warning(f"Found {len(not_updated_pks)} unmatched updated pks: {not_updated_pks}")
unmatched_pks.extend([(pk, UnmatchType.NOT_UPDATED) for pk in not_updated_pks])
elif table == 'deleted_pk':
source_cursor.execute(f'''
SELECT source_pk FROM {config.SBOSC_DB}.deleted_pk_{self.migration_id}
WHERE event_timestamp BETWEEN {start_timestamp} AND {end_timestamp}
''')
if source_cursor.rowcount > 0:
target_pks = ','.join([str(row[0]) for row in source_cursor.fetchall()])
if event_pks:
event_pks_str = ','.join([str(pk) for pk in event_pks])
dest_cursor.execute(f'''
SELECT id FROM {metadata.destination_db}.{metadata.destination_table} WHERE id IN ({target_pks})
SELECT id FROM {metadata.destination_db}.{metadata.destination_table} WHERE id IN ({event_pks_str})
''')
deleted_pks = set([row[0] for row in dest_cursor.fetchall()])
not_deleted_pks = set([row[0] for row in dest_cursor.fetchall()])
if dest_cursor.rowcount > 0:
# Check if deleted pks are reinserted
source_cursor.execute(f'''
SELECT id FROM {metadata.source_db}.{metadata.source_table} WHERE id IN ({target_pks})
SELECT id FROM {metadata.source_db}.{metadata.source_table} WHERE id IN ({event_pks_str})
''')
reinserted_pks = set([row[0] for row in source_cursor.fetchall()])
if reinserted_pks:
deleted_pks = deleted_pks - reinserted_pks
self.logger.warning(f"Found {len(reinserted_pks)} reinserted pks")
self.logger.warning(f"Found {len(deleted_pks)} unmatched deleted pks")
unmatched_pks.extend([(pk, UnmatchType.NOT_REMOVED) for pk in deleted_pks])

def validate_apply_dml_events_batch(self, table, range_queue: Queue, unmatched_pks):
not_deleted_pks = not_deleted_pks - reinserted_pks
self.logger.warning(f"Found {len(reinserted_pks)} reinserted pks: {reinserted_pks}")
self.logger.warning(f"Found {len(not_deleted_pks)} unmatched deleted pks: {not_deleted_pks}")
unmatched_pks.extend([(pk, UnmatchType.NOT_REMOVED) for pk in not_deleted_pks])

def __get_event_pk_batch(
        self, cursor, table, start_timestamp, end_timestamp
) -> Generator[List[int], None, None]:
    """Yield the source PKs recorded in an event table, in fixed-size batches.

    Runs a single SELECT against ``{config.SBOSC_DB}.{table}_{self.migration_id}``
    for rows whose ``event_timestamp`` is ``BETWEEN`` the two bounds (inclusive
    on both ends), loads every matching ``source_pk`` into memory, and then
    yields consecutive slices of at most ``self.apply_dml_events_batch_size``
    PKs. Nothing is yielded when the query returns no rows.

    :param cursor: cursor used to execute the SELECT
    :param table: event table name prefix (e.g. ``'inserted_pk'``,
        ``'updated_pk'``, ``'deleted_pk'``); the migration id is appended
    :param start_timestamp: lower bound of ``event_timestamp`` (inclusive)
    :param end_timestamp: upper bound of ``event_timestamp`` (inclusive)
    :return: generator of PK lists, each no longer than the batch size
    """
    cursor.execute(f'''
        SELECT source_pk FROM {config.SBOSC_DB}.{table}_{self.migration_id}
        WHERE event_timestamp BETWEEN {start_timestamp} AND {end_timestamp}
    ''')
    event_pks = [row[0] for row in cursor.fetchall()]

    # Walk the list by offset instead of repeatedly re-slicing the remaining
    # tail, which copied the whole remainder on every batch.
    # NOTE: a batch size of 0 makes range() raise ValueError instead of
    # yielding empty batches forever.
    batch_size = self.apply_dml_events_batch_size
    for offset in range(0, len(event_pks), batch_size):
        yield event_pks[offset:offset + batch_size]

def __validate_apply_dml_events_batch(self, table, range_queue: Queue, unmatched_pks):
with self.source_conn_pool.get_connection() as source_conn, self.dest_conn_pool.get_connection() as dest_conn:
while not range_queue.empty():
if self.stop_flag:
raise StopFlagSet()

try:
batch_start_timestamp, batch_end_timestamp = range_queue.get_nowait()
self.logger.info(f"Validating {table} from {batch_start_timestamp} to {batch_end_timestamp}")
except Empty:
self.logger.warning("Range queue is empty")
continue
Expand All @@ -195,7 +202,7 @@ def validate_apply_dml_events_batch(self, table, range_queue: Queue, unmatched_p
WHERE event_timestamp BETWEEN {batch_start_timestamp} AND {batch_end_timestamp}
''')
event_count = source_cursor.fetchone()[0]
if event_count > self.apply_dml_events_batch_size:
if event_count > self.apply_dml_events_batch_size and batch_end_timestamp > batch_start_timestamp:
range_queue.put((
batch_start_timestamp,
batch_start_timestamp + (batch_end_timestamp - batch_start_timestamp) // 2
Expand All @@ -208,17 +215,20 @@ def validate_apply_dml_events_batch(self, table, range_queue: Queue, unmatched_p

else:
try:
self.execute_apply_dml_events_validation_query(
source_cursor, dest_cursor, table,
batch_start_timestamp, batch_end_timestamp, unmatched_pks
event_pk_batch = self.__get_event_pk_batch(
source_cursor, table, batch_start_timestamp, batch_end_timestamp
)
while event_pks := next(event_pk_batch, None):
self.__execute_apply_dml_events_validation_query(
source_cursor, dest_cursor, table, event_pks, unmatched_pks
)
except MySQLdb.OperationalError as e:
self.handle_operational_error(e, range_queue, batch_start_timestamp, batch_end_timestamp)
self.__handle_operational_error(e, range_queue, batch_start_timestamp, batch_end_timestamp)
source_conn.ping(True)
dest_conn.ping(True)
continue

def validate_unmatched_pks(self):
def __validate_unmatched_pks(self):
self.logger.info("Validating unmatched pks")
with self.db.cursor() as cursor:
cursor: Cursor
Expand Down Expand Up @@ -276,7 +286,7 @@ def validate_apply_dml_events(self, start_timestamp, end_timestamp):
if table_rows > 0:
range_queue = Queue()
batch_start_timestamp = start_timestamp
while batch_start_timestamp < end_timestamp:
while batch_start_timestamp <= end_timestamp:
batch_duration = \
(end_timestamp - start_timestamp) * self.apply_dml_events_batch_size // table_rows
batch_end_timestamp = min(batch_start_timestamp + batch_duration, end_timestamp)
Expand All @@ -287,15 +297,15 @@ def validate_apply_dml_events(self, start_timestamp, end_timestamp):
threads = []
for _ in range(self.thread_count):
threads.append(executor.submit(
self.validate_apply_dml_events_batch, table, range_queue, unmatched_pks))
self.__validate_apply_dml_events_batch, table, range_queue, unmatched_pks))
for thread in threads:
thread.result()

cursor.executemany(f'''
INSERT IGNORE INTO {config.SBOSC_DB}.unmatched_rows (source_pk, migration_id, unmatch_type)
VALUES (%s, {self.migration_id}, %s)
''', unmatched_pks)
self.validate_unmatched_pks()
self.__validate_unmatched_pks()
cursor.execute(
f"SELECT COUNT(1) FROM {config.SBOSC_DB}.unmatched_rows WHERE migration_id = {self.migration_id}")
unmatched_rows = cursor.fetchone()[0]
Expand All @@ -307,7 +317,7 @@ def validate_apply_dml_events(self, start_timestamp, end_timestamp):
def apply_dml_events_validation(self):
self.logger.info("Start apply DML events validation")

start_timestamp, end_timestamp = self.get_timestamp_range()
start_timestamp, end_timestamp = self.__get_timestamp_range()
if start_timestamp is None:
self.logger.warning("No events found. Skipping apply DML events validation")
return True
Expand All @@ -329,8 +339,12 @@ def apply_dml_events_validation(self):

def full_dml_event_validation(self):
"""
:return: True if validation succeeded, False if validation failed, None if validation is skipped
:return: True if validation ran, False if validation skipped
"""
if self.full_dml_event_validation_interval == 0:
self.logger.info("Full DML event validation is disabled")
return False

self.logger.info("Start full DML event validation")

with self.db.cursor(role='reader') as cursor:
Expand All @@ -344,8 +358,10 @@ def full_dml_event_validation(self):
last_validation_time = cursor.fetchone()[0]
if datetime.now() - last_validation_time < timedelta(hours=self.full_dml_event_validation_interval):
self.logger.info(
"Last validation was done less than 1 hour ago. Skipping full DML event validation")
return
f"Last validation was done less than {self.full_dml_event_validation_interval} hour ago. "
f"Skipping full DML event validation"
)
return False

cursor.execute(f'''
SELECT MIN(event_timestamps.min_ts) FROM (
Expand All @@ -358,7 +374,7 @@ def full_dml_event_validation(self):
start_timestamp = cursor.fetchone()[0]
if start_timestamp is None:
self.logger.warning("No events found. Skipping full DML event validation")
return
return False

cursor.execute(f'''
SELECT last_event_timestamp FROM {config.SBOSC_DB}.event_handler_status
Expand All @@ -368,7 +384,7 @@ def full_dml_event_validation(self):
end_timestamp = cursor.fetchone()[0]
if end_timestamp is None:
self.logger.warning("Failed to get valid end_timestamp")
return
return False

is_valid = self.validate_apply_dml_events(start_timestamp, end_timestamp)

Expand All @@ -379,4 +395,4 @@ def full_dml_event_validation(self):
VALUES ({self.migration_id}, {end_timestamp}, {is_valid}, NOW())
''')

return is_valid
return True
Loading

0 comments on commit 6bef254

Please sign in to comment.