# CozmoMainCode.py
import cleverwrap
import cozmo
import time
import sys
import ast
#import asyncio
from cozmo.util import degrees, distance_mm, speed_mmps
import voiceParse
#from gtts import gTTS
#import speech_recognition as sr
import os.path
import re
import webbrowser
import smtplib
import requests
import random
import functionsCozmo
import face_follower
from weather import Weather
import asyncio
from Docking import *
# CleverBot API key handed to initBot() at start-up. The value below is only a
# placeholder and has to be replaced with a real key.
APIKEY = "Put your own API, you can get this from Clever api website"
# Charger: set the SDK class attribute so the robot is not driven off its
# charger when the connection is established.
cozmo.robot.Robot.drive_off_charger_on_connect = False
# Strings: module-level placeholders for the last robot / human utterance.
# mainLoop assigns names with the same spelling without declaring them global,
# so these two module-level values are never updated by it.
cozmoString = ""
humanString = ""
# Function to get the log started
def initLog():
    """Open ``log.txt`` for appending and return the open file object.

    A status line is printed in both the success and the failure case. If the
    file cannot be opened the program is terminated with a non-zero exit
    status, because nothing else in this script can run without the log.

    Returns:
        The file object opened in append mode.

    Raises:
        SystemExit: if ``log.txt`` cannot be opened.
    """
    try:
        log = open("log.txt", "a")
    except OSError:
        # Only I/O failures are expected here; anything else (for example
        # Ctrl-C) must not be reported as a log-file problem.
        print("Error opening log file!")
        sys.exit(1)
    print("Log file opened.")
    return log
# Function to add entry to the log file.
def addEntry(log, entry):
    """Append one timestamped line containing *entry* to *log*.

    The line has the form ``D-M-YYYY H:M:S# entry`` where the timestamp is the
    current UTC time (``time.gmtime``) and the numbers are not zero-padded.

    Logging is best-effort: if writing fails, a notice is printed and the
    message is skipped instead of interrupting the conversation.

    Args:
        log: Object with a ``write(str)`` method, normally the file returned
            by ``initLog``.
        entry: Text of the log message.
    """
    now = time.gmtime()
    # Time format: Day-Month-Year Hour:Minute:Second
    stamp = "{}-{}-{} {}:{}:{}".format(
        now.tm_mday, now.tm_mon, now.tm_year,
        now.tm_hour, now.tm_min, now.tm_sec,
    )
    message = stamp + "# " + entry
    try:
        log.write(message + "\n")
    except Exception:
        # Deliberately broad so that a failing log never stops the program,
        # but no longer a bare ``except``: KeyboardInterrupt and SystemExit
        # must still propagate.
        print("Error logging message! Skipping...")
# A simple error checking wrapper for the cleverwrap script
def initBot(apiKey):
    """Create the CleverBot client, exiting the program if that fails.

    A status line is printed in both the success and the failure case.

    Args:
        apiKey: API key passed to ``cleverwrap.CleverWrap``.

    Returns:
        The ``cleverwrap.CleverWrap`` instance.

    Raises:
        SystemExit: if constructing the client raises any error.
    """
    try:
        bot = cleverwrap.CleverWrap(apiKey)
    except Exception:
        # Which exceptions cleverwrap raises is not visible from this file, so
        # this stays broad; unlike a bare ``except`` it lets
        # KeyboardInterrupt / SystemExit through.
        print("Error connecting to CleverBot!")
        sys.exit(1)
    print("Connected to CleverBot!")
    return bot
import threading
# Speech recognition may return either spelling of the robot's name.
_ROBOT_NAMES = ("cozmo", "cosmo")

# Path prefix that turns a spoken phrase into a search URL on each known site.
_SITE_SEARCH_PATHS = {
    "google": "search?q=",
    "youtube": "search?q=",
    "reddit": "r/",
    "amazon": "s?url=search-alias%3Daps&field-keywords=",
}

