-
Notifications
You must be signed in to change notification settings - Fork 1
/
userdefined.py
334 lines (284 loc) · 8.73 KB
/
userdefined.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
#*********************************************************************************************#
#*********************************************************************************************#
#******************written by Neil Hao([email protected]), 2013/08/09*************************#
#*********************************************************************************************#
#*********************************************************************************************#
#*********************************************************************************************#
from StringIO import StringIO
from operator import itemgetter
from base import *
from analyser import *
from factory import *
from errorlog import *
# Response status codes that are NOT treated as errors: any status outside
# this list is reported as an error response by BurstHelper.
SUCC_STATUS = [ 200, 206, 304 ]
class BurstHelper(AnalyserHelper):
    """Analyser helper that counts overlapping ``TCP_REFRESH_MISS`` requests.

    For every URL (query string removed) the helper remembers the tracked
    request whose end time (``recvdTime + servTime / 1000000.0``) is the
    latest.  A URL becomes "in issue" when a request whose description
    starts with ``TCP_REFRESH_MISS`` is received before that remembered
    request has ended.  It leaves the issue state when a request arrives at
    or after that end time, or when a description contains ``HIT``.

    The sample value is a ``(marks, hitCount)`` tuple: ``marks`` counts the
    requests that started an issue, ``hitCount`` counts every request seen
    while the URL is in issue.
    """

    def __init__(self):
        super(BurstHelper, self).__init__()
        # url without query -> log entry with the latest end time so far
        self.lastReqMap = {}
        # url without query -> True while that url is in issue
        self.hitMap = {}
        # how many times an issue has been started, over all urls
        self.totalHit = 0

    def get_value(self, logInfo):
        """Return the log entry itself as the sample value."""
        return logInfo

    def init_value(self, value):
        """Return the initial ``(marks, hitCount)`` pair."""
        return (0, 0)

    def update_value(self, oldValue, sampleValue):
        """Fold one log entry into ``oldValue`` and return the new pair.

        ``sampleValue`` is the log entry; the attributes read from it are
        ``urlAll``, ``description``, ``servTime``, ``recvdTime``, ``status``
        and ``originLine``.
        """
        entry = sampleValue
        key = self.__get_url_no_query(entry.urlAll)
        desc = entry.description
        # NOTE(review): servTime is presumably in microseconds - confirm.
        curServ = entry.servTime / 1000000.0

        if key in self.lastReqMap:
            prev = self.lastReqMap[key]
            prevServ = prev.servTime / 1000000.0
        else:
            prev = None
            prevServ = None

        inIssue = self.hitMap.get(key, False)
        startedNow = False

        if inIssue:
            # The issue ends once the remembered request has finished or a
            # HIT is served.
            prevEnd = prev.recvdTime + prevServ
            if entry.recvdTime >= prevEnd or 'HIT' in desc:
                inIssue = False
        elif desc.startswith('TCP_REFRESH_MISS') and prev is not None:
            # The issue starts when this request arrives while the
            # remembered one is still being served.
            if entry.recvdTime < prev.recvdTime + prevServ:
                self.totalHit += 1
                if self.totalHit % 100 == 0:
                    # Progress trace every 100th detection.
                    print('*****************hit ' + str(self.totalHit) + 'th time*****************')
                    print(prev.originLine)
                    print(entry.originLine)
                    print('')
                startedNow = True
                inIssue = True

        if inIssue:
            # Count this request; the one that started the issue also
            # earns a mark.
            hitCount = oldValue[1] + 1
            marks = oldValue[0] + 1 if startedNow else oldValue[0]
            oldValue = (marks, hitCount)
        elif not self.__is_err_response(entry.status):
            # Outside an issue, keep whichever successful request ends last.
            if prev is None or entry.recvdTime + curServ > prev.recvdTime + prevServ:
                self.lastReqMap[key] = entry

        self.hitMap[key] = inIssue
        return oldValue

    def exclude_value(self, value):
        """Never exclude a value from the output."""
        return False

    def str_value(self, value):
        """Format the pair as ``marks<split>hitCount``."""
        sep = self.get_split()
        return str(value[0]) + sep + str(value[1])

    def __get_url_no_query(self, urlAll):
        """Return ``urlAll`` up to, but not including, the first ``?``."""
        return urlAll.partition('?')[0]

    def __is_err_response(self, status):
        """Return True when ``status`` is not listed in ``SUCC_STATUS``."""
        return not (status in SUCC_STATUS)
