diff --git a/CHANGELOG.md b/CHANGELOG.md index d74a92519f2..2510eba5dfe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,5 @@ # Change log -Generated on 2024-10-18 +Generated on 2024-10-31 ## Release 24.10 @@ -26,6 +26,7 @@ Generated on 2024-10-18 ### Bugs Fixed ||| |:---|:---| +|[#11558](https://github.com/NVIDIA/spark-rapids/issues/11558)|[BUG] test_sortmerge_join_ridealong fails on DB 13.3| |[#11573](https://github.com/NVIDIA/spark-rapids/issues/11573)|[BUG] very long tail task is observed when many tasks are contending for PrioritySemaphore| |[#11367](https://github.com/NVIDIA/spark-rapids/issues/11367)|[BUG] Error "table_view.cpp:36: Column size mismatch" when using approx_percentile on a string column| |[#11543](https://github.com/NVIDIA/spark-rapids/issues/11543)|[BUG] test_yyyyMMdd_format_for_legacy_mode[DATAGEN_SEED=1727619674, TZ=UTC] failed GPU and CPU are not both null| @@ -68,6 +69,8 @@ Generated on 2024-10-18 ### PRs ||| |:---|:---| +|[#11676](https://github.com/NVIDIA/spark-rapids/pull/11676)| Fix race condition with Parquet filter pushdown modifying shared hadoop Configuration| +|[#11626](https://github.com/NVIDIA/spark-rapids/pull/11626)|Update latest changelog [skip ci]| |[#11624](https://github.com/NVIDIA/spark-rapids/pull/11624)|Update the download link [skip ci]| |[#11577](https://github.com/NVIDIA/spark-rapids/pull/11577)|Update latest changelog [skip ci]| |[#11576](https://github.com/NVIDIA/spark-rapids/pull/11576)|Update rapids JNI and private dependency to 24.10.0| diff --git a/docs/archive.md b/docs/archive.md index 641167a3488..2d9a78ca3d4 100644 --- a/docs/archive.md +++ b/docs/archive.md @@ -5,6 +5,95 @@ nav_order: 15 --- Below are archived releases for RAPIDS Accelerator for Apache Spark. +## Release v24.10.0 +### Hardware Requirements: + +The plugin is tested on the following architectures: + + GPU Models: NVIDIA V100, T4, A10/A100, L4 and H100 GPUs + +### Software Requirements: + + OS: Ubuntu 20.04, Ubuntu 22.04, CentOS 7, or Rocky Linux 8 + + NVIDIA Driver*: R470+ + + Runtime: + Scala 2.12, 2.13 + Python, Java Virtual Machine (JVM) compatible with your spark-version. + + * Check the Spark documentation for Python and Java version compatibility with your specific + Spark version. For instance, visit `https://spark.apache.org/docs/3.4.1` for Spark 3.4.1. + + Supported Spark versions: + Apache Spark 3.2.0, 3.2.1, 3.2.2, 3.2.3, 3.2.4 + Apache Spark 3.3.0, 3.3.1, 3.3.2, 3.3.3, 3.3.4 + Apache Spark 3.4.0, 3.4.1, 3.4.2, 3.4.3 + Apache Spark 3.5.0, 3.5.1, 3.5.2 + + Supported Databricks runtime versions for Azure and AWS: + Databricks 11.3 ML LTS (GPU, Scala 2.12, Spark 3.3.0) + Databricks 12.2 ML LTS (GPU, Scala 2.12, Spark 3.3.2) + Databricks 13.3 ML LTS (GPU, Scala 2.12, Spark 3.4.1) + + Supported Dataproc versions (Debian/Ubuntu/Rocky): + GCP Dataproc 2.1 + GCP Dataproc 2.2 + + Supported Dataproc Serverless versions: + Spark runtime 1.1 LTS + Spark runtime 2.0 + Spark runtime 2.1 + Spark runtime 2.2 + +*Some hardware may have a minimum driver version greater than R470. Check the GPU spec sheet +for your hardware's minimum driver version. + +*For Cloudera and EMR support, please refer to the +[Distributions](https://docs.nvidia.com/spark-rapids/user-guide/latest/faq.html#which-distributions-are-supported) section of the FAQ. 
+ +### RAPIDS Accelerator's Support Policy for Apache Spark +The RAPIDS Accelerator maintains support for Apache Spark versions available for download from [Apache Spark](https://spark.apache.org/downloads.html) + +### Download RAPIDS Accelerator for Apache Spark v24.10.0 + +| Processor | Scala Version | Download Jar | Download Signature | +|-----------|---------------|--------------|--------------------| +| x86_64 | Scala 2.12 | [RAPIDS Accelerator v24.10.0](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/24.10.0/rapids-4-spark_2.12-24.10.0.jar) | [Signature](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/24.10.0/rapids-4-spark_2.12-24.10.0.jar.asc) | +| x86_64 | Scala 2.13 | [RAPIDS Accelerator v24.10.0](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.13/24.10.0/rapids-4-spark_2.13-24.10.0.jar) | [Signature](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.13/24.10.0/rapids-4-spark_2.13-24.10.0.jar.asc) | +| arm64 | Scala 2.12 | [RAPIDS Accelerator v24.10.0](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/24.10.0/rapids-4-spark_2.12-24.10.0-cuda11-arm64.jar) | [Signature](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/24.10.0/rapids-4-spark_2.12-24.10.0-cuda11-arm64.jar.asc) | +| arm64 | Scala 2.13 | [RAPIDS Accelerator v24.10.0](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.13/24.10.0/rapids-4-spark_2.13-24.10.0-cuda11-arm64.jar) | [Signature](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.13/24.10.0/rapids-4-spark_2.13-24.10.0-cuda11-arm64.jar.asc) | + +This package is built against CUDA 11.8. It is tested on V100, T4, A10, A100, L4 and H100 GPUs with +CUDA 11.8 through CUDA 12.0. + +### Verify signature +* Download the [PUB_KEY](https://keys.openpgp.org/search?q=sw-spark@nvidia.com). +* Import the public key: `gpg --import PUB_KEY` +* Verify the signature for Scala 2.12 jar: + `gpg --verify rapids-4-spark_2.12-24.10.0.jar.asc rapids-4-spark_2.12-24.10.0.jar` +* Verify the signature for Scala 2.13 jar: + `gpg --verify rapids-4-spark_2.13-24.10.0.jar.asc rapids-4-spark_2.13-24.10.0.jar` + +The output of signature verify: + + gpg: Good signature from "NVIDIA Spark (For the signature of spark-rapids release jars) " + +### Release Notes +* Optimize scheduling policy for GPU Semaphore +* Support distinct join for right outer joins +* Support MinBy and MaxBy for non-float ordering +* Support ArrayJoin expression +* Optimize Expand and Aggregate expression performance +* Improve JSON related expressions +* For updates on RAPIDS Accelerator Tools, please visit [this link](https://github.com/NVIDIA/spark-rapids-tools/releases) + +Note: There is a known issue in the 24.10.0 release when decompressing gzip files on H100 GPUs. +Please find more details in [issue-16661](https://github.com/rapidsai/cudf/issues/16661). + +For a detailed list of changes, please refer to the +[CHANGELOG](https://github.com/NVIDIA/spark-rapids/blob/main/CHANGELOG.md). + ## Release v24.08.1 ### Hardware Requirements: diff --git a/docs/download.md b/docs/download.md index cab4259f5bd..60c62071f8b 100644 --- a/docs/download.md +++ b/docs/download.md @@ -18,7 +18,7 @@ cuDF jar, that is either preinstalled in the Spark classpath on all nodes or sub that uses the RAPIDS Accelerator For Apache Spark. See the [getting-started guide](https://docs.nvidia.com/spark-rapids/user-guide/latest/getting-started/overview.html) for more details. 
-## Release v24.10.0 +## Release v24.10.1 ### Hardware Requirements: The plugin is tested on the following architectures: @@ -69,14 +69,14 @@ for your hardware's minimum driver version. ### RAPIDS Accelerator's Support Policy for Apache Spark The RAPIDS Accelerator maintains support for Apache Spark versions available for download from [Apache Spark](https://spark.apache.org/downloads.html) -### Download RAPIDS Accelerator for Apache Spark v24.10.0 +### Download RAPIDS Accelerator for Apache Spark v24.10.1 | Processor | Scala Version | Download Jar | Download Signature | |-----------|---------------|--------------|--------------------| -| x86_64 | Scala 2.12 | [RAPIDS Accelerator v24.10.0](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/24.10.0/rapids-4-spark_2.12-24.10.0.jar) | [Signature](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/24.10.0/rapids-4-spark_2.12-24.10.0.jar.asc) | -| x86_64 | Scala 2.13 | [RAPIDS Accelerator v24.10.0](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.13/24.10.0/rapids-4-spark_2.13-24.10.0.jar) | [Signature](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.13/24.10.0/rapids-4-spark_2.13-24.10.0.jar.asc) | -| arm64 | Scala 2.12 | [RAPIDS Accelerator v24.10.0](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/24.10.0/rapids-4-spark_2.12-24.10.0-cuda11-arm64.jar) | [Signature](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/24.10.0/rapids-4-spark_2.12-24.10.0-cuda11-arm64.jar.asc) | -| arm64 | Scala 2.13 | [RAPIDS Accelerator v24.10.0](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.13/24.10.0/rapids-4-spark_2.13-24.10.0-cuda11-arm64.jar) | [Signature](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.13/24.10.0/rapids-4-spark_2.13-24.10.0-cuda11-arm64.jar.asc) | +| x86_64 | Scala 2.12 | [RAPIDS Accelerator v24.10.1](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/24.10.1/rapids-4-spark_2.12-24.10.1.jar) | [Signature](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/24.10.1/rapids-4-spark_2.12-24.10.1.jar.asc) | +| x86_64 | Scala 2.13 | [RAPIDS Accelerator v24.10.1](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.13/24.10.1/rapids-4-spark_2.13-24.10.1.jar) | [Signature](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.13/24.10.1/rapids-4-spark_2.13-24.10.1.jar.asc) | +| arm64 | Scala 2.12 | [RAPIDS Accelerator v24.10.1](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/24.10.1/rapids-4-spark_2.12-24.10.1-cuda11-arm64.jar) | [Signature](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/24.10.1/rapids-4-spark_2.12-24.10.1-cuda11-arm64.jar.asc) | +| arm64 | Scala 2.13 | [RAPIDS Accelerator v24.10.1](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.13/24.10.1/rapids-4-spark_2.13-24.10.1-cuda11-arm64.jar) | [Signature](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.13/24.10.1/rapids-4-spark_2.13-24.10.1-cuda11-arm64.jar.asc) | This package is built against CUDA 11.8. It is tested on V100, T4, A10, A100, L4 and H100 GPUs with CUDA 11.8 through CUDA 12.0. @@ -85,9 +85,9 @@ CUDA 11.8 through CUDA 12.0. * Download the [PUB_KEY](https://keys.openpgp.org/search?q=sw-spark@nvidia.com). 
* Import the public key: `gpg --import PUB_KEY` * Verify the signature for Scala 2.12 jar: - `gpg --verify rapids-4-spark_2.12-24.10.0.jar.asc rapids-4-spark_2.12-24.10.0.jar` + `gpg --verify rapids-4-spark_2.12-24.10.1.jar.asc rapids-4-spark_2.12-24.10.1.jar` * Verify the signature for Scala 2.13 jar: - `gpg --verify rapids-4-spark_2.13-24.10.0.jar.asc rapids-4-spark_2.13-24.10.0.jar` + `gpg --verify rapids-4-spark_2.13-24.10.1.jar.asc rapids-4-spark_2.13-24.10.1.jar` The output of signature verify: @@ -102,7 +102,7 @@ The output of signature verify: * Improve JSON related expressions * For updates on RAPIDS Accelerator Tools, please visit [this link](https://github.com/NVIDIA/spark-rapids-tools/releases) -Note: There is a known issue in the 24.10.0 release when decompressing gzip files on H100 GPUs. +Note: There is a known issue in the 24.10.1 release when decompressing gzip files on H100 GPUs. Please find more details in [issue-16661](https://github.com/rapidsai/cudf/issues/16661). For a detailed list of changes, please refer to the diff --git a/pom.xml b/pom.xml index a2678ddd014..0f89e61cbc6 100644 --- a/pom.xml +++ b/pom.xml @@ -1615,14 +1615,11 @@ This will force full Scala code rebuild in downstream modules. ${maven.scalastyle.skip} - - - - - - - - + + + + + Checking scalastyle for all modules using following paths: ${scalastyle.dirs} diff --git a/scala2.13/pom.xml b/scala2.13/pom.xml index 1a18fe61a74..dd3c9b5b091 100644 --- a/scala2.13/pom.xml +++ b/scala2.13/pom.xml @@ -1615,14 +1615,11 @@ This will force full Scala code rebuild in downstream modules. ${maven.scalastyle.skip} - - - - - - - - + + + + + Checking scalastyle for all modules using following paths: ${scalastyle.dirs} diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java index 32743cc12ef..30b24fab11d 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java @@ -237,6 +237,7 @@ public void close() { public static final class GpuColumnarBatchBuilder extends GpuColumnarBatchBuilderBase { private final RapidsHostColumnBuilder[] builders; private ai.rapids.cudf.HostColumnVector[] hostColumns; + private ai.rapids.cudf.HostColumnVector[] wipHostColumns; /** * A collection of builders for building up columnar data. @@ -280,29 +281,30 @@ public RapidsHostColumnBuilder builder(int i) { @Override protected ai.rapids.cudf.ColumnVector buildAndPutOnDevice(int builderIndex) { ai.rapids.cudf.ColumnVector cv = builders[builderIndex].buildAndPutOnDevice(); + builders[builderIndex].close(); builders[builderIndex] = null; return cv; } public HostColumnVector[] buildHostColumns() { - HostColumnVector[] vectors = new HostColumnVector[builders.length]; - try { - for (int i = 0; i < builders.length; i++) { - vectors[i] = builders[i].build(); + // buildHostColumns is called from tryBuild, and tryBuild has to be safe to call + // multiple times, so if a retry exception happens in this code, we need to pick + // up where we left off last time. 
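+      // Any column finished on a previous attempt already sits in wipHostColumns
+      // and its builder has been closed and nulled out, so the loop below only
+      // builds the columns whose builders are still non-null.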
+ if (wipHostColumns == null) { + wipHostColumns = new HostColumnVector[builders.length]; + } + for (int i = 0; i < builders.length; i++) { + if (builders[i] != null && wipHostColumns[i] == null) { + wipHostColumns[i] = builders[i].build(); + builders[i].close(); builders[i] = null; - } - HostColumnVector[] result = vectors; - vectors = null; - return result; - } finally { - if (vectors != null) { - for (HostColumnVector v : vectors) { - if (v != null) { - v.close(); - } - } + } else if (builders[i] == null && wipHostColumns[i] == null) { + throw new IllegalStateException("buildHostColumns cannot be called more than once"); } } + HostColumnVector[] result = wipHostColumns; + wipHostColumns = null; + return result; } /** @@ -327,13 +329,24 @@ public void close() { } } } finally { - if (hostColumns != null) { - for (ai.rapids.cudf.HostColumnVector hcv: hostColumns) { - if (hcv != null) { - hcv.close(); + try { + if (hostColumns != null) { + for (ai.rapids.cudf.HostColumnVector hcv : hostColumns) { + if (hcv != null) { + hcv.close(); + } + } + hostColumns = null; + } + } finally { + if (wipHostColumns != null) { + for (ai.rapids.cudf.HostColumnVector hcv : wipHostColumns) { + if (hcv != null) { + hcv.close(); + } } + wipHostColumns = null; } - hostColumns = null; } } } diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/RapidsHostColumnBuilder.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/RapidsHostColumnBuilder.java index d9d8411643b..e6ee5eb9de5 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/RapidsHostColumnBuilder.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/RapidsHostColumnBuilder.java @@ -49,7 +49,6 @@ public final class RapidsHostColumnBuilder implements AutoCloseable { private long estimatedRows; private long rowCapacity = 0L; private long validCapacity = 0L; - private boolean built = false; private List childBuilders = new ArrayList<>(); private Runnable nullHandler; @@ -117,30 +116,76 @@ private void setupNullHandler() { public HostColumnVector build() { List hostColumnVectorCoreList = new ArrayList<>(); - for (RapidsHostColumnBuilder childBuilder : childBuilders) { - hostColumnVectorCoreList.add(childBuilder.buildNestedInternal()); - } - // Aligns the valid buffer size with other buffers in terms of row size, because it grows lazily. - if (valid != null) { - growValidBuffer(); + HostColumnVector hostColumnVector = null; + try { + for (RapidsHostColumnBuilder childBuilder : childBuilders) { + hostColumnVectorCoreList.add(childBuilder.buildNestedInternal()); + } + // Aligns the valid buffer size with other buffers in terms of row size, because it grows lazily. 
+ if (valid != null) { + growValidBuffer(); + } + // Increment the reference counts before creating the HostColumnVector, so we can + // keep track of them properly + if (data != null) { + data.incRefCount(); + } + if (valid != null) { + valid.incRefCount(); + } + if (offsets != null) { + offsets.incRefCount(); + } + hostColumnVector = new HostColumnVector(type, rows, + Optional.of(nullCount), data, valid, offsets, hostColumnVectorCoreList); + } finally { + if (hostColumnVector == null) { + // Something bad happened, and we need to clean up after ourselves + for (HostColumnVectorCore hcv : hostColumnVectorCoreList) { + if (hcv != null) { + hcv.close(); + } + } + } } - HostColumnVector hostColumnVector = new HostColumnVector(type, rows, - Optional.of(nullCount), data, valid, offsets, hostColumnVectorCoreList); - built = true; return hostColumnVector; } private HostColumnVectorCore buildNestedInternal() { List hostColumnVectorCoreList = new ArrayList<>(); - for (RapidsHostColumnBuilder childBuilder : childBuilders) { - hostColumnVectorCoreList.add(childBuilder.buildNestedInternal()); - } - // Aligns the valid buffer size with other buffers in terms of row size, because it grows lazily. - if (valid != null) { - growValidBuffer(); + HostColumnVectorCore ret = null; + try { + for (RapidsHostColumnBuilder childBuilder : childBuilders) { + hostColumnVectorCoreList.add(childBuilder.buildNestedInternal()); + } + // Aligns the valid buffer size with other buffers in terms of row size, because it grows lazily. + if (valid != null) { + growValidBuffer(); + } + // Increment the reference counts before creating the HostColumnVector, so we can + // keep track of them properly + if (data != null) { + data.incRefCount(); + } + if (valid != null) { + valid.incRefCount(); + } + if (offsets != null) { + offsets.incRefCount(); + } + ret = new HostColumnVectorCore(type, rows, Optional.of(nullCount), data, valid, + offsets, hostColumnVectorCoreList); + } finally { + if (ret == null) { + // Something bad happened, and we need to clean up after ourselves + for (HostColumnVectorCore hcv : hostColumnVectorCoreList) { + if (hcv != null) { + hcv.close(); + } + } + } } - return new HostColumnVectorCore(type, rows, Optional.of(nullCount), data, valid, - offsets, hostColumnVectorCoreList); + return ret; } @SuppressWarnings({"rawtypes", "unchecked"}) @@ -650,23 +695,20 @@ public final ColumnVector buildAndPutOnDevice() { @Override public void close() { - if (!built) { - if (data != null) { - data.close(); - data = null; - } - if (valid != null) { - valid.close(); - valid = null; - } - if (offsets != null) { - offsets.close(); - offsets = null; - } - for (RapidsHostColumnBuilder childBuilder : childBuilders) { - childBuilder.close(); - } - built = true; + if (data != null) { + data.close(); + data = null; + } + if (valid != null) { + valid.close(); + valid = null; + } + if (offsets != null) { + offsets.close(); + offsets = null; + } + for (RapidsHostColumnBuilder childBuilder : childBuilders) { + childBuilder.close(); } } @@ -685,7 +727,6 @@ public String toString() { ", nullCount=" + nullCount + ", estimatedRows=" + estimatedRows + ", populatedRows=" + rows + - ", built=" + built + '}'; } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala index 2cfce60c4a5..b0c86773166 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala +++ 
b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala
@@ -216,7 +216,7 @@ object GpuDeviceManager extends Logging {
     }
   }
 
-  private def toMB(x: Long): Double = x / 1024 / 1024.0
+  private def toMiB(x: Long): Double = x / 1024 / 1024.0
 
   private def computeRmmPoolSize(conf: RapidsConf, info: CudaMemInfo): Long = {
     def truncateToAlignment(x: Long): Long = x & ~511L
@@ -238,33 +238,39 @@
     }
     var poolAllocation = truncateToAlignment(
       (conf.rmmAllocFraction * (info.free - reserveAmount)).toLong)
+    val errorPhrase = "The pool allocation of " +
+      s"${toMiB(poolAllocation)} MiB (gpu.free: ${toMiB(info.free)} MiB, " +
+      s"${RapidsConf.RMM_ALLOC_FRACTION}: ${conf.rmmAllocFraction}, " +
+      s"${RapidsConf.RMM_ALLOC_RESERVE}: ${reserveAmount} => " +
+      s"(gpu.free - reserve) * allocFraction = ${toMiB(poolAllocation)} MiB) was "
     if (poolAllocation < minAllocation) {
-      throw new IllegalArgumentException(s"The pool allocation of " +
-        s"${toMB(poolAllocation)} MB (calculated from ${RapidsConf.RMM_ALLOC_FRACTION} " +
-        s"(=${conf.rmmAllocFraction}) and ${toMB(info.free)} MB free memory) was less than " +
-        s"the minimum allocation of ${toMB(minAllocation)} (calculated from " +
-        s"${RapidsConf.RMM_ALLOC_MIN_FRACTION} (=${conf.rmmAllocMinFraction}) " +
-        s"and ${toMB(info.total)} MB total memory)")
+      throw new IllegalArgumentException(errorPhrase +
+        s"less than the minimum allocation of ${toMiB(minAllocation)} MiB (gpu.total: " +
+        s"${toMiB(info.total)} MiB, ${RapidsConf.RMM_ALLOC_MIN_FRACTION}: " +
+        s"${conf.rmmAllocMinFraction} => gpu.total * " +
+        s"minAllocFraction = ${toMiB(minAllocation)} MiB). Please ensure that the GPU has " +
+        s"enough free memory, or adjust the configuration accordingly.")
     }
     if (maxAllocation < poolAllocation) {
-      throw new IllegalArgumentException(s"The pool allocation of " +
-        s"${toMB(poolAllocation)} MB (calculated from ${RapidsConf.RMM_ALLOC_FRACTION} " +
-        s"(=${conf.rmmAllocFraction}) and ${toMB(info.free)} MB free memory) was more than " +
-        s"the maximum allocation of ${toMB(maxAllocation)} (calculated from " +
-        s"${RapidsConf.RMM_ALLOC_MAX_FRACTION} (=${conf.rmmAllocMaxFraction}) " +
-        s"and ${toMB(info.total)} MB total memory)")
+      throw new IllegalArgumentException(errorPhrase +
+        s"more than the maximum allocation of ${toMiB(maxAllocation)} MiB (gpu.total: " +
+        s"${toMiB(info.total)} MiB, ${RapidsConf.RMM_ALLOC_MAX_FRACTION}: " +
+        s"${conf.rmmAllocMaxFraction} => gpu.total * " +
+        s"maxAllocFraction = ${toMiB(maxAllocation)} MiB). Please ensure that the pool " +
+        s"allocation does not exceed the maximum allocation, and adjust the configuration " +
+        s"accordingly.")
     }
     if (reserveAmount >= maxAllocation) {
-      throw new IllegalArgumentException(s"RMM reserve memory (${toMB(reserveAmount)} MB) " +
-        s"larger than maximum pool size (${toMB(maxAllocation)} MB). Check the settings for " +
+      throw new IllegalArgumentException(s"RMM reserve memory (${toMiB(reserveAmount)} MiB) " +
+        s"larger than maximum pool size (${toMiB(maxAllocation)} MiB). 
Check the settings for " + s"${RapidsConf.RMM_ALLOC_MAX_FRACTION} (=${conf.rmmAllocFraction}) and " + s"${RapidsConf.RMM_ALLOC_RESERVE} (=$reserveAmount)") } val adjustedMaxAllocation = truncateToAlignment(maxAllocation - reserveAmount) if (poolAllocation > adjustedMaxAllocation) { - logWarning(s"RMM pool allocation (${toMB(poolAllocation)} MB) does not leave enough free " + - s"memory for reserve memory (${toMB(reserveAmount)} MB), lowering the pool size to " + - s"${toMB(adjustedMaxAllocation)} MB to accommodate the requested reserve amount.") + logWarning(s"RMM pool allocation (${toMiB(poolAllocation)} MB) does not leave enough " + + s"free memory for reserve memory (${toMiB(reserveAmount)} MB), lowering the pool " + + s"size to ${toMiB(adjustedMaxAllocation)} MB to " + + s"accommodate the requested reserve amount.") poolAllocation = adjustedMaxAllocation } @@ -348,7 +354,7 @@ object GpuDeviceManager extends Logging { deviceId = Some(gpuId) logInfo(s"Initializing RMM${features.mkString(" ", " ", "")} " + - s"pool size = ${toMB(poolAllocation)} MB on gpuId $gpuId") + s"pool size = ${toMiB(poolAllocation)} MB on gpuId $gpuId") if (Cuda.isPtdsEnabled()) { logInfo("Using per-thread default stream") diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuExecutedCommandExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExecutedCommandExec.scala similarity index 100% rename from sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuExecutedCommandExec.scala rename to sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExecutedCommandExec.scala diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala index a88bd9f2cfb..d57f6430a0f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,8 +18,10 @@ package com.nvidia.spark.rapids import java.util -import ai.rapids.cudf.{HostMemoryBuffer, JCudfSerialization, NvtxColor, NvtxRange} -import ai.rapids.cudf.JCudfSerialization.{HostConcatResult, SerializedTableHeader} +import scala.reflect.ClassTag + +import ai.rapids.cudf.{JCudfSerialization, NvtxColor, NvtxRange} +import ai.rapids.cudf.JCudfSerialization.HostConcatResult import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion import com.nvidia.spark.rapids.shims.ShimUnaryExecNode @@ -66,78 +68,183 @@ case class GpuShuffleCoalesceExec(child: SparkPlan, targetBatchByteSize: Long) val metricsMap = allMetrics val targetSize = targetBatchByteSize val dataTypes = GpuColumnVector.extractTypes(schema) + val readOption = CoalesceReadOption(new RapidsConf(conf)) child.executeColumnar().mapPartitions { iter => - new GpuShuffleCoalesceIterator( - new HostShuffleCoalesceIterator(iter, targetSize, metricsMap), - dataTypes, metricsMap) + GpuShuffleCoalesceUtils.getGpuShuffleCoalesceIterator(iter, targetSize, dataTypes, + readOption, metricsMap) + } + } +} + +/** A case class to pack some options. 
Currently it carries only one, but more may be added in the future. */
+case class CoalesceReadOption private(kudoEnabled: Boolean)
+
+object CoalesceReadOption {
+  def apply(conf: RapidsConf): CoalesceReadOption = {
+    // TODO get the value from conf
+    CoalesceReadOption(false)
+  }
+}
+
+object GpuShuffleCoalesceUtils {
+  /**
+   * Return an iterator that will pull in batches from the input iterator,
+   * concatenate them up to the "targetSize" and move the concatenated result
+   * to the GPU for each output batch.
+   * The input iterator is expected to contain only serialized host batches just
+   * returned from the Shuffle deserializer. Otherwise, it will blow up.
+   *
+   * @param iter the input iterator containing only serialized host batches
+   * @param targetSize the target batch size for coalescing
+   * @param dataTypes the schema of the input batches
+   * @param readOption the coalesce read option
+   * @param metricsMap the map of metrics
+   * @param prefetchFirstBatch whether to prefetch the first bundle of serialized
+   *                           batches, up to "targetSize" bytes in total. The
+   *                           prefetched batches will be cached on the host until
+   *                           "next()" is called. This is for some join
+   *                           optimization cases.
+   */
+  def getGpuShuffleCoalesceIterator(
+      iter: Iterator[ColumnarBatch],
+      targetSize: Long,
+      dataTypes: Array[DataType],
+      readOption: CoalesceReadOption,
+      metricsMap: Map[String, GpuMetric],
+      prefetchFirstBatch: Boolean = false): Iterator[ColumnarBatch] = {
+    val hostIter = if (readOption.kudoEnabled) {
+      // TODO replace with the actual Kudo host iterator
+      throw new UnsupportedOperationException("Kudo is not supported yet")
+    } else {
+      new HostShuffleCoalesceIterator(iter, targetSize, metricsMap)
+    }
+    val maybeBufferedIter = if (prefetchFirstBatch) {
+      val bufferedIter = new CloseableBufferedIterator(hostIter)
+      withResource(new NvtxRange("fetch first batch", NvtxColor.YELLOW)) { _ =>
+        // Force a coalesce of the first batch before we grab the GPU semaphore
+        bufferedIter.headOption
+      }
+      bufferedIter
+    } else {
+      hostIter
+    }
+    new GpuShuffleCoalesceIterator(maybeBufferedIter, dataTypes, metricsMap)
+  }
+
+  /** Get the buffer size of a serialized batch just returned by the Shuffle deserializer */
+  def getSerializedBufferSize(cb: ColumnarBatch): Long = {
+    assert(cb.numCols() == 1)
+    val hmb = cb.column(0) match {
+      // TODO add the Kudo case
+      case serCol: SerializedTableColumn => serCol.hostBuffer
+      case o => throw new IllegalStateException(s"unsupported type: ${o.getClass}")
+    }
+    if (hmb != null) hmb.getLength else 0L
+  }
+}
+
+/**
+ * A trait representing the result coalesced on the host by the shuffle
+ * coalesce iterator.
+ */
+sealed trait CoalescedHostResult extends AutoCloseable {
+  /** Convert itself to a GPU batch */
+  def toGpuBatch(dataTypes: Array[DataType]): ColumnarBatch
+
+  /** Get the data size */
+  def getDataSize: Long
+}
+
+/**
+ * A trait defining operations on the table type T.
+ * This is used by HostCoalesceIteratorBase to separate the table operations from
+ * the shuffle read process.
+ */ +sealed trait SerializedTableOperator[T <: AutoCloseable] { + def getDataLen(table: T): Long + def getNumRows(table: T): Int + def concatOnHost(tables: Array[T]): CoalescedHostResult +} + +class JCudfCoalescedHostResult(hostConcatResult: HostConcatResult) extends CoalescedHostResult { + assert(hostConcatResult != null, "hostConcatResult should not be null") + + override def toGpuBatch(dataTypes: Array[DataType]): ColumnarBatch = + cudf_utils.HostConcatResultUtil.getColumnarBatch(hostConcatResult, dataTypes) + + override def close(): Unit = hostConcatResult.close() + + override def getDataSize: Long = hostConcatResult.getTableHeader.getDataLen +} + +class JCudfTableOperator extends SerializedTableOperator[SerializedTableColumn] { + override def getDataLen(table: SerializedTableColumn): Long = table.header.getDataLen + override def getNumRows(table: SerializedTableColumn): Int = table.header.getNumRows + + override def concatOnHost(tables: Array[SerializedTableColumn]): CoalescedHostResult = { + assert(tables.nonEmpty, "no tables to be concatenated") + val numCols = tables.head.header.getNumColumns + val ret = if (numCols == 0) { + val totalRowsNum = tables.map(getNumRows).sum + cudf_utils.HostConcatResultUtil.rowsOnlyHostConcatResult(totalRowsNum) + } else { + val (headers, buffers) = tables.map(t => (t.header, t.hostBuffer)).unzip + JCudfSerialization.concatToHostBuffer(headers, buffers) } + new JCudfCoalescedHostResult(ret) } } /** * Iterator that coalesces columnar batches that are expected to only contain - * [[SerializedTableColumn]]. The serialized tables within are collected up + * serialized tables from shuffle. The serialized tables within are collected up * to the target batch size and then concatenated on the host before handing * them to the caller on `.next()` */ -class HostShuffleCoalesceIterator( +abstract class HostCoalesceIteratorBase[T <: AutoCloseable: ClassTag]( iter: Iterator[ColumnarBatch], targetBatchByteSize: Long, metricsMap: Map[String, GpuMetric]) - extends Iterator[HostConcatResult] with AutoCloseable { + extends Iterator[CoalescedHostResult] with AutoCloseable { + private[this] val concatTimeMetric = metricsMap(GpuMetric.CONCAT_TIME) private[this] val inputBatchesMetric = metricsMap(GpuMetric.NUM_INPUT_BATCHES) private[this] val inputRowsMetric = metricsMap(GpuMetric.NUM_INPUT_ROWS) - private[this] val serializedTables = new util.ArrayDeque[SerializedTableColumn] + private[this] val serializedTables = new util.ArrayDeque[T] private[this] var numTablesInBatch: Int = 0 private[this] var numRowsInBatch: Int = 0 private[this] var batchByteSize: Long = 0L // Don't install the callback if in a unit test Option(TaskContext.get()).foreach { tc => - onTaskCompletion(tc) { - close() - } + onTaskCompletion(tc)(close()) } + protected def tableOperator: SerializedTableOperator[T] + override def close(): Unit = { serializedTables.forEach(_.close()) serializedTables.clear() } - def concatenateTablesInHost(): HostConcatResult = { + private def concatenateTablesInHost(): CoalescedHostResult = { val result = withResource(new MetricRange(concatTimeMetric)) { _ => - val firstHeader = serializedTables.peekFirst().header - if (firstHeader.getNumColumns == 0) { - (0 until numTablesInBatch).foreach(_ => serializedTables.removeFirst()) - cudf_utils.HostConcatResultUtil.rowsOnlyHostConcatResult(numRowsInBatch) - } else { - val headers = new Array[SerializedTableHeader](numTablesInBatch) - withResource(new Array[HostMemoryBuffer](numTablesInBatch)) { buffers => - 
headers.indices.foreach { i => - val serializedTable = serializedTables.removeFirst() - headers(i) = serializedTable.header - buffers(i) = serializedTable.hostBuffer - } - JCudfSerialization.concatToHostBuffer(headers, buffers) - } + withResource(new Array[T](numTablesInBatch)) { tables => + tables.indices.foreach(i => tables(i) = serializedTables.removeFirst()) + tableOperator.concatOnHost(tables) } } // update the stats for the next batch in progress numTablesInBatch = serializedTables.size - batchByteSize = 0 numRowsInBatch = 0 if (numTablesInBatch > 0) { require(numTablesInBatch == 1, "should only track at most one buffer that is not in a batch") - val header = serializedTables.peekFirst().header - batchByteSize = header.getDataLen - numRowsInBatch = header.getNumRows + val firstTable = serializedTables.peekFirst() + batchByteSize = tableOperator.getDataLen(firstTable) + numRowsInBatch = tableOperator.getNumRows(firstTable) } - result } @@ -150,14 +257,14 @@ class HostShuffleCoalesceIterator( // don't bother tracking empty tables if (batch.numRows > 0) { inputRowsMetric += batch.numRows() - val tableColumn = batch.column(0).asInstanceOf[SerializedTableColumn] - batchCanGrow = canAddToBatch(tableColumn.header) + val tableColumn = batch.column(0).asInstanceOf[T] + batchCanGrow = canAddToBatch(tableColumn) serializedTables.addLast(tableColumn) // always add the first table to the batch even if its beyond the target limits if (batchCanGrow || numTablesInBatch == 0) { numTablesInBatch += 1 - numRowsInBatch += tableColumn.header.getNumRows - batchByteSize += tableColumn.header.getDataLen + numRowsInBatch += tableOperator.getNumRows(tableColumn) + batchByteSize += tableOperator.getDataLen(tableColumn) } } else { batch.close() @@ -172,34 +279,39 @@ class HostShuffleCoalesceIterator( numTablesInBatch > 0 } - override def next(): HostConcatResult = { + override def next(): CoalescedHostResult = { if (!hasNext()) { throw new NoSuchElementException("No more host batches to concatenate") } concatenateTablesInHost() } - private def canAddToBatch(nextTable: SerializedTableHeader): Boolean = { - if (batchByteSize + nextTable.getDataLen > targetBatchByteSize) { + private def canAddToBatch(nextTable: T): Boolean = { + if (batchByteSize + tableOperator.getDataLen(nextTable) > targetBatchByteSize) { return false } - if (numRowsInBatch.toLong + nextTable.getNumRows > Integer.MAX_VALUE) { + if (numRowsInBatch.toLong + tableOperator.getNumRows(nextTable) > Integer.MAX_VALUE) { return false } true } } +class HostShuffleCoalesceIterator( + iter: Iterator[ColumnarBatch], + targetBatchSize: Long, + metricsMap: Map[String, GpuMetric]) + extends HostCoalesceIteratorBase[SerializedTableColumn](iter, targetBatchSize, metricsMap) { + override protected def tableOperator = new JCudfTableOperator +} + /** - * Iterator that coalesces columnar batches that are expected to only contain - * [[SerializedTableColumn]]. The serialized tables within are collected up - * to the target batch size and then concatenated on the host before the data - * is transferred to the GPU. + * Iterator that expects only "CoalescedHostResult"s as the input, and transfers + * them to GPU. 
*/ -class GpuShuffleCoalesceIterator(iter: Iterator[HostConcatResult], - dataTypes: Array[DataType], - metricsMap: Map[String, GpuMetric]) - extends Iterator[ColumnarBatch] { +class GpuShuffleCoalesceIterator(iter: Iterator[CoalescedHostResult], + dataTypes: Array[DataType], + metricsMap: Map[String, GpuMetric]) extends Iterator[ColumnarBatch] { private[this] val opTimeMetric = metricsMap(GpuMetric.OP_TIME) private[this] val outputBatchesMetric = metricsMap(GpuMetric.NUM_OUTPUT_BATCHES) private[this] val outputRowsMetric = metricsMap(GpuMetric.NUM_OUTPUT_ROWS) @@ -211,22 +323,21 @@ class GpuShuffleCoalesceIterator(iter: Iterator[HostConcatResult], throw new NoSuchElementException("No more columnar batches") } withResource(new NvtxRange("Concat+Load Batch", NvtxColor.YELLOW)) { _ => - val hostConcatResult = withResource(new MetricRange(opTimeMetric)) { _ => + val hostCoalescedResult = withResource(new MetricRange(opTimeMetric)) { _ => // op time covers concat time performed in `iter.next()`. // Note the concat runs on CPU. // GPU time = opTime - concatTime iter.next() } - withResource(hostConcatResult) { _ => + withResource(hostCoalescedResult) { _ => // We acquire the GPU regardless of whether `hostConcatResult` // is an empty batch or not, because the downstream tasks expect // the `GpuShuffleCoalesceIterator` to acquire the semaphore and may // generate GPU data from batches that are empty. GpuSemaphore.acquireIfNecessary(TaskContext.get()) - withResource(new MetricRange(opTimeMetric)) { _ => - val batch = cudf_utils.HostConcatResultUtil.getColumnarBatch(hostConcatResult, dataTypes) + val batch = hostCoalescedResult.toGpuBatch(dataTypes) outputBatchesMetric += 1 outputRowsMetric += batch.numRows() batch diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala index b4841046acc..b9525c73966 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala @@ -19,7 +19,6 @@ package com.nvidia.spark.rapids import scala.collection.mutable import ai.rapids.cudf.{NvtxColor, NvtxRange} -import ai.rapids.cudf.JCudfSerialization.HostConcatResult import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit import com.nvidia.spark.rapids.shims.{GpuHashPartitioning, ShimBinaryExecNode} @@ -72,6 +71,7 @@ class GpuShuffledHashJoinMeta( val Seq(left, right) = childPlans.map(_.convertIfNeeded()) val useSizedJoin = GpuShuffledSizedHashJoinExec.useSizedJoin(conf, join.joinType, join.leftKeys, join.rightKeys) + val readOpt = CoalesceReadOption(conf) val joinExec = join.joinType match { case LeftOuter | RightOuter if useSizedJoin => GpuShuffledAsymmetricHashJoinExec( @@ -83,6 +83,7 @@ class GpuShuffledHashJoinMeta( right, conf.isGPUShuffle, conf.gpuTargetBatchSizeBytes, + readOpt, isSkewJoin = false)( join.leftKeys, join.rightKeys, @@ -97,6 +98,7 @@ class GpuShuffledHashJoinMeta( right, conf.isGPUShuffle, conf.gpuTargetBatchSizeBytes, + readOpt, isSkewJoin = false)( join.leftKeys, join.rightKeys) @@ -285,16 +287,13 @@ object GpuShuffledHashJoinExec extends Logging { val buildTypes = buildOutput.map(_.dataType).toArray closeOnExcept(new CloseableBufferedIterator(buildIter)) { bufBuildIter => val startTime = System.nanoTime() + var isBuildSerialized = false // Batches type detection - val isBuildSerialized = 
bufBuildIter.hasNext && isBatchSerialized(bufBuildIter.head) - - // Let batches coalesce for size overflow check - val coalesceBuiltIter = if (isBuildSerialized) { - new HostShuffleCoalesceIterator(bufBuildIter, targetSize, coalesceMetrics) - } else { // Batches on GPU have already coalesced to the target size by the given goal. - bufBuildIter - } - + val coalesceBuiltIter = getHostShuffleCoalesceIterator( + bufBuildIter, targetSize, coalesceMetrics).map { iter => + isBuildSerialized = true + iter + }.getOrElse(bufBuildIter) if (coalesceBuiltIter.hasNext) { val firstBuildBatch = coalesceBuiltIter.next() // Batches have coalesced to the target size, so size will overflow if there are @@ -309,7 +308,7 @@ object GpuShuffledHashJoinExec extends Logging { // It can be optimized for grabbing the GPU semaphore when there is only a single // serialized host batch and the sub-partitioning is not activated. val (singleBuildCb, bufferedStreamIter) = getBuildBatchOptimizedAndClose( - firstBuildBatch.asInstanceOf[HostConcatResult], streamIter, buildTypes, + firstBuildBatch.asInstanceOf[CoalescedHostResult], streamIter, buildTypes, buildGoal, buildTime) logDebug("In the optimized case for grabbing the GPU semaphore, return " + s"a single batch (size: ${getBatchSize(singleBuildCb)}) for the build side " + @@ -321,7 +320,7 @@ object GpuShuffledHashJoinExec extends Logging { coalesceBuiltIter val gpuBuildIter = if (isBuildSerialized) { // batches on host, move them to GPU - new GpuShuffleCoalesceIterator(safeIter.asInstanceOf[Iterator[HostConcatResult]], + new GpuShuffleCoalesceIterator(safeIter.asInstanceOf[Iterator[CoalescedHostResult]], buildTypes, coalesceMetrics) } else { // batches already on GPU safeIter.asInstanceOf[Iterator[ColumnarBatch]] @@ -411,16 +410,16 @@ object GpuShuffledHashJoinExec extends Logging { } } - /** Only accepts a HostConcatResult or a ColumnarBatch as input */ + /** Only accepts a CoalescedHostResult or a ColumnarBatch as input */ private def getBatchSize(maybeBatch: AnyRef): Long = maybeBatch match { case batch: ColumnarBatch => GpuColumnVector.getTotalDeviceMemoryUsed(batch) - case hostBatch: HostConcatResult => hostBatch.getTableHeader().getDataLen() - case _ => throw new IllegalStateException(s"Expect a HostConcatResult or a " + + case hostBatch: CoalescedHostResult => hostBatch.getDataSize + case _ => throw new IllegalStateException(s"Expect a CoalescedHostResult or a " + s"ColumnarBatch, but got a ${maybeBatch.getClass.getSimpleName}") } private def getBuildBatchOptimizedAndClose( - hostConcatResult: HostConcatResult, + hostConcatResult: CoalescedHostResult, streamIter: Iterator[ColumnarBatch], buildDataTypes: Array[DataType], buildGoal: CoalesceSizeGoal, @@ -441,8 +440,7 @@ object GpuShuffledHashJoinExec extends Logging { } // Bring the build batch to the GPU now val buildBatch = buildTime.ns { - val cb = - cudf_utils.HostConcatResultUtil.getColumnarBatch(hostConcatResult, buildDataTypes) + val cb = hostConcatResult.toGpuBatch(buildDataTypes) getFilterFunc(buildGoal).map(filterAndClose => filterAndClose(cb)).getOrElse(cb) } (buildBatch, bufStreamIter) @@ -463,8 +461,20 @@ object GpuShuffledHashJoinExec extends Logging { ConcatAndConsumeAll.getSingleBatchWithVerification(singleBatchIter, inputAttrs) } - def isBatchSerialized(batch: ColumnarBatch): Boolean = { - batch.numCols() == 1 && batch.column(0).isInstanceOf[SerializedTableColumn] + private def getHostShuffleCoalesceIterator( + iter: BufferedIterator[ColumnarBatch], + targetSize: Long, + coalesceMetrics: Map[String, 
GpuMetric]): Option[Iterator[CoalescedHostResult]] = { + var retIter: Option[Iterator[CoalescedHostResult]] = None + if (iter.hasNext && iter.head.numCols() == 1) { + iter.head.column(0) match { + // TODO add the Kudo case + case _: SerializedTableColumn => + retIter = Some(new HostShuffleCoalesceIterator(iter, targetSize, coalesceMetrics)) + case _ => // should be gpu batches + } + } + retIter } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledSizedHashJoinExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledSizedHashJoinExec.scala index 4d06bdf0553..252c31da125 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledSizedHashJoinExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledSizedHashJoinExec.scala @@ -223,6 +223,8 @@ object GpuShuffledSizedHashJoinExec { * grabbing the GPU semaphore. */ trait HostHostJoinSizer extends JoinSizer[SpillableHostConcatResult] { + def readOption: CoalesceReadOption + override def setupForProbe( iter: Iterator[ColumnarBatch]): Iterator[SpillableHostConcatResult] = { new SpillableHostConcatResultFromColumnarBatchIterator(iter) @@ -235,24 +237,21 @@ object GpuShuffledSizedHashJoinExec { gpuBatchSizeBytes: Long, metrics: Map[String, GpuMetric]): Iterator[ColumnarBatch] = { val concatMetrics = getConcatMetrics(metrics) - val bufferedCoalesceIter = new CloseableBufferedIterator( - new HostShuffleCoalesceIterator( - new HostQueueBatchIterator(queue, remainingIter), - gpuBatchSizeBytes, - concatMetrics)) - withResource(new NvtxRange("fetch first batch", NvtxColor.YELLOW)) { _ => - // Force a coalesce of the first batch before we grab the GPU semaphore - bufferedCoalesceIter.headOption - } - new GpuShuffleCoalesceIterator(bufferedCoalesceIter, batchTypes, concatMetrics) + GpuShuffleCoalesceUtils.getGpuShuffleCoalesceIterator( + new HostQueueBatchIterator(queue, remainingIter), + gpuBatchSizeBytes, + batchTypes, + readOption, + concatMetrics, + prefetchFirstBatch = true) } override def getProbeBatchRowCount(batch: SpillableHostConcatResult): Long = { - batch.header.getNumRows + batch.getNumRows } override def getProbeBatchDataSize(batch: SpillableHostConcatResult): Long = { - batch.header.getDataLen + batch.getDataLen } } @@ -265,6 +264,8 @@ object GpuShuffledSizedHashJoinExec { * See https://github.com/NVIDIA/spark-rapids/issues/11322. 
*/ trait HostHostUnspillableJoinSizer extends JoinSizer[ColumnarBatch] { + def readOption: CoalesceReadOption + override def setupForProbe( iter: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] = iter @@ -275,31 +276,25 @@ object GpuShuffledSizedHashJoinExec { gpuBatchSizeBytes: Long, metrics: Map[String, GpuMetric]): Iterator[ColumnarBatch] = { val concatMetrics = getConcatMetrics(metrics) - val bufferedCoalesceIter = new CloseableBufferedIterator( - new HostShuffleCoalesceIterator( - queue.iterator ++ remainingIter, - gpuBatchSizeBytes, - concatMetrics)) - withResource(new NvtxRange("fetch first batch", NvtxColor.YELLOW)) { _ => - // Force a coalesce of the first batch before we grab the GPU semaphore - bufferedCoalesceIter.headOption - } - new GpuShuffleCoalesceIterator(bufferedCoalesceIter, batchTypes, concatMetrics) + GpuShuffleCoalesceUtils.getGpuShuffleCoalesceIterator( + queue.iterator ++ remainingIter, + gpuBatchSizeBytes, + batchTypes, + readOption, + concatMetrics, + prefetchFirstBatch = true) } override def getProbeBatchRowCount(batch: ColumnarBatch): Long = batch.numRows() override def getProbeBatchDataSize(batch: ColumnarBatch): Long = { - SerializedTableColumn.getMemoryUsed(batch) + GpuShuffleCoalesceUtils.getSerializedBufferSize(batch) } } /** * Join sizer to use when at least one side of the join is coming from another GPU exec node * such that the GPU semaphore is already held. Caches input batches on the GPU. - * - * @param startWithLeftSide whether to prefer fetching from the left or right side first - * when probing for table sizes. */ trait SpillableColumnarBatchJoinSizer extends JoinSizer[SpillableColumnarBatch] { override def setupForProbe(iter: Iterator[ColumnarBatch]): Iterator[SpillableColumnarBatch] = { @@ -377,8 +372,10 @@ abstract class GpuShuffledSizedHashJoinExec[HOST_BATCH_TYPE <: AutoCloseable] ex def isSkewJoin: Boolean def cpuLeftKeys: Seq[Expression] def cpuRightKeys: Seq[Expression] + def readOption: CoalesceReadOption - protected def createHostHostSizer(): JoinSizer[HOST_BATCH_TYPE] + protected def createHostHostSizer( + readOption: CoalesceReadOption): JoinSizer[HOST_BATCH_TYPE] protected def createSpillableColumnarBatchSizer( startWithLeftSide: Boolean): JoinSizer[SpillableColumnarBatch] @@ -425,20 +422,21 @@ abstract class GpuShuffledSizedHashJoinExec[HOST_BATCH_TYPE <: AutoCloseable] ex val localCondition = condition val localGpuBatchSizeBytes = gpuBatchSizeBytes val localMetrics = allMetrics.withDefaultValue(NoopMetric) + val localReadOption = readOption left.executeColumnar().zipPartitions(right.executeColumnar()) { case (leftIter, rightIter) => val joinInfo = (isLeftHost, isRightHost) match { case (true, true) => getHostHostJoinInfo(localJoinType, localLeftKeys, leftOutput, leftIter, - localRightKeys, rightOutput, rightIter, - localCondition, localGpuBatchSizeBytes, localMetrics) + localRightKeys, rightOutput, rightIter, localCondition, + localGpuBatchSizeBytes, localReadOption, localMetrics) case (true, false) => getHostGpuJoinInfo(localJoinType, localLeftKeys, leftOutput, leftIter, - localRightKeys, rightOutput, rightIter, - localCondition, localGpuBatchSizeBytes, localMetrics) + localRightKeys, rightOutput, rightIter, localCondition, + localGpuBatchSizeBytes, localReadOption, localMetrics) case (false, true) => getGpuHostJoinInfo(localJoinType, localLeftKeys, leftOutput, leftIter, - localRightKeys, rightOutput, rightIter, - localCondition, localGpuBatchSizeBytes, localMetrics) + localRightKeys, rightOutput, rightIter, localCondition, + 
localGpuBatchSizeBytes, localReadOption, localMetrics) case (false, false) => getGpuGpuJoinInfo(localJoinType, localLeftKeys, leftOutput, leftIter, localRightKeys, rightOutput, rightIter, @@ -539,8 +537,9 @@ abstract class GpuShuffledSizedHashJoinExec[HOST_BATCH_TYPE <: AutoCloseable] ex rightIter: Iterator[ColumnarBatch], condition: Option[Expression], gpuBatchSizeBytes: Long, + readOption: CoalesceReadOption, metrics: Map[String, GpuMetric]): JoinInfo = { - val sizer = createHostHostSizer() + val sizer = createHostHostSizer(readOption) sizer.getJoinInfo(joinType, leftKeys, leftOutput, leftIter, rightKeys, rightOutput, rightIter, condition, gpuBatchSizeBytes, metrics) } @@ -559,12 +558,15 @@ abstract class GpuShuffledSizedHashJoinExec[HOST_BATCH_TYPE <: AutoCloseable] ex rightIter: Iterator[ColumnarBatch], condition: Option[Expression], gpuBatchSizeBytes: Long, + readOption: CoalesceReadOption, metrics: Map[String, GpuMetric]): JoinInfo = { val sizer = createSpillableColumnarBatchSizer(startWithLeftSide = true) val concatMetrics = getConcatMetrics(metrics) - val leftIter = new GpuShuffleCoalesceIterator( - new HostShuffleCoalesceIterator(rawLeftIter, gpuBatchSizeBytes, concatMetrics), + val leftIter = GpuShuffleCoalesceUtils.getGpuShuffleCoalesceIterator( + rawLeftIter, + gpuBatchSizeBytes, leftOutput.map(_.dataType).toArray, + readOption, concatMetrics) sizer.getJoinInfo(joinType, leftKeys, leftOutput, leftIter, rightKeys, rightOutput, rightIter, condition, gpuBatchSizeBytes, metrics) @@ -584,12 +586,15 @@ abstract class GpuShuffledSizedHashJoinExec[HOST_BATCH_TYPE <: AutoCloseable] ex rawRightIter: Iterator[ColumnarBatch], condition: Option[Expression], gpuBatchSizeBytes: Long, + readOption: CoalesceReadOption, metrics: Map[String, GpuMetric]): JoinInfo = { val sizer = createSpillableColumnarBatchSizer(startWithLeftSide = false) val concatMetrics = getConcatMetrics(metrics) - val rightIter = new GpuShuffleCoalesceIterator( - new HostShuffleCoalesceIterator(rawRightIter, gpuBatchSizeBytes, concatMetrics), + val rightIter = GpuShuffleCoalesceUtils.getGpuShuffleCoalesceIterator( + rawRightIter, + gpuBatchSizeBytes, rightOutput.map(_.dataType).toArray, + readOption, concatMetrics) sizer.getJoinInfo(joinType, leftKeys, leftOutput, leftIter, rightKeys, rightOutput, rightIter, condition, gpuBatchSizeBytes, metrics) @@ -728,8 +733,9 @@ object GpuShuffledSymmetricHashJoinExec { } } - class HostHostSymmetricJoinSizer extends SymmetricJoinSizer[SpillableHostConcatResult] - with HostHostJoinSizer { + class HostHostSymmetricJoinSizer(override val readOption: CoalesceReadOption) + extends SymmetricJoinSizer[SpillableHostConcatResult] with HostHostJoinSizer { + override val startWithLeftSide: Boolean = true } @@ -762,6 +768,7 @@ case class GpuShuffledSymmetricHashJoinExec( override val right: SparkPlan, override val isGpuShuffle: Boolean, override val gpuBatchSizeBytes: Long, + override val readOption: CoalesceReadOption, override val isSkewJoin: Boolean)( override val cpuLeftKeys: Seq[Expression], override val cpuRightKeys: Seq[Expression]) @@ -771,8 +778,9 @@ case class GpuShuffledSymmetricHashJoinExec( override def otherCopyArgs: Seq[AnyRef] = Seq(cpuLeftKeys, cpuRightKeys) - override protected def createHostHostSizer(): JoinSizer[SpillableHostConcatResult] = { - new HostHostSymmetricJoinSizer() + override protected def createHostHostSizer( + readOption: CoalesceReadOption): JoinSizer[SpillableHostConcatResult] = { + new HostHostSymmetricJoinSizer(readOption) } override protected def 
createSpillableColumnarBatchSizer( @@ -1022,7 +1030,9 @@ object GpuShuffledAsymmetricHashJoinExec { } } - class HostHostAsymmetricJoinSizer(override val magnificationThreshold: Int) + class HostHostAsymmetricJoinSizer( + override val magnificationThreshold: Int, + override val readOption: CoalesceReadOption) extends AsymmetricJoinSizer[ColumnarBatch] with HostHostUnspillableJoinSizer { } @@ -1055,6 +1065,7 @@ case class GpuShuffledAsymmetricHashJoinExec( override val right: SparkPlan, override val isGpuShuffle: Boolean, override val gpuBatchSizeBytes: Long, + override val readOption: CoalesceReadOption, override val isSkewJoin: Boolean)( override val cpuLeftKeys: Seq[Expression], override val cpuRightKeys: Seq[Expression], @@ -1064,8 +1075,9 @@ case class GpuShuffledAsymmetricHashJoinExec( override def otherCopyArgs: Seq[AnyRef] = Seq(cpuLeftKeys, cpuRightKeys, magnificationThreshold) - override protected def createHostHostSizer(): JoinSizer[ColumnarBatch] = { - new HostHostAsymmetricJoinSizer(magnificationThreshold) + override protected def createHostHostSizer( + readOption: CoalesceReadOption): JoinSizer[ColumnarBatch] = { + new HostHostAsymmetricJoinSizer(magnificationThreshold, readOption) } override protected def createSpillableColumnarBatchSizer( @@ -1077,19 +1089,14 @@ case class GpuShuffledAsymmetricHashJoinExec( /** * A spillable form of a HostConcatResult. Takes ownership of the specified host buffer. */ -class SpillableHostConcatResult( - val header: SerializedTableHeader, - hmb: HostMemoryBuffer) extends AutoCloseable { - private var buffer = { - SpillableHostBuffer(hmb, hmb.getLength, SpillPriorities.ACTIVE_BATCHING_PRIORITY) - } +sealed trait SpillableHostConcatResult extends AutoCloseable { + def hmb: HostMemoryBuffer + def toBatch: ColumnarBatch + def getNumRows: Long + def getDataLen: Long - def getHostMemoryBufferAndClose(): HostMemoryBuffer = { - val hostBuffer = buffer.getHostBuffer() - closeOnExcept(hostBuffer) { _ => - close() - } - hostBuffer + protected var buffer = { + SpillableHostBuffer(hmb, hmb.getLength, SpillPriorities.ACTIVE_BATCHING_PRIORITY) } override def close(): Unit = { @@ -1098,6 +1105,36 @@ class SpillableHostConcatResult( } } +class CudfSpillableHostConcatResult( + header: SerializedTableHeader, + val hmb: HostMemoryBuffer) extends SpillableHostConcatResult { + + override def toBatch: ColumnarBatch = { + closeOnExcept(buffer.getHostBuffer()) { hostBuf => + SerializedTableColumn.from(header, hostBuf) + } + } + + override def getNumRows: Long = header.getNumRows + + override def getDataLen: Long = header.getDataLen +} + +object SpillableHostConcatResult { + def from(batch: ColumnarBatch): SpillableHostConcatResult = { + require(batch.numCols() > 0, "Batch must have at least 1 column") + batch.column(0) match { + // TODO add the Kudo case + case col: SerializedTableColumn => + val buffer = col.hostBuffer + buffer.incRefCount() + new CudfSpillableHostConcatResult(col.header, buffer) + case c => + throw new IllegalStateException(s"Expected SerializedTableColumn, got ${c.getClass}") + } + } +} + /** * Converts an iterator of shuffle batches in host memory into an iterator of spillable * host memory batches. 
@@ -1107,17 +1144,7 @@ class SpillableHostConcatResultFromColumnarBatchIterator( override def hasNext: Boolean = iter.hasNext override def next(): SpillableHostConcatResult = { - withResource(iter.next()) { batch => - require(batch.numCols() > 0, "Batch must have at least 1 column") - batch.column(0) match { - case col: SerializedTableColumn => - val buffer = col.hostBuffer - buffer.incRefCount() - new SpillableHostConcatResult(col.header, buffer) - case c => - throw new IllegalStateException(s"Expected SerializedTableColumn, got ${c.getClass}") - } - } + withResource(iter.next())(SpillableHostConcatResult.from) } } @@ -1137,10 +1164,7 @@ class HostQueueBatchIterator( override def next(): ColumnarBatch = { if (spillableQueue.nonEmpty) { - val shcr = spillableQueue.dequeue() - closeOnExcept(shcr.getHostMemoryBufferAndClose()) { hostBuffer => - SerializedTableColumn.from(shcr.header, hostBuffer) - } + withResource(spillableQueue.dequeue())(_.toBatch) } else { batchIter.next() } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortMergeJoinMeta.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortMergeJoinMeta.scala index b7a9fcb9020..7d7adfc5097 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortMergeJoinMeta.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortMergeJoinMeta.scala @@ -84,6 +84,7 @@ class GpuSortMergeJoinMeta( val Seq(left, right) = childPlans.map(_.convertIfNeeded()) val useSizedJoin = GpuShuffledSizedHashJoinExec.useSizedJoin(conf, join.joinType, join.leftKeys, join.rightKeys) + val readOpt = CoalesceReadOption(conf) val joinExec = join.joinType match { case LeftOuter | RightOuter if useSizedJoin => GpuShuffledAsymmetricHashJoinExec( @@ -95,6 +96,7 @@ class GpuSortMergeJoinMeta( right, conf.isGPUShuffle, conf.gpuTargetBatchSizeBytes, + readOpt, join.isSkewJoin)( join.leftKeys, join.rightKeys, @@ -109,6 +111,7 @@ class GpuSortMergeJoinMeta( right, conf.isGPUShuffle, conf.gpuTargetBatchSizeBytes, + readOpt, join.isSkewJoin)( join.leftKeys, join.rightKeys) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostAlloc.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostAlloc.scala index cefdfa8766e..7223463b8b7 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostAlloc.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostAlloc.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -82,12 +82,12 @@ private class HostAlloc(nonPinnedLimit: Long) extends HostMemoryAllocator with L synchronized { currentNonPinnedAllocated += amount } - Some(HostMemoryBuffer.allocate(amount, false)) + Some(HostMemoryBuffer.allocateRaw(amount)) } else { synchronized { if ((currentNonPinnedAllocated + amount) <= nonPinnedLimit) { currentNonPinnedAllocated += amount - Some(HostMemoryBuffer.allocate(amount, false)) + Some(HostMemoryBuffer.allocateRaw(amount)) } else { None } diff --git a/sql-plugin/src/main/spark320/java/com/nvidia/spark/rapids/shims/XxHash64Shims.scala b/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/shims/XxHash64Shims.scala similarity index 100% rename from sql-plugin/src/main/spark320/java/com/nvidia/spark/rapids/shims/XxHash64Shims.scala rename to sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/shims/XxHash64Shims.scala
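
The batching rule implemented by HostCoalesceIteratorBase.canAddToBatch above can be
summarized as: accept the next serialized table unless it would push the pending batch
past the target byte size or overflow an Int row count, but always accept the first
table so that a single oversized table still forms a batch of its own. That is also why
concatenateTablesInHost can leave at most one pending table behind after each next()
call. A minimal, self-contained Scala sketch of the rule follows; Table, planBatches,
and CoalesceRuleDemo are names invented for illustration, not APIs from this patch or
the plugin.

    import scala.collection.mutable

    // Standalone sketch of the coalescing rule: tables are admitted into the
    // pending batch until adding one would exceed the target byte size or
    // overflow an Int row count; the first table is always admitted.
    object CoalesceRuleDemo {
      final case class Table(rows: Int, bytes: Long)

      def planBatches(tables: Seq[Table], targetBytes: Long): Seq[Seq[Table]] = {
        val batches = mutable.Buffer.empty[Seq[Table]]
        val current = mutable.Buffer.empty[Table]
        var rows = 0L
        var bytes = 0L
        def flush(): Unit = if (current.nonEmpty) {
          batches += current.toList
          current.clear()
          rows = 0L
          bytes = 0L
        }
        for (t <- tables) {
          val fits = bytes + t.bytes <= targetBytes && rows + t.rows <= Int.MaxValue
          // Mirrors canAddToBatch: close the in-progress batch first if the next
          // table does not fit, but never leave a table without a batch.
          if (!fits && current.nonEmpty) flush()
          current += t
          rows += t.rows
          bytes += t.bytes
        }
        flush()
        batches.toList
      }

      def main(args: Array[String]): Unit = {
        val tables = Seq(Table(10, 400L), Table(5, 300L), Table(8, 500L), Table(1, 50L))
        // With a 700-byte target this prints
        // List(List(Table(10,400), Table(5,300)), List(Table(8,500), Table(1,50))).
        println(planBatches(tables, targetBytes = 700L))
      }
    }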