# SMTP host and port for each mail provider the e-mail command can send through.
_SMTP_SERVERS = {
    "gmail": ("smtp.gmail.com", 587),
    "outlook": ("smtp-mail.outlook.com", 587),
    "hotmail": ("smtp.live.com", 25),
}


def _heard(words, *keywords):
    """Return True if every keyword and one spelling of the robot's name was spoken."""
    return set(keywords) <= words and any(name in words for name in _ROBOT_NAMES)


def _say(robot, text, **options):
    """Speak *text* on the robot and block until the speech action completes."""
    robot.say_text(text, in_parallel=True, **options).wait_for_completed()


def _say_and_log(robot, text, **options):
    """Speak *text*, then record it as a "Cozmo says" line in the log and on stdout."""
    _say(robot, text, **options)
    addEntry(log, "Cozmo says: " + text)
    print("Cozmo says: " + text)


def _log_human(text):
    """Record *text* as a "Human says" line in the log and on stdout."""
    addEntry(log, "Human says: " + text)
    print("Human says: " + text)


def _load_literal(path):
    """Parse the Python-literal dict stored in *path*, closing the file afterwards."""
    # ast.literal_eval (not json) is kept on purpose: it is what the original
    # code used, so the files may hold Python-literal rather than strict JSON.
    with open(path) as handle:
        return ast.literal_eval(handle.read())


def _open_reddit(robot, humanString):
    """Open Reddit, or the subreddit named after "open reddit", in the browser."""
    _log_human(humanString)
    # Strip the separating space; with nothing after the command open the front page.
    subreddit = humanString.partition('open reddit')[2].strip()
    url = 'https://www.reddit.com/'
    if subreddit:
        url += 'r/' + subreddit
    webbrowser.open(url)
    _say_and_log(robot, "Done")


def _open_google(robot, humanString):
    """Open Google, or a Google search for the words after "open google"."""
    query = humanString.partition('open google')[2].strip()
    url = 'https://www.google.com/'
    if query:
        url += 'search?q=' + query
    webbrowser.open(url)
    _log_human(humanString)
    _say_and_log(robot, "Done")


def _open_favorite_song(robot):
    """Ask for a song name and open its stored URL, else a YouTube search for it."""
    _say(robot, "What song?")
    song_name = voiceParse.parseVoice().lower().strip()
    print("Human says: " + song_name)
    if not song_name:
        _say(robot, "No command found")
        return
    # texttest.json holds {'song': [{'name': ..., 'url': ...}, ...]}; add
    # entries there to teach the robot more songs.
    songs = _load_literal('texttest.json')['song']
    for song in songs:
        if song['name'].lower() == song_name:
            webbrowser.open(song['url'])
            _say_and_log(robot, "Done")
            return
    # Unknown song: fall back to a search and go straight back to listening.
    webbrowser.open('https://www.youtube.com/search?q=' + song_name)


def _open_website(robot):
    """Ask for a site name and open ``www.<name>.com``, optionally with a search."""
    _say(robot, "What do you want me to open")
    domain = voiceParse.parseVoice().lower().strip()
    print("Human says: " + domain)
    if domain:
        url = 'https://www.' + domain + ".com"
        # Exact key lookup: a substring test against the dict's string form
        # would accept names that have no search path at all.
        if domain not in _SITE_SEARCH_PATHS:
            webbrowser.open(url)
            return
        _say_and_log(robot, "What are you looking for in " + domain)
        wanted = voiceParse.parseVoice().lower()
        _log_human(wanted)
        if "nothing" in wanted:
            webbrowser.open(url)
        else:
            webbrowser.open(url + "/" + _SITE_SEARCH_PATHS[domain] + wanted)
    _say_and_log(robot, "Done")


