diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala index a88bd9f2cfb..d57f6430a0f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,8 +18,10 @@ package com.nvidia.spark.rapids import java.util -import ai.rapids.cudf.{HostMemoryBuffer, JCudfSerialization, NvtxColor, NvtxRange} -import ai.rapids.cudf.JCudfSerialization.{HostConcatResult, SerializedTableHeader} +import scala.reflect.ClassTag + +import ai.rapids.cudf.{JCudfSerialization, NvtxColor, NvtxRange} +import ai.rapids.cudf.JCudfSerialization.HostConcatResult import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion import com.nvidia.spark.rapids.shims.ShimUnaryExecNode @@ -66,78 +68,183 @@ case class GpuShuffleCoalesceExec(child: SparkPlan, targetBatchByteSize: Long) val metricsMap = allMetrics val targetSize = targetBatchByteSize val dataTypes = GpuColumnVector.extractTypes(schema) + val readOption = CoalesceReadOption(new RapidsConf(conf)) child.executeColumnar().mapPartitions { iter => - new GpuShuffleCoalesceIterator( - new HostShuffleCoalesceIterator(iter, targetSize, metricsMap), - dataTypes, metricsMap) + GpuShuffleCoalesceUtils.getGpuShuffleCoalesceIterator(iter, targetSize, dataTypes, + readOption, metricsMap) + } + } +} + +/** A case class to pack some options. Now it has only one, but may have more in the future */ +case class CoalesceReadOption private(kudoEnabled: Boolean) + +object CoalesceReadOption { + def apply(conf: RapidsConf): CoalesceReadOption = { + // TODO get the value from conf + CoalesceReadOption(false) + } +} + +object GpuShuffleCoalesceUtils { + /** + * Return an iterator that will pull in batches from the input iterator, + * concatenate them up to the "targetSize" and move the concatenated result + * to the GPU for each output batch. + * The input iterator is expected to contain only serialized host batches just + * returned from the Shuffle deserializer. Otherwise, it will blow up. + * + * @param iter the input iterator containing only serialized host batches + * @param targetSize the target batch size for coalescing + * @param dataTypes the schema of the input batches + * @param readOption the coalesce read option + * @param metricsMap metrics map + * @param prefetchFirstBatch whether prefetching the first bundle of serialized + * batches with the total size up to the "targetSize". The + * prefetched batches will be cached on host until the "next()" + * is called. This is for some optimization cases in join. + */ + def getGpuShuffleCoalesceIterator( + iter: Iterator[ColumnarBatch], + targetSize: Long, + dataTypes: Array[DataType], + readOption: CoalesceReadOption, + metricsMap: Map[String, GpuMetric], + prefetchFirstBatch: Boolean = false): Iterator[ColumnarBatch] = { + val hostIter = if (readOption.kudoEnabled) { + // TODO replace with the actual Kudo host iterator + throw new UnsupportedOperationException("Kudo is not supported yet") + } else { + new HostShuffleCoalesceIterator(iter, targetSize, metricsMap) + } + val maybeBufferedIter = if (prefetchFirstBatch) { + val bufferedIter = new CloseableBufferedIterator(hostIter) + withResource(new NvtxRange("fetch first batch", NvtxColor.YELLOW)) { _ => + // Force a coalesce of the first batch before we grab the GPU semaphore + bufferedIter.headOption + } + bufferedIter + } else { + hostIter + } + new GpuShuffleCoalesceIterator(maybeBufferedIter, dataTypes, metricsMap) + } + + /** Get the buffer size of a serialized batch just returned by the Shuffle deserializer */ + def getSerializedBufferSize(cb: ColumnarBatch): Long = { + assert(cb.numCols() == 1) + val hmb = cb.column(0) match { + // TODO add the Kudo case + case serCol: SerializedTableColumn => serCol.hostBuffer + case o => throw new IllegalStateException(s"unsupported type: ${o.getClass}") + } + if (hmb != null) hmb.getLength else 0L + } +} + +/** + * A trait representing the shuffle coalesced result by the Shuffle coalesce iterator. + */ +sealed trait CoalescedHostResult extends AutoCloseable { + /** Convert itself to a GPU batch */ + def toGpuBatch(dataTypes: Array[DataType]): ColumnarBatch + + /** Get the data size */ + def getDataSize: Long +} + +/** + * A trait defining some operations on the table T. + * This is used by HostCoalesceIteratorBase to separate the table operations from + * the shuffle read process. + */ +sealed trait SerializedTableOperator[T <: AutoCloseable] { + def getDataLen(table: T): Long + def getNumRows(table: T): Int + def concatOnHost(tables: Array[T]): CoalescedHostResult +} + +class JCudfCoalescedHostResult(hostConcatResult: HostConcatResult) extends CoalescedHostResult { + assert(hostConcatResult != null, "hostConcatResult should not be null") + + override def toGpuBatch(dataTypes: Array[DataType]): ColumnarBatch = + cudf_utils.HostConcatResultUtil.getColumnarBatch(hostConcatResult, dataTypes) + + override def close(): Unit = hostConcatResult.close() + + override def getDataSize: Long = hostConcatResult.getTableHeader.getDataLen +} + +class JCudfTableOperator extends SerializedTableOperator[SerializedTableColumn] { + override def getDataLen(table: SerializedTableColumn): Long = table.header.getDataLen + override def getNumRows(table: SerializedTableColumn): Int = table.header.getNumRows + + override def concatOnHost(tables: Array[SerializedTableColumn]): CoalescedHostResult = { + assert(tables.nonEmpty, "no tables to be concatenated") + val numCols = tables.head.header.getNumColumns + val ret = if (numCols == 0) { + val totalRowsNum = tables.map(getNumRows).sum + cudf_utils.HostConcatResultUtil.rowsOnlyHostConcatResult(totalRowsNum) + } else { + val (headers, buffers) = tables.map(t => (t.header, t.hostBuffer)).unzip + JCudfSerialization.concatToHostBuffer(headers, buffers) } + new JCudfCoalescedHostResult(ret) } } /** * Iterator that coalesces columnar batches that are expected to only contain - * [[SerializedTableColumn]]. The serialized tables within are collected up + * serialized tables from shuffle. The serialized tables within are collected up * to the target batch size and then concatenated on the host before handing * them to the caller on `.next()` */ -class HostShuffleCoalesceIterator( +abstract class HostCoalesceIteratorBase[T <: AutoCloseable: ClassTag]( iter: Iterator[ColumnarBatch], targetBatchByteSize: Long, metricsMap: Map[String, GpuMetric]) - extends Iterator[HostConcatResult] with AutoCloseable { + extends Iterator[CoalescedHostResult] with AutoCloseable { + private[this] val concatTimeMetric = metricsMap(GpuMetric.CONCAT_TIME) private[this] val inputBatchesMetric = metricsMap(GpuMetric.NUM_INPUT_BATCHES) private[this] val inputRowsMetric = metricsMap(GpuMetric.NUM_INPUT_ROWS) - private[this] val serializedTables = new util.ArrayDeque[SerializedTableColumn] + private[this] val serializedTables = new util.ArrayDeque[T] private[this] var numTablesInBatch: Int = 0 private[this] var numRowsInBatch: Int = 0 private[this] var batchByteSize: Long = 0L // Don't install the callback if in a unit test Option(TaskContext.get()).foreach { tc => - onTaskCompletion(tc) { - close() - } + onTaskCompletion(tc)(close()) } + protected def tableOperator: SerializedTableOperator[T] + override def close(): Unit = { serializedTables.forEach(_.close()) serializedTables.clear() } - def concatenateTablesInHost(): HostConcatResult = { + private def concatenateTablesInHost(): CoalescedHostResult = { val result = withResource(new MetricRange(concatTimeMetric)) { _ => - val firstHeader = serializedTables.peekFirst().header - if (firstHeader.getNumColumns == 0) { - (0 until numTablesInBatch).foreach(_ => serializedTables.removeFirst()) - cudf_utils.HostConcatResultUtil.rowsOnlyHostConcatResult(numRowsInBatch) - } else { - val headers = new Array[SerializedTableHeader](numTablesInBatch) - withResource(new Array[HostMemoryBuffer](numTablesInBatch)) { buffers => - headers.indices.foreach { i => - val serializedTable = serializedTables.removeFirst() - headers(i) = serializedTable.header - buffers(i) = serializedTable.hostBuffer - } - JCudfSerialization.concatToHostBuffer(headers, buffers) - } + withResource(new Array[T](numTablesInBatch)) { tables => + tables.indices.foreach(i => tables(i) = serializedTables.removeFirst()) + tableOperator.concatOnHost(tables) } } // update the stats for the next batch in progress numTablesInBatch = serializedTables.size - batchByteSize = 0 numRowsInBatch = 0 if (numTablesInBatch > 0) { require(numTablesInBatch == 1, "should only track at most one buffer that is not in a batch") - val header = serializedTables.peekFirst().header - batchByteSize = header.getDataLen - numRowsInBatch = header.getNumRows + val firstTable = serializedTables.peekFirst() + batchByteSize = tableOperator.getDataLen(firstTable) + numRowsInBatch = tableOperator.getNumRows(firstTable) } - result } @@ -150,14 +257,14 @@ class HostShuffleCoalesceIterator( // don't bother tracking empty tables if (batch.numRows > 0) { inputRowsMetric += batch.numRows() - val tableColumn = batch.column(0).asInstanceOf[SerializedTableColumn] - batchCanGrow = canAddToBatch(tableColumn.header) + val tableColumn = batch.column(0).asInstanceOf[T] + batchCanGrow = canAddToBatch(tableColumn) serializedTables.addLast(tableColumn) // always add the first table to the batch even if its beyond the target limits if (batchCanGrow || numTablesInBatch == 0) { numTablesInBatch += 1 - numRowsInBatch += tableColumn.header.getNumRows - batchByteSize += tableColumn.header.getDataLen + numRowsInBatch += tableOperator.getNumRows(tableColumn) + batchByteSize += tableOperator.getDataLen(tableColumn) } } else { batch.close() @@ -172,34 +279,39 @@ class HostShuffleCoalesceIterator( numTablesInBatch > 0 } - override def next(): HostConcatResult = { + override def next(): CoalescedHostResult = { if (!hasNext()) { throw new NoSuchElementException("No more host batches to concatenate") } concatenateTablesInHost() } - private def canAddToBatch(nextTable: SerializedTableHeader): Boolean = { - if (batchByteSize + nextTable.getDataLen > targetBatchByteSize) { + private def canAddToBatch(nextTable: T): Boolean = { + if (batchByteSize + tableOperator.getDataLen(nextTable) > targetBatchByteSize) { return false } - if (numRowsInBatch.toLong + nextTable.getNumRows > Integer.MAX_VALUE) { + if (numRowsInBatch.toLong + tableOperator.getNumRows(nextTable) > Integer.MAX_VALUE) { return false } true } } +class HostShuffleCoalesceIterator( + iter: Iterator[ColumnarBatch], + targetBatchSize: Long, + metricsMap: Map[String, GpuMetric]) + extends HostCoalesceIteratorBase[SerializedTableColumn](iter, targetBatchSize, metricsMap) { + override protected def tableOperator = new JCudfTableOperator +} + /** - * Iterator that coalesces columnar batches that are expected to only contain - * [[SerializedTableColumn]]. The serialized tables within are collected up - * to the target batch size and then concatenated on the host before the data - * is transferred to the GPU. + * Iterator that expects only "CoalescedHostResult"s as the input, and transfers + * them to GPU. */ -class GpuShuffleCoalesceIterator(iter: Iterator[HostConcatResult], - dataTypes: Array[DataType], - metricsMap: Map[String, GpuMetric]) - extends Iterator[ColumnarBatch] { +class GpuShuffleCoalesceIterator(iter: Iterator[CoalescedHostResult], + dataTypes: Array[DataType], + metricsMap: Map[String, GpuMetric]) extends Iterator[ColumnarBatch] { private[this] val opTimeMetric = metricsMap(GpuMetric.OP_TIME) private[this] val outputBatchesMetric = metricsMap(GpuMetric.NUM_OUTPUT_BATCHES) private[this] val outputRowsMetric = metricsMap(GpuMetric.NUM_OUTPUT_ROWS) @@ -211,22 +323,21 @@ class GpuShuffleCoalesceIterator(iter: Iterator[HostConcatResult], throw new NoSuchElementException("No more columnar batches") } withResource(new NvtxRange("Concat+Load Batch", NvtxColor.YELLOW)) { _ => - val hostConcatResult = withResource(new MetricRange(opTimeMetric)) { _ => + val hostCoalescedResult = withResource(new MetricRange(opTimeMetric)) { _ => // op time covers concat time performed in `iter.next()`. // Note the concat runs on CPU. // GPU time = opTime - concatTime iter.next() } - withResource(hostConcatResult) { _ => + withResource(hostCoalescedResult) { _ => // We acquire the GPU regardless of whether `hostConcatResult` // is an empty batch or not, because the downstream tasks expect // the `GpuShuffleCoalesceIterator` to acquire the semaphore and may // generate GPU data from batches that are empty. GpuSemaphore.acquireIfNecessary(TaskContext.get()) - withResource(new MetricRange(opTimeMetric)) { _ => - val batch = cudf_utils.HostConcatResultUtil.getColumnarBatch(hostConcatResult, dataTypes) + val batch = hostCoalescedResult.toGpuBatch(dataTypes) outputBatchesMetric += 1 outputRowsMetric += batch.numRows() batch diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala index b4841046acc..b9525c73966 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala @@ -19,7 +19,6 @@ package com.nvidia.spark.rapids import scala.collection.mutable import ai.rapids.cudf.{NvtxColor, NvtxRange} -import ai.rapids.cudf.JCudfSerialization.HostConcatResult import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit import com.nvidia.spark.rapids.shims.{GpuHashPartitioning, ShimBinaryExecNode} @@ -72,6 +71,7 @@ class GpuShuffledHashJoinMeta( val Seq(left, right) = childPlans.map(_.convertIfNeeded()) val useSizedJoin = GpuShuffledSizedHashJoinExec.useSizedJoin(conf, join.joinType, join.leftKeys, join.rightKeys) + val readOpt = CoalesceReadOption(conf) val joinExec = join.joinType match { case LeftOuter | RightOuter if useSizedJoin => GpuShuffledAsymmetricHashJoinExec( @@ -83,6 +83,7 @@ class GpuShuffledHashJoinMeta( right, conf.isGPUShuffle, conf.gpuTargetBatchSizeBytes, + readOpt, isSkewJoin = false)( join.leftKeys, join.rightKeys, @@ -97,6 +98,7 @@ class GpuShuffledHashJoinMeta( right, conf.isGPUShuffle, conf.gpuTargetBatchSizeBytes, + readOpt, isSkewJoin = false)( join.leftKeys, join.rightKeys) @@ -285,16 +287,13 @@ object GpuShuffledHashJoinExec extends Logging { val buildTypes = buildOutput.map(_.dataType).toArray closeOnExcept(new CloseableBufferedIterator(buildIter)) { bufBuildIter => val startTime = System.nanoTime() + var isBuildSerialized = false // Batches type detection - val isBuildSerialized = bufBuildIter.hasNext && isBatchSerialized(bufBuildIter.head) - - // Let batches coalesce for size overflow check - val coalesceBuiltIter = if (isBuildSerialized) { - new HostShuffleCoalesceIterator(bufBuildIter, targetSize, coalesceMetrics) - } else { // Batches on GPU have already coalesced to the target size by the given goal. - bufBuildIter - } - + val coalesceBuiltIter = getHostShuffleCoalesceIterator( + bufBuildIter, targetSize, coalesceMetrics).map { iter => + isBuildSerialized = true + iter + }.getOrElse(bufBuildIter) if (coalesceBuiltIter.hasNext) { val firstBuildBatch = coalesceBuiltIter.next() // Batches have coalesced to the target size, so size will overflow if there are @@ -309,7 +308,7 @@ object GpuShuffledHashJoinExec extends Logging { // It can be optimized for grabbing the GPU semaphore when there is only a single // serialized host batch and the sub-partitioning is not activated. val (singleBuildCb, bufferedStreamIter) = getBuildBatchOptimizedAndClose( - firstBuildBatch.asInstanceOf[HostConcatResult], streamIter, buildTypes, + firstBuildBatch.asInstanceOf[CoalescedHostResult], streamIter, buildTypes, buildGoal, buildTime) logDebug("In the optimized case for grabbing the GPU semaphore, return " + s"a single batch (size: ${getBatchSize(singleBuildCb)}) for the build side " + @@ -321,7 +320,7 @@ object GpuShuffledHashJoinExec extends Logging { coalesceBuiltIter val gpuBuildIter = if (isBuildSerialized) { // batches on host, move them to GPU - new GpuShuffleCoalesceIterator(safeIter.asInstanceOf[Iterator[HostConcatResult]], + new GpuShuffleCoalesceIterator(safeIter.asInstanceOf[Iterator[CoalescedHostResult]], buildTypes, coalesceMetrics) } else { // batches already on GPU safeIter.asInstanceOf[Iterator[ColumnarBatch]] @@ -411,16 +410,16 @@ object GpuShuffledHashJoinExec extends Logging { } } - /** Only accepts a HostConcatResult or a ColumnarBatch as input */ + /** Only accepts a CoalescedHostResult or a ColumnarBatch as input */ private def getBatchSize(maybeBatch: AnyRef): Long = maybeBatch match { case batch: ColumnarBatch => GpuColumnVector.getTotalDeviceMemoryUsed(batch) - case hostBatch: HostConcatResult => hostBatch.getTableHeader().getDataLen() - case _ => throw new IllegalStateException(s"Expect a HostConcatResult or a " + + case hostBatch: CoalescedHostResult => hostBatch.getDataSize + case _ => throw new IllegalStateException(s"Expect a CoalescedHostResult or a " + s"ColumnarBatch, but got a ${maybeBatch.getClass.getSimpleName}") } private def getBuildBatchOptimizedAndClose( - hostConcatResult: HostConcatResult, + hostConcatResult: CoalescedHostResult, streamIter: Iterator[ColumnarBatch], buildDataTypes: Array[DataType], buildGoal: CoalesceSizeGoal, @@ -441,8 +440,7 @@ object GpuShuffledHashJoinExec extends Logging { } // Bring the build batch to the GPU now val buildBatch = buildTime.ns { - val cb = - cudf_utils.HostConcatResultUtil.getColumnarBatch(hostConcatResult, buildDataTypes) + val cb = hostConcatResult.toGpuBatch(buildDataTypes) getFilterFunc(buildGoal).map(filterAndClose => filterAndClose(cb)).getOrElse(cb) } (buildBatch, bufStreamIter) @@ -463,8 +461,20 @@ object GpuShuffledHashJoinExec extends Logging { ConcatAndConsumeAll.getSingleBatchWithVerification(singleBatchIter, inputAttrs) } - def isBatchSerialized(batch: ColumnarBatch): Boolean = { - batch.numCols() == 1 && batch.column(0).isInstanceOf[SerializedTableColumn] + private def getHostShuffleCoalesceIterator( + iter: BufferedIterator[ColumnarBatch], + targetSize: Long, + coalesceMetrics: Map[String, GpuMetric]): Option[Iterator[CoalescedHostResult]] = { + var retIter: Option[Iterator[CoalescedHostResult]] = None + if (iter.hasNext && iter.head.numCols() == 1) { + iter.head.column(0) match { + // TODO add the Kudo case + case _: SerializedTableColumn => + retIter = Some(new HostShuffleCoalesceIterator(iter, targetSize, coalesceMetrics)) + case _ => // should be gpu batches + } + } + retIter } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledSizedHashJoinExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledSizedHashJoinExec.scala index 4d06bdf0553..252c31da125 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledSizedHashJoinExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledSizedHashJoinExec.scala @@ -223,6 +223,8 @@ object GpuShuffledSizedHashJoinExec { * grabbing the GPU semaphore. */ trait HostHostJoinSizer extends JoinSizer[SpillableHostConcatResult] { + def readOption: CoalesceReadOption + override def setupForProbe( iter: Iterator[ColumnarBatch]): Iterator[SpillableHostConcatResult] = { new SpillableHostConcatResultFromColumnarBatchIterator(iter) @@ -235,24 +237,21 @@ object GpuShuffledSizedHashJoinExec { gpuBatchSizeBytes: Long, metrics: Map[String, GpuMetric]): Iterator[ColumnarBatch] = { val concatMetrics = getConcatMetrics(metrics) - val bufferedCoalesceIter = new CloseableBufferedIterator( - new HostShuffleCoalesceIterator( - new HostQueueBatchIterator(queue, remainingIter), - gpuBatchSizeBytes, - concatMetrics)) - withResource(new NvtxRange("fetch first batch", NvtxColor.YELLOW)) { _ => - // Force a coalesce of the first batch before we grab the GPU semaphore - bufferedCoalesceIter.headOption - } - new GpuShuffleCoalesceIterator(bufferedCoalesceIter, batchTypes, concatMetrics) + GpuShuffleCoalesceUtils.getGpuShuffleCoalesceIterator( + new HostQueueBatchIterator(queue, remainingIter), + gpuBatchSizeBytes, + batchTypes, + readOption, + concatMetrics, + prefetchFirstBatch = true) } override def getProbeBatchRowCount(batch: SpillableHostConcatResult): Long = { - batch.header.getNumRows + batch.getNumRows } override def getProbeBatchDataSize(batch: SpillableHostConcatResult): Long = { - batch.header.getDataLen + batch.getDataLen } } @@ -265,6 +264,8 @@ object GpuShuffledSizedHashJoinExec { * See https://github.com/NVIDIA/spark-rapids/issues/11322. */ trait HostHostUnspillableJoinSizer extends JoinSizer[ColumnarBatch] { + def readOption: CoalesceReadOption + override def setupForProbe( iter: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] = iter @@ -275,31 +276,25 @@ object GpuShuffledSizedHashJoinExec { gpuBatchSizeBytes: Long, metrics: Map[String, GpuMetric]): Iterator[ColumnarBatch] = { val concatMetrics = getConcatMetrics(metrics) - val bufferedCoalesceIter = new CloseableBufferedIterator( - new HostShuffleCoalesceIterator( - queue.iterator ++ remainingIter, - gpuBatchSizeBytes, - concatMetrics)) - withResource(new NvtxRange("fetch first batch", NvtxColor.YELLOW)) { _ => - // Force a coalesce of the first batch before we grab the GPU semaphore - bufferedCoalesceIter.headOption - } - new GpuShuffleCoalesceIterator(bufferedCoalesceIter, batchTypes, concatMetrics) + GpuShuffleCoalesceUtils.getGpuShuffleCoalesceIterator( + queue.iterator ++ remainingIter, + gpuBatchSizeBytes, + batchTypes, + readOption, + concatMetrics, + prefetchFirstBatch = true) } override def getProbeBatchRowCount(batch: ColumnarBatch): Long = batch.numRows() override def getProbeBatchDataSize(batch: ColumnarBatch): Long = { - SerializedTableColumn.getMemoryUsed(batch) + GpuShuffleCoalesceUtils.getSerializedBufferSize(batch) } } /** * Join sizer to use when at least one side of the join is coming from another GPU exec node * such that the GPU semaphore is already held. Caches input batches on the GPU. - * - * @param startWithLeftSide whether to prefer fetching from the left or right side first - * when probing for table sizes. */ trait SpillableColumnarBatchJoinSizer extends JoinSizer[SpillableColumnarBatch] { override def setupForProbe(iter: Iterator[ColumnarBatch]): Iterator[SpillableColumnarBatch] = { @@ -377,8 +372,10 @@ abstract class GpuShuffledSizedHashJoinExec[HOST_BATCH_TYPE <: AutoCloseable] ex def isSkewJoin: Boolean def cpuLeftKeys: Seq[Expression] def cpuRightKeys: Seq[Expression] + def readOption: CoalesceReadOption - protected def createHostHostSizer(): JoinSizer[HOST_BATCH_TYPE] + protected def createHostHostSizer( + readOption: CoalesceReadOption): JoinSizer[HOST_BATCH_TYPE] protected def createSpillableColumnarBatchSizer( startWithLeftSide: Boolean): JoinSizer[SpillableColumnarBatch] @@ -425,20 +422,21 @@ abstract class GpuShuffledSizedHashJoinExec[HOST_BATCH_TYPE <: AutoCloseable] ex val localCondition = condition val localGpuBatchSizeBytes = gpuBatchSizeBytes val localMetrics = allMetrics.withDefaultValue(NoopMetric) + val localReadOption = readOption left.executeColumnar().zipPartitions(right.executeColumnar()) { case (leftIter, rightIter) => val joinInfo = (isLeftHost, isRightHost) match { case (true, true) => getHostHostJoinInfo(localJoinType, localLeftKeys, leftOutput, leftIter, - localRightKeys, rightOutput, rightIter, - localCondition, localGpuBatchSizeBytes, localMetrics) + localRightKeys, rightOutput, rightIter, localCondition, + localGpuBatchSizeBytes, localReadOption, localMetrics) case (true, false) => getHostGpuJoinInfo(localJoinType, localLeftKeys, leftOutput, leftIter, - localRightKeys, rightOutput, rightIter, - localCondition, localGpuBatchSizeBytes, localMetrics) + localRightKeys, rightOutput, rightIter, localCondition, + localGpuBatchSizeBytes, localReadOption, localMetrics) case (false, true) => getGpuHostJoinInfo(localJoinType, localLeftKeys, leftOutput, leftIter, - localRightKeys, rightOutput, rightIter, - localCondition, localGpuBatchSizeBytes, localMetrics) + localRightKeys, rightOutput, rightIter, localCondition, + localGpuBatchSizeBytes, localReadOption, localMetrics) case (false, false) => getGpuGpuJoinInfo(localJoinType, localLeftKeys, leftOutput, leftIter, localRightKeys, rightOutput, rightIter, @@ -539,8 +537,9 @@ abstract class GpuShuffledSizedHashJoinExec[HOST_BATCH_TYPE <: AutoCloseable] ex rightIter: Iterator[ColumnarBatch], condition: Option[Expression], gpuBatchSizeBytes: Long, + readOption: CoalesceReadOption, metrics: Map[String, GpuMetric]): JoinInfo = { - val sizer = createHostHostSizer() + val sizer = createHostHostSizer(readOption) sizer.getJoinInfo(joinType, leftKeys, leftOutput, leftIter, rightKeys, rightOutput, rightIter, condition, gpuBatchSizeBytes, metrics) } @@ -559,12 +558,15 @@ abstract class GpuShuffledSizedHashJoinExec[HOST_BATCH_TYPE <: AutoCloseable] ex rightIter: Iterator[ColumnarBatch], condition: Option[Expression], gpuBatchSizeBytes: Long, + readOption: CoalesceReadOption, metrics: Map[String, GpuMetric]): JoinInfo = { val sizer = createSpillableColumnarBatchSizer(startWithLeftSide = true) val concatMetrics = getConcatMetrics(metrics) - val leftIter = new GpuShuffleCoalesceIterator( - new HostShuffleCoalesceIterator(rawLeftIter, gpuBatchSizeBytes, concatMetrics), + val leftIter = GpuShuffleCoalesceUtils.getGpuShuffleCoalesceIterator( + rawLeftIter, + gpuBatchSizeBytes, leftOutput.map(_.dataType).toArray, + readOption, concatMetrics) sizer.getJoinInfo(joinType, leftKeys, leftOutput, leftIter, rightKeys, rightOutput, rightIter, condition, gpuBatchSizeBytes, metrics) @@ -584,12 +586,15 @@ abstract class GpuShuffledSizedHashJoinExec[HOST_BATCH_TYPE <: AutoCloseable] ex rawRightIter: Iterator[ColumnarBatch], condition: Option[Expression], gpuBatchSizeBytes: Long, + readOption: CoalesceReadOption, metrics: Map[String, GpuMetric]): JoinInfo = { val sizer = createSpillableColumnarBatchSizer(startWithLeftSide = false) val concatMetrics = getConcatMetrics(metrics) - val rightIter = new GpuShuffleCoalesceIterator( - new HostShuffleCoalesceIterator(rawRightIter, gpuBatchSizeBytes, concatMetrics), + val rightIter = GpuShuffleCoalesceUtils.getGpuShuffleCoalesceIterator( + rawRightIter, + gpuBatchSizeBytes, rightOutput.map(_.dataType).toArray, + readOption, concatMetrics) sizer.getJoinInfo(joinType, leftKeys, leftOutput, leftIter, rightKeys, rightOutput, rightIter, condition, gpuBatchSizeBytes, metrics) @@ -728,8 +733,9 @@ object GpuShuffledSymmetricHashJoinExec { } } - class HostHostSymmetricJoinSizer extends SymmetricJoinSizer[SpillableHostConcatResult] - with HostHostJoinSizer { + class HostHostSymmetricJoinSizer(override val readOption: CoalesceReadOption) + extends SymmetricJoinSizer[SpillableHostConcatResult] with HostHostJoinSizer { + override val startWithLeftSide: Boolean = true } @@ -762,6 +768,7 @@ case class GpuShuffledSymmetricHashJoinExec( override val right: SparkPlan, override val isGpuShuffle: Boolean, override val gpuBatchSizeBytes: Long, + override val readOption: CoalesceReadOption, override val isSkewJoin: Boolean)( override val cpuLeftKeys: Seq[Expression], override val cpuRightKeys: Seq[Expression]) @@ -771,8 +778,9 @@ case class GpuShuffledSymmetricHashJoinExec( override def otherCopyArgs: Seq[AnyRef] = Seq(cpuLeftKeys, cpuRightKeys) - override protected def createHostHostSizer(): JoinSizer[SpillableHostConcatResult] = { - new HostHostSymmetricJoinSizer() + override protected def createHostHostSizer( + readOption: CoalesceReadOption): JoinSizer[SpillableHostConcatResult] = { + new HostHostSymmetricJoinSizer(readOption) } override protected def createSpillableColumnarBatchSizer( @@ -1022,7 +1030,9 @@ object GpuShuffledAsymmetricHashJoinExec { } } - class HostHostAsymmetricJoinSizer(override val magnificationThreshold: Int) + class HostHostAsymmetricJoinSizer( + override val magnificationThreshold: Int, + override val readOption: CoalesceReadOption) extends AsymmetricJoinSizer[ColumnarBatch] with HostHostUnspillableJoinSizer { } @@ -1055,6 +1065,7 @@ case class GpuShuffledAsymmetricHashJoinExec( override val right: SparkPlan, override val isGpuShuffle: Boolean, override val gpuBatchSizeBytes: Long, + override val readOption: CoalesceReadOption, override val isSkewJoin: Boolean)( override val cpuLeftKeys: Seq[Expression], override val cpuRightKeys: Seq[Expression], @@ -1064,8 +1075,9 @@ case class GpuShuffledAsymmetricHashJoinExec( override def otherCopyArgs: Seq[AnyRef] = Seq(cpuLeftKeys, cpuRightKeys, magnificationThreshold) - override protected def createHostHostSizer(): JoinSizer[ColumnarBatch] = { - new HostHostAsymmetricJoinSizer(magnificationThreshold) + override protected def createHostHostSizer( + readOption: CoalesceReadOption): JoinSizer[ColumnarBatch] = { + new HostHostAsymmetricJoinSizer(magnificationThreshold, readOption) } override protected def createSpillableColumnarBatchSizer( @@ -1077,19 +1089,14 @@ case class GpuShuffledAsymmetricHashJoinExec( /** * A spillable form of a HostConcatResult. Takes ownership of the specified host buffer. */ -class SpillableHostConcatResult( - val header: SerializedTableHeader, - hmb: HostMemoryBuffer) extends AutoCloseable { - private var buffer = { - SpillableHostBuffer(hmb, hmb.getLength, SpillPriorities.ACTIVE_BATCHING_PRIORITY) - } +sealed trait SpillableHostConcatResult extends AutoCloseable { + def hmb: HostMemoryBuffer + def toBatch: ColumnarBatch + def getNumRows: Long + def getDataLen: Long - def getHostMemoryBufferAndClose(): HostMemoryBuffer = { - val hostBuffer = buffer.getHostBuffer() - closeOnExcept(hostBuffer) { _ => - close() - } - hostBuffer + protected var buffer = { + SpillableHostBuffer(hmb, hmb.getLength, SpillPriorities.ACTIVE_BATCHING_PRIORITY) } override def close(): Unit = { @@ -1098,6 +1105,36 @@ class SpillableHostConcatResult( } } +class CudfSpillableHostConcatResult( + header: SerializedTableHeader, + val hmb: HostMemoryBuffer) extends SpillableHostConcatResult { + + override def toBatch: ColumnarBatch = { + closeOnExcept(buffer.getHostBuffer()) { hostBuf => + SerializedTableColumn.from(header, hostBuf) + } + } + + override def getNumRows: Long = header.getNumRows + + override def getDataLen: Long = header.getDataLen +} + +object SpillableHostConcatResult { + def from(batch: ColumnarBatch): SpillableHostConcatResult = { + require(batch.numCols() > 0, "Batch must have at least 1 column") + batch.column(0) match { + // TODO add the Kudo case + case col: SerializedTableColumn => + val buffer = col.hostBuffer + buffer.incRefCount() + new CudfSpillableHostConcatResult(col.header, buffer) + case c => + throw new IllegalStateException(s"Expected SerializedTableColumn, got ${c.getClass}") + } + } +} + /** * Converts an iterator of shuffle batches in host memory into an iterator of spillable * host memory batches. @@ -1107,17 +1144,7 @@ class SpillableHostConcatResultFromColumnarBatchIterator( override def hasNext: Boolean = iter.hasNext override def next(): SpillableHostConcatResult = { - withResource(iter.next()) { batch => - require(batch.numCols() > 0, "Batch must have at least 1 column") - batch.column(0) match { - case col: SerializedTableColumn => - val buffer = col.hostBuffer - buffer.incRefCount() - new SpillableHostConcatResult(col.header, buffer) - case c => - throw new IllegalStateException(s"Expected SerializedTableColumn, got ${c.getClass}") - } - } + withResource(iter.next())(SpillableHostConcatResult.from) } } @@ -1137,10 +1164,7 @@ class HostQueueBatchIterator( override def next(): ColumnarBatch = { if (spillableQueue.nonEmpty) { - val shcr = spillableQueue.dequeue() - closeOnExcept(shcr.getHostMemoryBufferAndClose()) { hostBuffer => - SerializedTableColumn.from(shcr.header, hostBuffer) - } + withResource(spillableQueue.dequeue())(_.toBatch) } else { batchIter.next() } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortMergeJoinMeta.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortMergeJoinMeta.scala index b7a9fcb9020..7d7adfc5097 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortMergeJoinMeta.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortMergeJoinMeta.scala @@ -84,6 +84,7 @@ class GpuSortMergeJoinMeta( val Seq(left, right) = childPlans.map(_.convertIfNeeded()) val useSizedJoin = GpuShuffledSizedHashJoinExec.useSizedJoin(conf, join.joinType, join.leftKeys, join.rightKeys) + val readOpt = CoalesceReadOption(conf) val joinExec = join.joinType match { case LeftOuter | RightOuter if useSizedJoin => GpuShuffledAsymmetricHashJoinExec( @@ -95,6 +96,7 @@ class GpuSortMergeJoinMeta( right, conf.isGPUShuffle, conf.gpuTargetBatchSizeBytes, + readOpt, join.isSkewJoin)( join.leftKeys, join.rightKeys, @@ -109,6 +111,7 @@ class GpuSortMergeJoinMeta( right, conf.isGPUShuffle, conf.gpuTargetBatchSizeBytes, + readOpt, join.isSkewJoin)( join.leftKeys, join.rightKeys)