-
Notifications
You must be signed in to change notification settings - Fork 36
/
pops-backup.py
180 lines (147 loc) · 6.66 KB
/
pops-backup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import csv
import os
import json
import shutil
import logging
import argparse
from model import *
from dotenv import load_dotenv
from pathlib import Path
from flask import Flask
# Process-wide setup: timestamped DEBUG-level logging, environment variables
# loaded through python-dotenv, and the Flask app configured from flask.cfg.
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(message)s",
    datefmt="%m/%d/%Y %I:%M:%S %p",
)
load_dotenv()

app = Flask(__name__)
app.config.from_pyfile("flask.cfg")
def exportassessment(assessment, filetype):
    """Write an assessment's test cases to export.json and export.csv.

    Both files are written into ``<args.backupdir>/<assessment.id>/``, which
    must already exist.

    Args:
        assessment: Assessment document; its ``id`` selects the test cases
            and names the output directory.
        filetype: Unused. Both formats are always written; the parameter is
            kept so existing callers keep working.
    """
    out_dir = f"{args.backupdir}/{assessment.id}"
    testcases = TestCase.objects(assessmentid=str(assessment.id)).all()
    rows = [testcase.to_json(raw=True) for testcase in testcases]

    # The JSON export keeps list-valued fields as real arrays, so it has to
    # be written before the rows are flattened for the CSV below.
    with open(f"{out_dir}/export.json", 'w') as f:
        json.dump(rows, f, indent=4)

    # A CSV cell cannot hold an array: collapse each list field into a single
    # comma-delimited string.
    for row in rows:
        for field in ["sources", "targets", "tools", "controls", "tags", "redfiles", "bluefiles"]:
            row[field] = ",".join(row[field])

    with open(f"{out_dir}/export.csv", 'w', encoding='UTF8', newline='') as f:
        # Test the rows already fetched rather than the query object, so the
        # rows[0] lookup is guarded by the very list it indexes. With no test
        # cases the file is simply left empty.
        if rows:
            writer = csv.DictWriter(f, fieldnames=rows[0].keys())
            writer.writeheader()
            writer.writerows(rows)
def exportcampaign(id):
    """Dump the campaign-relevant fields of every test case to campaign.json.

    The assessment is looked up by ``id`` and the file is written to
    ``<args.backupdir>/<assessment.id>/campaign.json``.
    """
    campaign_fields = ["mitreid", "tactic", "name", "objective", "actions", "tools", "uuid", "tags"]

    assessment = Assessment.objects(id=id).first()
    assessment_id = str(assessment.id)

    entries = []
    for testcase in TestCase.objects(assessmentid=assessment_id).all():
        # Take the full JSON dump, then keep only the campaign fields.
        full_dump = testcase.to_json(raw=True)
        entries.append({field: full_dump[field] for field in campaign_fields})

    with open(f"{args.backupdir}/{assessment_id}/campaign.json", 'w') as f:
        json.dump(entries, f, indent=4)
def exporttestcases(id):
    """Write testcases.json: the campaign export plus a "provider" field.

    Runs exportcampaign(id), reads back the campaign.json it produced, and
    saves a copy in which every entry's "provider" is the placeholder "???".
    """
    # Reuse the campaign exporter instead of rebuilding the same records.
    exportcampaign(id)

    backup_root = f"{args.backupdir}/{id}"
    with open(f"{backup_root}/campaign.json", 'r') as src:
        records = json.load(src)

    for record in records:
        record["provider"] = "???"

    with open(f"{backup_root}/testcases.json", 'w') as dst:
        json.dump(records, dst, indent=4)
