Skip to content

Commit

Permalink
Merge branch 'release/0.7'
Browse files Browse the repository at this point in the history
  • Loading branch information
fedelemantuano committed Sep 2, 2016
2 parents 74a4e35 + e05aecc commit 53fbb36
Show file tree
Hide file tree
Showing 11 changed files with 210 additions and 11 deletions.
14 changes: 11 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@ SpamScope can be downloaded, used, and modified free of charge. It is available


## Installation
For more details please visit [wiki page](https://github.com/SpamScope/spamscope/wiki/Installation).

Clone repository

```
git clone https://github.com/fedelemantuano/spamscope.git
git clone https://github.com/SpamScope/spamscope.git
```

Install requirements in file `requirements.txt` with `python-pip`:
Expand Down Expand Up @@ -87,7 +88,7 @@ topology.max.spout.pending: 100
topology.sleep.spout.wait.strategy.time.ms: 10
```

If Apache Tika is enable:
If Apache Tika is enabled:

```
topology.max.spout.pending: 10
Expand All @@ -112,4 +113,11 @@ It's possible add to results (for mail attachments) Virustotal report. Maybe you

## Docker image

It's possible to use a complete Docker image with Apache Storm and SpamScope. Take it [here](https://hub.docker.com/r/fmantuano/spamscope/).
It's possible to use a complete Docker image with Apache Storm and SpamScope. Take it [here](https://hub.docker.com/r/fmantuano/spamscope/). There are two tags: **latest** and **develop**.

![Apache Storm](doc/images/Docker00.png?raw=true "Apache Storm")

![SpamScope](doc/images/Docker01.png?raw=true "SpamScope")

![SpamScope Topology](doc/images/Docker02.png?raw=true "SpamScope Topology")

19 changes: 19 additions & 0 deletions conf/components/bolts.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,22 @@ output-redis-bolt:
max_retry: 60
flush_size: 50
queue_name: spamscope


output-elasticsearch-bolt:
servers:
hosts:
- "node1:9200"
- "node2"

# If your application is long-running, consider turning on sniffing
# to make sure the client stays up to date on the cluster topology.
sniffer.timeout: 60

# Prefix ending with dash '-'. SpamScope uses one index per day.
index.prefix: spamscope-
doc.type.analysis: analysis
doc.type.attachments: attachments

# Max number of JSON documents to send in a single bulk request
flush_size: 50
Binary file added doc/images/Docker00.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added doc/images/Docker01.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added doc/images/Docker02.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
3 changes: 3 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
cffi==1.7.0
contextlib2==0.5.3
ecdsa==0.13
elasticsearch==2.4.0
Fabric==1.11.1
invoke==0.12.2
Jinja2==2.8
Expand All @@ -12,6 +13,7 @@ prettytable==0.7.2
pycparser==2.14
pycrypto==2.6.1
pyparsing==2.1.1
PySocks==1.5.7
python-magic==0.4.12
PyYAML==3.11
requests==2.11.1
Expand All @@ -20,4 +22,5 @@ six==1.10.0
ssdeep==3.1.1
streamparse==2.1.4
tika-app==0.3
urllib3==1.16
virustotal-api==1.1.2
23 changes: 19 additions & 4 deletions src/bolts/json_maker.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,33 @@ def _compose_output(self, greedy_data):
# Urls in body
mail['with_urls_body'] = greedy_data['urls_handler_body-bolt'][1]
if mail['with_urls_body']:
mail['urls_body'] = json.loads(
greedy_data['urls_handler_body-bolt'][2]
)

# Change urls format to fix Elasticsearch issue with dot '.'
reformat_urls = []
urls = json.loads(
greedy_data['urls_handler_body-bolt'][2])

for v in urls.values():
reformat_urls.extend(v)

mail['urls_body'] = reformat_urls

# Urls in attachments
mail['with_urls_attachments'] = \
greedy_data['urls_handler_attachments-bolt'][1]
if mail['with_urls_attachments']:
mail['urls_attachments'] = json.loads(

# Change urls format to fix Elasticsearch issue with dot '.'
reformat_urls = []
urls = json.loads(
greedy_data['urls_handler_attachments-bolt'][2]
)

for v in urls.values():
reformat_urls.extend(v)

mail['urls_attachments'] = reformat_urls

return json.dumps(mail, ensure_ascii=False)

def process(self, tup):
Expand Down
146 changes: 146 additions & 0 deletions src/bolts/output_elasticsearch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
"""
Copyright 2016 Fedele Mantuano (https://twitter.com/fedelemantuano)
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from __future__ import absolute_import, print_function, unicode_literals
from bolts.abstracts import AbstractBolt
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import copy
import datetime

try:
import simplejson as json
except ImportError:
import json


class OutputElasticsearch(AbstractBolt):
    """Output tokenized mails on Elasticsearch.

    Mails and their attachments are buffered in memory and written with
    the Elasticsearch bulk API, either when the buffer reaches
    ``flush_size`` documents or on the Storm tick tuple.
    """

    def initialize(self, stormconf, context):
        super(OutputElasticsearch, self).initialize(stormconf, context)

        # Elasticsearch parameters from the bolt configuration
        servers = self.conf['servers']
        self._index_prefix = servers['index.prefix']
        self._doc_type_analysis = servers['doc.type.analysis']
        self._doc_type_attachments = servers['doc.type.attachments']
        # int() guards against YAML delivering the value as a string,
        # which would make the flush comparison in process() never true.
        self._flush_size = int(servers['flush_size'])

        # Elasticsearch client. Sniffing keeps the node list up to date
        # for a long-running topology.
        self._es = Elasticsearch(
            hosts=servers['hosts'],
            sniff_on_start=True,
            sniff_on_connection_fail=True,
            sniffer_timeout=int(servers['sniffer.timeout']),
        )

        # Bulk buffers
        self._mails = []
        self._attachments = []
        self._count = 1

    def flush(self):
        """Send buffered mails and attachments to Elasticsearch in bulk
        and reset the buffers."""
        helpers.bulk(self._es, self._mails)
        helpers.bulk(self._es, self._attachments)
        self._mails = []
        self._attachments = []
        self._count = 1

    def process(self, tup):
        # Initialized before the try block so the except clause can
        # always log it, even if reading the tuple itself fails
        # (otherwise a NameError would mask the original error).
        sha256_random = None

        try:
            sha256_random = tup.values[0]
            mail = json.loads(tup.values[1])

            # Date for the daily index
            timestamp = datetime.datetime.strptime(
                mail['analisys_date'],  # key spelled this way by tokenizer
                "%Y-%m-%dT%H:%M:%S.%f",
            )
            mail_date = timestamp.strftime("%Y.%m.%d")

            # Work on a deep copy so the mail document can be slimmed
            # down below without affecting the attachment documents.
            attachments = []
            if mail.get("attachments", []):
                attachments = copy.deepcopy(mail["attachments"])

            # Prepare attachments for bulk
            for i in attachments:
                i['@timestamp'] = timestamp
                i['_index'] = self._index_prefix + mail_date
                i['_type'] = self._doc_type_attachments
                i['type'] = self._doc_type_attachments
                i['is_archived'] = False

                for j in i.get("files", []):

                    f = copy.deepcopy(j)

                    # Prepare archived files as standalone documents
                    f['@timestamp'] = timestamp
                    f['_index'] = self._index_prefix + mail_date
                    f['_type'] = self._doc_type_attachments
                    f['type'] = self._doc_type_attachments
                    f['is_archived'] = True
                    self._attachments.append(f)

                    # Remove payload and virustotal from the archived
                    # entry: they are now in the root document above
                    j.pop("payload", None)
                    j.pop("virustotal", None)

                self._attachments.append(i)

            # Remove from the mail document the huge attachment fields
            # (payload, reports) so fetching it from Elasticsearch is fast
            for i in mail.get("attachments", []):
                i.pop("payload", None)
                i.pop("tika", None)
                i.pop("virustotal", None)

                for j in i.get("files", []):
                    j.pop("payload", None)
                    j.pop("virustotal", None)

            # Prepare mail for bulk
            mail['@timestamp'] = timestamp
            mail['_index'] = self._index_prefix + mail_date
            mail['type'] = self._doc_type_analysis
            mail['_type'] = self._doc_type_analysis

            # Append mail in its daily index buffer
            self._mails.append(mail)

            # Flush once the buffer holds flush_size documents
            if self._count == self._flush_size:
                self.flush()
            else:
                self._count += 1

        except Exception as e:
            self.log(
                "Failed process json for mail: {}".format(sha256_random),
                "error"
            )
            self.raise_exception(e, tup)

    def process_tick(self, freq):
        """Every freq seconds reload the configuration and flush buffers. """
        super(OutputElasticsearch, self)._conf_loader()

        if self._mails:
            # Message corrected: this bolt writes to Elasticsearch, not
            # Redis (the original text was copied from the Redis bolt).
            self.log("Flush mail in Elasticsearch server after tick")
            self.flush()
2 changes: 2 additions & 0 deletions src/bolts/tokenizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ def tokenizer(self, mail_server, mailbox, priority, mail_data, kind_data):
else:
mail['date'] = datetime.datetime.utcnow().isoformat()

mail['analisys_date'] = datetime.datetime.utcnow().isoformat()

# Check message-id
if not mail.get('message_id'):
# to identify mail
Expand Down
4 changes: 0 additions & 4 deletions src/modules/sample_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ class VirusTotalApiKeyInvalid(ValueError):
pass


class TikaServerOffline(Exception):
pass


class SampleParser(object):

def __init__(
Expand Down
10 changes: 10 additions & 0 deletions topologies/spamscope.example.clj
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,16 @@
"bolts.conf", "/etc/spamscope/bolts.yml",
}
)
"output-elasticsearch-bolt" (python-bolt-spec
options
{"json-bolt" :shuffle}
"bolts.output_elasticsearch.OutputElasticsearch"
[]
:p 1
:conf {
"bolts.conf", "/etc/spamscope/bolts.yml",
}
)
}
]
)

0 comments on commit 53fbb36

Please sign in to comment.