def _send_email(robot):
    """Ask for a recipient and a message, then mail it to the matching contact."""
    _say_and_log(robot, "Who is the recipient?")
    recipient = voiceParse.parseVoice().lower()
    print(recipient)
    _log_human(recipient)

    # emailtext.json holds {'email': [{'name': ..., 'url': <address>}, ...]}.
    contacts = _load_literal('emailtext.json')['email']
    contact = None
    for candidate in contacts:
        print(candidate['name'].lower())
        if candidate['name'].lower() == recipient:
            contact = candidate
            break
    if contact is None:
        _say_and_log(robot, "I don't know him")
        return

    _say_and_log(robot, "What should I say?")
    content = voiceParse.parseVoice().lower()
    _log_human(content)

    # NOTE(review): as before, the provider is taken from the FIRST contact's
    # address, not from the sending account - confirm that the first entry of
    # emailtext.json is meant to be the sender's own address.
    # Take the label right after "@", so a dot in the local part
    # ("first.last@gmail.com") no longer breaks the lookup.
    provider = contacts[0]['url'].partition("@")[2].split(".")[0]
    if provider not in _SMTP_SERVERS:
        _say_and_log(robot, "I can't send because I don't know the SMTP name")
        return

    # useradding.json (written by the GUI) holds the sending account.
    account = _load_literal('useradding.json')['email'][0]
    host, port = _SMTP_SERVERS[provider]
    # The context manager closes the connection even if login or send fails.
    with smtplib.SMTP(host, port) as mail:
        mail.ehlo()      # identify to server
        mail.starttls()  # encrypt session
        mail.login(account['name'], account['password'])
        mail.sendmail(account['name'], contact['url'], content + "\n" + "sent via Cozmo")
    _say_and_log(robot, "Sent")


def _tell_forecast(robot, humanString):
    """Speak up to three forecast entries for the city named in *humanString*."""
    _log_human(humanString)
    reg_ex = re.search('weather forecast in (.*)', humanString)
    if not reg_ex:
        return
    location = Weather().lookup_by_location(reg_ex.group(1))
    forecasts = location.forecast
    # Never index past the end when fewer than three entries come back.
    for i in range(min(3, len(forecasts))):
        TempetureForecast = ('On %s will it %s. The maximum temperture will be %.1f degrees Celcius.'
                             'The lowest temperature will be %.1f degrees Celcius.' % (
                                 forecasts[i].date, forecasts[i].text, int(forecasts[i].high),
                                 int(forecasts[i].low)))
        _say_and_log(robot, TempetureForecast, use_cozmo_voice=True, duration_scalar=1)


def _tell_joke(robot):
    """Fetch a joke from icanhazdadjoke.com and tell it, or apologise on failure."""
    try:
        res = requests.get(
            'https://icanhazdadjoke.com/',
            headers={"Accept": "application/json"},
            timeout=10,
        )
    except requests.RequestException:
        # Treat a network failure like any other unsuccessful response.
        res = None
    if res is not None and res.status_code == requests.codes.ok:
        joke = str(res.json()['joke'])
        _say(robot, joke, duration_scalar=1.2)
        functionsCozmo.motion_step(robot)
        print("Cozmo says: " + joke)
        addEntry(log, "Cozmo says: " + joke)
    else:
        _say_and_log(robot, 'oops!I ran out of jokes')


