forked from roxana-lafuente/ResearchLogger
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mytimer.py
60 lines (52 loc) · 1.72 KB
/
mytimer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
from threading import Thread, Event
import time
class MyTimer(Thread):
"""
Call a function after a specified number of seconds. Repeat a specified
number of times:
Set repetitions to 0 to do this forever (until cancel is called).
Usage:
t = MyTimer(interval, repetitions, function, args=[], kwargs={})
t.start()
t.cancel() # stop the timer's action if it's still waiting
"""
# This timer is modeled after the original Timer class in the python
# threading package.
def __init__(self, interval, repetitions, function, args=[], kwargs={}):
Thread.__init__(self)
self.interval = interval
self.repetitions = repetitions
self.function = function
self.args = args
self.kwargs = kwargs
self.finished = Event()
def cancel(self):
"""
Stop the timer if it hasn't finished yet
"""
self.finished.set()
def run(self):
if self.repetitions != 0:
for i in range(0, self.repetitions):
self.finished.wait(self.interval)
if not self.finished.isSet():
self.function(*self.args, **self.kwargs)
else:
while not self.finished.isSet():
self.finished.wait(self.interval)
if not self.finished.isSet():
self.function(*self.args, **self.kwargs)
self.finished.set()
if __name__ == '__main__':
# some test code here.
def hello(name="bla"):
print "hello, ", name
myt = MyTimer(1.0, 5, hello, ["bob"])
myt.start()
time.sleep(4)
myt.cancel()
print "next timer"
myt = MyTimer(1.0, 0, hello, ["bob"])
myt.start()
time.sleep(6)
myt.cancel()