class ThressSecHelper(AnalyserHelper):
    """Analyser helper that pairs ``.m3u8`` requests with later cancel lines.

    Error-log lines containing ``HTTPRequestReader.cpp:516`` contribute a
    ``(time, url)`` item to a per-worker list when the url contains
    ``.m3u8``.  A line containing ``Cancelling original feed`` from the same
    worker claims the most recently added item whose time is 3 +/- 0.5
    before the cancel line's time; claimed items are appended to the
    sample value.
    """

    def __init__(self):
        super(ThressSecHelper, self).__init__()
        # worker id (as str) -> list of (time, url) items not yet claimed
        self.workerMap = {}

    def get_value(self, logInfo):
        """Return the log entry itself as the sample value."""
        return logInfo

    def init_value(self, value):
        """Return a fresh, empty list of matched items."""
        return []

    def update_value(self, oldValue, sampleValue):
        """Process one error-log line; return ``oldValue`` (maybe extended)."""
        line = sampleValue.originLine
        stamp = parse_errlog_time(line)
        worker = str(parse_errlog_workerid(line))

        # NOTE(review): "find(...) > 0" ignores a match at index 0; kept
        # exactly as the original behaves.
        if line.find('HTTPRequestReader.cpp:516') > 0:
            url = self.__parse_url(line)
            if url is not None and url.find('.m3u8') > 0:
                self.__add_url_item(stamp, worker, url)
        elif line.find('Cancelling original feed') > 0:
            matched = self.__find_url_item(stamp, worker)
            if matched is not None:
                oldValue.append(matched)
        return oldValue

    def __parse_url(self, line):
        """Return the text from the first ``http://`` up to the next ``]``.

        Returns None when either marker is missing.
        """
        start = line.find('http://')
        if start >= 0:
            end = line.find(']', start)
            if end >= 0:
                return line[start:end]
        return None

    def __add_url_item(self, time, wid, url):
        """Append ``(time, url)`` to the list kept for worker ``wid``."""
        self.workerMap.setdefault(wid, []).append((time, url))

    def __find_url_item(self, time, wid):
        """Remove and return the newest item of ``wid`` about 3 before ``time``.

        The list is scanned from the newest item backwards and the first
        item with ``abs(time - itemTime - 3) <= 0.5`` wins.  Returns None
        when the worker is unknown or nothing matches.
        """
        if wid not in self.workerMap:
            return None
        pending = self.workerMap[wid]
        for pos in range(len(pending) - 1, -1, -1):
            (urlTime, url) = pending[pos]
            if abs(time - urlTime - 3) <= 0.5:
                return pending.pop(pos)
        return None

    def exclude_value(self, value):
        """Exclude only a value that is None."""
        return value is None

    def str_value(self, value):
        """Render one ``\\t<time><split><url>`` line per item, after a blank line."""
        pieces = ['\n']
        for item in value:
            (stamp, url) = item
            sep = self.get_split()
            pieces.append('\t' + str(stamp) + sep + url + '\n')
        return ''.join(pieces)
