[Hudi] Add integration tests for Hudi #3338

Merged
158 changes: 144 additions & 14 deletions hudi/integration_tests/write_uniform_hudi.py
@@ -1,35 +1,165 @@
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import current_date, current_timestamp
from pyspark.testing import assertDataFrameEqual
from delta.tables import DeltaTable
import shutil
import random
import os
import time

###################### Setup ######################

test_root = "/tmp/delta-uniform-hudi/"
warehouse_path = test_root + "uniform_tables"
shutil.rmtree(test_root, ignore_errors=True)
hudi_table_base_name = "delta_table_with_hudi"

# the following configs are required to write Delta tables with UniForm (Hudi) enabled
spark_delta = SparkSession.builder \
    .appName("delta-uniform-hudi-writer") \
    .master("local[*]") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.sql.warehouse.dir", warehouse_path) \
    .getOrCreate()

###################### Helper functions ######################

def get_delta_df(spark, table_name):
    hudi_table_path = os.path.join(warehouse_path, table_name)
    print('hudi_table_path:', hudi_table_path)
    df_delta = spark.read.format("delta").load(hudi_table_path)
    return df_delta

def get_hudi_df(spark, table_name):
    hudi_table_path = os.path.join(warehouse_path, table_name)
    df_hudi = (spark.read.format("hudi")
               .option("hoodie.metadata.enable", "true")
               .option("hoodie.datasource.write.hive_style_partitioning", "true")
               .load(hudi_table_path))
    return df_hudi

###################### Create tables in Delta ######################
print('Delta tables:')
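# each table below exercises one aspect of UniForm: data types, enabling the property after
# creation, deletes, schema evolution, nested schema evolution, and time travel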

# validate various data types
spark_delta.sql(f"""CREATE TABLE `{hudi_table_base_name}_0` (col1 BIGINT, col2 BOOLEAN, col3 DATE,
col4 DOUBLE, col5 FLOAT, col6 INT, col7 STRING, col8 TIMESTAMP,
col9 BINARY, col10 DECIMAL(5, 2),
col11 STRUCT<field1: INT, field2: STRING,
field3: STRUCT<field4: INT, field5: INT, field6: STRING>>,
col12 ARRAY<STRUCT<field1: INT, field2: STRING>>,
col13 MAP<STRING, STRUCT<field1: INT, field2: STRING>>) USING DELTA
TBLPROPERTIES('delta.universalFormat.enabledFormats' = 'hudi') """)
spark_delta.sql(f"""INSERT INTO `{hudi_table_base_name}_0` VALUES
(123, true, date(current_timestamp()), 32.1, 1.23, 456, 'hello world',
current_timestamp(), X'1ABF', -999.99,
STRUCT(1, 'hello', STRUCT(2, 3, 'world')),
ARRAY(
STRUCT(1, 'first'),
STRUCT(2, 'second')
),
MAP(
'key1', STRUCT(1, 'delta'),
'key2', STRUCT(1, 'lake')
)); """)

df_delta_0 = get_delta_df(spark_delta, f"{hudi_table_base_name}_0")
df_delta_0.show()

# conversion happens correctly when the property is enabled after table creation
spark_delta.sql(f"CREATE TABLE {hudi_table_base_name}_1 (col1 INT, col2 STRING) USING DELTA")
spark_delta.sql(f"INSERT INTO {hudi_table_base_name}_1 VALUES (1, 'a'), (2, 'b')")
spark_delta.sql(f"ALTER TABLE {hudi_table_base_name}_1 SET TBLPROPERTIES('delta.universalFormat.enabledFormats' = 'hudi')")

df_delta_1 = get_delta_df(spark_delta, f"{hudi_table_base_name}_1")
df_delta_1.show()

# validate deletes
spark_delta.sql(f"""CREATE TABLE {hudi_table_base_name}_2 (col1 INT, col2 STRING) USING DELTA
TBLPROPERTIES('delta.universalFormat.enabledFormats' = 'hudi')""")
spark_delta.sql(f"INSERT INTO {hudi_table_base_name}_2 VALUES (1, 'a'), (2, 'b')")
spark_delta.sql(f"DELETE FROM {hudi_table_base_name}_2 WHERE col1 = 1")

df_delta_2 = get_delta_df(spark_delta, f"{hudi_table_base_name}_2")
df_delta_2.show()

# basic schema evolution
spark_delta.sql(f"""CREATE TABLE {hudi_table_base_name}_3 (col1 INT, col2 STRING) USING DELTA
TBLPROPERTIES('delta.universalFormat.enabledFormats' = 'hudi')""")
spark_delta.sql(f"INSERT INTO {hudi_table_base_name}_3 VALUES (1, 'a'), (2, 'b')")
spark_delta.sql(f"ALTER TABLE {hudi_table_base_name}_3 ADD COLUMN col3 INT FIRST")
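# col3 was added as the first column, so the row below maps to (col3, col1, col2)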
spark_delta.sql(f"INSERT INTO {hudi_table_base_name}_3 VALUES (3, 4, 'c')")

df_delta_3 = get_delta_df(spark_delta, f"{hudi_table_base_name}_3")
df_delta_3.show()

# schema evolution for nested fields
spark_delta.sql(f"""CREATE TABLE {hudi_table_base_name}_4 (col1 STRUCT<field1: INT, field2: STRING>)
USING DELTA
TBLPROPERTIES('delta.universalFormat.enabledFormats' = 'hudi')""")
spark_delta.sql(f"""INSERT INTO {hudi_table_base_name}_4 VALUES
(named_struct('field1', 1, 'field2', 'hello'))
""")
spark_delta.sql(f"ALTER TABLE {hudi_table_base_name}_4 ADD COLUMN col1.field3 INT AFTER field1")
spark_delta.sql(f"INSERT INTO {hudi_table_base_name}_4 VALUES (named_struct('field1', 3, 'field3', 4, 'field2', 'delta'))")

df_delta_4 = get_delta_df(spark_delta, f"{hudi_table_base_name}_4")
df_delta_4.show()

# time travel
spark_delta.sql(f"""CREATE TABLE {hudi_table_base_name}_5 (col1 INT, col2 STRING) USING DELTA
TBLPROPERTIES('delta.universalFormat.enabledFormats' = 'hudi',
'delta.columnMapping.mode' = 'name')""")
spark_delta.sql(f"INSERT INTO {hudi_table_base_name}_5 VALUES (1, 'a')")
spark_delta.sql(f"INSERT INTO {hudi_table_base_name}_5 VALUES (2, 'b')")

df_history_5 = spark_delta.sql(f"DESCRIBE HISTORY {hudi_table_base_name}_5")
timestamp = df_history_5.collect()[0]['timestamp'] # timestamp of the latest commit (DESCRIBE HISTORY lists newest first)
df_delta_5 = spark_delta.sql(f"""
SELECT * FROM {hudi_table_base_name}_5
TIMESTAMP AS OF '{timestamp}'""")
df_delta_5.show()

# brief pause so the Hudi metadata conversion can finish before the tables are read back
time.sleep(5)

###################### Read tables from Hudi engine ######################
print('Hudi tables:')
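# a second session is used for reading: it swaps in the Hudi session extension, the Hoodie
# catalog, and the Kryo serializer configs that the Hudi reader needs in place of the Delta ones;
# each table written above is then read back through Hudi and compared row-for-row with its Delta view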

spark_hudi = SparkSession.builder \
    .appName("delta-uniform-hudi-reader") \
    .master("local[*]") \
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
    .config("spark.sql.warehouse.dir", warehouse_path) \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar") \
    .getOrCreate()

df_hudi_0 = get_hudi_df(spark_hudi, f"{hudi_table_base_name}_0")
df_hudi_0.show()
assertDataFrameEqual(df_delta_0, df_hudi_0)

df_hudi_1 = get_hudi_df(spark_hudi, f"{hudi_table_base_name}_1")
df_hudi_1.show()
assertDataFrameEqual(df_delta_1, df_hudi_1)

df_hudi_2 = get_hudi_df(spark_hudi, f"{hudi_table_base_name}_2")
df_hudi_2.show()
assertDataFrameEqual(df_delta_2, df_hudi_2)

df_hudi_3 = get_hudi_df(spark_hudi, f"{hudi_table_base_name}_3")
df_hudi_3.show()
assertDataFrameEqual(df_delta_3, df_hudi_3)

df_hudi_4 = get_hudi_df(spark_hudi, f"{hudi_table_base_name}_4")
df_hudi_4.show()
assertDataFrameEqual(df_delta_4, df_hudi_4)

df_hudi_5 = spark_hudi.sql(f"""
SELECT * FROM {hudi_table_base_name}_5
TIMESTAMP AS OF '{timestamp}'""")
df_hudi_5.show()
assertDataFrameEqual(df_delta_5, df_hudi_5)

print('UniForm Hudi integration test passed!')
19 changes: 16 additions & 3 deletions run-integration-tests.py
@@ -239,7 +239,7 @@ def run_iceberg_integration_tests(root_dir, version, spark_version, iceberg_vers
            print("Failed Iceberg tests in %s" % (test_file))
            raise

def run_uniform_hudi_integration_tests(root_dir, version, spark_version, hudi_version, extra_maven_repo, use_local):
    print("\n\n##### Running Uniform hudi tests on version %s #####" % str(version))
    # clear_artifact_cache()
    if use_local:
@@ -256,7 +256,9 @@ def run_uniform_hudi_integration_tests(root_dir, version, extra_maven_repo, use_
    python_root_dir = path.join(root_dir, "python")
    extra_class_path = path.join(python_root_dir, path.join("delta", "testing"))
    package = ','.join([
"io.delta:delta-%s_2.12:%s" % (get_artifact_name(version), version)])
"io.delta:delta-%s_2.12:%s" % (get_artifact_name(version), version),
"org.apache.hudi:hudi-spark%s-bundle_2.12:%s" % (spark_version, hudi_version)
])
jars = path.join(root_dir, "hudi/target/scala-2.12/delta-hudi-assembly_2.12-%s.jar" % (version))

    repo = extra_maven_repo if extra_maven_repo else ""
@@ -483,6 +485,17 @@ def __exit__(self, tpe, value, traceback):
        required=False,
        default="1.4.0",
        help="Iceberg Spark Runtime library version")
    parser.add_argument(
        "--hudi-spark-version",
        required=False,
        default="3.5",
        help="Spark version for the Hudi library")
    parser.add_argument(
        "--hudi-version",
        required=False,
        default="0.15.0",
        help="Hudi library version"
    )

    args = parser.parse_args()

@@ -508,7 +521,7 @@ def __exit__(self, tpe, value, traceback):

    if args.run_uniform_hudi_integration_tests:
        run_uniform_hudi_integration_tests(
            root_dir, args.version, args.hudi_spark_version, args.hudi_version, args.maven_repo, args.use_local)
        quit()

    if args.run_storage_s3_dynamodb_integration_tests:
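
For reference, assuming the flag names follow the argparse destinations used above (e.g. --run-uniform-hudi-integration-tests for args.run_uniform_hudi_integration_tests and --use-local for args.use_local), the new options might be exercised with an invocation along these lines:

    python run-integration-tests.py --use-local --run-uniform-hudi-integration-tests \
        --hudi-spark-version 3.5 --hudi-version 0.15.0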