-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdata_structures.py
148 lines (120 loc) · 5.1 KB
/
data_structures.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
"""
Define data structures to hold information regarding the following components of the binary view:
- binary view itself
- MLIL function
- MLIL basic block
- MLIL instruction
All structures defined here will be serve as a schema for the neo4j graph
"""
import threading
################################################################################################################
# UUID #
################################################################################################################
class UUID:
"""
UUID object, used to synchronize the UUID of any new object created in the graph.
Neo4j should take care of the sync locks of multithreading, making this thread-safe.
"""
_singleton_exists = False
_lock = threading.Lock()
_modulu_helper = 0
def __init__(self, driver, chunk_size=100):
"""
:param driver: (Neo4j_Bolt_Driver) connector to the relevant DB
"""
if not self._singleton_exists:
self._singleton_exists = True
else:
print("ERROR, cant create more then one UUID object")
return
self._driver = driver
self._chunk_size = chunk_size # how many _uuid's to supply to a consumer of this class in a single call
self._modulu_helper = chunk_size
#self._lock.acquire()
with driver.session() as session:
uuid = session.run("MATCH (n:UUID)"
"return n.uuid")
if not uuid.peek():
session.run("CREATE (:UUID {uuid: 1})")
self._uuid = 1
else:
self._uuid = self._get_current_uuid()
#self._lock.release()
def _get_current_uuid(self):
"""
internal function - receives the current _uuid pointed by the _uuid node
:return: _uuid: (INT) the current _uuid
"""
with self._driver.session() as session:
# self._lock.acquire()
uuid = session.run("MATCH (n:UUID)"
"RETURN n.uuid")
# self._lock.release()
return int(uuid.peek()['n.uuid'])
def _set_current_uuid(self, new_uuid):
"""
internal function - sets the current _uuid pointed by the _uuid node
:return: _uuid: (INT) the current _uuid
"""
with self._driver.session() as session:
# self._lock.acquire()
uuid = session.run("MATCH (n:UUID)"
"SET n.uuid = {new_uuid}"
"RETURN n.uuid",
new_uuid=new_uuid)
# self._lock.release()
return int(uuid.peek()['n.uuid'])
def _get_chunk(self, size):
"""
Create a thread safe way to give a chunk of _uuid's to a thread for consumption
:param size: (INT) how many _uuid's are needed?
:return: chunk_start: (INT)
"""
chunk_start = self._get_current_uuid()
self._set_current_uuid(chunk_start + size)
return chunk_start
def get_uuid(self):
"""
returns a _uuid for a consumer of the class to use.
responsible for fetching more chunks from the DB if the current chunk is complete.
:return: _uuid: (INT)
"""
if self._modulu_helper == self._chunk_size:
# consumed the current chunk, get another one from DB
self._modulu_helper = 0
self._uuid = self._get_chunk(self._chunk_size)
return self._uuid
else:
self._modulu_helper += 1
self._uuid += 1
return self._uuid
################################################################################################################
# MLIL BASIC BLOCK #
################################################################################################################
"""
HASH - xxhash of the basic block, created by digesting all tokens from all instructions within the basic block
"""
mlil_basic_block = dict(
HASH=None,
NodeLabel='BasicBlock',
)
################################################################################################################
# MLIL INSTRUCTION #
################################################################################################################
"""
HASH - xxhash of the instruction tokens
"""
mlil_instruction = dict(
HASH=None,
NodeLabel='Instruction',
)
################################################################################################################
# MLIL EXPRESSION #
################################################################################################################
"""
HASH - xxhash of the expression tokens
"""
mlil_expression = dict(
HASH=None,
NodeLabel='Expression'
)