
Commit

test passes
anniewang-db committed Jul 5, 2024
1 parent 146d497 commit e03d858
Showing 2 changed files with 126 additions and 16 deletions.
137 changes: 123 additions & 14 deletions hudi/integration_tests/write_uniform_hudi.py
@@ -1,35 +1,144 @@
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import current_date, current_timestamp
from pyspark.testing import assertDataFrameEqual
from delta.tables import DeltaTable
import shutil
import random
import os
import time

-testRoot = "/tmp/delta-uniform-hudi/"
-warehousePath = testRoot + "uniform_tables"
-shutil.rmtree(testRoot, ignore_errors=True)
###################### Setup ######################

test_root = "/tmp/delta-uniform-hudi/"
warehouse_path = test_root + "uniform_tables"
shutil.rmtree(test_root, ignore_errors=True)
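# Base name for the test tables; the scenarios below create tables named <base>_0 through <base>_4.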
hudi_table_base_name = "delta_table_with_hudi"

# we need to set the following configs
-spark = SparkSession.builder \
-.appName("delta-uniform-hudi") \
spark_delta = SparkSession.builder \
.appName("delta-uniform-hudi-writer") \
.master("local[*]") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.config("spark.sql.warehouse.dir", warehousePath) \
.config("spark.sql.warehouse.dir", warehouse_path) \
.getOrCreate()

spark.sql("""CREATE TABLE `delta_table_with_hudi` (col1 INT) USING DELTA
TBLPROPERTIES('delta.universalFormat.enabledFormats' = 'hudi') """)
###################### Helper functions ######################

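# Read a table back by path with the Delta reader so its contents can be compared against the Hudi read below.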
def get_delta_df(spark, table_name):
    hudi_table_path = os.path.join(warehouse_path, table_name)
    df_delta = spark.read.format("delta").load(hudi_table_path)
    return df_delta

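# Read the same path with the Hudi reader; enabling hoodie.metadata.enable routes file listing through the Hudi metadata table that UniForm generates.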
def get_hudi_df(spark, table_name):
    hudi_table_path = os.path.join(warehouse_path, table_name)
    df_hudi = (spark.read.format("hudi")
               .option("hoodie.metadata.enable", "true")
               .option("hoodie.datasource.write.hive_style_partitioning", "true")
               .load(hudi_table_path))
    return df_hudi

###################### Create tables in Delta ######################
print('Delta tables:')

# validate various data types
spark_delta.sql(f"""CREATE TABLE `{hudi_table_base_name}_0` (col1 BIGINT, col2 BOOLEAN, col3 DATE,
col4 DOUBLE, col5 FLOAT, col6 INT, col7 STRING, col8 TIMESTAMP,
col9 BINARY, col10 DECIMAL(5, 2),
col11 STRUCT<field1: INT, field2: STRING,
field3: STRUCT<field4: INT, field5: INT, field6: STRING>>,
col12 ARRAY<STRUCT<field1: INT, field2: STRING>>,
col13 MAP<STRING, STRUCT<field1: INT, field2: STRING>>) USING DELTA
TBLPROPERTIES('delta.universalFormat.enabledFormats' = 'hudi') """)
spark_delta.sql(f"""INSERT INTO `{hudi_table_base_name}_0` VALUES
(123, true, date(current_timestamp()), 32.1, 1.23, 456, 'hello world',
current_timestamp(), X'1ABF', -999.99,
STRUCT(1, 'hello', STRUCT(2, 3, 'world')),
ARRAY(
STRUCT(1, 'first'),
STRUCT(2, 'second')
),
MAP(
'key1', STRUCT(1, 'delta'),
'key2', STRUCT(1, 'lake')
)); """)

df_delta_0 = get_delta_df(spark_delta, f"{hudi_table_base_name}_0")
df_delta_0.show()

# conversion happens correctly when the property is enabled after table creation
spark_delta.sql(f"CREATE TABLE {hudi_table_base_name}_1 (col1 INT, col2 STRING) USING DELTA")
spark_delta.sql(f"INSERT INTO {hudi_table_base_name}_1 VALUES (1, 'a'), (2, 'b')")
spark_delta.sql(f"ALTER TABLE {hudi_table_base_name}_1 SET TBLPROPERTIES('delta.universalFormat.enabledFormats' = 'hudi')")

df_delta_1 = get_delta_df(spark_delta, f"{hudi_table_base_name}_1")
df_delta_1.show()

# validate deletes
spark_delta.sql(f"""CREATE TABLE {hudi_table_base_name}_2 (col1 INT, col2 STRING) USING DELTA
TBLPROPERTIES('delta.universalFormat.enabledFormats' = 'hudi')""")
spark_delta.sql(f"INSERT INTO {hudi_table_base_name}_2 VALUES (1, 'a'), (2, 'b')")
spark_delta.sql(f"DELETE FROM {hudi_table_base_name}_2 WHERE col1 = 1")

df_delta_2 = get_delta_df(spark_delta, f"{hudi_table_base_name}_2")
df_delta_2.show()

spark.sql("""INSERT INTO `delta_table_with_hudi` VALUES (1); """)
# basic schema evolution
spark_delta.sql(f"""CREATE TABLE {hudi_table_base_name}_3 (col1 INT, col2 STRING) USING DELTA
TBLPROPERTIES('delta.universalFormat.enabledFormats' = 'hudi')""")
spark_delta.sql(f"INSERT INTO {hudi_table_base_name}_3 VALUES (1, 'a'), (2, 'b')")
spark_delta.sql(f"ALTER TABLE {hudi_table_base_name}_3 ADD COLUMN col3 INT FIRST")
spark_delta.sql(f"INSERT INTO {hudi_table_base_name}_3 VALUES (3, 4, 'c')")

df_delta_3 = get_delta_df(spark_delta, f"{hudi_table_base_name}_3")
df_delta_3.show()

# schema evolution for nested fields
spark_delta.sql(f"""CREATE TABLE {hudi_table_base_name}_4 (col1 STRUCT<field1: INT, field2: STRING>)
USING DELTA
TBLPROPERTIES('delta.universalFormat.enabledFormats' = 'hudi')""")
spark_delta.sql(f"""INSERT INTO {hudi_table_base_name}_4 VALUES
(named_struct('field1', 1, 'field2', 'hello'))
""")
spark_delta.sql(f"ALTER TABLE {hudi_table_base_name}_4 ADD COLUMN col1.field3 INT AFTER field1")
spark_delta.sql(f"INSERT INTO {hudi_table_base_name}_4 VALUES (named_struct('field1', 3, 'field3', 4, 'field2', 'delta'))")

df_delta_4 = get_delta_df(spark_delta, f"{hudi_table_base_name}_4")
df_delta_4.show()

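# Give the asynchronous UniForm Hudi metadata conversion a moment to complete before reading the tables back.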
time.sleep(5)

-hudiTablePath = warehousePath + "/" + "delta_table_with_hudi"
###################### Read tables from Hudi engine ######################
print('Hudi tables:')

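# A second session configured as a Hudi reader: Hudi's Spark integration requires Kryo serialization with the Hoodie registrator.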
spark_hudi = SparkSession.builder \
.appName("delta-uniform-hudi-reader") \
.master("local[*]") \
.config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
.config("spark.sql.warehouse.dir", warehouse_path) \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar") \
.getOrCreate()

df_hudi_0 = get_hudi_df(spark_hudi, f"{hudi_table_base_name}_0")
df_hudi_0.show()
assertDataFrameEqual(df_delta_0, df_hudi_0)

df_hudi_1 = get_hudi_df(spark_hudi, f"{hudi_table_base_name}_1")
df_hudi_1.show()
assertDataFrameEqual(df_delta_1, df_hudi_1)

df_hudi_2 = get_hudi_df(spark_hudi, f"{hudi_table_base_name}_2")
df_hudi_2.show()
assertDataFrameEqual(df_delta_2, df_hudi_2)

-hudiMetadataPath = hudiTablePath + "/.hoodie/metadata/files"
df_hudi_3 = get_hudi_df(spark_hudi, f"{hudi_table_base_name}_3")
df_hudi_3.show()
assertDataFrameEqual(df_delta_3, df_hudi_3)

-assert len(os.listdir(hudiMetadataPath)) > 0
df_hudi_4 = get_hudi_df(spark_hudi, f"{hudi_table_base_name}_4")
df_hudi_4.show()
assertDataFrameEqual(df_delta_4, df_hudi_4)

-# TODO: read with Hudi Spark to verify table content after Hudi supports Spark 3.5+
print('UniForm Hudi integration test passed!')
5 changes: 3 additions & 2 deletions run-integration-tests.py
@@ -256,8 +256,9 @@ def run_uniform_hudi_integration_tests(root_dir, version, extra_maven_repo, use_
    python_root_dir = path.join(root_dir, "python")
    extra_class_path = path.join(python_root_dir, path.join("delta", "testing"))
    package = ','.join([
-        "io.delta:delta-%s_2.12:%s" % (get_artifact_name(version), version)])
-    jars = path.join(root_dir, "hudi/target/scala-2.12/delta-hudi-assembly_2.12-%s.jar" % (version))
+        "io.delta:delta-%s_2.12:%s" % (get_artifact_name(version), version),
+        "org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0"])
+    jars = ','.join([path.join(root_dir, "hudi/target/scala-2.12/delta-hudi-assembly_2.12-%s.jar" % (version))])

    repo = extra_maven_repo if extra_maven_repo else ""
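
For context, a minimal sketch of how the package and jar lists above might be handed to spark-submit; the rest of the function is collapsed in this view, so the script path, the flag choices, and the launcher helper below are assumptions rather than the repository's actual invocation.

import subprocess
from os import path

def launch_write_uniform_hudi_test(root_dir, package, jars, extra_class_path):
    # Hypothetical launcher: spark-submit takes comma-separated lists for --packages and --jars,
    # which matches how the strings are built in the diff above.
    test_script = path.join(root_dir, "hudi", "integration_tests", "write_uniform_hudi.py")
    cmd = ["spark-submit",
           "--packages", package,
           "--jars", jars,
           "--driver-class-path", extra_class_path,
           test_script]
    subprocess.run(cmd, check=True)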
