checkpoint save: initial test code for as-of join is running
still have some errors to debug
tnixon committed Jan 25, 2024
1 parent ae30fc4 commit c21deed
Showing 7 changed files with 399 additions and 584 deletions.
50 changes: 36 additions & 14 deletions python/tempo/as_of_join.py
@@ -3,12 +3,31 @@
from typing import Optional
from functools import reduce

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import DataFrame, SparkSession, Column
import pyspark.sql.functions as sfn

from tempo.timeunit import TimeUnit
from tempo.tsschema import CompositeTSIndex, TSSchema
import tempo.tsdf as t_tsdf

# Helpers


class _AsOfJoinCompositeTSIndex(CompositeTSIndex):
"""
CompositeTSIndex subclass for as-of joins
"""

@property
def unit(self) -> Optional[TimeUnit]:
return None

def rangeExpr(self, reverse: bool = False) -> Column:
raise NotImplementedError("rangeExpr is not defined for as-of join composite ts index")
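# note: the combined as-of index interleaves left- and right-hand rows and
# carries a row-source indicator field, so no single TimeUnit applies and a
# range expression over it would be meaningless; hence the two overrides above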




# As-of join types


@@ -111,7 +130,7 @@ def _join(self, left: t_tsdf.TSDF, right: t_tsdf.TSDF) -> t_tsdf.TSDF:
str(self.range_join_bin_size))
# find a leading column in the right TSDF
w = right.baseWindow()
lead_colname = "lead_" + right.ts_col
lead_colname = "lead_" + right.ts_index.colname
right_with_lead = right.withColumn(lead_colname,
sfn.lead(right.ts_index.colname).over(w))
# perform the join
@@ -128,7 +147,8 @@ def _join(self, left: t_tsdf.TSDF, right: t_tsdf.TSDF) -> t_tsdf.TSDF:
_LEFT_HAND_ROW_INDICATOR = 1
_RIGHT_HAND_ROW_INDICATOR = -1

class UnionSortFilterAsOfJoin(AsOfJoiner):

class UnionSortFilterAsOfJoiner(AsOfJoiner):
"""
Implements the classic as-of join strategy of unioning the left and right
TSDFs, sorting by the timestamp column, and then filtering out only the
@@ -167,12 +187,12 @@ def _combine(self, left: t_tsdf.TSDF, right: t_tsdf.TSDF) -> t_tsdf.TSDF:
if not isinstance(right_comps, list):
right_comps = [right_comps]
if isinstance(left.ts_index, t_tsdf.CompositeTSIndex):
comp_names = left.ts_index.component_names
comp_names = left.ts_index.component_fields
else:
comp_names = [left.ts_index.colname]
combined_comps = [sfn.coalesce(lc, rc).alias(cn) for lc, rc, cn in zip(left_comps, right_comps, comp_names)]
# build an expression for a right-hand side indicator field
right_ind = (sfn.when(sfn.col(left.ts_col).isNotNull(),
right_ind = (sfn.when(sfn.col(left.ts_index.colname).isNotNull(),
_LEFT_HAND_ROW_INDICATOR)
.otherwise(_RIGHT_HAND_ROW_INDICATOR)
.alias(_DEFAULT_RIGHT_ROW_COLNAME))
@@ -181,8 +201,9 @@ def _combine(self, left: t_tsdf.TSDF, right: t_tsdf.TSDF) -> t_tsdf.TSDF:
sfn.struct(*combined_comps, right_ind))
combined_ts_col = with_combined_ts.df.schema[_DEFAULT_COMBINED_TS_COLNAME]
# construct a CompositeTSIndex from the combined ts index
combined_tsidx = CompositeTSIndex(combined_ts_col,
*(comp_names+[_DEFAULT_RIGHT_ROW_COLNAME]))
combined_comp_names = comp_names + [_DEFAULT_RIGHT_ROW_COLNAME]
combined_tsidx = _AsOfJoinCompositeTSIndex(combined_ts_col,
*combined_comp_names)
# put it all together in a new TSDF
return t_tsdf.TSDF(with_combined_ts.df,
ts_schema=TSSchema(combined_tsidx, left.series_ids))
@@ -196,6 +217,7 @@ def _filterLastRightRow(self,
"""
# find the last value for each column in the right-hand side
w = combined.allBeforeWindow()
right_row_field = combined.ts_index.fieldPath(_DEFAULT_RIGHT_ROW_COLNAME)
if self.skipNulls:
last_right_cols = [
sfn.last(col, True).over(w).alias(col)
@@ -204,7 +226,7 @@
else:
last_right_cols = [
sfn.last(
sfn.when(sfn.col(_DEFAULT_RIGHT_ROW_COLNAME) == _RIGHT_HAND_ROW_INDICATOR,
sfn.when(sfn.col(right_row_field) == _RIGHT_HAND_ROW_INDICATOR,
sfn.struct(col)).otherwise(None),
True,
).over(w).alias(col)
@@ -215,7 +237,7 @@
last_right_vals = combined.select(*(non_right_cols + last_right_cols))
# filter out the last right-hand row for each left-hand row
as_of_df = (last_right_vals.df
.where(sfn.col(_DEFAULT_RIGHT_ROW_COLNAME) ==
.where(sfn.col(right_row_field) ==
_LEFT_HAND_ROW_INDICATOR)
.drop(_DEFAULT_COMBINED_TS_COLNAME))
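# note on the skipNulls=False branch above: wrapping each right-hand column in
# a one-field struct keeps the struct itself non-null even when its field is
# null, so last(..., ignorenulls=True) still observes rows whose value is an
# explicit NULL and can carry that NULL forward over an earlier non-null value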
# return with the left-hand schema
@@ -246,7 +268,7 @@ def _join(self, left: t_tsdf.TSDF, right: t_tsdf.TSDF) -> t_tsdf.TSDF:
return as_of


class SkewAsOfJoiner(UnionSortFilterAsOfJoin):
class SkewAsOfJoiner(UnionSortFilterAsOfJoiner):
"""
Implements the as-of join strategy for skewed data
"""
@@ -351,7 +373,7 @@ def choose_as_of_join_strategy(left_tsdf: t_tsdf.TSDF,
return SkewAsOfJoiner(tsPartitionVal, left_prefix, right_prefix)

# default to use the union sort filter join
return UnionSortFilterAsOfJoin(left_prefix,
right_prefix,
skipNulls,
tolerance)
return UnionSortFilterAsOfJoiner(left_prefix,
right_prefix,
skipNulls,
tolerance)
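
To make the union-sort-filter strategy concrete, here is a minimal, self-contained sketch of the same idea in plain PySpark. It assumes single-column timestamps and one series column; the column names (series, ts, left_val, right_val) are illustrative only, and tempo's actual joiner additionally handles composite indices, column prefixing, skipNulls, and tolerance.

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

left = spark.createDataFrame(
    [("A", 10, "l1"), ("A", 20, "l2")], ["series", "ts", "left_val"])
right = spark.createDataFrame(
    [("A", 5, "r1"), ("A", 15, "r2")], ["series", "ts", "right_val"])

# 1. union: align the two schemas and tag each row with the side it came from
unioned = (
    left.withColumn("right_val", F.lit(None).cast("string"))
        .withColumn("is_left", F.lit(1))
        .unionByName(
            right.withColumn("left_val", F.lit(None).cast("string"))
                 .withColumn("is_left", F.lit(0))))

# 2. sort: a cumulative window ordered by timestamp within each series
w = (Window.partitionBy("series").orderBy("ts")
     .rowsBetween(Window.unboundedPreceding, Window.currentRow))

# 3. filter: carry the latest right-hand value forward, then keep left rows only
as_of = (
    unioned
    .withColumn("last_right_val",
                F.last(F.when(F.col("is_left") == 0, F.col("right_val")),
                       True).over(w))
    .where("is_left = 1")
    .drop("right_val", "is_left"))
# each left row is now paired with the most recent right row at or before its ts

A production version must also break ties between rows with equal timestamps, which is what the row-indicator component of the combined index is for.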
130 changes: 101 additions & 29 deletions python/tempo/tsdf.py
@@ -23,13 +23,78 @@
import tempo.resample as t_resample
import tempo.utils as t_utils
from tempo.intervals import IntervalsDF
from tempo.tsschema import DEFAULT_TIMESTAMP_FORMAT, is_time_format, sub_seconds_precision_digits, \
CompositeTSIndex, ParsedTSIndex, TSIndex, TSSchema, WindowBuilder
from tempo.tsschema import (DEFAULT_TIMESTAMP_FORMAT, is_time_format,
identify_fractional_second_separator,
sub_seconds_precision_digits,
CompositeTSIndex, ParsedTSIndex, TSIndex, TSSchema,
WindowBuilder)
from tempo.typing import ColumnOrName, PandasMapIterFunction, PandasGroupedMapFunction

logger = logging.getLogger(__name__)


# Helper functions


def make_struct_from_cols(df: DataFrame,
struct_col_name: str,
cols_to_move: List[str]) -> DataFrame:
"""
Transform a :class:`DataFrame` by moving certain columns into a named struct
:param df: the :class:`DataFrame` to transform
:param struct_col_name: name of the struct column to create
:param cols_to_move: name of the columns to move into the struct
:return: the transformed :class:`DataFrame`
"""
return (df.withColumn(struct_col_name,
sfn.struct(*cols_to_move))
.drop(*cols_to_move))
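
# usage sketch (hypothetical column names, not part of tempo's public API):
#   df = spark.createDataFrame([("2024-01-25", 1, 42.0)],
#                              ["event_ts", "sub_seq", "value"])
#   nested = make_struct_from_cols(df, "ts_idx", ["event_ts", "sub_seq"])
#   # nested now has columns: value, ts_idx (a struct of event_ts and sub_seq)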


def time_str_to_double(df: DataFrame,
ts_str_col: str,
ts_dbl_col: str,
ts_fmt: str = DEFAULT_TIMESTAMP_FORMAT) -> DataFrame:
"""
Convert a string timestamp column to a double timestamp column
:param df: the :class:`DataFrame` to transform
:param ts_str_col: name of the string timestamp column
:param ts_dbl_col: name of the double timestamp column to create
:param ts_fmt: the timestamp format of the string timestamp column
:return: the transformed :class:`DataFrame`
"""
tmp_int_ts_col = "__tmp_int_ts"
tmp_frac_ts_col = "__tmp_fract_ts"
fract_secs_sep = identify_fractional_second_separator(ts_fmt)
double_ts_df = (
# get the integer part of the timestamp
df.withColumn(tmp_int_ts_col,
sfn.to_timestamp(ts_str_col, ts_fmt).cast("long"))
# get the fractional part of the timestamp
.withColumn(tmp_frac_ts_col,
sfn.when(
sfn.col(ts_str_col).contains(fract_secs_sep),
sfn.concat(
sfn.lit("0."),
sfn.split(sfn.col(ts_str_col),
f"\\{fract_secs_sep}")[1]
)).otherwise(0.0).cast("double")
)
# combine them together
.withColumn(ts_dbl_col,
sfn.col(tmp_int_ts_col) + sfn.col(tmp_frac_ts_col))
# clean up
.drop(tmp_int_ts_col, tmp_frac_ts_col)
)
return double_ts_df
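
# worked example (hypothetical value): for the string
# "2024-01-25 12:34:56.123456789" with a nanosecond-precision format,
# to_timestamp(...).cast("long") yields the whole seconds since the epoch,
# the split on the fractional separator yields "123456789" (re-read as the
# double 0.123456789), and their sum preserves sub-microsecond precision
# that a TimestampType column (microsecond resolution) would otherwise drop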

# The TSDF class


class TSDF(WindowBuilder):
"""
This object is the main wrapper over a Spark DataFrame which allows a user to parallelize time series computations across various dimensions. The two required dimensions are partition_cols (a list of columns by which to summarize) and ts_col (the timestamp column, which can be epoch or TimestampType).
@@ -87,23 +152,6 @@ def __withStandardizedColOrder(self) -> TSDF:

return self.__withTransformedDF(self.df.select(std_ordered_cols))

@classmethod
def __makeStructFromCols(
cls, df: DataFrame, struct_col_name: str, cols_to_move: List[str]
) -> DataFrame:
"""
Transform a :class:`DataFrame` by moving certain columns into a struct
:param df: the :class:`DataFrame` to transform
:param struct_col_name: name of the struct column to create
:param cols_to_move: name of the columns to move into the struct
:return: the transformed :class:`DataFrame`
"""
return df.withColumn(struct_col_name, sfn.struct(*cols_to_move)).drop(
*cols_to_move
)

# default column name for constructed timeseries index struct columns
__DEFAULT_TS_IDX_COL = "ts_idx"

@@ -117,9 +165,9 @@ def fromSubsequenceCol(
) -> "TSDF":
# construct a struct with the ts_col and subsequence_col
struct_col_name = cls.__DEFAULT_TS_IDX_COL
with_subseq_struct_df = cls.__makeStructFromCols(df,
struct_col_name,
[ts_col, subsequence_col])
with_subseq_struct_df = make_struct_from_cols(df,
struct_col_name,
[ts_col, subsequence_col])
# construct an appropriate TSIndex
subseq_struct = with_subseq_struct_df.schema[struct_col_name]
subseq_idx = CompositeTSIndex(subseq_struct, ts_col, subsequence_col)
@@ -128,6 +176,7 @@ def fromSubsequenceCol(

# default column name for parsed timeseries column
__DEFAULT_PARSED_TS_COL = "parsed_ts"
__DEFAULT_DOUBLE_TS_COL = "double_ts"

@classmethod
def fromStringTimestamp(
@@ -138,7 +187,12 @@ def fromStringTimestamp(
ts_fmt: str = DEFAULT_TIMESTAMP_FORMAT
) -> "TSDF":
# parse the ts_col based on the pattern
is_sub_ms = False
sub_ms_digits = 0
if is_time_format(ts_fmt):
# is this a sub-microsecond precision timestamp?
sub_ms_digits = sub_seconds_precision_digits(ts_fmt)
is_sub_ms = sub_ms_digits > 6
# if the ts_fmt is a time format, we can use to_timestamp
ts_expr = sfn.to_timestamp(sfn.col(ts_col), ts_fmt)
else:
@@ -147,18 +201,36 @@ def fromStringTimestamp(
# parse the ts_col give the expression
parsed_ts_col = cls.__DEFAULT_PARSED_TS_COL
parsed_df = df.withColumn(parsed_ts_col, ts_expr)
# parse a sub-microsecond precision timestamp to a double
if is_sub_ms:
# also parse the string to a double to preserve sub-microsecond precision
parsed_df = time_str_to_double(parsed_df,
ts_col,
cls.__DEFAULT_DOUBLE_TS_COL,
ts_fmt)
# move the ts cols into a struct
struct_col_name = cls.__DEFAULT_TS_IDX_COL
with_parsed_struct_df = cls.__makeStructFromCols(parsed_df,
struct_col_name,
[ts_col, parsed_ts_col])
cols_to_move = [ts_col, parsed_ts_col]
if is_sub_ms:
cols_to_move.append(cls.__DEFAULT_DOUBLE_TS_COL)
with_parsed_struct_df = make_struct_from_cols(parsed_df,
struct_col_name,
cols_to_move)
# construct an appropriate TSIndex
parsed_struct = with_parsed_struct_df.schema[struct_col_name]
parsed_ts_idx = ParsedTSIndex.fromParsedTimestamp(parsed_struct,
parsed_ts_col,
ts_col)
if is_sub_ms:
parsed_ts_idx = ParsedTSIndex.fromParsedTimestamp(parsed_struct,
parsed_ts_col,
ts_col,
cls.__DEFAULT_DOUBLE_TS_COL,
sub_ms_digits)
else:
parsed_ts_idx = ParsedTSIndex.fromParsedTimestamp(parsed_struct,
parsed_ts_col,
ts_col)
# construct & return the TSDF with appropriate schema
return TSDF(with_parsed_struct_df, ts_schema=TSSchema(parsed_ts_idx, series_ids))
return TSDF(with_parsed_struct_df,
ts_schema=TSSchema(parsed_ts_idx, series_ids))

@property
def ts_index(self) -> "TSIndex":
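A usage sketch of the new sub-microsecond path through fromStringTimestamp. The sample data, and any parameter names not visible in this diff, are assumptions:

from pyspark.sql import SparkSession
from tempo.tsdf import TSDF

spark = SparkSession.builder.getOrCreate()

raw_df = spark.createDataFrame(
    [("A", "2024-01-25 12:34:56.123456789", 1.0)],
    ["symbol", "event_ts", "price"])

# nine fractional digits (> 6) should take the double-timestamp branch, so the
# constructed index carries an extra double-valued field for precise ordering
tsdf = TSDF.fromStringTimestamp(
    raw_df,
    ts_col="event_ts",
    series_ids=["symbol"],
    ts_fmt="yyyy-MM-dd HH:mm:ss.SSSSSSSSS")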
