Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove unnecessary kudo debug metrics #12146

Merged
merged 5 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import com.nvidia.spark.rapids.jni.kudo.{KudoSerializer, KudoTable, KudoTableHea

import org.apache.spark.TaskContext
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer, SerializerInstance}
import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.{METRIC_DATA_SIZE, METRIC_SHUFFLE_DESER_STREAM_TIME, METRIC_SHUFFLE_SER_CALC_HEADER_TIME, METRIC_SHUFFLE_SER_COPY_BUFFER_TIME, METRIC_SHUFFLE_SER_COPY_HEADER_TIME, METRIC_SHUFFLE_SER_STREAM_TIME}
import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.{METRIC_DATA_SIZE, METRIC_SHUFFLE_DESER_STREAM_TIME, METRIC_SHUFFLE_SER_COPY_BUFFER_TIME, METRIC_SHUFFLE_SER_STREAM_TIME}
import org.apache.spark.sql.types.{DataType, NullType}
import org.apache.spark.sql.vectorized.ColumnarBatch

Expand Down Expand Up @@ -350,8 +350,6 @@ private class KudoSerializerInstance(
) extends SerializerInstance {
private val dataSize = metrics(METRIC_DATA_SIZE)
private val serTime = metrics(METRIC_SHUFFLE_SER_STREAM_TIME)
private val serCalcHeaderTime = metrics(METRIC_SHUFFLE_SER_CALC_HEADER_TIME)
private val serCopyHeaderTime = metrics(METRIC_SHUFFLE_SER_COPY_HEADER_TIME)
private val serCopyBufferTime = metrics(METRIC_SHUFFLE_SER_COPY_BUFFER_TIME)
private val deserTime = metrics(METRIC_SHUFFLE_DESER_STREAM_TIME)

Expand Down Expand Up @@ -403,10 +401,8 @@ private class KudoSerializerInstance(
.writeToStreamWithMetrics(writeInput)

dataSize += writeMetric.getWrittenBytes
serCalcHeaderTime += writeMetric.getCalcHeaderTime
if (measureBufferCopyTime) {
// These metrics will not show up in the UI if they are not modified
serCopyHeaderTime += writeMetric.getCopyHeaderTime
serCopyBufferTime += writeMetric.getCopyBufferTime
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,6 @@ object GpuMetric extends Logging {
val FILECACHE_DATA_RANGE_READ_TIME = "filecacheDataRangeReadTime"
val DELETION_VECTOR_SCATTER_TIME = "deletionVectorScatterTime"
val DELETION_VECTOR_SIZE = "deletionVectorSize"
val CONCAT_HEADER_TIME = "concatHeaderTime"
val CONCAT_BUFFER_TIME = "concatBufferTime"
val COPY_TO_HOST_TIME = "d2hMemCopyTime"

// Metric Descriptions.
Expand Down Expand Up @@ -148,8 +146,6 @@ object GpuMetric extends Logging {
val DESCRIPTION_FILECACHE_DATA_RANGE_READ_TIME = "cached data read time"
val DESCRIPTION_DELETION_VECTOR_SCATTER_TIME = "deletion vector scatter time"
val DESCRIPTION_DELETION_VECTOR_SIZE = "deletion vector size"
val DESCRIPTION_CONCAT_HEADER_TIME = "concat header time"
val DESCRIPTION_CONCAT_BUFFER_TIME = "concat buffer time"
val DESCRIPTION_COPY_TO_HOST_TIME = "deviceToHost memory copy time"

def unwrap(input: GpuMetric): SQLMetric = input match {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
* Copyright (c) 2020-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,7 +23,6 @@ import scala.reflect.ClassTag
import ai.rapids.cudf.{JCudfSerialization, NvtxColor, NvtxRange}
import ai.rapids.cudf.JCudfSerialization.HostConcatResult
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.GpuMetric.{CONCAT_BUFFER_TIME, CONCAT_HEADER_TIME}
import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion
import com.nvidia.spark.rapids.jni.kudo.{KudoHostMergeResult, KudoSerializer, KudoTable}
import com.nvidia.spark.rapids.shims.ShimUnaryExecNode
Expand Down Expand Up @@ -56,8 +55,6 @@ case class GpuShuffleCoalesceExec(child: SparkPlan, targetBatchByteSize: Long)
NUM_INPUT_ROWS -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_ROWS),
NUM_INPUT_BATCHES -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_BATCHES),
CONCAT_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_CONCAT_TIME),
CONCAT_HEADER_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_CONCAT_HEADER_TIME),
CONCAT_BUFFER_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_CONCAT_BUFFER_TIME),
)

override protected val outputBatchesLevel = MODERATE_LEVEL
Expand Down Expand Up @@ -232,10 +229,8 @@ case class RowCountOnlyMergeResult(rowCount: Int) extends CoalescedHostResult {
override def close(): Unit = {}
}

