Skip to content

Commit

Permalink
black formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
tnixon committed Jan 2, 2024
1 parent 09a41cd commit 88bb03b
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 198 deletions.
39 changes: 19 additions & 20 deletions python/tempo/interpol.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ def __generate_time_series_fill(self, tsdf: t_tsdf.TSDF) -> t_tsdf.TSDF:
"previous_timestamp",
sfn.col(tsdf.ts_col),
).withColumn(
"next_timestamp",
sfn.lead(sfn.col(tsdf.ts_col)).over(tsdf.baseWindow()))
"next_timestamp", sfn.lead(sfn.col(tsdf.ts_col)).over(tsdf.baseWindow())
)

def __generate_column_time_fill(
self,
Expand All @@ -184,12 +184,14 @@ def __generate_column_time_fill(

return tsdf.withColumn(
f"previous_timestamp_{target_col}",
sfn.last(sfn.col(f"{tsdf.ts_col}_{target_col}"),
ignorenulls=True).over(fwd_win),
sfn.last(sfn.col(f"{tsdf.ts_col}_{target_col}"), ignorenulls=True).over(
fwd_win
),
).withColumn(
f"next_timestamp_{target_col}",
sfn.last(sfn.col(f"{tsdf.ts_col}_{target_col}"),
ignorenulls=True).over(bkwd_win)
sfn.last(sfn.col(f"{tsdf.ts_col}_{target_col}"), ignorenulls=True).over(
bkwd_win
),
)

def __generate_target_fill(
Expand All @@ -213,15 +215,14 @@ def __generate_target_fill(
return (
tsdf.withColumn(
f"previous_{target_col}",
sfn.last(sfn.col(target_col), ignorenulls=True).over(fwd_win)
sfn.last(sfn.col(target_col), ignorenulls=True).over(fwd_win),
)
# Handle if subsequent value is null
.withColumn(
f"next_null_{target_col}",
sfn.last(sfn.col(target_col), ignorenulls=True).over(bkwd_win)
sfn.last(sfn.col(target_col), ignorenulls=True).over(bkwd_win),
).withColumn(
f"next_{target_col}",
sfn.lead(sfn.col(target_col)).over(fwd_win)
f"next_{target_col}", sfn.lead(sfn.col(target_col)).over(fwd_win)
)
)

Expand Down Expand Up @@ -277,9 +278,7 @@ def interpolate(

if self.is_resampled is False:
# Resample and Normalize Input
sampled_input = tsdf.resample(freq=freq,
func=func,
metricCols=target_cols)
sampled_input = tsdf.resample(freq=freq, func=func, metricCols=target_cols)

# Fill timeseries for nearest values
time_series_filled = self.__generate_time_series_fill(sampled_input)
Expand All @@ -290,11 +289,11 @@ def interpolate(
for column in target_cols:
add_column_time = add_column_time.withColumn(
f"{tsdf.ts_col}_{column}",
sfn.when(sfn.col(column).isNull(), None).otherwise(sfn.col(tsdf.ts_col)),
)
add_column_time = self.__generate_column_time_fill(
add_column_time, column
sfn.when(sfn.col(column).isNull(), None).otherwise(
sfn.col(tsdf.ts_col)
),
)
add_column_time = self.__generate_column_time_fill(add_column_time, column)

# Handle edge case if last value (latest) is null
edge_filled = add_column_time.withColumn(
Expand Down Expand Up @@ -325,9 +324,9 @@ def interpolate(
flagged_series = (
exploded_series.withColumn(
"is_ts_interpolated",
sfn.when(sfn.col(f"new_{tsdf.ts_col}") != sfn.col(tsdf.ts_col), True).otherwise(
False
),
sfn.when(
sfn.col(f"new_{tsdf.ts_col}") != sfn.col(tsdf.ts_col), True
).otherwise(False),
)
.withColumn(tsdf.ts_col, sfn.col(f"new_{tsdf.ts_col}"))
.drop(sfn.col(f"new_{tsdf.ts_col}"))
Expand Down
29 changes: 16 additions & 13 deletions python/tempo/intervals.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,14 @@ def fromStackedMetrics(
return cls(df, start_ts, end_ts, series)

@classmethod
def fromNestedBoundariesDF(cls,
df: DataFrame,
nested_boundaries_col: str,
series_ids: Optional[list[str]] = None,
start_element_name: str = "start",
end_element_name: str = "end") -> IntervalsDF:
def fromNestedBoundariesDF(
cls,
df: DataFrame,
nested_boundaries_col: str,
series_ids: Optional[list[str]] = None,
start_element_name: str = "start",
end_element_name: str = "end",
) -> IntervalsDF:
"""
:param df:
Expand All @@ -211,14 +213,15 @@ def fromNestedBoundariesDF(cls,
# unpack the start & end elements
start_path = f"{nested_boundaries_col}.{start_element_name}"
end_path = f"{nested_boundaries_col}.{end_element_name}"
unpacked_boundaries_df = (df.withColumn(start_element_name, sfn.col(start_path))
.withColumn(end_element_name, sfn.col(end_path))
.drop(nested_boundaries_col))
unpacked_boundaries_df = (
df.withColumn(start_element_name, sfn.col(start_path))
.withColumn(end_element_name, sfn.col(end_path))
.drop(nested_boundaries_col)
)
# return the results as an IntervalsDF
return IntervalsDF(unpacked_boundaries_df,
start_element_name,
end_element_name,
series_ids)
return IntervalsDF(
unpacked_boundaries_df, start_element_name, end_element_name, series_ids
)

def __get_adjacent_rows(self, df: DataFrame) -> DataFrame:
"""
Expand Down
18 changes: 6 additions & 12 deletions python/tempo/resample.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,7 @@ def aggregate(
exprs = {x: "avg" for x in metricCols}
res = df.groupBy(groupingCols).agg(exprs)
agg_metric_cls = list(
set(res.columns).difference(
set(tsdf.series_ids + [tsdf.ts_col, "agg_key"])
)
set(res.columns).difference(set(tsdf.series_ids + [tsdf.ts_col, "agg_key"]))
)
new_cols = [
sfn.col(c).alias(
Expand All @@ -172,9 +170,7 @@ def aggregate(
exprs = {x: "min" for x in metricCols}
res = df.groupBy(groupingCols).agg(exprs)
agg_metric_cls = list(
set(res.columns).difference(
set(tsdf.series_ids + [tsdf.ts_col, "agg_key"])
)
set(res.columns).difference(set(tsdf.series_ids + [tsdf.ts_col, "agg_key"]))
)
new_cols = [
sfn.col(c).alias(
Expand All @@ -187,9 +183,7 @@ def aggregate(
exprs = {x: "max" for x in metricCols}
res = df.groupBy(groupingCols).agg(exprs)
agg_metric_cls = list(
set(res.columns).difference(
set(tsdf.series_ids + [tsdf.ts_col, "agg_key"])
)
set(res.columns).difference(set(tsdf.series_ids + [tsdf.ts_col, "agg_key"]))
)
new_cols = [
sfn.col(c).alias(
Expand Down Expand Up @@ -245,9 +239,9 @@ def aggregate(
metrics.append(col[0])

if fill:
res = imputes.join(
res, tsdf.series_ids + [tsdf.ts_col], "leftouter"
).na.fill(0, metrics)
res = imputes.join(res, tsdf.series_ids + [tsdf.ts_col], "leftouter").na.fill(
0, metrics
)

return res

Expand Down
64 changes: 31 additions & 33 deletions python/tempo/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@


def vwap(
tsdf: TSDF,
frequency: str = "m",
volume_col: str = "volume",
price_col: str = "price",
tsdf: TSDF,
frequency: str = "m",
volume_col: str = "volume",
price_col: str = "price",
) -> TSDF:
# set pre_vwap as self or enrich with the frequency
pre_vwap = tsdf.df
Expand Down Expand Up @@ -72,9 +72,7 @@ def EMA(tsdf: TSDF, colName: str, window: int = 30, exp_factor: float = 0.2) ->
for i in range(window):
lagColName = "_".join(["lag", colName, str(i)])
weight = exp_factor * (1 - exp_factor) ** i
df = df.withColumn(
lagColName, weight * sfn.lag(sfn.col(colName), i).over(w)
)
df = df.withColumn(lagColName, weight * sfn.lag(sfn.col(colName), i).over(w))
df = df.withColumn(
emaColName,
sfn.col(emaColName)
Expand All @@ -88,11 +86,11 @@ def EMA(tsdf: TSDF, colName: str, window: int = 30, exp_factor: float = 0.2) ->


def withLookbackFeatures(
tsdf: TSDF,
feature_cols: List[str],
lookback_window_size: int,
exact_size: bool = True,
feature_col_name: str = "features",
tsdf: TSDF,
feature_cols: List[str],
lookback_window_size: int,
exact_size: bool = True,
feature_col_name: str = "features",
) -> TSDF:
"""
Creates a 2-D feature tensor suitable for training an ML model to predict current values from the history of
Expand Down Expand Up @@ -130,10 +128,10 @@ def withLookbackFeatures(


def withRangeStats(
tsdf: TSDF,
type: str = "range",
cols_to_summarize: Optional[List[Column]] = None,
range_back_window_secs: int = 1000,
tsdf: TSDF,
type: str = "range",
cols_to_summarize: Optional[List[Column]] = None,
range_back_window_secs: int = 1000,
) -> TSDF:
"""
Create a wider set of stats based on all numeric columns by default
Expand Down Expand Up @@ -172,8 +170,8 @@ def withRangeStats(
selected_cols.append(sfn.stddev(metric).over(w).alias("stddev_" + metric))
derived_cols.append(
(
(sfn.col(metric) - sfn.col("mean_" + metric))
/ sfn.col("stddev_" + metric)
(sfn.col(metric) - sfn.col("mean_" + metric))
/ sfn.col("stddev_" + metric)
).alias("zscore_" + metric)
)
selected_df = tsdf.df.select(*selected_cols)
Expand All @@ -185,9 +183,9 @@ def withRangeStats(


def withGroupedStats(
tsdf: TSDF,
metric_cols: Optional[List[str]] = None,
freq: Optional[str] = None,
tsdf: TSDF,
metric_cols: Optional[List[str]] = None,
freq: Optional[str] = None,
) -> TSDF:
"""
Create a wider set of stats based on all numeric columns by default
Expand All @@ -214,8 +212,8 @@ def withGroupedStats(
datatype[0]
for datatype in tsdf.df.dtypes
if (
(datatype[1] in summarizable_types)
and (datatype[0].lower() not in prohibited_cols)
(datatype[1] in summarizable_types)
and (datatype[0].lower() not in prohibited_cols)
)
]

Expand Down Expand Up @@ -255,10 +253,10 @@ def withGroupedStats(


def calc_bars(
tsdf: TSDF,
freq: str,
metric_cols: Optional[List[str]] = None,
fill: Optional[bool] = None,
tsdf: TSDF,
freq: str,
metric_cols: Optional[List[str]] = None,
fill: Optional[bool] = None,
) -> TSDF:
resample_open = tsdf.resample(
freq=freq, func="floor", metricCols=metric_cols, prefix="open", fill=fill
Expand All @@ -279,19 +277,19 @@ def calc_bars(
.join(resample_low.df, join_cols)
.join(resample_close.df, join_cols)
)
non_part_cols = set(bars.columns) - set(resample_open.series_ids) - {resample_open.ts_col}
non_part_cols = (
set(bars.columns) - set(resample_open.series_ids) - {resample_open.ts_col}
)
sel_and_sort = (
resample_open.series_ids + [resample_open.ts_col] + sorted(non_part_cols)
resample_open.series_ids + [resample_open.ts_col] + sorted(non_part_cols)
)
bars = bars.select(sel_and_sort)

return TSDF(bars, ts_col=resample_open.ts_col, series_ids=resample_open.series_ids)


def fourier_transform(
tsdf: TSDF,
timestep: Union[int, float, complex],
value_col: str
tsdf: TSDF, timestep: Union[int, float, complex], value_col: str
) -> TSDF:
"""
Function to fourier transform the time series to its frequency domain representation.
Expand All @@ -304,7 +302,7 @@ def fourier_transform(
"""

def tempo_fourier_util(
pdf: pd.DataFrame,
pdf: pd.DataFrame,
) -> pd.DataFrame:
"""
This method is a vanilla python logic implementing fourier transform on a numpy array using the scipy module.
Expand Down
Loading

0 comments on commit 88bb03b

Please sign in to comment.