Commit 11498be: updated
robertgshaw2-redhat committed Nov 6, 2024
1 parent cf5e63c commit 11498be
Showing 1 changed file with 67 additions and 70 deletions.
vllm/v1/engine/async_llm.py (137 changes: 67 additions & 70 deletions)
@@ -108,7 +108,6 @@ def from_engine_args(
         )
 
     def shutdown(self):
-        """Shutdown the EngineCore."""
         self.engine_core.shutdown()
 
         if hasattr(self, "output_handler"):
@@ -118,6 +117,72 @@ def shutdown(self):
     def _get_executor_cls(cls, vllm_config: VllmConfig):
         return GPUExecutor
 
+    async def add_request(
+        self,
+        request_id: str,
+        prompt: PromptType,
+        params: Union[SamplingParams, PoolingParams],
+        arrival_time: Optional[float] = None,
+        lora_request: Optional[LoRARequest] = None,
+        trace_headers: Optional[Mapping[str, str]] = None,
+        prompt_adapter_request: Optional[PromptAdapterRequest] = None,
+        priority: int = 0,
+    ) -> AsyncGenerator[Union[RequestOutput, EmbeddingRequestOutput], None]:
+        """Add request_id to the EngineCore and return a Generator."""
+
+        if self.detokenizer.is_request_active(request_id):
+            raise KeyError(f"Request {request_id} already exists.")
+
+        # 1) Create a new request in the RequestTracker.
+        stream = self._add_request_to_streams(request_id,
+                                              verbose=self.log_requests)
+
+        # 2) Convert input --> DetokenizerRequest / EngineCoreRequest.
+        detokenizer_req, engine_core_req = self.processor.process_inputs(
+            request_id, prompt, params, arrival_time, lora_request,
+            trace_headers, prompt_adapter_request, priority)
+
+        # 3) Add the request to Detokenizer (this process).
+        self.detokenizer.add_request(detokenizer_req)
+
+        # 4) Add the EngineCoreRequest to EngineCore (separate process).
+        await self.engine_core.add_request_async(engine_core_req)
+
+        # 5) Return the generator.
+        return stream.generator()
+
+
+    async def generate(
+        self,
+        prompt: PromptType,
+        sampling_params: SamplingParams,
+        request_id: str,
+        lora_request: Optional[LoRARequest] = None,
+        trace_headers: Optional[Mapping[str, str]] = None,
+        prompt_adapter_request: Optional[PromptAdapterRequest] = None,
+        priority: int = 0,
+    ) -> AsyncGenerator[RequestOutput, None]:
+
+        # We start the output_handler on the first call to generate() so that
+        # we can call __init__ before the event loop starts, which enables us
+        # to handle startup failure gracefully in the OpenAI server.
+        if not self.is_output_handler_running:
+            self.output_handler = asyncio.create_task(
+                self._run_output_handler())
+            self.is_output_handler_running = True
+
+        async for output in await self.add_request(
+                request_id,
+                prompt,
+                sampling_params,
+                lora_request=lora_request,
+                trace_headers=trace_headers,
+                prompt_adapter_request=prompt_adapter_request,
+                priority=priority,
+        ):
+            yield output
+
+
     async def _abort_requests(
         self,
         request_ids: Union[str, List[str]],
@@ -154,7 +219,7 @@ def _add_request_to_streams(
         request_id: str,
         verbose: bool = False,
     ) -> AsyncStream:
-        """Add a request to the request request streams."""
+        """Add a request to the request streams."""
 
         if request_id in self.request_streams:
             raise ValueError(f"Request id {request_id} already running.")
@@ -183,74 +248,6 @@ def _process_request_outputs(self, request_outputs: List[RequestOutput]):
                 self.request_streams[request_id].finish()
                 self.request_streams.pop(request_id)
 
-    async def add_request(
-        self,
-        request_id: str,
-        prompt: PromptType,
-        params: Union[SamplingParams, PoolingParams],
-        arrival_time: Optional[float] = None,
-        lora_request: Optional[LoRARequest] = None,
-        trace_headers: Optional[Mapping[str, str]] = None,
-        prompt_adapter_request: Optional[PromptAdapterRequest] = None,
-        priority: int = 0,
-    ) -> AsyncGenerator[Union[RequestOutput, EmbeddingRequestOutput], None]:
-        """Add request_id to the EngineCore and return a Generator."""
-
-        if self.detokenizer.is_request_active(request_id):
-            raise KeyError(f"Request {request_id} already exists.")
-
-        # 1) Create a new request in the RequestTracker.
-        stream = self._add_request_to_streams(request_id, verbose=True)
-
-        # 2) Convert input --> DetokenizerRequest / EngineCoreRequest.
-        detokenizer_req, engine_core_req = self.processor.process_inputs(
-            request_id, prompt, params, arrival_time, lora_request,
-            trace_headers, prompt_adapter_request, priority)
-
-        # 3) Add the request to Detokenizer (this process).
-        self.detokenizer.add_request(detokenizer_req)
-
-        # 4) Add the EngineCoreRequest to EngineCore (separate process).
-        await self.engine_core.add_request_async(engine_core_req)
-
-        # 5) Return the generator.
-        return stream.generator()
-
-    # TODO: we should support multiple prompts in one call, as you
-    # can do with LLM.generate. So that for multi-prompt completion
-    # requests we don't need to send multiple messages to core proc,
-    # and so we don't need multiple streams which then get
-    # re-multiplexed in the API server anyhow.
-    async def generate(
-        self,
-        prompt: PromptType,
-        sampling_params: SamplingParams,
-        request_id: str,
-        lora_request: Optional[LoRARequest] = None,
-        trace_headers: Optional[Mapping[str, str]] = None,
-        prompt_adapter_request: Optional[PromptAdapterRequest] = None,
-        priority: int = 0,
-    ) -> AsyncGenerator[RequestOutput, None]:
-
-        # We start the output_handler on the first call to generate() so that
-        # we can call __init__ before the event loop starts, which enables us
-        # to handle startup failure gracefully in the OpenAI server.
-        if not self.is_output_handler_running:
-            self.output_handler = asyncio.create_task(
-                self._run_output_handler())
-            self.is_output_handler_running = True
-
-        async for output in await self.add_request(
-                request_id,
-                prompt,
-                sampling_params,
-                lora_request=lora_request,
-                trace_headers=trace_headers,
-                prompt_adapter_request=prompt_adapter_request,
-                priority=priority,
-        ):
-            yield output
-
     async def _run_output_handler(self):
         """Background loop: pulls from EngineCore and pushes to AsyncStreams."""
 
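For orientation, a minimal usage sketch of the relocated generate() entry point follows. It assumes the class exported from this module is AsyncLLM and that from_engine_args accepts an AsyncEngineArgs object, as the v0 AsyncLLMEngine does; the model id and prompt are illustrative, not part of this commit.

# Usage sketch (not part of the diff): drive AsyncLLM.generate() as an
# async generator from a plain asyncio program.
import asyncio

from vllm import SamplingParams
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.v1.engine.async_llm import AsyncLLM


async def main():
    # Hypothetical engine setup; the model id is only an example.
    engine = AsyncLLM.from_engine_args(
        AsyncEngineArgs(model="facebook/opt-125m"))
    params = SamplingParams(max_tokens=32)

    # generate() lazily starts the background output handler on first use
    # and then streams incremental RequestOutputs for this request_id.
    async for output in engine.generate(prompt="Hello, my name is",
                                        sampling_params=params,
                                        request_id="request-0"):
        if output.finished:
            print(output.outputs[0].text)

    engine.shutdown()


if __name__ == "__main__":
    asyncio.run(main())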

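The comment inside generate() about starting the output handler lazily reflects a general asyncio pattern: keep __init__ synchronous so the object can be constructed (and fail cleanly) before any event loop exists, and create the long-lived task on the first call that runs inside a loop. A vLLM-independent sketch of that pattern, with illustrative names:

import asyncio
from typing import Optional


class LazyBackgroundWorker:
    """Illustrative only: start a long-lived task on first use, not in __init__."""

    def __init__(self) -> None:
        # No running event loop is required here, so construction errors
        # surface before the server's loop starts.
        self._task: Optional[asyncio.Task] = None

    async def _run_loop(self) -> None:
        while True:
            # Stand-in for "pull outputs from the core, push to streams".
            await asyncio.sleep(1)

    async def handle_request(self) -> str:
        if self._task is None:
            # By the time a coroutine method is awaited, a loop is running,
            # so asyncio.create_task() is safe to call.
            self._task = asyncio.create_task(self._run_loop())
        return "ok"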