Skip to content

Commit

Permalink
Add --no-obfuscator flag
Browse files Browse the repository at this point in the history
  • Loading branch information
mrexodia committed Oct 22, 2024
1 parent b97db9f commit 07b84fb
Showing 1 changed file with 116 additions and 42 deletions.
158 changes: 116 additions & 42 deletions obfuscator/test.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,39 @@
import os
import sys
import pefile
import argparse
import subprocess
from glob import glob

from icicle import *

# Section flags
IMAGE_SCN_MEM_SHARED = 0x10000000
IMAGE_SCN_MEM_EXECUTE = 0x20000000
IMAGE_SCN_MEM_READ = 0x40000000
IMAGE_SCN_MEM_WRITE = 0x80000000

def page_align(size: int) -> int:
return (size + 0xFFF) & ~0xFFF

class RISCVM:
def __init__(self, riscvm_run_address: int, riscvm_run_data: bytes, *, stack_size = 0x5000, heap_size = 0x5000):
def __init__(self, *, stack_size = 0x50000, heap_size = 0x50000):
self.emu = Icicle("x86_64", shadow_stack=False, jit=True)

# Allocate and write the riscvm_run function code
self.riscvm_run_address = riscvm_run_address
code_begin = riscvm_run_address & ~0xFFF
code_size = len(riscvm_run_data) + 0x1000
self.emu.mem_map(code_begin, code_size, MemoryProtection.ExecuteRead)
self.emu.mem_write(riscvm_run_address, riscvm_run_data)

# Allocate the stack
self.stack_begin = 0x10000
self.stack_size = stack_size
self.emu.mem_map(self.stack_begin, self.stack_size, MemoryProtection.ReadWrite)

# Allocate the heap
self.heap_begin = 0x20000
self.heap_begin = self.stack_begin + self.stack_size + 0x10000
self.heap_size = heap_size
self.emu.mem_map(self.heap_begin, self.heap_size, MemoryProtection.ReadWrite)

def run(self, payload: bytes):
print(f"Stack: {hex(self.stack_begin)} - {hex(self.stack_begin + self.stack_size)}")
print(f"Heap: {hex(self.heap_begin)} - {hex(self.heap_begin + self.heap_size)}")

def run(self, riscvm_run_address: int, payload: bytes):
# Reset instruction count
self.emu.icount = 0

Expand All @@ -41,7 +47,9 @@ def run(self, payload: bytes):
self.emu.mem_write(vm_bytecode, payload)

# Write VM context
a0_offset = 8 * 11
self.emu.mem_write(vm_context, vm_bytecode.to_bytes(8, "little")) # riscvm.pc
self.emu.mem_write(vm_context + a0_offset, b"\x11\x22\x33\x44\x55\x66\x77\x88") # riscvm.a0

# Set up stack
rsp = self.stack_begin + self.stack_size - 0x18
Expand Down Expand Up @@ -71,7 +79,7 @@ def preserved(value):
"r13": preserved(0x66778899AABBCCDD),
"r14": preserved(0x778899AABBCCDDEE),
"r15": preserved(0x8899AABBCCDDEEFF),
"rip": volatile(self.riscvm_run_address),
"rip": volatile(riscvm_run_address),
}
for reg, (_, value) in regvalues.items():
self.emu.reg_write(reg, value)
Expand All @@ -87,62 +95,128 @@ def preserved(value):
if value != expected:
print(f"Expected value {hex(expected)} for non-volatile register {reg}, got {hex(value)}")

a0 = int.from_bytes(self.emu.mem_read(vm_context + 8 * 11, 8), "little") # riscvm.a0
a0 = int.from_bytes(self.emu.mem_read(vm_context + a0_offset, 8), "little") # riscvm.a0
return a0
print(f"status: {status}, exception: ({self.emu.exception_code}, {hex(self.emu.exception_value)})")
print(f"RIP: {hex(rip)}")
return -1

def map_shellcode(self, address: int, shellcode: bytes):
code_begin = address & ~0xFFF
code_size = len(shellcode) + 0x1000
self.emu.mem_map(code_begin, code_size, MemoryProtection.ExecuteRead)
self.emu.mem_write(address, shellcode)

def map_image(self, pe: pefile.PE, *, image_base: int = 0):
#assert pe.FILE_HEADER.Machine == self.pe.FILE_HEADER.Machine, "Architecture mismatch"

image_size = pe.OPTIONAL_HEADER.SizeOfImage
section_alignment = pe.OPTIONAL_HEADER.SectionAlignment
assert section_alignment == 0x1000, f"Unsupported section alignment {hex(section_alignment)}"

if image_base == 0:
image_base = pe.OPTIONAL_HEADER.ImageBase

self.emu.mem_map(image_base, image_size, MemoryProtection.NoAccess)
mapped_image = pe.get_memory_mapped_image(ImageBase=image_base)
self.emu.mem_write(image_base, mapped_image)

