Skip to content

Commit

Permalink
#475 - Ensure workers shut down
Browse files Browse the repository at this point in the history
  • Loading branch information
Hermann Romanek committed Apr 28, 2024
1 parent 48b9645 commit 218b6a3
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions src/sniffles/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ class SnifflesWorker(threading.Thread):
externals: list = None
recycle: bool = False
_running = False
pid: int = None

class Shutdown(Exception):
"""
Expand All @@ -474,13 +475,17 @@ def __init__(self, process_id: int, config: Namespace, tasks: list[Task], recycl
self.pipe_main, self.pipe_worker = multiprocessing.Pipe()

self.process = multiprocessing.Process(
target=self.run_worker
target=self.run_worker,
daemon=True
)

self._logger = logging.getLogger('sniffles.worker')

super().__init__(target=self.run_parent)

def __str__(self):
return f'Worker {self.id} @ process {self.pid}'

def start(self) -> None:
self._logger.info(f'Starting worker {self.id}')
self._running = True
Expand All @@ -500,7 +505,8 @@ def maybe_recycle(self):
self.process.join(2)
# Start new one
self.process = multiprocessing.Process(
target=self.run_worker
target=self.run_worker,
daemon=True
)
self.process.start()

Expand Down Expand Up @@ -551,7 +557,11 @@ def run_parent(self):
except:
...
else:
self._logger.info(f'Worker {self.id} done')
if self.process.exitcode is None:
self._logger.warning(f'Worker {self.id} refused to shut down gracefully, killing it.')
self.process.kill()
self.process.join(2)
self._logger.info(f'Worker {self.id} done (code {self.process.exitcode}).')

def run_worker(self):
"""
Expand Down

0 comments on commit 218b6a3

Please sign in to comment.