Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preparation for the coming Kudo support #11667

Merged
merged 8 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,9 +18,11 @@ package com.nvidia.spark.rapids

import java.util

import ai.rapids.cudf.{HostMemoryBuffer, JCudfSerialization, NvtxColor, NvtxRange}
import ai.rapids.cudf.JCudfSerialization.{HostConcatResult, SerializedTableHeader}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import scala.reflect.ClassTag

import ai.rapids.cudf.{JCudfSerialization, NvtxColor, NvtxRange}
import ai.rapids.cudf.JCudfSerialization.HostConcatResult
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource, withResourceIfAllowed}
import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion
import com.nvidia.spark.rapids.shims.ShimUnaryExecNode

Expand Down Expand Up @@ -66,78 +68,180 @@ case class GpuShuffleCoalesceExec(child: SparkPlan, targetBatchByteSize: Long)
val metricsMap = allMetrics
val targetSize = targetBatchByteSize
val dataTypes = GpuColumnVector.extractTypes(schema)
val readOption = CoalesceReadOption(new RapidsConf(conf))

child.executeColumnar().mapPartitions { iter =>
new GpuShuffleCoalesceIterator(
new HostShuffleCoalesceIterator(iter, targetSize, metricsMap),
dataTypes, metricsMap)
GpuShuffleCoalesceUtils.getGpuShuffleCoalesceIterator(iter, targetSize, dataTypes,
readOption, metricsMap)
}
}
}

/**
 * Options controlling how shuffle batches are read and coalesced.
 *
 * Currently carries only the Kudo flag, but is expected to grow as more
 * read-time options are added.
 *
 * @param kudoEnabled whether the Kudo serialization read path is enabled
 */
case class CoalesceReadOption private(kudoEnabled: Boolean)

object CoalesceReadOption {
  /**
   * Builds a [[CoalesceReadOption]] from the Rapids configuration.
   *
   * NOTE: the `conf` parameter is intentionally kept even though it is not
   * read yet. New fields sourced from the conf can then be added here without
   * updating every call site of `CoalesceReadOption(conf)`.
   */
  def apply(conf: RapidsConf): CoalesceReadOption = {
    // TODO get the value from conf
    CoalesceReadOption(false)
  }
}

