diff --git a/cfg_generator.py b/cfg_generator.py index d36514c..848cc23 100644 --- a/cfg_generator.py +++ b/cfg_generator.py @@ -1,42 +1,126 @@ from models.inner_instruction import * from models.cfg import * +def generate_cfg_block(block, info_provider, class_name, method_name, recursive=False): -def generate_cfg(method, info_provider, recursive=False): - print('Current generate CFG of method: %s in class: %s' % (method.method_name, method.class_name)) + # cfg_blocks = [] + cfg_block = CFGBlock(block.identify) - cfg_name = method.class_name + ': ' + method.method_name - cfg = CFG(cfg_name) - for i in range(len(method.instructions)): - instruction = method.instructions[i] - if instruction.goto_insns: + # wait_for_follow = [] + + for i in range((len(block.instructions))): + instruction = block.instructions[i] + # if cfg_block is None: + # cfg_block = CFGBlock(hex(instruction.address)) + # if len(wait_for_follow) > 0: + # for fol_block in wait_for_follow: + # fol_block.goto_block(cfg_block) + # wait_for_follow = [] + + if instruction.goto_insns: # 如果有调用函数 basic_info, imp_name = instruction.goto_insns if basic_info == '$Function': # function if filter_oc_function(imp_name): continue - if recursive: + if recursive and basic_info != class_name and method_name != method_name: # 如果需要更进一步解析(防止递归) recursive_function = info_provider(basic_info, imp_name) if recursive_function is not None: recursive_cfg = generate_cfg(recursive_function, info_provider, True) - for recursive_cfg_node in recursive_cfg.nodes: - cfg.add_node(recursive_cfg_node) + cfg_block.add_node(recursive_cfg) + # cfg_block.goto_block(recursive_cfg.entry) # 该块进入调用的函数 + # cfg_blocks.append(cfg_block) + + # for rec_block in recursive_cfg.all_blocks: # 将该函数的块都加到当前 CFG 中 + # cfg_blocks.append(rec_block) + # if rec_block.out: + # wait_for_follow.append(rec_block) + + # cfg_block = None + # cfg_block.add_node(recursive_cfg) + # cfg_blocks.append(cfg_block) continue cfg_node = CFGNode(CFGNodeTypeFunction) cfg_node.function_name = imp_name - else: # method + else: if recursive: recursive_method = info_provider(basic_info, imp_name) if recursive_method is not None: recursive_cfg = generate_cfg(recursive_method, info_provider, True) - for recursive_cfg_node in recursive_cfg.nodes: - cfg.add_node(recursive_cfg_node) + cfg_block.add_node(recursive_cfg) continue cfg_node = CFGNode(CFGNodeTypeMethod) cfg_node.class_name = basic_info cfg_node.method_name = imp_name - cfg.add_node(cfg_node) + cfg_block.add_node(cfg_node) + + # if cfg_block is not None: + # cfg_blocks.append(cfg_block) + + return cfg_block + + +def generate_cfg(method, info_provider, recursive=False): + print('Current generate CFG of method: %s in class: %s' % (method.method_name, method.class_name)) + + wait_blocks_queue = [] + + cfg_name = method.class_name + ': ' + method.method_name + cfg = CFG(cfg_name) + + wait_blocks_queue.append(method.entry_block) + + while len(wait_blocks_queue) > 0: + block = wait_blocks_queue[0] + wait_blocks_queue = wait_blocks_queue[1:] + + cfg_block = generate_cfg_block(block, info_provider, method.class_name, + method.method_name, recursive) + cfg.add_block(cfg_block) + if cfg.entry is None: + cfg.entry = cfg_block + + if not block.is_return: # 含有 ret 的基本块应该肯定不会有 followed 的块 + if block.jump_to_block is not None and block.jump_to_block in method.all_blocks: + cfg_block.goto_block(block.jump_to_block) + if cfg.get_block(block.jump_to_block) is None: + wait_blocks_queue.append(method.all_blocks[block.jump_to_block]) + + if block.jump_condition and block.next_block is not None: + cfg_block.goto_block(block.next_block) + if cfg.get_block(block.next_block) is None: + wait_blocks_queue.append(method.all_blocks[block.next_block]) return cfg + # for i in range(len(method.instructions)): + # instruction = method.instructions[i] + # if instruction.goto_insns: + # basic_info, imp_name = instruction.goto_insns + # if basic_info == '$Function': # function + # if filter_oc_function(imp_name): + # continue + # if recursive: + # recursive_function = info_provider(basic_info, imp_name) + # if recursive_function is not None: + # recursive_cfg = generate_cfg(recursive_function, info_provider, True) + # for recursive_cfg_node in recursive_cfg.nodes: + # cfg.add_node(recursive_cfg_node) + # continue + # cfg_node = CFGNode(CFGNodeTypeFunction) + # cfg_node.function_name = imp_name + # else: # method + # if recursive: + # recursive_method = info_provider(basic_info, imp_name) + # if recursive_method is not None: + # recursive_cfg = generate_cfg(recursive_method, info_provider, True) + # for recursive_cfg_node in recursive_cfg.nodes: + # cfg.add_node(recursive_cfg_node) + # continue + # cfg_node = CFGNode(CFGNodeTypeMethod) + # cfg_node.class_name = basic_info + # cfg_node.method_name = imp_name + # cfg.add_node(cfg_node) + # return cfg + def filter_oc_function(function): if function.startswith('_objc_'): diff --git a/interpreters/inner_Interpreter.py b/interpreters/inner_Interpreter.py index cd67b25..5b65807 100644 --- a/interpreters/inner_Interpreter.py +++ b/interpreters/inner_Interpreter.py @@ -4,7 +4,8 @@ from capstone.arm64 import * SELF_POINTER = -0x1000000 -SUPER_POINTER = -0x2000000 +CURRENT_SELECTOR = -0x2000000 + class Register: @@ -50,19 +51,30 @@ def clear(self): class Interpreter: - def __init__(self, memory_provider=None, handle_strange_add=None): + def __init__(self, memory_provider=None, handle_strange_add=None, parameters=[]): self.gen_regs = [Register(i) for i in range(31)] self.float_regs = [FloatRegister(i) for i in range(32)] - self.gen_regs[0].value = SELF_POINTER + self.wzr = Register(-1) self.xzr = Register(-1) self.wsp = Register(-1) self.sp = Register(-1) self.pc = Register(-1) - self.memory = {hex(0-0x30): SUPER_POINTER} + self.memory = {} + # self.memory = {hex(0-0x30): SUPER_POINTER} # 父指针好像在 self.memory_provider = memory_provider self.handle_strange_add = handle_strange_add + if len(parameters) <= 4: + for i in range(len(parameters)): + self.gen_regs[i].value = parameters[i] + else: + for i in range(4): + self.gen_regs[i].value = parameters[i] + # 超过 4 个参数再说 + # for i in range(len(parameters) - 4): + # self.memory[hex()] + # Jump related self.compare_flag = 0 # 0 is equal and -1 is small and 1 is bigger self.should_jump = False @@ -94,7 +106,7 @@ def clear_regs(self): self.wsp.clear() self.sp.clear() self.pc.clear() - self.memory = {hex(0-0x30):SUPER_POINTER} + # self.memory = {hex(0-0x30):SUPER_POINTER} def interpret_code(self, codes, begin=0, end=-1): i = begin @@ -264,7 +276,7 @@ def handle_add(self, insn): if insn.operands[1].type == ARM64_OP_REG and insn.operands[2].type == ARM64_OP_REG: reg_name = insn.reg_name(insn.operands[1].reg) register = self.get_register(reg_name) - if register.value < 0: + if register.value < 0: # 在取 ivar 的时候,会遇到这种问题,因为现在对于 SELF 指针的定义为一个负数的常量 reg_name_2 = insn.reg_name(insn.operands[2].reg) register_2 = self.get_register(reg_name_2) dest = insn.operands[0] diff --git a/models/cfg.py b/models/cfg.py index 4e71e25..e26c178 100644 --- a/models/cfg.py +++ b/models/cfg.py @@ -1,41 +1,125 @@ +from graphviz import Digraph, Graph + CFGNodeTypeFunction = 0 CFGNodeTypeMethod = 1 class CFG: - def __init__(self, name=''): + def __init__(self, name='', entry=None): + self.name = name + self.entry = entry + self.outs = [] + self.all_blocks = [] + + def add_block(self, block): + self.all_blocks.append(block) + if block.out: + self.outs.append(block) + + def get_block(self, name): + for block in self.all_blocks: + if block.name == name: + return block + return None + # if the data flow between the no.0 node and no.1 node + # the index should be 0 + # def modify_data_flow(self, index, data_flow): + # node_count = len(self.nodes) + # if node_count - 1 > index: + # if index < len(self.data_flows): + # self.data_flows[index] = data_flow + # else: + # for i in range(index): + # self.data_flows.append(None) + # self.data_flows.append(data_flow) + # else: + # print('Data flow index error!') + + def describe(self): + for block in self.all_blocks: + block.describe() + + def graphviz_obj(self): + + cfg_view = Digraph(self.name) + for block in self.all_blocks: + for node in block.nodes: + if type(node) == CFGNode: + node_name = block.name + str(block.nodes.index(node)) + node_label = node.describe(False) + cfg_view.node(node_name, node_label) + elif type(node) == CFG: + pass + + return cfg_view + + # block_nodes = {} + # cfg_view = Digraph(self.name) + # # cfg_view.node('entry', 'entry') + # + # for block in self.all_blocks: + # block_view = Digraph('cluster' + block.name) + # if len(block.nodes) == 0: + # node_name = 'node' + block.name + # block_view.node(node_name, '', shape='plaintext') + # block_nodes[block.name] = (node_name, node_name) + # else: + # start_name = None + # end_name = None + # before_name = None + # for node in block.nodes: + # if type(node) == CFG: + # pass + # # recursive_cfg = node.graphviz_obj() + # # block_view.subgraph(recursive_cfg) + # + # elif type(node) == CFGNode: + # node_name = block.name + str(block.nodes.index(node)) + # node_label = node.describe(False) + # block_view.node(node_name, node_label, shape='box') + # if before_name is not None: + # block_view.edge(before_name, node_name) + # before_name = node_name + # if start_name is None: + # start_name = node_name + # if block == self.all_blocks[-1]: + # end_name = end_name + # block_nodes[block.name] = (start_name, end_name) + # + # cfg_view.subgraph(block_view) + # + # # cfg_view.edge('entry', 'cluster' + self.entry.name) + # + # for block in self.all_blocks: + # _, first_name = block_nodes[block.name] + # for follow in block.follow_blocks: + # second_name, _ = block_nodes[follow] + # cfg_view.edge(first_name, second_name) + # return cfg_view + + def view(self): + self.graphviz_obj().view() + +class CFGBlock: + + def __init__(self, name): self.name = name - self.nodes = [] - self.data_flows = [] + self.out = False # if this block contains `ret` instruction + self.nodes = [] # node 包括 node 或者 cfg + self.follow_blocks = [] # the blocks follow this block (name) def add_node(self, node): self.nodes.append(node) - # if the data flow between the no.0 node and no.1 node - # the index should be 0 - def modify_data_flow(self, index, data_flow): - node_count = len(self.nodes) - if node_count - 1 > index: - if index < len(self.data_flows): - self.data_flows[index] = data_flow - else: - for i in range(index): - self.data_flows.append(None) - self.data_flows.append(data_flow) - else: - print('Data flow index error!') + def goto_block(self, block): + self.follow_blocks.append(block) def describe(self): - for i in range(1, len(self.nodes) - 1): + for i in range(len(self.nodes) - 1): self.nodes[i].describe() - if (i - 1 < len(self.data_flows) and - self.data_flows[i - 1] is not None and - not self.data_flows[i - 1].isEmpty): - self.data_flows[i - 1].describe() - else: - print('||') - print('\/') + print('||') + print('\/') if len(self.nodes) > 0: self.nodes[-1].describe() @@ -54,11 +138,14 @@ def node_info(self): else: return self.class_name, self.method_name - def describe(self): + def describe(self, verbose=True): if self.type == CFGNodeTypeFunction: print('<%s>' % self.function_name) + describe = '<' + self.function_name + '>' else: print('<%s: %s>' % (self.class_name, self.method_name)) + describe = '<' + self.class_name + ': ' + self.method_name + '>' + return describe class CFGDataFlow: diff --git a/models/class_storage.py b/models/class_storage.py index 25bb681..7f1a65b 100644 --- a/models/class_storage.py +++ b/models/class_storage.py @@ -46,6 +46,18 @@ def __init__(self, name): self.arguments_type = [] # empty means no argument +BlockMethodTypeStack = 0 +BlockMethodTypeGlobal = 1 +BlockMethodTypeMalloc = 2 + + +class BlockMethodData: + + def __init__(self, type=BlockMethodTypeStack): + self.type = type + self.invoke = 0 + + MethodDataTypeClass = 0 MethodDataTypeInstance = 1 diff --git a/models/inner_instruction.py b/models/inner_instruction.py index 36ec85c..ea1ff3a 100644 --- a/models/inner_instruction.py +++ b/models/inner_instruction.py @@ -20,23 +20,67 @@ def list_all(cls): print(_method_insn.class_name + ": " + _method_insn.method_name) -class MethodInstructions: +class MethodBasicBlockStorage: - def __init__(self, class_name, method_name): - self.class_name = class_name - self.method_name = method_name + _method_basic_block_instructions = [] + + @classmethod + def insert_instructions(cls, instruction): + cls._method_basic_block_instructions.append(instruction) + + @classmethod + def get_instructions(cls, identify): + for _basic_block in cls._method_basic_block_instructions: + if _basic_block.identify == identify: + return _basic_block + return None + + +class MethodBasicBlockInstructions: + + def __init__(self, identify): + self.identify = identify self.instructions = [] + self.jump_to_block = None # identify + self.jump_condition = True + + self.is_return = False + self.next_block = None # identify + def insert_instruction(self, instruction): self.instructions.append(instruction) def describe(self): - for ins in self.instructions: + for index in range(len(self.instructions)): + ins = self.instructions[index] ins_str = ins.instruction if ins.goto_insns: class_name, method_name = ins.goto_insns ins_str += (" (" + class_name + ": " + method_name + ")") - print(ins_str) + if index == len(self.instructions) - 1 and self.jump_to_block is not None: + print(ins_str + " {Jump to ==> %s}" % self.jump_to_block) + else: + print(ins_str) + + +class MethodInstructions: + + def __init__(self, class_name, method_name): + self.class_name = class_name + self.method_name = method_name + self.entry_block = None + self.all_blocks = {} # + + def describe(self): + print("<%s: %s>" % (self.class_name, self.method_name)) + current_block = self.entry_block + while current_block: + current_block.describe() + if current_block.next_block is not None: + current_block = self.all_blocks[current_block.next_block] + else: + break class Instruction: @@ -44,6 +88,10 @@ class Instruction: def __init__(self, instruction): self.instruction = instruction self.goto_insns = None + self.block_data = None def goto(self, class_name, method_name): self.goto_insns = (class_name, method_name) + + def block_callback(self, block_data): + self.block_data = block_data diff --git a/models/mach_o/fat.py b/models/mach_o/fat.py index bfb68be..96010b8 100644 --- a/models/mach_o/fat.py +++ b/models/mach_o/fat.py @@ -29,7 +29,6 @@ def __init__(self): @classmethod def parse_from_bytes(cls, _bytes): - print(_bytes) fh = cls() fh.magic = parse_int(_bytes[0:4]) fh.nfat_arch = parse_int(_bytes[4:8], False) diff --git a/models/mach_object.py b/models/mach_object.py index 292c966..767340c 100644 --- a/models/mach_object.py +++ b/models/mach_object.py @@ -7,6 +7,7 @@ from models.class_storage import * SELF_POINTER = -0x1000000 +CURRENT_SELECTOR = -0x2000000 RETURN_VALUE = -0x3000000 return_code_with_type = { @@ -27,7 +28,7 @@ def __init__(self, _bytes): header = self.aple_header() self.nfat_arch = header.nfat_arch - for i in range(0, self.nfat_arch): + for i in range(1, self.nfat_arch): # 现在只调试 64-bit 的 Mach-O 信息 arch = self.aple_arch(i) mach_bytes = self.bytes[arch.offset:arch.offset + arch.size] mach_object = MachObject(mach_bytes, arch.offset) @@ -80,15 +81,20 @@ def __init__(self, _bytes, _offset=0x0): # self.properties = {} # self.methods 中的方法均为开发人员实现的方法,包括类中的方法和分类中的方法 - self.methods = {} # impaddr: (class, method) + self.methods = {} # impaddr: (class, method) / impaddr: (block, block) self.methods_type = [] # < method_data > self.class_datas = {} # data_address: < name, super_name, methods > self.cat_datas = {} # data_address: < name, class_name, methods > + # 解析 Block + self.block_methods = {} # data_address: + self.cfstrings = {} self.parse_dylib_class() # print(self.dylibs['0x10207e840']) + for key in self.dylibs: + print(key, self.symbols[hex(self.dylibs[key])]) self.parse_symtab() # 修改成兼容 32-bit 和 64-bit self.parse_methname() @@ -96,7 +102,9 @@ def __init__(self, _bytes, _offset=0x0): self.parse_cstring() self.parse_methtype() - # 兼容 32-bit 和 64-bit + self.parse_block() # 解析 Block 需要依赖 dylib + + # 兼容 32-bit 和 64-bit if self.is_64_bit: self.parse_functions64() else: @@ -206,6 +214,30 @@ def get_return_type_from_function(self, name): # self.ivar_refs[hex(ivar_ref)] = len(self.ivar_list) - 1 # count += 1 + def parse_block(self): + + block_class_names = ['__NSConcreteStackBlock', '__NSConcreteGlobalBlock', '__NSConcreteMallocBlock'] + for dylib_addr in self.dylibs: + dylib_name = self.symbols[hex(self.dylibs[dylib_addr])] + if dylib_name in block_class_names: + type = block_class_names.index(dylib_name) + block_address = int(dylib_addr, 16) + block_address_start = block_address - (self.offset if not self.is_64_bit else 0x100000000) + block_address_end = block_address_start + (ObjcBlock.OB_TOTAL_SIZE if not self.is_64_bit else + ObjcBlock64.OB_TOTAL_SIZE) + block_bytes = self.bytes[block_address_start:block_address_end] + if self.is_64_bit: + ob = ObjcBlock64.parse_from_bytes(block_bytes) + else: + ob = ObjcBlock.parse_from_bytes(block_bytes) + + block_data = BlockMethodData(type) + block_data.invoke = ob.invoke + + self.block_methods[dylib_addr] = block_data + self.methods[hex(block_data.invoke)] = '$Block', dylib_addr + print(self.methods[hex(block_data.invoke)]) + def parse_cfstring(self): cfstring, _ = self._sections["cfstring"] base_address = cfstring.addr @@ -444,8 +476,6 @@ def parse_cat_methods_and_data(self): else: objc_property = ObjcProperty.parse_from_bytes(op_bytes) - print(hex(objc_property.name)) - print(hex(objc_property.attributes)) property_name = self.symbols[hex(objc_property.name)] # print(class_name) # print(property_name) diff --git a/models/objc_runtime.py b/models/objc_runtime.py index 73d93d4..4da25e6 100644 --- a/models/objc_runtime.py +++ b/models/objc_runtime.py @@ -507,3 +507,63 @@ def parse_from_bytes(cls, _bytes): def get_size(self): return ObjcIvar64.OI_TOTAL_SIZE + + +class ObjcBlock(ObjcBase): + + OB_TOTAL_SIZE = 20 + OB_ISA_RANGE = (0, 4) + OB_FLAGS_RANGE = (4, 4) + OB_RESERVED_RANGE = (8, 4) + OB_INVOKE_RANGE = (12, 4) + OB_DESCRIPTOR_RANGE = (16, 4) + + def __init__(self): + self.isa = 0 + self.flags = 0 + self.reserved = 0 + self.invoke = 0 + self.descriptor = 0 + + @classmethod + def parse_from_bytes(cls, _bytes): + ob = cls() + ob.isa = parse_int(_bytes[0:4]) + ob.flags = parse_int(_bytes[4:8]) + ob.reserved = parse_int(_bytes[8:12]) + ob.invoke = parse_int(_bytes[12:16]) + ob.descriptor = parse_int(_bytes[16:20]) + return ob + + def get_size(self): + return ObjcBlock.OB_TOTAL_SIZE + + +class ObjcBlock64(ObjcBase): + + OB_TOTAL_SIZE = 32 + OB_ISA_RANGE = (0, 8) + OB_FLAGS_RANGE = (8, 4) + OB_RESERVED_RANGE = (12, 4) + OB_INVOKE_RANGE = (16, 8) + OB_DESCRIPTOR_RANGE = (24, 8) + + def __init__(self): + self.isa = 0 + self.flags = 0 + self.reserved = 0 + self.invoke = 0 + self.descriptor = 0 + + @classmethod + def parse_from_bytes(cls, _bytes): + ob = cls() + ob.isa = parse_int(_bytes[0:8]) + ob.flags = parse_int(_bytes[8:12]) + ob.reserved = parse_int(_bytes[12:16]) + ob.invoke = parse_int(_bytes[16:24]) + ob.descriptor = parse_int(_bytes[24:32]) + return ob + + def get_size(self): + return ObjcBlock64.OB_TOTAL_SIZE diff --git a/static_analysis.py b/static_analysis.py index 756f5c2..d332cc2 100644 --- a/static_analysis.py +++ b/static_analysis.py @@ -139,34 +139,109 @@ def _slice_by_function_for_arm64(arch, mode, machine_code, base_addr, slice_addr return all_functions -def _analyse_method(method, mach_info): +def _slice_basic_block(method): - def memory_provider(address): - try: - return mach_info.get_memory_content(address, 8) - except Exception as _: - return 0 + slice_address = [] + current_slice_address = method[0].address - inter = Interpreter(memory_provider) - if hex(method[0].address) not in mach_info.methods: # pass the functions - return None - - class_name, method_name = mach_info.methods[hex(method[0].address)] - print('Current analyse <%s: %s>' % (class_name, method_name)) - - class_data = None - for data in mach_info.class_datas.values(): - if data.name == class_name: - class_data = data - instruction_block = MethodInstructions(class_name, method_name) + slice_address.append(hex(current_slice_address)) for i in range(len(method)): - inter.interpret_code(method, begin=i, end=i+1) cs_insn = method[i] - if cs_insn.address == 0x1000079d0: - inter.current_state() - # print(cs_insn.mnemonic) - # print(len(class_data.ivars)) + if (cs_insn.id == ARM64_INS_B or + cs_insn.id == ARM64_INS_CBZ or + cs_insn.id == ARM64_INS_CBNZ or + cs_insn.id == ARM64_INS_TBZ or + cs_insn.id == ARM64_INS_TBNZ): + address_op = cs_insn.operands[-1] + if address_op.type == ARM64_OP_IMM: + j_address = address_op.imm + + if method[0].address <= j_address <= method[-1].address: + slice_address.append(hex(j_address)) + if i < len(method) - 1: + slice_address.append(hex(method[i + 1].address)) + + slice_address = list({}.fromkeys(slice_address).keys()) + + basic_blocks = [] + current_basic_block = [] + # current_basic_block_address = None + + for cs_insn in method: + if hex(cs_insn.address) in slice_address: + if len(current_basic_block) != 0: + # basic_blocks[hex(current_basic_block_address)] = current_basic_block + basic_blocks.append(current_basic_block) + + current_basic_block = [] + # current_basic_block_address = cs_insn.address + current_basic_block.append(cs_insn) + if len(current_basic_block) != 0: + basic_blocks.append(current_basic_block) + # basic_blocks[hex(current_basic_block_address)] = current_basic_block + return basic_blocks + + +def _contain_return_of_block(block): + for insn in block: + if insn.id == ARM64_INS_RET: + return True + return False + + +def _analyse_reachable_of_basic_blocks(basic_blocks): + reachable = [] + wait_to_analyse = [] + + reachable.append(basic_blocks[0][0].address) + wait_to_analyse.append(basic_blocks[0]) + + while len(wait_to_analyse) > 0: + # pop + block = wait_to_analyse[0] + wait_to_analyse = wait_to_analyse[1:] + + if _contain_return_of_block(block): # 如果这个 block 包含 return 语句,就直接不分析了 + continue + + last_id = block[-1].id + if last_id != ARM64_INS_B or (last_id == ARM64_INS_B and len(block[-1].mnemonic) > 1): + block_index = basic_blocks.index(block) + if block_index < len(basic_blocks) - 1: + next_block = basic_blocks[block_index + 1] + reachable.append(next_block[0].address) + wait_to_analyse.append(next_block) + + if (last_id == ARM64_INS_B or + last_id == ARM64_INS_CBZ or + last_id == ARM64_INS_CBNZ or + last_id == ARM64_INS_TBZ or + last_id == ARM64_INS_TBNZ): + address_op = block[-1].operands[-1] + if address_op.type == ARM64_OP_IMM: + j_address = address_op.imm + if (basic_blocks[0][0].address <= j_address <= basic_blocks[-1][-1].address and + j_address not in reachable): + reachable.append(j_address) + # 找那个 block + for next_block in basic_blocks: + if next_block[0].address == j_address: + wait_to_analyse.append(next_block) + break + return reachable + + +def _analyse_basic_block(block_instruction, identify, mach_info, class_data, class_name, method_name, inter): + basic_block = MethodBasicBlockInstructions(identify) + for i in range(len(block_instruction)): + inter.interpret_code(block_instruction, begin=i, end=i+1) + cs_insn = block_instruction[i] + # if cs_insn.address == 0x100007ed0: + # print(cs_insn.mnemonic, cs_insn.op_str) # inter.current_state() + # print(cs_insn.reg_name(cs_insn.operands[1].value.mem.base), + # cs_insn.reg_name(cs_insn.operands[1].value.mem.index), + # cs_insn.operands[1].value.mem.disp) insn_str = hex(cs_insn.address) + '\t' + cs_insn.bytes.hex() + '\t' + cs_insn.mnemonic + '\t' + cs_insn.op_str instruction = Instruction(insn_str) if cs_insn.id == ARM64_INS_BL or cs_insn.id == ARM64_INS_BLR: @@ -179,6 +254,9 @@ def memory_provider(address): function_name = mach_info.symbols[hex(_function)] if function_name == "_objc_msgSendSuper2": instruction.goto(class_data.super, method_name) + elif function_name == "_dispatch_once": # 实际上就可以把这个指令换成 Block 内部的指令 + instruction.goto('$Function', function_name) + instruction.block_callback(inter.gen_regs[1].value) elif function_name == "_objc_msgSend": reg0_value = inter.gen_regs[0].value reg1_value = inter.gen_regs[1].value @@ -186,7 +264,9 @@ def memory_provider(address): obj_name = class_name elif reg0_value <= RETURN_VALUE: obj_name = _g_return_types[RETURN_VALUE - reg0_value] - # if cs_insn.address == 0x10000a60c: + elif reg0_value < SELF_POINTER: + obj_name = "PARAMETERS_" + str(SELF_POINTER - reg0_value - 1) + # if cs_insn.address == 0x100007e8c: # print(hex(reg0_value)) # print(obj_name) # return value @@ -223,13 +303,15 @@ def memory_provider(address): obj_name = mach_info.symbols[hex(static_name)] except Exception as e: print("Some error happens during analysis in get value in register 0 (Instance)") + print(str(e)) print("Current instruction address is %s" % hex(cs_insn.address)) - break + obj_name = 'id' try: meth_name = mach_info.symbols[hex(reg1_value)] except Exception as e: print("Some error happens during analysis in get value in register 1 (Method)") + print(str(e)) print("Current instruction address is %s" % hex(cs_insn.address)) break @@ -251,13 +333,99 @@ def memory_provider(address): if not return_type == 'void': _g_return_types.append(return_type) inter.modify_regs('0', RETURN_VALUE - (len(_g_return_types) - 1)) - instruction_block.insert_instruction(instruction) - return instruction_block + elif cs_insn.id == ARM64_INS_B: + address_op = cs_insn.operands[-1] + if len(cs_insn.mnemonic) == 1: + basic_block.jump_condition = False + else: + basic_block.jump_condition = True + if address_op.type == ARM64_OP_IMM: + basic_block.jump_to_block = hex(address_op.imm) + elif cs_insn.id == ARM64_INS_CBZ or cs_insn.id == ARM64_INS_CBNZ or cs_insn.id == ARM64_INS_TBZ or cs_insn.id == ARM64_INS_TBNZ: + address_op = cs_insn.operands[-1] + basic_block.jump_condition = True + if address_op.type == ARM64_OP_IMM: + basic_block.jump_to_block = hex(address_op.imm) + elif cs_insn.id == ARM64_INS_RET: + basic_block.insert_instruction(instruction) + basic_block.is_return = True + return basic_block + basic_block.insert_instruction(instruction) + return basic_block + + +def _analyse_method(method, mach_info): + + def memory_provider(address): + try: + return mach_info.get_memory_content(address, 8) + except Exception as _: + return 0 + + if hex(method[0].address) not in mach_info.methods: # pass the functions + return None + class_name, method_name = mach_info.methods[hex(method[0].address)] + parameters = [SELF_POINTER, CURRENT_SELECTOR] + parameters_count = method_name.count(':') # OC 的方法通过统计冒号个数来获得参数个数 + for p in range(parameters_count): + parameters.append(SELF_POINTER - p - 1) + inter = Interpreter(memory_provider, parameters=parameters) + + print('Current analyse <%s: %s>' % (class_name, method_name)) + + class_data = None + for data in mach_info.class_datas.values(): + if data.name == class_name: + class_data = data + method_instructions = MethodInstructions(class_name, method_name) + # last_address = method[-1].address + + # 拆分成基本块 + basic_blocks_instructions = _slice_basic_block(method) + # 判断可达的块 + reachable_blocks_queue = _analyse_reachable_of_basic_blocks(basic_blocks_instructions) + + # def convert_to_hex(i): + # return hex(i) + # print(list(map(convert_to_hex, reachable_blocks_queue))) + + for block_instructions in basic_blocks_instructions: + if block_instructions[0].address in reachable_blocks_queue: # if this block can be reached + block = _analyse_basic_block(block_instructions, hex(block_instructions[0].address), mach_info, class_data, + class_name, method_name, inter) + + method_instructions.all_blocks[block.identify] = block + MethodBasicBlockStorage.insert_instructions(block) + + # 如果挨近的下一个块是可到达的,则添加下一个块 + current_index = basic_blocks_instructions.index(block_instructions) + while current_index < len(basic_blocks_instructions) - 1: + next_block_instructions = basic_blocks_instructions[current_index + 1] + next_block_address = next_block_instructions[0].address + if next_block_address in reachable_blocks_queue: + block.next_block = hex(next_block_address) # set next block + break + current_index += 1 + + # 如果当前块 return 了,则结束 + if block.is_return: + if method_instructions.entry_block is None: + method_instructions.entry_block = block + continue + + # 如果入口块是空的,添加入口块 + if method_instructions.entry_block is None: + method_instructions.entry_block = block + + return method_instructions + def static_analysis(binary_file): mach_o_file = open(binary_file, 'rb') mach_container = MachContainer(mach_o_file.read()) for mach_info in mach_container.mach_objects: + # print(mach_info.methods) + # print(mach_info.methods_type[0]) arch = CS_ARCH_ALL mode = CS_MODE_32 if mach_info.cpu_type == CPU_TYPE_ARM: @@ -273,147 +441,33 @@ def static_analysis(binary_file): slice_addresses = list(mach_info.methods.keys()) slice_addresses += list(mach_info.functions.keys()) - # address = mach_info.get_method_address('PDDCrashManager', 'setup') - # method = _disasm_specified_function(arch, mode, mach_info.text, int(address, 16), mach_info.text_addr, slice_addresses) - - # instruction = _analyse_method(method, mach_info) - - # print("Begin Decompiling...") - # methods = _slice_by_function_for_arm64(arch, mode, mach_info.text, mach_info.text_addr, slice_addresses) - # for method in methods: - # if hex(method[0].address) in mach_info.methods: - # class_name, method_name = mach_info.methods[hex(method[0].address)] - # print(class_name + ": " + method_name) - # - # print("Decompile Complete!") -# print("Begin Analysing all methods ...") -# methods_instructions = [] -# for method in methods: -# def memory_provider(address): -# try: -# return mach_info.get_memory_content(address, 8) -# except Exception as _: -# return 0 -# inter = Interpreter(memory_provider) -# if hex(method[0].address) not in mach_info.methods: # pass the functions -# continue -# class_name, method_name = mach_info.methods[hex(method[0].address)] -# -# print('Current analyse <%s: %s>' % (class_name, method_name)) -# -# class_data = None -# for data in mach_info.class_datas.values(): -# if data.name == class_name: -# class_data = data -# instruction_block = MethodInstructions(class_name, method_name) -# for i in range(len(method)): -# inter.interpret_code(method, begin=i, end=i+1) -# cs_insn = method[i] -# # if cs_insn.address == 0x100020a68: -# # print(len(class_data.ivars)) -# # inter.current_state() -# insn_str = hex(cs_insn.address) + '\t' + cs_insn.bytes.hex() + '\t' + cs_insn.mnemonic + '\t' + cs_insn.op_str -# instruction = Instruction(insn_str) -# if cs_insn.id == ARM64_INS_BL or cs_insn.id == ARM64_INS_BLR: -# operand = cs_insn.operands[0] -# if operand.type == ARM64_OP_IMM: -# try: -# _function = mach_info.functions[hex(operand.imm)] -# except Exception as e: -# continue -# function_name = mach_info.symbols[hex(_function)] -# if function_name == "_objc_msgSendSuper2": -# instruction.goto(class_data.super, method_name) -# elif function_name == "_objc_msgSend": -# reg0_value = inter.gen_regs[0].value -# reg1_value = inter.gen_regs[1].value -# if reg0_value == SELF_POINTER: -# obj_name = class_name -# elif reg0_value <= RETURN_VALUE: -# obj_name = _g_return_types[RETURN_VALUE - reg0_value] -# # if cs_insn.address == 0x10000a60c: -# # print(hex(reg0_value)) -# # print(obj_name) -# # return value -# elif reg0_value < 0: -# obj_name = class_data.super -# else: -# obj_name_key = hex(reg0_value) -# if obj_name_key in mach_info.symbols: # Outter classes -# obj_name = mach_info.symbols[obj_name_key] -# obj_name_index = obj_name.find('$') -# obj_name = obj_name[obj_name_index + 2:] -# elif obj_name_key in mach_info.class_datas: # Inner classes -# obj_data = mach_info.class_datas[obj_name_key] -# obj_name = obj_data.name -# else: -# if class_data != None and hex(reg0_value) in mach_info.ivar_refs: -# ivar = class_data.ivars[mach_info.ivar_refs[hex(reg0_value)]] -# obj_name = class_name + "->" + ivar.name -# elif class_data != None and reg0_value < len(class_data.ivars): # guess ivars -# # print(method_name) -# # print('ivars: ' + hex(reg0_value)) -# ivar = class_data.ivars[reg0_value] -# obj_name = class_name + "->" + ivar.name -# # print(hex(cs_insn.address)) -# # print(obj_name) -# elif hex(reg0_value) in mach_info.cfstrings: -# obj_name = "NSString" -# else: # static vars -# # print(hex(cs_insn.address)) -# # print(hex(reg0_value)) -# # inter.current_state() -# try: -# static_name = mach_info.statics[hex(reg0_value)] -# obj_name = mach_info.symbols[hex(static_name)] -# except Exception as e: -# print("Some error happens during analysis in get value in register 0 (Instance)") -# print("Current instruction address is %s" % hex(cs_insn.address)) -# break -# -# try: -# meth_name = mach_info.symbols[hex(reg1_value)] -# except Exception as e: -# print("Some error happens during analysis in get value in register 1 (Method)") -# print("Current instruction address is %s" % hex(cs_insn.address)) -# break -# -# return_type = mach_info.get_return_type_from_method(obj_name, meth_name) -# # if obj_name == 'UIScreen': -# # print(meth_name) -# # print(return_type) -# # 返回值这一块还得处理 -# # if return_type == 'id' or return_type == 'UILabel': # Now is id -# if not return_type == 'void': -# _g_return_types.append(return_type) -# inter.modify_regs('0', RETURN_VALUE - (len(_g_return_types) - 1)) -# # if cs_insn.address == 0x10000a5f0: -# # print(hex(RETURN_VALUE - (len(_g_return_types) - 1))) -# instruction.goto(obj_name, meth_name) -# else: -# instruction.goto("$Function", function_name) -# return_type = mach_info.get_return_type_from_function(function_name) -# if not return_type == 'void': -# _g_return_types.append(return_type) -# inter.modify_regs('0', RETURN_VALUE - (len(_g_return_types) - 1)) -# instruction_block.insert_instruction(instruction) -# # if method_name == 'headerView': -# # instruction_block.describe() -# # instruction_block.describe() -# MethodStorage.insert_instructions(instruction_block) -# methods_instructions.append(instruction_block) -# -# -# def cfg_info_provider(basic_info, imp_name): -# if basic_info == '$Function': -# return None -# else: -# method = MethodStorage.get_instructions(basic_info, imp_name) -# return method -# # MethodStorage.list_all() + address = mach_info.get_method_address('ABKWelcomeViewController', 'viewDidLoad') + + def cfg_provider(class_name, imp_name): + instruction = MethodStorage.get_instructions(class_name, imp_name) + if instruction is None: + address = mach_info.get_method_address(class_name, imp_name) + if address is not None: + method = _disasm_specified_function(arch, mode, mach_info.text, int(address, 16), mach_info.text_addr, slice_addresses) + instruction = _analyse_method(method, mach_info) + MethodStorage.insert_instructions(instruction) + return instruction + + # address = mach_info.get_method_address('PDDCrashManager', 'extractDataFromCrashReport:keyword:') + # address = mach_info.get_method_address('PDDSafeSwizzleManager', 'init') + if address is not None: + method = _disasm_specified_function(arch, mode, mach_info.text, int(address, 16), mach_info.text_addr, slice_addresses) + instruction = _analyse_method(method, mach_info) + instruction.describe() + MethodStorage.insert_instructions(instruction) + # MethodStorage.list_all() + + + # method_instructions = MethodStorage.get_instructions('ABKWelcomeViewController', 'viewDidLoad') -# cfg = generate_cfg(method_instructions, cfg_info_provider, True) -# cfg.describe() + cfg = generate_cfg(instruction, cfg_provider, True) + # cfg.describe() + cfg.view() # # for method_instructions in methods_instructions: # # generate_cfg(method_instructions, None) #