diff --git a/integrations/reporting_channels/gmail/__init__.py b/integrations/reporting_channels/email/__init__.py similarity index 76% rename from integrations/reporting_channels/gmail/__init__.py rename to integrations/reporting_channels/email/__init__.py index ef3ba3fd..0c3c05bc 100644 --- a/integrations/reporting_channels/gmail/__init__.py +++ b/integrations/reporting_channels/email/__init__.py @@ -13,9 +13,9 @@ __license__ = 'Apache 2.0' __copyright__ = 'Copyright 2013 Charlie Guo' -from .gmail import Gmail +from .email import Email from .mailbox import Mailbox from .message import Message -from .exceptions import GmailException, ConnectionError, AuthenticationError +from .exceptions import EmailException, ConnectionError, AuthenticationError from .utils import login, authenticate diff --git a/integrations/reporting_channels/gmail/gmail.py b/integrations/reporting_channels/email/email.py similarity index 68% rename from integrations/reporting_channels/gmail/gmail.py rename to integrations/reporting_channels/email/email.py index 4529d532..8f537a97 100644 --- a/integrations/reporting_channels/gmail/gmail.py +++ b/integrations/reporting_channels/email/email.py @@ -1,41 +1,45 @@ -""" -This module defines the `Gmail` class, which provides methods to interact with a Gmail account -via IMAP. The class includes functionalities to connect to the Gmail server, login using -credentials, and manage various mailboxes. Also, has functions for fetching mailboxes, -selecting a specific mailbox, searching for and retrieving emails -based on different criteria, managing labels, and handling authentication. -""" - from __future__ import absolute_import +import os +import sys import re import imaplib -from integrations.reporting_channels.gmail.mailbox import Mailbox -from integrations.reporting_channels.gmail.utf import encode as encode_utf7, decode as decode_utf7 -from integrations.reporting_channels.gmail.exceptions import * +import logging +from email.header import decode_header +from dotenv import load_dotenv +from integrations.reporting_channels.email.mailbox import Mailbox +from integrations.reporting_channels.email.utf import encode as encode_utf7, decode as decode_utf7 +from integrations.reporting_channels.email.exceptions import * +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +load_dotenv() + + +logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') -class Gmail(): - "Class interact with Gmail using IMAP" - # GMail IMAP defaults - GMAIL_IMAP_HOST = 'imap.gmail.com' - GMAIL_IMAP_PORT = 993 +class Email(): + # EMail IMAP defaults + IMAP_HOST = os.getenv('imaphost') + IMAP_PORT = 993 - # GMail SMTP defaults - GMAIL_SMTP_HOST = "smtp.gmail.com" - GMAIL_SMTP_PORT = 587 + # EMail SMTP defaults + EMAIL_SMTP_HOST = os.getenv('smtp_ssl_host') + EMAIL_SMTP_PORT = os.getenv('smtp_ssl_port') def __init__(self): self.username = None self.password = None self.access_token = None + self.imap = None self.smtp = None self.logged_in = False self.mailboxes = {} self.current_mailbox = None + + # self.connect() + def connect(self, raise_errors=True): - "Establishes an IMAP connection to the Gmail server." # try: # self.imap = imaplib.IMAP4_SSL(self.GMAIL_IMAP_HOST, self.GMAIL_IMAP_PORT) # except socket.error: @@ -43,7 +47,7 @@ def connect(self, raise_errors=True): # raise Exception('Connection failure.') # self.imap = None - self.imap = imaplib.IMAP4_SSL(self.GMAIL_IMAP_HOST, self.GMAIL_IMAP_PORT) + self.imap = imaplib.IMAP4_SSL(self.IMAP_HOST, self.IMAP_PORT) # self.smtp = smtplib.SMTP(self.server,self.port) # self.smtp.set_debuglevel(self.debug) @@ -53,8 +57,9 @@ def connect(self, raise_errors=True): return self.imap + + # Add fetch_mailboxes method in the Email class def fetch_mailboxes(self): - "Retrieves and stores the list of mailboxes available in the Gmail account." response, mailbox_list = self.imap.list() if response == 'OK': mailbox_list = [item.decode('utf-8') if isinstance(item, bytes) else item for item in mailbox_list] @@ -67,26 +72,24 @@ def fetch_mailboxes(self): else: raise Exception("Failed to fetch mailboxes.") + def use_mailbox(self, mailbox): - "Selects a specific mailbox for further operations." if mailbox: self.imap.select(mailbox) self.current_mailbox = mailbox return Mailbox(self, mailbox) + def mailbox(self, mailbox_name): - "Returns a Mailbox object for the given mailbox name." if mailbox_name not in self.mailboxes: mailbox_name = encode_utf7(mailbox_name) mailbox = self.mailboxes.get(mailbox_name) - if mailbox and not self.current_mailbox == mailbox_name: self.use_mailbox(mailbox_name) return mailbox def create_mailbox(self, mailbox_name): - "Creates a new mailbox with the given name if it does not already exist." mailbox = self.mailboxes.get(mailbox_name) if not mailbox: self.imap.create(mailbox_name) @@ -96,19 +99,20 @@ def create_mailbox(self, mailbox_name): return mailbox def delete_mailbox(self, mailbox_name): - "Deletes the specified mailbox and removes it from the cache." mailbox = self.mailboxes.get(mailbox_name) if mailbox: self.imap.delete(mailbox_name) del self.mailboxes[mailbox_name] + + def login(self, username, password): - "Login to Gmail using the provided username and password. " self.username = username self.password = password if not self.imap: self.connect() + try: imap_login = self.imap.login(self.username, self.password) self.logged_in = (imap_login and imap_login[0] == 'OK') @@ -117,15 +121,18 @@ def login(self, username, password): except imaplib.IMAP4.error: raise AuthenticationError + + # smtp_login(username, password) + return self.logged_in def authenticate(self, username, access_token): - "Login to Gmail using OAuth2 with the provided username and access token." self.username = username self.access_token = access_token if not self.imap: self.connect() + try: auth_string = 'user=%s\1auth=Bearer %s\1\1' % (username, access_token) imap_auth = self.imap.authenticate('XOAUTH2', lambda x: auth_string) @@ -138,32 +145,29 @@ def authenticate(self, username, access_token): return self.logged_in def logout(self): - "Logout from the Gmail account and closes the IMAP connection." self.imap.logout() self.logged_in = False + def label(self, label_name): - "Retrieves a Mailbox object for the specified label (mailbox)." return self.mailbox(label_name) def find(self, mailbox_name="[Gmail]/All Mail", **kwargs): - "Searches and returns emails based on the provided search criteria." box = self.mailbox(mailbox_name) return box.mail(**kwargs) + def copy(self, uid, to_mailbox, from_mailbox=None): - "Copies an email with the given UID from one mailbox to another." if from_mailbox: self.use_mailbox(from_mailbox) self.imap.uid('COPY', uid, to_mailbox) def fetch_multiple_messages(self, messages): - "Fetches and parses multiple messages given a dictionary of `Message` objects." if not isinstance(messages, dict): raise Exception('Messages must be a dictionary') fetch_str = ','.join(messages.keys()) - response, results = self.imap.uid('FETCH', fetch_str, '(BODY.PEEK[] FLAGS X-GM-THRID X-GM-MSGID X-GM-LABELS)') + response, results = self.imap.uid('FETCH', fetch_str, '(UID BODY.PEEK[] FLAGS)') for raw_message in results: if isinstance(raw_message, tuple): @@ -172,39 +176,40 @@ def fetch_multiple_messages(self, messages): uid = uid_match.group(1).decode('utf-8') if uid in messages: messages[uid].parse(raw_message) + else: + logging.warning(f'UID {uid} not found in messages dictionary') + else: + logging.warning('UID not found in raw message') + elif isinstance(raw_message, bytes): + continue + else: + logging.warning('Invalid raw message format') + return messages def labels(self, require_unicode=False): - "Returns a list of all available mailbox names." keys = self.mailboxes.keys() if require_unicode: keys = [decode_utf7(key) for key in keys] return keys def inbox(self): - "Returns a `Mailbox` object for the Inbox." return self.mailbox("INBOX") def spam(self): - "Returns a `Mailbox` object for the Spam." return self.mailbox("[Gmail]/Spam") def starred(self): - "Returns a `Mailbox` object for the starred folder." return self.mailbox("[Gmail]/Starred") def all_mail(self): - "Returns a `Mailbox` object for the All mail." return self.mailbox("[Gmail]/All Mail") def sent_mail(self): - "Returns a `Mailbox` object for the Sent mail." return self.mailbox("[Gmail]/Sent Mail") def important(self): - "Returns a `Mailbox` object for the Important." return self.mailbox("[Gmail]/Important") def mail_domain(self): - "Returns the domain part of the logged-in email address" - return self.username.split('@')[-1] + return self.username.split('@')[-1] \ No newline at end of file diff --git a/integrations/reporting_channels/email/exceptions.py b/integrations/reporting_channels/email/exceptions.py new file mode 100644 index 00000000..f328c359 --- /dev/null +++ b/integrations/reporting_channels/email/exceptions.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- + +""" +email.exceptions +~~~~~~~~~~~~~~~~~~~ + +This module contains the set of Emails' exceptions. + +""" + + +class EmailException(RuntimeError): + """There was an ambiguous exception that occurred while handling your + request.""" + +class ConnectionError(EmailException): + """A Connection error occurred.""" + +class AuthenticationError(EmailException): + """Email Authentication failed.""" + +class Timeout(EmailException): + """The request timed out.""" diff --git a/integrations/reporting_channels/email/mailbox.py b/integrations/reporting_channels/email/mailbox.py new file mode 100644 index 00000000..7b5182ee --- /dev/null +++ b/integrations/reporting_channels/email/mailbox.py @@ -0,0 +1,105 @@ +import re +from .message import Message +from .utf import encode as encode_utf7, decode as decode_utf7 + +class Mailbox(): + def __init__(self, email, name="INBOX"): + self.name = name + self.email = email + self.date_format = "%d-%b-%Y" + self.messages = {} + + @property + def external_name(self): + if "external_name" not in vars(self): + vars(self)["external_name"] = encode_utf7(self.name) + return vars(self)["external_name"] + + @external_name.setter + def external_name(self, value): + if "external_name" in vars(self): + del vars(self)["external_name"] + self.name = decode_utf7(value) + + def mail(self, prefetch=False, **kwargs): + search = ['ALL'] + + kwargs.get('read') and search.append('SEEN') + kwargs.get('unread') and search.append('UNSEEN') + + kwargs.get('starred') and search.append('FLAGGED') + kwargs.get('unstarred') and search.append('UNFLAGGED') + + kwargs.get('deleted') and search.append('DELETED') + kwargs.get('undeleted') and search.append('UNDELETED') + + kwargs.get('draft') and search.append('DRAFT') + kwargs.get('undraft') and search.append('UNDRAFT') + + kwargs.get('before') and search.extend(['BEFORE', kwargs.get('before').strftime(self.date_format)]) + kwargs.get('after') and search.extend(['SINCE', kwargs.get('after').strftime(self.date_format)]) + kwargs.get('on') and search.extend(['ON', kwargs.get('on').strftime(self.date_format)]) + + kwargs.get('header') and search.extend(['HEADER', kwargs.get('header')[0], kwargs.get('header')[1]]) + + kwargs.get('sender') and search.extend(['FROM', kwargs.get('sender')]) + kwargs.get('fr') and search.extend(['FROM', kwargs.get('fr')]) + kwargs.get('to') and search.extend(['TO', kwargs.get('to')]) + kwargs.get('cc') and search.extend(['CC', kwargs.get('cc')]) + + kwargs.get('subject') and search.extend(['SUBJECT', kwargs.get('subject')]) + kwargs.get('body') and search.extend(['BODY', kwargs.get('body')]) + + kwargs.get('label') and search.extend(['X-GM-LABELS', kwargs.get('label')]) + kwargs.get('attachment') and search.extend(['HAS', 'attachment']) + + kwargs.get('query') and search.extend([kwargs.get('query')]) + + emails = [] + search_criteria = ' '.join(search).encode('utf-8') # Ensure the search criteria are byte strings + + response, data = self.email.imap.uid('SEARCH', None, search_criteria) + if response == 'OK': + uids = filter(None, data[0].split(b' ')) # filter out empty strings + + for uid in uids: + if not self.messages.get(uid): + self.messages[uid] = Message(self, uid) + emails.append(self.messages[uid]) + + if prefetch and emails: + messages_dict = {} + for email in emails: + messages_dict[email.uid] = email + self.messages.update(self.email.fetch_multiple_messages(messages_dict)) + + return emails + + # WORK IN PROGRESS. NOT FOR ACTUAL USE + def threads(self, prefetch=False, **kwargs): + emails = [] + response, data = self.email.imap.uid('SEARCH', None, 'ALL'.encode('utf-8')) + if response == 'OK': + uids = data[0].split(b' ') + + for uid in uids: + if not self.messages.get(uid): + self.messages[uid] = Message(self, uid) + emails.append(self.messages[uid]) + + if prefetch: + fetch_str = ','.join(uids).encode('utf-8') + response, results = self.email.imap.uid('FETCH', fetch_str, '(BODY.PEEK[] FLAGS X-GM-THRID X-GM-MSGID X-GM-LABELS)') + for index in range(len(results) - 1): + raw_message = results[index] + if re.search(rb'UID (\d+)', raw_message[0]): + uid = re.search(rb'UID (\d+)', raw_message[0]).groups(1)[0] + self.messages[uid].parse(raw_message) + + return emails + + def count(self, **kwargs): + return len(self.mail(**kwargs)) + + def cached_messages(self): + return self.messages diff --git a/integrations/reporting_channels/gmail/message.py b/integrations/reporting_channels/email/message.py similarity index 62% rename from integrations/reporting_channels/gmail/message.py rename to integrations/reporting_channels/email/message.py index 4a9f85d3..eabb6aaf 100644 --- a/integrations/reporting_channels/gmail/message.py +++ b/integrations/reporting_channels/email/message.py @@ -3,14 +3,16 @@ import re import time import os -from email.header import decode_header +from email.header import decode_header, make_header +from imaplib import ParseFlags class Message(): - "Message class provides methods for mail functions." + + def __init__(self, mailbox, uid): self.uid = uid self.mailbox = mailbox - self.gmail = mailbox.gmail if mailbox else None + self.email = mailbox.email if mailbox else None self.message = None self.headers = {} @@ -35,17 +37,19 @@ def __init__(self, mailbox, uid): self.attachments = None + + def is_read(self): return ('\\Seen' in self.flags) def read(self): flag = '\\Seen' - self.gmail.imap.uid('STORE', self.uid, '+FLAGS', flag) + self.email.imap.uid('STORE', self.uid, '+FLAGS', flag) if flag not in self.flags: self.flags.append(flag) def unread(self): flag = '\\Seen' - self.gmail.imap.uid('STORE', self.uid, '-FLAGS', flag) + self.email.imap.uid('STORE', self.uid, '-FLAGS', flag) if flag in self.flags: self.flags.remove(flag) def is_starred(self): @@ -53,12 +57,12 @@ def is_starred(self): def star(self): flag = '\\Flagged' - self.gmail.imap.uid('STORE', self.uid, '+FLAGS', flag) + self.email.imap.uid('STORE', self.uid, '+FLAGS', flag) if flag not in self.flags: self.flags.append(flag) def unstar(self): flag = '\\Flagged' - self.gmail.imap.uid('STORE', self.uid, '-FLAGS', flag) + self.email.imap.uid('STORE', self.uid, '-FLAGS', flag) if flag in self.flags: self.flags.remove(flag) def is_draft(self): @@ -70,36 +74,40 @@ def has_label(self, label): def add_label(self, label): full_label = '%s' % label - self.gmail.imap.uid('STORE', self.uid, '+X-GM-LABELS', full_label) + self.email.imap.uid('STORE', self.uid, '+X-GM-LABELS', full_label) if full_label not in self.labels: self.labels.append(full_label) def remove_label(self, label): full_label = '%s' % label - self.gmail.imap.uid('STORE', self.uid, '-X-GM-LABELS', full_label) + self.email.imap.uid('STORE', self.uid, '-X-GM-LABELS', full_label) if full_label in self.labels: self.labels.remove(full_label) + def is_deleted(self): return ('\\Deleted' in self.flags) def delete(self): flag = '\\Deleted' - self.gmail.imap.uid('STORE', self.uid, '+FLAGS', flag) + self.email.imap.uid('STORE', self.uid, '+FLAGS', flag) if flag not in self.flags: self.flags.append(flag) - trash = '[Gmail]/Trash' if '[Gmail]/Trash' in self.gmail.labels() else '[Gmail]/Bin' + trash = '[Gmail]/Trash' if '[Gmail]/Trash' in self.email.labels() else '[Gmail]/Bin' if self.mailbox.name not in ['[Gmail]/Bin', '[Gmail]/Trash']: self.move_to(trash) # def undelete(self): # flag = '\\Deleted' - # self.gmail.imap.uid('STORE', self.uid, '-FLAGS', flag) + # self.email.imap.uid('STORE', self.uid, '-FLAGS', flag) # if flag in self.flags: self.flags.remove(flag) + def move_to(self, name): - self.gmail.copy(self.uid, name, self.mailbox.name) + self.email.copy(self.uid, name, self.mailbox.name) if name not in ['[Gmail]/Bin', '[Gmail]/Trash']: self.delete() + + def archive(self): self.move_to('[Gmail]/All Mail') @@ -149,7 +157,7 @@ def parse_subject(self, encoded_subject): subject_parts.append(part) parsed_subject = ''.join(subject_parts) return parsed_subject - + def parse(self, raw_message): raw_headers = raw_message[0] raw_email = raw_message[1] @@ -158,7 +166,7 @@ def parse(self, raw_message): raw_headers = raw_headers.decode('utf-8', errors='replace') if isinstance(raw_email, bytes): - raw_email = raw_email.decode('utf-8', errors='replace') + raw_email = raw_email.decode('utf-8', errors='replace') if not isinstance(raw_email, str): raise ValueError("Decoded raw_email is not a string") @@ -188,10 +196,11 @@ def parse(self, raw_message): self.flags = self.parse_flags(raw_headers) self.labels = self.parse_labels(raw_headers) - if re.search(r'X-GM-THRID (\d+)', raw_headers): - self.thread_id = re.search(r'X-GM-THRID (\d+)', raw_headers).groups(1)[0] + thread_match = re.search(r'X-GM-THRID (\d+)', raw_headers) + if thread_match: + self.thread_id = thread_match.group(1) if re.search(r'X-GM-MSGID (\d+)', raw_headers): - self.message_id = re.search(r'X-GM-MSGID (\d+)', raw_headers).groups(1)[0] + self.message_id = re.search(r'X-GM-MSGID (\d+)', raw_headers).groups(1) self.attachments = [ Attachment(attachment) for attachment in self.message.get_payload() @@ -200,40 +209,76 @@ def parse(self, raw_message): def fetch(self): if not self.message: - response, results = self.gmail.imap.uid('FETCH', self.uid, '(BODY.PEEK[] FLAGS X-GM-THRID X-GM-MSGID X-GM-LABELS)') - self.parse(results[0]) + check_gmail_host = 'gmail.com' in self.email.IMAP_HOST + + if check_gmail_host: + response, results = self.email.imap.uid('FETCH', self.uid, '(UID BODY.PEEK[] X-GM-THRID)') + self.parse(results[0]) + + else: + response, results = self.email.imap.uid('FETCH', self.uid, '(UID BODY.PEEK[])') + self.parse(results[0]) + return self.message - # returns a list of fetched messages (both sent and received) in chronological order + def fetch_thread(self): self.fetch() original_mailbox = self.mailbox - self.gmail.use_mailbox(original_mailbox.name) - - # fetch and cache messages from inbox or other received mailbox - response, results = self.gmail.imap.uid('SEARCH', None, '(X-GM-THRID ' + self.thread_id + ')') - received_messages = {} - uids = results[0].split(' ') - if response == 'OK': - for uid in uids: received_messages[uid] = Message(original_mailbox, uid) - self.gmail.fetch_multiple_messages(received_messages) - self.mailbox.messages.update(received_messages) - - # fetch and cache messages from 'sent' - self.gmail.use_mailbox('[Gmail]/Sent Mail') - response, results = self.gmail.imap.uid('SEARCH', None, '(X-GM-THRID ' + self.thread_id + ')') - sent_messages = {} - uids = results[0].split(' ') - if response == 'OK': - for uid in uids: sent_messages[uid] = Message(self.gmail.mailboxes['[Gmail]/Sent Mail'], uid) - self.gmail.fetch_multiple_messages(sent_messages) - self.gmail.mailboxes['[Gmail]/Sent Mail'].messages.update(sent_messages) - - self.gmail.use_mailbox(original_mailbox.name) - return sorted(dict(received_messages.items() + sent_messages.items()).values(), key=lambda m: m.sent_at) + self.email.use_mailbox(original_mailbox.name) + + combined_messages = {} + + # Check whether it's Gmail or Outlook based on IMAP host + check_gmail_host = 'gmail.com' in self.email.IMAP_HOST + + if check_gmail_host: + # Gmail - Use X-GM-THRID for fetching the thread + response, results = self.email.imap.uid('SEARCH', None, f'(X-GM-THRID {self.thread_id})') + else: + # Outlook - Use Message-ID and References to fetch the thread + response, results = self.email.imap.uid('FETCH', self.uid, '(UID BODY[HEADER.FIELDS (References)])') + if response == 'OK' and results: + headers = results[0][1].decode('utf-8') + message_id_match = re.search(r'References:\s*(.*)', headers) + if message_id_match: + message_id = message_id_match.group(1).strip() + response, results = self.email.imap.uid('SEARCH', None, f'(HEADER References "{message_id}")') + + # Common processing for received messages + if response == 'OK' and results and results[0]: + uids = results[0].decode('utf-8').split(' ') + received_messages = {uid: Message(original_mailbox, uid) for uid in uids} + self.email.fetch_multiple_messages(received_messages) + combined_messages.update(received_messages) + else: + print(f"No received messages found with thread ID: {self.thread_id} in {self.email.current_mailbox}.") + + # Fetch messages from the sent mail folder for both Gmail and Outlook + sent_mailbox = '"[Gmail]/Sent Mail"' if check_gmail_host else 'Sent' + self.email.use_mailbox(sent_mailbox) + + search_criteria = f'(X-GM-THRID {self.thread_id})' if check_gmail_host else f'(HEADER Message-ID "{message_id}")' + response, results = self.email.imap.uid('SEARCH', None, search_criteria) + if response == 'OK' and results and results[0]: + uids = results[0].decode('utf-8').split(' ') + sent_messages = {uid: Message(self.email.mailboxes[sent_mailbox], uid) for uid in uids} + self.email.fetch_multiple_messages(sent_messages) + combined_messages.update(sent_messages) + else: + print(f"No sent messages found in {sent_mailbox}.") + + self.email.use_mailbox(original_mailbox.name) + # Combine and sort messages if any were found + if combined_messages: + sorted_messages = sorted(combined_messages.values(), key=lambda m: m.sent_at) + return sorted_messages + else: + print("No messages found in the thread.") + return None class Attachment: - "Attachment class methods for email attachment." + def __init__(self, attachment): self.name = attachment.get_filename() # Raw file data diff --git a/integrations/reporting_channels/gmail/utf.py b/integrations/reporting_channels/email/utf.py similarity index 90% rename from integrations/reporting_channels/gmail/utf.py rename to integrations/reporting_channels/email/utf.py index 8fa0b84d..67c08f74 100644 --- a/integrations/reporting_channels/gmail/utf.py +++ b/integrations/reporting_channels/email/utf.py @@ -1,4 +1,3 @@ -""" # The contents of this file has been derived code from the Twisted project # (http://twistedmatrix.com/). The original author is Jp Calderone. @@ -22,9 +21,9 @@ # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -""" -TEXT_TYPE = str -BINARY_TYPE = bytes + +text_type = str +binary_type = bytes PRINTABLE = set(range(0x20, 0x26)) | set(range(0x27, 0x7f)) @@ -33,7 +32,7 @@ def encode(s): Despite the function's name, the output is still a unicode string. """ - if not isinstance(s, TEXT_TYPE): + if not isinstance(s, text_type): return s r = [] @@ -64,9 +63,9 @@ def decode(s): Despite the function's name, the input may still be a unicode string. If the input is bytes, it's first decoded to unicode. """ - if isinstance(s, BINARY_TYPE): + if isinstance(s, binary_type): s = s.decode('latin-1') - if not isinstance(s, TEXT_TYPE): + if not isinstance(s, text_type): return s r = [] @@ -90,13 +89,11 @@ def decode(s): return ''.join(r) def modified_utf7(s): - "Convert a string to modified UTF-7 encoding." # encode to utf-7: '\xff' => b'+AP8-', decode from latin-1 => '+AP8-' s_utf7 = s.encode('utf-7').decode('latin-1') return s_utf7[1:-1].replace('/', ',') def modified_deutf7(s): - "Convert a modified UTF-7 encoded string back to UTF-8." s_utf7 = '+' + s.replace(',', '/') + '-' # encode to latin-1: '+AP8-' => b'+AP8-', decode from utf-7 => '\xff' - return s_utf7.encode('latin-1').decode('utf-7') + return s_utf7.encode('latin-1').decode('utf-7') \ No newline at end of file diff --git a/integrations/reporting_channels/email/utils.py b/integrations/reporting_channels/email/utils.py new file mode 100644 index 00000000..fbba512e --- /dev/null +++ b/integrations/reporting_channels/email/utils.py @@ -0,0 +1,13 @@ + + +from .email import Email + +def login(username, password): + email = Email() + email.login(username, password) + return email + +def authenticate(username, access_token): + email = Email() + email.authenticate(username, access_token) + return email \ No newline at end of file diff --git a/integrations/reporting_channels/gmail/exceptions.py b/integrations/reporting_channels/gmail/exceptions.py deleted file mode 100644 index 3306d87e..00000000 --- a/integrations/reporting_channels/gmail/exceptions.py +++ /dev/null @@ -1,23 +0,0 @@ -# -*- coding: utf-8 -*- - -""" -gmail.exceptions -~~~~~~~~~~~~~~~~~~~ - -This module contains the set of Gmails' exceptions. - -""" - - -class GmailException(RuntimeError): - """There was an ambiguous exception that occurred while handling your - request.""" - -class ConnectionError(GmailException): - """A Connection error occurred.""" - -class AuthenticationError(GmailException): - """Gmail Authentication failed.""" - -class Timeout(GmailException): - """The request timed out.""" diff --git a/integrations/reporting_channels/gmail/gmailtest.py b/integrations/reporting_channels/gmail/gmailtest.py deleted file mode 100644 index 7f2799da..00000000 --- a/integrations/reporting_channels/gmail/gmailtest.py +++ /dev/null @@ -1,84 +0,0 @@ -""" -This is an example automated test to check gmail utils -Our automated test will do the following: - #login to gmail and fetch mailboxes - #After fetching the mail box ,select and fetch messages and print the number of messages - #and the subject of the messages - -Prerequisites: - - Gmail account with app password -""" -import sys -import os -import io -from dotenv import load_dotenv -sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..'))) -from integrations.reporting_channels.gmail.gmail import Gmail, AuthenticationError -from integrations.reporting_channels.gmail.mailbox import Mailbox -sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') - -load_dotenv() - -def gmail_test(): - "Run the Gmail utility test" - try: - # 1. Initialize Gmail object and connect - gmail = Gmail() - gmail.connect() - print("Connected to Gmail") - - # 2. Login to Gmail - username = os.getenv('app_username') - password = os.getenv('app_password') - - try: - gmail.login(username, password) - print("Login successful") - except AuthenticationError as e: - print(f"Authentication failed: Check for the login credentials {str(e)}") - return - - # 3. Fetch mailboxes - mailboxes = gmail.fetch_mailboxes() - if mailboxes: - print(f"Fetched mailboxes: {mailboxes}") - else: - raise ValueError("Failed to fetch mailboxes!") - - # 4. Select and fetch messages from SPAM mailbox - inbox_mailbox = gmail.use_mailbox("[Gmail]/Spam") - if isinstance(inbox_mailbox, Mailbox): - print("SPAM mailbox selected successfully") - else: - raise TypeError(f"Error: Expected Mailbox instance, got {type(inbox_mailbox)}") - - # 5. Fetch and print messages from SPAM - messages = inbox_mailbox.mail() - print(f"Number of messages in SPAM: {len(messages)}") - - # 6. Fetch and print message subjects - if messages: - msg = messages[0] - fetched_msg = msg.fetch() - print(f"Fetching Message subject: {fetched_msg.get('subject')}") - - # Fetch multiple messages and log subjects - messages_dict = {msg.uid.decode('utf-8'): msg for msg in messages} - fetched_messages = gmail.fetch_multiple_messages(messages_dict) - for uid, message in fetched_messages.items(): - subject = getattr(message, 'subject', 'No subject') - print(f"UID: {uid}, Subject: {subject.encode('utf-8', errors='replace').decode('utf-8')}") - - else: - print("No messages found in SPAM") - - # 7. Logout - gmail.logout() - print("Logged out successfully") - - except Exception as e: - print(f"Exception encountered: {str(e)}") - raise - -if __name__ == "__main__": - gmail_test() diff --git a/integrations/reporting_channels/gmail/mailbox.py b/integrations/reporting_channels/gmail/mailbox.py deleted file mode 100644 index e31964ad..00000000 --- a/integrations/reporting_channels/gmail/mailbox.py +++ /dev/null @@ -1,141 +0,0 @@ -""" -This module defines the `Mailbox` class, which represents a mailbox in a Gmail account. -The class provides methods to interact with the mailbox, including searching for emails -based on various criteria, fetching email threads, counting emails, and managing cached -messages. -""" - -import re -from .message import Message -from .utf import encode as encode_utf7, decode as decode_utf7 - -class Mailbox(): - "Mailbox class provides methods for email operations." - def __init__(self, gmail, name="INBOX"): - self.name = name - self.gmail = gmail - self.date_format = "%d-%b-%Y" - self.messages = {} - - @property - def external_name(self): - "Encodes the name to IMAP modified UTF-7 format." - if "external_name" not in vars(self): - vars(self)["external_name"] = encode_utf7(self.name) - return vars(self)["external_name"] - - @external_name.setter - def external_name(self, value): - "Decodes and sets the mailbox name from IMAP modified UTF-7 format." - if "external_name" in vars(self): - del vars(self)["external_name"] - self.name = decode_utf7(value) - - def mail(self, prefetch=False, **kwargs): - "Searches and returns a list of emails matching the specified search criteria." - search = ['ALL'] - - if kwargs.get('read'): - search.append('SEEN') - if kwargs.get('unread'): - search.append('UNSEEN') - - if kwargs.get('starred'): - search.append('FLAGGED') - if kwargs.get('unstarred'): - search.append('UNFLAGGED') - - if kwargs.get('deleted'): - search.append('DELETED') - if kwargs.get('undeleted'): - search.append('UNDELETED') - - if kwargs.get('draft'): - search.append('DRAFT') - if kwargs.get('undraft'): - search.append('UNDRAFT') - - if kwargs.get('before'): - search.extend(['BEFORE', kwargs.get('before').strftime(self.date_format)]) - if kwargs.get('after'): - search.extend(['SINCE', kwargs.get('after').strftime(self.date_format)]) - if kwargs.get('on'): - search.extend(['ON', kwargs.get('on').strftime(self.date_format)]) - - if kwargs.get('header'): - search.extend(['HEADER', kwargs.get('header')[0], kwargs.get('header')[1]]) - - if kwargs.get('sender'): - search.extend(['FROM', kwargs.get('sender')]) - if kwargs.get('fr'): - search.extend(['FROM', kwargs.get('fr')]) - if kwargs.get('to'): - search.extend(['TO', kwargs.get('to')]) - if kwargs.get('cc'): - search.extend(['CC', kwargs.get('cc')]) - - if kwargs.get('subject'): - search.extend(['SUBJECT', kwargs.get('subject')]) - if kwargs.get('body'): - search.extend(['BODY', kwargs.get('body')]) - - if kwargs.get('label'): - search.extend(['X-GM-LABELS', kwargs.get('label')]) - if kwargs.get('attachment'): - search.extend(['HAS', 'attachment']) - - if kwargs.get('query'): - search.extend([kwargs.get('query')]) - - emails = [] - search_criteria = ' '.join(search).encode('utf-8') - - response, data = self.gmail.imap.uid('SEARCH', None, search_criteria) - if response == 'OK': - uids = filter(None, data[0].split(b' ')) # filter out empty strings - - for uid in uids: - if not self.messages.get(uid): - self.messages[uid] = Message(self, uid) - emails.append(self.messages[uid]) - - if prefetch and emails: - messages_dict = {} - for email in emails: - messages_dict[email.uid] = email - self.messages.update(self.gmail.fetch_multiple_messages(messages_dict)) - - return emails - - # WORK IN PROGRESS. NOT FOR ACTUAL USE - def threads(self, prefetch=False): - "Fetches email threads from the mailbox." - emails = [] - response, data = self.gmail.imap.uid('SEARCH', None, 'ALL'.encode('utf-8')) - if response == 'OK': - uids = data[0].split(b' ') - - for uid in uids: - if not self.messages.get(uid): - self.messages[uid] = Message(self, uid) - emails.append(self.messages[uid]) - - if prefetch: - fetch_str = ','.join(uids).encode('utf-8') - response, results = self.gmail.imap.uid('FETCH', fetch_str, - '(BODY.PEEK[] FLAGS X-GM-THRID X-GM-MSGID X-GM-LABELS)') - for index in range(len(results) - 1): - raw_message = results[index] - if re.search(rb'UID (\d+)', raw_message[0]): - uid = re.search(rb'UID (\d+)', raw_message[0]).groups(1)[0] - self.messages[uid].parse(raw_message) - - return emails - - def count(self, **kwargs): - "Returns the length of emails matching the specified search criteria." - return len(self.mail(**kwargs)) - - def cached_messages(self): - "Returns a dictionary of cached messages in the mailbox" - return self.messages diff --git a/integrations/reporting_channels/gmail/utils.py b/integrations/reporting_channels/gmail/utils.py deleted file mode 100644 index 50676e79..00000000 --- a/integrations/reporting_channels/gmail/utils.py +++ /dev/null @@ -1,13 +0,0 @@ - - -from .gmail import Gmail - -def login(username, password): - gmail = Gmail() - gmail.login(username, password) - return gmail - -def authenticate(username, access_token): - gmail = Gmail() - gmail.authenticate(username, access_token) - return gmail \ No newline at end of file diff --git a/pytest.ini b/pytest.ini index 4df9b3fa..aa3fd080 100644 --- a/pytest.ini +++ b/pytest.ini @@ -6,4 +6,5 @@ markers = API: mark a test as part of the API regression suite MOBILE: mark a test as part of the MOBILE regression suite ACCESSIBILITY: mark a test as part of the ACCESSIBILITY suite + EMAIL: mark as part of the EMAIL utils regression suite junit_family=xunit2 diff --git a/tests/test_email_utils.py b/tests/test_email_utils.py new file mode 100644 index 00000000..f68dcb17 --- /dev/null +++ b/tests/test_email_utils.py @@ -0,0 +1,105 @@ +""" +This is an example automated test to check email utils +Our automated test will do the following: + #login to email and fetch mailboxes + #After fetching the mail box ,select and fetch messages and print the number of messages and the subject of the messages + +Prerequisites: + - Email account with app password +""" +import os +import sys +import pytest +from integrations.reporting_channels.email.email import Email +from integrations.reporting_channels.email.mailbox import Mailbox +from dotenv import load_dotenv +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +load_dotenv() + +@pytest.mark.EMAIL +def test_email_util(): + try: + # Initialize flags for test summary + expected_pass = 0 + actual_pass = 0 + + # Initialize Gmail class instance + email = Email() + email.connect() + + username = os.getenv('app_username') + password = os.getenv('app_password') + + # Attempt to log in + result_flag = email.login(username, password) + assert result_flag, "Login failed!" + print("Login successful!") + expected_pass += 1 + actual_pass += 1 + + # Fetch and print mailboxes + mailboxes = email.fetch_mailboxes() + assert mailboxes, "Failed to fetch mailboxes!" + print(f"Fetched mailboxes: {mailboxes}") + expected_pass += 1 + actual_pass += 1 + + # Select the INBOX mailbox + inbox_mailbox = email.use_mailbox('Inbox') + assert isinstance(inbox_mailbox, Mailbox), f"Error: Expected Mailbox instance, got {type(inbox_mailbox)}." + print("INBOX selected successfully!") + expected_pass += 1 + actual_pass += 1 + + # Fetch and print number of messages in INBOX + messages = inbox_mailbox.mail() + print(f"Number of messages in Inbox: {len(messages)}") + expected_pass += 1 + actual_pass += 1 + + if messages: + # Fetch and print the subject of the first message + msg = messages[0] + fetched_msg = msg.fetch() + assert 'subject' in fetched_msg, "Subject not found in fetched message!" + print(f"Fetching Message subject from test script: {fetched_msg.get('subject')}") + expected_pass += 1 + actual_pass += 1 + + msg.fetch() + thread_messages = msg.fetch_thread() + if thread_messages: + print(f"Number of messages in Thread: {len(thread_messages)}") + for message in thread_messages: + subject = getattr(message, 'subject', 'No subject attribute') + body = getattr(message, 'body', 'no body message found') + print(f"Thread message subject: {subject}") + else: + print("No messages found in thread.") + + # Fetch and print subjects of multiple messages + messages_dict = {msg.uid.decode('utf-8'): msg for msg in messages} + fetched_messages = email.fetch_multiple_messages(messages_dict) + assert fetched_messages, "Failed to fetch multiple messages!" + expected_pass += 1 + actual_pass += 1 + + for uid, message in fetched_messages.items(): + subject = getattr(message, 'subject', 'No subject attribute') + print(f"UID: {uid}, Email Subject: {subject}") + else: + print("No messages found in INBOX.") + + except Exception as e: + print("Exception when trying to run test: %s" %__file__) + print("Python says: %s" % str(e)) + + finally: + # logout + email.logout() + print("Logged out!") + expected_pass += 1 + actual_pass += 1 + + assert expected_pass == actual_pass, "Test failed: %s" %__file__ \ No newline at end of file