From a9d16755ae8438b3d7fe661e2f4c72c6b9a26b51 Mon Sep 17 00:00:00 2001 From: Sean Whalen Date: Tue, 2 Jan 2024 21:26:48 -0500 Subject: [PATCH] Add Kafka output for SMTP TLS reports --- parsedmarc/cli.py | 11 +++++++++-- parsedmarc/kafkaclient.py | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py index 4923fbea..76edb96f 100644 --- a/parsedmarc/cli.py +++ b/parsedmarc/cli.py @@ -196,8 +196,8 @@ def process_reports(reports_): logger.error(error_.__str__()) try: if opts.kafka_hosts: - pass - # TODO: save SMTP TLS reports to Kafka + kafka_client.save_smtp_tls_reports_to_kafka( + smtp_tls_reports, kafka_smtp_tls_topic) except Exception as error_: logger.error("Kafka Error: {0}".format( error_.__str__())) @@ -370,6 +370,7 @@ def process_reports(reports_): kafka_password=None, kafka_aggregate_topic=None, kafka_forensic_topic=None, + kafka_smtp_tls_topic=None, kafka_ssl=False, kafka_skip_certificate_verification=False, smtp_host=None, @@ -722,6 +723,11 @@ def process_reports(reports_): exit(-1) if "forensic_topic" in kafka_config: opts.kafka_username = kafka_config["forensic_topic"] + else: + logger.critical("forensic_topic setting missing from the " + "kafka config section") + if "smtp_tls_topic" in kafka_config: + opts.kafka_username = kafka_config["smtp_tls_topic"] else: logger.critical("forensic_topic setting missing from the " "splunk_hec config section") @@ -953,6 +959,7 @@ def process_reports(reports_): kafka_aggregate_topic = opts.kafka_aggregate_topic kafka_forensic_topic = opts.kafka_forensic_topic + kafka_smtp_tls_topic = opts.kafka_smtp_tls_topic file_paths = [] mbox_paths = [] diff --git a/parsedmarc/kafkaclient.py b/parsedmarc/kafkaclient.py index 02bf833a..68eef1f4 100644 --- a/parsedmarc/kafkaclient.py +++ b/parsedmarc/kafkaclient.py @@ -161,3 +161,36 @@ def save_forensic_reports_to_kafka(self, forensic_reports, forensic_topic): except Exception as e: raise KafkaError( "Kafka error: {0}".format(e.__str__())) + + def save_smtp_tls_reports_to_kafka(self, smtp_tls_reports, smtp_tls_topic): + """ + Saves SMTP TLS reports to Kafka, sends individual + records (slices) since Kafka requires messages to be <= 1MB + by default. + + Args: + smtp_tls_reports (list): A list of forensic report dicts + to save to Kafka + smtp_tls_topic (str): The name of the Kafka topic + + """ + if isinstance(smtp_tls_reports, dict): + smtp_tls_reports = [smtp_tls_reports] + + if len(smtp_tls_reports) < 1: + return + + try: + logger.debug("Saving forensic reports to Kafka") + self.producer.send(smtp_tls_topic, smtp_tls_reports) + except UnknownTopicOrPartitionError: + raise KafkaError( + "Kafka error: Unknown topic or partition on broker") + except Exception as e: + raise KafkaError( + "Kafka error: {0}".format(e.__str__())) + try: + self.producer.flush() + except Exception as e: + raise KafkaError( + "Kafka error: {0}".format(e.__str__()))