# data.py

import asyncio
from functools import partial
from itertools import chain
import json
import time
import numpy
import pandas as pd
from pandas.tseries.frequencies import to_offset


def round_timestamp(t, freq):
"round a Timestamp to a specified frequency"
return round(t.value/freq.delta.value)*freq.delta.value
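

# A quick illustration of round_timestamp (values are made up; this assumes a
# pandas version where tick offsets such as "5min" expose a .delta attribute):
#
#   >>> ts = pd.Timestamp("2021-03-18 12:02:40")
#   >>> pd.Timestamp(round_timestamp(ts, to_offset("5min")))
#   Timestamp('2021-03-18 12:05:00')
#
# The function returns an integer nanosecond timestamp, hence the extra
# pd.Timestamp() wrapping for readability.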


def resample(df, freq=None):
"""
    Resample helper.

    For display purposes it may make sense to downsample the data
    to some reasonable time slice, e.g. 5 minutes.

    "freq" should be a string of the form "30s", "15m", etc.
    """
# Here we calculate a new column which is the microsecond timestamp.
# Datashader does not (yet) support datetime axes.
df.loc[:, "t"] = pd.Series(
(df["data_time"].astype("int64") // 1000 # datetime64[ns]
+ df["data_time_us"]),
index=df.index)
if not freq:
return df
    # Pandas uses some weird aliases for time intervals; let's translate
    # (TODO: there may be more of these)
if freq.endswith("ms"):
freq = freq.replace("ms", "L")
elif freq.endswith("s"):
freq = freq.replace("s", "S")
elif freq.endswith("m"):
freq = freq.replace("m", "T")
    # TODO: perhaps this is not really necessary; downsampling at ms frequency
    # seems pretty useless... can we get away without creating a new index?
df.index = pd.to_datetime(df["t"], unit="us")
return df.groupby(partial(round_timestamp, freq=to_offset(freq))).mean()
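

# Sketch of how resample() might be called (the frame below is made up, but
# the column names match what resample() expects from the HDB++ queries):
#
#   >>> raw = pd.DataFrame({
#   ...     "data_time": pd.date_range("2021-03-18", periods=60, freq="s"),
#   ...     "data_time_us": 0,
#   ...     "value_r": numpy.arange(60.0),
#   ... })
#   >>> binned = resample(raw, "30s")
#
# Sixty one-second samples should end up in three 30 second bins, each bin
# holding the mean of the values that rounded to it.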


def render_data_csv(request, data):
"Render data when the client requests text format, e.g. Accept:text/csv"
return "\n".join("{}\n{}".format(
name, "".join(df.to_csv(columns=["t", "value_r"], index=False,
sep=",", header=["t[us]", "value_r"])))
for name, df in data.items()).encode()
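

# The CSV renderer emits one block per attribute: the attribute name on its
# own line, followed by a small CSV table. Illustrative output (attribute
# name and values are made up):
#
#   sys/tg_test/1/ampli
#   t[us],value_r
#   1616068800000000,0.5
#   1616068800500000,0.7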


def render_data_json(request, data):
"Renderer for when the client wants json, i.e. 'Accept:application/json'"
# The output follows the Grafana data source format, see
# http://docs.grafana.org/plugins/developing/datasources/
# Note: this implementation will potentially use a lot of memory
# as it will create two new representations of the data in
# memory...
for _name, df in data.items():
df.replace({numpy.nan: None}, inplace=True)
return json.dumps([
{
"target": name,
"datapoints": list(zip(df["value_r"],
df["t"].astype("float") / 1000))
}
for name, df in data.items()
]).encode()
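

# Illustrative JSON payload, following the Grafana data source format linked
# above (attribute name and values are made up):
#
#   [
#     {
#       "target": "sys/tg_test/1/ampli",
#       "datapoints": [[0.5, 1616068800000.0], [0.7, 1616068800500.0]]
#     }
#   ]
#
# Each datapoint pairs a value with a millisecond timestamp, which is why the
# microsecond column "t" is divided by 1000 above.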


async def get_data(hdbpp, attributes, time_range, interval=None,
restrict_time=False):
"""
    Fetch data for all the given attributes over the time range. If
    restrict_time is set, the results are cut exactly to the time
    range; otherwise entire days of data are returned. An optional
    interval may be given for resampling the data.

    Note: this is a coroutine, and so must be run in an asyncio
    event loop!
"""
t0, t1 = time_range
futures = [hdbpp.get_attribute_data(attribute.lower(), t0, t1)
for attribute in attributes]
# Fetch all the attributes in parallel.
    # TODO: I'm not sure if this is good or not, as it could lead to a lot
    # of parallel queries. But at least each attribute is fetched in chunks.
    # Something to tweak.
#results = await asyncio.gather(*futures)
results = [await fut for fut in futures]
if restrict_time:
return {
attr: resample(res[(t0 <= res["data_time"])
& (res["data_time"] <= t1)], interval)
for attr, res in zip(attributes, results)
if res is not None
}
return {
attr: resample(res, interval)
for attr, res in zip(attributes, results)
if res is not None
}
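

# Sketch of driving get_data from an asyncio event loop ("hdbpp" stands for
# whatever backend object provides the get_attribute_data coroutine, and the
# attribute name is made up):
#
#   import datetime
#
#   async def main(hdbpp):
#       t0 = datetime.datetime(2021, 3, 17)
#       t1 = datetime.datetime(2021, 3, 18)
#       data = await get_data(hdbpp, ["sys/tg_test/1/ampli"], (t0, t1),
#                             interval="30s", restrict_time=True)
#       return render_data_json(None, data)
#
#   asyncio.get_event_loop().run_until_complete(main(hdbpp))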