diff --git a/dashboard/modules/metrics/dashboards/data_dashboard_panels.py b/dashboard/modules/metrics/dashboards/data_dashboard_panels.py index 3bee67b323359..f91ede06e76ae 100644 --- a/dashboard/modules/metrics/dashboards/data_dashboard_panels.py +++ b/dashboard/modules/metrics/dashboards/data_dashboard_panels.py @@ -388,6 +388,34 @@ fill=0, stack=True, ), + Panel( + id=38, + title="In-Task Backpressure Time", + description="Time spent within a running task in backpressure.", + unit="seconds", + targets=[ + Target( + expr="sum(ray_data_in_task_backpressure_time{{{global_filters}}}) by (dataset, operator)", + legend="In-Task Backpressure Time: {{dataset}}, {{operator}}", + ) + ], + fill=0, + stack=True, + ), + Panel( + id=39, + title="Task CPU Time", + description="Time spent using CPU within a running task.", + unit="seconds", + targets=[ + Target( + expr="sum(ray_data_task_cpu_time{{{global_filters}}}) by (dataset, operator)", + legend="Task CPU Time: {{dataset}}, {{operator}}", + ) + ], + fill=0, + stack=True, + ), # Ray Data Metrics (Object Store Memory) Panel( id=13, @@ -548,4 +576,4 @@ panels=DATA_GRAFANA_PANELS, standard_global_filters=['dataset=~"$DatasetID"', 'SessionName=~"$SessionName"'], base_json_file_name="data_grafana_dashboard_base.json", -) +) \ No newline at end of file diff --git a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py index 5ea4f4cdc71e8..26fba15685130 100644 --- a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py +++ b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py @@ -208,6 +208,22 @@ class OpRuntimeMetrics: "metrics_group": "tasks", }, ) + task_cpu_time: float = field( + default=0, + metadata={ + "description": "Time actively using CPU within tasks", + "metrics_group": "tasks", + "map_only": True, + }, + ) + in_task_backpressure_time: float = field( + default=0, + metadata={ + "description": "Time spent waiting idly on generator outputs to be yielded within tasks", + "metrics_group": "tasks", + "map_only": True, + }, + ) # === Object store memory metrics === obj_store_mem_internal_inqueue_blocks: int = field( @@ -273,7 +289,6 @@ class OpRuntimeMetrics: "metrics_group": "object_store_memory", }, ) - # === Miscellaneous metrics === # Use "metrics_group: "misc" in the metadata for new metrics in this section. @@ -485,6 +500,9 @@ def on_task_output_generated(self, task_index: int, output: RefBundle): for block_ref, meta in output.blocks: assert meta.exec_stats and meta.exec_stats.wall_time_s self.block_generation_time += meta.exec_stats.wall_time_s + if meta.exec_stats.backpressure_time: + self.in_task_backpressure_time += meta.exec_stats.backpressure_time + self.task_cpu_time += meta.exec_stats.cpu_time_s assert meta.num_rows is not None self.rows_task_outputs_generated += meta.num_rows trace_allocation(block_ref, "operator_output") diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py index 069e5ff306c6e..6f9992faf5304 100644 --- a/python/ray/data/_internal/execution/operators/map_operator.py +++ b/python/ray/data/_internal/execution/operators/map_operator.py @@ -6,6 +6,7 @@ from typing import Any, Callable, Deque, Dict, Iterator, List, Optional, Set, Union import ray +import time from ray import ObjectRef from ray._raylet import ObjectRefGenerator from ray.data._internal.compute import ( @@ -422,9 +423,11 @@ def _map_task( m_out.exec_stats = stats.build() m_out.exec_stats.udf_time_s = map_transformer.udf_time() m_out.exec_stats.task_idx = ctx.task_idx + finish_time = time.perf_counter() yield b_out yield m_out stats = BlockExecStats.builder() + stats.prev_map_task_finish_time = finish_time class _BlockRefBundler: diff --git a/python/ray/data/_internal/stats.py b/python/ray/data/_internal/stats.py index caa81bd6d5132..0debf89338489 100644 --- a/python/ray/data/_internal/stats.py +++ b/python/ray/data/_internal/stats.py @@ -233,7 +233,7 @@ def __init__(self, max_stats=1000): # Object store memory-related metrics self.execution_metrics_obj_store_memory = ( self._create_prometheus_metrics_for_execution_metrics( - metrics_group="obj_store_memory", + metrics_group="object_store_memory", tag_keys=op_tags_keys, ) ) @@ -280,7 +280,7 @@ def __init__(self, max_stats=1000): ) self.iter_next_batch_s = Gauge( "data_iter_next_batch_seconds", - description="Seconds spent getting next batch", # Need a better description for this? + description="Seconds spent getting next batch", tag_keys=iter_tag_keys, ) self.iter_format_batch_s = Gauge( @@ -309,7 +309,6 @@ def __init__(self, max_stats=1000): description="Number of blocks in remote nodes", tag_keys=iter_tag_keys, ) - self.iter_blocks_unknown = Gauge( "data_iter_blocks_unknown", description="Number of blocks with unknown location", @@ -322,13 +321,6 @@ def __init__(self, max_stats=1000): tag_keys=iter_tag_keys, ) - - self.streaming_exec_schedule_s = Gauge( - "data_streaming_exec_schedule_seconds", - description="Seconds spent streaming executor scheduling", - tag_keys=iter_tag_keys, - ) - def _create_prometheus_metrics_for_execution_metrics( self, metrics_group: str, tag_keys: Tuple[str, ...] ) -> Dict[str, Gauge]: diff --git a/python/ray/data/block.py b/python/ray/data/block.py index 308b2e0624040..39c9305809719 100644 --- a/python/ray/data/block.py +++ b/python/ray/data/block.py @@ -135,6 +135,7 @@ def __init__(self): self.wall_time_s: Optional[float] = None self.udf_time_s: Optional[float] = 0 self.cpu_time_s: Optional[float] = None + self.backpressure_time: Optional[float] = None self.node_id = ray.runtime_context.get_runtime_context().get_node_id() # Max memory usage. May be an overestimate since we do not # differentiate from previous tasks on the same worker. @@ -166,6 +167,7 @@ class _BlockExecStatsBuilder: def __init__(self): self.start_time = time.perf_counter() self.start_cpu = time.process_time() + self.prev_map_task_finish_time = None def build(self) -> "BlockExecStats": self.end_time = time.perf_counter() @@ -176,6 +178,8 @@ def build(self) -> "BlockExecStats": stats.end_time_s = self.end_time stats.wall_time_s = self.end_time - self.start_time stats.cpu_time_s = self.end_cpu - self.start_cpu + if self.prev_map_task_finish_time: + stats.backpressure_time = self.start_time - self.prev_map_task_finish_time if resource is None: # NOTE(swang): resource package is not supported on Windows. This # is only the memory usage at the end of the task, not the peak