-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlibran.py
95 lines (78 loc) · 3.04 KB
/
libran.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# coding: utf-8
import imp
import json
import os
import sys
import traceback
from time import sleep, time
from threading import Thread
this_dir = os.path.abspath(os.path.dirname(__file__))
from routing.regex import RegexCommandRouter
class Libran:
    """Voice-driven assistant built from components named in a JSON config.

    The constructor loads the configuration, instantiates every pluggable
    component (message input/output, camera, image and message processors,
    notifier, storage) and starts a background worker thread that executes
    routed commands.  Call :meth:`run` to enter the listen loop and
    :meth:`cleanup` to shut everything down.
    """

    def __init__(self, config_file=None):
        """Load the configuration, build the components, start the worker.

        The listen loop is NOT started here; the caller invokes :meth:`run`.

        Args:
            config_file: Path to a JSON configuration file.  Defaults to
                ``config.json`` located next to this module.
        """
        if config_file is None:
            config_file = os.path.join(this_dir, 'config.json')
        # JSON is UTF-8 by specification; do not rely on the locale default.
        with open(config_file, encoding='utf-8') as fp:
            self.config = json.load(fp)

        self.router = RegexCommandRouter(self)
        self.message_in = self.get_instance(self.config['message_in'])
        self.message_out = self.get_instance(self.config['message_out'])
        self.camera = self.get_instance(self.config['camera'])
        self.image_processor = self.get_instance(self.config['image_processor'])
        self.message_processor = self.get_instance(self.config['message_processor'])
        self.notifier = self.get_instance(self.config['notification'])
        self.storage = self.get_instance(self.config['storage'])

        # Loop/hand-over state shared between run() and exec_command().
        self.should_work = True            # cleared to stop both loops
        self.answer = None                 # last utterance captured while waiting for input
        self.should_cancel = False         # set when a cancel utterance arrives mid-command
        self.is_waiting_for_input = False  # presumably toggled by commands -- confirm
        self.command_to_exec = None        # (command, sentence_candidates) or None

        self.command_thread = Thread(target=self.exec_command)
        self.command_thread.start()
        # run() is deliberately not called here: it blocks until shutdown,
        # which would keep the constructor from returning, put the session
        # outside the caller's try/finally, and make the caller's own run()
        # call repeat the greeting/farewell.

    def get_instance(self, config):
        """Instantiate the component described by *config*.

        Args:
            config: Mapping whose ``'class'`` entry has the form
                ``"<module>.<ClassName>"``; ``lib/<module>.py`` next to this
                file is loaded and ``ClassName(config)`` is returned.

        Raises:
            ValueError: If ``config['class']`` does not contain exactly one dot.
        """
        mod_name, cls_name = config['class'].split('.')
        src = os.path.join(this_dir, 'lib', mod_name + '.py')
        # NOTE(review): ``imp`` is deprecated and removed in Python 3.12;
        # migrating to importlib.util also requires changing the module-level
        # import, so it is left as is here.
        mod = imp.load_source(mod_name, src)
        cls = getattr(mod, cls_name)
        return cls(config)

    def run(self):
        """Listen for utterances and dispatch them until shutdown.

        Speaks a greeting, then loops while ``should_work`` is truthy:
        an utterance is stored in ``answer`` when input is awaited, treated
        as a possible cancel request while a command is running, or routed
        to a new command otherwise.  Speaks a farewell on exit.
        """
        self.message_out.talk('おはようございます。何かあったら声をかけてください')
        while self.should_work:
            sentence_candidates = self.message_in.listen()
            if not sentence_candidates:
                # Nothing heard: back off briefly instead of spinning.
                sleep(0.1)
                continue
            print(sentence_candidates, self.is_waiting_for_input)

            if self.is_waiting_for_input:
                self.answer = sentence_candidates
                continue

            if self.command_to_exec is not None:
                # A command is still running: only a cancel request is
                # honoured.  Anything else is ignored so the running
                # command's slot is not overwritten (the worker clears the
                # slot when it finishes, which would drop the new command).
                if self.message_processor.is_cancel(sentence_candidates):
                    self.should_cancel = True
                continue

            command = self.router.routes(*sentence_candidates)
            if command:
                self.command_to_exec = (command, sentence_candidates)
        self.message_out.talk('おやすみなさい')

    def exec_command(self):
        """Worker loop: execute pending commands until shutdown.

        Runs on ``command_thread``.  Polls ``command_to_exec`` every 0.1 s
        and, when it is set, calls ``command.exec(*args)``.
        """
        while self.should_work:
            pending = self.command_to_exec
            if pending is None:
                sleep(0.1)
                continue
            command, args = pending
            try:
                command.exec(*args)
            except Exception:
                # A failing command must not kill the worker thread,
                # otherwise no later command would ever be executed.
                sys.stderr.write(traceback.format_exc())
            finally:
                # Always free the slot so run() can accept the next command.
                self.command_to_exec = None

    def cleanup(self):
        """Stop the loops and input devices, then wait for the worker thread."""
        # Clear the flag first: the worker only exits once it is falsy, so
        # joining without clearing it would block forever.
        self.should_work = False
        self.message_in.stop()
        self.camera.stop()
        self.command_thread.join()
if __name__ == '__main__':
    # Script entry point: build the assistant, run its listen loop and
    # always release its resources afterwards.
    libran = Libran()
    try:
        libran.run()
    except Exception:
        # Log the traceback explicitly, then re-raise the active exception
        # unchanged so the process still terminates with a failure.
        sys.stderr.write(traceback.format_exc())
        raise
    finally:
        libran.cleanup()