From f26c4aeecba481ce1445be7a998b0b97460a13bb Mon Sep 17 00:00:00 2001
From: Rui Qiao <161574667+ruisearch42@users.noreply.github.com>
Date: Wed, 18 Dec 2024 23:38:02 -0800
Subject: [PATCH] [Misc] Optimize ray worker initialization time (#11275)

Signed-off-by: Rui Qiao
Co-authored-by: Cody Yu
---
 vllm/executor/ray_gpu_executor.py | 44 ++++++++++++++++++++++++++------------------
 1 file changed, 26 insertions(+), 18 deletions(-)

diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py
index 4bf5cbbd18ffe..e2c549cbd5331 100644
--- a/vllm/executor/ray_gpu_executor.py
+++ b/vllm/executor/ray_gpu_executor.py
@@ -123,6 +123,7 @@ def _init_workers_ray(self, placement_group: "PlacementGroup",
 
         # Create the workers.
         driver_ip = get_ip()
+        workers = []
         for bundle_id, bundle in enumerate(placement_group.bundle_specs):
             if not bundle.get("GPU", 0):
                 continue
@@ -138,20 +139,29 @@ def _init_workers_ray(self, placement_group: "PlacementGroup",
                 scheduling_strategy=scheduling_strategy,
                 **ray_remote_kwargs,
             )(RayWorkerWrapper).remote(vllm_config=self.vllm_config)
+            workers.append(worker)
 
-            if self.use_ray_spmd_worker:
-                self.workers.append(worker)
-            else:
-                worker_ip = ray.get(worker.get_node_ip.remote())
-                if worker_ip == driver_ip and self.driver_dummy_worker is None:
-                    # If the worker is on the same node as the driver, we use it
-                    # as the resource holder for the driver process.
-                    self.driver_dummy_worker = worker
-                    self.driver_worker = RayWorkerWrapper(
-                        vllm_config=self.vllm_config)
-                else:
-                    # Else, added to the list of workers.
-                    self.workers.append(worker)
+        worker_ip_refs = [
+            worker.get_node_ip.remote()  # type: ignore[attr-defined]
+            for worker in workers
+        ]
+        worker_ips = ray.get(worker_ip_refs)
+
+        if not self.use_ray_spmd_worker:
+            for i in range(len(workers)):
+                worker = workers[i]
+                worker_ip = worker_ips[i]
+                if self.driver_dummy_worker is None and worker_ip == driver_ip:
+                    # If the worker is on the same node as the driver, we use it
+                    # as the resource holder for the driver process.
+                    self.driver_dummy_worker = worker
+                    self.driver_worker = RayWorkerWrapper(
+                        vllm_config=self.vllm_config)
+                    workers.pop(i)
+                    worker_ips.pop(i)
+                    break
+
+        self.workers = workers
 
         logger.debug("workers: %s", self.workers)
         logger.debug("driver_dummy_worker: %s", self.driver_dummy_worker)
@@ -161,14 +171,12 @@ def _init_workers_ray(self, placement_group: "PlacementGroup",
                 "adjusting the Ray placement group or running the driver on a "
                 "GPU node.")
 
-        worker_ips = [
-            ray.get(worker.get_node_ip.remote())  # type: ignore[attr-defined]
-            for worker in self.workers
-        ]
         ip_counts: Dict[str, int] = {}
         for ip in worker_ips:
             ip_counts[ip] = ip_counts.get(ip, 0) + 1
 
+        worker_to_ip = dict(zip(self.workers, worker_ips))
+
         def sort_by_driver_then_worker_ip(worker):
             """
             Sort the workers based on 3 properties:
@@ -179,7 +187,7 @@ def sort_by_driver_then_worker_ip(worker):
             3. Finally, if the work is on a node with smaller IP address, it
                should be placed first.
             """
-            ip = ray.get(worker.get_node_ip.remote())
+            ip = worker_to_ip[worker]
             return (ip != driver_ip, ip_counts[ip], ip)
 
         # After sorting, the workers on the same node will be
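
The change batches what was previously one blocking ray.get() per worker
into a single ray.get() over a list of object refs, so all get_node_ip()
RPCs run concurrently and initialization pays one round of latency instead
of one per worker. A minimal standalone sketch of that pattern, with a
hypothetical Echo actor and made-up timings (not part of vLLM):

    import time

    import ray


    @ray.remote
    class Echo:
        # Stand-in for RayWorkerWrapper.get_node_ip(); the sleep fakes
        # roughly 100 ms of per-call RPC latency.
        def get_node_ip(self) -> str:
            time.sleep(0.1)
            return ray.util.get_node_ip_address()


    ray.init()
    actors = [Echo.remote() for _ in range(8)]

    # Sequential pattern (old code): block on each call in turn,
    # roughly 8 x 100 ms in total.
    start = time.perf_counter()
    ips_seq = [ray.get(a.get_node_ip.remote()) for a in actors]
    print(f"sequential: {time.perf_counter() - start:.2f}s")

    # Batched pattern (new code): launch every RPC first, then block
    # once; roughly 100 ms in total.
    start = time.perf_counter()
    refs = [a.get_node_ip.remote() for a in actors]
    ips_batched = ray.get(refs)
    print(f"batched:    {time.perf_counter() - start:.2f}s")

    assert ips_seq == ips_batched

The patch also caches the results in worker_to_ip so the sort key no
longer issues a fresh RPC per lookup; Ray actor handles are hashable, so
they can serve directly as dict keys.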
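
The sort key itself is an ordinary tuple comparison. A small worked
example with made-up addresses shows the resulting order (driver node
first, then less crowded nodes, then ascending IP; note the comparison is
lexicographic on the IP strings):

    driver_ip = "10.0.0.1"
    worker_ips = ["10.0.0.2", "10.0.0.1", "10.0.0.2", "10.0.0.3"]

    ip_counts = {}
    for ip in worker_ips:
        ip_counts[ip] = ip_counts.get(ip, 0) + 1

    ordered = sorted(worker_ips,
                     key=lambda ip: (ip != driver_ip, ip_counts[ip], ip))
    print(ordered)  # ['10.0.0.1', '10.0.0.3', '10.0.0.2', '10.0.0.2']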