-
Notifications
You must be signed in to change notification settings - Fork 86
/
Copy pathtest.py
358 lines (267 loc) · 10.3 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
#!/usr/bin/env python
"""
Make sure you change the parameters - username, password, mailbox,
paths and options.
Based on @skalavala work at https://blog.kalavala.net/usps/homeassistant/mqtt/2018/01/12/usps.html
"""
import datetime
import email
import imaplib
import os
import re
import subprocess
import sys
import time
from datetime import timedelta
from shutil import copyfile
host = "imap.mail.com"
port = 993
username = "[email protected]"
password = "password"
folder = "Inbox"
image_output_path = "/home/homeassistant/.homeassistant/images/mail_and_packages/"
USPS_Email = "[email protected]"
USPS_Mail_Subject = "Informed Delivery Daily Digest"
USPS_Delivering_Subject = "Expected Delivery on"
USPS_Delivered_Subject = "Item Delivered"
UPS_Email = "[email protected]"
UPS_Delivering_Subject = "UPS Update: Package Scheduled for Delivery Today"
UPS_Delivered_Subject = "Your UPS Package was delivered"
FEDEX_Email = "[email protected]"
FEDEX_Delivering_Subject = "Delivery scheduled for today"
FEDEX_Delivered_Subject = "Your package has been delivered"
MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=5)
GIF_FILE_NAME = "mail_today_test.gif"
IMG_RESIZE_OPTIONS = "convert -resize 700x315\> "
GIF_MAKER_OPTIONS = (
"convert -delay 300 -loop 0 -coalesce -fill white -dispose Background "
)
# Login Method
###############################################################################
def login():
account = imaplib.IMAP4_SSL(host, port)
try:
rv, data = account.login(username, password)
print("Logged into your email server successfully!")
except imaplib.IMAP4.error:
print(
"Failed to authenticate using the given credentials. Check your username, password, host and port."
)
sys.exit(1)
return account
# Select folder inside the mailbox
###############################################################################
def selectfolder(account, folder):
(rv, mailboxes) = account.list()
(rv, data) = account.select(folder)
# Returns today in specific format
###############################################################################
def get_formatted_date():
return datetime.datetime.today().strftime("%d-%b-%Y")
# gets update time
###############################################################################
def update_time():
updated = datetime.datetime.now().strftime("%b-%d-%Y %I:%M %p")
return updated
# Creates GIF image based on the attachments in the inbox
###############################################################################
def get_mails(account):
today = get_formatted_date()
# Custom email date
today = "21-Sep-2019"
image_count = 0
images = []
imagesDelete = []
print("Attempting to find Informed Delivery mail for {}".format(str(today)))
# Check the mail piece for mail images
rv, data = account.search(
None,
'(FROM "'
+ USPS_Email
+ '" SUBJECT "'
+ USPS_Mail_Subject
+ '" ON "'
+ today
+ '")',
)
# Get number of emails found
messageIDsString = str(data[0], encoding="utf8")
listOfSplitStrings = messageIDsString.split(" ")
msg_count = len(listOfSplitStrings)
print("Found emails: {}\n".format(str(msg_count)))
if rv == "OK":
for num in data[0].split():
(rv, data) = account.fetch(num, "(RFC822)")
msg = email.message_from_string(data[0][1].decode("utf-8"))
# walking through the email parts to find images
for part in msg.walk():
if part.get_content_maintype() == "multipart":
continue
if part.get("Content-Disposition") is None:
continue
filepath = image_output_path + part.get_filename()
print("Image found: {}.".format(str(filepath)))
fp = open(filepath, "wb")
fp.write(part.get_payload(decode=True))
images.append(filepath)
# print(*images, sep = "\n")
# print ("\n")
image_count = image_count + 1
fp.close()
print("Total mages found in email: {}. \n".format(str(image_count)))
# Remove duplicate images
print("Removing duplicate images.")
images = list(dict.fromkeys(images))
# Create copy of image list before checking for image-no-mailpieces image so we can delete them
imagesDelete = images
print("Images to delete.")
print(*imagesDelete, sep="\n")
print("\n")
image_count = len(images)
print("Found {} unique images total to generate gif.".format(str(image_count)))
print(*images, sep="\n")
print("\n")
# Remove USPS announcement images
print("Removing USPS announcement images.")
remove_terms = ["mailerProvidedImage", "ra_0"]
images = [
el for el in images if not any(ignore in el for ignore in remove_terms)
]
image_count = len(images)
print("Found {} mail images total to generate gif.".format(str(image_count)))
print(*images, sep="\n")
print("\n")
# Look for mail pieces without images image
print("Checking for mail with no images.")
html_text = str(msg)
link_pattern = re.compile("image-no-mailpieces700.jpg")
search = link_pattern.search(html_text)
if search is not None:
images.append(image_output_path + "image-no-mailpieces700.jpg")
image_count = image_count + 1
print("Image found: " + image_output_path + "image-no-mailpieces700.jpg.")
else:
print("No image-no-mailpieces700.jpg image found.\n")
print("\n")
# Creating the GIF
if image_count > 0:
all_images = ""
# Convert to similar images sizes
for image in images:
try:
os.system(IMG_RESIZE_OPTIONS + image + " " + image)
except Exception as err:
print("Error attempting to resize images: {}".format(str(err)))
# Combine images into GIF
for image in images:
all_images = all_images + image + " "
try:
os.system(
GIF_MAKER_OPTIONS + all_images + image_output_path + GIF_FILE_NAME
)
print("GIF of mail images generated.\n")
except Exception as err:
print("Error attempting to generate image: {}".format(str(err)))
print("Deleting temporary images.")
for image in imagesDelete:
try:
os.remove(image)
print("Removed image: {}.".format(str(image)))
except Exception as err:
print("Error attempting to remove image: {}".format(str(err)))
print("\n")
if image_count == 0:
try:
os.remove(image_output_path + GIF_FILE_NAME)
print("No mail GIF generated.\n")
except Exception as err:
print("Error attempting to remove image: {}".format(str(err)))
try:
copyfile(
image_output_path + "mail_none.gif", image_output_path + GIF_FILE_NAME
)
except Exception as err:
print("Error attempting to copy image: {}".format(str(err)))
print("\n")
return image_count
# Get Package Count
###############################################################################
def get_count(account, email, subject):
count = 0
today = get_formatted_date()
(rv, data) = account.search(
None, '(FROM "' + email + '" SUBJECT "' + subject + '" SINCE "' + today + '")'
)
if rv == "OK":
count = len(data[0].split())
return count
# Get Package Count
###############################################################################
def MailCheck():
updated = update_time()
print("Update Time '{}'".format(updated))
def USPS_Mail():
account = login()
selectfolder(account, folder)
count = get_mails(account)
print("USPS Mail 1/4: Found '{}' mail pieces being delivered".format(count))
account = login()
selectfolder(account, folder)
count = get_count(account, USPS_Email, USPS_Mail_Subject)
print("USPS Mail 2/4: Found '{}' USPS emails".format(count))
def USPS_Delivering():
account = login()
selectfolder(account, folder)
count = get_count(account, USPS_Email, USPS_Delivering_Subject)
print("USPS 3/4: Found '{}' USPS packages delivering".format(count))
def USPS_Delivered():
account = login()
selectfolder(account, folder)
count = get_count(account, USPS_Email, USPS_Delivered_Subject)
print("USPS 4/4: Found '{}' USPS packages delivered".format(count))
def UPS_Delivering():
account = login()
selectfolder(account, folder)
count = get_count(account, UPS_Email, UPS_Delivering_Subject)
print("UPS 1/2: Found '{}' UPS packages delivering".format(count))
def UPS_Delivered():
account = login()
selectfolder(account, folder)
count = get_count(account, UPS_Email, UPS_Delivered_Subject)
print("UPS 2/2: Found '{}' UPS packages delivered".format(count))
def FEDEX_Delivering():
account = login()
selectfolder(account, folder)
count = get_count(account, FEDEX_Email, FEDEX_Delivering_Subject)
print("FEDEX 1/2: Found '{}' FedEx packages delivering".format(count))
def FEDEX_Delivered():
account = login()
selectfolder(account, folder)
count = get_count(account, FEDEX_Email, FEDEX_Delivered_Subject)
print("FEDEX 2/2: Found '{}' FedEx packages delivered".format(count))
# Primary logic for the component starts here
###############################################################################
try:
while True:
try:
account = login()
selectfolder(account, folder)
except Exception as exx:
print("Error connecting logging into email server.")
print(str(exx))
sys.exit(1)
MailCheck()
USPS_Mail()
USPS_Delivering()
USPS_Delivered()
UPS_Delivering()
UPS_Delivered()
FEDEX_Delivering()
FEDEX_Delivered()
print("Finished checking.")
# For testing, running manually within the env
sys.exit(1)
except Exception as e:
print("Error occured while either processing email.")
print(str(e))
sys.exit(1)