-
Notifications
You must be signed in to change notification settings - Fork 1
/
toolkit_imap.py
172 lines (150 loc) · 6.37 KB
/
toolkit_imap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import imaplib
import email
import datetime
import re
import os
import logging
import toolkit_config
# Load the mail settings at import time via the project's toolkit_config
# helper. The script entry point at the bottom of this file reads the
# 'IMAP_SERVER', 'EMAIL_ACCOUNT' and 'EMAIL_PASSWORD' keys from the result.
config = toolkit_config.read_config_mail('config.ini')
# NOTE(review): this second assignment replaces the value loaded just above,
# so the settings from 'config_test.ini' are the ones actually used.
# Looks like a leftover test override -- confirm which file is intended.
config = toolkit_config.read_config_mail('config_test.ini')
class Imap:
    """Small convenience wrapper around an ``imaplib.IMAP4_SSL`` connection.

    The constructor connects to the server on port 993, logs in and selects
    the INBOX. The object is a context manager: leaving the ``with`` block
    closes the selected mailbox and logs out.

    Mail numbers (``mail_num``) are the message sequence numbers returned by
    ``get_mail_num`` (bytes such as ``b'1'``) and refer to the currently
    selected mailbox.
    """

    # Captures everything that follows the first '" ' in a LIST response
    # line, e.g. '(\\HasNoChildren) "/" "INBOX"' -> '"INBOX"'.
    _FOLDER_NAME_RE = re.compile(r'(?<=" )(.*)')

    # Directory (relative to the working directory) attachments are saved to.
    _ATTACHMENT_DIR = os.path.join('.', 'attachments')

    def __init__(self, IMAP_SERVER, EMAIL_ACCOUNT, EMAIL_PASSWORD):
        """Connect, log in and select the INBOX.

        Raises:
            imaplib.IMAP4.error: if the login (or INBOX selection) is
                rejected. The socket is closed before re-raising.
        """
        self.email_message = None
        self.con = imaplib.IMAP4_SSL(IMAP_SERVER, port=993)
        try:
            rv, data = self.con.login(EMAIL_ACCOUNT, EMAIL_PASSWORD)
            # Default select INBOX folder
            self.con.select('"INBOX"')
        except imaplib.IMAP4.error:
            logging.error("LOGIN FAILED!!! ")
            # Do not leak the already-open socket when authentication fails.
            try:
                self.con.shutdown()
            except OSError:
                pass
            raise
        else:
            logging.info('rv: {}, data: {}'.format(rv, data))

    def __enter__(self):
        return self

    def __exit__(self, Type, value, traceback):
        """Close the selected mailbox and log out when the ``with`` ends.

        Errors during shutdown are logged instead of raised so they never
        mask an exception coming out of the ``with`` body.
        """
        logging.info('Close the imap connection')
        try:
            # CLOSE is only legal while a mailbox is selected.
            self.con.close()
        except (imaplib.IMAP4.error, OSError) as exc:
            logging.debug('Could not close mailbox: {}'.format(exc))
        try:
            # LOGOUT is what actually releases the connection.
            self.con.logout()
        except (imaplib.IMAP4.error, OSError) as exc:
            logging.warning('Could not log out cleanly: {}'.format(exc))

    def _fetch_message(self, mail_num):
        """Fetch message ``mail_num`` and parse it.

        The parsed message is also stored on ``self.email_message``.

        Returns:
            The parsed ``email.message.Message``, or ``None`` when the
            server does not answer ``OK`` or returns no message data.
        """
        rv, mail_data_bin = self.con.fetch(mail_num, '(RFC822)')
        if rv != 'OK' or not mail_data_bin:
            return None
        first = mail_data_bin[0]
        # A successful fetch yields (envelope, raw_bytes); a fetch of a
        # non-existent message yields [None].
        if not isinstance(first, tuple) or len(first) < 2:
            return None
        # Parse the raw bytes directly so 8-bit content is not damaged by a
        # lossy text decode.
        self.email_message = email.message_from_bytes(first[1])
        return self.email_message

    def get_mail_folder(self):
        """Return the mailbox names reported by the server's LIST command.

        Names are returned as they appear in the response (including any
        surrounding double quotes). Returns an empty list when the command
        fails.
        """
        rv, mailboxes = self.con.list()
        if rv != 'OK':
            logging.error("ERROR: Unable to list mailboxes: {}".format(rv))
            return []
        logging.debug(mailboxes)
        folders = []
        for entry in mailboxes:
            if not isinstance(entry, bytes):
                # e.g. None when the server lists nothing
                continue
            found = self._FOLDER_NAME_RE.search(entry.decode('utf-8'))
            if found:
                folders.append(found.group(1))
        return folders

    def set_mail_read(self, mail_num):
        """Mark message ``mail_num`` as read (sets the ``\\Seen`` flag)."""
        self.con.store(mail_num, '+FLAGS', '\\Seen')
        # To mark as unread instead, use '-FLAGS' with the same flag.

    def delete_mail(self, mail_num):
        """Flag message ``mail_num`` as deleted and expunge the mailbox."""
        logging.info("Delete mail {}".format(self.get_mail_subject(mail_num)))
        self.con.store(mail_num, '+FLAGS', '\\Deleted')
        self.con.expunge()

    def get_mail_subject(self, mail_num):
        """Return the Subject header of message ``mail_num``.

        Returns ``None`` when the message cannot be fetched or has no
        Subject header.
        """
        message = self._fetch_message(mail_num)
        if message is None:
            logging.error("ERROR getting message: mail_num {}".format(mail_num))
            return None
        subject = message['Subject']
        # Headers holding raw non-ASCII bytes come back as Header objects;
        # normalise so callers always get a str (or None).
        return None if subject is None else str(subject)

    def download_attachment(self, mail_num):
        """Save every named attachment of message ``mail_num``.

        Files are written below ``./attachments``; an existing file with the
        same name is left untouched. Only the base name of the attachment's
        file name is used, so a crafted name cannot escape that directory.
        """
        message = self._fetch_message(mail_num)
        if message is None:
            logging.error("ERROR getting message {}".format(mail_num))
            return
        for part in message.walk():
            if part.get_content_maintype() == 'multipart':
                continue
            if part.get('Content-Disposition') is None:
                continue
            os.makedirs(self._ATTACHMENT_DIR, exist_ok=True)
            fileName = part.get_filename()
            if not fileName:
                continue
            # The name comes from the (untrusted) mail: drop any directory
            # components, whichever separator was used.
            safeName = os.path.basename(fileName.replace('\\', '/'))
            if not safeName:
                continue
            filePath = os.path.join(self._ATTACHMENT_DIR, safeName)
            if os.path.isfile(filePath):
                logging.info('Same file {} found, skip.'.format(filePath))
                continue
            payload = part.get_payload(decode=True)
            if payload is None:
                # Nothing decodable to write (e.g. a nested message part).
                logging.info('No payload for {}, skip.'.format(safeName))
                continue
            logging.info('Saving {} to {}...'.format(safeName, filePath))
            with open(filePath, 'wb') as fp:
                fp.write(payload)

    def get_mail_num(self, mailFolder):
        """Select ``mailFolder`` and return the numbers of all its messages.

        Returns:
            A list of message numbers as bytes; empty when the mailbox
            cannot be opened or searched, so callers can always iterate it.
        """
        logging.info("Processing mailbox {}...\n".format(mailFolder))
        rv, data = self.con.select(mailFolder)
        if rv != 'OK':
            logging.error("ERROR: Unable to open mailbox: {}".format(rv))
            return []
        rv, data = self.con.search(None, "ALL")
        if rv != 'OK' or not data or data[0] is None:
            return []
        return data[0].split()

    def empty_mail_folder(self, mailFolder):
        """Delete every message in ``mailFolder``."""
        pending = self.get_mail_num(mailFolder)
        for _ in pending:
            # Always delete the first mail: each delete expunges, after
            # which the next message takes over the first number.
            self.delete_mail(pending[0])

    def walk_mail_folder(self, mailFolder):
        """Log the number and Subject of every message in ``mailFolder``.

        Stops at the first message that cannot be fetched.
        """
        rv, data = self.con.select(mailFolder)
        if rv != 'OK':
            logging.error("ERROR: Unable to open mailbox: {}".format(rv))
            return
        logging.info("Processing mailbox {}...\n".format(mailFolder))
        rv, data = self.con.search(None, "ALL")
        logging.info(data)
        logging.info('-' * 90)
        if rv != 'OK' or not data or data[0] is None:
            return
        for mail_num in data[0].split():
            message = self._fetch_message(mail_num)
            if message is None:
                logging.error(
                    "ERROR getting message: mail number {}".format(mail_num))
                return
            # format() copes with a missing Subject (None) where string
            # concatenation would raise.
            logging.info('{}: {}'.format(
                mail_num.decode('utf-8'), message['Subject']))
            logging.info('x' * 90)

    def mark_all_mail_read(self):
        """Mark every message in every listed mailbox as read."""
        for mailbox in self.get_mail_folder():
            logging.info("Mark all mails in folder: {}".format(mailbox))
            for mail_num in self.get_mail_num(mailbox):
                self.set_mail_read(mail_num)
if __name__ == '__main__':
    # Script entry point: mark every mail in every folder as read, using the
    # module-level ``config`` mapping, which must provide the keys
    # 'IMAP_SERVER', 'EMAIL_ACCOUNT' and 'EMAIL_PASSWORD'.
    server = config['IMAP_SERVER']
    account = config['EMAIL_ACCOUNT']
    password = config['EMAIL_PASSWORD']
    with Imap(server, account, password) as imap:
        # Other things that can be done with the connection:
        #   imap.get_mail_folder()            -> list the mailbox names
        #   imap.walk_mail_folder('"INBOX"')  -> log every subject in INBOX
        #   imap.empty_mail_folder(mailbox)   -> delete all mail in a folder
        imap.mark_all_mail_read()