Merge branch 'branch-24.12' into fix_regex_empty_line_terminators
SurajAralihalli committed Nov 7, 2024
2 parents 674fb6f + e13cd55 commit 152e6bd
Showing 210 changed files with 1,615 additions and 39 deletions.
17 changes: 17 additions & 0 deletions aggregator/pom.xml
@@ -694,6 +694,23 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>release344</id>
<activation>
<property>
<name>buildver</name>
<value>344</value>
</property>
</activation>
<dependencies>
<dependency>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-delta-24x_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<classifier>${spark.version.classifier}</classifier>
</dependency>
</dependencies>
</profile>
<profile>
<id>release350</id>
<activation>
19 changes: 12 additions & 7 deletions build/shimplify.py
@@ -478,8 +478,7 @@ def __shimplify_layout():
for src_type in ['main', 'test']:
__traverse_source_tree_of_all_shims(
src_type,
lambda unused_src_type, shim_file_path, build_ver_arr:
__update_files2bv(files2bv, shim_file_path, build_ver_arr))
partial(__update_files2bv, files2bv=files2bv))

# adding a new shim?
if __add_shim_buildver is not None:
@@ -508,11 +507,17 @@ def __shimplify_layout():
__git_rename_or_copy(shim_file, owner_shim)


def __update_files2bv(files2bv, path, buildver_arr):
assert path not in files2bv.keys(), "new path %s %s should be "\
"encountered only once, current map\n%s" % (path, buildver_arr, files2bv)
__log.debug("Adding %s %s to files to shim map", path, buildver_arr)
files2bv[path] = buildver_arr
def __update_files2bv(files2bv,
# TODO an anachronism requirement: that the following two params
# have the same name along generate_symlink_file
shim_file_path,
build_ver_arr,
#
**kwargs):
assert shim_file_path not in files2bv.keys(), "new path %s %s should be "\
"encountered only once, current map\n%s" % (shim_file_path, build_ver_arr, files2bv)
__log.debug("Adding %s %s to files to shim map", shim_file_path, build_ver_arr)
files2bv[shim_file_path] = build_ver_arr


def __add_new_shim_to_file_map(files2bv):
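Note on the shimplify.py change: the per-shim traversal callback moves from an inline lambda to functools.partial, pre-binding the shared files2bv map, and __update_files2bv now takes **kwargs so it can ignore arguments it does not use. A minimal, hypothetical sketch of the pattern follows; traverse_shims and the file path are invented for illustration, and it assumes the traversal invokes its callback with keyword arguments, which is why the remaining parameter names must stay in sync with the caller (the point of the TODO comment in the diff).

from functools import partial

def traverse_shims(callback):
    # Hypothetical stand-in for __traverse_source_tree_of_all_shims; assumed to
    # call the callback with keyword arguments.
    callback(src_type='main',
             shim_file_path='sql-plugin/src/main/spark344/Example.scala',
             build_ver_arr=['344'])

def update_files2bv(files2bv, shim_file_path, build_ver_arr, **kwargs):
    # **kwargs swallows arguments this callback does not care about (src_type).
    assert shim_file_path not in files2bv, "each path should be seen only once"
    files2bv[shim_file_path] = build_ver_arr

files2bv = {}
traverse_shims(partial(update_files2bv, files2bv=files2bv))
print(files2bv)  # {'sql-plugin/src/main/spark344/Example.scala': ['344']}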
@@ -35,6 +35,7 @@
{"spark": "341db"}
{"spark": "342"}
{"spark": "343"}
{"spark": "344"}
{"spark": "350"}
{"spark": "351"}
{"spark": "352"}
38 changes: 20 additions & 18 deletions integration_tests/src/main/python/parquet_write_test.py
@@ -29,6 +29,11 @@

pytestmark = pytest.mark.nightly_resource_consuming_test

conf_key_parquet_datetimeRebaseModeInWrite = 'spark.sql.parquet.datetimeRebaseModeInWrite'
conf_key_parquet_int96RebaseModeInWrite = 'spark.sql.parquet.int96RebaseModeInWrite'
conf_key_parquet_datetimeRebaseModeInRead = 'spark.sql.parquet.datetimeRebaseModeInRead'
conf_key_parquet_int96RebaseModeInRead = 'spark.sql.parquet.int96RebaseModeInRead'

# test with original parquet file reader, the multi-file parallel reader for cloud, and coalesce file reader for
# non-cloud
original_parquet_file_reader_conf={'spark.rapids.sql.format.parquet.reader.type': 'PERFILE'}
@@ -37,11 +42,8 @@
reader_opt_confs = [original_parquet_file_reader_conf, multithreaded_parquet_file_reader_conf,
coalesce_parquet_file_reader_conf]
parquet_decimal_struct_gen= StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(decimal_gens)])
legacy_parquet_datetimeRebaseModeInWrite='spark.sql.parquet.datetimeRebaseModeInWrite' if is_spark_400_or_later() else 'spark.sql.legacy.parquet.datetimeRebaseModeInWrite'
legacy_parquet_int96RebaseModeInWrite='spark.sql.parquet.int96RebaseModeInWrite' if is_spark_400_or_later() else 'spark.sql.legacy.parquet.int96RebaseModeInWrite'
legacy_parquet_int96RebaseModeInRead='spark.sql.parquet.int96RebaseModeInRead' if is_spark_400_or_later() else 'spark.sql.legacy.parquet.int96RebaseModeInRead'
writer_confs={legacy_parquet_datetimeRebaseModeInWrite: 'CORRECTED',
legacy_parquet_int96RebaseModeInWrite: 'CORRECTED'}
writer_confs={conf_key_parquet_datetimeRebaseModeInWrite: 'CORRECTED',
conf_key_parquet_int96RebaseModeInWrite: 'CORRECTED'}

parquet_basic_gen =[byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
string_gen, boolean_gen, date_gen, TimestampGen(), binary_gen]
@@ -161,8 +163,8 @@ def test_write_ts_millis(spark_tmp_path, ts_type, ts_rebase):
lambda spark, path: unary_op_df(spark, gen).write.parquet(path),
lambda spark, path: spark.read.parquet(path),
data_path,
conf={legacy_parquet_datetimeRebaseModeInWrite: ts_rebase,
legacy_parquet_int96RebaseModeInWrite: ts_rebase,
conf={conf_key_parquet_datetimeRebaseModeInWrite: ts_rebase,
conf_key_parquet_int96RebaseModeInWrite: ts_rebase,
'spark.sql.parquet.outputTimestampType': ts_type})


@@ -288,8 +290,8 @@ def test_write_sql_save_table(spark_tmp_path, parquet_gens, spark_tmp_table_fact

def writeParquetUpgradeCatchException(spark, df, data_path, spark_tmp_table_factory, int96_rebase, datetime_rebase, ts_write):
spark.conf.set('spark.sql.parquet.outputTimestampType', ts_write)
spark.conf.set(legacy_parquet_datetimeRebaseModeInWrite, datetime_rebase)
spark.conf.set(legacy_parquet_int96RebaseModeInWrite, int96_rebase) # for spark 310
spark.conf.set(conf_key_parquet_datetimeRebaseModeInWrite, datetime_rebase)
spark.conf.set(conf_key_parquet_int96RebaseModeInWrite, int96_rebase) # for spark 310
with pytest.raises(Exception) as e_info:
df.coalesce(1).write.format("parquet").mode('overwrite').option("path", data_path).saveAsTable(spark_tmp_table_factory.get())
assert e_info.match(r".*SparkUpgradeException.*")
@@ -547,8 +549,8 @@ def generate_map_with_empty_validity(spark, path):
def test_parquet_write_fails_legacy_datetime(spark_tmp_path, data_gen, ts_write, ts_rebase_write):
data_path = spark_tmp_path + '/PARQUET_DATA'
all_confs = {'spark.sql.parquet.outputTimestampType': ts_write,
legacy_parquet_datetimeRebaseModeInWrite: ts_rebase_write,
legacy_parquet_int96RebaseModeInWrite: ts_rebase_write}
conf_key_parquet_datetimeRebaseModeInWrite: ts_rebase_write,
conf_key_parquet_int96RebaseModeInWrite: ts_rebase_write}
def writeParquetCatchException(spark, data_gen, data_path):
with pytest.raises(Exception) as e_info:
unary_op_df(spark, data_gen).coalesce(1).write.parquet(data_path)
@@ -566,12 +568,12 @@ def test_parquet_write_roundtrip_datetime_with_legacy_rebase(spark_tmp_path, dat
ts_rebase_write, ts_rebase_read):
data_path = spark_tmp_path + '/PARQUET_DATA'
all_confs = {'spark.sql.parquet.outputTimestampType': ts_write,
legacy_parquet_datetimeRebaseModeInWrite: ts_rebase_write[0],
legacy_parquet_int96RebaseModeInWrite: ts_rebase_write[1],
conf_key_parquet_datetimeRebaseModeInWrite: ts_rebase_write[0],
conf_key_parquet_int96RebaseModeInWrite: ts_rebase_write[1],
# The rebase modes in read configs should be ignored and overridden by the same
# modes in write configs, which are retrieved from the written files.
'spark.sql.legacy.parquet.datetimeRebaseModeInRead': ts_rebase_read[0],
legacy_parquet_int96RebaseModeInRead: ts_rebase_read[1]}
conf_key_parquet_datetimeRebaseModeInRead: ts_rebase_read[0],
conf_key_parquet_int96RebaseModeInRead: ts_rebase_read[1]}
assert_gpu_and_cpu_writes_are_equal_collect(
lambda spark, path: unary_op_df(spark, data_gen).coalesce(1).write.parquet(path),
lambda spark, path: spark.read.parquet(path),
@@ -600,7 +602,7 @@ def test_it(spark):
spark.sql("CREATE TABLE {} LOCATION '{}/ctas' AS SELECT * FROM {}".format(
ctas_with_existing_name, data_path, src_name))
except pyspark.sql.utils.AnalysisException as e:
description = e._desc if is_spark_400_or_later() else e.desc
description = e._desc if (is_spark_400_or_later() or is_databricks_version_or_later(14, 3)) else e.desc
if allow_non_empty or description.find('non-empty directory') == -1:
raise e
with_gpu_session(test_it, conf)
@@ -829,8 +831,8 @@ def write_partitions(spark, table_path):
)

def hive_timestamp_value(spark_tmp_table_factory, spark_tmp_path, ts_rebase, func):
conf={legacy_parquet_datetimeRebaseModeInWrite: ts_rebase,
legacy_parquet_int96RebaseModeInWrite: ts_rebase}
conf={conf_key_parquet_datetimeRebaseModeInWrite: ts_rebase,
conf_key_parquet_int96RebaseModeInWrite: ts_rebase}

def create_table(spark, path):
tmp_table = spark_tmp_table_factory.get()
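Note on the parquet_write_test.py change: the Parquet rebase-mode configuration keys are hoisted into module-level conf_key_* constants that use the non-legacy spark.sql.parquet.* names, replacing the earlier per-Spark-version choice between legacy and non-legacy key names. An illustrative sketch of how such a conf dict is assembled is below; the helper name and its default are invented, and 'CORRECTED', 'LEGACY', and 'EXCEPTION' are the usual rebase-mode values.

conf_key_parquet_datetimeRebaseModeInWrite = 'spark.sql.parquet.datetimeRebaseModeInWrite'
conf_key_parquet_int96RebaseModeInWrite = 'spark.sql.parquet.int96RebaseModeInWrite'

def rebase_writer_confs(mode, output_ts_type='TIMESTAMP_MICROS'):
    # Build a conf dict like writer_confs in the diff above, plus the output
    # timestamp type used by several of the write tests.
    return {conf_key_parquet_datetimeRebaseModeInWrite: mode,
            conf_key_parquet_int96RebaseModeInWrite: mode,
            'spark.sql.parquet.outputTimestampType': output_ts_type}

print(rebase_writer_confs('CORRECTED'))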
15 changes: 8 additions & 7 deletions integration_tests/src/main/python/string_test.py
@@ -23,7 +23,7 @@
from pyspark.sql.types import *
import pyspark.sql.utils
import pyspark.sql.functions as f
from spark_session import with_cpu_session, with_gpu_session, is_databricks104_or_later, is_before_spark_320, is_before_spark_400
from spark_session import with_cpu_session, with_gpu_session, is_databricks104_or_later, is_databricks_version_or_later, is_before_spark_320, is_spark_400_or_later

_regexp_conf = { 'spark.rapids.sql.regexp.enabled': 'true' }

@@ -104,10 +104,6 @@ def test_substring_index(data_gen,delim):


@allow_non_gpu('ProjectExec')
@pytest.mark.skipif(condition=not is_before_spark_400(),
reason="Bug in Apache Spark 4.0 causes NumberFormatExceptions from substring_index(), "
"if called with index==null. For further information, see: "
"https://issues.apache.org/jira/browse/SPARK-48989.")
@pytest.mark.parametrize('data_gen', [mk_str_gen('([ABC]{0,3}_?){0,7}')], ids=idfn)
def test_unsupported_fallback_substring_index(data_gen):
delim_gen = StringGen(pattern="_")
@@ -327,6 +323,10 @@ def test_rtrim(data_gen):
'TRIM(TRAILING NULL FROM a)',
'TRIM(TRAILING "" FROM a)'))

@pytest.mark.skipif(condition=is_spark_400_or_later() or is_databricks_version_or_later(14, 3),
reason="startsWith(None)/endswith(None) seems to cause an NPE in Column.fn() on Apache Spark 4.0, "
"and Databricks 14.3."
"See https://issues.apache.org/jira/browse/SPARK-48995.")
def test_startswith():
gen = mk_str_gen('[Ab\ud720]{3}A.{0,3}Z[Ab\ud720]{3}')
assert_gpu_and_cpu_are_equal_collect(
@@ -351,8 +351,9 @@ def assert_gpu_did_fallback(op):
assert_gpu_did_fallback(f.col("a").startswith(f.col("a")))


@pytest.mark.skipif(condition=not is_before_spark_400(),
reason="endswith(None) seems to cause an NPE in Column.fn() on Apache Spark 4.0. "
@pytest.mark.skipif(condition=is_spark_400_or_later() or is_databricks_version_or_later(14, 3),
reason="startsWith(None)/endswith(None) seems to cause an NPE in Column.fn() on Apache Spark 4.0, "
"and Databricks 14.3."
"See https://issues.apache.org/jira/browse/SPARK-48995.")
def test_endswith():
gen = mk_str_gen('[Ab\ud720]{3}A.{0,3}Z[Ab\ud720]{3}')
21 changes: 21 additions & 0 deletions pom.xml
@@ -406,6 +406,26 @@
<module>delta-lake/delta-24x</module>
</modules>
</profile>
<profile>
<id>release344</id>
<activation>
<property>
<name>buildver</name>
<value>344</value>
</property>
</activation>
<properties>
<buildver>344</buildver>
<spark.version>${spark344.version}</spark.version>
<spark.test.version>${spark344.version}</spark.test.version>
<parquet.hadoop.version>1.12.3</parquet.hadoop.version>
<iceberg.version>${spark330.iceberg.version}</iceberg.version>
<slf4j.version>2.0.6</slf4j.version>
</properties>
<modules>
<module>delta-lake/delta-24x</module>
</modules>
</profile>
<profile>
<id>release330cdh</id>
<activation>
@@ -796,6 +816,7 @@
<spark341.version>3.4.1</spark341.version>
<spark342.version>3.4.2</spark342.version>
<spark343.version>3.4.3</spark343.version>
<spark344.version>3.4.4</spark344.version>
<spark330cdh.version>3.3.0.3.3.7180.0-274</spark330cdh.version>
<spark332cdh.version>3.3.2.3.3.7190.0-91</spark332cdh.version>
<spark330db.version>3.3.0-databricks</spark330db.version>
17 changes: 17 additions & 0 deletions scala2.13/aggregator/pom.xml
@@ -694,6 +694,23 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>release344</id>
<activation>
<property>
<name>buildver</name>
<value>344</value>
</property>
</activation>
<dependencies>
<dependency>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-delta-24x_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<classifier>${spark.version.classifier}</classifier>
</dependency>
</dependencies>
</profile>
<profile>
<id>release350</id>
<activation>
21 changes: 21 additions & 0 deletions scala2.13/pom.xml
@@ -406,6 +406,26 @@
<module>delta-lake/delta-24x</module>
</modules>
</profile>
<profile>
<id>release344</id>
<activation>
<property>
<name>buildver</name>
<value>344</value>
</property>
</activation>
<properties>
<buildver>344</buildver>
<spark.version>${spark344.version}</spark.version>
<spark.test.version>${spark344.version}</spark.test.version>
<parquet.hadoop.version>1.12.3</parquet.hadoop.version>
<iceberg.version>${spark330.iceberg.version}</iceberg.version>
<slf4j.version>2.0.6</slf4j.version>
</properties>
<modules>
<module>delta-lake/delta-24x</module>
</modules>
</profile>
<profile>
<id>release330cdh</id>
<activation>
@@ -796,6 +816,7 @@
<spark341.version>3.4.1</spark341.version>
<spark342.version>3.4.2</spark342.version>
<spark343.version>3.4.3</spark343.version>
<spark344.version>3.4.4</spark344.version>
<spark330cdh.version>3.3.0.3.3.7180.0-274</spark330cdh.version>
<spark332cdh.version>3.3.2.3.3.7190.0-91</spark332cdh.version>
<spark330db.version>3.3.0-databricks</spark330db.version>
@@ -18,7 +18,7 @@ package org.apache.spark.sql.rapids.execution
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import com.nvidia.spark.rapids.{GpuBatchUtils, GpuColumnVector, GpuExpression, GpuHashPartitioningBase, GpuMetric, SpillableColumnarBatch, SpillPriorities, TaskAutoCloseableResource}
import com.nvidia.spark.rapids.{GpuBatchUtils, GpuColumnVector, GpuExpression, GpuHashPartitioningBase, GpuMetric, RmmRapidsRetryIterator, SpillableColumnarBatch, SpillPriorities, TaskAutoCloseableResource}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits._

@@ -179,9 +179,19 @@ class GpuBatchSubPartitioner(
// 1) Hash partition on the batch
val partedTable = GpuHashPartitioningBase.hashPartitionAndClose(
gpuBatch, inputBoundKeys, realNumPartitions, "Sub-Hash Calculate", hashSeed)
val (spillBatch, partitions) = withResource(partedTable) { _ =>
// Convert to SpillableColumnarBatch for the following retry.
(SpillableColumnarBatch(GpuColumnVector.from(partedTable.getTable, types),
SpillPriorities.ACTIVE_BATCHING_PRIORITY),
partedTable.getPartitions)
}
// 2) Split into smaller tables according to partitions
val subTables = withResource(partedTable) { _ =>
partedTable.getTable.contiguousSplit(partedTable.getPartitions.tail: _*)
val subTables = RmmRapidsRetryIterator.withRetryNoSplit(spillBatch) { _ =>
withResource(spillBatch.getColumnarBatch()) { cb =>
withResource(GpuColumnVector.from(cb)) { tbl =>
tbl.contiguousSplit(partitions.tail: _*)
}
}
}
// 3) Make each smaller table spillable and cache them in the queue
withResource(subTables) { _ =>
@@ -35,6 +35,7 @@
{"spark": "341db"}
{"spark": "342"}
{"spark": "343"}
{"spark": "344"}
{"spark": "350"}
{"spark": "351"}
{"spark": "352"}
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2023, NVIDIA CORPORATION.
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -35,6 +35,7 @@
{"spark": "341db"}
{"spark": "342"}
{"spark": "343"}
{"spark": "344"}
{"spark": "350"}
{"spark": "350db"}
{"spark": "351"}
@@ -32,6 +32,7 @@
{"spark": "341"}
{"spark": "342"}
{"spark": "343"}
{"spark": "344"}
{"spark": "350"}
{"spark": "351"}
{"spark": "352"}
@@ -32,6 +32,7 @@
{"spark": "341"}
{"spark": "342"}
{"spark": "343"}
{"spark": "344"}
{"spark": "350"}
{"spark": "351"}
{"spark": "352"}
@@ -34,6 +34,7 @@
{"spark": "341db"}
{"spark": "342"}
{"spark": "343"}
{"spark": "344"}
{"spark": "350"}
{"spark": "351"}
{"spark": "352"}
@@ -34,6 +34,7 @@
{"spark": "341db"}
{"spark": "342"}
{"spark": "343"}
{"spark": "344"}
{"spark": "350"}
{"spark": "351"}
{"spark": "352"}
@@ -35,6 +35,7 @@
{"spark": "341db"}
{"spark": "342"}
{"spark": "343"}
{"spark": "344"}
{"spark": "350"}
{"spark": "351"}
{"spark": "352"}
@@ -32,6 +32,7 @@
{"spark": "341"}
{"spark": "342"}
{"spark": "343"}
{"spark": "344"}
{"spark": "350"}
{"spark": "351"}
{"spark": "352"}