object GpuShuffleCoalesceUtils {
def getGpuShuffleCoalesceIterator(
iter: Iterator[ColumnarBatch],
targetSize: Long,
dataTypes: Array[DataType],
readOption: CoalesceReadOption,
metricsMap: Map[String, GpuMetric],
prefetchFirstBatch: Boolean = false): Iterator[ColumnarBatch] = {
liurenjie1024 marked this conversation as resolved.
Show resolved Hide resolved
val hostIter = if (readOption.kudoEnabled) {
// TODO replace with the actual Kudo host iterator
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please throw an exception instead? If this is true we have problems and making the data disappear feels wrong to me.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion, done

Iterator.empty
} else {
new HostShuffleCoalesceIterator(iter, targetSize, metricsMap)
}
val maybeBufferedIter = if (prefetchFirstBatch) {
val bufferedIter = new CloseableBufferedIterator(hostIter)
withResource(new NvtxRange("fetch first batch", NvtxColor.YELLOW)) { _ =>
// Force a coalesce of the first batch before we grab the GPU semaphore
bufferedIter.headOption
}
bufferedIter
} else {
hostIter
}
new GpuShuffleCoalesceIterator(maybeBufferedIter, dataTypes, metricsMap)
}

/**
* Return an iterator that can coalesce serialized batches if they are just
* returned from the Shuffle deserializer. Otherwise, None will be returned.
*/
def getHostShuffleCoalesceIterator(
jlowe marked this conversation as resolved.
Show resolved Hide resolved
iter: BufferedIterator[ColumnarBatch],
targetSize: Long,
coalesceMetrics: Map[String, GpuMetric]): Option[Iterator[AutoCloseable]] = {
var retIter: Option[Iterator[AutoCloseable]] = None
if (iter.hasNext && iter.head.numCols() == 1) {
iter.head.column(0) match {
// TODO add the Kudo case
case _: SerializedTableColumn =>
retIter = Some(new HostShuffleCoalesceIterator(iter, targetSize, coalesceMetrics))
case _ => // should be gpu batches
}
}
retIter
}

/** Get the buffer size of a serialized batch just returned by the Shuffle deserializer */
def getSerializedBufferSize(cb: ColumnarBatch): Long = {
jlowe marked this conversation as resolved.
Show resolved Hide resolved
assert(cb.numCols() == 1)
val hmb = cb.column(0) match {
// TODO add the Kudo case
case serCol: SerializedTableColumn => serCol.hostBuffer
case o => throw new IllegalStateException(s"unsupported type: ${o.getClass}")
}
if (hmb != null) hmb.getLength else 0L
}

/**
* Get the buffer size of the coalesced result, it accepts either a concatenated
* host buffer from the Shuffle coalesce exec, or a coalesced GPU batch.
*/
def getCoalescedBufferSize(concated: AnyRef): Long = concated match {
jlowe marked this conversation as resolved.
Show resolved Hide resolved
case c: HostConcatResult => c.getTableHeader.getDataLen
case g => GpuColumnVector.getTotalDeviceMemoryUsed(g.asInstanceOf[ColumnarBatch])
jlowe marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we please have some better error messages here or something? I don't like that we have g.asInstanceOf, when the match should do that for us. It also means that the error message if something goes wrong will just be a class cast exception instead of a cleaner message

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is removed.

}

/** Try to convert a coalesced host buffer to a GPU batch. */
def coalescedResultToGpuIfAllowed(
jlowe marked this conversation as resolved.
Show resolved Hide resolved
coalescedResult: AnyRef,
jlowe marked this conversation as resolved.
Show resolved Hide resolved
dataTypes: Array[DataType]): ColumnarBatch = coalescedResult match {
case c: HostConcatResult =>
cudf_utils.HostConcatResultUtil.getColumnarBatch(c, dataTypes)
case o =>
throw new IllegalArgumentException(s"unsupported type: ${o.getClass}")
}
}

/**
 * Operations on a serialized table of type `T` that can be concatenated
 * into a host-side result of type `C`.
 *
 * Used by `HostCoalesceIteratorBase` to keep the table-format-specific logic
 * (cuDF today, Kudo in the future) separate from the shuffle read process.
 */
sealed trait TableOperator[T <: AutoCloseable, C] {
  /** Size in bytes of the serialized table's data. */
  def getDataLen(table: T): Long

  /** Number of rows in the serialized table. */
  def getNumRows(table: T): Int

  /** Concatenates the given serialized tables into a single host result. */
  def concatOnHost(tables: Array[T]): C
}

/** [[TableOperator]] implementation for the cuDF serialization format. */
class CudfTableOperator extends TableOperator[SerializedTableColumn, HostConcatResult] {
  override def getDataLen(table: SerializedTableColumn): Long = table.header.getDataLen
  override def getNumRows(table: SerializedTableColumn): Int = table.header.getNumRows

  /**
   * Concatenates serialized tables on the host.
   *
   * A rows-only input (zero columns) is handled specially: there are no
   * buffers to concatenate, so only the total row count is carried over.
   */
  override def concatOnHost(tables: Array[SerializedTableColumn]): HostConcatResult = {
    assert(tables.nonEmpty, "no tables to be concatenated")
    val numCols = tables.head.header.getNumColumns
    if (numCols == 0) {
      val totalRowsNum = tables.map(getNumRows).sum
      cudf_utils.HostConcatResultUtil.rowsOnlyHostConcatResult(totalRowsNum)
    } else {
      val (headers, buffers) = tables.map(t => (t.header, t.hostBuffer)).unzip
      JCudfSerialization.concatToHostBuffer(headers, buffers)
    }
  }
}

