Skip to content

Commit

Permalink
[Hudi] Add integration tests for Hudi (delta-io#3338)
Browse files Browse the repository at this point in the history
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [x] Other (Hudi)

## Description

<!--
- Describe what this PR changes.
- Describe why we need the change.
 
If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->
This PR adds integration tests for Hudi. Previously, the integration
test was not able to actually read the table from a Hudi engine due to
Spark incompatibility, but since the new Hudi 15.0.0 release supports
Spark 3.5 we can now add verifications that actually read the tables
from Hudi.

## How was this patch tested?

<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->
Added integration tests

## Does this PR introduce _any_ user-facing changes?

<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->
No
  • Loading branch information
anniewang-db authored and 7mming7 committed Jul 9, 2024
1 parent 9cd7984 commit 67607ba
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 17 deletions.
158 changes: 144 additions & 14 deletions hudi/integration_tests/write_uniform_hudi.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,165 @@
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import current_date, current_timestamp
from pyspark.testing import assertDataFrameEqual
from delta.tables import DeltaTable
import shutil
import random
import os
import time

testRoot = "/tmp/delta-uniform-hudi/"
warehousePath = testRoot + "uniform_tables"
shutil.rmtree(testRoot, ignore_errors=True)
###################### Setup ######################

test_root = "/tmp/delta-uniform-hudi/"
warehouse_path = test_root + "uniform_tables"
shutil.rmtree(test_root, ignore_errors=True)
hudi_table_base_name = "delta_table_with_hudi"

# we need to set the following configs
spark = SparkSession.builder \
.appName("delta-uniform-hudi") \
spark_delta = SparkSession.builder \
.appName("delta-uniform-hudi-writer") \
.master("local[*]") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.config("spark.sql.warehouse.dir", warehousePath) \
.config("spark.sql.warehouse.dir", warehouse_path) \
.getOrCreate()

spark.sql("""CREATE TABLE `delta_table_with_hudi` (col1 INT) USING DELTA
TBLPROPERTIES('delta.universalFormat.enabledFormats' = 'hudi') """)
###################### Helper functions ######################

def get_delta_df(spark, table_name):
hudi_table_path = os.path.join(warehouse_path, table_name)
print('hudi_table_path:', hudi_table_path)
df_delta = spark.read.format("delta").load(hudi_table_path)
return df_delta

def get_hudi_df(spark, table_name):
hudi_table_path = os.path.join(warehouse_path, table_name)
df_hudi = (spark.read.format("hudi")
.option("hoodie.metadata.enable", "true")
.option("hoodie.datasource.write.hive_style_partitioning", "true")
.load(hudi_table_path))
return df_hudi

###################### Create tables in Delta ######################
print('Delta tables:')

# validate various data types
spark_delta.sql(f"""CREATE TABLE `{hudi_table_base_name}_0` (col1 BIGINT, col2 BOOLEAN, col3 DATE,
col4 DOUBLE, col5 FLOAT, col6 INT, col7 STRING, col8 TIMESTAMP,
col9 BINARY, col10 DECIMAL(5, 2),
col11 STRUCT<field1: INT, field2: STRING,
field3: STRUCT<field4: INT, field5: INT, field6: STRING>>,
col12 ARRAY<STRUCT<field1: INT, field2: STRING>>,
col13 MAP<STRING, STRUCT<field1: INT, field2: STRING>>) USING DELTA
TBLPROPERTIES('delta.universalFormat.enabledFormats' = 'hudi') """)
spark_delta.sql(f"""INSERT INTO `{hudi_table_base_name}_0` VALUES
(123, true, date(current_timestamp()), 32.1, 1.23, 456, 'hello world',
current_timestamp(), X'1ABF', -999.99,
STRUCT(1, 'hello', STRUCT(2, 3, 'world')),
ARRAY(
STRUCT(1, 'first'),
STRUCT(2, 'second')
),
MAP(
'key1', STRUCT(1, 'delta'),
'key2', STRUCT(1, 'lake')
)); """)

df_delta_0 = get_delta_df(spark_delta, f"{hudi_table_base_name}_0")
df_delta_0.show()

# conversion happens correctly when enabling property after table creation
spark_delta.sql(f"CREATE TABLE {hudi_table_base_name}_1 (col1 INT, col2 STRING) USING DELTA")
spark_delta.sql(f"INSERT INTO {hudi_table_base_name}_1 VALUES (1, 'a'), (2, 'b')")
spark_delta.sql(f"ALTER TABLE {hudi_table_base_name}_1 SET TBLPROPERTIES('delta.universalFormat.enabledFormats' = 'hudi')")

df_delta_1 = get_delta_df(spark_delta, f"{hudi_table_base_name}_1")
df_delta_1.show()

# validate deletes
spark_delta.sql(f"""CREATE TABLE {hudi_table_base_name}_2 (col1 INT, col2 STRING) USING DELTA
TBLPROPERTIES('delta.universalFormat.enabledFormats' = 'hudi')""")
spark_delta.sql(f"INSERT INTO {hudi_table_base_name}_2 VALUES (1, 'a'), (2, 'b')")
spark_delta.sql(f"DELETE FROM {hudi_table_base_name}_2 WHERE col1 = 1")

df_delta_2 = get_delta_df(spark_delta, f"{hudi_table_base_name}_2")
df_delta_2.show()

# basic schema evolution
spark_delta.sql(f"""CREATE TABLE {hudi_table_base_name}_3 (col1 INT, col2 STRING) USING DELTA
TBLPROPERTIES('delta.universalFormat.enabledFormats' = 'hudi')""")
spark_delta.sql(f"INSERT INTO {hudi_table_base_name}_3 VALUES (1, 'a'), (2, 'b')")
spark_delta.sql(f"ALTER TABLE {hudi_table_base_name}_3 ADD COLUMN col3 INT FIRST")
spark_delta.sql(f"INSERT INTO {hudi_table_base_name}_3 VALUES (3, 4, 'c')")

spark.sql("""INSERT INTO `delta_table_with_hudi` VALUES (1); """)
df_delta_3 = get_delta_df(spark_delta, f"{hudi_table_base_name}_3")
df_delta_3.show()

# schema evolution for nested fields
spark_delta.sql(f"""CREATE TABLE {hudi_table_base_name}_4 (col1 STRUCT<field1: INT, field2: STRING>)
USING DELTA
TBLPROPERTIES('delta.universalFormat.enabledFormats' = 'hudi')""")
spark_delta.sql(f"""INSERT INTO {hudi_table_base_name}_4 VALUES
(named_struct('field1', 1, 'field2', 'hello'))
""")
spark_delta.sql(f"ALTER TABLE {hudi_table_base_name}_4 ADD COLUMN col1.field3 INT AFTER field1")
spark_delta.sql(f"INSERT INTO {hudi_table_base_name}_4 VALUES (named_struct('field1', 3, 'field3', 4, 'field2', 'delta'))")

df_delta_4 = get_delta_df(spark_delta, f"{hudi_table_base_name}_4")
df_delta_4.show()

# time travel
spark_delta.sql(f"""CREATE TABLE {hudi_table_base_name}_5 (col1 INT, col2 STRING) USING DELTA
TBLPROPERTIES('delta.universalFormat.enabledFormats' = 'hudi',
'delta.columnMapping.mode' = 'name')""")
spark_delta.sql(f"INSERT INTO {hudi_table_base_name}_5 VALUES (1, 'a')")
spark_delta.sql(f"INSERT INTO {hudi_table_base_name}_5 VALUES (2, 'b')")

df_history_5 = spark_delta.sql(f"DESCRIBE HISTORY {hudi_table_base_name}_5")
timestamp = df_history_5.collect()[0]['timestamp'] # get the timestamp of the first commit
df_delta_5 = spark_delta.sql(f"""
SELECT * FROM {hudi_table_base_name}_5
TIMESTAMP AS OF '{timestamp}'""")
df_delta_5.show()

time.sleep(5)

hudiTablePath = warehousePath + "/" + "delta_table_with_hudi"
###################### Read tables from Hudi engine ######################
print('Hudi tables:')

spark_hudi = SparkSession.builder \
.appName("delta-uniform-hudi-reader") \
.master("local[*]") \
.config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
.config("spark.sql.warehouse.dir", warehouse_path) \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar") \
.getOrCreate()

df_hudi_0 = get_hudi_df(spark_hudi, f"{hudi_table_base_name}_0")
df_hudi_0.show()
assertDataFrameEqual(df_delta_0, df_hudi_0)

df_hudi_1 = get_hudi_df(spark_hudi, f"{hudi_table_base_name}_1")
df_hudi_1.show()
assertDataFrameEqual(df_delta_1, df_hudi_1)

df_hudi_2 = get_hudi_df(spark_hudi, f"{hudi_table_base_name}_2")
df_hudi_2.show()
assertDataFrameEqual(df_delta_2, df_hudi_2)

df_hudi_3 = get_hudi_df(spark_hudi, f"{hudi_table_base_name}_3")
df_hudi_3.show()
assertDataFrameEqual(df_delta_3, df_hudi_3)

hudiMetadataPath = hudiTablePath + "/.hoodie/metadata/files"
df_hudi_4 = get_hudi_df(spark_hudi, f"{hudi_table_base_name}_4")
df_hudi_4.show()
assertDataFrameEqual(df_delta_4, df_hudi_4)

assert len(os.listdir(hudiMetadataPath)) > 0
df_hudi_5 = spark_hudi.sql(f"""
SELECT * FROM {hudi_table_base_name}_5
TIMESTAMP AS OF '{timestamp}'""")
df_hudi_5.show()
assertDataFrameEqual(df_delta_5, df_hudi_5)

# TODO: read with Hudi Spark to verify table content after Hudi supports Spark 3.5+
print('UniForm Hudi integration test passed!')
19 changes: 16 additions & 3 deletions run-integration-tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ def run_iceberg_integration_tests(root_dir, version, spark_version, iceberg_vers
print("Failed Iceberg tests in %s" % (test_file))
raise

def run_uniform_hudi_integration_tests(root_dir, version, extra_maven_repo, use_local):
def run_uniform_hudi_integration_tests(root_dir, version, spark_version, hudi_version, extra_maven_repo, use_local):
print("\n\n##### Running Uniform hudi tests on version %s #####" % str(version))
# clear_artifact_cache()
if use_local:
Expand All @@ -256,7 +256,9 @@ def run_uniform_hudi_integration_tests(root_dir, version, extra_maven_repo, use_
python_root_dir = path.join(root_dir, "python")
extra_class_path = path.join(python_root_dir, path.join("delta", "testing"))
package = ','.join([
"io.delta:delta-%s_2.12:%s" % (get_artifact_name(version), version)])
"io.delta:delta-%s_2.12:%s" % (get_artifact_name(version), version),
"org.apache.hudi:hudi-spark%s-bundle_2.12:%s" % (spark_version, hudi_version)
])
jars = path.join(root_dir, "hudi/target/scala-2.12/delta-hudi-assembly_2.12-%s.jar" % (version))

repo = extra_maven_repo if extra_maven_repo else ""
Expand Down Expand Up @@ -483,6 +485,17 @@ def __exit__(self, tpe, value, traceback):
required=False,
default="1.4.0",
help="Iceberg Spark Runtime library version")
parser.add_argument(
"--hudi-spark-version",
required=False,
default="3.5",
help="Spark version for the Hudi library")
parser.add_argument(
"--hudi-version",
required=False,
default="0.15.0",
help="Hudi library version"
)

args = parser.parse_args()

Expand All @@ -508,7 +521,7 @@ def __exit__(self, tpe, value, traceback):

if args.run_uniform_hudi_integration_tests:
run_uniform_hudi_integration_tests(
root_dir, args.version, args.maven_repo, args.use_local)
root_dir, args.version, args.hudi_spark_version, args.hudi_version, args.maven_repo, args.use_local)
quit()

if args.run_storage_s3_dynamodb_integration_tests:
Expand Down

0 comments on commit 67607ba

Please sign in to comment.