1+ import sys
2+ import os
3+ import pysvf
4+ from typing import Union
5+
class AndersenPTA:
    """Andersen-style pointer analysis driven through ``pysvf.AndersenBase``.

    Only ``solve_worklist`` is meant to be implemented here; every other
    method forwards to the wrapped ``pysvf.AndersenBase`` instance or to the
    constraint graph obtained from it.
    """

    def __init__(self, pag: pysvf.SVFIR):
        """Create the underlying ``pysvf.AndersenBase`` solver for *pag*."""
        assert isinstance(pag, pysvf.SVFIR), "pag is not a valid SVFIR object, the type of pag is {}".format(type(pag))
        self.pag = pag
        self.ander_base = pysvf.AndersenBase(pag)
        # Stays None until initialize() fetches the constraint graph.
        self.consCG = None

    def solve_worklist(self):
        """Process the worklist until no constraint can be applied any more.

        TODO: Implement your Andersen's Algorithm here.

        The solving rules are as follows (pts(q) denotes the points-to set
        of q)::

            p <--Addr-- o        =>  pts(p) = pts(p) ∪ {o}
            q <--COPY-- p        =>  pts(q) = pts(q) ∪ pts(p)
            q <--LOAD-- p        =>  for each o ∈ pts(p) : q <--COPY-- o
            q <--STORE-- p       =>  for each o ∈ pts(q) : o <--COPY-- p
            q <--GEP, fld-- p    =>  for each o ∈ pts(p) : pts(q) = pts(q) ∪ {o.fld}
        """
        pass

    # =========== Don't modify the code below in this class ============

    def initialize(self):
        """Initialize the base solver and cache its constraint graph in ``consCG``."""
        self.ander_base.initialize()
        self.consCG = self.ander_base.get_constraint_graph()

    def init_worklist(self):
        """Forward to ``AndersenBase.init_worklist``."""
        self.ander_base.init_worklist()

    def update_call_graph(self):
        """Forward to ``AndersenBase.update_call_graph`` and return its result.

        ``analyze`` re-runs ``solve_worklist`` while this returns a truthy value.
        """
        return self.ander_base.update_call_graph()

    def finalize(self):
        """Forward to ``AndersenBase.finalize``."""
        self.ander_base.finalize()

    def add_pts(self, id: int, ptd: int) -> bool:
        """Forward ``(id, ptd)`` to ``AndersenBase.add_pts`` and return its result.

        NOTE(review): presumably True means pts(id) changed -- confirm against pysvf.
        """
        assert isinstance(ptd, int), "ptd is not a valid int object, the type of ptd is {}".format(type(ptd))
        assert isinstance(id, int), "id is not a valid int object, the type of id is {}".format(type(id))
        return self.ander_base.add_pts(id, ptd)

    def union_pts(self, id: int, ptd: Union[int, pysvf.PointsTo, None]) -> bool:
        """Union *ptd* into the points-to set of node *id*.

        A ``pysvf.PointsTo`` is forwarded to ``AndersenBase.union_pts_2`` and
        an ``int`` to ``AndersenBase.union_pts``.

        Raises:
            TypeError: if *ptd* is neither of those. This includes ``None``,
                which the assertion and the annotation accept but no branch
                handles.
        """
        assert isinstance(id, int), "id is not a valid int object, the type of id is {}".format(type(id))
        assert isinstance(ptd, (int, pysvf.PointsTo, type(None))), "ptd is not a valid int/PointsTo/None object, the type of ptd is {}".format(type(ptd))
        if isinstance(ptd, pysvf.PointsTo):
            return self.ander_base.union_pts_2(id, ptd)
        elif isinstance(ptd, int):
            return self.ander_base.union_pts(id, ptd)
        else:
            raise TypeError("Invalid type for ptd")

    def get_pts(self, id: int):
        """Return whatever ``AndersenBase.get_pts`` yields for node *id*."""
        assert isinstance(id, int), "id is not a valid int object, the type of id is {}".format(type(id))
        return self.ander_base.get_pts(id)

    def is_worklist_empty(self) -> bool:
        """Forward to ``AndersenBase.is_worklist_empty``."""
        return self.ander_base.is_worklist_empty()

    def pop_from_worklist(self):
        """Forward to ``AndersenBase.pop_from_worklist`` and return its result."""
        return self.ander_base.pop_from_worklist()

    def push_into_worklist(self, node_id):
        """Push the integer node id *node_id* onto the base solver's worklist."""
        # The class used to define this method twice; only this later
        # definition (parameter name ``node_id``) was ever in effect.
        assert isinstance(node_id, int), "node_id is not a valid int object, the type of node_id is {}".format(type(node_id))
        self.ander_base.push_into_worklist(node_id)

    def add_copy_edge(self, src_id, dst_id) -> bool:
        """Add a copy edge ``src_id -> dst_id`` to the constraint graph.

        Prints a trace line, then returns the result of
        ``consCG.add_copy_edge``. ``initialize`` must have run first,
        otherwise ``consCG`` is still None.
        """
        assert isinstance(src_id, int), "src_id is not a valid int object, the type of src_id is {}".format(type(src_id))
        assert isinstance(dst_id, int), "dst_id is not a valid int object, the type of dst_id is {}".format(type(dst_id))
        print(f"Adding copy edge from {src_id} to {dst_id} ")
        return self.consCG.add_copy_edge(src_id, dst_id)

    def alias(self, node1: int, node2: int) -> bool:
        """Forward ``(node1, node2)`` to ``AndersenBase.alias`` and return its result."""
        assert isinstance(node1, int), "node1 is not a valid int object, the type of node1 is {}".format(type(node1))
        assert isinstance(node2, int), "node2 is not a valid int object, the type of node2 is {}".format(type(node2))
        return self.ander_base.alias(node1, node2)

    def analyze(self):
        """Run the whole analysis: initialize, solve to a fixed point, finalize.

        ``solve_worklist`` is repeated for as long as ``update_call_graph``
        returns a truthy value.
        """
        self.initialize()
        self.init_worklist()
        while True:
            self.solve_worklist()
            if not self.update_call_graph():
                break
        self.finalize()
