forked from Fraunhofer-AISEC/archie
-
Notifications
You must be signed in to change notification settings - Fork 0
/
calculate_trigger.py
146 lines (134 loc) · 5.14 KB
/
calculate_trigger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from faultclass import build_filters
import logging
logger = logging.getLogger(__name__)
def find_tb_info_row(tb_id, goldenrun_tb_info):
    """Return the index label of the first row whose "id" equals ``tb_id``.

    Args:
        tb_id: value to look up in the "id" column.
        goldenrun_tb_info: pandas DataFrame that has an "id" column.

    Returns:
        The index label (not the positional offset) of the first matching
        row of ``goldenrun_tb_info``.

    Raises:
        IndexError: if no row has the requested id.
    """
    matches = goldenrun_tb_info.index[goldenrun_tb_info["id"] == tb_id]
    if matches.empty:
        # Same exception type as the bare ``idx[0]`` lookup would raise, but
        # with a message that names the id that could not be found.
        raise IndexError("no row with id {} in goldenrun_tb_info".format(tb_id))
    return matches[0]
def allign_fault_to_instruction(address, tbinfo_size, tbinfo_assembler, tbinfo_id):
    """Return the start address of the instruction that contains ``address``.

    Args:
        address: address to align.
        tbinfo_size: size of the block; ``tbinfo_id + tbinfo_size`` is used
            as the exclusive end of the last instruction.
        tbinfo_assembler: text in which every instruction is introduced by
            ``"[ <hex address> ]"`` (hex digits without a ``0x`` prefix).
        tbinfo_id: value added to ``tbinfo_size`` to form the end address,
            i.e. it is treated as the block's start address.

    Returns:
        The listed instruction address ``a`` with ``a <= address < next``,
        where ``next`` is the following listed address (or the block end for
        the last instruction). ``None`` if ``address`` lies in none of these
        ranges or the text lists no instruction.
    """
    # Text before the first "[ " carries no address, hence the [1:].
    asm_addresses = [
        int(chunk.split("]")[0].strip(), 16)
        for chunk in tbinfo_assembler.split("[ ")[1:]
    ]
    # Sentinel: first address past the block, closes the last instruction.
    asm_addresses.append(tbinfo_id + tbinfo_size)
    for start, end in zip(asm_addresses, asm_addresses[1:]):
        if start <= address < end:
            return start
    return None
