From 79e39576a4f85286a585c27b2d6bcf966661891b Mon Sep 17 00:00:00 2001 From: Paul Hutelmyer Date: Thu, 4 Jan 2024 10:27:15 -0500 Subject: [PATCH] Update scan_pdf.py --- src/python/strelka/scanners/scan_pdf.py | 83 +++++++++++++------------ 1 file changed, 44 insertions(+), 39 deletions(-) diff --git a/src/python/strelka/scanners/scan_pdf.py b/src/python/strelka/scanners/scan_pdf.py index a4ae8bb0..d94a1567 100644 --- a/src/python/strelka/scanners/scan_pdf.py +++ b/src/python/strelka/scanners/scan_pdf.py @@ -1,15 +1,3 @@ -""" -This module contains a scanner for extracting metadata and files from PDF files. - -Resources: -- https://pymupdf.readthedocs.io/en/latest/index.html -- https://www.osti.gov/servlets/purl/1030303 - -Requirements: -- PyMuPDF -""" - - import io import re from collections import Counter @@ -19,46 +7,61 @@ from strelka import strelka -# Hide PyMuPDF warnings +# Suppress PyMuPDF warnings fitz.TOOLS.mupdf_display_errors(False) -# Regex to extract phone numbers from PDF file -phone_numbers = re.compile( - r"[+]?(?:\d{1,2})?\s?\(?\d{3}\)?[\s.-]\d{3}[\s.-]\d{2,4}?-?\d{2,4}?", - flags=0, +# Regular expression for extracting phone numbers from PDFs +PHONE_NUMBERS_REGEX = re.compile( + r"[+]?(?:\d{1,2})?\s?\(?\d{3}\)?[\s.-][\n]?\d{3}[\s.-][\n]?\d{2,4}?-?\d{2,4}?", + flags=re.IGNORECASE, ) class ScanPdf(strelka.Scanner): """ - A scanner that collects metadata and extracts files from PDF files. + Extracts metadata, embedded files, images, and text from PDF files. + + This scanner utilizes PyMuPDF to parse PDF files, extracting various types of data, + including metadata, embedded files, images, and textual content. Phone numbers and + URLs within the document are also extracted and reported. """ @staticmethod def _convert_timestamp(timestamp): """ - Converts a date string to a DateTime object, sets the timezone to UTC, and returns it as an ISO string. + Converts a PDF timestamp string to an ISO 8601 formatted string. + + PDF timestamps are typically in the 'D:%Y%m%d%H%M%S%z' format. This function + converts them to a more standard ISO 8601 format. Args: - timestamp (str): A date string in the format 'D:%Y%m%d%H%M%S%z'. + timestamp: A string representing the timestamp in PDF format. Returns: - str: An ISO-formatted date string in the format '%Y-%m-%dT%H:%M:%SZ'. + An ISO 8601 formatted timestamp string, or None if conversion fails. """ - try: - # Date string is converted to DateTime, timezone is set to UTC, and returned as ISO string return ( datetime.strptime(timestamp.replace("'", ""), "D:%Y%m%d%H%M%S%z") .astimezone(timezone.utc) .strftime("%Y-%m-%dT%H:%M:%SZ") ) - except strelka.ScannerTimeout: - raise except Exception: - return + return None def scan(self, data, file, options, expire_at): + """ + Performs the scanning process on the provided data. + + The function opens the PDF using PyMuPDF and extracts metadata, embedded files, + images, and text. Phone numbers and URLs are also extracted using regular expressions. + + Args: + data: Data of the file to be scanned. + file: The File object associated with the data. + options: Dictionary of scanner-specific options. + expire_at: Expiration time of the scan. + """ # Set maximum XREF objects to be collected (default: 250) max_objects = options.get("max_objects", 250) @@ -67,7 +70,7 @@ def scan(self, data, file, options, expire_at): self.event["lines"] = 0 self.event["links"] = [] self.event["words"] = 0 - self.event.setdefault("xref_object", set()) + self.event.setdefault("xref_object", list()) keys = list() try: @@ -108,7 +111,7 @@ def scan(self, data, file, options, expire_at): [ re.sub("[^0-9]", "", x) for x in re.findall( - phone_numbers, + PHONE_NUMBERS_REGEX, reader.get_page_text(i).replace("\t", " "), ) ] @@ -116,11 +119,11 @@ def scan(self, data, file, options, expire_at): self.event["phones"] = list(set(phones)) # iterate through xref objects. Collect, count, and extract objects - self.event["xref_object"] = set() + self.event["xref_object"] = list() for xref in range(1, reader.xref_length()): xref_object = reader.xref_object(xref, compressed=True) if xref_object not in self.event["xref_object"]: - self.event["xref_object"].add(xref_object) + self.event["xref_object"].append(xref_object) for obj in options.get("objects", []): pattern = f"/{obj}" if pattern in xref_object: @@ -130,7 +133,9 @@ def scan(self, data, file, options, expire_at): self.event["objects"] = dict(Counter(keys)) # Convert unique xref_object set back to list - self.event["xref_object"] = list(self.event["xref_object"])[:max_objects] + self.event["xref_object"] = list( + set(self.event["xref_object"][:max_objects]) + ) # Submit embedded files to strelka try: @@ -142,8 +147,8 @@ def scan(self, data, file, options, expire_at): except strelka.ScannerTimeout: raise - except Exception: - self.flags.append("embedded_parsing_failure") + except Exception as e: + self.flags.append(f"pdf_embedded_processing_error: {str(e)[:50]}") # Submit extracted images to strelka try: @@ -157,8 +162,8 @@ def scan(self, data, file, options, expire_at): except strelka.ScannerTimeout: raise - except Exception: - self.flags.append("image_parsing_failure") + except Exception as e: + self.flags.append(f"pdf_image_processing_error: {str(e)[:50]}") # Parse data from each page try: @@ -179,9 +184,9 @@ def scan(self, data, file, options, expire_at): except strelka.ScannerTimeout: raise - except Exception: - self.flags.append("page_parsing_failure") + except Exception as e: + self.flags.append(f"pdf_page_processing_error: {str(e)[:50]}") except strelka.ScannerTimeout: raise - except Exception: - self.flags.append("pdf_load_error") + except Exception as e: + self.flags.append(f"pdf_load_error: {str(e)[:50]}")