forked from mnemonic-no/act-workers
-
Notifications
You must be signed in to change notification settings - Fork 0
/
attack.py
executable file
·379 lines (284 loc) · 13.2 KB
/
attack.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
#!/usr/bin/env python3
"""
Worker for Mitre ATT&CK, using the STIX implementation available here:
https://github.com/mitre/cti
ATT&CK Property STIX Object type ACT object
=========================================================
Technique attack-pattern technique
Group intrusion-set threatActor
Software malware or tool tool
Mitigation course-of-action n/a
"""
import os
from logging import error, warning, info
import traceback
import act
from stix2 import parse, Filter, MemoryStore
import worker
MITRE_ENTERPRISE_ATTACK_URL = "https://raw.githubusercontent.com/mitre/cti/master/enterprise-attack/enterprise-attack.json"
MITRE_PRE_ATTACK_URL = "https://raw.githubusercontent.com/mitre/cti/master/pre-attack/pre-attack.json"
MITRE_MOBILE_ATTACK_URL = "https://raw.githubusercontent.com/mitre/cti/master/mobile-attack/mobile-attack.json"
DEFAULT_NOTIFY_CACHE = os.path.join(os.environ["HOME"], "act-mitre-attack-notify.cache")
def parseargs():
""" Parse arguments """
parser = worker.parseargs('Mitre ATT&CK worker')
parser.add_argument('--smtphost', dest='smtphost', help="SMTP host used to send revoked/deprecated objects")
parser.add_argument('--sender', dest='sender', help="Sender address used to send revoked/deprecated objects")
parser.add_argument('--recipient', dest='recipient', help="Recipient address used to send revoked/deprecated objects")
parser.add_argument('--notifycache', dest='notifycache', help="Cache for revoked/deprecated objects", default=DEFAULT_NOTIFY_CACHE)
args = parser.parse_args()
return args
def get_attack(url, proxy_string, timeout):
"""Fetch Mitre ATT&CK JSON data in Stix2 format and return a Stix2 memory store"""
attack = worker.fetch_json(url, proxy_string, timeout)
# Create memory store
mem = MemoryStore()
# Add all objects to the memory store
for obj in parse(attack, allow_custom=True).objects:
mem.add(obj)
return mem
def add_fact(client, source_type, source_values, fact_type, destination_type, destination_values, link_type="linked"):
"""
Add facts for all combinations of source_values and destination_values,
using the specified source_type, fact_type, destination_type and
link_type.
Args:
client(act.Act): ACT instance
source_type(str): ACT object source type
source_values(str[]): List of source values
destination_type(str): ACT object destination type
destination_values(str[]): List of destination values
link_type(str): linked|bidirectional
link_type == linked, means a fact with a specified source and destination.
link_type == bidirectional, means a fact where source/destination have a two way direction
"""
# Ensure source/destination values lists, if not enclose in a list with a single value
if isinstance(destination_values, str):
destination_values = [destination_values]
if isinstance(source_values, str):
source_values = [source_values]
for source_value in source_values:
try:
for destination_value in destination_values:
fact = None
if source_type == destination_type and source_value == destination_value:
continue # Do not link to itself
if link_type == "linked":
fact = client.fact(fact_type)\
.source(source_type, source_value)\
.destination(destination_type, destination_value)
elif link_type == "bidirectional":
fact = client.fact(fact_type)\
.bidirectional(source_type, source_value)\
.bidirectional(destination_type, destination_value)
else:
error("Illegal link_type: %s" % link_type)
continue
if client.act_baseurl: # Add fact toplatform
fact.add()
else:
print(fact.json()) # Print fact to stdout, if baseurl is NOT set
except act.base.ResponseError as e:
error(e)
continue
def get_techniques(attack):
"""
extract objects/facts related to ATT&CK techniques
Args:
attack (stix2): Stix attack instance
"""
notify = []
facts = []
# ATT&CK concept STIX Object type ACT object
# =========================================================
# Technique attack-pattern technique
# Filter out ATT&CK techniques (attack-pattern) from bundle
for technique in attack.query([Filter("type", "=", "attack-pattern")]):
if getattr(technique, "revoked", None):
# Object is revoked, add to notification list but do not add to facts that should be added to the platform
notify.append(technique)
continue
if getattr(technique, "x_mitre_deprecated", None):
# Object is revoked, add to notification list AND continue to add to facts that should be added to the platform
notify.append(technique)
# Mitre ATT&CK Tactics are implemented in STIX as kill chain phases with kill_chain_name "mitre-attack"
tactics = [
tactic.phase_name
for tactic in technique.kill_chain_phases
if tactic.kill_chain_name == "mitre-attack"
]
facts.append(("tactic", tactics, "usesTechnique", "technique", technique.name, "linked"))
return (facts, notify)
def get_groups(attack):
"""
extract objects/facts related to ATT&CK Groups
Args:
attack (stix2): Stix attack instance
"""
notify = []
facts = []
# ATT&CK concept STIX Object type ACT object
# =========================================================
# Group intrusion-set threatActor
#
# Filter out ATT&CK groups (intrusion-set) from bundle
for group in attack.query([Filter("type", "=", "intrusion-set")]):
if getattr(group, "revoked", None):
# Object is revoked, add to notification list but do not add to facts that should be added to the platform
notify.append(group)
continue
if getattr(group, "x_mitre_deprecated", None):
# Object is revoked, add to notification list AND continue to add to facts that should be added to the platform
notify.append(group)
if getattr(group, "aliases", None):
facts.append(("threatActor",
group.name,
"threatActorAlias",
"threatActor",
group.aliases,
"bidirectional"))
# ATT&CK concept STIX Properties
# ==========================================================================
# Software relationship where relationship_type == "uses",
# points to a target object with type== "malware" or "tool"
uses_tools = [
tool.name.lower()
for tool in attack.related_to(group, relationship_type="uses")
if tool.type in ("malware", "tool")
]
# ATT&CK concept STIX Properties
# ==========================================================================
# Technqiues relationship where relationship_type == "uses", points to
# a target object with type == "attack-pattern"
uses_techniques = [
tech.name
for tech in attack.related_to(group, relationship_type="uses")
if tech.type in ("attack-pattern")
]
facts.append(("threatActor", group.name, "usesTechnique", "technique", uses_techniques, "linked"))
facts.append(("threatActor", group.name, "usesTool", "tool", uses_tools, "linked"))
return (facts, notify)
def get_software(attack):
"""
extract objects/facts related to ATT&CK Software
Insert to ACT if client.baseurl is set, if not, print to stdout
Args:
attack (stix2): Stix attack instance
"""
notify = []
facts = []
for software in attack.query([Filter("type", "in", ["tool", "malware"])]):
if getattr(software, "revoked", None):
# Object is revoked, add to notification list but do not add to facts that should be added to the platform
notify.append(group)
continue
if getattr(software, "x_mitre_deprecated", None):
# Object is revoked, add to notification list AND continue to add to facts that should be added to the platform
notify.append(software)
if hasattr(software, "x_mitre_aliases"):
aliases = [tool.lower() for tool in software.x_mitre_aliases]
facts.append(("tool", software.name.lower(), "toolAlias", "tool", aliases, "bidirectional"))
# ATT&CK concept STIX Properties
# ==========================================================================
# Technqiues relationship where relationship_type == "uses", points to
# a target object with type == "attack-pattern"
uses_techniques = [
tech.name
for tech in attack.related_to(software, relationship_type="uses")
if tech.type in ("attack-pattern")
]
facts.append(("tool", software.name, "usesTechnique", "technique", uses_techniques, "linked"))
return (facts, notify)
def notify_cache(filename):
"""
Read notify cache from filename
Args:
filename(str): Cache filename
"""
cache = {}
try:
with open(filename) as f:
for line in f:
if line:
cache[line.strip()] = True
except FileNotFoundError:
warning("Cache file {} not found, will be created if necessary".format(filename))
return cache
def add_to_cache(filename, entry):
"""
Add entry to cache
Args:
filename(str): Cache filename
entry(str): Cache entry
"""
with open(filename, "a") as f:
f.write(entry.strip())
f.write("\n")
def send_notification(notify, notifycache, smtphost, sender, recipient):
"""
Process revoked objects
Args:
notify(attack[]): Array of revoked/deprecated Stix objects
notifycache(str): Filename of notify cache
smtphost(str): SMTP host used to notify of revoked/deprecated objects
sender(str): sender address used to notify of revoked/deprecated objects
recipient(str): recipient address used to notify of revoked/deprecated objects
smtphost, sender AND recipient must be set to notify of revoked/deprecated objects
"""
body = url + "\n\n"
warning("[{}]".format(url))
for obj in notify:
# Add object to cache, so we will not be notified on the same object on the next run
add_to_cache(notifycache, obj.id)
if getattr(obj, "revoked", None):
text = "revoked: {}:{}".format(obj.type, obj.name)
elif getattr(obj, "x_mitre_deprecated", None):
text = "deprecated: {}:{}".format(obj.type, obj.name)
else:
text = "ERROR obj is not deprecated or revoked: {}:{}".format(obj.type, obj.name)
body += text + "\n"
warning(text)
if smtphost and recipient and sender:
worker.sendmail(smtphost, sender, recipient, "Revoked/deprecated objects from MITRE/ATT&CK", body)
info("Email sent to {}".format(recipient))
else:
error("--smtphost, --recipient and --sender must be set to send revoked/deprecated objects on email")
def main():
args = parseargs()
client = act.Act(
args.act_baseurl,
args.user_id,
args.loglevel,
args.logfile,
"mitre-attack")
for url in (MITRE_ENTERPRISE_ATTACK_URL, MITRE_MOBILE_ATTACK_URL, MITRE_PRE_ATTACK_URL):
cache = notify_cache(args.notifycache)
# Get attack dataset as Stix Memory Store
attack = get_attack(url, args.proxy_string, args.timeout)
(techniques, techniques_notify) = get_techniques(attack)
(groups, groups_notify) = get_groups(attack)
(software, software_notify) = get_software(attack)
# Add facts to platform
facts = techniques + groups + software
for (source_type, source_values, fact_type, destination_type, destination_values, link_type) in facts:
add_fact(client,
source_type,
source_values,
fact_type,
destination_type,
destination_values,
link_type)
# Get revoked objects, excluding those in cache
notify = [
notify
for notify in techniques_notify + groups_notify + software_notify
if notify.id not in cache
]
if notify:
send_notification(notify, args.notifycache, args.smtphost, args.sender, args.recipient)
if __name__ == '__main__':
try:
main()
except Exception as e:
error("Unhandled exception: {}".format(traceback.format_exc()))
raise