Parse binlog batches in a process pool instead of a thread pool.

Summary of what this patch changes in src/sbosc/eventhandler/eventhandler.py:
  * __init__: self.executor becomes a ProcessPoolExecutor (was ThreadPoolExecutor).
  * parse_binlog_batch: now a @staticmethod that takes an already-created
    stream, reads events until the stream moves past the file it started on,
    closes the stream and returns (event_store, (end_file, end_pos)).
  * follow_event_stream: replaces the shared Queue / done_batch list with a
    plain target_files list, creates each stream before submitting it, and
    collects the (file, pos) pairs from the future results.
  * The "Binlog parsing failed" error branch is removed; stop_flag is now
    checked before the results are applied.

NOTE(review): the line breaks, indentation and blank lines below were
reconstructed from a whitespace-collapsed copy so that each hunk matches its
@@ header counts -- verify against the original commit before applying.
NOTE(review): the stream object is handed to ProcessPoolExecutor.submit, so it
presumably has to be picklable at that point -- confirm.

diff --git a/src/sbosc/eventhandler/eventhandler.py b/src/sbosc/eventhandler/eventhandler.py
index 6b421aa..259fe7e 100644
--- a/src/sbosc/eventhandler/eventhandler.py
+++ b/src/sbosc/eventhandler/eventhandler.py
@@ -1,6 +1,5 @@
 import concurrent.futures
 import time
-from queue import Queue, Empty
 from threading import Thread
 
 from MySQLdb.cursors import Cursor, DictCursor
@@ -85,7 +84,7 @@ def __init__(self):
             'passwd': secret.PASSWORD,
         }
 
-        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.thread_count)
+        self.executor = concurrent.futures.ProcessPoolExecutor(max_workers=self.thread_count)
 
         self.log_file = None
         self.log_pos = None
@@ -258,26 +257,21 @@ def apply_dml_events_pre_validation(self):
         else:
             self.redis_data.set_current_stage(Stage.ADD_INDEX)
 
-    def parse_binlog_batch(self, thread_id, batch_queue: Queue, done_batch: list):
+    @staticmethod
+    def parse_binlog_batch(stream):
         event_store = EventStore()
-        while batch_queue.qsize() > 0 and not self.stop_flag:
-            try:
-                binlog_file, start_pos = batch_queue.get_nowait()
-            except Empty:
-                self.logger.warning('Binlog batch queue is empty')
-                continue
-            stream = self.create_binlog_stream(binlog_file, start_pos, thread_id)
-            for event in stream:
-                event_store.add_event(event)
-                if stream.log_file != binlog_file:
-                    break
-
-            done_batch.append((stream.log_file, stream.log_pos))
-            stream.close()
-        return event_store
+        start_file = stream.log_file
+        for event in stream:
+            event_store.add_event(event)
+            if stream.log_file != start_file:
+                break
+        end_file = stream.log_file
+        end_pos = stream.log_pos
+        stream.close()
+        return event_store, (end_file, end_pos)
 
     def follow_event_stream(self):
-        file_queue = Queue()
+        target_files = []
 
         # Create binlog batch queue
         with self.db.cursor(DictCursor) as cursor:
@@ -293,25 +287,31 @@ def follow_event_stream(self):
             ]
             for log_file in binlog_files[:self.thread_count]:
                 start_pos = self.log_pos if log_file == self.log_file else 4
-                file_queue.put((log_file, start_pos))
+                target_files.append((log_file, start_pos))
 
         # Parse binlog batches
         threads = []
-        done_files = []
-        queued_files = file_queue.qsize()
         event_store = EventStore()
         result_event_stores = []
+        done_files = []
 
-        for i in range(self.thread_count):
-            threads.append(self.executor.submit(self.parse_binlog_batch, i, file_queue, done_files))
+        for thread_id in range(len(target_files)):
+            binlog_file, start_pos = target_files[thread_id]
+            stream = self.create_binlog_stream(binlog_file, start_pos, thread_id)
+            threads.append(self.executor.submit(self.parse_binlog_batch, stream))
         done, not_done = concurrent.futures.wait(threads, timeout=self.thread_timeout)
         if len(not_done) > 0:
             self.set_stop_flag()
             raise Exception('Binlog batch parsing timed out')
+
         for thread in threads:
-            result_event_stores.append(thread.result())
+            result_event_store, done_file = thread.result()
+            result_event_stores.append(result_event_store)
+            done_files.append(done_file)
 
-        if len(done_files) == queued_files:
+        if self.stop_flag:
+            self.logger.info('Binlog parsing stopped')
+        else:
             self.log_file, self.log_pos = max(done_files)
             self.handled_binlog_files = self.handled_binlog_files | set([binlog_file for binlog_file, _ in done_files])
 
@@ -340,8 +340,3 @@ def follow_event_stream(self):
 
             if len(binlog_files) == 1:
                 self.redis_data.set_last_catchup_timestamp(last_binlog_check_timestamp)
-
-        elif self.stop_flag:
-            self.logger.info('Binlog parsing stopped')
-        else:
-            self.logger.error('Binlog parsing failed')