diff --git a/README.md b/README.md index 743f735c..d628748a 100644 --- a/README.md +++ b/README.md @@ -254,6 +254,21 @@ interpolated_tsdf = input_tsdf.interpolate( ``` +#### 8 - Grouped Stats by Frequency + +Group by partition columns and a frequency to get the minimum, maximum, count, mean, standard deviation, and sum for all or some subset of numeric columns. + +###### Parameters: + +`freq` = (required) Frequency at which the grouping should take place - acceptable parameters are strings of the form "1 minute", "40 seconds", etc. + +`metricCols` = (optional) List of columns to compute metrics for. These should be numeric columns. If this is not supplied, this method will compute stats on all numeric columns in the TSDF + +```python +grouped_stats = watch_accel_tsdf.withGroupedStats(metricCols = ["y"], freq="1 minute") +display(grouped_stats) +``` + ## Project Support Please note that all projects in the /databrickslabs github account are provided for your exploration only, and are not formally supported by Databricks with Service Level Agreements (SLAs). They are provided AS-IS and we do not make any guarantees of any kind. Please do not submit a support ticket relating to any issues arising from the use of these projects. diff --git a/python/README.md b/python/README.md index 32ff101c..5a0a6af0 100644 --- a/python/README.md +++ b/python/README.md @@ -232,6 +232,22 @@ interpolated_tsdf = input_tsdf.interpolate( ``` +#### 8 - Grouped Stats by Frequency + +Group by partition columns and a frequency to get the minimum, maximum, count, mean, standard deviation, and sum for all or some subset of numeric columns. + +###### Parameters: + +`freq` = (required) Frequency at which the grouping should take place - acceptable parameters are strings of the form "1 minute", "40 seconds", etc. + +`metricCols` = (optional) List of columns to compute metrics for. These should be numeric columns. If this is not supplied, this method will compute stats on all numeric columns in the TSDF + +```python +grouped_stats = watch_accel_tsdf.withGroupedStats(metricCols = ["y"], freq="1 minute") +display(grouped_stats) +``` + + ## Project Support Please note that all projects in the /databrickslabs github account are provided for your exploration only, and are not formally supported by Databricks with Service Level Agreements (SLAs). They are provided AS-IS and we do not make any guarantees of any kind. Please do not submit a support ticket relating to any issues arising from the use of these projects. diff --git a/python/setup.py b/python/setup.py index 29d3951f..2754808f 100644 --- a/python/setup.py +++ b/python/setup.py @@ -6,7 +6,7 @@ setuptools.setup( name='dbl-tempo', - version='0.1.8', + version='0.1.9', author='Ricardo Portilla, Tristan Nixon, Max Thone, Sonali Guleria', author_email='labs@databricks.com', description='Spark Time Series Utility Package', diff --git a/python/tempo/tsdf.py b/python/tempo/tsdf.py index 4dab294a..1742b22b 100644 --- a/python/tempo/tsdf.py +++ b/python/tempo/tsdf.py @@ -623,8 +623,8 @@ def withGroupedStats(self, metricCols=[], freq = None): # compute column summaries selectedCols = [] - reduce(lambda selectedCols, metric: - selectedCols.extend([f.mean(f.col(metric)).alias('mean_' + metric), f.count(f.col(metric)).alias('count_' + metric), f.min(f.col(metric)).alias('min_' + metric), f.max(f.col(metric)).alias('max_' + metric), f.sum(f.col(metric)).alias('sum_' + metric), f.stddev(f.col(metric)).alias('stddev_' + metric)]), metricCols, selectedCols) + for metric in metricCols: + selectedCols.extend([f.mean(f.col(metric)).alias('mean_' + metric), f.count(f.col(metric)).alias('count_' + metric), f.min(f.col(metric)).alias('min_' + metric), f.max(f.col(metric)).alias('max_' + metric), f.sum(f.col(metric)).alias('sum_' + metric), f.stddev(f.col(metric)).alias('stddev_' + metric)]) selected_df = self.df.groupBy(self.partitionCols + [agg_window]).agg(*selectedCols) summary_df = selected_df.select(*selected_df.columns).withColumn(self.ts_col, f.col('window').start).drop('window') diff --git a/python/tests/tsdf_tests.py b/python/tests/tsdf_tests.py index 233ef04a..3ed790c8 100644 --- a/python/tests/tsdf_tests.py +++ b/python/tests/tsdf_tests.py @@ -505,7 +505,8 @@ def test_group_stats(self): """Test of range stats for 20 minute rolling window""" schema = StructType([StructField("symbol", StringType()), StructField("event_ts", StringType()), - StructField("trade_pr", FloatType())]) + StructField("trade_pr", FloatType()), + StructField("index", IntegerType())]) expectedSchema = StructType([StructField("symbol", StringType()), StructField("event_ts", StringType()), @@ -514,16 +515,22 @@ def test_group_stats(self): StructField("min_trade_pr", FloatType()), StructField("max_trade_pr", FloatType()), StructField("sum_trade_pr", FloatType()), - StructField("stddev_trade_pr", FloatType())]) - - data = [["S1", "2020-08-01 00:00:10", 349.21], - ["S1", "2020-08-01 00:00:33", 351.32], - ["S1", "2020-09-01 00:02:10", 361.1], - ["S1", "2020-09-01 00:02:49", 362.1]] + StructField("stddev_trade_pr", FloatType()), + StructField("mean_index", IntegerType()), + StructField("count_index", IntegerType(), nullable=False), + StructField("min_index", IntegerType()), + StructField("max_index", IntegerType()), + StructField("sum_index", IntegerType()), + StructField("stddev_index", IntegerType())]) + + data = [["S1", "2020-08-01 00:00:10", 349.21, 1], + ["S1", "2020-08-01 00:00:33", 351.32, 1], + ["S1", "2020-09-01 00:02:10", 361.1, 1], + ["S1", "2020-09-01 00:02:49", 362.1, 1]] expected_data = [ - ["S1", "2020-08-01 00:00:00", 350.26, 2, 349.21, 351.32, 700.53, 1.49], - ["S1", "2020-09-01 00:02:00", 361.6, 2, 361.1, 362.1, 723.2, 0.71]] + ["S1", "2020-08-01 00:00:00", 350.26, 2, 349.21, 351.32, 700.53, 1.49, 1, 2, 1, 1, 2, 0], + ["S1", "2020-09-01 00:02:00", 361.6, 2, 361.1, 362.1, 723.2, 0.71, 1, 2, 1, 1, 2, 0]] # construct dataframes df = self.buildTestDF(schema, data)