multiple metrics fix (#159)
* multiple metrics fix

* add docs

* add docs

* add docs

* add docs

* add docs
rportilla-databricks authored Feb 25, 2022
1 parent efdf034 commit d150a19
Showing 5 changed files with 50 additions and 12 deletions.
15 changes: 15 additions & 0 deletions README.md
@@ -254,6 +254,21 @@ interpolated_tsdf = input_tsdf.interpolate(

```

#### 8 - Grouped Stats by Frequency

Group by the partition columns and a time frequency to get the minimum, maximum, count, mean, standard deviation, and sum for all numeric columns, or for a chosen subset.

###### Parameters:

`freq` = (required) Frequency at which the grouping should take place - acceptable values are strings such as "1 minute", "40 seconds", etc.

`metricCols` = (optional) List of numeric columns to compute metrics for. If not supplied, this method computes stats on all numeric columns in the TSDF.

```python
grouped_stats = watch_accel_tsdf.withGroupedStats(metricCols = ["y"], freq="1 minute")
display(grouped_stats)
```
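
Because this fix targets aggregation over more than one metric column, a call with several entries in `metricCols` is the case worth exercising. A minimal sketch, assuming the watch-accelerometer sample TSDF also carries `x` and `z` columns alongside `y`:

```python
# Sketch only: x and z are assumed accelerometer columns; y appears in the example above.
multi_stats = watch_accel_tsdf.withGroupedStats(metricCols=["x", "y", "z"], freq="1 minute")

# Each metric yields six derived columns, named <stat>_<column>:
# mean_x, count_x, min_x, max_x, sum_x, stddev_x, and likewise for y and z.
display(multi_stats)
```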

## Project Support
Please note that all projects in the /databrickslabs github account are provided for your exploration only, and are not formally supported by Databricks with Service Level Agreements (SLAs). They are provided AS-IS and we do not make any guarantees of any kind. Please do not submit a support ticket relating to any issues arising from the use of these projects.

16 changes: 16 additions & 0 deletions python/README.md
@@ -232,6 +232,22 @@ interpolated_tsdf = input_tsdf.interpolate(

```

#### 8 - Grouped Stats by Frequency

Group by the partition columns and a time frequency to get the minimum, maximum, count, mean, standard deviation, and sum for all numeric columns, or for a chosen subset.

###### Parameters:

`freq` = (required) Frequency at which the grouping should take place - acceptable values are strings such as "1 minute", "40 seconds", etc.

`metricCols` = (optional) List of numeric columns to compute metrics for. If not supplied, this method computes stats on all numeric columns in the TSDF.

```python
grouped_stats = watch_accel_tsdf.withGroupedStats(metricCols = ["y"], freq="1 minute")
display(grouped_stats)
```
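
`metricCols` can also be omitted entirely, in which case stats are computed for every numeric column in the TSDF. A minimal sketch:

```python
# Sketch only: with no metricCols, withGroupedStats covers all numeric columns in the TSDF.
all_stats = watch_accel_tsdf.withGroupedStats(freq="1 minute")
display(all_stats)
```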


## Project Support
Please note that all projects in the /databrickslabs github account are provided for your exploration only, and are not formally supported by Databricks with Service Level Agreements (SLAs). They are provided AS-IS and we do not make any guarantees of any kind. Please do not submit a support ticket relating to any issues arising from the use of these projects.

2 changes: 1 addition & 1 deletion python/setup.py
@@ -6,7 +6,7 @@

 setuptools.setup(
     name='dbl-tempo',
-    version='0.1.8',
+    version='0.1.9',
     author='Ricardo Portilla, Tristan Nixon, Max Thone, Sonali Guleria',
     author_email='[email protected]',
     description='Spark Time Series Utility Package',
4 changes: 2 additions & 2 deletions python/tempo/tsdf.py
@@ -623,8 +623,8 @@ def withGroupedStats(self, metricCols=[], freq = None):

         # compute column summaries
         selectedCols = []
-        reduce(lambda selectedCols, metric:
-               selectedCols.extend([f.mean(f.col(metric)).alias('mean_' + metric), f.count(f.col(metric)).alias('count_' + metric), f.min(f.col(metric)).alias('min_' + metric), f.max(f.col(metric)).alias('max_' + metric), f.sum(f.col(metric)).alias('sum_' + metric), f.stddev(f.col(metric)).alias('stddev_' + metric)]), metricCols, selectedCols)
+        for metric in metricCols:
+            selectedCols.extend([f.mean(f.col(metric)).alias('mean_' + metric), f.count(f.col(metric)).alias('count_' + metric), f.min(f.col(metric)).alias('min_' + metric), f.max(f.col(metric)).alias('max_' + metric), f.sum(f.col(metric)).alias('sum_' + metric), f.stddev(f.col(metric)).alias('stddev_' + metric)])

         selected_df = self.df.groupBy(self.partitionCols + [agg_window]).agg(*selectedCols)
         summary_df = selected_df.select(*selected_df.columns).withColumn(self.ts_col, f.col('window').start).drop('window')
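
Why the loop replaces the `reduce`: `list.extend` returns `None`, so the reduce accumulator becomes `None` after the first metric and any additional metric raises an `AttributeError`; a single metric only appeared to work through the side effect on the initial list. A minimal standalone sketch (illustrative names only, not part of the commit) of the failure mode and the loop-based pattern:

```python
from functools import reduce

metrics = ["x", "y"]  # more than one metric column, as in the updated test

# reduce-based accumulation: list.extend returns None, so the second step
# receives None as its accumulator and fails.
try:
    reduce(lambda cols, m: cols.extend(["mean_" + m, "stddev_" + m]), metrics, [])
except AttributeError as err:
    print(err)  # 'NoneType' object has no attribute 'extend'

# Plain loop, as in the fix: the list accumulates correctly for any number of metrics.
cols = []
for m in metrics:
    cols.extend(["mean_" + m, "stddev_" + m])
print(cols)  # ['mean_x', 'stddev_x', 'mean_y', 'stddev_y']
```

The test change below adds a second numeric column, `index`, so that `test_group_stats` now exercises this multi-metric path.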
25 changes: 16 additions & 9 deletions python/tests/tsdf_tests.py
@@ -505,7 +505,8 @@ def test_group_stats(self):
         """Test of range stats for 20 minute rolling window"""
         schema = StructType([StructField("symbol", StringType()),
                              StructField("event_ts", StringType()),
-                             StructField("trade_pr", FloatType())])
+                             StructField("trade_pr", FloatType()),
+                             StructField("index", IntegerType())])

         expectedSchema = StructType([StructField("symbol", StringType()),
                                      StructField("event_ts", StringType()),
@@ -514,16 +515,22 @@
                                      StructField("min_trade_pr", FloatType()),
                                      StructField("max_trade_pr", FloatType()),
                                      StructField("sum_trade_pr", FloatType()),
-                                     StructField("stddev_trade_pr", FloatType())])
+                                     StructField("stddev_trade_pr", FloatType()),
+                                     StructField("mean_index", IntegerType()),
+                                     StructField("count_index", IntegerType(), nullable=False),
+                                     StructField("min_index", IntegerType()),
+                                     StructField("max_index", IntegerType()),
+                                     StructField("sum_index", IntegerType()),
+                                     StructField("stddev_index", IntegerType())])

-        data = [["S1", "2020-08-01 00:00:10", 349.21],
-                ["S1", "2020-08-01 00:00:33", 351.32],
-                ["S1", "2020-09-01 00:02:10", 361.1],
-                ["S1", "2020-09-01 00:02:49", 362.1]]
+        data = [["S1", "2020-08-01 00:00:10", 349.21, 1],
+                ["S1", "2020-08-01 00:00:33", 351.32, 1],
+                ["S1", "2020-09-01 00:02:10", 361.1, 1],
+                ["S1", "2020-09-01 00:02:49", 362.1, 1]]

         expected_data = [
-            ["S1", "2020-08-01 00:00:00", 350.26, 2, 349.21, 351.32, 700.53, 1.49],
-            ["S1", "2020-09-01 00:02:00", 361.6, 2, 361.1, 362.1, 723.2, 0.71]]
+            ["S1", "2020-08-01 00:00:00", 350.26, 2, 349.21, 351.32, 700.53, 1.49, 1, 2, 1, 1, 2, 0],
+            ["S1", "2020-09-01 00:02:00", 361.6, 2, 361.1, 362.1, 723.2, 0.71, 1, 2, 1, 1, 2, 0]]

         # construct dataframes
         df = self.buildTestDF(schema, data)
