This repository has been archived by the owner on Jun 16, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsmip_io2.py
224 lines (204 loc) · 7.35 KB
/
smip_io2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
"""Rewrite of smip_io using a class"""
from concurrent.futures import Future, as_completed
from datetime import datetime
from typing import List, cast
import jwt
import requests
from pandas import date_range
from requests_futures.sessions import FuturesSession
# GraphQL mutation to generate a challenge for user
MUTATION_CHALLENGE = """
mutation Challenge($authenticator: String, $role: String, $userName: String) {
authenticationRequest(
input: {
authenticator: $authenticator,
role: $role,
userName: $userName
}
) {
jwtRequest {
challenge
}
}
}"""
# GraphQL mutation to generate token using challenge and password
MUTATION_TOKEN = """
mutation Token($authenticator: String, $signedChallenge: String) {
authenticationValidation(
input: {
authenticator: $authenticator,
signedChallenge: $signedChallenge
}
) {
jwtClaim
}
}"""
# GraphQL mutation to add data to SMIP
MUTATION_ADDDATA = """
mutation AddData($id: BigInt, $entries: [TimeSeriesEntryInput]) {
replaceTimeSeriesRange(
input: {
attributeOrTagId: $id,
entries: $entries
}
) {
json
}
}
"""
# GraphQL query to clear data from SMIP
MUTATION_CLEARDATA = """
mutation ClearData($startTime: Datetime, $endTime: Datetime, $id: BigInt) {
replaceTimeSeriesRange(
input: {
endTime: $endTime
startTime: $startTime
attributeOrTagId: $id,
}
) {
json
}
}
"""
# GraphQL query to get data from SMIP
QUERY_GETDATA = """
query GetData($startTime: Datetime, $endTime: Datetime, $ids: [BigInt]) {
getRawHistoryDataWithSampling(
endTime: $endTime
startTime: $startTime
ids: $ids
maxSamples: 0
) {
floatvalue
ts
id
}
}
"""
class SMIP:
def __init__(self, endpoint: str, authenticator: str, role: str, userName: str, password: str) -> None:
self.__endpoint = endpoint
self.__session = requests.Session()
self.__futureSession = FuturesSession(session=self.__session)
self.__authenticator = authenticator
self.__role = role
self.__userName = userName
self.__password = password
self.token = self.get_token()
def get_token(self) -> str:
"""Posts GraphQL mutations to get an auth token."""
r = self.__session.post(self.__endpoint, json={
"query": MUTATION_CHALLENGE,
"variables": {
"authenticator": self.__authenticator,
"role": self.__role,
"userName": self.__userName
}
})
r.raise_for_status()
challenge = r.json()[
'data']['authenticationRequest']['jwtRequest']['challenge']
r = self.__session.post(self.__endpoint, json={
"query": MUTATION_TOKEN,
"variables": {
"authenticator": self.__authenticator,
"signedChallenge": challenge + '|' + self.__password
}
})
r.raise_for_status()
token = r.json()['data']['authenticationValidation']['jwtClaim']
return token
def update_token(self) -> bool:
"""Helper function to check if a token is valid and updates it if not.
Returns True if token is valid, False if token was updated.
"""
try:
jwt.decode(self.token, algorithms="HS256", options={
"verify_signature": False, "verify_exp": True})
return True
except:
self.token = self.get_token()
return False
def add_data(self, id: int, entries: List[dict], timeout: float = None, async_mode: bool = False) -> requests.Response:
"""Sends timeseries to SMIP."""
self.update_token()
s = self.__futureSession if async_mode else self.__session
json = {
"query": MUTATION_ADDDATA,
"variables": {
"id": id,
"entries": entries
}
}
headers = {"Authorization": f"Bearer {self.token}"}
r = s.post(
self.__endpoint, json=json, headers=headers, timeout=timeout)
return r
@staticmethod
def batcher(toSplit, n: int = 1000):
"""Yields generator that splits long list into chunks of length n."""
l = len(toSplit)
for ndx in range(0, l, n):
yield toSplit[ndx:min(ndx + n, l)]
def add_data_serial(self, id: int, entries: List[dict], timeout: float = None) -> List[requests.Response]:
"""Breaks up timeseries into chunks of 8000 and uploads serially, returns a list of Responses."""
resp_list = [self.add_data(id, batch, timeout)
for batch in self.batcher(entries, 8000)]
for r in resp_list:
r.raise_for_status()
return resp_list
def add_data_async(self, id: int, entries: List[dict], timeout: float = None) -> List[requests.Response]:
"""Breaks up timeseries into chunks of 1000 and uploads asynchronously, returns a list of Responses."""
post = [self.add_data(id, batch, timeout, async_mode=True)
for batch in self.batcher(entries)]
post = cast(List[Future], post)
resp_list = [cast(requests.Response, future.result())
for future in as_completed(post)]
for r in resp_list:
r.raise_for_status()
return resp_list
def add_data_from_ts(self, id: int, entries: List, startTime: datetime, freq: float, timeout: float = None, async_mode=True) -> List[requests.Response]:
"""Calculates timestamps from start time and frequency, then uploads. Returns a list of Responses."""
add = self.add_data_async if async_mode else self.add_data_serial
time_range = date_range(
start=startTime, periods=len(entries), freq=f'{1/freq}S')
data = [{'timestamp': ts.isoformat(),
'value': str(val).strip(),
'status': 0} for ts, val in zip(time_range, entries)]
return add(id=id, entries=data, timeout=timeout)
def clear_data(self, start_time: str, end_time: str, id: int, timeout: float = None) -> requests.Response:
"""Clears timeseries from SMIP."""
self.update_token()
json = {
"query": MUTATION_CLEARDATA,
"variables": {
"endTime": end_time,
"startTime": start_time,
"id": id
}
}
headers = {"Authorization": f"Bearer {self.token}"}
r = self.__session.post(
self.__endpoint, json=json, headers=headers, timeout=timeout)
r.raise_for_status()
return r
def get_data(self, start_time: str, end_time: str, ids: List[int], timeout: float = None) -> requests.Response:
"""Gets timeseries from SMIP."""
self.update_token()
json = {
"query": QUERY_GETDATA,
"variables": {
"endTime": end_time,
"startTime": start_time,
"ids": ids
}
}
headers = {"Authorization": f"Bearer {self.token}"}
r = self.__session.post(
self.__endpoint, json=json, headers=headers, timeout=timeout)
r.raise_for_status()
return r
if __name__ == '__main__':
conn = SMIP("https://smtamu.cesmii.net/graphql", "test",
"smtamu_group", "parthdave", "parth1234")
print(conn.token)