From 63aea5c6f2f4c0a0ff604297106bb659c83bfd99 Mon Sep 17 00:00:00 2001 From: Alexander Wagner Date: Thu, 2 Mar 2023 21:37:07 +0100 Subject: [PATCH] analysis/datastub/export: Download source from apt --- analysis/datastub/export.py | 108 +++++++++++++++++++++++++++++++++--- 1 file changed, 101 insertions(+), 7 deletions(-) diff --git a/analysis/datastub/export.py b/analysis/datastub/export.py index a1e4eefe..cb08db78 100644 --- a/analysis/datastub/export.py +++ b/analysis/datastub/export.py @@ -184,6 +184,97 @@ def getAsmFileInfo(addr, asm_dump): ************************************************************************* """ +DOWNLOADED_PACKAGE_SOURCES = list() + + +def searchSourceInPackages(bin_file_path, ip): + def search_in_directory(filename, filepath): + command = f"find -iname {filename}" + debug(4, f"exec: {command}") + output = subprocess.check_output(shlex.split(command)).decode("utf-8") + lines = output.splitlines() + filepath_chunks = 1 + while len(lines) > 1: + filepath_chunks += 1 + filename = "/".join(filepath.split("/")[-filepath_chunks:]) + lines = [line for line in lines if filename in line] + if len(lines) == 1: + return lines[0] + else: + return None + + if bin_file_path not in DOWNLOADED_PACKAGE_SOURCES: + # Identify source + command = f"dpkg -S {bin_file_path}" + debug(4, f"exec: {command}") + try: + output = subprocess.check_output(shlex.split(command)).decode("utf-8") + except subprocess.CalledProcessError: + debug(0, f"dpkg failed for {bin_file_path} @ {hex(ip)}") + return None, 0 + lines = output.splitlines() + assert len(lines) == 1 + package = lines[0].split(":")[0] + + # Download source package + command = f"apt-get source {package}" + debug(4, f"exec: {command}") + subprocess.check_output(shlex.split(command)) + + DOWNLOADED_PACKAGE_SOURCES.append(bin_file_path) + + # Use gdb to get filename for address + command = f"gdb -batch -ex 'set print asm-demangle on' -ex 'info line *{hex(ip)}' {bin_file_path}" + debug(4, f"exec: {command}") + output = subprocess.check_output(shlex.split(command)).decode("utf-8") + lines = output.splitlines() + assert len(lines) == 1 + if "No line number information available" in lines[0]: + return None, 0 + line = lines[0].split("starts at address ")[1].split(" and ends at")[0] + line = line.split("<", 1)[1] + line = line[::-1].split(">", 1)[1] + if "+" in line: + line = line.split("+", 1)[1] + functionname = line[::-1] + filelinenumber = int(lines[0].split(" ")[1]) + + command = f"gdb -batch -ex 'set print asm-demangle on' -ex 'info line {functionname}' {bin_file_path}" + debug(4, f"exec: {command}") + output = subprocess.check_output(shlex.split(command)).decode("utf-8") + lines = output.splitlines() + if len(lines) >= 1: + debug(0, f"Warning several lines received from {command}: {lines}") + lines = lines[:1] + filepath = lines[0].split('"')[1] + filename = filepath.split("/")[-1] + + src_file_path = search_in_directory(filename, filepath) + if src_file_path is not None: + return src_file_path, filelinenumber + + # Check if there are any tar.xz with the source code + command = f"ls **/*.tar.xz | xargs -n 1 -i bash -c 'tar -tf {str('{}')} | grep {filename} | wc -l | xargs echo {str('{}')}'" + debug(4, f"exec: {command}") + output = subprocess.check_output(command, shell=True).decode("utf-8") + lines = output.splitlines() + parts_list = [parts for line in lines if int((parts := line.split(" "))[1])] + assert len(parts_list) == 1 + [tarball, cnt] = parts_list[0] + + # Extract files and remove tarball + command = f"tar -xvf {tarball}" + debug(4, f"exec: {command}") + subprocess.check_output(shlex.split(command)) + + command = f"rm {tarball}" + debug(4, f"exec: {command}") + subprocess.check_output(shlex.split(command)) + + src_file_path = search_in_directory(filename, filepath) + assert src_file_path is not None + return src_file_path, filelinenumber + def export_ip(ip, datafs, imgmap, info_map): if ip is None or ip == 0: @@ -211,7 +302,6 @@ def export_ip(ip, datafs, imgmap, info_map): asm_dump = "" try: debug(1, "[ASM] objdump %s", (str(bin_file_path))) - # asm_dump = subprocess.check_output(["objdump", "-Dj", ".text", bin_file_path], universal_newlines=True) with datafs.create_file(asm_file_path) as f: subprocess.call( [ @@ -243,15 +333,19 @@ def export_ip(ip, datafs, imgmap, info_map): debug(1, "[ASM] unavailable for %s in %s", (hex(addr), bin_file_path)) # Search for leak in source code src_file_path, src_line_nr = getSourceFileInfo(hex(addr), bin_file_path) + if src_file_path is None: + src_file_path, src_line_nr = searchSourceInPackages(bin_file_path, addr) + debug( + 1, + "[SRC] available in package sources for %s in %s", + (hex(addr), bin_file_path), + ) if src_file_path is not None and os.path.exists(src_file_path): datafs.add_file(src_file_path) + elif src_file_path is None: + debug(1, "[SRC] unavailable for %s in %s", (hex(addr), bin_file_path)) else: - if src_file_path is None: - debug( - 1, "[SRC] unavailable for %s in %s", (hex(addr), bin_file_path) - ) - else: - debug(1, "[SRC] source file %s missing", (src_file_path)) + debug(1, "[SRC] source file %s missing", (src_file_path)) ip_info = IpInfoShort( asm_file_path, asm_line_nr, src_file_path, src_line_nr )