class KudoTableOperator(
kudo: Option[KudoSerializer] ,
kudoMergeHeaderTime: GpuMetric,
kudoMergeBufferTime: GpuMetric) extends SerializedTableOperator[KudoSerializedTableColumn] {
class KudoTableOperator(kudo: Option[KudoSerializer])
extends SerializedTableOperator[KudoSerializedTableColumn] {
require(kudo != null, "kudo serializer should not be null")
private val kudoTables = new util.ArrayList[KudoTable]()

Expand All @@ -259,8 +254,6 @@ class KudoTableOperator(
}

val result = kudo.get.mergeOnHost(kudoTables)
kudoMergeHeaderTime += result.getRight.getCalcHeaderTime
kudoMergeBufferTime += result.getRight.getMergeIntoHostBufferTime

KudoHostMergeResultWrapper(result.getLeft)
}
Expand Down Expand Up @@ -390,7 +383,7 @@ class KudoHostShuffleCoalesceIterator(
} else {
None
}
new KudoTableOperator(kudoSer, metricsMap(CONCAT_HEADER_TIME), metricsMap(CONCAT_BUFFER_TIME))
new KudoTableOperator(kudoSer)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
* Copyright (c) 2020-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -144,8 +144,6 @@ case class GpuShuffledHashJoinExec(
override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_OP_TIME),
CONCAT_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_CONCAT_TIME),
CONCAT_HEADER_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_CONCAT_HEADER_TIME),
CONCAT_BUFFER_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_CONCAT_BUFFER_TIME),
BUILD_DATA_SIZE -> createSizeMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_DATA_SIZE),
BUILD_TIME -> createNanoTimingMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_TIME),
STREAM_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_STREAM_TIME),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,6 @@ object GpuShuffledSizedHashJoinExec {
Map(
OP_TIME -> metrics(OP_TIME),
CONCAT_TIME -> metrics(CONCAT_TIME),
CONCAT_HEADER_TIME -> metrics(CONCAT_HEADER_TIME),
CONCAT_BUFFER_TIME -> metrics(CONCAT_BUFFER_TIME)
).withDefaultValue(NoopMetric)
}

Expand Down Expand Up @@ -389,8 +387,6 @@ abstract class GpuShuffledSizedHashJoinExec[HOST_BATCH_TYPE <: AutoCloseable] ex
override lazy val additionalMetrics: Map[String, GpuMetric] = Map(
OP_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_OP_TIME),
CONCAT_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_CONCAT_TIME),
CONCAT_HEADER_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_CONCAT_HEADER_TIME),
CONCAT_BUFFER_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_CONCAT_BUFFER_TIME),
BUILD_DATA_SIZE -> createSizeMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_DATA_SIZE),
BUILD_TIME -> createNanoTimingMetric(ESSENTIAL_LEVEL, DESCRIPTION_BUILD_TIME),
STREAM_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_STREAM_TIME),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,6 @@ object GpuShuffleExchangeExecBase {
val METRIC_DESC_SHUFFLE_WRITE_IO_TIME = "RAPIDS shuffle shuffle write io time"
val METRIC_SHUFFLE_READ_TIME = "rapidsShuffleReadTime"
val METRIC_DESC_SHUFFLE_READ_TIME = "RAPIDS shuffle shuffle read time"
val METRIC_SHUFFLE_SER_CALC_HEADER_TIME = "rapidsShuffleSerializationCalcHeaderTime"
val METRIC_DESC_SHUFFLE_SER_CALC_HEADER_TIME = "RAPIDS shuffle serialization calc header time"
val METRIC_SHUFFLE_SER_COPY_HEADER_TIME = "rapidsShuffleSerializationCopyHeaderTime"
val METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME = "RAPIDS shuffle serialization copy header time"
val METRIC_SHUFFLE_SER_COPY_BUFFER_TIME = "rapidsShuffleSerializationCopyBufferTime"
val METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME = "RAPIDS shuffle serialization copy buffer time"

Expand All @@ -321,10 +317,6 @@ object GpuShuffleExchangeExecBase {
gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_WRITE_IO_TIME),
METRIC_SHUFFLE_READ_TIME ->
gpu.createNanoTimingMetric(ESSENTIAL_LEVEL, METRIC_DESC_SHUFFLE_READ_TIME),
METRIC_SHUFFLE_SER_CALC_HEADER_TIME ->
gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_CALC_HEADER_TIME),
METRIC_SHUFFLE_SER_COPY_HEADER_TIME ->
gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME),
METRIC_SHUFFLE_SER_COPY_BUFFER_TIME ->
gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME)
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -94,8 +94,6 @@ case class GpuBroadcastHashJoinExec(
NUM_INPUT_ROWS -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_ROWS),
NUM_INPUT_BATCHES -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_BATCHES),
CONCAT_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_CONCAT_TIME),
CONCAT_HEADER_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_CONCAT_HEADER_TIME),
CONCAT_BUFFER_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_CONCAT_BUFFER_TIME)
)

override def requiredChildDistribution: Seq[Distribution] = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -150,8 +150,6 @@ case class GpuBroadcastNestedLoopJoinExec(
NUM_INPUT_ROWS -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_ROWS),
NUM_INPUT_BATCHES -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_BATCHES),
CONCAT_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_CONCAT_TIME),
CONCAT_HEADER_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_CONCAT_HEADER_TIME),
CONCAT_BUFFER_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_CONCAT_BUFFER_TIME)
)

def isExecutorBroadcast(): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -74,8 +74,6 @@ object GpuExecutorBroadcastHelper {
// to ensure this always a single batch for the following step.
val shuffleMetrics = Map(
CONCAT_TIME -> metricsMap(CONCAT_TIME),
CONCAT_HEADER_TIME -> metricsMap(CONCAT_HEADER_TIME),
CONCAT_BUFFER_TIME -> metricsMap(CONCAT_BUFFER_TIME),
OP_TIME -> metricsMap(OP_TIME),
).withDefaultValue(NoopMetric)

Expand Down