From ab99c07e5b972c6ec4312d425b44924481dcc1cb Mon Sep 17 00:00:00 2001 From: Hermann Romanek Date: Sun, 28 Apr 2024 23:25:56 +0200 Subject: [PATCH] #475 - Removed threading from main process (cherry picked from commit 4db0a6994fb34e8c79fb785e507f90214f07817f) --- src/sniffles/parallel.py | 91 ++++++++++++++++++++-------------------- src/sniffles/sniffles | 16 ++++--- 2 files changed, 55 insertions(+), 52 deletions(-) diff --git a/src/sniffles/parallel.py b/src/sniffles/parallel.py index f51ea44..73cd737 100644 --- a/src/sniffles/parallel.py +++ b/src/sniffles/parallel.py @@ -448,7 +448,7 @@ def execute(self) -> Result: raise SnifflesWorker.Shutdown -class SnifflesWorker(threading.Thread): +class SnifflesWorker: """ Handle for a worker process. Since we're forking, this class will be available in both the parent and the worker processes. @@ -456,7 +456,7 @@ class SnifflesWorker(threading.Thread): id: int # sequential ID of this worker, starting with 0 for the first externals: list = None recycle: bool = False - _running = False + running = True pid: int = None class Shutdown(Exception): @@ -481,16 +481,13 @@ def __init__(self, process_id: int, config: Namespace, tasks: list[Task], recycl self._logger = logging.getLogger('sniffles.worker') - super().__init__(target=self.run_parent) - def __str__(self): return f'Worker {self.id} @ process {self.pid}' def start(self) -> None: self._logger.info(f'Starting worker {self.id}') - self._running = True + self.running = True self.process.start() - return super().start() def maybe_recycle(self): """ @@ -510,58 +507,60 @@ def maybe_recycle(self): ) self.process.start() - def run_parent(self): + def run_parent(self) -> bool: """ Worker thread, running in parent process """ try: - while self._running: - if self.task is None: - # we are not working on something... - if len(self.tasks) > 0: - # ...but there is more work to be done - self.maybe_recycle() - - try: - self.task = self.tasks.pop(0) - except IndexError: - # another worker may have taken the last task - self._logger.debug(f'No more tasks to do for {self.id}') - else: - self.pipe_main.send(self.task) - self._logger.info(f'Dispatched task #{self.task.id} to worker {self.id} ({len(self.tasks)} tasks left)') + if self.task is None: + # we are not working on something... + if len(self.tasks) > 0: + # ...but there is more work to be done + self.maybe_recycle() + + try: + self.task = self.tasks.pop(0) + except IndexError: + # another worker may have taken the last task + self._logger.debug(f'No more tasks to do for {self.id}') else: - # ...and no more work available, so we shut down this worker - self._logger.info(f'Worker {self.id} shutting down...') - self.pipe_main.send(ShutdownTask()) - self._running = False + self.pipe_main.send(self.task) + self._logger.info(f'Dispatched task #{self.task.id} to worker {self.id} ({len(self.tasks)} tasks left)') else: - if self.pipe_main.poll(0.01): - self._logger.debug(f'Worker {self.id} got result for task {self.task.id}...') - result: Result = self.pipe_main.recv() - - if result.error: - self._logger.error(f'Worker {self.id} received error: {result}') - else: - self._logger.info(f'Worker {self.id} got result for task #{result.task_id}') + # ...and no more work available, so we shut down this worker + self._logger.info(f'Worker {self.id} shutting down...') + self.pipe_main.send(ShutdownTask()) + self.running = False + else: + if self.pipe_main.poll(0.01): + self._logger.debug(f'Worker {self.id} got result for task {self.task.id}...') + result: Result = self.pipe_main.recv() - self.task.add_result(result) - self.finished_tasks.append(self.task) - self.task = None + if result.error: + self._logger.error(f'Worker {self.id} received error: {result}') + else: + self._logger.info(f'Worker {self.id} got result for task #{result.task_id}') - self.process.join(10) + self.task.add_result(result) + self.finished_tasks.append(self.task) + self.task = None except: self._logger.exception(f'Unhandled error in worker {self.id}. This may result in an orphened worker process.') try: self.process.kill() except: ... - else: - if self.process.exitcode is None: - self._logger.warning(f'Worker {self.id} refused to shut down gracefully, killing it.') - self.process.kill() - self.process.join(2) - self._logger.info(f'Worker {self.id} done (code {self.process.exitcode}).') + + return self.running + + def finalize(self): + self.process.join(10) + + if self.process.exitcode is None: + self._logger.warning(f'Worker {self.id} refused to shut down gracefully, killing it.') + self.process.kill() + self.process.join(2) + self._logger.info(f'Worker {self.id} done (code {self.process.exitcode}).') def run_worker(self): """ @@ -569,7 +568,7 @@ def run_worker(self): """ self.pid = os.getpid() - while self._running: + while self.running: try: self._logger.debug(f'Worker {self.id} ({self.pid}) waiting for tasks...') @@ -587,7 +586,7 @@ def run_worker(self): del task gc.collect() except self.Shutdown: - self._running = False + self.running = False except Exception as e: self._logger.exception(f'Error in worker process') self.pipe_worker.send(ErrorResult(e)) diff --git a/src/sniffles/sniffles b/src/sniffles/sniffles index 7eca35b..beb2c4a 100755 --- a/src/sniffles/sniffles +++ b/src/sniffles/sniffles @@ -415,9 +415,6 @@ def Sniffles2_Main(processes: list[parallel.SnifflesWorker]): if config.vcf is not None and config.sort: task_id_calls = {} - for proc in processes: - proc.start() - log.info("") if config.mode == "call_sample" or config.mode == "genotype_vcf": if config.input_is_cram: @@ -434,10 +431,17 @@ def Sniffles2_Main(processes: list[parallel.SnifflesWorker]): # analysis_start_time = time.time() + for p in processes: + p.start() + finished_tasks: list[parallel.Task] = [] - for proc in processes: - proc.join() - finished_tasks.extend(proc.finished_tasks) + + while any([p.run_parent() for p in processes if p.running]): + time.sleep(0.1) + + for p in processes: + p.finalize() + finished_tasks.extend(p.finished_tasks) log.info(f"Took {time.time() - analysis_start_time:.2f}s.") log.info("")