Skip to content

Commit

Permalink
Add Kafka output for SMTP TLS reports
Browse files Browse the repository at this point in the history
  • Loading branch information
seanthegeek committed Jan 3, 2024
1 parent 6a8b2ff commit a9d1675
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 2 deletions.
11 changes: 9 additions & 2 deletions parsedmarc/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@ def process_reports(reports_):
logger.error(error_.__str__())
try:
if opts.kafka_hosts:
pass
# TODO: save SMTP TLS reports to Kafka
kafka_client.save_smtp_tls_reports_to_kafka(
smtp_tls_reports, kafka_smtp_tls_topic)
except Exception as error_:
logger.error("Kafka Error: {0}".format(
error_.__str__()))
Expand Down Expand Up @@ -370,6 +370,7 @@ def process_reports(reports_):
kafka_password=None,
kafka_aggregate_topic=None,
kafka_forensic_topic=None,
kafka_smtp_tls_topic=None,
kafka_ssl=False,
kafka_skip_certificate_verification=False,
smtp_host=None,
Expand Down Expand Up @@ -722,6 +723,11 @@ def process_reports(reports_):
exit(-1)
if "forensic_topic" in kafka_config:
opts.kafka_username = kafka_config["forensic_topic"]
else:
logger.critical("forensic_topic setting missing from the "
"kafka config section")
if "smtp_tls_topic" in kafka_config:
opts.kafka_username = kafka_config["smtp_tls_topic"]
else:
logger.critical("forensic_topic setting missing from the "
"splunk_hec config section")
Expand Down Expand Up @@ -953,6 +959,7 @@ def process_reports(reports_):

kafka_aggregate_topic = opts.kafka_aggregate_topic
kafka_forensic_topic = opts.kafka_forensic_topic
kafka_smtp_tls_topic = opts.kafka_smtp_tls_topic

file_paths = []
mbox_paths = []
Expand Down
33 changes: 33 additions & 0 deletions parsedmarc/kafkaclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,36 @@ def save_forensic_reports_to_kafka(self, forensic_reports, forensic_topic):
except Exception as e:
raise KafkaError(
"Kafka error: {0}".format(e.__str__()))

def save_smtp_tls_reports_to_kafka(self, smtp_tls_reports, smtp_tls_topic):
"""
Saves SMTP TLS reports to Kafka, sends individual
records (slices) since Kafka requires messages to be <= 1MB
by default.
Args:
smtp_tls_reports (list): A list of forensic report dicts
to save to Kafka
smtp_tls_topic (str): The name of the Kafka topic
"""
if isinstance(smtp_tls_reports, dict):
smtp_tls_reports = [smtp_tls_reports]

if len(smtp_tls_reports) < 1:
return

try:
logger.debug("Saving forensic reports to Kafka")
self.producer.send(smtp_tls_topic, smtp_tls_reports)
except UnknownTopicOrPartitionError:
raise KafkaError(
"Kafka error: Unknown topic or partition on broker")
except Exception as e:
raise KafkaError(
"Kafka error: {0}".format(e.__str__()))
try:
self.producer.flush()
except Exception as e:
raise KafkaError(
"Kafka error: {0}".format(e.__str__()))

0 comments on commit a9d1675

Please sign in to comment.