class Xact3secHelper(AnalyserHelper):
    """Analyser helper that matches log entries against a list of timed urls.

    The url file is read once at construction.  Only lines starting with a
    tab are used; each is ``<time><split><url>`` where ``<split>`` is
    ``self.get_split()``.  A log entry matches when its url path is listed
    and one of the listed times lies in ``[recvdTime, recvdTime + 0.2)``.
    Matching entries are collected as ``(recvdTime, originLine, time)``.
    """

    def __init__(self, urlFilePath):
        """Load the url/time table from ``urlFilePath``.

        Raises whatever ``open`` raises when the file cannot be read.
        """
        super(Xact3secHelper, self).__init__()
        # url path -> list of times read from the url file
        self.urlMap = dict()
        # url paths from the file that no log entry has matched yet
        self.markMap = dict()
        self.__load_url_map(urlFilePath)
        # number of matching log entries seen so far
        self.totalHit = 0

    def __load_url_map(self, path):
        """Fill ``urlMap`` and ``markMap`` from the file at ``path``."""
        # One formatted string gives the same text as the original
        # two-item print statement.
        print('load urls from file: %s' % (path,))
        split = self.get_split()
        # "with" closes the file even when a line fails to parse.
        with open(path, 'r') as fin:
            for line in fin:
                if not line.startswith('\t'):
                    continue
                segs = line.strip().split(split)
                time = float(segs[0])
                url = self.__parse_url_path(segs[1])
                self.urlMap.setdefault(url, []).append(time)
        # Every loaded path starts as "not used"; update_value clears it.
        for urlPath in self.urlMap:
            self.markMap[urlPath] = 1

    def get_value(self, logInfo):
        """Return the log entry itself as the sample value."""
        return logInfo

    def init_value(self, value):
        """Return a fresh, empty list of matches."""
        return list()

    def update_value(self, oldValue, sampleValue):
        """Append ``(recvdTime, originLine, time)`` when the entry matches.

        Returns ``oldValue``, extended in place on a match.
        """
        logInfo = sampleValue
        path = self.__parse_url_path(logInfo.urlAll)
        time = self.__is_path_in_map(path, logInfo.recvdTime)
        if time is not None:
            self.markMap.pop(path, None)
            self.totalHit += 1
            if self.totalHit % 1000 == 0:
                # Progress trace every 1000th match.  The original print
                # statement put no space after the tab, so none is added.
                print('got the %s th one' % (self.totalHit,))
                print(logInfo.originLine)
                print('\t%s' % (str_seconds(time),))
            oldValue.append((logInfo.recvdTime, logInfo.originLine, time))
        return oldValue

    def __parse_url_path(self, url):
        """Return the part of ``url`` from the first ``/`` after the host.

        The search starts after ``://`` when present, otherwise at the
        start of the string, so a bare ``/a/b`` is returned whole.  A url
        without any path yields ``/`` instead of its last character.
        """
        schemeEnd = url.find('://')
        start = schemeEnd + 3 if schemeEnd >= 0 else 0
        slash = url.find('/', start)
        if slash < 0:
            return '/'
        return url[slash:]

    def __is_path_in_map(self, path, rtime):
        """Return the first listed time in ``[rtime, rtime + 0.2)``, else None."""
        for time in self.urlMap.get(path, ()):
            diff = time - rtime
            if 0 <= diff < 0.2:
                return time
        return None

    def exclude_value(self, value):
        """Exclude a value that collected no matches."""
        return len(value) == 0

    def str_value(self, value):
        """Render the matches sorted by receive time.

        Output is a blank line followed by one
        ``<originLine>\\t<str_seconds(time)>,`` line per match.
        """
        pieces = ['\n']
        for (rtime, oline, time) in sorted(value, key=itemgetter(0)):
            pieces.append('%s\t%s,\n' % (oline, str_seconds(time)))
        return ''.join(pieces)

    def on_close(self):
        """Print every url path from the file that no log entry matched."""
        for path in self.markMap:
            print('not used')
            print(path)
class UserDefinedCtx:
    """Registers the user-defined analysers and builds them from a config.

    Three analyser types are registered: ``burstissue``, ``3secissue`` and
    ``xact3sec``.  All of them are created by the same factory method.
    """

    def __init__(self):
        pass

    def register_user_defined(self):
        """Register the parser/creator pair of every user-defined analyser."""
        noConfig = UserDefinedCtx.__parse_dummy
        creator = UserDefinedCtx.__create_analyser
        register_anlyser('burstissue', noConfig, creator, self)
        register_anlyser('3secissue', noConfig, creator, self)
        register_anlyser('xact3sec', UserDefinedCtx.__parse_xact3sec, creator, self)

    def __parse_dummy(self, config, node):
        """Parser for analysers that take no extra configuration."""
        pass

    def __parse_xact3sec(self, config, node):
        """Store the first ``urlfile`` node's value in ``config.urlFilePath``.

        Prints an error and leaves ``config`` untouched when ``node`` has
        no ``urlfile`` entry.
        """
        urlfileNodes = get_xmlnode(node, 'urlfile')
        if urlfileNodes is None or len(urlfileNodes) == 0:
            print('error for paring xact3sec analyser, no urlfile configured')
            return
        config.urlFilePath = get_nodevalue(urlfileNodes[0])

    def __create_analyser(self, config):
        """Build the analyser for ``config.type``.

        The helper stays None for a type other than the three registered
        ones; it is still wrapped in a ``SingleAnalyser``.
        """
        # Only the 'single' analyser kind is ever selected here.
        analyserKind = 'single'
        requested = config.type

        if requested == 'burstissue':
            helper = BurstHelper()
        elif requested == '3secissue':
            helper = ThressSecHelper()
        elif requested == 'xact3sec':
            helper = Xact3secHelper(config.urlFilePath)
        else:
            helper = None

        if analyserKind != 'single':
            return None
        return SingleAnalyser(config, helper)