/**
* Iterator that coalesces columnar batches that are expected to only contain
* [[SerializedTableColumn]]. The serialized tables within are collected up
* serialized tables from shuffle. The serialized tables within are collected up
* to the target batch size and then concatenated on the host before handing
* them to the caller on `.next()`
*/
class HostShuffleCoalesceIterator(
abstract class HostCoalesceIteratorBase[T <: AutoCloseable: ClassTag, C](
iter: Iterator[ColumnarBatch],
targetBatchByteSize: Long,
metricsMap: Map[String, GpuMetric])
extends Iterator[HostConcatResult] with AutoCloseable {
metricsMap: Map[String, GpuMetric]) extends Iterator[C] with AutoCloseable {
private[this] val concatTimeMetric = metricsMap(GpuMetric.CONCAT_TIME)
private[this] val inputBatchesMetric = metricsMap(GpuMetric.NUM_INPUT_BATCHES)
private[this] val inputRowsMetric = metricsMap(GpuMetric.NUM_INPUT_ROWS)
private[this] val serializedTables = new util.ArrayDeque[SerializedTableColumn]
private[this] val serializedTables = new util.ArrayDeque[T]
private[this] var numTablesInBatch: Int = 0
private[this] var numRowsInBatch: Int = 0
private[this] var batchByteSize: Long = 0L

// Don't install the callback if in a unit test
Option(TaskContext.get()).foreach { tc =>
onTaskCompletion(tc) {
close()
}
onTaskCompletion(tc)(close())
}

protected def tableOperator: TableOperator[T, C]

override def close(): Unit = {
serializedTables.forEach(_.close())
serializedTables.clear()
}

def concatenateTablesInHost(): HostConcatResult = {
private def concatenateTablesInHost(): C = {
val result = withResource(new MetricRange(concatTimeMetric)) { _ =>
val firstHeader = serializedTables.peekFirst().header
if (firstHeader.getNumColumns == 0) {
(0 until numTablesInBatch).foreach(_ => serializedTables.removeFirst())
cudf_utils.HostConcatResultUtil.rowsOnlyHostConcatResult(numRowsInBatch)
} else {
val headers = new Array[SerializedTableHeader](numTablesInBatch)
withResource(new Array[HostMemoryBuffer](numTablesInBatch)) { buffers =>
headers.indices.foreach { i =>
val serializedTable = serializedTables.removeFirst()
headers(i) = serializedTable.header
buffers(i) = serializedTable.hostBuffer
}
JCudfSerialization.concatToHostBuffer(headers, buffers)
}
withResource(new Array[T](numTablesInBatch)) { tables =>
tables.indices.foreach(i => tables(i) = serializedTables.removeFirst())
tableOperator.concatOnHost(tables)
}
}

// update the stats for the next batch in progress
numTablesInBatch = serializedTables.size

batchByteSize = 0
numRowsInBatch = 0
if (numTablesInBatch > 0) {
require(numTablesInBatch == 1,
"should only track at most one buffer that is not in a batch")
val header = serializedTables.peekFirst().header
batchByteSize = header.getDataLen
numRowsInBatch = header.getNumRows
val firstTable = serializedTables.peekFirst()
batchByteSize = tableOperator.getDataLen(firstTable)
numRowsInBatch = tableOperator.getNumRows(firstTable)
}

result
}

Expand All @@ -150,14 +254,14 @@ class HostShuffleCoalesceIterator(
// don't bother tracking empty tables
if (batch.numRows > 0) {
inputRowsMetric += batch.numRows()
val tableColumn = batch.column(0).asInstanceOf[SerializedTableColumn]
batchCanGrow = canAddToBatch(tableColumn.header)
val tableColumn = batch.column(0).asInstanceOf[T]
batchCanGrow = canAddToBatch(tableColumn)
serializedTables.addLast(tableColumn)
// always add the first table to the batch even if its beyond the target limits
if (batchCanGrow || numTablesInBatch == 0) {
numTablesInBatch += 1
numRowsInBatch += tableColumn.header.getNumRows
batchByteSize += tableColumn.header.getDataLen
numRowsInBatch += tableOperator.getNumRows(tableColumn)
batchByteSize += tableOperator.getDataLen(tableColumn)
}
} else {
batch.close()
Expand All @@ -172,34 +276,40 @@ class HostShuffleCoalesceIterator(
numTablesInBatch > 0
}

override def next(): HostConcatResult = {
override def next(): C = {
if (!hasNext()) {
throw new NoSuchElementException("No more host batches to concatenate")
}
concatenateTablesInHost()
}

private def canAddToBatch(nextTable: SerializedTableHeader): Boolean = {
if (batchByteSize + nextTable.getDataLen > targetBatchByteSize) {
private def canAddToBatch(nextTable: T): Boolean = {
if (batchByteSize + tableOperator.getDataLen(nextTable) > targetBatchByteSize) {
return false
}
if (numRowsInBatch.toLong + nextTable.getNumRows > Integer.MAX_VALUE) {
if (numRowsInBatch.toLong + tableOperator.getNumRows(nextTable) > Integer.MAX_VALUE) {
return false
}
true
}
}

/**
 * Host-side coalescing iterator for shuffle batches serialized in the cuDF
 * format: binds [[HostCoalesceIteratorBase]] to the [[CudfTableOperator]],
 * producing one `HostConcatResult` per coalesced batch.
 */
class HostShuffleCoalesceIterator(
    iter: Iterator[ColumnarBatch],
    targetBatchByteSize: Long,
    metricsMap: Map[String, GpuMetric])
  extends HostCoalesceIteratorBase[SerializedTableColumn, HostConcatResult](
    iter, targetBatchByteSize, metricsMap) {

  override protected def tableOperator: CudfTableOperator = new CudfTableOperator
}

/**
* Iterator that coalesces columnar batches that are expected to only contain
* [[SerializedTableColumn]]. The serialized tables within are collected up
* to the target batch size and then concatenated on the host before the data
* is transferred to the GPU.
* Iterator that expects only the coalesced host buffers as the input, and transfers
* the host buffers to GPU.
*/
class GpuShuffleCoalesceIterator(iter: Iterator[HostConcatResult],
dataTypes: Array[DataType],
metricsMap: Map[String, GpuMetric])
extends Iterator[ColumnarBatch] {
class GpuShuffleCoalesceIterator(iter: Iterator[AnyRef],
jlowe marked this conversation as resolved.
Show resolved Hide resolved
dataTypes: Array[DataType],
metricsMap: Map[String, GpuMetric]) extends Iterator[ColumnarBatch] {
private[this] val opTimeMetric = metricsMap(GpuMetric.OP_TIME)
private[this] val outputBatchesMetric = metricsMap(GpuMetric.NUM_OUTPUT_BATCHES)
private[this] val outputRowsMetric = metricsMap(GpuMetric.NUM_OUTPUT_ROWS)
Expand All @@ -218,15 +328,15 @@ class GpuShuffleCoalesceIterator(iter: Iterator[HostConcatResult],
iter.next()
}

withResource(hostConcatResult) { _ =>
withResourceIfAllowed(hostConcatResult) { _ =>
jlowe marked this conversation as resolved.
Show resolved Hide resolved
// We acquire the GPU regardless of whether `hostConcatResult`
// is an empty batch or not, because the downstream tasks expect
// the `GpuShuffleCoalesceIterator` to acquire the semaphore and may
// generate GPU data from batches that are empty.
GpuSemaphore.acquireIfNecessary(TaskContext.get())

withResource(new MetricRange(opTimeMetric)) { _ =>
val batch = cudf_utils.HostConcatResultUtil.getColumnarBatch(hostConcatResult, dataTypes)
val batch = GpuShuffleCoalesceUtils.coalescedResultToGpuIfAllowed(
hostConcatResult, dataTypes)
outputBatchesMetric += 1
outputRowsMetric += batch.numRows()
batch
Expand Down
Loading