# Build what looks like a MITRE ATT&CK Navigator layer for one assessment and
# write it to <args.backupdir>/<id>/navigator.json.
#
# Each entry appended to navigator["techniques"] carries a "techniqueID", a
# "tactic" and, when at least one matching test case has a recognised outcome,
# a 0-100 "score".
#
# NOTE(review): this copy of the file has lost its indentation, so nesting
# cannot be read off the text. In particular, confirm against the upstream
# file whether the per-tactic loop sits inside `if testcases:` or runs for
# every technique.
def exportnavigator(id):
# Sanity check to ensure assessment exists and to die if not
# NOTE(review): the result is discarded here; it is the `.name` access below
# that would actually fail - presumably first() returns None for an unknown
# id. Confirm.
_ = Assessment.objects(id=id).first()
# Static layer settings; only "name" and "techniques" vary per assessment.
navigator = {
"name": Assessment.objects(id=id).first().name,
"domain": "enterprise-attack",
"sorting": 3,
"layout": {
"layout": "flat",
"aggregateFunction": "average",
"showID": True,
"showName": True,
"showAggregateScores": True,
"countUnscored": False
},
"hideDisabled": False,
"techniques": [],
"gradient": {
"colors": [
"#ff6666ff",
"#ffe766ff",
"#8ec843ff"
],
"minValue": 0,
"maxValue": 100
},
"showTacticRowBackground": True,
"tacticRowBackground": "#593196",
"selectTechniquesAcrossTactics": True,
"selectSubtechniquesWithParent": False
}
# Walk every known technique and look up this assessment's test cases for it.
for technique in Technique.objects().all():
testcases = TestCase.objects(assessmentid=id, mitreid=technique.mitreid).all()
ttp = {
"techniqueID": technique.mitreid
}
if testcases:
# Tally outcomes; test cases whose outcome is not one of the four keys
# below are left out of both the tally and the count.
count = 0
outcomes = {"Prevented": 0, "Alerted": 0, "Logged": 0, "Missed": 0}
for testcase in testcases:
if testcase.outcome in outcomes.keys():
count += 1
outcomes[testcase.outcome] += 1
# Weighted score: Prevented=3, Alerted=2, Logged=1, Missed=0, expressed as a
# percentage of the best possible total (3 per counted test case) and
# truncated by int(). Skipped when nothing was counted, which also avoids a
# division by zero.
if count:
score = int((outcomes["Prevented"] * 3 + outcomes["Alerted"] * 2 +
outcomes["Logged"]) / (count * 3) * 100)
ttp["score"] = score
# One entry per tactic: the tactic name is lower-cased, stripped and
# hyphenated (e.g. "Initial Access" -> "initial-access"), and ttp is copied
# so the entries do not share a dict.
for tactic in technique.tactics:
tactic = tactic.lower().strip().replace(" ", "-")
tacticTTP = dict(ttp)
tacticTTP["tactic"] = tactic
navigator["techniques"].append(tacticTTP)
with open(f"{args.backupdir}/{id}/navigator.json", 'w') as f:
json.dump(navigator, f, indent=4)
def exportentirebackupid(assessment):
    """Rebuild the backup directory and zip archive for one assessment.

    Everything is written under ``<args.backupdir>/<assessment.id>/``; the
    archive is created beside it as ``<args.backupdir>/<assessment.id>.zip``.
    """
    target = f"{args.backupdir}/{str(assessment.id)}"

    # Start from an empty directory so artifacts of a previous run are gone.
    shutil.rmtree(target, ignore_errors=True)
    Path(target).mkdir(parents=True, exist_ok=True)

    # Generated exports.
    exportassessment(assessment, "csv")
    exporttestcases(assessment.id)
    exportnavigator(assessment.id)
    with open(f'{target}/meta.json', 'w') as f:
        json.dump(assessment.to_json(raw=True), f)

    # Copy the uploaded artifacts across from the PurpleOps files directory,
    # merging them into the directory populated above.
    uploads = f"{args.files}/{str(assessment.id)}"
    shutil.copytree(uploads, target, dirs_exist_ok=True)

    shutil.make_archive(target, 'zip', target)
if __name__ == "__main__":
    # Script entry point: back up every assessment into the --dir directory.
    #
    # Arguments are parsed outside the try block because argparse signals
    # --help and usage errors by raising SystemExit, which must terminate the
    # script normally instead of being logged as a backup failure.
    parser = argparse.ArgumentParser()
    parser.add_argument("-d", "--dir", dest="backupdir", help="The backup directory", required=True)
    parser.add_argument("-f", "--files", dest="files", help="The PurpleOps files directory", default="files")
    parser.add_argument("-H", "--host", help="Mongodb host. Default: host defined in flask.cfg")
    parser.add_argument("-p", "--port", type=int, help="Mongodb port. Default: port defined in flask.cfg")
    # `args` must stay a module-level name: the export functions read
    # args.backupdir and args.files directly.
    args = parser.parse_args()

    try:
        logging.info("Backup started")

        # Command-line host/port override the values loaded from flask.cfg.
        if args.port is not None:
            app.config["MONGODB_SETTINGS"]["port"] = args.port
        if args.host is not None:
            app.config["MONGODB_SETTINGS"]["host"] = args.host
        db.init_app(app)

        if not os.path.isdir(args.files):
            raise Exception("Invalid PurpleOps files directory, does not exist")
        # Each assessment's backup directory is wiped before it is rebuilt, so
        # backing up into the files directory itself would destroy the source.
        if os.path.isdir(args.backupdir):
            if os.path.abspath(args.backupdir) == os.path.abspath(args.files):
                raise Exception("Invalid backup directory, can't backup to the PurpleOps files directory")

        for a in Assessment.objects().all():
            logging.info(f"Exporting: {a.name}")
            exportentirebackupid(a)
        logging.info("Backup completed")
    except Exception:
        # Catch Exception rather than everything so Ctrl-C and SystemExit
        # still propagate, and exit non-zero so cron jobs or wrapper scripts
        # can tell that the backup failed.
        logging.exception("Error during backup, please fix and try again")
        raise SystemExit(1)