From 5ed212bc2af494a849279df1841ae3c574617c2b Mon Sep 17 00:00:00 2001
From: youxiduo
Date: Fri, 25 Aug 2023 15:01:09 +0800
Subject: [PATCH] [CELEBORN-917][GLUTEN] Record read metric should be compatible with Gluten shuffle serde

### What changes were proposed in this pull request?

When updating the record read metric, we should consider whether the input
record is a `ColumnarBatch`. If the serde is the Gluten columnar batch
serializer, the metric should be incremented by `ColumnarBatch.numRows`
instead of by 1 per record.

### Why are the changes needed?

To make the shuffle record read metric correct.

### Does this PR introduce _any_ user-facing change?

Yes, the reported shuffle records read metric changes.

### How was this patch tested?

Manually tested.

before: [image]
after: [image]

Closes #1838 from ulysses-you/gluten.

Lead-authored-by: youxiduo
Co-authored-by: Xiduo You
Signed-off-by: Cheng Pan
---
 .../celeborn/CelebornShuffleReader.scala      | 14 ++++--
 .../GlutenColumnarBatchSerdeHelper.scala      | 44 +++++++++++++++++++
 2 files changed, 54 insertions(+), 4 deletions(-)
 create mode 100644 client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenColumnarBatchSerdeHelper.scala

diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 5ce89190738..36ee314489b 100644
--- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -97,11 +97,17 @@ class CelebornShuffleReader[K, C](
     }).flatMap(
       serializerInstance.deserializeStream(_).asKeyValueIterator)
 
+    val iterWithUpdatedRecordsRead =
+      if (GlutenColumnarBatchSerdeHelper.isGlutenSerde(serializerInstance.getClass.getName)) {
+        GlutenColumnarBatchSerdeHelper.withUpdatedRecordsRead(recordIter, metrics)
+      } else {
+        recordIter.map { record =>
+          metrics.incRecordsRead(1)
+          record
+        }
+      }
     val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
-      recordIter.map { record =>
-        metrics.incRecordsRead(1)
-        record
-      },
+      iterWithUpdatedRecordsRead,
       context.taskMetrics().mergeShuffleReadMetrics())
 
     // An interruptible iterator must be used here in order to support task cancellation
diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenColumnarBatchSerdeHelper.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenColumnarBatchSerdeHelper.scala
new file mode 100644
index 00000000000..259bb954d21
--- /dev/null
+++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenColumnarBatchSerdeHelper.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.celeborn
+
+import org.apache.spark.shuffle.ShuffleReadMetricsReporter
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+/**
+ * A helper class to be compatible with Gluten Celeborn.
+ */
+object GlutenColumnarBatchSerdeHelper {
+
+  def isGlutenSerde(serdeName: String): Boolean = {
+    // scalastyle:off
+    // see Gluten
+    // https://github.com/oap-project/gluten/blob/main/gluten-celeborn/src/main/scala/org/apache/spark/shuffle/CelebornColumnarBatchSerializer.scala
+    // scalastyle:on
+    "org.apache.spark.shuffle.CelebornColumnarBatchSerializer".equals(serdeName)
+  }
+
+  def withUpdatedRecordsRead(
+      input: Iterator[(Any, Any)],
+      metrics: ShuffleReadMetricsReporter): Iterator[(Any, Any)] = {
+    input.map { record =>
+      metrics.incRecordsRead(record._2.asInstanceOf[ColumnarBatch].numRows())
+      record
+    }
+  }
+}
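
As a rough illustration of the counting rule this change adopts (the sketch below is not part of the diff and assumes Spark 3.x on the classpath): with the Gluten serde each shuffle record is a `ColumnarBatch`, so records read must grow by `numRows()` per batch rather than by 1 per deserialized record. The `RecordsReadCountingSketch` object and the bare `recordsRead` counter are hypothetical stand-ins for `ShuffleReadMetricsReporter.incRecordsRead`; the batches are built with Spark's `OnHeapColumnVector` purely for demonstration.

```scala
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

object RecordsReadCountingSketch {
  def main(args: Array[String]): Unit = {
    // Plain counter standing in for ShuffleReadMetricsReporter.incRecordsRead.
    var recordsRead = 0L

    // Two shuffle "records" that are really columnar batches of 3 and 5 rows,
    // i.e. 8 logical rows arriving as 2 (key, value) pairs.
    val batches: Iterator[(Any, Any)] = Iterator(3, 5).map { numRows =>
      val col = new OnHeapColumnVector(numRows, IntegerType)
      (0 until numRows).foreach(i => col.putInt(i, i))
      val batch = new ColumnarBatch(Array[ColumnVector](col))
      batch.setNumRows(numRows)
      (null, batch)
    }

    // Same rule as GlutenColumnarBatchSerdeHelper.withUpdatedRecordsRead:
    // add numRows() per batch instead of 1 per record.
    val counted = batches.map { record =>
      recordsRead += record._2.asInstanceOf[ColumnarBatch].numRows()
      record
    }
    counted.foreach { case (_, batch) => batch.asInstanceOf[ColumnarBatch].close() }

    assert(recordsRead == 8, s"expected 8 rows read, got $recordsRead")
  }
}
```

Counting 1 per record would report 2 here even though 8 rows were shuffled, which is the mismatch the PR description calls out.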