def mainLoop(robot: cozmo.robot.Robot):
    """Run the voice-assistant loop on *robot* until the shutdown command is heard.

    A background thread runs ``face_follower.follow_faces`` while this loop
    repeatedly takes one utterance from ``voiceParse.parseVoice`` and
    dispatches it: built-in commands (shutdown, charger, browser, e-mail,
    weather, joke, song, emotion) are handled here, anything else is sent to
    CleverBot and the reply is spoken. Uses the module-level ``log`` and
    ``cleverbot`` objects.

    Args:
        robot: The connected robot passed in by ``cozmo.run_program``.

    Raises:
        SystemExit: when "cozmo shut down" is heard.
    """
    # Daemon thread, so that sys.exit() on shutdown is not blocked by it.
    threadFace = threading.Thread(
        target=face_follower.follow_faces, args=(robot,), daemon=True)
    threadFace.start()
    while True:
        print('Expression:' + face_follower.notifyExpression())
        # In a loop, we grab the user input
        print("Listening...")
        humanString = voiceParse.parseVoice().lower()
        # Word set for keyword commands. It is emptied once such a command has
        # run so that no later keyword command fires for the same utterance.
        words = set(humanString.split())

        # Check it for a quit condition.
        if _heard(words, 'shut', 'down'):
            # If we quit, we log the quit and leave the program.
            _say(robot, "Ok, shut down")
            docking_cozmo(robot)
            addEntry(log, "Conversation ended.")
            sys.exit()

        # NOTE(review): 2.0 V looks very low for a cut-off - confirm against
        # the Cozmo SDK documentation for battery_voltage.
        if robot.battery_voltage <= 2.0:
            print('Running out of battery')
            _say(robot, "Running out of battery")
            docking_cozmo(robot)

        # NOTE(review): the two charger commands have no `continue` in the
        # original, so the utterance still reaches the checks below and, in
        # the end, CleverBot. Kept as is - confirm that this is intended.
        if {'go', 'out', 'charger'} <= words:
            robot.drive_off_charger_contacts().wait_for_completed()
            robot.drive_straight(distance_mm(150), speed_mmps(60)).wait_for_completed()
            words.clear()

        if 'open reddit' in humanString:
            _open_reddit(robot, humanString)
            continue

        if {'go', 'to', 'charger'} <= words:
            docking_cozmo(robot)
            words.clear()

        if 'open google' in humanString:
            _open_google(robot, humanString)
            continue

        if 'open my favorite song' in humanString:
            _open_favorite_song(robot)
            continue

        if 'open website' in humanString:
            _open_website(robot)
            continue

        if _heard(words, 'email'):
            _send_email(robot)
            continue

        if 'weather forecast in' in humanString:
            _tell_forecast(robot, humanString)
            continue

        if _heard(words, 'joke'):
            _tell_joke(robot)
            continue

        # Action Commands
        if _heard(words, 'sing', 'song', 'stupid'):
            _log_human(humanString)
            functionsCozmo.cozmo_singing(robot)
            _say(robot, "Do you like my song?", play_excited_animation=True)
            print("Cozmo says: " + "Do you like my song?")
            addEntry(log, "Cozmo says: " + "Do you like my song?")
            continue

        if _heard(words, 'random', 'emotion'):
            functionsCozmo.motion_step(robot)
            continue

        # Else, we log what the human said.
        _log_human(humanString)
        # Grab the response from CleverBot
        cozmoString = cleverbot.say(humanString)
        # Print the response to the screen and add it to the log
        print("Cozmo says: " + cozmoString)
        addEntry(log, "Cozmo says: " + cozmoString)
        # Then we make Cozmo say it.
        _say(robot, cozmoString)
# This is where our code begins. These statements are at module level, so they
# execute as soon as the module is run or imported: speech input is
# initialised, the log file is opened, the CleverBot client is created, the
# start of the conversation is logged and a banner is printed.
voiceParse.initSpeech()
log = initLog()
cleverbot = initBot(APIKEY)
addEntry(log, "Conversation started.")
# NOTE(review): the banner tells the user to type 'quit', but the only exit
# handled in mainLoop is the spoken "cozmo shut down" command - confirm the
# intended wording.
print("######################")
print("#Type 'quit' to exit.#")
print("######################")
def run():
    """Start the assistant by handing ``mainLoop`` to the Cozmo SDK runner.

    The camera viewer window is disabled and is not forced on top.
    """
    viewer_options = {"use_viewer": False, "force_viewer_on_top": False}
    cozmo.run_program(mainLoop, **viewer_options)