Skip to content

Commit

Permalink
Merge pull request #420 from target/scanpdf_hotfix
Browse files Browse the repository at this point in the history
Error Handling + Readibility Update for ScanPdf Scanner
  • Loading branch information
phutelmyer authored Jan 4, 2024
2 parents bec16b7 + 79e3957 commit e2cd87a
Showing 1 changed file with 44 additions and 39 deletions.
83 changes: 44 additions & 39 deletions src/python/strelka/scanners/scan_pdf.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,3 @@
"""
This module contains a scanner for extracting metadata and files from PDF files.
Resources:
- https://pymupdf.readthedocs.io/en/latest/index.html
- https://www.osti.gov/servlets/purl/1030303
Requirements:
- PyMuPDF
"""


import io
import re
from collections import Counter
Expand All @@ -19,46 +7,61 @@

from strelka import strelka

# Hide PyMuPDF warnings
# Suppress PyMuPDF warnings
fitz.TOOLS.mupdf_display_errors(False)

# Regex to extract phone numbers from PDF file
phone_numbers = re.compile(
r"[+]?(?:\d{1,2})?\s?\(?\d{3}\)?[\s.-]\d{3}[\s.-]\d{2,4}?-?\d{2,4}?",
flags=0,
# Regular expression for extracting phone numbers from PDFs
PHONE_NUMBERS_REGEX = re.compile(
r"[+]?(?:\d{1,2})?\s?\(?\d{3}\)?[\s.-][\n]?\d{3}[\s.-][\n]?\d{2,4}?-?\d{2,4}?",
flags=re.IGNORECASE,
)


class ScanPdf(strelka.Scanner):
"""
A scanner that collects metadata and extracts files from PDF files.
Extracts metadata, embedded files, images, and text from PDF files.
This scanner utilizes PyMuPDF to parse PDF files, extracting various types of data,
including metadata, embedded files, images, and textual content. Phone numbers and
URLs within the document are also extracted and reported.
"""

@staticmethod
def _convert_timestamp(timestamp):
"""
Converts a date string to a DateTime object, sets the timezone to UTC, and returns it as an ISO string.
Converts a PDF timestamp string to an ISO 8601 formatted string.
PDF timestamps are typically in the 'D:%Y%m%d%H%M%S%z' format. This function
converts them to a more standard ISO 8601 format.
Args:
timestamp (str): A date string in the format 'D:%Y%m%d%H%M%S%z'.
timestamp: A string representing the timestamp in PDF format.
Returns:
str: An ISO-formatted date string in the format '%Y-%m-%dT%H:%M:%SZ'.
An ISO 8601 formatted timestamp string, or None if conversion fails.
"""

try:
# Date string is converted to DateTime, timezone is set to UTC, and returned as ISO string
return (
datetime.strptime(timestamp.replace("'", ""), "D:%Y%m%d%H%M%S%z")
.astimezone(timezone.utc)
.strftime("%Y-%m-%dT%H:%M:%SZ")
)
except strelka.ScannerTimeout:
raise
except Exception:
return
return None

def scan(self, data, file, options, expire_at):
"""
Performs the scanning process on the provided data.
The function opens the PDF using PyMuPDF and extracts metadata, embedded files,
images, and text. Phone numbers and URLs are also extracted using regular expressions.
Args:
data: Data of the file to be scanned.
file: The File object associated with the data.
options: Dictionary of scanner-specific options.
expire_at: Expiration time of the scan.
"""
# Set maximum XREF objects to be collected (default: 250)
max_objects = options.get("max_objects", 250)

Expand All @@ -67,7 +70,7 @@ def scan(self, data, file, options, expire_at):
self.event["lines"] = 0
self.event["links"] = []
self.event["words"] = 0
self.event.setdefault("xref_object", set())
self.event.setdefault("xref_object", list())
keys = list()

try:
Expand Down Expand Up @@ -108,19 +111,19 @@ def scan(self, data, file, options, expire_at):
[
re.sub("[^0-9]", "", x)
for x in re.findall(
phone_numbers,
PHONE_NUMBERS_REGEX,
reader.get_page_text(i).replace("\t", " "),
)
]
)
self.event["phones"] = list(set(phones))

# iterate through xref objects. Collect, count, and extract objects
self.event["xref_object"] = set()
self.event["xref_object"] = list()
for xref in range(1, reader.xref_length()):
xref_object = reader.xref_object(xref, compressed=True)
if xref_object not in self.event["xref_object"]:
self.event["xref_object"].add(xref_object)
self.event["xref_object"].append(xref_object)
for obj in options.get("objects", []):
pattern = f"/{obj}"
if pattern in xref_object:
Expand All @@ -130,7 +133,9 @@ def scan(self, data, file, options, expire_at):
self.event["objects"] = dict(Counter(keys))

# Convert unique xref_object set back to list
self.event["xref_object"] = list(self.event["xref_object"])[:max_objects]
self.event["xref_object"] = list(
set(self.event["xref_object"][:max_objects])
)

# Submit embedded files to strelka
try:
Expand All @@ -142,8 +147,8 @@ def scan(self, data, file, options, expire_at):

except strelka.ScannerTimeout:
raise
except Exception:
self.flags.append("embedded_parsing_failure")
except Exception as e:
self.flags.append(f"pdf_embedded_processing_error: {str(e)[:50]}")

# Submit extracted images to strelka
try:
Expand All @@ -157,8 +162,8 @@ def scan(self, data, file, options, expire_at):

except strelka.ScannerTimeout:
raise
except Exception:
self.flags.append("image_parsing_failure")
except Exception as e:
self.flags.append(f"pdf_image_processing_error: {str(e)[:50]}")

# Parse data from each page
try:
Expand All @@ -179,9 +184,9 @@ def scan(self, data, file, options, expire_at):

except strelka.ScannerTimeout:
raise
except Exception:
self.flags.append("page_parsing_failure")
except Exception as e:
self.flags.append(f"pdf_page_processing_error: {str(e)[:50]}")
except strelka.ScannerTimeout:
raise
except Exception:
self.flags.append("pdf_load_error")
except Exception as e:
self.flags.append(f"pdf_load_error: {str(e)[:50]}")

0 comments on commit e2cd87a

Please sign in to comment.