Enhance cache service metrics (#13)
* change cache metrics to maintain a fixed-size queue

add metrics to LLM engine stats and expose them through a Prometheus gauge

clear metrics counters upon update

* fix rebase error

* addressing comments

* addressing comments

* add metrics collection flag

* update with metrics flags

* addressing comments

* addressing comments

* addressing comments: avoid copying and move counters out of the metrics check

* add async ops metrics

* add async ops metrics

---------

Co-authored-by: Le Xu <[email protected]>
happyandslow and Le Xu authored Dec 23, 2024
1 parent 279175d commit 8d34fa4
Showing 6 changed files with 395 additions and 105 deletions.
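Note on the first bullet of the commit message: a fixed-size queue bounds how many samples the metrics object buffers between scrapes, so a busy engine cannot grow memory without limit. Below is a minimal sketch of the idea using collections.deque; the class and method names are illustrative, not the ones introduced by this commit.

from collections import deque
from typing import List

class BoundedMetricQueue:
    # deque(maxlen=N) silently evicts the oldest sample once N samples
    # are buffered, which keeps memory bounded between scrapes.
    def __init__(self, maxlen: int = 1000) -> None:
        self.samples: deque = deque(maxlen=maxlen)

    def record(self, value: float) -> None:
        self.samples.append(value)

    def drain(self) -> List[float]:
        # Snapshot and clear in one step so the next interval starts
        # from an empty window.
        out = list(self.samples)
        self.samples.clear()
        return out

q = BoundedMetricQueue(maxlen=3)
for v in (0.1, 0.2, 0.3, 0.4):
    q.record(v)
print(q.drain())  # [0.2, 0.3, 0.4] -- the oldest sample was evicted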
56 changes: 54 additions & 2 deletions vllm/engine/llm_engine.py
@@ -297,7 +297,7 @@ def __init__(
)
self.log_stats = log_stats
self.step_return_finished_only = step_return_finished_only
- self.cache_service_metrics = CacheServiceMetrics
+ self.cache_service_metrics = CacheServiceMetrics()

if not self.model_config.skip_tokenizer_init:
self.tokenizer = self._init_tokenizer()
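The one-line change in this hunk fixes a real bug: the old code bound the CacheServiceMetrics class object itself to the attribute rather than an instance, so __init__ never ran and every engine would have shared class-level state. A contrived demonstration of the difference:

class Counter:
    def __init__(self) -> None:
        self.n = 0

c_cls = Counter     # binds the class object; c_cls.n raises AttributeError
c_obj = Counter()   # calls the class, producing an instance with its own n
c_obj.n += 1
print(type(c_cls), c_obj.n)  # <class 'type'> 1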
@@ -1822,7 +1822,14 @@ def _get_stats(self,
best_of_requests: List[int] = []
n_requests: List[int] = []
finished_reason_requests: List[str] = []


# Cache Service Metrics (assigned in the guarded block below when
# metrics collection is enabled; defaults keep the names bound otherwise)
cache_service_tokens_hit_rate: float = 0.0
cache_service_blocks_hit_rate: float = 0.0
cache_service_time_async_update_queue: List[float] = []
cache_service_time_async_update_exec: List[float] = []
cache_service_counter_async_update_updated: List[int] = []

# NOTE: This loop assumes prefill seq_groups are before
# decode seq_groups in scheduled_seq_groups.
if scheduler_outputs is not None:
@@ -1915,6 +1922,32 @@
spec_decode_metrics = model_output[0].spec_decode_worker_metrics
else:
spec_decode_metrics = None

if self.cache_service_metrics is not None:
cache_service_hit_tokens = self.cache_service_metrics.hit_tokens
cache_service_total_tokens = self.cache_service_metrics.total_tokens
cache_service_hit_blocks = self.cache_service_metrics.hit_blocks
cache_service_total_blocks = self.cache_service_metrics.total_blocks
cache_service_tokens_hit_rate = self.cache_service_metrics.get_tokens_hit_rate()
cache_service_blocks_hit_rate = self.cache_service_metrics.get_blocks_hit_rate()
cache_service_err_query = self.cache_service_metrics.err_query
cache_service_err_async_update_task_queue_full = self.cache_service_metrics.err_async_update_task_queue_full
cache_service_err_update = self.cache_service_metrics.err_update

cache_service_time_query = self.cache_service_metrics.time_query
cache_service_time_load = self.cache_service_metrics.time_load
cache_service_time_reshape = self.cache_service_metrics.time_reshape
cache_service_time_unload = self.cache_service_metrics.time_unload
cache_service_time_update = self.cache_service_metrics.time_update
cache_service_time_async_update_queue, cache_service_time_async_update_exec, cache_service_counter_async_update_updated = self.cache_service_metrics.get_async_metrics()

# Reset the interval accumulators now that their contents have
# been snapshotted into the locals above.
self.cache_service_metrics.time_query = []
self.cache_service_metrics.time_load = []
self.cache_service_metrics.time_reshape = []
self.cache_service_metrics.time_unload = []
self.cache_service_metrics.time_update = []
self.cache_service_metrics.reset_async_metrics()


return Stats(
now=now,
@@ -1947,6 +1980,25 @@
best_of_requests=best_of_requests,
n_requests=n_requests,
finished_reason_requests=finished_reason_requests,

# Cache Service
cache_service_hit_tokens=cache_service_hit_tokens,
cache_service_total_tokens=cache_service_total_tokens,
cache_service_hit_blocks=cache_service_hit_blocks,
cache_service_total_blocks=cache_service_total_blocks,
cache_service_tokens_hit_rate=cache_service_tokens_hit_rate,
cache_service_blocks_hit_rate=cache_service_blocks_hit_rate,
cache_service_err_query=cache_service_err_query,
cache_service_err_async_update_task_queue_full=cache_service_err_async_update_task_queue_full,
cache_service_err_update=cache_service_err_update,
cache_service_time_query=cache_service_time_query,
cache_service_time_load=cache_service_time_load,
cache_service_time_reshape=cache_service_time_reshape,
cache_service_time_unload=cache_service_time_unload,
cache_service_time_update=cache_service_time_update,
cache_service_time_async_update_queue=cache_service_time_async_update_queue,
cache_service_time_async_update_exec=cache_service_time_async_update_exec,
cache_service_counter_async_update_updated=cache_service_counter_async_update_updated,
)
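The snapshot-then-reassign sequence above gives each Stats object exactly one interval's worth of samples: the locals capture the accumulated lists, then the attributes are rebound to fresh empty lists. A standalone sketch of the pattern (names are illustrative):

from typing import List

class IntervalMetrics:
    def __init__(self) -> None:
        self.time_query: List[float] = []

    def snapshot_and_reset(self) -> List[float]:
        # Rebinding (rather than clearing in place) means any code still
        # holding the old list keeps a stable snapshot.
        snap = self.time_query
        self.time_query = []
        return snap

m = IntervalMetrics()
m.time_query.extend([0.01, 0.02])
print(m.snapshot_and_reset())  # [0.01, 0.02]
print(m.time_query)            # [] -- fresh accumulator for the next interval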

def add_lora(self, lora_request: LoRARequest) -> bool:
172 changes: 166 additions & 6 deletions vllm/engine/metrics.py
@@ -190,6 +190,132 @@ def __init__(self, labelnames: List[str], max_model_len: int):
labelnames=labelnames,
multiprocess_mode="sum",
)

self.gauge_cache_service_tokens_hit_rate = self._gauge_cls(
name="vllm:cache_service_tokens_hit_rate",
documentation="External cache service tokens hit rate.",
labelnames=labelnames,
multiprocess_mode="all")

self.gauge_cache_service_blocks_hit_rate = self._gauge_cls(
name="vllm:cache_service_blocks_hit_rate",
documentation="External cache service blocks hit rate.",
labelnames=labelnames,
multiprocess_mode="all")

self.gauge_cache_service_hit_tokens = self._gauge_cls(
name="vllm:cache_service_hit_tokens",
documentation="External cache service hit tokens.",
labelnames=labelnames,
multiprocess_mode="all")

self.gauge_cache_service_total_tokens = self._gauge_cls(
name="vllm:cache_service_total_tokens",
documentation="External cache service total tokens.",
labelnames=labelnames,
multiprocess_mode="all")

self.gauge_cache_service_hit_blocks = self._gauge_cls(
name="vllm:cache_service_hit_blocks",
documentation="External cache service hit blocks.",
labelnames=labelnames,
multiprocess_mode="all")

self.gauge_cache_service_total_blocks = self._gauge_cls(
name="vllm:cache_service_total_blocks",
documentation="External cache service total blocks.",
labelnames=labelnames,
multiprocess_mode="all")

self.gauge_cache_service_err_query = self._gauge_cls(
name="vllm:cache_service_err_query",
documentation="External cache service query errors.",
labelnames=labelnames,
multiprocess_mode="all")

self.gauge_cache_service_err_async_update_task_queue_full = self._gauge_cls(
name="vllm:cache_service_err_async_update_task_queue_full",
documentation="External cache service async update task queue full errors.",
labelnames=labelnames,
multiprocess_mode="all")

self.gauge_cache_service_err_update = self._gauge_cls(
name="vllm:cache_service_err_update",
documentation="External cache service update errors.",
labelnames=labelnames,
multiprocess_mode="all")

self.histogram_cache_service_time_query_seconds = self._histogram_cls(
name="vllm:cache_service_time_query_seconds",
documentation="Histogram of cache service time query in seconds.",
labelnames=labelnames,
buckets=[
0.01, 0.025, 0.05, 0.075, 0.1, 0.15, 0.2, 0.3, 0.4, 0.5, 0.75,
1.0, 2.5
])

self.histogram_cache_service_time_load_seconds = self._histogram_cls(
name="vllm:cache_service_time_load_seconds",
documentation="Histogram of cache service time load.",
labelnames=labelnames,
buckets=[
0.01, 0.025, 0.05, 0.075, 0.1, 0.15, 0.2, 0.3, 0.4, 0.5, 0.75,
1.0, 2.5
])

self.histogram_cache_service_time_reshape_seconds = self._histogram_cls(
name="vllm:cache_service_time_reshape_seconds",
documentation="Histogram of cache service time update.",
labelnames=labelnames,
buckets=[
0.01, 0.025, 0.05, 0.075, 0.1, 0.15, 0.2, 0.3, 0.4, 0.5, 0.75,
1.0, 2.5
])

self.histogram_cache_service_time_unload_seconds = self._histogram_cls(
name="vllm:cache_service_time_unload_seconds",
documentation="Histogram of cache service time unload.",
labelnames=labelnames,
buckets=[
0.01, 0.025, 0.05, 0.075, 0.1, 0.15, 0.2, 0.3, 0.4, 0.5, 0.75,
1.0, 2.5
])

self.histogram_cache_service_time_update_seconds = self._histogram_cls(
name="vllm:cache_service_time_update_seconds",
documentation="Histogram of cache service time update.",
labelnames=labelnames,
buckets=[
0.01, 0.025, 0.05, 0.075, 0.1, 0.15, 0.2, 0.3, 0.4, 0.5, 0.75,
1.0, 2.5
])

self.histogram_cache_service_time_async_update_queue_seconds = self._histogram_cls(
name="vllm:cache_service_time_async_update_queue_seconds",
documentation="Histogram of cache service async update time in queue.",
labelnames=labelnames,
buckets=[
0.01, 0.025, 0.05, 0.075, 0.1, 0.15, 0.2, 0.3, 0.4, 0.5, 0.75,
1.0, 2.5
])

self.histogram_cache_service_time_async_update_exec_seconds = self._histogram_cls(
name="vllm:cache_service_time_async_update_exec_seconds",
documentation="Histogram of cache service async update time in execution.",
labelnames=labelnames,
buckets=[
0.01, 0.025, 0.05, 0.075, 0.1, 0.15, 0.2, 0.3, 0.4, 0.5, 0.75,
1.0, 2.5
])

self.histogram_cache_service_counter_async_update_updated_seconds = self._histogram_cls(
name="vllm:cache_service_counter_async_update_updated_seconds",
documentation="Histogram of cache service async update time in update.",
labelnames=labelnames,
buckets=[
0.01, 0.025, 0.05, 0.075, 0.1, 0.15, 0.2, 0.3, 0.4, 0.5, 0.75,
1.0, 2.5
])


# end-metrics-definitions
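For readers unfamiliar with the pattern above: each metric is declared once with a name, help text, and label names, and samples are recorded against a concrete label set. A minimal sketch using the prometheus_client package directly (the vllm code goes through the _gauge_cls/_histogram_cls wrappers, which this sketch omits):

from prometheus_client import Gauge, Histogram

labelnames = ["model_name"]

gauge_hit_rate = Gauge(
    "cache_service_tokens_hit_rate",
    "External cache service tokens hit rate.",
    labelnames=labelnames)

histogram_query_seconds = Histogram(
    "cache_service_time_query_seconds",
    "Histogram of cache service query time in seconds.",
    labelnames=labelnames,
    buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.5])

# Samples are always recorded through a concrete label set.
gauge_hit_rate.labels(model_name="llama").set(0.42)
histogram_query_seconds.labels(model_name="llama").observe(0.07)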
@@ -375,12 +501,10 @@ def log(self, stats: Stats) -> None:
self._format_spec_decode_metrics_str(
self.spec_decode_metrics))

- if self.external_cache_service_metrics is not None:
-     logger.info(
-         "Cache service hit rate: by tokens: %.2f%%, by blocks: %.2f%%",
-         0 if self.external_cache_service_metrics.total_tokens == 0 else self.external_cache_service_metrics.hit_tokens/self.external_cache_service_metrics.total_tokens * 100,
-         0 if self.external_cache_service_metrics.total_blocks == 0 else self.external_cache_service_metrics.hit_blocks/self.external_cache_service_metrics.total_blocks * 100,
-     )
+ logger.info(
+     "Cache service hit rate: by tokens: %.2f%%, by blocks: %.2f%%",
+     stats.cache_service_tokens_hit_rate, stats.cache_service_blocks_hit_rate
+ )
# Reset tracked stats for next interval.
self.num_prompt_tokens = []
self.num_generation_tokens = []
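The refactor in this hunk moves the zero-guarded ratio arithmetic out of the logger and into CacheServiceMetrics. A plausible shape for those getters, assuming the counter fields used elsewhere in this diff and percentage results to match the %.2f%% log format (the actual implementation is not shown in this commit):

class CacheServiceMetricsSketch:
    def __init__(self) -> None:
        self.hit_tokens = 0
        self.total_tokens = 0
        self.hit_blocks = 0
        self.total_blocks = 0

    def get_tokens_hit_rate(self) -> float:
        # Guard against division by zero before any queries arrive.
        if self.total_tokens == 0:
            return 0.0
        return self.hit_tokens / self.total_tokens * 100

    def get_blocks_hit_rate(self) -> float:
        if self.total_blocks == 0:
            return 0.0
        return self.hit_blocks / self.total_blocks * 100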
@@ -482,6 +606,42 @@ def _log_prometheus(self, stats: Stats) -> None:
self._log_histogram(self.metrics.histogram_n_request, stats.n_requests)
self._log_histogram(self.metrics.histogram_best_of_request,
stats.best_of_requests)

# Cache Service
self._log_gauge(self.metrics.gauge_cache_service_hit_tokens,
stats.cache_service_hit_tokens)
self._log_gauge(self.metrics.gauge_cache_service_total_tokens,
stats.cache_service_total_tokens)
self._log_gauge(self.metrics.gauge_cache_service_hit_blocks,
stats.cache_service_hit_blocks)
self._log_gauge(self.metrics.gauge_cache_service_total_blocks,
stats.cache_service_total_blocks)
self._log_gauge(self.metrics.gauge_cache_service_tokens_hit_rate,
stats.cache_service_tokens_hit_rate)
self._log_gauge(self.metrics.gauge_cache_service_blocks_hit_rate,
stats.cache_service_blocks_hit_rate)
self._log_gauge(self.metrics.gauge_cache_service_err_query,
stats.cache_service_err_query)
self._log_gauge(self.metrics.gauge_cache_service_err_async_update_task_queue_full,
stats.cache_service_err_async_update_task_queue_full)
self._log_gauge(self.metrics.gauge_cache_service_err_update,
stats.cache_service_err_update)
self._log_histogram(self.metrics.histogram_cache_service_time_query_seconds,
stats.cache_service_time_query)
self._log_histogram(self.metrics.histogram_cache_service_time_load_seconds,
stats.cache_service_time_load)
self._log_histogram(self.metrics.histogram_cache_service_time_reshape_seconds,
stats.cache_service_time_reshape)
self._log_histogram(self.metrics.histogram_cache_service_time_unload_seconds,
stats.cache_service_time_unload)
self._log_histogram(self.metrics.histogram_cache_service_time_update_seconds,
stats.cache_service_time_update)
self._log_histogram(self.metrics.histogram_cache_service_time_async_update_queue_seconds,
stats.cache_service_time_async_update_queue)
self._log_histogram(self.metrics.histogram_cache_service_time_async_update_exec_seconds,
stats.cache_service_time_async_update_exec)
self._log_histogram(self.metrics.histogram_cache_service_counter_async_update_updated_seconds,
stats.cache_service_counter_async_update_updated)
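The _log_gauge/_log_histogram helpers used above predate this commit. As a rough sketch of their shape (not the exact vllm source): a gauge is set to the latest scalar, while a histogram observes every element of the list collected during the interval.

from typing import Dict, List

class PrometheusLoggerSketch:
    def __init__(self, labels: Dict[str, str]) -> None:
        self.labels = labels

    def _log_gauge(self, gauge, data: float) -> None:
        # Gauges report the most recent scalar for this label set.
        gauge.labels(**self.labels).set(data)

    def _log_histogram(self, histogram, data: List[float]) -> None:
        # Histograms observe each sample gathered during the interval.
        for value in data:
            histogram.labels(**self.labels).observe(value)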

def _log_prometheus_interval(self, prompt_throughput: float,
generation_throughput: float) -> None:
23 changes: 21 additions & 2 deletions vllm/engine/metrics_types.py
@@ -53,7 +53,27 @@ class Stats:
n_requests: List[int]
finished_reason_requests: List[str]

- spec_decode_metrics: Optional["SpecDecodeWorkerMetrics"] = None
+ # Cache Service
+ cache_service_hit_tokens: int
+ cache_service_total_tokens: int
+ cache_service_hit_blocks: int
+ cache_service_total_blocks: int
+ cache_service_tokens_hit_rate: float
+ cache_service_blocks_hit_rate: float
+ cache_service_err_query: int
+ cache_service_err_async_update_task_queue_full: int
+ cache_service_err_update: int
+ cache_service_time_query: List[float]
+ cache_service_time_load: List[float]
+ cache_service_time_reshape: List[float]
+ cache_service_time_unload: List[float]
+ cache_service_time_update: List[float]
+ cache_service_time_async_update_queue: List[float]
+ cache_service_time_async_update_exec: List[float]
+ cache_service_counter_async_update_updated: List[int]
+
+ spec_decode_metrics: Optional["SpecDecodeWorkerMetrics"] = None
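One detail worth noting in the dataclass fields above: Python dataclasses require fields without defaults to precede fields with defaults, which is why the new cache_service_* fields are inserted before the defaulted spec_decode_metrics rather than appended after it. A two-field demonstration:

from dataclasses import dataclass
from typing import Optional

@dataclass
class StatsSketch:
    cache_service_hit_tokens: int               # no default: must come first
    spec_decode_metrics: Optional[str] = None   # defaulted: must come last

# Reversing the two fields raises:
# TypeError: non-default argument 'cache_service_hit_tokens' follows default argument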


class SupportsMetricsInfo(Protocol):
@@ -71,7 +91,6 @@ def __init__(self, local_interval: float) -> None:
self.num_generation_tokens: List[int] = []
self.last_local_log = time.time()
self.local_interval = local_interval
- self.external_cache_service_metrics = CacheServiceMetrics
self.spec_decode_metrics: Optional["SpecDecodeWorkerMetrics"] = None

@abstractmethod