-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtaskmanager.py
41 lines (35 loc) · 1.1 KB
/
taskmanager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from light_progress.commandline import ProgressBar
from light_progress import widget
import threading
class TaskManager(object):
    """Run a list of tasks on a bounded set of worker threads.

    Every task must expose an ``execute()`` method. Tasks are taken from the
    end of ``tasks`` with ``list.pop()``, so the list is consumed in place
    and in reverse order. The progress bar advances when a task is handed to
    a worker (not when it completes) and is finished when the last task has
    been handed out.
    """

    def __init__(self, tasks, no_threads=10):
        """Store the task list and build the progress bar.

        Args:
            tasks: Mutable list of objects with an ``execute()`` method.
                The list is emptied as tasks are handed out.
            no_threads: Number of worker threads ``start()`` tries to launch.
        """
        self.tasks = tasks
        self.no_threads = no_threads
        # Guards ``self.tasks`` and the progress bar: both are touched from
        # every worker thread, and check-then-pop is not atomic on its own.
        self._lock = threading.Lock()
        widgets = [
            widget.Bar(bar='*', tip='>'),
            widget.Percentage(),
            widget.Num(),
            widget.ElapsedSeconds(),
            widget.FinishedAt(),
        ]
        self.pb = ProgressBar(len(self.tasks), widgets=widgets)

    def start(self):
        """Start the progress bar and launch up to ``no_threads`` workers."""
        self.pb.start()
        for _ in range(self.no_threads):
            self.thread_start()

    def pop(self):
        """Hand out the next task and advance the progress bar.

        The emptiness check, the pop and the progress-bar update happen
        under one lock so that concurrent workers can neither pop from an
        already-empty list nor finish the progress bar more than once.

        Returns:
            The task removed from the end of ``self.tasks``, or ``None``
            when no tasks remain.
        """
        with self._lock:
            if not self.tasks:
                return None
            task = self.tasks.pop()
            self.pb.forward()
            if not self.tasks:
                # This was the last task; close the bar exactly once.
                self.pb.finish()
            return task

    def thread_start(self):
        """Spawn one worker thread if there is still work queued."""
        # Unlocked peek is fine: if another worker drains the list first,
        # the new thread's pop() returns None and it exits immediately.
        if self.tasks:
            thread = threading.Thread(target=self.execute)
            thread.start()

    def execute(self):
        """Run one task, then pass the worker slot on to a new thread.

        Any exception raised by the task still propagates to the caller
        (or to the thread's exception hook), but the next worker is started
        first so a failing task does not stall the remaining ones.
        """
        task = self.pop()
        # Compare with None explicitly so a task object that happens to be
        # falsy (defines __bool__/__len__) is still executed.
        if task is None:
            return
        try:
            task.execute()
        finally:
            # Keep the slot alive even when the task raised; otherwise each
            # failure would permanently shrink the pool.
            self.thread_start()