101+
102+
103+
104+
105+
class ICFGTraversal:
    """Source-to-sink reachability and taint checking over the ICFG of an SVFIR."""

    def __init__(self, pag):
        """Set up empty traversal state and an ``AndersenPTA`` instance for *pag*."""
        assert isinstance(pag, pysvf.SVFIR), "pag is not a valid SVFIR object, the type of pag is {}".format(type(pag))
        self.pag = pag
        self.icfg = pag.get_icfg()
        # Collected source-to-sink paths; this is what get_paths() returns.
        self.paths = set()
        # Scratch state intended for the reachability() traversal.
        self.path = []
        self.visited = set()
        self.callstack = []
        # Call sites whose callee name is in source_names / sink_names.
        self.sources = set()
        self.sinks = set()
        # API names filled in by read_srcsnk_from_file().
        self.source_names = set()
        self.sink_names = set()
        self.ander = AndersenPTA(pag)

    def read_srcsnk_from_file(self, filename):
        """Load source and sink API names from *filename*.

        The first non-blank line lists the sources and the second the sinks.
        On each line the names are the whitespace-separated tokens between
        ``{`` and ``}``, e.g. ``{ api1 api2 api3 }``; text outside the braces
        is ignored. The names are added to ``source_names`` and
        ``sink_names``. Nothing is modified if the file is malformed.

        Raises:
            OSError: if *filename* cannot be opened.
            ValueError: if the file has fewer than two non-blank lines or a
                line lacks a ``{ ... }`` group.
        """
        with open(filename, encoding="utf-8") as config:
            lines = [line for line in config if line.strip()]
        if len(lines) < 2:
            raise ValueError(
                f"{filename}: expected a sources line followed by a sinks line"
            )
        # Parse both lines before touching state so a bad file changes nothing.
        source_apis = self._parse_api_list(lines[0], filename)
        sink_apis = self._parse_api_list(lines[1], filename)
        self.source_names.update(source_apis)
        self.sink_names.update(sink_apis)

    @staticmethod
    def _parse_api_list(line, filename):
        """Return the names between the first ``{`` of *line* and the next ``}``."""
        opening = line.find("{")
        closing = line.find("}", opening + 1) if opening != -1 else -1
        if closing == -1:
            raise ValueError(
                f"{filename}: expected '{{ api1 api2 ... }}' but got {line.strip()!r}"
            )
        return line[opening + 1:closing].split()

    def reachability(self, cur_node, sink):
        """Collect every ICFG path from *cur_node* to *sink* into ``self.paths``.

        TODO: Implement your context-sensitive ICFG traversal here to traverse
        each program path by matching calls and returns while maintaining
        ``self.callstack``.

        Each path including loops, qualified by a call stack, should only be
        traversed once using ``self.visited``. Add each source-to-sink path
        (a sequence of node IDs) to ``self.paths`` as a string in the format
        ``"START->1->2->4->5->END"``, where ``->`` indicates an ICFG edge
        connecting two ICFG node IDs.
        """
        assert isinstance(cur_node, pysvf.ICFGNode), "cur_node is not a valid ICFGNode object, the type of cur_node is {}".format(type(cur_node))
        assert isinstance(sink, pysvf.ICFGNode), "sink is not a valid ICFGNode object, the type of sink is {}".format(type(sink))
        pass

    def alias_check(self, src, snk) -> bool:
        """Tell whether the value returned at *src* aliases an argument at *snk*.

        TODO: Check aliases of the two variables at source and sink.
        For example::

            src instruction:  actualRet = source();
            snk instruction:  sink(actualParm, ...);

        Return True if ``actualRet`` is aliased with any parameter at the
        *snk* node (e.g. via ``self.ander.alias(.., ..)``).
        """
        assert isinstance(src, pysvf.CallICFGNode), "src is not a valid CallICFGNode object, the type of src is {}".format(type(src))
        assert isinstance(snk, pysvf.CallICFGNode), "snk is not a valid CallICFGNode object, the type of snk is {}".format(type(snk))
        pass

    # =========== Don't modify the code below in this class ============

    def identify_sources(self):
        """Add every call site whose callee name is in ``source_names`` to ``sources``.

        Returns the accumulated ``sources`` set.
        """
        # NOTE(review): if get_called_function() can return None (e.g. for an
        # indirect call) the get_name() call below fails -- confirm with pysvf.
        for callsite in self.pag.get_call_sites():
            fun = callsite.get_called_function()
            if fun.get_name() in self.source_names:
                self.sources.add(callsite)
        return self.sources

    def identify_sinks(self):
        """Add every call site whose callee name is in ``sink_names`` to ``sinks``.

        Returns the accumulated ``sinks`` set.
        """
        for callsite in self.pag.get_call_sites():
            fun = callsite.get_called_function()
            if fun.get_name() in self.sink_names:
                self.sinks.add(callsite)
        return self.sinks

    def taint_checking(self):
        """Start taint checking.

        There is a tainted flow from p@source to q@sink if
        (1) alias(p, q) is true and (2) the source reaches the sink on the
        ICFG. Names are read from ``SrcSnk.txt`` (resolved against the
        current working directory), the pointer analysis is run, and
        ``reachability`` is invoked for every aliasing source/sink pair.
        """
        self.read_srcsnk_from_file("SrcSnk.txt")
        self.ander.analyze()
        for src in self.identify_sources():
            for snk in self.identify_sinks():
                if self.alias_check(src, snk):
                    self.reachability(src, snk)

    def get_paths(self):
        """Return the set of path strings collected so far."""
        return self.paths
