diff --git a/docs/source/usage.md b/docs/source/usage.md index 3bd0487c..81f0fa56 100644 --- a/docs/source/usage.md +++ b/docs/source/usage.md @@ -166,6 +166,9 @@ The full set of configuration options are: - `check_timeout` - int: Number of seconds to wait for a IMAP IDLE response or the number of seconds until the next mail check (Default: `30`) + - `since` - str: Search for messages since certain time. (Examples: `5m|3h|2d|1w`) + Acceptable units - {"m":"minutes", "h":"hours", "d":"days", "w":"weeks"}). + Defaults to `1d` if incorrect value is provided. - `imap` - `host` - str: The IMAP server hostname or IP address - `port` - int: The IMAP server port (Default: `993`) diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py index 7ababcf0..a83b200b 100644 --- a/parsedmarc/__init__.py +++ b/parsedmarc/__init__.py @@ -17,7 +17,7 @@ from base64 import b64decode from collections import OrderedDict from csv import DictWriter -from datetime import datetime +from datetime import datetime, timedelta from io import BytesIO, StringIO from typing import Callable @@ -28,7 +28,8 @@ from mailsuite.smtp import send_email from parsedmarc.log import logger -from parsedmarc.mail import MailboxConnection +from parsedmarc.mail import MailboxConnection, IMAPConnection, \ + MSGraphConnection, GmailConnection from parsedmarc.utils import get_base_domain, get_ip_address_info from parsedmarc.utils import is_outlook_msg, convert_outlook_msg from parsedmarc.utils import parse_email @@ -1483,24 +1484,23 @@ def get_dmarc_reports_from_mbox( ) -def get_dmarc_reports_from_mailbox( - connection: MailboxConnection, - reports_folder="INBOX", - archive_folder="Archive", - delete=False, - test=False, - ip_db_path=None, - always_use_local_files=False, - reverse_dns_map_path=None, - reverse_dns_map_url=None, - offline=False, - nameservers=None, - dns_timeout=6.0, - strip_attachment_payloads=False, - results=None, - batch_size=10, - create_folders=True, -): +def 
get_dmarc_reports_from_mailbox(connection: MailboxConnection,
+                                   reports_folder="INBOX",
+                                   archive_folder="Archive",
+                                   delete=False,
+                                   test=False,
+                                   ip_db_path=None,
+                                   always_use_local_files=False,
+                                   reverse_dns_map_path=None,
+                                   reverse_dns_map_url=None,
+                                   offline=False,
+                                   nameservers=None,
+                                   dns_timeout=6.0,
+                                   strip_attachment_payloads=False,
+                                   results=None,
+                                   batch_size=10,
+                                   since=None,
+                                   create_folders=True):
     """
     Fetches and parses DMARC reports from a mailbox
 
@@ -1522,6 +1522,8 @@ def get_dmarc_reports_from_mailbox(
         results (dict): Results from the previous run
         batch_size (int): Number of messages to read and process before saving
             (use 0 for no limit)
+        since: Search for messages since a certain time
+            (units - {"m":"minutes", "h":"hours", "d":"days", "w":"weeks"})
         create_folders (bool): Whether to create the destination folders
             (not used in watch)
 
@@ -1534,6 +1536,9 @@ def get_dmarc_reports_from_mailbox(
     if connection is None:
         raise ValueError("Must supply a connection")
 
+    # current_time useful to fetch_messages later in the program
+    current_time = None
+
     aggregate_reports = []
     forensic_reports = []
     smtp_tls_reports = []
@@ -1557,11 +1562,48 @@ def get_dmarc_reports_from_mailbox(
             connection.create_folder(smtp_tls_reports_folder)
             connection.create_folder(invalid_reports_folder)
 
-    messages = connection.fetch_messages(reports_folder, batch_size=batch_size)
+    if since:
+        _since = 1440  # default one day
+        if re.match(r'\d+[mhdw]$', since):
+            s = re.split(r'(\d+)', since)
+            if s[2] == 'm':
+                _since = int(s[1])
+            elif s[2] == 'h':
+                _since = int(s[1])*60
+            elif s[2] == 'd':
+                _since = int(s[1])*60*24
+            elif s[2] == 'w':
+                _since = int(s[1])*60*24*7
+        else:
+            logger.warning("Incorrect format for 'since' option. "
+                           "Provided value: {0}, expected values: (5m|3h|2d|1w). "
+                           "Ignoring option, fetching messages for last 24hrs. "
+                           "IMAP does not support a time or timezone in since. "
\
+                           "See https://www.rfc-editor.org/rfc/rfc3501#page-52"
+                           .format(since))
+
+        if isinstance(connection, IMAPConnection):
+            logger.debug("Only days and weeks values in 'since' option are "
+                         "considered for IMAP connections. Examples: 2d or 1w")
+            since = (datetime.utcnow() - timedelta(minutes=_since)).date()
+            current_time = datetime.utcnow().date()
+        elif isinstance(connection, MSGraphConnection):
+            since = (datetime.utcnow() - timedelta(minutes=_since)) \
+                .isoformat() + 'Z'
+            current_time = datetime.utcnow().isoformat() + 'Z'
+        elif isinstance(connection, GmailConnection):
+            since = (datetime.utcnow() - timedelta(minutes=_since)) \
+                .strftime('%s')
+            current_time = datetime.utcnow().strftime('%s')
+        else:
+            pass
+
+    messages = connection.fetch_messages(reports_folder, batch_size=batch_size,
+                                         since=since)
     total_messages = len(messages)
     logger.debug("Found {0} messages in {1}".format(len(messages),
                                                     reports_folder))
 
-    if batch_size:
+    if batch_size and not since:
         message_limit = min(total_messages, batch_size)
     else:
         message_limit = total_messages
@@ -1570,12 +1612,18 @@ def get_dmarc_reports_from_mailbox(
 
     for i in range(message_limit):
         msg_uid = messages[i]
-        logger.debug(
-            "Processing message {0} of {1}: UID {2}".format(
-                i + 1, message_limit, msg_uid
-            )
-        )
-        msg_content = connection.fetch_message(msg_uid)
+        logger.debug("Processing message {0} of {1}: UID {2}".format(
+            i+1, message_limit, msg_uid
+        ))
+        if isinstance(connection, MSGraphConnection):
+            if test:
+                msg_content = connection.fetch_message(msg_uid,
+                                                       mark_read=False)
+            else:
+                msg_content = connection.fetch_message(msg_uid,
+                                                       mark_read=True)
+        else:
+            msg_content = connection.fetch_message(msg_uid)
         try:
             sa = strip_attachment_payloads
             parsed_email = parse_report_email(
@@ -1706,7 +1754,11 @@ def get_dmarc_reports_from_mailbox(
             ]
         )
 
-        total_messages = len(connection.fetch_messages(reports_folder))
+        if current_time:
+            total_messages = len(connection.fetch_messages(reports_folder,
+                                                           
since=current_time)) + else: + total_messages = len(connection.fetch_messages(reports_folder)) if not test and not batch_size and total_messages > 0: # Process emails that came in during the last run @@ -1725,6 +1777,7 @@ def get_dmarc_reports_from_mailbox( reverse_dns_map_path=reverse_dns_map_path, reverse_dns_map_url=reverse_dns_map_url, offline=offline, + since=current_time, ) return results diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py index 64fca398..7e7b2275 100644 --- a/parsedmarc/cli.py +++ b/parsedmarc/cli.py @@ -510,6 +510,7 @@ def process_reports(reports_): mailbox_test=False, mailbox_batch_size=10, mailbox_check_timeout=30, + mailbox_since=None, imap_host=None, imap_skip_certificate_verification=False, imap_ssl=True, @@ -713,7 +714,10 @@ def process_reports(reports_): if "batch_size" in mailbox_config: opts.mailbox_batch_size = mailbox_config.getint("batch_size") if "check_timeout" in mailbox_config: - opts.mailbox_check_timeout = mailbox_config.getint("check_timeout") + opts.mailbox_check_timeout = mailbox_config.getint( + "check_timeout") + if "since" in mailbox_config: + opts.mailbox_since = mailbox_config["since"] if "imap" in config.sections(): imap_config = config["imap"] @@ -1540,6 +1544,7 @@ def process_reports(reports_): nameservers=opts.nameservers, test=opts.mailbox_test, strip_attachment_payloads=opts.strip_attachment_payloads, + since=opts.mailbox_since, ) aggregate_reports += reports["aggregate_reports"] diff --git a/parsedmarc/mail/gmail.py b/parsedmarc/mail/gmail.py index b426746f..6f812b36 100644 --- a/parsedmarc/mail/gmail.py +++ b/parsedmarc/mail/gmail.py @@ -69,18 +69,33 @@ def create_folder(self, folder_name: str): else: raise e - def _fetch_all_message_ids(self, reports_label_id, page_token=None): - results = ( - self.service.users() - .messages() - .list( - userId="me", - includeSpamTrash=self.include_spam_trash, - labelIds=[reports_label_id], - pageToken=page_token, + def _fetch_all_message_ids(self, reports_label_id, 
page_token=None, + since=None): + if since: + results = ( + self.service.users() + .messages() + .list( + userId="me", + includeSpamTrash=self.include_spam_trash, + labelIds=[reports_label_id], + pageToken=page_token, + q=f'after:{since}', + ) + .execute() + ) + else: + results = ( + self.service.users() + .messages() + .list( + userId="me", + includeSpamTrash=self.include_spam_trash, + labelIds=[reports_label_id], + pageToken=page_token, + ) + .execute() ) - .execute() - ) messages = results.get("messages", []) for message in messages: yield message["id"] @@ -92,7 +107,12 @@ def _fetch_all_message_ids(self, reports_label_id, page_token=None): def fetch_messages(self, reports_folder: str, **kwargs) -> List[str]: reports_label_id = self._find_label_id_for_label(reports_folder) - return [id for id in self._fetch_all_message_ids(reports_label_id)] + since = kwargs.get('since') + if since: + return [id for id in self._fetch_all_message_ids(reports_label_id, + since=since)] + else: + return [id for id in self._fetch_all_message_ids(reports_label_id)] def fetch_message(self, message_id): msg = ( diff --git a/parsedmarc/mail/graph.py b/parsedmarc/mail/graph.py index f022ca25..92e032ac 100644 --- a/parsedmarc/mail/graph.py +++ b/parsedmarc/mail/graph.py @@ -146,16 +146,24 @@ def create_folder(self, folder_name: str): def fetch_messages(self, folder_name: str, **kwargs) -> List[str]: """Returns a list of message UIDs in the specified folder""" folder_id = self._find_folder_id_from_folder_path(folder_name) - url = f"/users/{self.mailbox_name}/mailFolders/" f"{folder_id}/messages" - batch_size = kwargs.get("batch_size") + url = f'/users/{self.mailbox_name}/mailFolders/' \ + f'{folder_id}/messages' + since = kwargs.get('since') + if not since: + since = None + batch_size = kwargs.get('batch_size') if not batch_size: batch_size = 0 - emails = self._get_all_messages(url, batch_size) - return [email["id"] for email in emails] + emails = self._get_all_messages(url, batch_size, 
since) + return [email['id'] for email in emails] - def _get_all_messages(self, url, batch_size): + def _get_all_messages(self, url, batch_size, since): messages: list - params = {"$select": "id"} + params = { + '$select': 'id' + } + if since: + params['$filter'] = f'receivedDateTime ge {since}' if batch_size and batch_size > 0: params["$top"] = batch_size else: @@ -165,10 +173,11 @@ def _get_all_messages(self, url, batch_size): raise RuntimeError(f"Failed to fetch messages {result.text}") messages = result.json()["value"] # Loop if next page is present and not obtained message limit. - while "@odata.nextLink" in result.json() and ( - batch_size == 0 or batch_size - len(messages) > 0 - ): - result = self._client.get(result.json()["@odata.nextLink"]) + while '@odata.nextLink' in result.json() and ( + since is not None or ( + batch_size == 0 or + batch_size - len(messages) > 0)): + result = self._client.get(result.json()['@odata.nextLink']) if result.status_code != 200: raise RuntimeError(f"Failed to fetch messages {result.text}") messages.extend(result.json()["value"]) @@ -183,14 +192,15 @@ def mark_message_read(self, message_id: str): f"Failed to mark message read" f"{resp.status_code}: {resp.json()}" ) - def fetch_message(self, message_id: str): - url = f"/users/{self.mailbox_name}/messages/{message_id}/$value" + def fetch_message(self, message_id: str, **kwargs): + url = f'/users/{self.mailbox_name}/messages/{message_id}/$value' result = self._client.get(url) if result.status_code != 200: - raise RuntimeWarning( - f"Failed to fetch message" f"{result.status_code}: {result.json()}" - ) - self.mark_message_read(message_id) + raise RuntimeWarning(f"Failed to fetch message" + f"{result.status_code}: {result.json()}") + mark_read = kwargs.get('mark_read') + if mark_read: + self.mark_message_read(message_id) return result.text def delete_message(self, message_id: str): diff --git a/parsedmarc/mail/imap.py b/parsedmarc/mail/imap.py index 403bbeb7..2a85c655 100644 --- 
a/parsedmarc/mail/imap.py +++ b/parsedmarc/mail/imap.py @@ -39,7 +39,11 @@ def create_folder(self, folder_name: str): def fetch_messages(self, reports_folder: str, **kwargs): self._client.select_folder(reports_folder) - return self._client.search() + since = kwargs.get('since') + if since: + return self._client.search([u'SINCE', since]) + else: + return self._client.search() def fetch_message(self, message_id): return self._client.fetch_message(message_id, parse=False)