Skip to content

Commit

Permalink
add time travel test
Browse files Browse the repository at this point in the history
  • Loading branch information
anniewang-db committed Jul 5, 2024
1 parent 20ca652 commit cb57986
Showing 1 changed file with 21 additions and 0 deletions.
21 changes: 21 additions & 0 deletions hudi/integration_tests/write_uniform_hudi.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

def get_delta_df(spark, table_name):
    """Read a table from the warehouse directory using the Delta format.

    Args:
        spark: Session object whose ``read`` interface performs the load.
        table_name: Name of the table directory under ``warehouse_path``.

    Returns:
        The DataFrame returned by the ``delta`` reader for that location.
    """
    table_location = os.path.join(warehouse_path, table_name)
    # Echo the resolved location so a failing run shows which path was read.
    print('hudi_table_path:', table_location)
    delta_reader = spark.read.format("delta")
    return delta_reader.load(table_location)

Expand Down Expand Up @@ -106,6 +107,20 @@ def get_hudi_df(spark, table_name):
df_delta_4 = get_delta_df(spark_delta, f"{hudi_table_base_name}_4")
df_delta_4.show()

# Time travel: create a UniForm (Hudi-enabled) Delta table with two data
# commits, then read it back as of the first INSERT so that the expected
# result differs from the current snapshot.
spark_delta.sql(f"""CREATE TABLE {hudi_table_base_name}_5 (col1 INT, col2 STRING) USING DELTA
TBLPROPERTIES('delta.universalFormat.enabledFormats' = 'hudi',
'delta.columnMapping.mode' = 'name')""")
spark_delta.sql(f"INSERT INTO {hudi_table_base_name}_5 VALUES (1, 'a')")
spark_delta.sql(f"INSERT INTO {hudi_table_base_name}_5 VALUES (2, 'b')")

df_history_5 = spark_delta.sql(f"DESCRIBE HISTORY {hudi_table_base_name}_5")
# DESCRIBE HISTORY lists commits newest first, so row 0 would be the second
# INSERT, and time-travelling to it just reproduces the current table.
# Pick the first INSERT explicitly instead: version 0 is the CREATE TABLE,
# version 1 is the first INSERT.
timestamp = df_history_5.where("version = 1").first()['timestamp']
df_delta_5 = spark_delta.sql(f"""
SELECT * FROM {hudi_table_base_name}_5
TIMESTAMP AS OF '{timestamp}'""")
df_delta_5.show()

time.sleep(5)

###################### Read tables from Hudi engine ######################
Expand Down Expand Up @@ -141,4 +156,10 @@ def get_hudi_df(spark, table_name):
df_hudi_4.show()
assertDataFrameEqual(df_delta_4, df_hudi_4)

# Run the same time-travel query through the Hudi session and require that
# it returns the same rows as the Delta read at that timestamp.
hudi_time_travel_query = f"""
SELECT * FROM {hudi_table_base_name}_5
TIMESTAMP AS OF '{timestamp}'"""
df_hudi_5 = spark_hudi.sql(hudi_time_travel_query)
df_hudi_5.show()
assertDataFrameEqual(df_delta_5, df_hudi_5)

print('UniForm Hudi integration test passed!')

0 comments on commit cb57986

Please sign in to comment.