From 21f2de792cdc50700fc73bc834de187979c3a44f Mon Sep 17 00:00:00 2001 From: Fedele Mantuano Date: Thu, 21 Feb 2019 19:19:50 +0100 Subject: [PATCH 01/14] Bugfix mail already emitted --- project.clj | 2 +- src/options.py | 2 +- src/spouts/files_mails.py | 26 +++++++++++++++----------- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/project.clj b/project.clj index c4df0b0..b43dbdb 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject spamscope "2.7.0-SNAPSHOT" +(defproject spamscope "2.7.1-SNAPSHOT" :resource-paths ["_resources"] :target-path "_build" :min-lein-version "2.0.0" diff --git a/src/options.py b/src/options.py index 31ef55e..6c00a9f 100644 --- a/src/options.py +++ b/src/options.py @@ -19,7 +19,7 @@ from os.path import join -__version__ = "2.7.0" +__version__ = "2.7.1" __configuration_path__ = "/etc/spamscope" __defaults__ = { diff --git a/src/spouts/files_mails.py b/src/spouts/files_mails.py index f9a28ae..923f419 100644 --- a/src/spouts/files_mails.py +++ b/src/spouts/files_mails.py @@ -128,17 +128,21 @@ def next_tuple(self): self.log("EMITTED - {!r}".format(mail_string)) processing = mail.filename + ".processing" - shutil.move(mail.filename, processing) - - self.emit([ - processing, # 0 - mail.mail_server, # 1 - mail.mailbox, # 2 - mail.priority, # 3 - mail.trust, # 4 - mail.mail_type, # 5 - mail.headers], # 6 - tup_id=mail.filename) + + try: + shutil.move(mail.filename, processing) + except IOError: + self.log("ALREADY EMITTED - {!r}".format(mail_string)) + else: + self.emit([ + processing, # 0 + mail.mail_server, # 1 + mail.mailbox, # 2 + mail.priority, # 3 + mail.trust, # 4 + mail.mail_type, # 5 + mail.headers], # 6 + tup_id=mail.filename) def ack(self, tup_id): """Acknowledge tup_id, that is the path_mail. """ From d3b0e2429894434fd813c755304d56d589b07015 Mon Sep 17 00:00:00 2001 From: Fedele Mantuano Date: Thu, 7 Mar 2019 01:09:31 +0100 Subject: [PATCH 02/14] First commit iter_files_mails spout --- src/spouts/iter_files_mails.py | 167 +++++++++++++++++++++++++++++++++ 1 file changed, 167 insertions(+) create mode 100644 src/spouts/iter_files_mails.py diff --git a/src/spouts/iter_files_mails.py b/src/spouts/iter_files_mails.py new file mode 100644 index 0000000..eeae640 --- /dev/null +++ b/src/spouts/iter_files_mails.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +Copyright 2019 Fedele Mantuano (https://www.linkedin.com/in/fmantuano/) + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from __future__ import absolute_import, print_function, unicode_literals +from datetime import date + +import glob +import os +import shutil + +import six + +from modules import ( + AbstractSpout, + is_file_older_than, + MAIL_PATH_OUTLOOK, + MAIL_PATH, + MailItem, +) + + +class IterFilesMailSpout(AbstractSpout): + outputs = [ + 'raw_mail', + 'mail_server', + 'mailbox', + 'priority', + 'trust', + 'mail_type', + 'headers' + ] + + def initialize(self, stormconf, context): + super(IterFilesMailSpout, self).initialize(stormconf, context) + self._check_conf() + self.mails = self.iter_mails() + + def _check_conf(self): + self._fail_seconds = int(self.conf.get("fail.after.seconds", 60)) + self._what = self.conf["post_processing"].get("what", "remove").lower() + self._where = self.conf["post_processing"].get("where", "/tmp/moved") + if not os.path.exists(self._where): + os.makedirs(self._where) + self._where_failed = self.conf["post_processing"].get( + "where.failed", "/tmp/failed") + if not os.path.exists(self._where_failed): + os.makedirs(self._where_failed) + + def iter_mails(self): + for k, v in self.conf["mailboxes"].items(): + path = v["path_mails"] + pattern = v["files_pattern"] + mail_type = MAIL_PATH + if v.get("outlook", False): + mail_type = MAIL_PATH_OUTLOOK + + for mail in glob.iglob(os.path.join(path, pattern)): + yield MailItem( + filename=mail, + mail_server=v["mail_server"], + mailbox=k, + priority=v["priority"], + trust=v["trust_string"], + mail_type=mail_type, + headers=v.get("headers", [])) + + def next_tuple(self): + try: + # get the next mail + mail = next(self.mails) + + # check if processing + if mail.filename.endswith(".processing"): + self._fail_old_mails(mail.filename) + return + + mail_string = mail.filename.split("/")[-1] + self.log("EMITTED - {!r}".format(mail_string)) + + processing = mail.filename + ".processing" + + try: + shutil.move(mail.filename, processing) + except IOError: + self.log("ALREADY EMITTED - {!r}".format(mail_string)) + else: + self.emit([ + processing, # 0 + mail.mail_server, # 1 + mail.mailbox, # 2 + mail.priority, # 3 + mail.trust, # 4 + mail.mail_type, # 5 + mail.headers], # 6 + tup_id=mail.filename) + + except StopIteration: + # Reload general spout conf + self._conf_loader() + + # Load new mails + self.mails = self.iter_mails() + + def ack(self, tup_id): + """Acknowledge tup_id, that is the path_mail. """ + + mail_string = tup_id.split("/")[-1] + self.log("ACKED - {!r}".format(mail_string)) + + processing = tup_id + ".processing" + + if self._what == "remove": + try: + os.remove(processing) + except Exception: + self.log("Failed to remove {!r} mail".format(processing)) + else: + try: + now = six.text_type(date.today()) + mail_path = os.path.join(self._where, now) + if not os.path.exists(mail_path): + os.makedirs(mail_path) + # this chmod is useful to work under + # nginx directory listing + os.chmod(processing, 0o775) + mail = os.path.join(mail_path, mail_string) + shutil.move(processing, mail) + except shutil.Error: + os.remove(processing) + + def fail(self, tup_id): + self._move_fail(tup_id) + + def _move_fail(self, src): + mail_string = src.split("/")[-1] + mail = os.path.join(self._where_failed, mail_string) + processing = src + ".processing" + + try: + os.chmod(processing, 0o775) + shutil.move(processing, mail) + finally: + self.log("FAILED - {!r}".format(mail_string)) + + def _fail_old_mails(self, process_mail): + mail = process_mail.replace(".processing", "") + mail_string = mail.split("/")[-1] + if is_file_older_than(process_mail, self._fail_seconds): + self.log("Mail {!r} older than {} seconds".format( + mail_string, self._fail_seconds)) + self._move_fail(mail) From 4c2f236330db6156f7b3b250f2455c7bac554dbf Mon Sep 17 00:00:00 2001 From: Fedele Mantuano Date: Sun, 10 Mar 2019 00:35:49 +0100 Subject: [PATCH 03/14] Added new working iter spout, tested on dev --- Makefile | 9 ++++ conf/spamscope.example.yml | 44 +++++++++++++++++ src/spouts/README.md | 13 +++++ src/spouts/__init__.py | 1 + src/spouts/iter_files_mails.py | 8 +++- topologies/spamscope_debug_iter.py | 77 ++++++++++++++++++++++++++++++ 6 files changed, 150 insertions(+), 2 deletions(-) create mode 100644 src/spouts/README.md create mode 100644 topologies/spamscope_debug_iter.py diff --git a/Makefile b/Makefile index 71bbea6..3744856 100644 --- a/Makefile +++ b/Makefile @@ -80,3 +80,12 @@ dist: clean ## builds source and wheel package install: clean ## install the package to the active Python's site-packages python setup.py install + +debug-iter-topology: + mkdir /tmp/logs/ 2>/dev/null || echo /tmp/logs/ already exist + sparse run \ + -n spamscope_debug_iter \ + -e debug \ + -o topology.max.spout.pending=1 \ + -o "topology.sleep.spout.wait.strategy.time.ms=10" \ + -o "topology.tick.tuple.freq.secs=10" \ No newline at end of file diff --git a/conf/spamscope.example.yml b/conf/spamscope.example.yml index 4c5ba0a..5fde40a 100644 --- a/conf/spamscope.example.yml +++ b/conf/spamscope.example.yml @@ -49,6 +49,50 @@ files-mails: path_mails: /path/mails2 +# Spout file on file system +# Use an iterator. Safe for RAM +iter-files-mails: + + # The mails in processing older that fail.after.seconds will be failed + fail.after.seconds: 60 + + # Post processing + post_processing: + + # move or remove mails analyzed, default remove + what: remove + + # Where you want move the analyzed mails, default /tmp/moved + where: /tmp/moved + + # Where you want move the failed mails, default /tmp/failed + where.failed: /tmp/failed + + # Mailboxes + mailboxes: + test: + mail_server: hostname + # Trust string is used to get sender IP address from mail server. + # More details: + # https://github.com/SpamScope/mail-parser/blob/v0.4.6/mailparser/__init__.py#L221 + trust_string: "test_trust_string" + files_pattern: "*untroubled*" + path_mails: /path/mails1 + + # This flag enables Outlook msg parsing for every mails in mailbox + # Default value is false + outlook: false + + # List of others headers to get + headers: + - custom_one + - custom_two + test1: + mail_server: hostname + trust_string: "test1_trust_string" + files_pattern: "*" + path_mails: /path/mails2 + # Bolts configurations # Phishing bolt configuration phishing: diff --git a/src/spouts/README.md b/src/spouts/README.md new file mode 100644 index 0000000..7c744cf --- /dev/null +++ b/src/spouts/README.md @@ -0,0 +1,13 @@ +# Overview +In this folder there are all SpamScope `spouts`. + +# How add a new spout +These are the steps to add a new `spout` to Spamscope: + + - add a new module in [spouts](./) folder. This module should implement a new class that has `AbstractSpout` as base. + + - import the new class in [__init__.py](./__init__.py) + + - add the new section in [main configuration file](../../conf/spamscope.example.yml). The name of this section will be used in topology file + + - add the new spout in [topology](../../topologies) \ No newline at end of file diff --git a/src/spouts/__init__.py b/src/spouts/__init__.py index 44223b3..0beb224 100644 --- a/src/spouts/__init__.py +++ b/src/spouts/__init__.py @@ -15,3 +15,4 @@ """ from .files_mails import FilesMailSpout +from .iter_files_mails import IterFilesMailSpout diff --git a/src/spouts/iter_files_mails.py b/src/spouts/iter_files_mails.py index eeae640..0543087 100644 --- a/src/spouts/iter_files_mails.py +++ b/src/spouts/iter_files_mails.py @@ -70,12 +70,16 @@ def iter_mails(self): if v.get("outlook", False): mail_type = MAIL_PATH_OUTLOOK - for mail in glob.iglob(os.path.join(path, pattern)): + mails = sorted( + glob.iglob(os.path.join(path, pattern)), + key=os.path.getmtime) + + for mail in mails: yield MailItem( filename=mail, mail_server=v["mail_server"], mailbox=k, - priority=v["priority"], + priority=None, trust=v["trust_string"], mail_type=mail_type, headers=v.get("headers", [])) diff --git a/topologies/spamscope_debug_iter.py b/topologies/spamscope_debug_iter.py new file mode 100644 index 0000000..4b936c1 --- /dev/null +++ b/topologies/spamscope_debug_iter.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +Copyright 2017 Fedele Mantuano (https://twitter.com/fedelemantuano) + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + + +from spouts import IterFilesMailSpout +from bolts import (Attachments, JsonMaker, Phishing, Tokenizer, + Urls, Network, RawMail, OutputDebug) +from streamparse import Grouping, Topology + + +class OutputDebugTopology(Topology): + + files_spout = IterFilesMailSpout.spec( + name="iter-files-mails") + + tokenizer = Tokenizer.spec( + name="tokenizer", + inputs=[files_spout], + par=1) + + attachments = Attachments.spec( + name="attachments", + inputs={tokenizer['attachments']: Grouping.fields('sha256_random')}, + par=1) + + urls = Urls.spec( + name="urls", + inputs={ + attachments: Grouping.fields('sha256_random'), + tokenizer['body']: Grouping.fields('sha256_random')}) + + phishing = Phishing.spec( + name="phishing", + inputs={ + attachments: Grouping.fields('sha256_random'), + tokenizer['mail']: Grouping.fields('sha256_random'), + urls: Grouping.fields('sha256_random')}) + + network = Network.spec( + name="network", + inputs={tokenizer['network']: Grouping.fields('sha256_random')}, + par=1) + + raw_mail = RawMail.spec( + name="raw_mail", + inputs={tokenizer['raw_mail']: Grouping.fields('sha256_random')}, + par=1) + + json_maker = JsonMaker.spec( + name="json_maker", + inputs={ + attachments: Grouping.fields('sha256_random'), + network: Grouping.fields('sha256_random'), + phishing: Grouping.fields('sha256_random'), + raw_mail: Grouping.fields('sha256_random'), + tokenizer['mail']: Grouping.fields('sha256_random'), + urls: Grouping.fields('sha256_random')}) + + output_debug = OutputDebug.spec( + name="output-debug", + inputs=[json_maker]) From 3907b2e0b4201698bbf45004749319e394dae760 Mon Sep 17 00:00:00 2001 From: Fedele Mantuano Date: Sun, 10 Mar 2019 00:38:18 +0100 Subject: [PATCH 04/14] Fixed README --- src/spouts/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/spouts/README.md b/src/spouts/README.md index 7c744cf..550c2f9 100644 --- a/src/spouts/README.md +++ b/src/spouts/README.md @@ -6,7 +6,7 @@ These are the steps to add a new `spout` to Spamscope: - add a new module in [spouts](./) folder. This module should implement a new class that has `AbstractSpout` as base. - - import the new class in [__init__.py](./__init__.py) + - import the new class in [\_\_init\_\_.py](./__init__.py) - add the new section in [main configuration file](../../conf/spamscope.example.yml). The name of this section will be used in topology file From eac4f983ccd020b5745a4a3a8405507c3daaa068 Mon Sep 17 00:00:00 2001 From: Fedele Mantuano Date: Sun, 10 Mar 2019 00:48:32 +0100 Subject: [PATCH 05/14] Added new topology --- src/cli/spamscope_topology.py | 9 +++- topologies/README.md | 8 +++ topologies/spamscope_redis_iter.py | 78 ++++++++++++++++++++++++++++++ 3 files changed, 93 insertions(+), 2 deletions(-) create mode 100644 topologies/spamscope_redis_iter.py diff --git a/src/cli/spamscope_topology.py b/src/cli/spamscope_topology.py index f4b6811..dd4ff7b 100644 --- a/src/cli/spamscope_topology.py +++ b/src/cli/spamscope_topology.py @@ -57,8 +57,13 @@ def get_args(): submit.add_argument( "-g", "--topology", - choices=["spamscope_debug", "spamscope_elasticsearch", - "spamscope_redis"], + choices=[ + "spamscope_debug_iter", + "spamscope_debug", + "spamscope_elasticsearch", + "spamscope_redis_iter" + "spamscope_redis", + ], default="debug", help="SpamScope topology.", dest="topology") diff --git a/topologies/README.md b/topologies/README.md index cc47bc7..39e70d0 100644 --- a/topologies/README.md +++ b/topologies/README.md @@ -7,6 +7,9 @@ In this topology the results are stored on file system. ![Schema spamscope_debug](../docs/images/schema_spamscope_debug.png?raw=true "Schema spamscope_debug") +# spamscope_debug_iter +In this topology the results are stored on file system. It's same to `spamscope_debug` but it uses `iter-files-mails` spout. + # spamscope_elasticsearch In this topology the results are stored in Elasticsearch. @@ -15,4 +18,9 @@ In this topology the results are stored in Elasticsearch. # spamscope_redis In this topology the results are stored in Redis. +![Schema spamscope_redis](../docs/images/schema_spamscope_redis.png?raw=true "Schema spamscope_redis") + +# spamscope_redis_iter +In this topology the results are stored in Redis. It's same to `spamscope_redis` but it uses `iter-files-mails` spout. + ![Schema spamscope_redis](../docs/images/schema_spamscope_redis.png?raw=true "Schema spamscope_redis") \ No newline at end of file diff --git a/topologies/spamscope_redis_iter.py b/topologies/spamscope_redis_iter.py new file mode 100644 index 0000000..856a41d --- /dev/null +++ b/topologies/spamscope_redis_iter.py @@ -0,0 +1,78 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +Copyright 2017 Fedele Mantuano (https://twitter.com/fedelemantuano) + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + + +from spouts import IterFilesMailSpout +from bolts import (Attachments, JsonMaker, Phishing, Tokenizer, + Urls, Network, RawMail) +from bolts.output_redis import OutputRedis +from streamparse import Grouping, Topology + + +class OutputRedisTopology(Topology): + + files_spout = IterFilesMailSpout.spec( + name="iter-files-mails") + + tokenizer = Tokenizer.spec( + name="tokenizer", + inputs=[files_spout], + par=1) + + attachments = Attachments.spec( + name="attachments", + inputs={tokenizer['attachments']: Grouping.fields('sha256_random')}, + par=1) + + urls = Urls.spec( + name="urls", + inputs={ + attachments: Grouping.fields('sha256_random'), + tokenizer['body']: Grouping.fields('sha256_random')}) + + phishing = Phishing.spec( + name="phishing", + inputs={ + attachments: Grouping.fields('sha256_random'), + tokenizer['mail']: Grouping.fields('sha256_random'), + urls: Grouping.fields('sha256_random')}) + + network = Network.spec( + name="network", + inputs={tokenizer['network']: Grouping.fields('sha256_random')}, + par=1) + + raw_mail = RawMail.spec( + name="raw_mail", + inputs={tokenizer['raw_mail']: Grouping.fields('sha256_random')}, + par=1) + + json_maker = JsonMaker.spec( + name="json_maker", + inputs={ + attachments: Grouping.fields('sha256_random'), + network: Grouping.fields('sha256_random'), + phishing: Grouping.fields('sha256_random'), + raw_mail: Grouping.fields('sha256_random'), + tokenizer['mail']: Grouping.fields('sha256_random'), + urls: Grouping.fields('sha256_random')}) + + output_redis = OutputRedis.spec( + name="output-redis", + inputs=[json_maker]) From ea25d7405d13a6d0700bce9074f6bfeaa9ca4947 Mon Sep 17 00:00:00 2001 From: Fedele Mantuano Date: Sun, 10 Mar 2019 14:22:43 +0100 Subject: [PATCH 06/14] Fixed typo --- src/cli/spamscope_topology.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cli/spamscope_topology.py b/src/cli/spamscope_topology.py index dd4ff7b..bb3ba78 100644 --- a/src/cli/spamscope_topology.py +++ b/src/cli/spamscope_topology.py @@ -61,7 +61,7 @@ def get_args(): "spamscope_debug_iter", "spamscope_debug", "spamscope_elasticsearch", - "spamscope_redis_iter" + "spamscope_redis_iter", "spamscope_redis", ], default="debug", From 62a160ae5f1fd4a4c500f506c3d960bd8238297f Mon Sep 17 00:00:00 2001 From: Fedele Mantuano Date: Sun, 10 Mar 2019 17:10:28 +0100 Subject: [PATCH 07/14] Refactoring --- src/spouts/iter_files_mails.py | 40 ++++++++++++++++------------------ 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/src/spouts/iter_files_mails.py b/src/spouts/iter_files_mails.py index 0543087..70b0788 100644 --- a/src/spouts/iter_files_mails.py +++ b/src/spouts/iter_files_mails.py @@ -92,28 +92,26 @@ def next_tuple(self): # check if processing if mail.filename.endswith(".processing"): self._fail_old_mails(mail.filename) - return - - mail_string = mail.filename.split("/")[-1] - self.log("EMITTED - {!r}".format(mail_string)) - - processing = mail.filename + ".processing" - - try: - shutil.move(mail.filename, processing) - except IOError: - self.log("ALREADY EMITTED - {!r}".format(mail_string)) else: - self.emit([ - processing, # 0 - mail.mail_server, # 1 - mail.mailbox, # 2 - mail.priority, # 3 - mail.trust, # 4 - mail.mail_type, # 5 - mail.headers], # 6 - tup_id=mail.filename) - + mail_string = mail.filename.split("/")[-1] + self.log("EMITTED - {!r}".format(mail_string)) + + processing = mail.filename + ".processing" + + try: + shutil.move(mail.filename, processing) + except IOError: + self.log("ALREADY EMITTED - {!r}".format(mail_string)) + else: + self.emit([ + processing, # 0 + mail.mail_server, # 1 + mail.mailbox, # 2 + mail.priority, # 3 + mail.trust, # 4 + mail.mail_type, # 5 + mail.headers], # 6 + tup_id=mail.filename) except StopIteration: # Reload general spout conf self._conf_loader() From 40a7746c22ca38868f0bcb9799f6fb5e7b3c92eb Mon Sep 17 00:00:00 2001 From: Fedele Mantuano Date: Mon, 11 Mar 2019 21:57:03 +0100 Subject: [PATCH 08/14] Fixed issue OSerror in iter_files_mails --- src/spouts/iter_files_mails.py | 62 +++++++++++++++------------------- 1 file changed, 28 insertions(+), 34 deletions(-) diff --git a/src/spouts/iter_files_mails.py b/src/spouts/iter_files_mails.py index 70b0788..18463f0 100644 --- a/src/spouts/iter_files_mails.py +++ b/src/spouts/iter_files_mails.py @@ -70,48 +70,42 @@ def iter_mails(self): if v.get("outlook", False): mail_type = MAIL_PATH_OUTLOOK - mails = sorted( - glob.iglob(os.path.join(path, pattern)), - key=os.path.getmtime) - - for mail in mails: - yield MailItem( - filename=mail, - mail_server=v["mail_server"], - mailbox=k, - priority=None, - trust=v["trust_string"], - mail_type=mail_type, - headers=v.get("headers", [])) + for mail in glob.iglob(os.path.join(path, pattern)): + if mail.endswith(".processing"): + self._fail_old_mails(mail) + else: + yield MailItem( + filename=mail, + mail_server=v["mail_server"], + mailbox=k, + priority=None, + trust=v["trust_string"], + mail_type=mail_type, + headers=v.get("headers", [])) def next_tuple(self): try: # get the next mail mail = next(self.mails) + mail_string = mail.filename.split("/")[-1] + self.log("EMITTED - {!r}".format(mail_string)) + processing = mail.filename + ".processing" - # check if processing - if mail.filename.endswith(".processing"): - self._fail_old_mails(mail.filename) + try: + shutil.move(mail.filename, processing) + except IOError: + self.log("ALREADY EMITTED - {!r}".format(mail_string)) else: - mail_string = mail.filename.split("/")[-1] - self.log("EMITTED - {!r}".format(mail_string)) + self.emit([ + processing, # 0 + mail.mail_server, # 1 + mail.mailbox, # 2 + mail.priority, # 3 + mail.trust, # 4 + mail.mail_type, # 5 + mail.headers], # 6 + tup_id=mail.filename) - processing = mail.filename + ".processing" - - try: - shutil.move(mail.filename, processing) - except IOError: - self.log("ALREADY EMITTED - {!r}".format(mail_string)) - else: - self.emit([ - processing, # 0 - mail.mail_server, # 1 - mail.mailbox, # 2 - mail.priority, # 3 - mail.trust, # 4 - mail.mail_type, # 5 - mail.headers], # 6 - tup_id=mail.filename) except StopIteration: # Reload general spout conf self._conf_loader() From 8dd649421c6897b30692fa723bf86388b9b6eb5c Mon Sep 17 00:00:00 2001 From: Fedele Mantuano Date: Tue, 12 Mar 2019 00:55:31 +0100 Subject: [PATCH 09/14] Fixed OSerror --- src/spouts/iter_files_mails.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/spouts/iter_files_mails.py b/src/spouts/iter_files_mails.py index 18463f0..ef622f7 100644 --- a/src/spouts/iter_files_mails.py +++ b/src/spouts/iter_files_mails.py @@ -72,7 +72,11 @@ def iter_mails(self): for mail in glob.iglob(os.path.join(path, pattern)): if mail.endswith(".processing"): - self._fail_old_mails(mail) + try: + self._fail_old_mails(mail) + except OSError: + # mail already deleted + pass else: yield MailItem( filename=mail, From c552f5f9ca68acd9c5ff5ec122537856ed5f39a1 Mon Sep 17 00:00:00 2001 From: Fedele Mantuano Date: Wed, 13 Mar 2019 00:00:43 +0100 Subject: [PATCH 10/14] Added _iter topologies --- README.md | 5 +- README.rst | 12 +++- src/cli/spamscope_topology.py | 1 + topologies/README.md | 5 ++ topologies/spamscope_elasticsearch_iter.py | 78 ++++++++++++++++++++++ 5 files changed, 98 insertions(+), 3 deletions(-) create mode 100644 topologies/spamscope_elasticsearch_iter.py diff --git a/README.md b/README.md index b059909..ce550af 100644 --- a/README.md +++ b/README.md @@ -177,10 +177,13 @@ You can use: * [Ansible](./ansible/README.md): to install and run SpamScope on server # Topologies -SpamScope comes with three topologies: +SpamScope comes with six topologies: - [spamscope_debug](./topologies/spamscope_debug.py): the output are JSON files on file system. - [spamscope_elasticsearch](./topologies/spamscope_elasticsearch.py): the output are stored in Elasticsearch indexes. - [spamscope_redis](./topologies/spamscope_redis.py): the output are stored in Redis. + - [spamscope_debug_iter](./topologies/spamscope_debug_iter.py): It uses generator to send mails in topology. The output are JSON files on file system. + - [spamscope_elasticsearch_iter](./topologies/spamscope_elasticsearch_iter.py): It uses generator to send mails in topology. The output are stored in Elasticsearch indexes. + - [spamscope_redis_iter](./topologies/spamscope_redis_iter.py): It uses generator to send mails in topology. The output are stored in Redis. If you want submit SpamScope topology use `spamscope-topology submit` tool. For more details [see SpamScope cli tools](src/cli/README.md): diff --git a/README.rst b/README.rst index 119db3b..5702bd9 100644 --- a/README.rst +++ b/README.rst @@ -291,13 +291,21 @@ run SpamScope on server Topologies ========== -SpamScope comes with three topologies: - +SpamScope comes with six topologies: - `spamscope_debug <./topologies/spamscope_debug.py>`__: the output are JSON files on file system. - `spamscope_elasticsearch <./topologies/spamscope_elasticsearch.py>`__: the output are stored in Elasticsearch indexes. - `spamscope_redis <./topologies/spamscope_redis.py>`__: the output are -stored in Redis. +stored in Redis. - +`spamscope_debug_iter <./topologies/spamscope_debug_iter.py>`__: It uses +generator to send mails in topology. The output are JSON files on file +system. - +`spamscope_elasticsearch_iter <./topologies/spamscope_elasticsearch_iter.py>`__: +It uses generator to send mails in topology. The output are stored in +Elasticsearch indexes. - +`spamscope_redis_iter <./topologies/spamscope_redis_iter.py>`__: It uses +generator to send mails in topology. The output are stored in Redis. If you want submit SpamScope topology use ``spamscope-topology submit`` tool. For more details `see SpamScope cli tools `__: diff --git a/src/cli/spamscope_topology.py b/src/cli/spamscope_topology.py index bb3ba78..58e376b 100644 --- a/src/cli/spamscope_topology.py +++ b/src/cli/spamscope_topology.py @@ -60,6 +60,7 @@ def get_args(): choices=[ "spamscope_debug_iter", "spamscope_debug", + "spamscope_elasticsearch_iter", "spamscope_elasticsearch", "spamscope_redis_iter", "spamscope_redis", diff --git a/topologies/README.md b/topologies/README.md index 39e70d0..1e7ca86 100644 --- a/topologies/README.md +++ b/topologies/README.md @@ -2,6 +2,8 @@ In this folder there are all SpamScope topologies. You will see that all topologies are same, except that changes where you store the JSON results. +The topologies `_iter` are more stable because use generator to send mails to bolts. They are RAM safe. + # spamscope_debug In this topology the results are stored on file system. @@ -15,6 +17,9 @@ In this topology the results are stored in Elasticsearch. ![Schema spamscope_elasticsearch](../docs/images/schema_spamscope_elasticsearch.png?raw=true "Schema spamscope_elasticsearch") +# spamscope_elasticsearch_iter +In this topology the results are stored in Elasticsearch. It's same to `spamscope_elasticsearch` but it uses `iter-files-mails` spout. + # spamscope_redis In this topology the results are stored in Redis. diff --git a/topologies/spamscope_elasticsearch_iter.py b/topologies/spamscope_elasticsearch_iter.py new file mode 100644 index 0000000..c903817 --- /dev/null +++ b/topologies/spamscope_elasticsearch_iter.py @@ -0,0 +1,78 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +Copyright 2019 Fedele Mantuano (https://twitter.com/fedelemantuano) + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + + +from spouts import IterFilesMailSpout +from bolts import (Attachments, JsonMaker, Phishing, Tokenizer, + Urls, Network, RawMail) +from bolts.output_elasticsearch import OutputElasticsearch +from streamparse import Grouping, Topology + + +class OutputElasticsearchTopology(Topology): + + files_spout = IterFilesMailSpout.spec( + name="iter-files-mails") + + tokenizer = Tokenizer.spec( + name="tokenizer", + inputs=[files_spout], + par=1) + + attachments = Attachments.spec( + name="attachments", + inputs={tokenizer['attachments']: Grouping.fields('sha256_random')}, + par=1) + + urls = Urls.spec( + name="urls", + inputs={ + attachments: Grouping.fields('sha256_random'), + tokenizer['body']: Grouping.fields('sha256_random')}) + + phishing = Phishing.spec( + name="phishing", + inputs={ + attachments: Grouping.fields('sha256_random'), + tokenizer['mail']: Grouping.fields('sha256_random'), + urls: Grouping.fields('sha256_random')}) + + network = Network.spec( + name="network", + inputs={tokenizer['network']: Grouping.fields('sha256_random')}, + par=1) + + raw_mail = RawMail.spec( + name="raw_mail", + inputs={tokenizer['raw_mail']: Grouping.fields('sha256_random')}, + par=1) + + json_maker = JsonMaker.spec( + name="json_maker", + inputs={ + attachments: Grouping.fields('sha256_random'), + network: Grouping.fields('sha256_random'), + phishing: Grouping.fields('sha256_random'), + raw_mail: Grouping.fields('sha256_random'), + tokenizer['mail']: Grouping.fields('sha256_random'), + urls: Grouping.fields('sha256_random')}) + + output_elasticsearch = OutputElasticsearch.spec( + name="output-elasticsearch", + inputs=[json_maker]) From 1a3183f602df223b8fc0b7094c81782c4351bd4c Mon Sep 17 00:00:00 2001 From: Fedele Mantuano Date: Thu, 14 Mar 2019 23:53:03 +0100 Subject: [PATCH 11/14] Added new par for topology spamscope_redis_iter --- topologies/spamscope_redis_iter.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/topologies/spamscope_redis_iter.py b/topologies/spamscope_redis_iter.py index 856a41d..18b7cba 100644 --- a/topologies/spamscope_redis_iter.py +++ b/topologies/spamscope_redis_iter.py @@ -33,18 +33,19 @@ class OutputRedisTopology(Topology): tokenizer = Tokenizer.spec( name="tokenizer", inputs=[files_spout], - par=1) + par=2) attachments = Attachments.spec( name="attachments", inputs={tokenizer['attachments']: Grouping.fields('sha256_random')}, - par=1) + par=2) urls = Urls.spec( name="urls", inputs={ attachments: Grouping.fields('sha256_random'), - tokenizer['body']: Grouping.fields('sha256_random')}) + tokenizer['body']: Grouping.fields('sha256_random')}, + par=2) phishing = Phishing.spec( name="phishing", @@ -61,7 +62,7 @@ class OutputRedisTopology(Topology): raw_mail = RawMail.spec( name="raw_mail", inputs={tokenizer['raw_mail']: Grouping.fields('sha256_random')}, - par=1) + par=2) json_maker = JsonMaker.spec( name="json_maker", @@ -71,7 +72,8 @@ class OutputRedisTopology(Topology): phishing: Grouping.fields('sha256_random'), raw_mail: Grouping.fields('sha256_random'), tokenizer['mail']: Grouping.fields('sha256_random'), - urls: Grouping.fields('sha256_random')}) + urls: Grouping.fields('sha256_random')}, + par=2) output_redis = OutputRedis.spec( name="output-redis", From 82dc653f298ae109385fa2da7b5076bc9ce15e2e Mon Sep 17 00:00:00 2001 From: Fedele Mantuano Date: Fri, 15 Mar 2019 19:41:16 +0100 Subject: [PATCH 12/14] Catch all PickleError --- src/bolts/tokenizer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/bolts/tokenizer.py b/src/bolts/tokenizer.py index 7c4d096..6258922 100644 --- a/src/bolts/tokenizer.py +++ b/src/bolts/tokenizer.py @@ -24,7 +24,7 @@ import random import six from collections import deque -from cPickle import BadPickleGet +from cPickle import PickleError from streamparse import Stream import mailparser @@ -85,7 +85,7 @@ def load_filters(self): try: obj = load_obj(path) setattr(self, "analyzed_" + i, obj) - except (IOError, EOFError, ValueError, BadPickleGet): + except (IOError, EOFError, ValueError, PickleError): setattr(self, "analyzed_" + i, deque( maxlen=getattr(self, "maxlen_" + i))) From fc1931ed26340015fd5eec399e9233b844b205f1 Mon Sep 17 00:00:00 2001 From: Fedele Mantuano Date: Sat, 16 Mar 2019 00:42:30 +0100 Subject: [PATCH 13/14] Catch TypeError --- src/modules/attachments/attachments.py | 8 +++++--- src/modules/attachments/thug_analysis.py | 8 ++++++-- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/modules/attachments/attachments.py b/src/modules/attachments/attachments.py index 63198d2..eb49961 100644 --- a/src/modules/attachments/attachments.py +++ b/src/modules/attachments/attachments.py @@ -353,9 +353,11 @@ def withhashes(cls, attachments=[]): try: payload = base64.b64decode(i["payload"]) except TypeError, e: - payload = base64.b64decode(i["payload"] + "===") - i.setdefault("errors", []).append(repr(e)) - + try: + payload = base64.b64decode(i["payload"] + "===") + i.setdefault("errors", []).append(repr(e)) + except TypeError: + continue else: payload = i["payload"] diff --git a/src/modules/attachments/thug_analysis.py b/src/modules/attachments/thug_analysis.py index 4c1323d..6167f11 100644 --- a/src/modules/attachments/thug_analysis.py +++ b/src/modules/attachments/thug_analysis.py @@ -58,8 +58,12 @@ def generate_json_report(): if m is None: return - report = json.loads(m(tempfile.gettempdir())) - return report + try: + report = json.loads(m(tempfile.gettempdir())) + except TypeError: + return + else: + return report class CustomWatchdog(Watchdog): From 424a33fe5ceda3c9be86a99d5b82754e503dfec1 Mon Sep 17 00:00:00 2001 From: Fedele Mantuano Date: Sat, 16 Mar 2019 15:10:27 +0100 Subject: [PATCH 14/14] Updated version --- project.clj | 2 +- src/options.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/project.clj b/project.clj index b43dbdb..741cfce 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject spamscope "2.7.1-SNAPSHOT" +(defproject spamscope "2.8.0-SNAPSHOT" :resource-paths ["_resources"] :target-path "_build" :min-lein-version "2.0.0" diff --git a/src/options.py b/src/options.py index 6c00a9f..661e2ae 100644 --- a/src/options.py +++ b/src/options.py @@ -19,7 +19,7 @@ from os.path import join -__version__ = "2.7.1" +__version__ = "2.8.0" __configuration_path__ = "/etc/spamscope" __defaults__ = {