Skip to content

Commit

Permalink
analysis/datastub/export: Download source from apt
Browse files Browse the repository at this point in the history
  • Loading branch information
aewag committed Mar 13, 2023
1 parent 424664c commit 63aea5c
Showing 1 changed file with 101 additions and 7 deletions.
108 changes: 101 additions & 7 deletions analysis/datastub/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,97 @@ def getAsmFileInfo(addr, asm_dump):
*************************************************************************
"""

DOWNLOADED_PACKAGE_SOURCES = list()


def searchSourceInPackages(bin_file_path, ip):
def search_in_directory(filename, filepath):
command = f"find -iname {filename}"
debug(4, f"exec: {command}")
output = subprocess.check_output(shlex.split(command)).decode("utf-8")
lines = output.splitlines()
filepath_chunks = 1
while len(lines) > 1:
filepath_chunks += 1
filename = "/".join(filepath.split("/")[-filepath_chunks:])
lines = [line for line in lines if filename in line]
if len(lines) == 1:
return lines[0]
else:
return None

if bin_file_path not in DOWNLOADED_PACKAGE_SOURCES:
# Identify source
command = f"dpkg -S {bin_file_path}"
debug(4, f"exec: {command}")
try:
output = subprocess.check_output(shlex.split(command)).decode("utf-8")
except subprocess.CalledProcessError:
debug(0, f"dpkg failed for {bin_file_path} @ {hex(ip)}")
return None, 0
lines = output.splitlines()
assert len(lines) == 1
package = lines[0].split(":")[0]

# Download source package
command = f"apt-get source {package}"
debug(4, f"exec: {command}")
subprocess.check_output(shlex.split(command))

DOWNLOADED_PACKAGE_SOURCES.append(bin_file_path)

# Use gdb to get filename for address
command = f"gdb -batch -ex 'set print asm-demangle on' -ex 'info line *{hex(ip)}' {bin_file_path}"
debug(4, f"exec: {command}")
output = subprocess.check_output(shlex.split(command)).decode("utf-8")
lines = output.splitlines()
assert len(lines) == 1
if "No line number information available" in lines[0]:
return None, 0
line = lines[0].split("starts at address ")[1].split(" and ends at")[0]
line = line.split("<", 1)[1]
line = line[::-1].split(">", 1)[1]
if "+" in line:
line = line.split("+", 1)[1]
functionname = line[::-1]
filelinenumber = int(lines[0].split(" ")[1])

command = f"gdb -batch -ex 'set print asm-demangle on' -ex 'info line {functionname}' {bin_file_path}"
debug(4, f"exec: {command}")
output = subprocess.check_output(shlex.split(command)).decode("utf-8")
lines = output.splitlines()
if len(lines) >= 1:
debug(0, f"Warning several lines received from {command}: {lines}")
lines = lines[:1]
filepath = lines[0].split('"')[1]
filename = filepath.split("/")[-1]

src_file_path = search_in_directory(filename, filepath)
if src_file_path is not None:
return src_file_path, filelinenumber

# Check if there are any tar.xz with the source code
command = f"ls **/*.tar.xz | xargs -n 1 -i bash -c 'tar -tf {str('{}')} | grep {filename} | wc -l | xargs echo {str('{}')}'"
debug(4, f"exec: {command}")
output = subprocess.check_output(command, shell=True).decode("utf-8")
lines = output.splitlines()
parts_list = [parts for line in lines if int((parts := line.split(" "))[1])]
assert len(parts_list) == 1
[tarball, cnt] = parts_list[0]

# Extract files and remove tarball
command = f"tar -xvf {tarball}"
debug(4, f"exec: {command}")
subprocess.check_output(shlex.split(command))

command = f"rm {tarball}"
debug(4, f"exec: {command}")
subprocess.check_output(shlex.split(command))

src_file_path = search_in_directory(filename, filepath)
assert src_file_path is not None
return src_file_path, filelinenumber


def export_ip(ip, datafs, imgmap, info_map):
if ip is None or ip == 0:
Expand Down Expand Up @@ -211,7 +302,6 @@ def export_ip(ip, datafs, imgmap, info_map):
asm_dump = ""
try:
debug(1, "[ASM] objdump %s", (str(bin_file_path)))
# asm_dump = subprocess.check_output(["objdump", "-Dj", ".text", bin_file_path], universal_newlines=True)
with datafs.create_file(asm_file_path) as f:
subprocess.call(
[
Expand Down Expand Up @@ -243,15 +333,19 @@ def export_ip(ip, datafs, imgmap, info_map):
debug(1, "[ASM] unavailable for %s in %s", (hex(addr), bin_file_path))
# Search for leak in source code
src_file_path, src_line_nr = getSourceFileInfo(hex(addr), bin_file_path)
if src_file_path is None:
src_file_path, src_line_nr = searchSourceInPackages(bin_file_path, addr)
debug(
1,
"[SRC] available in package sources for %s in %s",
(hex(addr), bin_file_path),
)
if src_file_path is not None and os.path.exists(src_file_path):
datafs.add_file(src_file_path)
elif src_file_path is None:
debug(1, "[SRC] unavailable for %s in %s", (hex(addr), bin_file_path))
else:
if src_file_path is None:
debug(
1, "[SRC] unavailable for %s in %s", (hex(addr), bin_file_path)
)
else:
debug(1, "[SRC] source file %s missing", (src_file_path))
debug(1, "[SRC] source file %s missing", (src_file_path))
ip_info = IpInfoShort(
asm_file_path, asm_line_nr, src_file_path, src_line_nr
)
Expand Down

0 comments on commit 63aea5c

Please sign in to comment.