Skip to content

Commit

Permalink
Revert "Revert "Add more observability to Ray Data operator metrics (#14
Browse files Browse the repository at this point in the history
)" (#22)"

This reverts commit 3895456.
  • Loading branch information
votrou authored Oct 8, 2024
1 parent 3895456 commit 7f2bd61
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 12 deletions.
30 changes: 29 additions & 1 deletion dashboard/modules/metrics/dashboards/data_dashboard_panels.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,34 @@
fill=0,
stack=True,
),
Panel(
id=38,
title="In-Task Backpressure Time",
description="Time spent within a running task in backpressure.",
unit="seconds",
targets=[
Target(
expr="sum(ray_data_in_task_backpressure_time{{{global_filters}}}) by (dataset, operator)",
legend="In-Task Backpressure Time: {{dataset}}, {{operator}}",
)
],
fill=0,
stack=True,
),
Panel(
id=39,
title="Task CPU Time",
description="Time spent using CPU within a running task.",
unit="seconds",
targets=[
Target(
expr="sum(ray_data_task_cpu_time{{{global_filters}}}) by (dataset, operator)",
legend="Task CPU Time: {{dataset}}, {{operator}}",
)
],
fill=0,
stack=True,
),
# Ray Data Metrics (Object Store Memory)
Panel(
id=13,
Expand Down Expand Up @@ -548,4 +576,4 @@
panels=DATA_GRAFANA_PANELS,
standard_global_filters=['dataset=~"$DatasetID"', 'SessionName=~"$SessionName"'],
base_json_file_name="data_grafana_dashboard_base.json",
)
)
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,22 @@ class OpRuntimeMetrics:
"metrics_group": "tasks",
},
)
task_cpu_time: float = field(
default=0,
metadata={
"description": "Time actively using CPU within tasks",
"metrics_group": "tasks",
"map_only": True,
},
)
in_task_backpressure_time: float = field(
default=0,
metadata={
"description": "Time spent waiting idly on generator outputs to be yielded within tasks",
"metrics_group": "tasks",
"map_only": True,
},
)

# === Object store memory metrics ===
obj_store_mem_internal_inqueue_blocks: int = field(
Expand Down Expand Up @@ -273,7 +289,6 @@ class OpRuntimeMetrics:
"metrics_group": "object_store_memory",
},
)

# === Miscellaneous metrics ===
# Use "metrics_group: "misc" in the metadata for new metrics in this section.

Expand Down Expand Up @@ -485,6 +500,9 @@ def on_task_output_generated(self, task_index: int, output: RefBundle):
for block_ref, meta in output.blocks:
assert meta.exec_stats and meta.exec_stats.wall_time_s
self.block_generation_time += meta.exec_stats.wall_time_s
if meta.exec_stats.backpressure_time:
self.in_task_backpressure_time += meta.exec_stats.backpressure_time
self.task_cpu_time += meta.exec_stats.cpu_time_s
assert meta.num_rows is not None
self.rows_task_outputs_generated += meta.num_rows
trace_allocation(block_ref, "operator_output")
Expand Down
3 changes: 3 additions & 0 deletions python/ray/data/_internal/execution/operators/map_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Any, Callable, Deque, Dict, Iterator, List, Optional, Set, Union

import ray
import time
from ray import ObjectRef
from ray._raylet import ObjectRefGenerator
from ray.data._internal.compute import (
Expand Down Expand Up @@ -422,9 +423,11 @@ def _map_task(
m_out.exec_stats = stats.build()
m_out.exec_stats.udf_time_s = map_transformer.udf_time()
m_out.exec_stats.task_idx = ctx.task_idx
finish_time = time.perf_counter()
yield b_out
yield m_out
stats = BlockExecStats.builder()
stats.prev_map_task_finish_time = finish_time


class _BlockRefBundler:
Expand Down
12 changes: 2 additions & 10 deletions python/ray/data/_internal/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ def __init__(self, max_stats=1000):
# Object store memory-related metrics
self.execution_metrics_obj_store_memory = (
self._create_prometheus_metrics_for_execution_metrics(
metrics_group="obj_store_memory",
metrics_group="object_store_memory",
tag_keys=op_tags_keys,
)
)
Expand Down Expand Up @@ -280,7 +280,7 @@ def __init__(self, max_stats=1000):
)
self.iter_next_batch_s = Gauge(
"data_iter_next_batch_seconds",
description="Seconds spent getting next batch", # Need a better description for this?
description="Seconds spent getting next batch",
tag_keys=iter_tag_keys,
)
self.iter_format_batch_s = Gauge(
Expand Down Expand Up @@ -309,7 +309,6 @@ def __init__(self, max_stats=1000):
description="Number of blocks in remote nodes",
tag_keys=iter_tag_keys,
)

self.iter_blocks_unknown = Gauge(
"data_iter_blocks_unknown",
description="Number of blocks with unknown location",
Expand All @@ -322,13 +321,6 @@ def __init__(self, max_stats=1000):
tag_keys=iter_tag_keys,
)


self.streaming_exec_schedule_s = Gauge(
"data_streaming_exec_schedule_seconds",
description="Seconds spent streaming executor scheduling",
tag_keys=iter_tag_keys,
)

def _create_prometheus_metrics_for_execution_metrics(
self, metrics_group: str, tag_keys: Tuple[str, ...]
) -> Dict[str, Gauge]:
Expand Down
4 changes: 4 additions & 0 deletions python/ray/data/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ def __init__(self):
self.wall_time_s: Optional[float] = None
self.udf_time_s: Optional[float] = 0
self.cpu_time_s: Optional[float] = None
self.backpressure_time: Optional[float] = None
self.node_id = ray.runtime_context.get_runtime_context().get_node_id()
# Max memory usage. May be an overestimate since we do not
# differentiate from previous tasks on the same worker.
Expand Down Expand Up @@ -166,6 +167,7 @@ class _BlockExecStatsBuilder:
def __init__(self):
self.start_time = time.perf_counter()
self.start_cpu = time.process_time()
self.prev_map_task_finish_time = None

def build(self) -> "BlockExecStats":
self.end_time = time.perf_counter()
Expand All @@ -176,6 +178,8 @@ def build(self) -> "BlockExecStats":
stats.end_time_s = self.end_time
stats.wall_time_s = self.end_time - self.start_time
stats.cpu_time_s = self.end_cpu - self.start_cpu
if self.prev_map_task_finish_time:
stats.backpressure_time = self.start_time - self.prev_map_task_finish_time
if resource is None:
# NOTE(swang): resource package is not supported on Windows. This
# is only the memory usage at the end of the task, not the peak
Expand Down

0 comments on commit 7f2bd61

Please sign in to comment.