def search_for_fault_location(
    filter_lists,
    trigger_position,
    fault_address,
    trigger_occurences,
    goldenrun_tb_exec,
    goldenrun_tb_info,
):
    """Automatically search for the trigger instruction of a fault.

    Locates the instruction containing ``fault_address`` in the selected
    execution of its block, then steps ``-trigger_position`` instructions
    back, moving to lower index labels of ``goldenrun_tb_exec`` whenever the
    current block does not hold enough instructions.

    Args:
        filter_lists: one list of instruction addresses per block. Element 0
            is compared against the block id, and the trigger is picked by
            indexing into the list.
        trigger_position: relative trigger offset; it is negated before use,
            so callers pass a negative number of instructions.
        fault_address: address of the faulted instruction.
        trigger_occurences: used directly as a zero-based position into the
            executions of all blocks that cover ``fault_address``.
        goldenrun_tb_exec: pandas DataFrame with a "tb" column. Its index
            labels are decremented by one to step to another execution
            (presumably the previously executed block - confirm with caller).
        goldenrun_tb_info: pandas DataFrame with the columns "id", "size",
            "assembler" and "ins_count".

    Returns:
        The address of the trigger instruction, or -1 if the requested
        occurrence does not exist or no trigger instruction can be
        determined from ``filter_lists``.
    """
    logger.info(
        "search trigger for faulting instruction address {}".format(fault_address)
    )
    # Ids of all blocks whose range [id, id + size) contains the fault.
    tb_start = goldenrun_tb_info["id"]
    covers_fault = (tb_start <= fault_address) & (
        fault_address < tb_start + goldenrun_tb_info["size"]
    )
    found_tbs = tb_start[covers_fault].tolist()

    # Collect the index labels of every execution of those blocks.
    idx = None
    for tb_id in found_tbs:
        hits = goldenrun_tb_exec.index[goldenrun_tb_exec["tb"] == tb_id]
        idx = hits if idx is None else idx.union(hits)

    # Identify desired occurrence. idx is still None when no block covers
    # the fault address, which is reported like a missing occurrence.
    if idx is None or trigger_occurences >= len(idx):
        return -1
    idx = idx[trigger_occurences]

    idtbinfo = find_tb_info_row(goldenrun_tb_exec.at[idx, "tb"], goldenrun_tb_info)
    ins = allign_fault_to_instruction(
        fault_address,
        goldenrun_tb_info.at[idtbinfo, "size"],
        goldenrun_tb_info.at[idtbinfo, "assembler"],
        goldenrun_tb_info.at[idtbinfo, "id"],
    )

    # False while still in the block that holds the faulted instruction.
    fault_located = False
    # From here on: number of instructions that remain to be stepped back.
    trigger_position = -trigger_position
    while trigger_position != 0:
        idtbinfo = find_tb_info_row(goldenrun_tb_exec.at[idx, "tb"], goldenrun_tb_info)
        if fault_located:
            ins_count = goldenrun_tb_info.at[idtbinfo, "ins_count"]
            if trigger_position > ins_count:
                # Current block is too short: skip it completely.
                idx = idx - 1
                trigger_position = trigger_position - ins_count
            else:
                for filt in filter_lists:
                    if filt[0] == goldenrun_tb_info.at[idtbinfo, "id"]:
                        ins = filt[len(filt) - trigger_position]
                        trigger_position = 0
                        break
                else:
                    # No filter describes this block; nothing would change in
                    # the next iteration, so give up instead of spinning.
                    logger.warning(
                        "No trigger found for faulting instruction address {}".format(
                            fault_address
                        )
                    )
                    return -1
        else:
            tb_id = goldenrun_tb_exec.at[idx, "tb"]
            for filt in filter_lists:
                # Found matching filter
                if filt[0] == tb_id:
                    for i, candidate in enumerate(filt):
                        if candidate == ins:
                            fault_located = True
                            if i >= trigger_position:
                                # Case ins is in the current tb
                                ins = filt[i - trigger_position]
                                trigger_position = 0
                            else:
                                # Case ins is not in the current tb
                                trigger_position -= i
                                idx -= 1
                            break
            if not fault_located:
                # The faulted instruction is in no filter of this block; the
                # loop state would never change, so give up instead of
                # spinning forever.
                logger.warning(
                    "No trigger found for faulting instruction address {}".format(
                        fault_address
                    )
                )
                return -1
    logger.info(
        "Found trigger for faulting instruction address {} at {}".format(
            fault_address, ins
        )
    )
    return ins
def calculate_trigger_addresses(fault_list, goldenrun_tb_exec, goldenrun_tb_info):
    """Replace relative trigger positions of faults with trigger addresses.

    Every fault whose ``trigger.address`` is negative and whose
    ``trigger.hitcounter`` is non-zero gets ``trigger.address`` overwritten
    with the result of ``search_for_fault_location``. All other faults are
    left untouched.

    Args:
        fault_list: iterable of mappings; each mapping's "faultlist" entry is
            an iterable of fault objects providing ``address``,
            ``trigger.address`` and ``trigger.hitcounter``.
        goldenrun_tb_exec: passed through to ``search_for_fault_location``.
        goldenrun_tb_info: passed to ``build_filters`` and
            ``search_for_fault_location``.

    Returns:
        None. The fault objects are modified in place.
    """
    filters = build_filters(goldenrun_tb_info)
    # Each filter is reversed in place before the search uses it. The search
    # compares element 0 with a block id - presumably the reversal puts the
    # block's first address there; confirm against build_filters.
    for tb_filter in filters:
        tb_filter.reverse()

    # Results keyed by (fault address, hit counter, relative trigger), so
    # identical requests are searched only once.
    resolved = {}
    for faults in fault_list:
        for fault in faults["faultlist"]:
            if fault.trigger.address >= 0 or fault.trigger.hitcounter == 0:
                continue
            key = (fault.address, fault.trigger.hitcounter, fault.trigger.address)
            if key not in resolved:
                resolved[key] = search_for_fault_location(
                    filters,
                    fault.trigger.address,
                    fault.address,
                    fault.trigger.hitcounter,
                    goldenrun_tb_exec,
                    goldenrun_tb_info,
                )
            fault.trigger.address = resolved[key]