-
Notifications
You must be signed in to change notification settings - Fork 1
/
builder.py
122 lines (102 loc) · 4.2 KB
/
builder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import os
import sys
import json
import jsonlines
import concurrent.futures
import pandas as pd
import features
import config
def Builder(typeClass):
    """
    Parses every entry found under dataset/<typeClass> and writes the raw features,
    one merged dictionary per trace, to dataset/<typeClass>/dump.jsonl.
    :param typeClass: class of malware to focus on
    """
    print(f"{config.Colours.INFO}[*] Building dataset for {typeClass}.{config.Colours.ENDC}")

    # Work inside the class folder; a dump left behind by an earlier run is discarded first.
    path = f"dataset/{typeClass}"
    try:
        os.remove(f"{path}/dump.jsonl")
    except FileNotFoundError:
        pass

    # Base columns every record carries, even when a trace never produced them.
    featuresSet = {
        'class',
        'regkey_read',
        'regkey_opened',
        'regkey_deleted',
        'regkey_written',
        "file_failed",
        "file_copied",
        "file_exists",
        "file_opened",
        "file_read",
        "file_written",
        "dll_loaded",
    }

    # One extractor object per feature family, reused for all traces.
    apiExtractor = features.APICalls()
    fileExtractor = features.FileActions()
    registryExtractor = features.RegistryActions()
    dllExtractor = features.DLLLoads()

    # Per-trace feature dictionaries, held back until the complete feature set is known.
    parsedTraces = []
    for trace in os.listdir(path):
        print(f"[~] Parsing {path}/{trace}")
        tracePath = f"{path}/{trace}"
        apiFeatures = apiExtractor.processFeatures(tracePath)
        # Every API call name seen in any trace becomes part of the feature set.
        featuresSet.update(apiFeatures)
        fileFeatures = fileExtractor.processFeatures(tracePath)
        registryFeatures = registryExtractor.processFeatures(tracePath)
        dllFeatures = dllExtractor.processFeatures(tracePath)
        parsedTraces.append((apiFeatures, fileFeatures, registryFeatures, dllFeatures))

    # Write one json line per trace into the class' own folder.
    with jsonlines.open(f"{path}/dump.jsonl", 'w') as buildFile:
        for traceFeatures in parsedTraces:
            # Fold the four dictionaries into one; later dictionaries win on duplicate keys.
            merged = {"class": typeClass}
            for featureDict in traceFeatures:
                merged.update(featureDict)
            # Features this trace never produced are recorded as 0.
            for key in featuresSet:
                merged.setdefault(key, 0)
            buildFile.write(merged)

    print(f"{config.Colours.SUCCESS}[+] Dataset build for {typeClass} complete.{config.Colours.ENDC}")
def Reader(typeClass):
    """
    Loads the json dump of the given class into a pandas dataframe.
    A "dynamic_features.jsonl" file in the working directory, if present, is deleted first.
    :param typeClass: type class whose dataset/<typeClass>/dump.jsonl is to be read.
    :return: the dataframe pandas parses from the dump, read as line-delimited json.
    """
    print(f"{config.Colours.HEADER}[+] Initiated dataset read.{config.Colours.ENDC}")

    # Remove any existing dynamic features dump; a missing file is not an error.
    try:
        os.remove("dynamic_features.jsonl")
    except FileNotFoundError:
        pass

    print(f"{config.Colours.INFO}[*] Reading dataset for {typeClass}.{config.Colours.ENDC}")
    return pd.read_json(f"dataset/{typeClass}/dump.jsonl", lines=True)
def BuildDataset():
    """
    Builds the json dumps for every class in config.Classes, one worker thread per class.
    Blocks until all workers have finished. A worker that raises inside Builder is
    reported on the console instead of being silently discarded with its future.
    """
    print(f"{config.Colours.HEADER}[+] Initiated dataset build.{config.Colours.ENDC}")

    classes = list(config.Classes)
    # Maps each submitted future back to its class so failures can be attributed.
    pending = {}

    # ThreadPoolExecutor rejects max_workers=0, so keep one worker for an empty class list.
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(classes) or 1) as executor:
        for typeClass in classes:
            try:
                future = executor.submit(Builder, typeClass)
            except Exception:
                # Best effort: one class failing to schedule must not stop the others.
                print(f"{config.Colours.ERROR}[!] Unable to start thread for {typeClass}.{config.Colours.ENDC}")
            else:
                pending[future] = typeClass
                print(f"[+] Thread started for {typeClass}.")

        # submit() never re-raises worker errors; they are only visible through the future.
        for future in concurrent.futures.as_completed(pending):
            error = future.exception()
            if error is not None:
                print(f"{config.Colours.ERROR}[!] Dataset build for {pending[future]} failed: {error!r}{config.Colours.ENDC}")

    # Leaving the with-block has already waited for every worker to finish.
    print(f"{config.Colours.SUCCESS}[+] Dataset build complete.{config.Colours.ENDC}")
    return
if __name__ == "__main__":
    # Run the full dataset build when this file is executed as a script.
    BuildDataset()