address more comments
Signed-off-by: Firestarman <[email protected]>
firestarman committed Oct 31, 2024
1 parent 13058a6 commit 666f2a8
Showing 2 changed files with 95 additions and 74 deletions.
@@ -88,6 +88,23 @@ object CoalesceReadOption {
}

object GpuShuffleCoalesceUtils {
+ /**
+ * Return an iterator that will pull in batches from the input iterator,
+ * concatenate them up to the "targetSize" and move the concatenated result
+ * to the GPU for each output batch.
+ * The input iterator is expected to contain only serialized host batches just
+ * returned from the Shuffle deserializer. Otherwise, it will blow up.
+ *
+ * @param iter the input iterator containing only serialized host batches
+ * @param targetSize the target batch size for coalescing
+ * @param dataTypes the schema of the input batches
+ * @param readOption the coalesce read option
+ * @param metricsMap metrics map
+ * @param prefetchFirstBatch whether to prefetch the first bundle of serialized
+ * batches with the total size up to the "targetSize". The
+ * prefetched batches will be cached on host until "next()"
+ * is called. This is for some optimization cases in joins.
+ */
def getGpuShuffleCoalesceIterator(
iter: Iterator[ColumnarBatch],
targetSize: Long,
@@ -97,7 +114,7 @@ object GpuShuffleCoalesceUtils {
prefetchFirstBatch: Boolean = false): Iterator[ColumnarBatch] = {
val hostIter = if (readOption.kudoEnabled) {
// TODO replace with the actual Kudo host iterator
- Iterator.empty
+ throw new UnsupportedOperationException("Kudo is not supported yet")
} else {
new HostShuffleCoalesceIterator(iter, targetSize, metricsMap)
}
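
A minimal usage sketch of this entry point (hypothetical caller, not part of this commit; `shuffleReadIter`, `targetSize`, `outputTypes`, `readOption`, and `metrics` are assumed to exist in the surrounding exec node):

// Minimal wiring sketch. The metrics map is assumed to carry the GpuMetric
// entries the iterators look up (CONCAT_TIME, OP_TIME, input/output batch
// and row counters).
val coalescedOnGpu: Iterator[ColumnarBatch] =
  GpuShuffleCoalesceUtils.getGpuShuffleCoalesceIterator(
    shuffleReadIter,            // serialized host batches from the deserializer
    targetSize,                 // target output batch size in bytes
    outputTypes,                // schema of the batches
    readOption,                 // CoalesceReadOption; kudoEnabled must be false for now
    metrics,
    prefetchFirstBatch = false)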
@@ -114,26 +131,6 @@ object GpuShuffleCoalesceUtils {
new GpuShuffleCoalesceIterator(maybeBufferedIter, dataTypes, metricsMap)
}

- /**
- * Return an iterator that can coalesce serialized batches if they are just
- * returned from the Shuffle deserializer. Otherwise, None will be returned.
- */
- def getHostShuffleCoalesceIterator(
- iter: BufferedIterator[ColumnarBatch],
- targetSize: Long,
- coalesceMetrics: Map[String, GpuMetric]): Option[Iterator[AutoCloseable]] = {
- var retIter: Option[Iterator[AutoCloseable]] = None
- if (iter.hasNext && iter.head.numCols() == 1) {
- iter.head.column(0) match {
- // TODO add the Kudo case
- case _: SerializedTableColumn =>
- retIter = Some(new HostShuffleCoalesceIterator(iter, targetSize, coalesceMetrics))
- case _ => // should be gpu batches
- }
- }
- retIter
- }

/** Get the buffer size of a serialized batch just returned by the Shuffle deserializer */
def getSerializedBufferSize(cb: ColumnarBatch): Long = {
assert(cb.numCols() == 1)
@@ -144,54 +141,56 @@ object GpuShuffleCoalesceUtils {
}
if (hmb != null) hmb.getLength else 0L
}
+ }

- /**
- * Get the buffer size of the coalesced result, it accepts either a concatenated
- * host buffer from the Shuffle coalesce exec, or a coalesced GPU batch.
- */
- def getCoalescedBufferSize(concated: AnyRef): Long = concated match {
- // TODO add the Kudo case
- case c: HostConcatResult => c.getTableHeader.getDataLen
- case g => GpuColumnVector.getTotalDeviceMemoryUsed(g.asInstanceOf[ColumnarBatch])
- }
+ /**
+ * A trait representing the coalesced result produced by the Shuffle coalesce iterator.
+ */
+ sealed trait CoalescedHostResult extends AutoCloseable {
+ /** Convert itself to a GPU batch */
+ def toGpuBatch(dataTypes: Array[DataType]): ColumnarBatch

- /** Try to convert a coalesced host buffer to a GPU batch. */
- def coalescedResultToGpuIfAllowed(
- coalescedResult: AnyRef,
- dataTypes: Array[DataType]): ColumnarBatch = coalescedResult match {
- // TODO add the Kudo case
- case c: HostConcatResult =>
- cudf_utils.HostConcatResultUtil.getColumnarBatch(c, dataTypes)
- case o =>
- throw new IllegalArgumentException(s"unsupported type: ${o.getClass}")
- }
+ /** Get the data size */
+ def getDataSize: Long
+ }
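
To make the new contract concrete, a hypothetical minimal implementation (illustration only, not part of this commit) could represent an empty, rows-only result:

import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

// Illustration only: the smallest possible CoalescedHostResult, holding no
// host data at all.
class EmptyCoalescedHostResult(numRows: Int) extends CoalescedHostResult {
  // A rows-only batch carries a row count but no columns.
  override def toGpuBatch(dataTypes: Array[DataType]): ColumnarBatch =
    new ColumnarBatch(Array.empty[ColumnVector], numRows)
  override def getDataSize: Long = 0L   // nothing is buffered on host
  override def close(): Unit = ()       // nothing to release
}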

/**
* A trait defining some operations on the table T.
* This is used by HostCoalesceIteratorBase to separate the table operations from
* the shuffle read process.
*/
- sealed trait SerializedTableOperator[T <: AutoCloseable, C] {
+ sealed trait SerializedTableOperator[T <: AutoCloseable] {
def getDataLen(table: T): Long
def getNumRows(table: T): Int
- def concatOnHost(tables: Array[T]): C
+ def concatOnHost(tables: Array[T]): CoalescedHostResult
}

+ class JCudfCoalescedHostResult(hostConcatResult: HostConcatResult) extends CoalescedHostResult {
+ assert(hostConcatResult != null, "hostConcatResult should not be null")
+
+ override def toGpuBatch(dataTypes: Array[DataType]): ColumnarBatch =
+ cudf_utils.HostConcatResultUtil.getColumnarBatch(hostConcatResult, dataTypes)
+
+ override def close(): Unit = hostConcatResult.close()
+
+ override def getDataSize: Long = hostConcatResult.getTableHeader.getDataLen
+ }

- class JCudfTableOperator extends SerializedTableOperator[SerializedTableColumn, HostConcatResult] {
+ class JCudfTableOperator extends SerializedTableOperator[SerializedTableColumn] {
override def getDataLen(table: SerializedTableColumn): Long = table.header.getDataLen
override def getNumRows(table: SerializedTableColumn): Int = table.header.getNumRows

- override def concatOnHost(tables: Array[SerializedTableColumn]): HostConcatResult = {
+ override def concatOnHost(tables: Array[SerializedTableColumn]): CoalescedHostResult = {
assert(tables.nonEmpty, "no tables to be concatenated")
val numCols = tables.head.header.getNumColumns
- if (numCols == 0) {
+ val ret = if (numCols == 0) {
val totalRowsNum = tables.map(getNumRows).sum
cudf_utils.HostConcatResultUtil.rowsOnlyHostConcatResult(totalRowsNum)
} else {
val (headers, buffers) = tables.map(t => (t.header, t.hostBuffer)).unzip
JCudfSerialization.concatToHostBuffer(headers, buffers)
}
+ new JCudfCoalescedHostResult(ret)
}
}
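
A sketch of the two-phase contract these operators enable (hypothetical helper; the real flow lives in HostCoalesceIteratorBase and GpuShuffleCoalesceIterator below):

// Hypothetical helper: concatenate on the CPU first, then do a single
// host-to-device copy for the whole coalesced buffer.
def concatThenLoad[T <: AutoCloseable](
    tables: Array[T],
    op: SerializedTableOperator[T],
    dataTypes: Array[DataType]): ColumnarBatch = {
  val hostResult = op.concatOnHost(tables) // CPU-only, no GPU semaphore needed yet
  try {
    hostResult.toGpuBatch(dataTypes)       // moves the coalesced buffer to the GPU
  } finally {
    hostResult.close()
  }
}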

@@ -201,10 +200,12 @@ class JCudfTableOperator extends SerializedTableOperator[SerializedTableColumn,
* to the target batch size and then concatenated on the host before handing
* them to the caller on `.next()`
*/
- abstract class HostCoalesceIteratorBase[T <: AutoCloseable: ClassTag, C](
+ abstract class HostCoalesceIteratorBase[T <: AutoCloseable: ClassTag](
iter: Iterator[ColumnarBatch],
targetBatchByteSize: Long,
- metricsMap: Map[String, GpuMetric]) extends Iterator[C] with AutoCloseable {
+ metricsMap: Map[String, GpuMetric])
+ extends Iterator[CoalescedHostResult] with AutoCloseable {

private[this] val concatTimeMetric = metricsMap(GpuMetric.CONCAT_TIME)
private[this] val inputBatchesMetric = metricsMap(GpuMetric.NUM_INPUT_BATCHES)
private[this] val inputRowsMetric = metricsMap(GpuMetric.NUM_INPUT_ROWS)
@@ -218,14 +219,14 @@ abstract class HostCoalesceIteratorBase[T <: AutoCloseable: ClassTag, C](
onTaskCompletion(tc)(close())
}

- protected def tableOperator: SerializedTableOperator[T, C]
+ protected def tableOperator: SerializedTableOperator[T]

override def close(): Unit = {
serializedTables.forEach(_.close())
serializedTables.clear()
}

- private def concatenateTablesInHost(): C = {
+ private def concatenateTablesInHost(): CoalescedHostResult = {
val result = withResource(new MetricRange(concatTimeMetric)) { _ =>
withResource(new Array[T](numTablesInBatch)) { tables =>
tables.indices.foreach(i => tables(i) = serializedTables.removeFirst())
@@ -278,7 +279,7 @@ abstract class HostCoalesceIteratorBase[T <: AutoCloseable: ClassTag, C](
numTablesInBatch > 0
}

- override def next(): C = {
+ override def next(): CoalescedHostResult = {
if (!hasNext()) {
throw new NoSuchElementException("No more host batches to concatenate")
}
@@ -298,18 +299,17 @@ abstract class HostCoalesceIteratorBase[T <: AutoCloseable: ClassTag, C](

class HostShuffleCoalesceIterator(
iter: Iterator[ColumnarBatch],
- targetBatchByteSize: Long,
+ targetBatchSize: Long,
metricsMap: Map[String, GpuMetric])
- extends HostCoalesceIteratorBase[SerializedTableColumn, HostConcatResult](iter,
- targetBatchByteSize, metricsMap) {
+ extends HostCoalesceIteratorBase[SerializedTableColumn](iter, targetBatchSize, metricsMap) {
override protected def tableOperator = new JCudfTableOperator
}
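
The two iterator layers compose; a sketch (assumed names) of the pipeline that getGpuShuffleCoalesceIterator builds above:

// Host-side coalescing first, then one host-to-device transfer per output batch.
val hostIter: Iterator[CoalescedHostResult] =
  new HostShuffleCoalesceIterator(shuffleReadIter, targetSize, metrics)
val gpuIter: Iterator[ColumnarBatch] =
  new GpuShuffleCoalesceIterator(hostIter, outputTypes, metrics)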

/**
- * Iterator that expects only the coalesced host buffers as the input, and transfers
- * the host buffers to GPU.
+ * Iterator that expects only "CoalescedHostResult"s as the input, and transfers
+ * them to GPU.
*/
- class GpuShuffleCoalesceIterator(iter: Iterator[AnyRef],
+ class GpuShuffleCoalesceIterator(iter: Iterator[CoalescedHostResult],
dataTypes: Array[DataType],
metricsMap: Map[String, GpuMetric]) extends Iterator[ColumnarBatch] {
private[this] val opTimeMetric = metricsMap(GpuMetric.OP_TIME)
@@ -323,22 +323,21 @@ class GpuShuffleCoalesceIterator(iter: Iterator[AnyRef],
throw new NoSuchElementException("No more columnar batches")
}
withResource(new NvtxRange("Concat+Load Batch", NvtxColor.YELLOW)) { _ =>
- val hostConcatResult = withResource(new MetricRange(opTimeMetric)) { _ =>
+ val hostCoalescedResult = withResource(new MetricRange(opTimeMetric)) { _ =>
// op time covers concat time performed in `iter.next()`.
// Note the concat runs on CPU.
// GPU time = opTime - concatTime
iter.next()
}

- withResourceIfAllowed(hostConcatResult) { _ =>
+ withResource(hostCoalescedResult) { _ =>
// We acquire the GPU regardless of whether `hostCoalescedResult`
// is an empty batch or not, because the downstream tasks expect
// the `GpuShuffleCoalesceIterator` to acquire the semaphore and may
// generate GPU data from batches that are empty.
GpuSemaphore.acquireIfNecessary(TaskContext.get())
withResource(new MetricRange(opTimeMetric)) { _ =>
- val batch = GpuShuffleCoalesceUtils.coalescedResultToGpuIfAllowed(
- hostConcatResult, dataTypes)
+ val batch = hostCoalescedResult.toGpuBatch(dataTypes)
outputBatchesMetric += 1
outputRowsMetric += batch.numRows()
batch
@@ -289,7 +289,7 @@ object GpuShuffledHashJoinExec extends Logging {
val startTime = System.nanoTime()
var isBuildSerialized = false
// Batches type detection
- val coalesceBuiltIter = GpuShuffleCoalesceUtils.getHostShuffleCoalesceIterator(
+ val coalesceBuiltIter = getHostShuffleCoalesceIterator(
bufBuildIter, targetSize, coalesceMetrics).map { iter =>
isBuildSerialized = true
iter
@@ -299,8 +299,7 @@ object GpuShuffledHashJoinExec extends Logging {
// Batches have coalesced to the target size, so size will overflow if there are
// more than one batch, or the first batch size already exceeds the target.
val sizeOverflow = closeOnExcept(firstBuildBatch) { _ =>
- coalesceBuiltIter.hasNext ||
- GpuShuffleCoalesceUtils.getCoalescedBufferSize(firstBuildBatch) > targetSize
+ coalesceBuiltIter.hasNext || getBatchSize(firstBuildBatch) > targetSize
}
val needSingleBuildBatch = !subPartConf.getOrElse(sizeOverflow)
if (needSingleBuildBatch && isBuildSerialized && !sizeOverflow) {
@@ -309,19 +308,20 @@ object GpuShuffledHashJoinExec extends Logging {
// It can be optimized for grabbing the GPU semaphore when there is only a single
// serialized host batch and the sub-partitioning is not activated.
val (singleBuildCb, bufferedStreamIter) = getBuildBatchOptimizedAndClose(
- firstBuildBatch, streamIter, buildTypes, buildGoal, buildTime)
+ firstBuildBatch.asInstanceOf[CoalescedHostResult], streamIter, buildTypes,
+ buildGoal, buildTime)
logDebug("In the optimized case for grabbing the GPU semaphore, return " +
s"a single batch (size: " +
s"${GpuShuffleCoalesceUtils.getCoalescedBufferSize(singleBuildCb)}) for the " +
s"build side with $buildGoal goal.")
s"a single batch (size: ${getBatchSize(singleBuildCb)}) for the build side " +
s"with $buildGoal goal.")
(Left(singleBuildCb), bufferedStreamIter)

} else { // Other cases without optimization
val safeIter = GpuSubPartitionHashJoin.safeIteratorFromSeq(Seq(firstBuildBatch)) ++
coalesceBuiltIter
val gpuBuildIter = if (isBuildSerialized) {
// batches on host, move them to GPU
- new GpuShuffleCoalesceIterator(safeIter, buildTypes, coalesceMetrics)
+ new GpuShuffleCoalesceIterator(safeIter.asInstanceOf[Iterator[CoalescedHostResult]],
+ buildTypes, coalesceMetrics)
} else { // batches already on GPU
safeIter.asInstanceOf[Iterator[ColumnarBatch]]
}
@@ -333,9 +333,8 @@ object GpuShuffledHashJoinExec extends Logging {
}.getOrElse {
if (needSingleBuildBatch) {
val oneCB = getAsSingleBatch(gpuBuildIter, buildOutput, buildGoal, coalesceMetrics)
logDebug(s"Return a single batch (size: " +
s"${GpuShuffleCoalesceUtils.getCoalescedBufferSize(oneCB)}) for the build " +
s"side with $buildGoal goal.")
logDebug(s"Return a single batch (size: ${getBatchSize(oneCB)}) for the " +
s"build side with $buildGoal goal.")
Left(oneCB)
} else {
logDebug("Return multiple batches as the build side data for the following " +
@@ -411,8 +410,16 @@ object GpuShuffledHashJoinExec extends Logging {
}
}

+ /** Only accepts a CoalescedHostResult or a ColumnarBatch as input */
+ private def getBatchSize(maybeBatch: AnyRef): Long = maybeBatch match {
+ case batch: ColumnarBatch => GpuColumnVector.getTotalDeviceMemoryUsed(batch)
+ case hostBatch: CoalescedHostResult => hostBatch.getDataSize
+ case _ => throw new IllegalStateException(s"Expect a CoalescedHostResult or a " +
+ s"ColumnarBatch, but got a ${maybeBatch.getClass.getSimpleName}")
+ }

private def getBuildBatchOptimizedAndClose(
- hostConcatResult: AutoCloseable,
+ hostConcatResult: CoalescedHostResult,
streamIter: Iterator[ColumnarBatch],
buildDataTypes: Array[DataType],
buildGoal: CoalesceSizeGoal,
@@ -433,8 +440,7 @@ object GpuShuffledHashJoinExec extends Logging {
}
// Bring the build batch to the GPU now
val buildBatch = buildTime.ns {
- val cb = GpuShuffleCoalesceUtils.coalescedResultToGpuIfAllowed(
- hostConcatResult, buildDataTypes)
+ val cb = hostConcatResult.toGpuBatch(buildDataTypes)
getFilterFunc(buildGoal).map(filterAndClose => filterAndClose(cb)).getOrElse(cb)
}
(buildBatch, bufStreamIter)
@@ -454,5 +460,21 @@ object GpuShuffledHashJoinExec extends Logging {
"single build batch")
ConcatAndConsumeAll.getSingleBatchWithVerification(singleBatchIter, inputAttrs)
}

+ private def getHostShuffleCoalesceIterator(
+ iter: BufferedIterator[ColumnarBatch],
+ targetSize: Long,
+ coalesceMetrics: Map[String, GpuMetric]): Option[Iterator[CoalescedHostResult]] = {
+ var retIter: Option[Iterator[CoalescedHostResult]] = None
+ if (iter.hasNext && iter.head.numCols() == 1) {
+ iter.head.column(0) match {
+ // TODO add the Kudo case
+ case _: SerializedTableColumn =>
+ retIter = Some(new HostShuffleCoalesceIterator(iter, targetSize, coalesceMetrics))
+ case _ => // should be gpu batches
+ }
+ }
+ retIter
+ }
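
For reference, a sketch of the peek-and-dispatch pattern the join code above applies to this helper (`batchIter` is hypothetical):

// BufferedIterator.head inspects the first batch without consuming it, so the
// build side can pick a host or GPU path before reading any data.
val buffered = batchIter.buffered
getHostShuffleCoalesceIterator(buffered, targetSize, coalesceMetrics) match {
  case Some(hostIter) =>
    // serialized host batches: coalesce on the host before moving to the GPU
  case None =>
    // batches are already on the GPU: consume `buffered` directly
}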
}
