|
| 1 | +import numpy as np |
| 2 | +import time |
| 3 | +from datetime import datetime, timedelta |
| 4 | + |
| 5 | +import csp |
| 6 | + |
| 7 | + |
| 8 | +class StatsBenchmarkSuite: |
| 9 | + def setup(self): |
| 10 | + self.st = datetime(2020, 1, 1) |
| 11 | + self.N = 10_000 |
| 12 | + self.ARRAY_SIZE = 100 |
| 13 | + self.TEST_TIMES = [self.st + timedelta(seconds=i) for i in range(self.N)] |
| 14 | + self.RANDOM_VALUES = [np.random.normal(size=(self.ARRAY_SIZE,)) for i in range(self.N)] # 100 element np array |
| 15 | + self.DATA = list(zip(self.TEST_TIMES, self.RANDOM_VALUES)) |
| 16 | + self.INTERVAL = 1000 |
| 17 | + self.NUM_SAMPLES = 100 |
| 18 | + |
| 19 | + def time_stats_qtl(self): |
| 20 | + def g_qtl(): |
| 21 | + data = csp.curve(typ=np.ndarray, data=self.DATA) |
| 22 | + median = csp.stats.median(data, interval=self.INTERVAL) |
| 23 | + csp.add_graph_output("final_median", median, tick_count=1) |
| 24 | + |
| 25 | + qtl_times = [] |
| 26 | + |
| 27 | + for _ in range(self.NUM_SAMPLES): |
| 28 | + start = time.time() |
| 29 | + csp.run(g_qtl, starttime=self.st, endtime=timedelta(seconds=self.N)) |
| 30 | + post_qtl = time.time() |
| 31 | + qtl_times.append(post_qtl - start) |
| 32 | + |
| 33 | + avg_med = sum(qtl_times) / self.NUM_SAMPLES |
| 34 | + print( |
| 35 | + f"Average time in {self.NUM_SAMPLES} tests for median with {self.N=}, {self.ARRAY_SIZE=}, {self.INTERVAL=}: {round(avg_med, 2)} s" |
| 36 | + ) |
| 37 | + return avg_med |
| 38 | + |
| 39 | + def time_stats_rank(self): |
| 40 | + def g_rank(self): |
| 41 | + data = csp.curve(typ=np.ndarray, data=self.DATA) |
| 42 | + rank = csp.stats.rank(data, interval=self.INTERVAL) |
| 43 | + csp.add_graph_output("final_rank", rank, tick_count=1) |
| 44 | + |
| 45 | + rank_times = [] |
| 46 | + |
| 47 | + for _ in range(self.NUM_SAMPLES): |
| 48 | + start = time.time() |
| 49 | + csp.run(g_rank, starttime=self.st, endtime=timedelta(seconds=self.stN)) |
| 50 | + post_rank = time.time() |
| 51 | + rank_times.append(post_rank - start) |
| 52 | + |
| 53 | + avg_rank = sum(rank_times) / self.NUM_SAMPLES |
| 54 | + print( |
| 55 | + f"Average time in {self.NUM_SAMPLES} tests for rank with {self.N=}, {self.ARRAY_SIZE=}, {self.INTERVAL=}: {round(avg_rank, 2)} s" |
| 56 | + ) |
| 57 | + return avg_rank |
0 commit comments