for section in pe.sections:
name = section.Name.rstrip(b"\0")
mask = section_alignment - 1
rva = (section.VirtualAddress_adj + mask) & ~mask
va = image_base + rva
size = page_align(section.Misc_VirtualSize)
flags = section.Characteristics
assert flags & IMAGE_SCN_MEM_SHARED == 0, "Shared sections are not supported"
assert flags & IMAGE_SCN_MEM_READ != 0, "Non-readable sections are not supported"
execute = flags & IMAGE_SCN_MEM_EXECUTE
write = flags & IMAGE_SCN_MEM_WRITE
protect = MemoryProtection.ReadOnly
if write:
if execute:
protect = MemoryProtection.ExecuteReadWrite
else:
protect = MemoryProtection.ReadWrite
elif execute:
protect = MemoryProtection.ExecuteRead
self.emu.mem_protect(va, size, protect)
print(f"Mapping section '{name.decode()}' {hex(rva)}[{hex(rva)}] -> {hex(va)} as {protect}")

header_size = pe.sections[0].VirtualAddress_adj
self.emu.mem_protect(image_base, header_size, MemoryProtection.ReadOnly)

return image_base

def find_export(pe: pefile.PE, name: str) -> int:
for symbol in pe.DIRECTORY_ENTRY_EXPORT.symbols:
if symbol.name.decode() == name:
return symbol.address
raise ValueError(f"Could not find export: {name}")

def main():
parser = argparse.ArgumentParser()
parser.add_argument("riscvm", help="Path to riscvm.exe to obfuscate")
parser.add_argument("--obfuscator", help="Path to the obfuscator binary", default="build/obfuscate")
parser.add_argument("--no-transform", help="Disable transformations", action="store_true")
parser.add_argument("--no-obfuscator", help="Directly map the executable, skipping obfuscation", action="store_true")
args = parser.parse_args()

riscvm = RISCVM()

if not os.path.exists(args.riscvm):
raise FileNotFoundError(f"riscvm executable does not exist: {args.riscvm}")

if not os.path.exists(args.obfuscator):
raise FileNotFoundError(f"Obfuscator does not exist: {args.obfuscator}")

temp_dir = os.path.dirname(args.obfuscator)
riscvm_run_clean = os.path.join(temp_dir, "test_riscvm_run.clean.bin")
riscvm_run_obfuscated = os.path.join(temp_dir, "test_riscvm_run.obfuscated.bin")
riscvm_run_path = os.path.join(temp_dir, "test_riscvm_run.bin")
obfuscate_args = [
args.obfuscator,
args.riscvm,
"-output", riscvm_run_obfuscated,
"-clean-output", riscvm_run_clean
]
riscvm_run_path = riscvm_run_clean if args.no_transform else riscvm_run_obfuscated

p = subprocess.Popen(
obfuscate_args,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
stdout, _ = p.communicate(0)
exit_code = p.wait()
if exit_code != 0:
print(f"Obfuscation failed!\n{stdout.decode()}")
sys.exit(1)

with open(riscvm_run_path, "rb") as f:
riscvm_run_data = f.read()
riscvm = RISCVM(0x140001104, riscvm_run_data)
if args.no_obfuscator:
pe = pefile.PE(args.riscvm)
riscvm_run_rva = find_export(pe, "riscvm_run")
image_base = riscvm.map_image(pe)
riscvm_run_address = image_base + riscvm_run_rva
else:
if not os.path.exists(args.obfuscator):
raise FileNotFoundError(f"Obfuscator does not exist: {args.obfuscator}")

temp_dir = os.path.dirname(args.obfuscator)
riscvm_run_clean = os.path.join(temp_dir, "test_riscvm_run.clean.bin")
riscvm_run_obfuscated = os.path.join(temp_dir, "test_riscvm_run.obfuscated.bin")
riscvm_run_path = os.path.join(temp_dir, "test_riscvm_run.bin")
obfuscate_args = [
args.obfuscator,
args.riscvm,
"-output", riscvm_run_obfuscated,
"-clean-output", riscvm_run_clean
]
riscvm_run_path = riscvm_run_clean if args.no_transform else riscvm_run_obfuscated

p = subprocess.Popen(
obfuscate_args,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
stdout, _ = p.communicate(0)
exit_code = p.wait()
if exit_code != 0:
print(f"Obfuscation failed!\n{stdout.decode()}")
sys.exit(1)

with open(riscvm_run_path, "rb") as f:
riscvm_run_data = f.read()
riscvm_run_address = 0x140001104
riscvm.map_shellcode(riscvm_run_address, riscvm_run_data)

total = 0
success = 0
for test in glob(os.path.join(os.path.dirname(__file__), "..", "riscvm", "isa-tests", "rv64*")):
basename = os.path.basename(test)
_, ext = os.path.splitext(basename)
if ext != "":
continue
print(f"=== {basename} ===")
with open(test, "rb") as f:
f.seek(0x1190)
payload = f.read()
total += 1
result = riscvm.run(payload)
result = riscvm.run(riscvm_run_address, payload)
if result == 0:
print(f" SUCCESS (icount: {riscvm.emu.icount})")
success += 1
Expand Down

0 comments on commit 07b84fb

Please sign in to comment.