forked from MISP/misp-modules
-
Notifications
You must be signed in to change notification settings - Fork 0
/
clamav.py
128 lines (103 loc) · 4.3 KB
/
clamav.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import base64
import io
import json
import logging
import sys
import zipfile
import clamd
from . import check_input_attribute, standard_error_message
from typing import Optional
from pymisp import MISPEvent, MISPObject
log = logging.getLogger("clamav")
log.setLevel(logging.DEBUG)
sh = logging.StreamHandler(sys.stdout)
sh.setLevel(logging.DEBUG)
fmt = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
sh.setFormatter(fmt)
log.addHandler(sh)
moduleinfo = {
"version": "0.1",
"author": "Jakub Onderka",
"description": "Submit file to ClamAV",
"module-type": ["expansion"]
}
moduleconfig = ["connection"]
mispattributes = {
"input": ["attachment", "malware-sample"],
"format": "misp_standard"
}
def create_response(original_attribute: dict, software: str, signature: Optional[str] = None) -> dict:
misp_event = MISPEvent()
if signature:
misp_event.add_attribute(**original_attribute)
av_signature_object = MISPObject("av-signature")
av_signature_object.add_attribute("signature", signature)
av_signature_object.add_attribute("software", software)
av_signature_object.add_reference(original_attribute["uuid"], "belongs-to")
misp_event.add_object(av_signature_object)
event = json.loads(misp_event.to_json())
results = {key: event[key] for key in ('Attribute', 'Object') if (key in event and event[key])}
return {"results": results}
def connect_to_clamav(connection_string: str) -> clamd.ClamdNetworkSocket:
if connection_string.startswith("unix://"):
return clamd.ClamdUnixSocket(connection_string.replace("unix://", ""))
elif ":" in connection_string:
host, port = connection_string.split(":")
return clamd.ClamdNetworkSocket(host, int(port))
else:
raise Exception("ClamAV connection string is invalid. It must be unix socket path with 'unix://' prefix or IP:PORT.")
def handler(q=False):
if q is False:
return False
request = json.loads(q)
connection_string: str = request["config"].get("connection")
if not connection_string:
return {"error": "No ClamAV connection string provided"}
attribute = request.get("attribute")
if not attribute:
return {"error": "No attribute provided"}
if not check_input_attribute(request['attribute']):
return {'error': f'{standard_error_message}, which should contain at least a type, a value and an uuid.'}
if attribute["type"] not in mispattributes["input"]:
return {"error": "Invalid attribute type provided, expected 'malware-sample' or 'attachment'"}
attribute_data = attribute.get("data")
if not attribute_data:
return {"error": "No attribute data provided"}
try:
clamav = connect_to_clamav(connection_string)
software_version = clamav.version()
except Exception:
logging.exception("Could not connect to ClamAV")
return {"error": "Could not connect to ClamAV"}
try:
data = base64.b64decode(attribute_data, validate=True)
except Exception:
logging.exception("Provided data is not valid base64 encoded string")
return {"error": "Provided data is not valid base64 encoded string"}
if attribute["type"] == "malware-sample":
try:
with zipfile.ZipFile(io.BytesIO(data)) as zipf:
data = zipf.read(zipf.namelist()[0], pwd=b"infected")
except Exception:
logging.exception("Could not extract malware sample from ZIP file")
return {"error": "Could not extract malware sample from ZIP file"}
try:
status, reason = clamav.instream(io.BytesIO(data))["stream"]
except Exception:
logging.exception("Could not send attribute data to ClamAV. Maybe file is too big?")
return {"error": "Could not send attribute data to ClamAV. Maybe file is too big?"}
if status == "ERROR":
return {"error": "ClamAV returned error message: {}".format(reason)}
elif status == "OK":
return {"results": {}}
elif status == "FOUND":
return create_response(attribute, software_version, reason)
else:
return {"error": "ClamAV returned invalid status {}: {}".format(status, reason)}
def introspection():
return mispattributes
def version():
moduleinfo["config"] = moduleconfig
return moduleinfo