190+
191+
192+
def check_icfg_case(module_name, result, expected):
    """Assert that *result* matches *expected* for test module *module_name*.

    The two collections must have the same length and every path in *result*
    must occur in *expected*; otherwise an ``AssertionError`` naming the
    module is raised. A confirmation line is printed on success.
    """
    failure = f"Wrong paths generated - {module_name} failed!"
    assert len(result) == len(expected), failure
    assert all(path in expected for path in result), failure
    print(f"Test case {module_name} passed!")
198+
199+
def test_icfg(module_name_vec):
    """Run the ICFG reachability traversal on a module and check its paths.

    Sources and sinks are read from ``SrcSnk.txt`` next to this file,
    ``reachability`` is called for every source/sink pair, and the collected
    paths are compared with the expectation registered for the module's base
    name. A module without a registered expectation only produces a
    "not found" message.
    """
    # Expected path sets, keyed by the base name of the analysed module.
    expected_paths = {
        "test1.ll": {"START->3->4->5->END"},
        "test2.ll": {
            "START->3->4->5->6->7->8->9->END",
            "START->3->4->5->6->7->END",
            "START->5->6->7->8->9->END",
            "START->5->6->7->END",
        },
        "test3.ll": {"START->6->7->8->1->5->2->9->10->END"},
        "test4.ll": {"START->12->13->14->3->8->9->1->7->2->10->11->4->15->16->END"},
        "test5.ll": {
            "START->6->7->8->9->10->1->5->2->11->14->END",
            "START->6->7->8->9->12->1->5->2->13->16->END",
        },
        "test6.ll": {
            "START->12->13->14->15->16->3->8->9->1->7->2->10->11->4->17->20->END",
            "START->12->13->14->15->18->3->8->9->1->7->2->10->11->4->19->22->END",
        },
        "test7.ll": {"START->17->1->7->END"},
        "test8.ll": {
            "START->6->7->8->9->10->1->5->2->11->14->END",
            "START->6->7->8->9->12->1->5->2->13->16->END",
        },
        "test9.ll": {"START->7->8->9->10->11->14->END"},
        "test10.ll": {
            "START->3->4->5->6->7->9->11->END",
            "START->3->4->5->6->8->10->14->17->END",
        },
        # Add further test cases as needed...
    }

    pag = pysvf.get_pag(module_name_vec)  # Build Program Assignment Graph (SVFIR)
    gt = ICFGTraversal(pag)  # Create ICFG Traversal object (it fetches the ICFG itself)

    config_path = os.path.join(os.path.dirname(__file__), "./SrcSnk.txt")
    gt.read_srcsnk_from_file(config_path)

    for src in gt.identify_sources():
        for snk in gt.identify_sinks():
            gt.reachability(src, snk)

    module_name = os.path.basename(module_name_vec)
    expected = expected_paths.get(module_name)
    if expected is None:
        print(f"Test case {module_name} not found!")
        return
    check_icfg_case(module_name, gt.get_paths(), expected)
