Add support for reporting metrics in completion response headers in orca format

Signed-off-by: Kunjan Patel <[email protected]>
coolkp committed Nov 21, 2024
1 parent d5b2844 commit 430d948
Showing 10 changed files with 184 additions and 15 deletions.
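With this change, the OpenAI-compatible server attaches an endpoint-load-metrics response header (ORCA format) to non-streaming chat and text completion responses when started with --orca-format. As a rough end-to-end illustration, with host, port, model name, and the metric value all placeholders, a client can read the header like this:

import requests

# Assumes a server started with something like:
#   vllm serve <model> --orca-format TEXT
resp = requests.post(
    "http://localhost:8000/v1/completions",
    json={"model": "<model>", "prompt": "Hello", "max_tokens": 8},
)
# Illustrative value produced by the TEXT format:
#   TEXT named_metrics.kv_cache_utilization=0.3
print(resp.headers.get("endpoint-load-metrics"))

Streaming responses are returned unchanged by this commit, so the header only appears on the JSON (non-streaming) responses.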
27 changes: 22 additions & 5 deletions vllm/engine/llm_engine.py
@@ -314,6 +314,7 @@ def __init__(
self.observability_config = observability_config or ObservabilityConfig(
)
self.log_stats = log_stats
self.snapshot_stats: Optional[Stats] = None
self.use_cached_outputs = use_cached_outputs

if not self.model_config.skip_tokenizer_init:
@@ -1195,8 +1196,16 @@ def _process_model_outputs(self,
seq_group,
self.seq_id_to_seq_group,
use_cache=self.use_cached_outputs)
if request_output:
ctx.request_outputs.append(request_output)

if (request_output and isinstance(request_output, RequestOutput)
and self.snapshot_stats):
request_output.metrics.gpu_kv_cache_utilisation = (
self.snapshot_stats.gpu_cache_usage_sys)
request_output.metrics.cpu_kv_cache_utilisation = (
self.snapshot_stats.cpu_cache_usage_sys)
request_output.metrics.running_lora_adapters = ",".join(
self.snapshot_stats.running_lora_adapters)
ctx.request_outputs.append(request_output)

# When we process a single request, we skip it for the next time,
# and invoke the request output callback (if there was final output)
@@ -1238,6 +1247,14 @@ def _process_model_outputs(self,
self.seq_id_to_seq_group,
use_cache=self.use_cached_outputs)
if request_output:
# if (request_output and isinstance(request_output, RequestOutput)
# and self.snapshot_stats):
# request_output.metrics.gpu_kv_cache_utilisation = (
# self.snapshot_stats.gpu_cache_usage_sys)
# request_output.metrics.cpu_kv_cache_utilisation = (
# self.snapshot_stats.cpu_cache_usage_sys)
# request_output.metrics.running_lora_adapters = ",".join(
# self.snapshot_stats.running_lora_adapters)
ctx.request_outputs.append(request_output)

# For multi-step with streaming, create outputs each iteration
@@ -1607,11 +1624,11 @@ def do_log_stats(self,
finished_before: Optional[List[int]] = None,
skip: Optional[List[int]] = None) -> None:
"""Forced log when no requests active."""
self.snapshot_stats = self._get_stats(scheduler_outputs, model_output,
finished_before, skip)
if self.log_stats:
stats = self._get_stats(scheduler_outputs, model_output,
finished_before, skip)
for logger in self.stat_loggers.values():
logger.log(stats)
logger.log(self.snapshot_stats)

def _get_stats(self,
scheduler_outputs: Optional[SchedulerOutputs],
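The engine-side change above keeps the most recent Stats snapshot on the engine and copies a few of its fields onto each finished request's metrics. A stripped-down sketch of that data flow, using simplified stand-in classes rather than the real vLLM types:

from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class Stats:  # stand-in for vllm.engine.metrics.Stats
    gpu_cache_usage_sys: float = 0.0
    cpu_cache_usage_sys: float = 0.0
    running_lora_adapters: List[str] = field(default_factory=list)

@dataclass
class RequestMetrics:  # stand-in for the per-request metrics object
    gpu_kv_cache_utilisation: float = 0.0
    cpu_kv_cache_utilisation: float = 0.0
    running_lora_adapters: str = ""

class EngineSketch:
    def __init__(self) -> None:
        self.snapshot_stats: Optional[Stats] = None

    def do_log_stats(self, stats: Stats) -> None:
        # Refreshed on every do_log_stats call, before the log_stats check.
        self.snapshot_stats = stats

    def annotate(self, metrics: RequestMetrics) -> RequestMetrics:
        # Mirrors what _process_model_outputs now does for each RequestOutput.
        if self.snapshot_stats is not None:
            metrics.gpu_kv_cache_utilisation = self.snapshot_stats.gpu_cache_usage_sys
            metrics.cpu_kv_cache_utilisation = self.snapshot_stats.cpu_cache_usage_sys
            metrics.running_lora_adapters = ",".join(
                self.snapshot_stats.running_lora_adapters)
        return metrics

engine = EngineSketch()
engine.do_log_stats(Stats(0.42, 0.05, ["sql-lora"]))
print(engine.annotate(RequestMetrics()))
# RequestMetrics(gpu_kv_cache_utilisation=0.42, cpu_kv_cache_utilisation=0.05,
#                running_lora_adapters='sql-lora')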
10 changes: 7 additions & 3 deletions vllm/entrypoints/openai/api_server.py
@@ -34,6 +34,7 @@
from vllm.entrypoints.logger import RequestLogger
from vllm.entrypoints.openai.cli_args import (make_arg_parser,
validate_parsed_serve_args)
from vllm.entrypoints.openai.orca_header import metrics_header
# yapf conflicts with isort for this block
# yapf: disable
from vllm.entrypoints.openai.protocol import (ChatCompletionRequest,
@@ -346,13 +347,13 @@ async def create_chat_completion(request: ChatCompletionRequest,
message="The model does not support Chat Completions API")

generator = await handler.create_chat_completion(request, raw_request)

if isinstance(generator, ErrorResponse):
return JSONResponse(content=generator.model_dump(),
status_code=generator.code)

elif isinstance(generator, ChatCompletionResponse):
return JSONResponse(content=generator.model_dump())
header = metrics_header(generator.metrics)
return JSONResponse(content=generator.model_dump(), headers=header)

return StreamingResponse(content=generator, media_type="text/event-stream")

@@ -369,7 +370,8 @@ async def create_completion(request: CompletionRequest, raw_request: Request):
return JSONResponse(content=generator.model_dump(),
status_code=generator.code)
elif isinstance(generator, CompletionResponse):
return JSONResponse(content=generator.model_dump())
header = metrics_header(generator.metrics)
return JSONResponse(content=generator.model_dump(), headers=header)

return StreamingResponse(content=generator, media_type="text/event-stream")

@@ -539,6 +541,7 @@ def init_app_state(
base_model_paths,
args.response_role,
lora_modules=args.lora_modules,
orca_format=args.orca_format,
prompt_adapters=args.prompt_adapters,
request_logger=request_logger,
chat_template=resolved_chat_template,
@@ -553,6 +556,7 @@
model_config,
base_model_paths,
lora_modules=args.lora_modules,
orca_format=args.orca_format,
prompt_adapters=args.prompt_adapters,
request_logger=request_logger,
return_tokens_as_token_ids=args.return_tokens_as_token_ids,
8 changes: 8 additions & 0 deletions vllm/entrypoints/openai/cli_args.py
@@ -133,6 +133,14 @@ def make_arg_parser(parser: FlexibleArgumentParser) -> FlexibleArgumentParser:
help="The file path to the chat template, "
"or the template in single-line form "
"for the specified model")
parser.add_argument(
"--orca-format",
type=str,
default="",
choices=["BIN", "TEXT", "JSON"],
help='Enable ORCA metrics reporting in the response header. '
'Select one of the valid formats: BIN, TEXT, JSON.',
)
parser.add_argument(
'--chat-template-content-format',
type=str,
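For reference, the flag keeps an empty-string default (reporting disabled) and only accepts the three ORCA encodings on the command line. A quick check of that behaviour with a minimal stand-alone parser (not the full vLLM argument parser):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--orca-format", type=str, default="",
                    choices=["BIN", "TEXT", "JSON"])

print(repr(parser.parse_args([]).orca_format))                         # '' -> reporting stays off
print(repr(parser.parse_args(["--orca-format", "JSON"]).orca_format))  # 'JSON'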
82 changes: 82 additions & 0 deletions vllm/entrypoints/openai/orca_header.py
@@ -0,0 +1,82 @@
import json
from typing import List, Mapping, Optional, Tuple

from vllm.entrypoints.openai.protocol import EngineMetrics
from vllm.logger import init_logger

logger = init_logger(__name__)


def create_orca_header(format: str,
named_metrics: List[Tuple[str, float]],
metadata_fields=None) -> Optional[Mapping[str, str]]:
"""
Creates ORCA headers named 'endpoint-load-metrics' in the specified format
and adds custom metrics to named_metrics.
ORCA headers format description: https://docs.google.com/document/d/1C1ybMmDKJIVlrbOLbywhu9iRYo4rilR-cT50OTtOFTs/edit?tab=t.0
ORCA proto https://github.com/cncf/xds/blob/main/xds/data/orca/v3/orca_load_report.proto
Parameters:
- format (str): The format of the header ('BIN', 'TEXT', 'JSON').
- named_metrics (List[Tuple[str, float]]): List of tuples with metric names
and their corresponding double values.
- metadata_fields (list): List of additional metadata fields
(currently unsupported).
Returns:
- Optional[Mapping[str, str]]: A dictionary with 'endpoint-load-metrics' as
the header key and the ORCA header string (format prefix followed by the
encoded named_metrics) as the value.
"""

if metadata_fields:
logger.warning("Warning: `metadata_fields` are not supported in the"
"ORCA response header yet.")

if format not in ["BIN", "TEXT", "JSON"]:
logger.warning(
"Warning: `%s` format is not supported in the ORCA response header",
format,
)
return None

header = {}
orca_report = {
"named_metrics": {
metric_name: value
for metric_name, value in named_metrics
if isinstance(metric_name, str) and isinstance(value, float)
}
}
# output example:
# endpoint-load-metrics: BIN
# CZqZmZmZmbk/MQAAAAAAAABAQg4KA2ZvbxGamZmZmZm5P0IOCgNiYXIRmpmZmZmZyT8=
if format == "BIN":
logger.warning("orca header format BIN not yet supported")

# output example:
# endpoint-load-metrics: TEXT named_metrics.kv_cache_utilization=0.4
elif format == "TEXT":
native_http_header = ", ".join([
f"named_metrics.{metric_name}={value}"
for metric_name, value in named_metrics
if isinstance(metric_name, str) and isinstance(value, float)
])
header["endpoint-load-metrics"] = f"TEXT {native_http_header}"

# output example:
# endpoint-load-metrics: JSON "named_metrics": {"custom-metric-util": 0.4}
elif format == "JSON":
header["endpoint-load-metrics"] = f"JSON {json.dumps(orca_report)}"

return header


def metrics_header(m: EngineMetrics) -> Optional[Mapping[str, str]]:
if not m.format:
return None
named_metrics: List[Tuple[str, float]] = []
for metric, val in vars(m).items():
if isinstance(val, float) and metric != "format":
named_metrics.append((str(metric), float(val)))
return create_orca_header(str(m.format), named_metrics)
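A small usage example for the new helper, assuming this patch is applied; the metric names and values are made up, but the outputs follow the three branches above:

from vllm.entrypoints.openai.orca_header import create_orca_header

named = [("kv_cache_utilization", 0.4), ("cpu_kv_cache_utilization", 0.1)]

print(create_orca_header("TEXT", named))
# {'endpoint-load-metrics': 'TEXT named_metrics.kv_cache_utilization=0.4,
#   named_metrics.cpu_kv_cache_utilization=0.1'}

print(create_orca_header("JSON", named))
# {'endpoint-load-metrics': 'JSON {"named_metrics": {"kv_cache_utilization": 0.4,
#   "cpu_kv_cache_utilization": 0.1}}'}

print(create_orca_header("BIN", named))
# {} -- BIN encoding is not implemented yet; only a warning is logged

print(create_orca_header("XML", named))
# None -- unsupported formats are rejected with a warning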
8 changes: 8 additions & 0 deletions vllm/entrypoints/openai/protocol.py
@@ -89,6 +89,12 @@ class UsageInfo(OpenAIBaseModel):
prompt_tokens_details: Optional[PromptTokenUsageInfo] = None


class EngineMetrics(OpenAIBaseModel):
kv_cache_utilization: Optional[float] = 0.
active_models: Optional[str] = ""
format: Optional[str] = ""


class RequestResponseMetadata(BaseModel):
request_id: str
final_usage_info: Optional[UsageInfo] = None
@@ -838,6 +844,7 @@ class CompletionResponse(OpenAIBaseModel):
model: str
choices: List[CompletionResponseChoice]
usage: UsageInfo
metrics: EngineMetrics


class CompletionResponseStreamChoice(OpenAIBaseModel):
Expand Down Expand Up @@ -951,6 +958,7 @@ class ChatCompletionResponse(OpenAIBaseModel):
model: str
choices: List[ChatCompletionResponseChoice]
usage: UsageInfo
metrics: EngineMetrics
prompt_logprobs: Optional[List[Optional[Dict[int, Logprob]]]] = None


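Because the new metrics field is non-optional on both response models, the serialized response body now carries a metrics object alongside usage. For illustration (the values and the LoRA adapter name are placeholders):

from vllm.entrypoints.openai.protocol import EngineMetrics

m = EngineMetrics(kv_cache_utilization=0.4, active_models="sql-lora", format="TEXT")
print(m.model_dump())
# {'kv_cache_utilization': 0.4, 'active_models': 'sql-lora', 'format': 'TEXT'}

Clients that validate the OpenAI response schema strictly may need to tolerate this extra field.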
26 changes: 23 additions & 3 deletions vllm/entrypoints/openai/serving_chat.py
@@ -19,8 +19,8 @@
ChatCompletionRequest, ChatCompletionResponse,
ChatCompletionResponseChoice, ChatCompletionResponseStreamChoice,
ChatCompletionStreamResponse, ChatMessage, DeltaFunctionCall, DeltaMessage,
DeltaToolCall, ErrorResponse, FunctionCall, PromptTokenUsageInfo,
RequestResponseMetadata, ToolCall, UsageInfo)
DeltaToolCall, EngineMetrics, ErrorResponse, FunctionCall,
PromptTokenUsageInfo, RequestResponseMetadata, ToolCall, UsageInfo)
from vllm.entrypoints.openai.serving_engine import (BaseModelPath,
LoRAModulePath,
OpenAIServing,
@@ -29,7 +29,7 @@
from vllm.logger import init_logger
from vllm.outputs import CompletionOutput, RequestOutput
from vllm.sampling_params import BeamSearchParams, SamplingParams
from vllm.sequence import Logprob
from vllm.sequence import Logprob, RequestMetrics
from vllm.transformers_utils.tokenizer import AnyTokenizer, MistralTokenizer
from vllm.transformers_utils.tokenizers import maybe_serialize_tool_calls
from vllm.utils import iterate_with_cancellation
@@ -50,6 +50,7 @@ def __init__(
prompt_adapters: Optional[List[PromptAdapterPath]],
request_logger: Optional[RequestLogger],
chat_template: Optional[str],
orca_format: Optional[str] = "",
chat_template_content_format: ChatTemplateContentFormatOption,
return_tokens_as_token_ids: bool = False,
enable_auto_tools: bool = False,
@@ -60,6 +61,7 @@
model_config=model_config,
base_model_paths=base_model_paths,
lora_modules=lora_modules,
orca_format=orca_format,
prompt_adapters=prompt_adapters,
request_logger=request_logger,
return_tokens_as_token_ids=return_tokens_as_token_ids)
@@ -612,6 +614,8 @@ async def chat_completion_full_generator(
assert final_res is not None

choices: List[ChatCompletionResponseChoice] = []
last_req_finished_time = 0.
last_req_metrics: Optional[RequestMetrics] = None

role = self.get_chat_request_role(request)
for output in final_res.outputs:
@@ -706,6 +710,15 @@
stop_reason=output.stop_reason)
choices.append(choice_data)

# get latest output's metrics
res_metrics = final_res.metrics
if res_metrics:
if res_metrics.last_token_time > last_req_finished_time:
last_req_metrics = res_metrics
if (res_metrics.finished_time and
res_metrics.finished_time > last_req_finished_time):
last_req_metrics = res_metrics

if request.echo or request.continue_final_message:
last_msg_content: Union[str, List[Dict[str, str]]] = ""
if conversation and "content" in conversation[-1] and conversation[
@@ -736,13 +749,20 @@

request_metadata.final_usage_info = usage

metrics = EngineMetrics(
kv_cache_utilization=(last_req_metrics.gpu_kv_cache_utilisation
if last_req_metrics is not None else 0.0),
active_models=(last_req_metrics.running_lora_adapters
if last_req_metrics is not None else ""),
format=self.orca_format)
response = ChatCompletionResponse(
id=request_id,
created=created_time,
model=model_name,
choices=choices,
usage=usage,
prompt_logprobs=final_res.prompt_logprobs,
metrics=metrics,
)

return response