WIP checkpoint commit - testing the performance of running the as-of join on very large DFs (large number of columns)
tnixon committed Sep 20, 2023
1 parent 9f2bbe4 commit 6f8bdeb
Showing 3 changed files with 121 additions and 43 deletions.
51 changes: 51 additions & 0 deletions python/large_cols_as_of_debug.py
@@ -0,0 +1,51 @@
import pandas as pd
from random import random
from time import time

from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F

def build_test_as_of_join(num_cols: int, num_rows: int = 1000) -> DataFrame:
    # create num_cols feature columns of random doubles, with num_rows rows each
    features = {f"feature_{n}": [random() for _ in range(num_rows)] for n in range(num_cols)}

    users = pd.DataFrame({
        "user": list(range(num_rows)),
        **features
    })
    desired_order = ["ts"] + list(users.columns)
    spark = SparkSession.builder.getOrCreate()
    feature_df = spark.createDataFrame(users).withColumn("ts", F.current_timestamp())
    feature_df = feature_df.select(*desired_order)

    user_subset = users[["user"]]
    users_df = spark.createDataFrame(user_subset).withColumn("ts", F.current_timestamp())

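    # NB: this plain left join is lazy and its result is never used; it reads as
    # a leftover reference point from the debugging session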
    users_df.join(feature_df, on='user', how='left')

    from tempo.tsdf import TSDF

    df_tsdf = TSDF(users_df, ts_col='ts')
    ft_tsdf = TSDF(
        feature_df,
        ts_col='ts',
    )

    start_ts = time()
    joined_df = df_tsdf.asofJoin(
        ft_tsdf,
        left_prefix="left",
        right_prefix="right",
        skipNulls=True,
        tolerance=None
    ).df
    end_ts = time()
    print(f"Time to construct join for {num_cols} columns: {end_ts - start_ts}")

    return joined_df

# run for several different numbers of columns
# for test_col in [10, 20, 50, 100, 150, 200, 250, 300, 400, 500]:
# build_test_as_of_join(test_col)

build_test_as_of_join(500)
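Note that the timing in build_test_as_of_join captures the driver-side construction and analysis of a (largely lazy) join plan, not its execution; for very wide DataFrames that planning cost is what this script is probing. If end-to-end time were also of interest, a sketch along these lines would force execution as well; the count() action here is an illustrative assumption, not part of the original benchmark:

start_ts = time()
joined_df = build_test_as_of_join(500)
n = joined_df.count()  # count() is an action, so the join actually runs
print(f"rows: {n}, construction + execution: {time() - start_ts:.1f}s")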
109 changes: 66 additions & 43 deletions python/tempo/tsdf.py
@@ -5,6 +5,7 @@
from abc import ABCMeta, abstractmethod
from functools import reduce
from typing import Any, Callable, List, Optional, Sequence, TypeVar, Union
from time import time

import numpy as np
import pandas as pd
@@ -227,53 +228,71 @@ def __getLastRightRow(
            .rowsBetween(Window.unboundedPreceding, Window.currentRow)
        )

        non_right_cols = list(set(self.df.columns) - set(right_cols))

        if ignoreNulls is False:
            if tsPartitionVal is not None:
                raise ValueError(
                    "Disabling null skipping with a partition value is not supported yet."
                )
            df = reduce(
                lambda df, idx: df.withColumn(
                    right_cols[idx],
                    sfn.last(
                        sfn.when(
                            sfn.col("rec_ind") == -1, sfn.struct(right_cols[idx])
                        ).otherwise(None),
                        True,  # ignore nulls because it indicates rows from the left side
                    ).over(window_spec),
                ),
                range(len(right_cols)),
                self.df,
            )
            df = reduce(
                lambda df, idx: df.withColumn(
                    right_cols[idx], sfn.col(right_cols[idx])[right_cols[idx]]
                ),
                range(len(right_cols)),
                df,
            )
            # df = reduce(
            #     lambda df, idx: df.withColumn(
            #         right_cols[idx],
            #         sfn.last(
            #             sfn.when(
            #                 sfn.col("rec_ind") == -1, sfn.struct(right_cols[idx])
            #             ).otherwise(None),
            #             True,  # ignore nulls because it indicates rows from the left side
            #         ).over(window_spec),
            #     ),
            #     range(len(right_cols)),
            #     self.df,
            # )
            # df = reduce(
            #     lambda df, idx: df.withColumn(
            #         right_cols[idx], sfn.col(right_cols[idx])[right_cols[idx]]
            #     ),
            #     range(len(right_cols)),
            #     df,
            # )

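            # build each filled right-hand column as a single expression and apply
            # them all in one select, rather than one withColumn call (one extra
            # projection in the analyzed plan) per column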
            mod_right_cols = [sfn.last(sfn.when(sfn.col("rec_ind") == -1,
                                                sfn.struct(col)).otherwise(None),
                                       True).over(window_spec)[col].alias(col)
                              for col in right_cols]
            df = self.df.select(non_right_cols + mod_right_cols)
        elif tsPartitionVal is None:
            # splitting off the condition as we want different columns in the reduce if implementing the skew AS OF join
            df = reduce(
                lambda df, idx: df.withColumn(
                    right_cols[idx],
                    sfn.last(right_cols[idx], ignoreNulls).over(window_spec),
                ),
                range(len(right_cols)),
                self.df,
            )
            # df = reduce(
            #     lambda df, idx: df.withColumn(
            #         right_cols[idx],
            #         sfn.last(right_cols[idx], ignoreNulls).over(window_spec),
            #     ),
            #     range(len(right_cols)),
            #     self.df,
            # )
            non_right_cols = list(set(self.df.columns) - set(right_cols))
            mod_right_cols = [sfn.last(col, ignoreNulls).over(window_spec).alias(col)
                              for col in right_cols]
            df = self.df.select(non_right_cols + mod_right_cols)
        else:
            df = reduce(
                lambda df, idx: df.withColumn(
                    right_cols[idx],
                    sfn.last(right_cols[idx], ignoreNulls).over(window_spec),
                ).withColumn(
                    "non_null_ct" + right_cols[idx],
                    sfn.count(right_cols[idx]).over(window_spec),
                ),
                range(len(right_cols)),
                self.df,
            )
            # df = reduce(
            #     lambda df, idx: df.withColumn(
            #         right_cols[idx],
            #         sfn.last(right_cols[idx], ignoreNulls).over(window_spec),
            #     ).withColumn(
            #         "non_null_ct" + right_cols[idx],
            #         sfn.count(right_cols[idx]).over(window_spec),
            #     ),
            #     range(len(right_cols)),
            #     self.df,
            # )
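            # same single-select rewrite for the time-partitioned (skew) path, with
            # an additional running non-null count column per right-hand column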
            non_right_cols = list(set(self.df.columns) - set(right_cols))
            mod_right_cols = [sfn.last(col, ignoreNulls).over(window_spec).alias(col)
                              for col in right_cols]
            non_null_cols = [sfn.count(col).over(window_spec).alias("non_null_ct" + col)
                             for col in right_cols]
            df = self.df.select(non_right_cols + mod_right_cols + non_null_cols)

        df = (df.filter(sfn.col(left_ts_col).isNotNull()).drop(self.ts_col)).drop(
            "rec_ind"
@@ -843,16 +862,16 @@ def asofJoin(
            [right_tsdf.ts_col] + orig_right_col_diff, right_prefix
        )

        left_nonpartition_cols = list(
        left_columns = list(
            set(left_tsdf.df.columns).difference(set(self.partitionCols))
        )
        right_nonpartition_cols = list(
        right_columns = list(
            set(right_tsdf.df.columns).difference(set(self.partitionCols))
        )

        # For both dataframes get all non-partition columns (including ts_col)
        left_columns = [left_tsdf.ts_col] + left_nonpartition_cols
        right_columns = [right_tsdf.ts_col] + right_nonpartition_cols
        # left_columns = [left_tsdf.ts_col] + left_nonpartition_cols
        # right_columns = [right_tsdf.ts_col] + right_nonpartition_cols

        # Union both dataframes, and create a combined TS column
        combined_ts_col = "combined_ts"
@@ -866,6 +885,7 @@

        # perform asof join.
        if tsPartitionVal is None:
            start_ts = time()
            asofDF = combined_df.__getLastRightRow(
                left_tsdf.ts_col,
                right_columns,
@@ -874,10 +894,12 @@
                skipNulls,
                suppress_null_warning,
            )
            print(f"Time to get last right row: {time() - start_ts}")
        else:
            tsPartitionDF = combined_df.__getTimePartitions(
                tsPartitionVal, fraction=fraction
            )
            start_ts = time()
            asofDF = tsPartitionDF.__getLastRightRow(
                left_tsdf.ts_col,
                right_columns,
@@ -886,6 +908,7 @@
                skipNulls,
                suppress_null_warning,
            )
            print(f"Time to get last right row: {time() - start_ts}")

        # Get rid of overlapped data and the extra columns generated from timePartitions
        df = asofDF.df.filter(sfn.col("is_original") == 1).drop(
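The pattern under test here: folding withColumn over hundreds of columns adds one projection node to the logical plan per column, so Catalyst analysis time grows steeply with column count, while building all the expressions first and applying them in a single select keeps the plan to one projection. A minimal standalone sketch of the two patterns, not tempo code; the column names and the doubling transform are illustrative assumptions:

from functools import reduce
from pyspark.sql import SparkSession
import pyspark.sql.functions as sfn

spark = SparkSession.builder.getOrCreate()
cols = [f"c{i}" for i in range(300)]
df = spark.range(1000).select("id", *[sfn.rand().alias(c) for c in cols])

# slow on wide frames: one withColumn (and one new plan projection) per column
slow = reduce(lambda d, c: d.withColumn(c, sfn.col(c) * 2), cols, df)

# fast: every expression built up front, applied in a single projection
untouched = [c for c in df.columns if c not in cols]
fast = df.select(*untouched, *[(sfn.col(c) * 2).alias(c) for c in cols])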
4 changes: 4 additions & 0 deletions python/tests/as_of_join_tests.py
@@ -23,6 +23,10 @@ def test_asof_join(self):
        ).df

        # joined dataframe should equal the expected dataframe
        print("Joined DF")
        joined_df.show()
        print("Expected DF")
        dfExpected.show()
        self.assertDataFrameEquality(joined_df, dfExpected)
        self.assertDataFrameEquality(non_prefix_joined_df, noRightPrefixdfExpected)

