From 6f8bdebbf237f396fbddf9faae827b5969688054 Mon Sep 17 00:00:00 2001
From: Tristan Nixon
Date: Wed, 20 Sep 2023 16:42:23 -0700
Subject: [PATCH] WIP checkpoint commit - testing the performance of running
 the as-of join on very large DFs (large number of columns)

---
 python/large_cols_as_of_debug.py |  51 ++++++++++++++++
 python/tempo/tsdf.py             | 109 +++++++++++++++++++------------
 python/tests/as_of_join_tests.py |   4 ++
 3 files changed, 121 insertions(+), 43 deletions(-)
 create mode 100644 python/large_cols_as_of_debug.py

diff --git a/python/large_cols_as_of_debug.py b/python/large_cols_as_of_debug.py
new file mode 100644
index 00000000..49dd9080
--- /dev/null
+++ b/python/large_cols_as_of_debug.py
@@ -0,0 +1,51 @@
+import pandas as pd
+from random import random
+from time import time
+
+from pyspark.sql import SparkSession, DataFrame
+import pyspark.sql.functions as F
+
+
+def build_test_as_of_join(num_cols: int, num_rows: int = 1000) -> DataFrame:
+    # create num_cols feature columns of random doubles
+    features = {
+        f"feature_{n}": [random() for _ in range(num_rows)] for n in range(num_cols)
+    }
+
+    users = pd.DataFrame({"user": list(range(num_rows)), **features})
+    desired_order = ["ts"] + list(users.columns)
+    spark = SparkSession.builder.getOrCreate()
+    feature_df = spark.createDataFrame(users).withColumn("ts", F.current_timestamp())
+    feature_df = feature_df.select(*desired_order)
+
+    user_subset = users[["user"]]
+    users_df = spark.createDataFrame(user_subset).withColumn("ts", F.current_timestamp())
+
+    # plain-join baseline (lazy and unused as written, so it adds nothing to the timing)
+    # users_df.join(feature_df, on='user', how='left')
+
+    from tempo.tsdf import TSDF
+
+    df_tsdf = TSDF(users_df, ts_col='ts')
+    ft_tsdf = TSDF(feature_df, ts_col='ts')
+
+    start_ts = time()
+    joined_df = df_tsdf.asofJoin(
+        ft_tsdf,
+        left_prefix="left",
+        right_prefix="right",
+        skipNulls=True,
+        tolerance=None,
+    ).df
+    end_ts = time()
+    print(f"Time to construct join for {num_cols} columns: {end_ts - start_ts}")
+
+    return joined_df
+
+
+# run for several different numbers of columns
+# for test_col in [10, 20, 50, 100, 150, 200, 250, 300, 400, 500]:
+#     build_test_as_of_join(test_col)
+
+build_test_as_of_join(500)
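Note on what the script above measures: asofJoin is lazy, so the timed block only builds the query plan; no Spark job runs until an action is called. A minimal sketch of timing planning and execution separately (the timed helper and the count() action are illustrative additions, not part of this patch):

from time import time

def timed(label, fn):
    # run fn() and report wall-clock time
    start = time()
    result = fn()
    print(f"{label}: {time() - start:.2f}s")
    return result

# planning only: asofJoin builds a plan, no job is triggered
joined = timed("plan construction", lambda: build_test_as_of_join(500))
# count() forces the join and its window functions to actually run
timed("execution", lambda: joined.count())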
diff --git a/python/tempo/tsdf.py b/python/tempo/tsdf.py
index 8eb23328..619650d5 100644
--- a/python/tempo/tsdf.py
+++ b/python/tempo/tsdf.py
@@ -5,6 +5,7 @@
 from abc import ABCMeta, abstractmethod
 from functools import reduce
 from typing import Any, Callable, List, Optional, Sequence, TypeVar, Union
+from time import time
 
 import numpy as np
 import pandas as pd
@@ -227,53 +228,71 @@ def __getLastRightRow(
             .rowsBetween(Window.unboundedPreceding, Window.currentRow)
         )
 
+        non_right_cols = list(set(self.df.columns) - set(right_cols))
+
         if ignoreNulls is False:
             if tsPartitionVal is not None:
                 raise ValueError(
                     "Disabling null skipping with a partition value is not supported yet."
                 )
-            df = reduce(
-                lambda df, idx: df.withColumn(
-                    right_cols[idx],
-                    sfn.last(
-                        sfn.when(
-                            sfn.col("rec_ind") == -1, sfn.struct(right_cols[idx])
-                        ).otherwise(None),
-                        True,  # ignore nulls because it indicates rows from the left side
-                    ).over(window_spec),
-                ),
-                range(len(right_cols)),
-                self.df,
-            )
-            df = reduce(
-                lambda df, idx: df.withColumn(
-                    right_cols[idx], sfn.col(right_cols[idx])[right_cols[idx]]
-                ),
-                range(len(right_cols)),
-                df,
-            )
+            # previous implementation: one withColumn per right-hand column
+            # df = reduce(
+            #     lambda df, idx: df.withColumn(
+            #         right_cols[idx],
+            #         sfn.last(
+            #             sfn.when(
+            #                 sfn.col("rec_ind") == -1, sfn.struct(right_cols[idx])
+            #             ).otherwise(None),
+            #             True,  # ignore nulls because it indicates rows from the left side
+            #         ).over(window_spec),
+            #     ),
+            #     range(len(right_cols)),
+            #     self.df,
+            # )
+            # df = reduce(
+            #     lambda df, idx: df.withColumn(
+            #         right_cols[idx], sfn.col(right_cols[idx])[right_cols[idx]]
+            #     ),
+            #     range(len(right_cols)),
+            #     df,
+            # )
+
+            # wrap each right-hand column in a struct, take the last non-null struct
+            # over the window (nulls indicate rows from the left side), then unwrap
+            # the field -- all within a single select
+            mod_right_cols = [
+                sfn.last(
+                    sfn.when(sfn.col("rec_ind") == -1, sfn.struct(col)).otherwise(None),
+                    True,
+                )
+                .over(window_spec)[col]
+                .alias(col)
+                for col in right_cols
+            ]
+            df = self.df.select(non_right_cols + mod_right_cols)
         elif tsPartitionVal is None:
             # splitting off the condition as we want different columns in the reduce if implementing the skew AS OF join
-            df = reduce(
-                lambda df, idx: df.withColumn(
-                    right_cols[idx],
-                    sfn.last(right_cols[idx], ignoreNulls).over(window_spec),
-                ),
-                range(len(right_cols)),
-                self.df,
-            )
+            # df = reduce(
+            #     lambda df, idx: df.withColumn(
+            #         right_cols[idx],
+            #         sfn.last(right_cols[idx], ignoreNulls).over(window_spec),
+            #     ),
+            #     range(len(right_cols)),
+            #     self.df,
+            # )
+            mod_right_cols = [
+                sfn.last(col, ignoreNulls).over(window_spec).alias(col)
+                for col in right_cols
+            ]
+            df = self.df.select(non_right_cols + mod_right_cols)
         else:
-            df = reduce(
-                lambda df, idx: df.withColumn(
-                    right_cols[idx],
-                    sfn.last(right_cols[idx], ignoreNulls).over(window_spec),
-                ).withColumn(
-                    "non_null_ct" + right_cols[idx],
-                    sfn.count(right_cols[idx]).over(window_spec),
-                ),
-                range(len(right_cols)),
-                self.df,
-            )
+            # df = reduce(
+            #     lambda df, idx: df.withColumn(
+            #         right_cols[idx],
+            #         sfn.last(right_cols[idx], ignoreNulls).over(window_spec),
+            #     ).withColumn(
+            #         "non_null_ct" + right_cols[idx],
+            #         sfn.count(right_cols[idx]).over(window_spec),
+            #     ),
+            #     range(len(right_cols)),
+            #     self.df,
+            # )
+            mod_right_cols = [
+                sfn.last(col, ignoreNulls).over(window_spec).alias(col)
+                for col in right_cols
+            ]
+            non_null_cols = [
+                sfn.count(col).over(window_spec).alias("non_null_ct" + col)
+                for col in right_cols
+            ]
+            df = self.df.select(non_right_cols + mod_right_cols + non_null_cols)
 
         df = (df.filter(sfn.col(left_ts_col).isNotNull()).drop(self.ts_col)).drop(
             "rec_ind"
@@ -843,16 +862,16 @@ def asofJoin(
             [right_tsdf.ts_col] + orig_right_col_diff, right_prefix
         )
 
-        left_nonpartition_cols = list(
+        left_columns = list(
             set(left_tsdf.df.columns).difference(set(self.partitionCols))
         )
-        right_nonpartition_cols = list(
+        right_columns = list(
             set(right_tsdf.df.columns).difference(set(self.partitionCols))
         )
 
         # For both dataframes get all non-partition columns (including ts_col)
-        left_columns = [left_tsdf.ts_col] + left_nonpartition_cols
-        right_columns = [right_tsdf.ts_col] + right_nonpartition_cols
+        # left_columns = [left_tsdf.ts_col] + left_nonpartition_cols
+        # right_columns = [right_tsdf.ts_col] + right_nonpartition_cols
 
         # Union both dataframes, and create a combined TS column
         combined_ts_col = "combined_ts"
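For reference, the change in __getLastRightRow above boils down to replacing an iterated withColumn with a single select of window expressions: each withColumn call wraps the plan in another projection, so analysis cost grows quickly with the number of right-hand columns. A standalone sketch of the two patterns (the toy DataFrame, the window w, and the column names are illustrative, not from tempo):

from functools import reduce

from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as sfn

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, 1.0, 2.0), (1, None, 3.0)], "user int, f1 double, f2 double"
)
# cumulative window, analogous to window_spec in __getLastRightRow
w = (
    Window.partitionBy("user")
    .orderBy("f2")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
cols = ["f1", "f2"]

# before: one withColumn per column -- each call adds a projection to the plan
slow = reduce(lambda d, c: d.withColumn(c, sfn.last(c, True).over(w)), cols, df)

# after: a single select with one expression per column -- one projection total
fast = df.select("user", *[sfn.last(c, True).over(w).alias(c) for c in cols])

One caveat of the new code path: building non_right_cols via a set difference does not preserve the original column order, so any caller relying on positional ordering would need an explicit reordering afterwards.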
@@ -866,6 +885,7 @@ def asofJoin(
 
         # perform asof join.
         if tsPartitionVal is None:
+            start_ts = time()
             asofDF = combined_df.__getLastRightRow(
                 left_tsdf.ts_col,
                 right_columns,
@@ -874,10 +894,12 @@ def asofJoin(
                 skipNulls,
                 suppress_null_warning,
             )
+            print(f"Time to get last right row: {time() - start_ts}")
         else:
             tsPartitionDF = combined_df.__getTimePartitions(
                 tsPartitionVal, fraction=fraction
             )
+            start_ts = time()
             asofDF = tsPartitionDF.__getLastRightRow(
                 left_tsdf.ts_col,
                 right_columns,
@@ -886,6 +908,7 @@ def asofJoin(
                 skipNulls,
                 suppress_null_warning,
             )
+            print(f"Time to get last right row: {time() - start_ts}")
 
         # Get rid of overlapped data and the extra columns generated from timePartitions
         df = asofDF.df.filter(sfn.col("is_original") == 1).drop(
diff --git a/python/tests/as_of_join_tests.py b/python/tests/as_of_join_tests.py
index 0b02c866..93c6d18a 100644
--- a/python/tests/as_of_join_tests.py
+++ b/python/tests/as_of_join_tests.py
@@ -23,6 +23,10 @@ def test_asof_join(self):
         ).df
 
         # joined dataframe should equal the expected dataframe
+        print("Joined DF")
+        joined_df.show()
+        print("Expected DF")
+        dfExpected.show()
         self.assertDataFrameEquality(joined_df, dfExpected)
         self.assertDataFrameEquality(non_prefix_joined_df, noRightPrefixdfExpected)
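A possible refinement of the show()-based debugging in the test above: for wide frames, printing only the rows that differ is easier to scan than two full dumps. A sketch of an alternative (exceptAll assumes the two frames have identical schemas; this is a suggestion, not part of the patch):

# rows in one frame but not the other; empty output on both means the frames match
joined_df.exceptAll(dfExpected).show(truncate=False)
dfExpected.exceptAll(joined_df).show(truncate=False)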