Skip to content

Commit

Permalink
#475 - Removed threading from main process
Browse files Browse the repository at this point in the history
(cherry picked from commit 4db0a69)
  • Loading branch information
Hermann Romanek committed May 6, 2024
1 parent 218b6a3 commit ab99c07
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 52 deletions.
91 changes: 45 additions & 46 deletions src/sniffles/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,15 +448,15 @@ def execute(self) -> Result:
raise SnifflesWorker.Shutdown


class SnifflesWorker(threading.Thread):
class SnifflesWorker:
"""
Handle for a worker process. Since we're forking, this class will be available in
both the parent and the worker processes.
"""
id: int # sequential ID of this worker, starting with 0 for the first
externals: list = None
recycle: bool = False
_running = False
running = True
pid: int = None

class Shutdown(Exception):
Expand All @@ -481,16 +481,13 @@ def __init__(self, process_id: int, config: Namespace, tasks: list[Task], recycl

self._logger = logging.getLogger('sniffles.worker')

super().__init__(target=self.run_parent)

def __str__(self):
return f'Worker {self.id} @ process {self.pid}'

def start(self) -> None:
self._logger.info(f'Starting worker {self.id}')
self._running = True
self.running = True
self.process.start()
return super().start()

def maybe_recycle(self):
"""
Expand All @@ -510,66 +507,68 @@ def maybe_recycle(self):
)
self.process.start()

def run_parent(self):
def run_parent(self) -> bool:
"""
Worker thread, running in parent process
"""
try:
while self._running:
if self.task is None:
# we are not working on something...
if len(self.tasks) > 0:
# ...but there is more work to be done
self.maybe_recycle()

try:
self.task = self.tasks.pop(0)
except IndexError:
# another worker may have taken the last task
self._logger.debug(f'No more tasks to do for {self.id}')
else:
self.pipe_main.send(self.task)
self._logger.info(f'Dispatched task #{self.task.id} to worker {self.id} ({len(self.tasks)} tasks left)')
if self.task is None:
# we are not working on something...
if len(self.tasks) > 0:
# ...but there is more work to be done
self.maybe_recycle()

try:
self.task = self.tasks.pop(0)
except IndexError:
# another worker may have taken the last task
self._logger.debug(f'No more tasks to do for {self.id}')
else:
# ...and no more work available, so we shut down this worker
self._logger.info(f'Worker {self.id} shutting down...')
self.pipe_main.send(ShutdownTask())
self._running = False
self.pipe_main.send(self.task)
self._logger.info(f'Dispatched task #{self.task.id} to worker {self.id} ({len(self.tasks)} tasks left)')
else:
if self.pipe_main.poll(0.01):
self._logger.debug(f'Worker {self.id} got result for task {self.task.id}...')
result: Result = self.pipe_main.recv()

if result.error:
self._logger.error(f'Worker {self.id} received error: {result}')
else:
self._logger.info(f'Worker {self.id} got result for task #{result.task_id}')
# ...and no more work available, so we shut down this worker
self._logger.info(f'Worker {self.id} shutting down...')
self.pipe_main.send(ShutdownTask())
self.running = False
else:
if self.pipe_main.poll(0.01):
self._logger.debug(f'Worker {self.id} got result for task {self.task.id}...')
result: Result = self.pipe_main.recv()

self.task.add_result(result)
self.finished_tasks.append(self.task)
self.task = None
if result.error:
self._logger.error(f'Worker {self.id} received error: {result}')
else:
self._logger.info(f'Worker {self.id} got result for task #{result.task_id}')

self.process.join(10)
self.task.add_result(result)
self.finished_tasks.append(self.task)
self.task = None
except:
                self._logger.exception(f'Unhandled error in worker {self.id}. This may result in an orphaned worker process.')
try:
self.process.kill()
except:
...
else:
if self.process.exitcode is None:
self._logger.warning(f'Worker {self.id} refused to shut down gracefully, killing it.')
self.process.kill()
self.process.join(2)
self._logger.info(f'Worker {self.id} done (code {self.process.exitcode}).')

return self.running

def finalize(self):
self.process.join(10)

if self.process.exitcode is None:
self._logger.warning(f'Worker {self.id} refused to shut down gracefully, killing it.')
self.process.kill()
self.process.join(2)
self._logger.info(f'Worker {self.id} done (code {self.process.exitcode}).')

def run_worker(self):
"""
Entry point/main loop for the worker process
"""
self.pid = os.getpid()

while self._running:
while self.running:
try:
self._logger.debug(f'Worker {self.id} ({self.pid}) waiting for tasks...')

Expand All @@ -587,7 +586,7 @@ def run_worker(self):
del task
gc.collect()
except self.Shutdown:
self._running = False
self.running = False
except Exception as e:
self._logger.exception(f'Error in worker process')
self.pipe_worker.send(ErrorResult(e))
Expand Down
16 changes: 10 additions & 6 deletions src/sniffles/sniffles
Original file line number Diff line number Diff line change
Expand Up @@ -415,9 +415,6 @@ def Sniffles2_Main(processes: list[parallel.SnifflesWorker]):
if config.vcf is not None and config.sort:
task_id_calls = {}

for proc in processes:
proc.start()

log.info("")
if config.mode == "call_sample" or config.mode == "genotype_vcf":
if config.input_is_cram:
Expand All @@ -434,10 +431,17 @@ def Sniffles2_Main(processes: list[parallel.SnifflesWorker]):
#
analysis_start_time = time.time()

for p in processes:
p.start()

finished_tasks: list[parallel.Task] = []
for proc in processes:
proc.join()
finished_tasks.extend(proc.finished_tasks)

while any([p.run_parent() for p in processes if p.running]):
time.sleep(0.1)

for p in processes:
p.finalize()
finished_tasks.extend(p.finished_tasks)

log.info(f"Took {time.time() - analysis_start_time:.2f}s.")
log.info("")
Expand Down

0 comments on commit ab99c07

Please sign in to comment.