From b5bac3109adacc081e00d5f12d508d0e51ea92cb Mon Sep 17 00:00:00 2001
From: Tyler Michael Smith
Date: Tue, 26 Nov 2024 14:13:04 -0500
Subject: [PATCH] format

Signed-off-by: Tyler Michael Smith
---
 vllm/v1/executor/multiproc_executor.py |  2 +-
 vllm/v1/worker/gpu_worker.py           | 18 ++++++++++--------
 2 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py
index 5c5558be8a9e6..d884fefd6b684 100644
--- a/vllm/v1/executor/multiproc_executor.py
+++ b/vllm/v1/executor/multiproc_executor.py
@@ -111,7 +111,7 @@ def initialize(self, num_gpu_blocks: int) -> None:
         success_vals = self.run_on_workers('initialize', num_gpu_blocks)
         if not all(success_vals):
             raise RuntimeError("Worker initialization failed.")
-        
+
         self.scheduler_output_mq.wait_until_ready()
         self.model_output_mq.wait_until_ready()
 
diff --git a/vllm/v1/worker/gpu_worker.py b/vllm/v1/worker/gpu_worker.py
index bd836c70db8a5..89807b20ceca8 100644
--- a/vllm/v1/worker/gpu_worker.py
+++ b/vllm/v1/worker/gpu_worker.py
@@ -234,7 +234,8 @@ class WorkerInitRequestType:
     without separate encoding step.
     """
     DETERMINE_NUM_BLOCKS = b'\x00'
-    INITIALIZE = b'\x01' # Initialize cache and begin worker execution
+    INITIALIZE = b'\x01'  # Initialize cache and begin worker execution
+
 
 @dataclass
 class WorkerInitResponseType:
@@ -264,7 +265,7 @@ def determine_num_available_blocks(self) -> Tuple[int, int]:
             send_socket.send_multipart(
                 (WorkerInitRequestType.DETERMINE_NUM_BLOCKS, ))
 
-            # Receive response 
+            # Receive response
             type_frame, data_frame = recv_socket.recv_multipart(copy=False)
             response_type = type_frame.buffer
             response_data = data_frame.buffer
@@ -278,7 +279,7 @@ def initialize(self, num_gpu_blocks: int) -> bool:
                             zmq.constants.PUSH) as send_socket, \
              make_zmq_socket(self.initialization_input_path,
                             zmq.constants.PULL) as recv_socket:
-            
+
             # Send initialization message
             msg = pickle.dumps(num_gpu_blocks,
                                protocol=pickle.HIGHEST_PROTOCOL)
@@ -292,6 +293,7 @@ def initialize(self, num_gpu_blocks: int) -> bool:
                 raise ValueError(f"Unknown RequestType: {response_type}")
             return pickle.loads(response_data)
 
+
 class WorkerProc:
     """Wrapper that runs one Worker in a separate process."""
 
@@ -326,7 +328,8 @@ def __init__(
                              zmq.constants.PUSH) as ready_socket:
             payload = pickle.dumps(output_mq_handle,
                                    protocol=pickle.HIGHEST_PROTOCOL)
-            ready_socket.send_multipart((WorkerInitResponseType.READY, payload))
+            ready_socket.send_multipart(
+                (WorkerInitResponseType.READY, payload))
 
         self.worker.initialize()
         self.worker.load_model()
@@ -434,7 +437,7 @@ def model_initialization_loop(self, init_input_path, init_output_path):
                         copy=False)
                 elif request_type == WorkerInitRequestType.INITIALIZE:
                     # Initialize cache with the number of requested gpu blocks
-                    try: 
+                    try:
                         request_data = request[1].buffer
                         num_gpu_blocks = pickle.loads(request_data)
                         self.worker.initialize_cache(num_gpu_blocks)
@@ -451,20 +454,19 @@ def model_initialization_loop(self, init_input_path, init_output_path):
                         raise e
 
                     # Send a success response. Order is important:
-                    # The executor will call wait_until_ready() on its 
+                    # The executor will call wait_until_ready() on its
                     # message queues after receiving this message.
                     send_socket.send_multipart(
                         (WorkerInitResponseType.INITIALIZE_SUCCESS,
                          pickle.dumps(True)),
                         copy=False)
 
-                    # Ensure message queues are ready. 
+                    # Ensure message queues are ready.
                     # Must happen after sending the INITIALIZE_SUCESS message.
                     self.scheduler_output_receiver.wait_until_ready()
                     if self.model_output_mq is not None:
                         self.model_output_mq.wait_until_ready()
-
                     # Exit initialization loop to begin model execution loop
                     return
                 else:
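
The ordering constraint called out in the comments above (reply with INITIALIZE_SUCCESS first, then block on the message queues, because the executor only calls wait_until_ready() after it sees the success response) can be illustrated with a small self-contained sketch. This is not code from the patch: the queue.Queue "sockets", the ToyMessageQueue class, and the INITIALIZE_SUCCESS byte value are hypothetical stand-ins, whereas the real code uses ZMQ PUSH/PULL sockets and shared-memory message queues.

    # Minimal sketch of the worker-initialization handshake, assuming toy
    # in-process stand-ins for the ZMQ sockets and shared-memory queues.
    import pickle
    import queue
    import threading

    INITIALIZE = b'\x01'          # mirrors WorkerInitRequestType.INITIALIZE
    INITIALIZE_SUCCESS = b'\x02'  # placeholder value; the real constant lives
                                  # in WorkerInitResponseType


    class ToyMessageQueue:
        """Stand-in for a message queue both sides must attach to."""

        def __init__(self) -> None:
            self._ready = threading.Event()

        def mark_ready(self) -> None:
            self._ready.set()

        def wait_until_ready(self) -> None:
            self._ready.wait()


    def worker_side(requests: queue.Queue, responses: queue.Queue,
                    mq: ToyMessageQueue) -> None:
        request_type, payload = requests.get()
        assert request_type == INITIALIZE
        num_gpu_blocks = pickle.loads(payload)
        print(f"worker: initializing cache with {num_gpu_blocks} blocks")
        # Order matters: send success *before* blocking on the queue, since
        # the executor only waits on its queues after seeing this message.
        responses.put((INITIALIZE_SUCCESS, pickle.dumps(True)))
        mq.wait_until_ready()
        print("worker: message queue ready, entering execution loop")


    def executor_side(requests: queue.Queue, responses: queue.Queue,
                      mq: ToyMessageQueue) -> None:
        requests.put((INITIALIZE, pickle.dumps(1024)))
        response_type, payload = responses.get()
        assert response_type == INITIALIZE_SUCCESS and pickle.loads(payload)
        # Only after the success response does the executor touch its queues.
        mq.mark_ready()
        mq.wait_until_ready()
        print("executor: worker initialized, queues ready")


    if __name__ == "__main__":
        reqs, resps, mq = queue.Queue(), queue.Queue(), ToyMessageQueue()
        worker = threading.Thread(target=worker_side, args=(reqs, resps, mq))
        worker.start()
        executor_side(reqs, resps, mq)
        worker.join()

If the worker blocked on its queue before replying, each side would be waiting for the other and initialization would deadlock; sending the success response first is what breaks that cycle.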