Skip to content

Commit

Permalink
Add retry in sub hash join
Browse files: browse the repository at this point in the history
Signed-off-by: Firestarman <[email protected]>
  • Loading branch information
firestarman committed Nov 6, 2024
1 parent 5afee5b commit 6d3a33a
Showing 1 changed file with 13 additions and 3 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -18,7 +18,7 @@ package org.apache.spark.sql.rapids.execution
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import com.nvidia.spark.rapids.{GpuBatchUtils, GpuColumnVector, GpuExpression, GpuHashPartitioningBase, GpuMetric, SpillableColumnarBatch, SpillPriorities, TaskAutoCloseableResource}
import com.nvidia.spark.rapids.{GpuBatchUtils, GpuColumnVector, GpuExpression, GpuHashPartitioningBase, GpuMetric, RmmRapidsRetryIterator, SpillableColumnarBatch, SpillPriorities, TaskAutoCloseableResource}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits._

Expand Down Expand Up @@ -179,9 +179,19 @@ class GpuBatchSubPartitioner(
// 1) Hash partition on the batch
val partedTable = GpuHashPartitioningBase.hashPartitionAndClose(
gpuBatch, inputBoundKeys, realNumPartitions, "Sub-Hash Calculate", hashSeed)
val (spillBatch, partitions) = withResource(partedTable) { _ =>
// Wrap the partitioned table in a SpillableColumnarBatch so the split in step 2 can run under retry.
(SpillableColumnarBatch(GpuColumnVector.from(partedTable.getTable, types),
SpillPriorities.ACTIVE_BATCHING_PRIORITY),
partedTable.getPartitions)
}
// 2) Split into smaller tables according to partitions
val subTables = withResource(partedTable) { _ =>
partedTable.getTable.contiguousSplit(partedTable.getPartitions.tail: _*)
val subTables = RmmRapidsRetryIterator.withRetryNoSplit(spillBatch) { _ =>
withResource(spillBatch.getColumnarBatch()) { cb =>
withResource(GpuColumnVector.from(cb)) { tbl =>
tbl.contiguousSplit(partitions.tail: _*)
}
}
}
// 3) Make each smaller table spillable and cache them in the queue
withResource(subTables) { _ =>
Expand Down

0 comments on commit 6d3a33a

Please sign in to comment.