Skip to content

Commit

Permalink
format
Browse files Browse the repository at this point in the history
Signed-off-by: Tyler Michael Smith <[email protected]>
  • Loading branch information
tlrmchlsmth committed Nov 26, 2024
1 parent 9322db5 commit b5bac31
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 9 deletions.
2 changes: 1 addition & 1 deletion vllm/v1/executor/multiproc_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def initialize(self, num_gpu_blocks: int) -> None:
success_vals = self.run_on_workers('initialize', num_gpu_blocks)
if not all(success_vals):
raise RuntimeError("Worker initialization failed.")

self.scheduler_output_mq.wait_until_ready()
self.model_output_mq.wait_until_ready()

Expand Down
18 changes: 10 additions & 8 deletions vllm/v1/worker/gpu_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ class WorkerInitRequestType:
without separate encoding step.
"""
DETERMINE_NUM_BLOCKS = b'\x00'
INITIALIZE = b'\x01' # Initialize cache and begin worker execution
INITIALIZE = b'\x01' # Initialize cache and begin worker execution


@dataclass
class WorkerInitResponseType:
Expand Down Expand Up @@ -264,7 +265,7 @@ def determine_num_available_blocks(self) -> Tuple[int, int]:
send_socket.send_multipart(
(WorkerInitRequestType.DETERMINE_NUM_BLOCKS, ))

# Receive response
# Receive response
type_frame, data_frame = recv_socket.recv_multipart(copy=False)
response_type = type_frame.buffer
response_data = data_frame.buffer
Expand All @@ -278,7 +279,7 @@ def initialize(self, num_gpu_blocks: int) -> bool:
zmq.constants.PUSH) as send_socket, \
make_zmq_socket(self.initialization_input_path,
zmq.constants.PULL) as recv_socket:

# Send initialization message
msg = pickle.dumps(num_gpu_blocks,
protocol=pickle.HIGHEST_PROTOCOL)
Expand All @@ -292,6 +293,7 @@ def initialize(self, num_gpu_blocks: int) -> bool:
raise ValueError(f"Unknown RequestType: {response_type}")
return pickle.loads(response_data)


class WorkerProc:
"""Wrapper that runs one Worker in a separate process."""

Expand Down Expand Up @@ -326,7 +328,8 @@ def __init__(
zmq.constants.PUSH) as ready_socket:
payload = pickle.dumps(output_mq_handle,
protocol=pickle.HIGHEST_PROTOCOL)
ready_socket.send_multipart((WorkerInitResponseType.READY, payload))
ready_socket.send_multipart(
(WorkerInitResponseType.READY, payload))

self.worker.initialize()
self.worker.load_model()
Expand Down Expand Up @@ -434,7 +437,7 @@ def model_initialization_loop(self, init_input_path, init_output_path):
copy=False)
elif request_type == WorkerInitRequestType.INITIALIZE:
# Initialize cache with the number of requested gpu blocks
try:
try:
request_data = request[1].buffer
num_gpu_blocks = pickle.loads(request_data)
self.worker.initialize_cache(num_gpu_blocks)
Expand All @@ -451,20 +454,19 @@ def model_initialization_loop(self, init_input_path, init_output_path):
raise e

# Send a success response. Order is important:
# The executor will call wait_until_ready() on its
# The executor will call wait_until_ready() on its
# message queues after receiving this message.
send_socket.send_multipart(
(WorkerInitResponseType.INITIALIZE_SUCCESS,
pickle.dumps(True)),
copy=False)

# Ensure message queues are ready.
# Ensure message queues are ready.
# Must happen after sending the INITIALIZE_SUCCESS message.
self.scheduler_output_receiver.wait_until_ready()
if self.model_output_mq is not None:
self.model_output_mq.wait_until_ready()


# Exit initialization loop to begin model execution loop
return
else:
Expand Down

0 comments on commit b5bac31

Please sign in to comment.