From 09a41cd6e97b7989c92b8764befd4e398253f93e Mon Sep 17 00:00:00 2001 From: Tristan Nixon Date: Tue, 2 Jan 2024 13:46:36 -0800 Subject: [PATCH] flake8 formatting --- python/tempo/intervals.py | 2 +- python/tempo/stats.py | 640 +++++++++++++++++++------------------- python/tempo/tsdf.py | 193 ++++++------ python/tempo/tsschema.py | 42 ++- python/tempo/utils.py | 5 +- 5 files changed, 440 insertions(+), 442 deletions(-) diff --git a/python/tempo/intervals.py b/python/tempo/intervals.py index 71ff3f31..bd029858 100644 --- a/python/tempo/intervals.py +++ b/python/tempo/intervals.py @@ -197,7 +197,7 @@ def fromNestedBoundariesDF(cls, nested_boundaries_col: str, series_ids: Optional[list[str]] = None, start_element_name: str = "start", - end_element_name: str = "end" ) -> IntervalsDF: + end_element_name: str = "end") -> IntervalsDF: """ :param df: diff --git a/python/tempo/stats.py b/python/tempo/stats.py index 82e76189..a6ad15f2 100644 --- a/python/tempo/stats.py +++ b/python/tempo/stats.py @@ -13,347 +13,347 @@ def vwap( - tsdf: TSDF, - frequency: str = "m", - volume_col: str = "volume", - price_col: str = "price", + tsdf: TSDF, + frequency: str = "m", + volume_col: str = "volume", + price_col: str = "price", ) -> TSDF: - # set pre_vwap as self or enrich with the frequency - pre_vwap = tsdf.df - if frequency == "m": - pre_vwap = tsdf.df.withColumn( - "time_group", - sfn.concat( - sfn.lpad(sfn.hour(sfn.col(tsdf.ts_col)), 2, "0"), - sfn.lit(":"), - sfn.lpad(sfn.minute(sfn.col(tsdf.ts_col)), 2, "0"), - ), - ) - elif frequency == "H": - pre_vwap = tsdf.df.withColumn( - "time_group", - sfn.concat(sfn.lpad(sfn.hour(sfn.col(tsdf.ts_col)), 2, "0")), - ) - elif frequency == "D": - pre_vwap = tsdf.df.withColumn( - "time_group", - sfn.concat(sfn.lpad(sfn.dayofyear(sfn.col(tsdf.ts_col)), 2, "0")), - ) - - group_cols = ["time_group"] - if tsdf.series_ids: - group_cols.extend(tsdf.series_ids) - vwapped = ( - pre_vwap.withColumn("dllr_value", sfn.col(price_col) * sfn.col(volume_col)) - .groupby(group_cols) - .agg( - sfn.sum("dllr_value").alias("dllr_value"), - sfn.sum(volume_col).alias(volume_col), - sfn.max(price_col).alias("_".join(["max", price_col])), - ) - .withColumn("vwap", sfn.col("dllr_value") / sfn.col(volume_col)) - ) - - return TSDF(vwapped, ts_schema=copy.deepcopy(tsdf.ts_schema)) + # set pre_vwap as self or enrich with the frequency + pre_vwap = tsdf.df + if frequency == "m": + pre_vwap = tsdf.df.withColumn( + "time_group", + sfn.concat( + sfn.lpad(sfn.hour(sfn.col(tsdf.ts_col)), 2, "0"), + sfn.lit(":"), + sfn.lpad(sfn.minute(sfn.col(tsdf.ts_col)), 2, "0"), + ), + ) + elif frequency == "H": + pre_vwap = tsdf.df.withColumn( + "time_group", + sfn.concat(sfn.lpad(sfn.hour(sfn.col(tsdf.ts_col)), 2, "0")), + ) + elif frequency == "D": + pre_vwap = tsdf.df.withColumn( + "time_group", + sfn.concat(sfn.lpad(sfn.dayofyear(sfn.col(tsdf.ts_col)), 2, "0")), + ) + + group_cols = ["time_group"] + if tsdf.series_ids: + group_cols.extend(tsdf.series_ids) + vwapped = ( + pre_vwap.withColumn("dllr_value", sfn.col(price_col) * sfn.col(volume_col)) + .groupby(group_cols) + .agg( + sfn.sum("dllr_value").alias("dllr_value"), + sfn.sum(volume_col).alias(volume_col), + sfn.max(price_col).alias("_".join(["max", price_col])), + ) + .withColumn("vwap", sfn.col("dllr_value") / sfn.col(volume_col)) + ) + + return TSDF(vwapped, ts_schema=copy.deepcopy(tsdf.ts_schema)) def EMA(tsdf: TSDF, colName: str, window: int = 30, exp_factor: float = 0.2) -> TSDF: - """ - Constructs an approximate EMA in the fashion of: - EMA = e * 
lag(col,0) + e * (1 - e) * lag(col, 1) + e * (1 - e)^2 * lag(col, 2) etc, up until window - TODO: replace case when statement with coalesce - TODO: add in time partitions functionality (what is the overlap fraction?) - """ - - emaColName = "_".join(["EMA", colName]) - df = tsdf.df.withColumn(emaColName, sfn.lit(0)).orderBy(tsdf.ts_col) - w = tsdf.baseWindow() - # Generate all the lag columns: - for i in range(window): - lagColName = "_".join(["lag", colName, str(i)]) - weight = exp_factor * (1 - exp_factor) ** i - df = df.withColumn( - lagColName, weight * sfn.lag(sfn.col(colName), i).over(w) - ) - df = df.withColumn( - emaColName, - sfn.col(emaColName) - + sfn.when(sfn.col(lagColName).isNull(), sfn.lit(0)).otherwise( - sfn.col(lagColName) - ), - ).drop(lagColName) - # Nulls are currently removed - - return TSDF(df, ts_schema=copy.deepcopy(tsdf.ts_schema)) + """ + Constructs an approximate EMA in the fashion of: + EMA = e * lag(col,0) + e * (1 - e) * lag(col, 1) + e * (1 - e)^2 * lag(col, 2) etc, up until window + TODO: replace case when statement with coalesce + TODO: add in time partitions functionality (what is the overlap fraction?) + """ + + emaColName = "_".join(["EMA", colName]) + df = tsdf.df.withColumn(emaColName, sfn.lit(0)).orderBy(tsdf.ts_col) + w = tsdf.baseWindow() + # Generate all the lag columns: + for i in range(window): + lagColName = "_".join(["lag", colName, str(i)]) + weight = exp_factor * (1 - exp_factor) ** i + df = df.withColumn( + lagColName, weight * sfn.lag(sfn.col(colName), i).over(w) + ) + df = df.withColumn( + emaColName, + sfn.col(emaColName) + + sfn.when(sfn.col(lagColName).isNull(), sfn.lit(0)).otherwise( + sfn.col(lagColName) + ), + ).drop(lagColName) + # Nulls are currently removed + + return TSDF(df, ts_schema=copy.deepcopy(tsdf.ts_schema)) def withLookbackFeatures( - tsdf: TSDF, - feature_cols: List[str], - lookback_window_size: int, - exact_size: bool = True, - feature_col_name: str = "features", + tsdf: TSDF, + feature_cols: List[str], + lookback_window_size: int, + exact_size: bool = True, + feature_col_name: str = "features", ) -> TSDF: - """ - Creates a 2-D feature tensor suitable for training an ML model to predict current values from the history of - some set of features. This function creates a new column containing, for each observation, a 2-D array of the values - of some number of other columns over a trailing "lookback" window from the previous observation up to some maximum - number of past observations. - - :param tsdf: the TSDF to be enriched with the lookback feature column - :param feature_cols: the names of one or more feature columns to be aggregated into the feature column - :param lookback_window_size: The size of lookback window (in terms of past observations). Must be an integer >= 1 - :param exact_size: If True (the default), then the resulting DataFrame will only include observations where the - generated feature column contains arrays of length lookbackWindowSize. This implies that it will truncate - observations that occurred less than lookbackWindowSize from the start of the timeseries. If False, no truncation - occurs, and the column may contain arrays less than lookbackWindowSize in length. - :param feature_col_name: The name of the feature column to be generated. 
Defaults to "features" - - :return: a DataFrame with a feature column named featureColName containing the lookback feature tensor - """ - # first, join all featureCols into a single array column - temp_array_col_name = "__TempArrayCol" - feat_array_tsdf = tsdf.withColumn(temp_array_col_name, sfn.array(*feature_cols)) - - # construct a lookback array - lookback_win = tsdf.rowsBetweenWindow(-lookback_window_size, -1) - lookback_tsdf = feat_array_tsdf.withColumn( - feature_col_name, - sfn.collect_list(sfn.col(temp_array_col_name)).over(lookback_win), - ).drop(temp_array_col_name) - - # make sure only windows of exact size are allowed - if exact_size: - return lookback_tsdf.where(sfn.size(feature_col_name) == lookback_window_size) - - return lookback_tsdf + """ + Creates a 2-D feature tensor suitable for training an ML model to predict current values from the history of + some set of features. This function creates a new column containing, for each observation, a 2-D array of the values + of some number of other columns over a trailing "lookback" window from the previous observation up to some maximum + number of past observations. + + :param tsdf: the TSDF to be enriched with the lookback feature column + :param feature_cols: the names of one or more feature columns to be aggregated into the feature column + :param lookback_window_size: The size of lookback window (in terms of past observations). Must be an integer >= 1 + :param exact_size: If True (the default), then the resulting DataFrame will only include observations where the + generated feature column contains arrays of length lookbackWindowSize. This implies that it will truncate + observations that occurred less than lookbackWindowSize from the start of the timeseries. If False, no truncation + occurs, and the column may contain arrays less than lookbackWindowSize in length. + :param feature_col_name: The name of the feature column to be generated. Defaults to "features" + + :return: a DataFrame with a feature column named featureColName containing the lookback feature tensor + """ + # first, join all featureCols into a single array column + temp_array_col_name = "__TempArrayCol" + feat_array_tsdf = tsdf.withColumn(temp_array_col_name, sfn.array(*feature_cols)) + + # construct a lookback array + lookback_win = tsdf.rowsBetweenWindow(-lookback_window_size, -1) + lookback_tsdf = feat_array_tsdf.withColumn( + feature_col_name, + sfn.collect_list(sfn.col(temp_array_col_name)).over(lookback_win), + ).drop(temp_array_col_name) + + # make sure only windows of exact size are allowed + if exact_size: + return lookback_tsdf.where(sfn.size(feature_col_name) == lookback_window_size) + + return lookback_tsdf def withRangeStats( - tsdf: TSDF, - type: str = "range", - cols_to_summarize: Optional[List[Column]] = None, - range_back_window_secs: int = 1000, + tsdf: TSDF, + type: str = "range", + cols_to_summarize: Optional[List[Column]] = None, + range_back_window_secs: int = 1000, ) -> TSDF: - """ - Create a wider set of stats based on all numeric columns by default - Users can choose which columns they want to summarize also. These stats are: - mean/count/min/max/sum/std deviation/zscore - - :param tsdf: the input dataframe - :param type: this is created in case we want to extend these stats to lookback over a fixed number of rows instead of ranging over column values - :param cols_to_summarize: list of user-supplied columns to compute stats for. 
All numeric columns are used if no list is provided - :param range_back_window_secs: lookback this many seconds in time to summarize all stats. Note this will look back from the floor of the base event timestamp (as opposed to the exact time since we cast to long) - - Assumptions: - - 1. The features are summarized over a rolling window that ranges back - 2. The range back window can be specified by the user - 3. Sequence numbers are not yet supported for the sort - 4. There is a cast to long from timestamp so microseconds or more likely breaks down - this could be more easily handled with a string timestamp or sorting the timestamp itself. If using a 'rows preceding' window, this wouldn't be a problem - """ - - # by default summarize all metric columns - if not cols_to_summarize: - cols_to_summarize = tsdf.metric_cols - - # build window - w = tsdf.rangeBetweenWindow(-1 * range_back_window_secs, 0) - - # compute column summaries - selected_cols: List[Column] = [sfn.col(c) for c in tsdf.columns] - derived_cols = [] - for metric in cols_to_summarize: - selected_cols.append(sfn.mean(metric).over(w).alias("mean_" + metric)) - selected_cols.append(sfn.count(metric).over(w).alias("count_" + metric)) - selected_cols.append(sfn.min(metric).over(w).alias("min_" + metric)) - selected_cols.append(sfn.max(metric).over(w).alias("max_" + metric)) - selected_cols.append(sfn.sum(metric).over(w).alias("sum_" + metric)) - selected_cols.append(sfn.stddev(metric).over(w).alias("stddev_" + metric)) - derived_cols.append( - ( - (sfn.col(metric) - sfn.col("mean_" + metric)) - / sfn.col("stddev_" + metric) - ).alias("zscore_" + metric) - ) - selected_df = tsdf.df.select(*selected_cols) - summary_df = selected_df.select(*selected_df.columns, *derived_cols).drop( - "double_ts" - ) - - return TSDF(summary_df, ts_schema=copy.deepcopy(tsdf.ts_schema)) + """ + Create a wider set of stats based on all numeric columns by default + Users can choose which columns they want to summarize also. These stats are: + mean/count/min/max/sum/std deviation/zscore + + :param tsdf: the input dataframe + :param type: this is created in case we want to extend these stats to lookback over a fixed number of rows instead of ranging over column values + :param cols_to_summarize: list of user-supplied columns to compute stats for. All numeric columns are used if no list is provided + :param range_back_window_secs: lookback this many seconds in time to summarize all stats. Note this will look back from the floor of the base event timestamp (as opposed to the exact time since we cast to long) + + Assumptions: + + 1. The features are summarized over a rolling window that ranges back + 2. The range back window can be specified by the user + 3. Sequence numbers are not yet supported for the sort + 4. There is a cast to long from timestamp so microseconds or more likely breaks down - this could be more easily handled with a string timestamp or sorting the timestamp itself. 
If using a 'rows preceding' window, this wouldn't be a problem + """ + + # by default summarize all metric columns + if not cols_to_summarize: + cols_to_summarize = tsdf.metric_cols + + # build window + w = tsdf.rangeBetweenWindow(-1 * range_back_window_secs, 0) + + # compute column summaries + selected_cols: List[Column] = [sfn.col(c) for c in tsdf.columns] + derived_cols = [] + for metric in cols_to_summarize: + selected_cols.append(sfn.mean(metric).over(w).alias("mean_" + metric)) + selected_cols.append(sfn.count(metric).over(w).alias("count_" + metric)) + selected_cols.append(sfn.min(metric).over(w).alias("min_" + metric)) + selected_cols.append(sfn.max(metric).over(w).alias("max_" + metric)) + selected_cols.append(sfn.sum(metric).over(w).alias("sum_" + metric)) + selected_cols.append(sfn.stddev(metric).over(w).alias("stddev_" + metric)) + derived_cols.append( + ( + (sfn.col(metric) - sfn.col("mean_" + metric)) + / sfn.col("stddev_" + metric) + ).alias("zscore_" + metric) + ) + selected_df = tsdf.df.select(*selected_cols) + summary_df = selected_df.select(*selected_df.columns, *derived_cols).drop( + "double_ts" + ) + + return TSDF(summary_df, ts_schema=copy.deepcopy(tsdf.ts_schema)) def withGroupedStats( - tsdf: TSDF, - metric_cols: Optional[List[str]] = None, - freq: Optional[str] = None, + tsdf: TSDF, + metric_cols: Optional[List[str]] = None, + freq: Optional[str] = None, ) -> TSDF: - """ - Create a wider set of stats based on all numeric columns by default - Users can choose which columns they want to summarize also. These stats are: - mean/count/min/max/sum/std deviation - - :param tsdf: the input dataframe - :param metric_cols - list of user-supplied columns to compute stats for. All numeric columns are used if no list is provided - :param freq - frequency (provide a string of the form '1 min', '30 seconds' and we interpret the window to use to aggregate - """ - - # identify columns to summarize if not provided - # these should include all numeric columns that - # are not the timestamp column and not any of the partition columns - if not metric_cols: - # columns we should never summarize - prohibited_cols = [tsdf.ts_col.lower()] - if tsdf.series_ids: - prohibited_cols.extend([pc.lower() for pc in tsdf.series_ids]) - # types that can be summarized - summarizable_types = ["int", "bigint", "float", "double"] - # filter columns to find summarizable columns - metric_cols = [ - datatype[0] - for datatype in tsdf.df.dtypes - if ( - (datatype[1] in summarizable_types) - and (datatype[0].lower() not in prohibited_cols) - ) - ] - - # build window - parsed_freq = t_resample.checkAllowableFreq(freq) - period, unit = parsed_freq[0], parsed_freq[1] - agg_window = sfn.window( - sfn.col(tsdf.ts_col), - "{} {}".format( - period, t_resample.freq_dict[unit] # type: ignore[literal-required] - ), - ) - - # compute column summaries - selected_cols = [] - for metric in metric_cols: - selected_cols.extend( - [ - sfn.mean(sfn.col(metric)).alias("mean_" + metric), - sfn.count(sfn.col(metric)).alias("count_" + metric), - sfn.min(sfn.col(metric)).alias("min_" + metric), - sfn.max(sfn.col(metric)).alias("max_" + metric), - sfn.sum(sfn.col(metric)).alias("sum_" + metric), - sfn.stddev(sfn.col(metric)).alias("stddev_" + metric), - ] - ) - - grouping_cols = [sfn.col(c) for c in tsdf.series_ids] + [agg_window] - selected_df = tsdf.df.groupBy(grouping_cols).agg(*selected_cols) - summary_df = ( - selected_df.select(*selected_df.columns) - .withColumn(tsdf.ts_col, sfn.col("window").start) - .drop("window") - ) - - 
return TSDF(summary_df, ts_schema=copy.deepcopy(tsdf.ts_schema)) + """ + Create a wider set of stats based on all numeric columns by default + Users can choose which columns they want to summarize also. These stats are: + mean/count/min/max/sum/std deviation + + :param tsdf: the input dataframe + :param metric_cols - list of user-supplied columns to compute stats for. All numeric columns are used if no list is provided + :param freq - frequency (provide a string of the form '1 min', '30 seconds' and we interpret the window to use to aggregate + """ + + # identify columns to summarize if not provided + # these should include all numeric columns that + # are not the timestamp column and not any of the partition columns + if not metric_cols: + # columns we should never summarize + prohibited_cols = [tsdf.ts_col.lower()] + if tsdf.series_ids: + prohibited_cols.extend([pc.lower() for pc in tsdf.series_ids]) + # types that can be summarized + summarizable_types = ["int", "bigint", "float", "double"] + # filter columns to find summarizable columns + metric_cols = [ + datatype[0] + for datatype in tsdf.df.dtypes + if ( + (datatype[1] in summarizable_types) + and (datatype[0].lower() not in prohibited_cols) + ) + ] + + # build window + parsed_freq = t_resample.checkAllowableFreq(freq) + period, unit = parsed_freq[0], parsed_freq[1] + agg_window = sfn.window( + sfn.col(tsdf.ts_col), + "{} {}".format( + period, t_resample.freq_dict[unit] # type: ignore[literal-required] + ), + ) + + # compute column summaries + selected_cols = [] + for metric in metric_cols: + selected_cols.extend( + [ + sfn.mean(sfn.col(metric)).alias("mean_" + metric), + sfn.count(sfn.col(metric)).alias("count_" + metric), + sfn.min(sfn.col(metric)).alias("min_" + metric), + sfn.max(sfn.col(metric)).alias("max_" + metric), + sfn.sum(sfn.col(metric)).alias("sum_" + metric), + sfn.stddev(sfn.col(metric)).alias("stddev_" + metric), + ] + ) + + grouping_cols = [sfn.col(c) for c in tsdf.series_ids] + [agg_window] + selected_df = tsdf.df.groupBy(grouping_cols).agg(*selected_cols) + summary_df = ( + selected_df.select(*selected_df.columns) + .withColumn(tsdf.ts_col, sfn.col("window").start) + .drop("window") + ) + + return TSDF(summary_df, ts_schema=copy.deepcopy(tsdf.ts_schema)) def calc_bars( - tsdf: TSDF, - freq: str, - metric_cols: Optional[List[str]] = None, - fill: Optional[bool] = None, + tsdf: TSDF, + freq: str, + metric_cols: Optional[List[str]] = None, + fill: Optional[bool] = None, ) -> TSDF: - resample_open = tsdf.resample( - freq=freq, func="floor", metricCols=metric_cols, prefix="open", fill=fill - ) - resample_low = tsdf.resample( - freq=freq, func="min", metricCols=metric_cols, prefix="low", fill=fill - ) - resample_high = tsdf.resample( - freq=freq, func="max", metricCols=metric_cols, prefix="high", fill=fill - ) - resample_close = tsdf.resample( - freq=freq, func="ceil", metricCols=metric_cols, prefix="close", fill=fill - ) - - join_cols = resample_open.series_ids + [resample_open.ts_col] - bars = ( - resample_open.df.join(resample_high.df, join_cols) - .join(resample_low.df, join_cols) - .join(resample_close.df, join_cols) - ) - non_part_cols = set(bars.columns) - set(resample_open.series_ids) - {resample_open.ts_col} - sel_and_sort = ( - resample_open.series_ids + [resample_open.ts_col] + sorted(non_part_cols) - ) - bars = bars.select(sel_and_sort) - - return TSDF(bars, ts_col=resample_open.ts_col, series_ids=resample_open.series_ids) + resample_open = tsdf.resample( + freq=freq, func="floor", metricCols=metric_cols, 
prefix="open", fill=fill + ) + resample_low = tsdf.resample( + freq=freq, func="min", metricCols=metric_cols, prefix="low", fill=fill + ) + resample_high = tsdf.resample( + freq=freq, func="max", metricCols=metric_cols, prefix="high", fill=fill + ) + resample_close = tsdf.resample( + freq=freq, func="ceil", metricCols=metric_cols, prefix="close", fill=fill + ) + + join_cols = resample_open.series_ids + [resample_open.ts_col] + bars = ( + resample_open.df.join(resample_high.df, join_cols) + .join(resample_low.df, join_cols) + .join(resample_close.df, join_cols) + ) + non_part_cols = set(bars.columns) - set(resample_open.series_ids) - {resample_open.ts_col} + sel_and_sort = ( + resample_open.series_ids + [resample_open.ts_col] + sorted(non_part_cols) + ) + bars = bars.select(sel_and_sort) + + return TSDF(bars, ts_col=resample_open.ts_col, series_ids=resample_open.series_ids) def fourier_transform( - tsdf: TSDF, - timestep: Union[int, float, complex], - value_col: str + tsdf: TSDF, + timestep: Union[int, float, complex], + value_col: str ) -> TSDF: - """ - Function to fourier transform the time series to its frequency domain representation. - - :param tsdf: input time series dataframe - :param timestep: timestep value to be used for getting the frequency scale - :param value_col: name of the time domain data column which will be transformed - - :return: TSDF with the fourier transform columns added - """ - - def tempo_fourier_util( - pdf: pd.DataFrame, - ) -> pd.DataFrame: - """ - This method is a vanilla python logic implementing fourier transform on a numpy array using the scipy module. - This method is meant to be called from Tempo TSDF as a pandas function API on Spark - """ - select_cols = list(pdf.columns) - pdf.sort_values(by=["tpoints"], inplace=True, ascending=True) - y = np.array(pdf["tdval"]) - tran = fft(y) - r = tran.real - i = tran.imag - pdf["ft_real"] = r - pdf["ft_imag"] = i - N = tran.shape - xf = fftfreq(N[0], timestep) - pdf["freq"] = xf - return pdf[select_cols + ["freq", "ft_real", "ft_imag"]] - - data = tsdf.df - - if not tsdf.series_ids: - data = data.withColumn("dummy_group", sfn.lit("dummy_val")) - data = ( - data.select(sfn.col("dummy_group"), tsdf.ts_col, sfn.col(value_col)) - .withColumn("tdval", sfn.col(value_col)) - .withColumn("tpoints", sfn.col(tsdf.ts_col)) - ) - return_schema = ",".join( - [f"{i[0]} {i[1]}" for i in data.dtypes] - + ["freq double", "ft_real double", "ft_imag double"] - ) - result = data.groupBy("dummy_group").applyInPandas( - tempo_fourier_util, return_schema - ) - result = result.drop("dummy_group", "tdval", "tpoints") - else: - group_cols = tsdf.series_ids - data = ( - data.select(*group_cols, tsdf.ts_col, sfn.col(value_col)) - .withColumn("tdval", sfn.col(value_col)) - .withColumn("tpoints", sfn.col(tsdf.ts_col)) - ) - return_schema = ",".join( - [f"{i[0]} {i[1]}" for i in data.dtypes] - + ["freq double", "ft_real double", "ft_imag double"] - ) - result = data.groupBy(*group_cols).applyInPandas( - tempo_fourier_util, return_schema - ) - result = result.drop("tdval", "tpoints") - - return TSDF(result, ts_schema=copy.deepcopy(tsdf.ts_schema)) \ No newline at end of file + """ + Function to fourier transform the time series to its frequency domain representation. 
+ + :param tsdf: input time series dataframe + :param timestep: timestep value to be used for getting the frequency scale + :param value_col: name of the time domain data column which will be transformed + + :return: TSDF with the fourier transform columns added + """ + + def tempo_fourier_util( + pdf: pd.DataFrame, + ) -> pd.DataFrame: + """ + This method is a vanilla python logic implementing fourier transform on a numpy array using the scipy module. + This method is meant to be called from Tempo TSDF as a pandas function API on Spark + """ + select_cols = list(pdf.columns) + pdf.sort_values(by=["tpoints"], inplace=True, ascending=True) + y = np.array(pdf["tdval"]) + tran = fft(y) + r = tran.real + i = tran.imag + pdf["ft_real"] = r + pdf["ft_imag"] = i + N = tran.shape + xf = fftfreq(N[0], timestep) + pdf["freq"] = xf + return pdf[select_cols + ["freq", "ft_real", "ft_imag"]] + + data = tsdf.df + + if not tsdf.series_ids: + data = data.withColumn("dummy_group", sfn.lit("dummy_val")) + data = ( + data.select(sfn.col("dummy_group"), tsdf.ts_col, sfn.col(value_col)) + .withColumn("tdval", sfn.col(value_col)) + .withColumn("tpoints", sfn.col(tsdf.ts_col)) + ) + return_schema = ",".join( + [f"{i[0]} {i[1]}" for i in data.dtypes] + + ["freq double", "ft_real double", "ft_imag double"] + ) + result = data.groupBy("dummy_group").applyInPandas( + tempo_fourier_util, return_schema + ) + result = result.drop("dummy_group", "tdval", "tpoints") + else: + group_cols = tsdf.series_ids + data = ( + data.select(*group_cols, tsdf.ts_col, sfn.col(value_col)) + .withColumn("tdval", sfn.col(value_col)) + .withColumn("tpoints", sfn.col(tsdf.ts_col)) + ) + return_schema = ",".join( + [f"{i[0]} {i[1]}" for i in data.dtypes] + + ["freq double", "ft_real double", "ft_imag double"] + ) + result = data.groupBy(*group_cols).applyInPandas( + tempo_fourier_util, return_schema + ) + result = result.drop("tdval", "tpoints") + + return TSDF(result, ts_schema=copy.deepcopy(tsdf.ts_schema)) diff --git a/python/tempo/tsdf.py b/python/tempo/tsdf.py index 35e02e59..2f4e1c87 100644 --- a/python/tempo/tsdf.py +++ b/python/tempo/tsdf.py @@ -16,8 +16,9 @@ from pyspark.sql.column import Column from pyspark.sql.dataframe import DataFrame from pyspark.sql.types import DataType, StructType +from pyspark.sql._typing import ColumnOrName +from pyspark.sql.pandas._typing import PandasMapIterFunction, PandasGroupedMapFunction from pyspark.sql.window import Window, WindowSpec -from scipy.fft import fft, fftfreq # type: ignore import tempo.interpol as t_interpolation import tempo.io as t_io @@ -594,85 +595,85 @@ def show( ) # pragma: no cover t_utils.get_display_df(self, k).show(n, truncate, vertical) - def describe(self) -> DataFrame: - """ - Describe a TSDF object using a global summary across all time series (anywhere from 10 to millions) as well as the standard Spark data frame stats. Missing vals - Summary - global - unique time series based on partition columns, min/max times, granularity - lowest precision in the time series timestamp column - count / mean / stddev / min / max - standard Spark data frame describe() output - missing_vals_pct - percentage (from 0 to 100) of missing values. 
- """ - # extract the double version of the timestamp column to summarize - double_ts_col = self.ts_col + "_dbl" - - this_df = self.df.withColumn(double_ts_col, sfn.col(self.ts_col).cast("double")) - - # summary missing value percentages - missing_vals = this_df.select( - [ - ( - 100 - * sfn.count(sfn.when(sfn.col(c[0]).isNull(), c[0])) - / sfn.count(sfn.lit(1)) - ).alias(c[0]) - for c in this_df.dtypes - if c[1] != "timestamp" - ] - ).select(sfn.lit("missing_vals_pct").alias("summary"), "*") - - # describe stats - desc_stats = this_df.describe().union(missing_vals) - unique_ts = this_df.select(*self.series_ids).distinct().count() - - max_ts = this_df.select( - sfn.max(sfn.col(self.ts_col)).alias("max_ts") - ).collect()[0][0] - min_ts = this_df.select( - sfn.min(sfn.col(self.ts_col)).alias("max_ts") - ).collect()[0][0] - gran = this_df.selectExpr( - """min(case when {0} - cast({0} as integer) > 0 then '1-millis' - when {0} % 60 != 0 then '2-seconds' - when {0} % 3600 != 0 then '3-minutes' - when {0} % 86400 != 0 then '4-hours' - else '5-days' end) granularity""".format( - double_ts_col - ) - ).collect()[0][0][2:] - - non_summary_cols = [c for c in desc_stats.columns if c != "summary"] - - desc_stats = desc_stats.select( - sfn.col("summary"), - sfn.lit(" ").alias("unique_ts_count"), - sfn.lit(" ").alias("min_ts"), - sfn.lit(" ").alias("max_ts"), - sfn.lit(" ").alias("granularity"), - *non_summary_cols, - ) - - # add in single record with global summary attributes and the previously computed missing value and Spark data frame describe stats - global_smry_rec = desc_stats.limit(1).select( - sfn.lit("global").alias("summary"), - sfn.lit(unique_ts).alias("unique_ts_count"), - sfn.lit(min_ts).alias("min_ts"), - sfn.lit(max_ts).alias("max_ts"), - sfn.lit(gran).alias("granularity"), - *[sfn.lit(" ").alias(c) for c in non_summary_cols], - ) - - full_smry = global_smry_rec.union(desc_stats) - full_smry = full_smry.withColumnRenamed( - "unique_ts_count", "unique_time_series_count" - ) - - try: # pragma: no cover - dbutils.fs.ls("/") # type: ignore - return full_smry - # TODO: Can we raise something other than generic Exception? - # perhaps refactor to check for IS_DATABRICKS - except Exception: - return full_smry + # def describe(self) -> DataFrame: + # """ + # Describe a TSDF object using a global summary across all time series (anywhere from 10 to millions) as well as the standard Spark data frame stats. Missing vals + # Summary + # global - unique time series based on partition columns, min/max times, granularity - lowest precision in the time series timestamp column + # count / mean / stddev / min / max - standard Spark data frame describe() output + # missing_vals_pct - percentage (from 0 to 100) of missing values. 
+ # """ + # # extract the double version of the timestamp column to summarize + # double_ts_col = self.ts_col + "_dbl" + # + # this_df = self.df.withColumn(double_ts_col, sfn.col(self.ts_col).cast("double")) + # + # # summary missing value percentages + # missing_vals = this_df.select( + # [ + # ( + # 100 + # * sfn.count(sfn.when(sfn.col(c[0]).isNull(), c[0])) + # / sfn.count(sfn.lit(1)) + # ).alias(c[0]) + # for c in this_df.dtypes + # if c[1] != "timestamp" + # ] + # ).select(sfn.lit("missing_vals_pct").alias("summary"), "*") + # + # # describe stats + # desc_stats = this_df.describe().union(missing_vals) + # unique_ts = this_df.select(*self.series_ids).distinct().count() + # + # max_ts = this_df.select( + # sfn.max(sfn.col(self.ts_col)).alias("max_ts") + # ).collect()[0][0] + # min_ts = this_df.select( + # sfn.min(sfn.col(self.ts_col)).alias("max_ts") + # ).collect()[0][0] + # gran = this_df.selectExpr( + # """min(case when {0} - cast({0} as integer) > 0 then '1-millis' + # when {0} % 60 != 0 then '2-seconds' + # when {0} % 3600 != 0 then '3-minutes' + # when {0} % 86400 != 0 then '4-hours' + # else '5-days' end) granularity""".format( + # double_ts_col + # ) + # ).collect()[0][0][2:] + # + # non_summary_cols = [c for c in desc_stats.columns if c != "summary"] + # + # desc_stats = desc_stats.select( + # sfn.col("summary"), + # sfn.lit(" ").alias("unique_ts_count"), + # sfn.lit(" ").alias("min_ts"), + # sfn.lit(" ").alias("max_ts"), + # sfn.lit(" ").alias("granularity"), + # *non_summary_cols, + # ) + # + # # add in single record with global summary attributes and the previously computed missing value and Spark data frame describe stats + # global_smry_rec = desc_stats.limit(1).select( + # sfn.lit("global").alias("summary"), + # sfn.lit(unique_ts).alias("unique_ts_count"), + # sfn.lit(min_ts).alias("min_ts"), + # sfn.lit(max_ts).alias("max_ts"), + # sfn.lit(gran).alias("granularity"), + # *[sfn.lit(" ").alias(c) for c in non_summary_cols], + # ) + # + # full_smry = global_smry_rec.union(desc_stats) + # full_smry = full_smry.withColumnRenamed( + # "unique_ts_count", "unique_time_series_count" + # ) + # + # try: # pragma: no cover + # dbutils.fs.ls("/") # type: ignore + # return full_smry + # # TODO: Can we raise something other than generic Exception? + # # perhaps refactor to check for IS_DATABRICKS + # except Exception: + # return full_smry def __getSparkPlan(self, df: DataFrame, spark: SparkSession) -> str: """ @@ -1004,14 +1005,14 @@ def withColumnTypeChanged(self, colName: str, newType: Union[DataType, str]): return self.__withTransformedDF(new_df) @overload - def drop(self, cols: "ColumnOrName") -> "TSDF": + def drop(self, cols: ColumnOrName) -> TSDF: ... @overload - def drop(self, *cols: str) -> "TSDF": + def drop(self, *cols: str) -> TSDF: ... - def drop(self, *cols: "ColumnOrName") -> "TSDF": + def drop(self, *cols: ColumnOrName) -> TSDF: """ Returns a new :class:`TSDF` that drops the specified column. 
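
For context on the re-typed `drop` overloads in the hunk above: they accept either column name strings or a `Column` object and return a new `TSDF`. A minimal usage sketch follows; the column names are hypothetical, and `sfn` is the `pyspark.sql.functions` alias already used throughout this patch.

    import pyspark.sql.functions as sfn

    # drop multiple columns by name (matches the *cols: str overload)
    trimmed = tsdf.drop("bid_vol", "ask_vol")

    # or drop a single column by Column reference (matches the ColumnOrName overload)
    trimmed = tsdf.drop(sfn.col("bid_vol"))
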
@@ -1025,7 +1026,7 @@ def drop(self, *cols: "ColumnOrName") -> "TSDF": return self.__withTransformedDF(dropped_df) def mapInPandas(self, - func: "PandasMapIterFunction", + func: PandasMapIterFunction, schema: Union[StructType, str]) -> TSDF: """ @@ -1033,7 +1034,7 @@ def mapInPandas(self, :param schema: :return: """ - mapped_df = self.df.mapInPandas(func,schema) + mapped_df = self.df.mapInPandas(func, schema) return self.__withTransformedDF(mapped_df) def union(self, other: TSDF) -> TSDF: @@ -1083,7 +1084,7 @@ def rollingAgg(self, def rollingApply(self, outputCol: str, window: WindowSpec, - func: "PandasGroupedMapFunction", + func: PandasGroupedMapFunction, schema: Union[StructType, str], *inputCols: Union[str, Column]) -> TSDF: """ @@ -1103,7 +1104,7 @@ def rollingApply(self, # Aggregations # - ## Aggregations across series and time + # Aggregations across series and time def summarize(self, *cols: Optional[Union[str, List[str]]]) -> GroupedData: """ @@ -1148,7 +1149,7 @@ def metricSummary(self, *statistics: str) -> DataFrame: """ return self.df.select(self.metric_cols).summary(statistics) - ## Aggregations by series + # Aggregations by series def groupBySeries(self) -> GroupedData: """ @@ -1172,7 +1173,7 @@ def aggBySeries(self, *exprs: Union[Column, Dict[str, str]]) -> DataFrame: return self.groupBySeries().agg(exprs) def applyToSeries(self, - func: "PandasGroupedMapFunction", + func: PandasGroupedMapFunction, schema: Union[StructType, str]) -> DataFrame: """ Maps each series using a pandas udf and returns the result as a `DataFrame`. @@ -1203,7 +1204,7 @@ def applyToSeries(self, """ return self.groupBySeries().applyInPandas(func, schema) - ### Cyclical Aggregtion + # Cyclical Aggregtion def groupByCycles(self, length: str, @@ -1259,7 +1260,7 @@ def aggByCycles(self, def applyToCycles(self, length: str, - func: "PandasGroupedMapFunction", + func: PandasGroupedMapFunction, schema: Union[StructType, str], period: Optional[str] = None, offset: Optional[str] = None, @@ -1564,7 +1565,7 @@ def interpolate( ts_col = self.ts_col if series_ids is None: - partition_cols = self.series_ids + series_ids = self.series_ids # Set defaults for target columns, timestamp column and partition columns when not provided if target_cols is None: @@ -1582,11 +1583,11 @@ def interpolate( ] interpolate_service = t_interpolation.Interpolation(is_resampled=True) - tsdf_input = TSDF(self.df, ts_col=self.ts_col, series_ids=self.series_ids) + tsdf_input = TSDF(self.df, ts_col=ts_col, series_ids=series_ids) interpolated_df = interpolate_service.interpolate( tsdf=tsdf_input, - ts_col=self.ts_col, - series_ids=self.series_ids, + ts_col=ts_col, + series_ids=series_ids, target_cols=target_cols, freq=freq, func=func, @@ -1626,4 +1627,4 @@ def __ge__(self, other: Any) -> bool: pass -CT = TypeVar("CT", bound=Comparable) \ No newline at end of file +CT = TypeVar("CT", bound=Comparable) diff --git a/python/tempo/tsschema.py b/python/tempo/tsschema.py index b0abb3e3..757383c3 100644 --- a/python/tempo/tsschema.py +++ b/python/tempo/tsschema.py @@ -1,11 +1,12 @@ -from enum import Enum, auto from abc import ABC, abstractmethod -from typing import cast, Any, Union, Optional, Collection, List +from enum import Enum, auto +from typing import Any, Collection, List, Optional, Union, cast import pyspark.sql.functions as sfn -from pyspark.sql import Column, WindowSpec, Window -from pyspark.sql.types import * -from pyspark.sql.types import NumericType +from pyspark.sql import Column, Window, WindowSpec +from pyspark.sql.types import 
BooleanType, DateType, NumericType, StringType, \ + StructField, StructType, TimestampType + # # Time Units @@ -151,13 +152,13 @@ def ts_col(self) -> str: def validate(self, df_schema: StructType) -> None: # the ts column must exist - assert(self.colname in df_schema.fieldNames(), - f"The TSIndex column {self.colname} does not exist in the given DataFrame") + assert self.colname in df_schema.fieldNames(), \ + f"The TSIndex column {self.colname} does not exist in the given DataFrame" schema_ts_col = df_schema[self.colname] # it must have the right type schema_ts_type = schema_ts_col.dataType - assert( isinstance(schema_ts_type, type(self.dataType)), - f"The TSIndex column is of type {schema_ts_type}, but the expected type is {self.dataType}" ) + assert isinstance(schema_ts_type, type(self.dataType)), \ + f"The TSIndex column is of type {schema_ts_type}, but the expected type is {self.dataType}" def renamed(self, new_name: str) -> "TSIndex": self.__name = new_name @@ -270,7 +271,6 @@ def __init__(self, ts_idx: StructField, *ts_fields: str) -> None: self.ts_components = [SimpleTSIndex.fromTSCol(self.struct[field]) for field in ts_fields] self.primary_ts_idx = self.ts_components[0] - @property def _indexAttributes(self) -> dict[str, Any]: return { @@ -297,13 +297,13 @@ def unit(self) -> Optional[TimeUnits]: def validate(self, df_schema: StructType) -> None: # validate that the composite field exists - assert(self.colname in df_schema.fieldNames(), - f"The TSIndex column {self.colname} does not exist in the given DataFrame") + assert self.colname in df_schema.fieldNames(), \ + f"The TSIndex column {self.colname} does not exist in the given DataFrame" schema_ts_col = df_schema[self.colname] # it must have the right type schema_ts_type = schema_ts_col.dataType - assert( isinstance(schema_ts_type, StructType), - f"The TSIndex column is of type {schema_ts_type}, but the expected type is {StructType}" ) + assert isinstance(schema_ts_type, StructType), \ + f"The TSIndex column is of type {schema_ts_type}, but the expected type is {StructType}" # validate all the TS components for comp in self.ts_components: comp.validate(schema_ts_type) @@ -372,12 +372,12 @@ def validate(self, df_schema: StructType) -> None: super().validate(df_schema) # make sure the parsed field exists composite_idx_type: StructType = cast(StructType, df_schema[self.colname].dataType) - assert( self.__src_str_col in composite_idx_type, - f"The src_str_col column {self.src_str_col} does not exist in the composite field {composite_idx_type}") + assert self.__src_str_col in composite_idx_type, \ + f"The src_str_col column {self.src_str_col} does not exist in the composite field {composite_idx_type}" # make sure it's StringType src_str_field_type = composite_idx_type[self.__src_str_col].dataType - assert( isinstance(src_str_field_type, StringType), - f"The src_str_col column {self.src_str_col} should be of StringType, but found {src_str_field_type} instead" ) + assert isinstance(src_str_field_type, StringType), \ + f"The src_str_col column {self.src_str_col} should be of StringType, but found {src_str_field_type} instead" class ParsedTimestampIndex(ParsedTSIndex): @@ -567,8 +567,8 @@ def validate(self, df_schema: StructType) -> None: self.ts_idx.validate(df_schema) # check series IDs for sid in self.series_ids: - assert( sid in df_schema.fieldNames(), - f"Series ID {sid} does not exist in the given DataFrame" ) + assert sid in df_schema.fieldNames(), \ + f"Series ID {sid} does not exist in the given DataFrame" def 
find_observational_columns(self, df_schema: StructType) -> list[str]: return list(set(df_schema.fieldNames()) - set(self.structural_columns)) @@ -605,5 +605,3 @@ def rangeBetweenWindow(self, start: int, end: int, reverse: bool = False) -> Win .orderBy(self.ts_idx.rangeExpr(reverse=reverse)) .rangeBetween(start, end) ) - - diff --git a/python/tempo/utils.py b/python/tempo/utils.py index 05af5f68..74ab717f 100644 --- a/python/tempo/utils.py +++ b/python/tempo/utils.py @@ -3,14 +3,13 @@ import logging import os import warnings -from typing import List, Optional, Union, overload +from typing import Optional, Union, overload +import pyspark.sql.functions as sfn from IPython import get_ipython from IPython.core.display import HTML from IPython.display import display as ipydisplay from pandas.core.frame import DataFrame as pandasDataFrame - -import pyspark.sql.functions as sfn from pyspark.sql.dataframe import DataFrame import tempo.resample as t_resample
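
For context on the tempo.stats functions whose bodies are re-indented in this patch, a minimal usage sketch is shown below. It is a sketch only: it assumes an active SparkSession named `spark` and a hypothetical source table with columns `symbol`, `trade_ts`, `price`, and `volume`; the function and parameter names are taken directly from the signatures visible in the diff.

    import tempo.stats as stats
    from tempo.tsdf import TSDF

    # build a TSDF keyed by symbol and ordered by the trade timestamp
    raw_df = spark.read.table("trades")  # hypothetical source table
    tsdf = TSDF(raw_df, ts_col="trade_ts", series_ids=["symbol"])

    # minute-level volume-weighted average price
    minute_vwap = stats.vwap(tsdf, frequency="m", volume_col="volume", price_col="price")

    # approximate exponential moving average of price over a 30-row window
    smoothed = stats.EMA(tsdf, colName="price", window=30, exp_factor=0.2)

    # rolling stats (mean/count/min/max/sum/stddev/zscore) over the trailing 600 seconds
    enriched = stats.withRangeStats(tsdf, range_back_window_secs=600)

    # frequency-domain representation of the price series
    freq_domain = stats.fourier_transform(tsdf, timestep=1.0, value_col="price")
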