-
Notifications
You must be signed in to change notification settings - Fork 44
/
Copy pathpmt.py
executable file
·244 lines (214 loc) · 8.34 KB
/
pmt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
#!/usr/bin/env python
"""
PyModel Tester
"""
import os
import sys
import imp
import pdb # can optionally run in debugger, see main
import random
import signal
import traceback
import TesterOptions
import observation_queue as observation
from ProductModelProgram import ProductModelProgram
class TimeoutException(Exception):
pass
def timeout_handler(signum, frame):
raise TimeoutException()
def SelectAction(enabled):
"""
Default strategy: choose an enabled action at random
"""
if enabled: # might be empty
(aname, args, result, next, properties) = random.choice(enabled)
return (aname, args)
else:
return (None,None)
# for printing output file
def fmtarg(arg):
return '%s' % arg # FIXME, quotes around literal strings, print None
def fmtargs(args):
return '%s' % args # FIXME?
#return ','.join([ fmtarg(arg) for arg in args ])
def quote(s):
return "'%s'" % s if isinstance(s, str) else s
def RunTest(options, mp, stepper, strategy, f, krun):
"""
Execute one test run
"""
if options.output:
f.write(' [\n')
isteps = 0
failMessage = None # no failMessage indicates success
observable_action = False
infoMessage = ''
cleanup = False
maxsteps = options.nsteps
# execute test run steps
while options.nsteps == 0 or isteps < maxsteps:
# execute one test run step.
# actions identified by aname (string) at this level, for composition
# print 'Current state: %s' % mp.Current() # DEBUG
# observable action
if observation.queue: # if queue not empty
# print 'observable action branch, call observation.queue.popleft()'
(aname, args) = observation.queue.popleft()
observable_action = True
# print '< test runner gets', (aname, args)
if not mp.ActionEnabled(aname, args):
# args here is return value captured from implementation
# might be more helpful to also show expected return value here
failMessage = '%s%s, action not enabled' % (aname, args)
break
else:
pass # go on, execute observable action in model BUT NOT stepper!
# don't forget to reset observable_action at the bottom of while body
# controllable action
else:
# EnabledTranstions returns: [(aname,args,result,next,properties),...]
enabled = mp.EnabledTransitions(cleanup)
(aname, args) = strategy.SelectAction(enabled)
# exit conditions
if not aname:
if observation.asynch:
# before exit, wait for item to appear in observation queue
stepper.wait(options.timeout if options.timeout else None)
continue # get observable action from queue
# if not asynch, come directly here
# if asynch, only reach here if wait times out
if not cleanup:
infoMessage = 'no more actions enabled'
break
elif cleanup and mp.Accepting():
break
else:
pass # go on, execute controllable action in model and maybe stepper
# execute the action in the model and print progress message
isteps = isteps + 1
modelResult = mp.DoAction(aname, args) # Execute in model, get result
qResult = quote(modelResult)
if modelResult != None:
print aname if options.quiet else '%s%s / %s' % (aname, args, qResult)
else:
print aname if options.quiet else '%s%s' % (aname, args)
if options.output:
if qResult != None:
f.write(' (%s, %s, %s),\n' % (aname, args, qResult))
else:
f.write(' (%s, %s),\n' % (aname, args)) # optional missing result
# execute controllable action in the stepper if present
if stepper and not observable_action:
failMessage = None
try:
if options.timeout:
# docs.python.org/library/signal says Availability: Unix
# tested on Mac OS X, might not work on Windows
signal.alarm(options.timeout) # schedule timeout
# Execute action in stepper
result = stepper.TestAction(aname, args, modelResult)
if options.timeout:
signal.alarm(0) # cancel timeout
# stepper returns None to indicate success
if result == None:
pass # success, go on to next step
# stepper returns string to indicate failure
elif isinstance(result, str):
failMessage = result # failure, prepare to print message
# stepper may append observable action to observation_queue
# if so, will be detected by if observation_queue: at top of loop
else:
failMessage = 'stepper returned unhandled result: %s' % (result,)
except BaseException as e:
traceback.print_exc() # looks just like unhandled exception
failMessage = 'stepper raised exception: %s, %s' % \
(e.__class__.__name__, e)
if failMessage:
break
# not stepper or observable_action
else:
observable_action = False # must reset in all paths through while body
# begin cleanup phase
if isteps == options.nsteps:
cleanup = True
maxsteps += options.cleanupSteps
# end one test run step
# end while executing test run steps
# Print test run outcome, including explanation and accepting state status
acceptMsg = 'reached accepting state' if mp.Accepting() else \
'ended in non-accepting state'
infoMessage += '%s%s' % (', ' if infoMessage else '', acceptMsg)
if stepper and not mp.Accepting() and not failMessage:
failMessage = infoMessage # test run ends in non-accepting state: fail
if failMessage:
print '%3d. Failure at step %s, %s' % (krun, isteps, failMessage)
else:
print '%3d. %s at step %s%s' % (krun, 'Success' if stepper else 'Finished',
isteps,
(', %s' % infoMessage) if infoMessage else '')
if options.output:
f.write(' ],\n')
def main():
(options, args) = TesterOptions.parse_args()
# args can include model programs, FSMs, test suites
if not args:
TesterOptions.print_help() # must have at least one arg, not optional
exit()
else:
mp = ProductModelProgram(options, args)
stepper = __import__(options.iut) if options.iut else None
if stepper:
# recognize PEP-8 style names (all lowercase) if present
if hasattr(stepper, 'testaction'):
stepper.TestAction = stepper.testaction
if hasattr(stepper, 'test_action'):
stepper.TestAction = stepper.test_action
if hasattr(stepper, 'reset'):
stepper.Reset = stepper.reset
if options.strategy:
strategy = __import__(options.strategy)
if hasattr(strategy, 'selectaction'):
strategy.SelectAction = strategy.selectaction
if hasattr(strategy, 'select_action'):
strategy.SelectAction = strategy.select_action
else:
strategy = imp.new_module('strategy')
strategy.SelectAction = SelectAction # handle default strategy in same way
if options.seed: # NB -s 0 has no effect, by definition!
random.seed(options.seed) # make calls to random.choice reproducible
f = None # must be bound when passed to RunTest
if options.output:
f = open("%s.py" % options.output, 'w')
f.write('\n# %s' % os.path.basename(sys.argv[0])) # echo command line ...
f.write(' %s\n' % ' '.join(['%s' % arg for arg in sys.argv[1:]])) # ...etc.
f.write('\n# actions here are just labels, but must be symbols with __name__ attribute\n\n')
f.writelines([ 'def %s(): pass\n' % aname for aname in mp.anames ])
f.write('\n# action symbols\n')
f.write('actions = (%s)\n' % ', '.join([ aname for aname in mp.anames]))
f.write('\ntestSuite = [\n')
if options.timeout:
signal.signal(signal.SIGALRM, timeout_handler)
#print 'SIGALRM handler is now %s' % signal.getsignal(signal.SIGALRM)
k = 0
while k < options.nruns or options.nruns == 0 or mp.TestSuite:
if k > 0:
try:
mp.Reset()
if options.output:
f.write('#\n')
except StopIteration: # raised by TestSuite Reset after last run in suite
break
if stepper:
stepper.Reset()
RunTest(options, mp, stepper, strategy, f, k)
k += 1
if k > 1:
print 'Test finished, completed %s runs' % k
if options.output:
f.write(']')
f.close()
if __name__ == '__main__':
if '-d' in sys.argv or '--debug' in sys.argv: # run in debugger
pdb.runcall(main)
else:
main ()