264+
265+
def test_pta(module_name_vec):
    """Build the SVFIR for *module_name_vec* and run ``AndersenPTA.analyze`` on it."""
    svfir = pysvf.get_pag(module_name_vec)  # Program Assignment Graph (SVFIR)
    solver = AndersenPTA(svfir)
    solver.analyze()  # Andersen pointer analysis
    # Drop the solver explicitly once the analysis is done.
    del solver
271+
272+
def test_taint(module_name_vec):
    """Run taint checking on a module, verify known cases and print a report.

    For the bundled ``test1.ll`` .. ``test4.ll`` modules the collected paths
    are asserted against the expected set. For every path the ICFG nodes of
    its first and last node ID are printed as source and sink.
    """
    pag = pysvf.get_pag(module_name_vec)  # Build Program Assignment Graph (SVFIR)

    taint = ICFGTraversal(pag)
    taint.taint_checking()  # Perform taint analysis

    module_name = os.path.basename(module_name_vec)
    # get_paths() hands back the traversal's own set, so fetch it once.
    paths = taint.get_paths()
    print(paths)
    if module_name == "test1.ll":
        expected = {"START->6->1->5->2->7->8->9->10->END"}
        assert paths == expected, " \n wrong paths generated - test1 failed !"
        print("\n test1 passed !")
    elif module_name == "test4.ll":
        expected = {"START->6->1->5->2->7->8->9->10->11->13->14->END"}
        assert paths == expected, " \n wrong paths generated - test4 failed !"
        # This branch previously announced "test2", contradicting its assertion.
        print("\n test4 passed !")
    elif module_name == "test2.ll" or module_name == "test3.ll":
        expected = set()
        assert paths == expected, " \n wrong paths generated - test2 or test3 failed !"
        print("\n test2 or test3 passed !")

    print(f"###################### Tainted Information Flow ({len(paths)} found) ######################")
    print("---------------------------------------------")

    prefix = "START->"
    suffix = "->END"
    icfg = pag.get_icfg()
    for origin_path in paths:
        # Strip the START/END markers so only ICFG node IDs remain.
        node_ids = origin_path
        if node_ids.startswith(prefix):
            node_ids = node_ids[len(prefix):]
        if node_ids.endswith(suffix):
            node_ids = node_ids[:-len(suffix)]

        tokens = node_ids.split("->")
        src_node = icfg.get_gnode(int(tokens[0]))
        dst_node = icfg.get_gnode(int(tokens[-1]))

        print(
            f"{origin_path} \n Source: {src_node.to_string()} \n Sink: {dst_node.to_string()} \n ---------------------------------------------")

    if not paths:
        print("No tainted information flow found")
318+
319+
def main():
    """Command-line entry point: run exactly one analysis on one module.

    Arguments are taken from ``sys.argv``: exactly one of ``-pta``,
    ``-taint`` or ``-icfg`` plus the module to analyse. Any other argument is
    treated as the module path; if several are given, the last one is used.

    Raises:
        SystemExit: with an explanatory message if no analysis, more than one
            analysis, or no module is given. Plain ``assert`` is not used
            because it is stripped under ``python -O``, which would let an
            invalid command line run nothing silently.
    """
    analysis_flags = ("-pta", "-taint", "-icfg")
    selected = set()
    module_name_vec = ""

    for arg in sys.argv[1:]:
        if arg in analysis_flags:
            selected.add(arg)
        else:
            module_name_vec = arg

    if not selected:
        sys.exit("No analysis specified. Please specify -pta, -taint, or -icfg.")
    if len(selected) != 1:
        sys.exit("Only one analysis can be enabled.")
    if module_name_vec == "":
        sys.exit("No module specified. Please specify a module to analyze.")

    (analysis,) = selected
    if analysis == "-pta":
        test_pta(module_name_vec)
    elif analysis == "-taint":
        test_taint(module_name_vec)
    else:
        test_icfg(module_name_vec)


if __name__ == "__main__":
    main()
0 commit comments