Skip to content

Commit

Permalink
v1: Make MultiprocExecutor.workers non-optional
Browse files Browse the repository at this point in the history
Suggestion from @tlrmchlsmth.

Signed-off-by: Mark McLoughlin <[email protected]>
  • Loading branch information
markmc committed Dec 13, 2024
1 parent 8d022d8 commit 75cd514
Showing 1 changed file with 3 additions and 6 deletions.
9 changes: 3 additions & 6 deletions vllm/v1/executor/multiproc_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def __init__(self, vllm_config: VllmConfig) -> None:
scheduler_output_handle = self.rpc_broadcast_mq.export_handle()

# Create workers
self.workers: Optional[List[WorkerProcHandle]] = []
self.workers: List[WorkerProcHandle] = []
for rank in range(self.world_size):
worker = WorkerProc.make_worker_process(vllm_config, rank, rank,
distributed_init_method,
Expand Down Expand Up @@ -125,7 +125,6 @@ def collective_rpc(self,
self.rpc_broadcast_mq.enqueue((method, args, kwargs))

responses = [None] * self.world_size
assert self.workers is not None
for w in self.workers:
dequeue_timeout = timeout - (time.monotonic() - start_time
) if timeout is not None else None
Expand Down Expand Up @@ -173,7 +172,6 @@ def wait_for_termination(procs, timeout):
return False

# Send SIGTERM if still running
assert self.workers is not None
active_procs = [w.proc for w in self.workers if w.proc.is_alive()]
for p in active_procs:
p.terminate()
Expand All @@ -184,10 +182,8 @@ def wait_for_termination(procs, timeout):
p.kill()

self._cleanup_sockets()
self.workers = None

def _cleanup_sockets(self):
assert self.workers is not None
for w in self.workers:
# Remove the zmq ipc socket file
socket_path = w.ready_path.replace("ipc://", "")
Expand All @@ -196,7 +192,8 @@ def _cleanup_sockets(self):

def shutdown(self):
"""Properly shut down the executor and its workers"""
if (hasattr(self, 'workers') and self.workers is not None):
if getattr(self, 'shutting_down', False):
self.shutting_down = True
for w in self.workers: #TODO: not sure if needed
w.worker_response_mq = None
self._ensure_worker_termination()
Expand Down

0 comments on commit 75cd514

Please sign in to comment.