# ps-clock-corrections.py
from elasticsearch import Elasticsearch, helpers
# from collections import Counter
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
# import time
# import matplotlib.pyplot as plt
import json
from alarms import alarms
# ### loading the data (owd, src/dest pair of nodes) in the last 24 h
tend = datetime.now()
tstart = tend - timedelta(days=1)
start = pd.Timestamp(tstart)
end = pd.Timestamp(tend)
dateFrom = start.strftime('%Y-%m-%d %H:%M')
dateTo = end.strftime('%Y-%m-%d %H:%M')
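# dateFrom/dateTo are human-readable bounds of the analysed window; they
# are attached below to the 'large clock correction' alarm payloads.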
with open('/config/config.json') as json_data:
    config = json.load(json_data)
es = Elasticsearch(
    hosts=[{'host': config['ES_HOST'], 'port': 9200, 'scheme': 'https'}],
    http_auth=(config['ES_USER'], config['ES_PASS']),
    request_timeout=60)
es.ping()
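# Note: ping() returns True/False rather than raising, so a failed
# connection would only surface later at query time.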
my_query = {
    '_source': ['delay_mean', 'dest_host', 'src_host', 'src_netsite', 'dest_netsite'],
    'query': {
        'bool': {
            'must': [{
                'range': {
                    'timestamp': {
                        'gte': start.strftime('%Y%m%dT%H%M00Z'),
                        'lt': end.strftime('%Y%m%dT%H%M00Z')}
                }
            }]
        }
    }
}
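# helpers.scan streams every matching document via the scroll API, so the
# loop below is not capped by the hit limit of a plain search.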
res = helpers.scan(client=es, index="ps_owd", query=my_query)
count = 0  # number of documents seen so far
delay_mean = []
dest_host = []
src_host = []
src_site, dest_site = [], []
for r in res:
    delay_mean.append(r.get('_source').get('delay_mean'))
    dest_host.append(r.get('_source').get('dest_host'))
    src_host.append(r.get('_source').get('src_host'))
    src_site.append(r.get('_source').get('src_netsite'))
    dest_site.append(r.get('_source').get('dest_netsite'))
    count += 1
    if not count % 100000:
        print(count)
df = pd.DataFrame({'delay_mean': delay_mean, 'src_host': src_host, 'dest_host': dest_host,
                   'src_site': src_site, 'dest_site': dest_site})
# host-to-site lookup table, used later to tag alarms with site names
ddf = df[['src_host', 'dest_host', 'src_site', 'dest_site']].drop_duplicates()
# ### plotting the histogram, providing some basic stats
# fig, ax = plt.subplots()
# delay_hist = plt.hist(df.delay_mean, bins=60, range=(
#     min(df['delay_mean']), max(df['delay_mean'])))
# ax.set_yscale('log')
# print(df.shape)
# print(df.describe())
# mean = df.delay_mean.mean()
# variance_orig = df.loc[:, 'delay_mean'].var()
# print(variance_orig)
# print('minimum delay_mean is: ', min(
#     df['delay_mean']), 'maximum delay_mean is', max(df['delay_mean']))
# ### filtering out the hosts with too high OWDs
bad_hosts_df = df[(df['delay_mean'] <= -100000) | (df['delay_mean'] >= 100000)]
list_of_hosts_with_bad_measurements = []
while not bad_hosts_df.empty:
    # find the host involved in the most bad measurements
    sh = bad_hosts_df.src_host.value_counts()
    dh = bad_hosts_df.dest_host.value_counts()
    bad_counts = sh.add(dh, fill_value=0).sort_values(ascending=False)
    host_to_remove = bad_counts.index[0]
    # print(host_to_remove)
    list_of_hosts_with_bad_measurements.append(host_to_remove)
    # drop every bad measurement involving that host, then re-count
    bad_hosts_df = bad_hosts_df[(bad_hosts_df['src_host'] != host_to_remove)
                                & (bad_hosts_df['dest_host'] != host_to_remove)]
    # print(bad_hosts_df)
# print("List of hosts with bad measurements:",
#       list_of_hosts_with_bad_measurements)
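# A hypothetical illustration of the greedy loop above: if host A appears
# in 5 bad rows and host B in 2, but all of B's bad rows also involve A,
# removing A first empties the table and B is never flagged. Re-counting
# after each removal avoids flagging hosts that only looked bad because
# they were paired with an already-flagged peer.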
if len(list_of_hosts_with_bad_measurements):
    ALARM = alarms('Networking', 'Infrastructure', 'bad owd measurements')
    for bh in list_of_hosts_with_bad_measurements:
        # add the site name to the list of tags
        site = ''
        if not ddf[ddf['src_host'] == bh].empty:
            site = ddf[ddf['src_host'] == bh]['src_site'].values[0]
        else:
            site = ddf[ddf['dest_host'] == bh]['dest_site'].values[0]
        tags = [bh, site.upper()] if site is not None else [bh]
        ALARM.addAlarm(body=bh, tags=tags)
# ### removing hosts with too high measurements
for node in list_of_hosts_with_bad_measurements:
    df = df[(df.src_host != node) & (df.dest_host != node)]
# print('remaining rows:', df.shape[0])
# ### removing one-sided nodes and getting the final dataframe to work with
all_nodes = np.unique(df[['src_host', 'dest_host']].values)
sc_nodes = np.unique(df['src_host'].values)
ds_nodes = np.unique(df['dest_host'].values)
one_sided_nodes = list(set(sc_nodes).symmetric_difference(ds_nodes))
# print('one sided nodes: ', one_sided_nodes)
# remove the one-sided nodes from the set of all nodes
correctable_nodes = np.setdiff1d(all_nodes, one_sided_nodes)
# print('one sided nodes', len(one_sided_nodes))
# print('correctable nodes ', len(correctable_nodes))
for node in one_sided_nodes:
    df = df[(df.src_host != node) & (df.dest_host != node)]
# print('remaining rows:', df.shape[0])
# print('minimum delay_mean is: ', min(
#     df['delay_mean']), 'maximum delay_mean is', max(df['delay_mean']))
# print(df['delay_mean'].var())
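# One-sided nodes appear only as a source or only as a destination, so the
# per-node comparison below (mean OWD as source vs. mean OWD as destination)
# cannot be formed for them; they are dropped rather than corrected.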
# ### creating a new dataframe with the corrections node-wise
dfc = df.copy()
review = {'node': [], 'measurements': [],
          'owd as source': [], 'owd as destination': []}
print('current variance: {:.2f}'.format(dfc.delay_mean.var()))
for node in correctable_nodes:
    df_tmp = dfc[(dfc.src_host == node) | (dfc.dest_host == node)]
    review["owd as source"].append(
        df_tmp[df_tmp.src_host == node].delay_mean.mean())
    review["owd as destination"].append(
        df_tmp[df_tmp.dest_host == node].delay_mean.mean())
    review["node"].append(node)
    review["measurements"].append(df_tmp.shape[0])
df_rev = pd.DataFrame.from_dict(review)
df_rev['correction'] = (df_rev['owd as source'] - df_rev['owd as destination']) / 2
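# A sketch of why this estimates the clock offset: let d be how far a
# node's clock runs ahead of true time. As the source, its inflated
# departure timestamp makes the measured OWD roughly (true - d); as the
# destination, the inflated arrival timestamp makes it roughly (true + d).
# Assuming comparable true OWDs in both roles, the line above gives
#   correction = ((true - d) - (true + d)) / 2 = -d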
df_hosts = df_rev.drop(
    ['measurements', 'owd as source', 'owd as destination'], axis=1)
print(df_hosts)
print('corrections with NaN values:', df_hosts.correction.isna().sum())
# Create alarms when the correction required is larger than 100 ms.
df_corr = df_hosts[abs(df_hosts['correction']) > 100]
ALARM = alarms('Networking', 'Infrastructure', 'large clock correction')
for (node, correction) in df_corr.values:
    # add the site name to the list of tags
    site = ''
    if not ddf[ddf['src_host'] == node].empty:
        site = ddf[ddf['src_host'] == node]['src_site'].values[0]
    else:
        site = ddf[ddf['dest_host'] == node]['dest_site'].values[0]
    tags = [node, site.upper()] if site is not None else [node]
    ALARM.addAlarm(
        body=node + " " + str(correction),
        tags=tags,
        source={"node": node, "correction": correction, "from": dateFrom, "to": dateTo}
    )
# print(df_hosts.shape, max(df_hosts.correction), min(df_hosts.correction))
# plt.hist(df_hosts.correction, range=(
#     min(df_hosts['correction']), max(df_hosts['correction'])))
df = df.assign(dmc=df.delay_mean)
for (node, correction) in df_hosts.values:
    df.loc[df['src_host'] == node, 'dmc'] -= correction
    df.loc[df['dest_host'] == node, 'dmc'] += correction
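# Consistency check against the sketch above: with correction = -d,
# subtracting it where the node is the source gives (true - d) - (-d) = true,
# and adding it where the node is the destination gives (true + d) + (-d) = true.
# Corrections for the two endpoints of a pair are applied independently
# and both accumulate in 'dmc'.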
# print(df)
# print('variance after', df.dmc.var())
# print('variance before', df['delay_mean'].var())
# print('minimum delay_mean is: ', min(
#     df['delay_mean']), 'maximum delay_mean is', max(df['delay_mean']))
# print('minimum delay_mean_corrected is: ', min(
#     df['dmc']), 'maximum delay_mean_corrected is', max(df['dmc']))
# fig, ax = plt.subplots()
# delay_hist_corr = plt.hist(
#     df.dmc, bins=60, range=(min(df['dmc']), max(df['dmc'])))
# ax.set_yscale('log')
# print(df.shape)
# df.dmc.describe()
# variance_corr = df.loc[:, 'dmc'].var()
# print(variance_corr)
# bins = np.linspace(min(df['delay_mean']), max(df['delay_mean']))
# fig, ax = plt.subplots(figsize=(10, 10))
# ax.set_yscale('log')
# plt.hist(df.delay_mean, bins=bins, histtype='step', label='messed up')
# plt.hist(df.dmc, bins=bins, histtype='step', label='corrected')
# plt.legend(loc='upper right')
# plt.show()
# removing one-sided nodes after the corrections are applied
# (they were already dropped above, so this pass is a safety no-op here)
for node in one_sided_nodes:
    df = df[(df.src_host != node) & (df.dest_host != node)]
# print('remaining rows:', df.shape[0])
# bins = np.linspace(min(df['delay_mean']), max(df['delay_mean']))
# fig, ax = plt.subplots(figsize=(10, 10))
# ax.set_yscale('log')
# plt.hist(df.delay_mean, bins=bins, histtype='step', label='messed up')
# plt.hist(df.dmc, bins=bins, histtype='step', label='corrected')
# plt.legend(loc='upper right')
# plt.show()
# print(min(df['delay_mean']), max(df['delay_mean']))
# print(min(df['dmc']), max(df['dmc']))