diff --git a/examples/debug-information/Makefile b/examples/debug-information/Makefile new file mode 100644 index 0000000..7c407bb --- /dev/null +++ b/examples/debug-information/Makefile @@ -0,0 +1,15 @@ +# Variables +CC = g++ +CFLAGS = -Wall -g +TARGET = exe +SRC = debug.cc + +# Default rule +all: $(TARGET) + +$(TARGET): $(SRC) + $(CC) $(CFLAGS) -o $@ $< + +.PHONY: clean +clean: + rm -f $(TARGET) \ No newline at end of file diff --git a/examples/debug-information/debug.cc b/examples/debug-information/debug.cc new file mode 100644 index 0000000..5e700a7 --- /dev/null +++ b/examples/debug-information/debug.cc @@ -0,0 +1,19 @@ +#include + +typedef struct Input { + int x; + int y; +} Input; + +int product(Input input) { + int result = input.x * input.y; + return result; +} + +int main() { + int a = 5; + int b = 3; + int result = product({a, b}); + std::cout << "The product is: " << result << std::endl; + return 0; +} \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 91dec54..c109a31 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,7 +23,7 @@ classifiers = [ ] dependencies = [ "capstone >= 5.0.1", - "lief >=0.13.2", + "lief >=0.14.0", "apsw >= 3.43.1.0", "sh >= 2.0.6", ] @@ -39,9 +39,9 @@ dev = [ "isort >= 5.12.0", "flake8 >= 6.1.0", "flake8-print >= 5.0.0", - "pyright >= 1.1.325", + "pyright >= 1.1.349", "pytest >= 7.4.0", - "mypy >= 1.0.0", + "mypy >= 1.8.0", "coverage[toml] >= 7.3", ] diff --git a/sqlelf/elf.py b/sqlelf/elf.py index 1539eeb..cca7665 100644 --- a/sqlelf/elf.py +++ b/sqlelf/elf.py @@ -9,6 +9,12 @@ import capstone # type: ignore import lief +# ELF.pyi has no matching py file since it's a c extension +# pyright: reportMissingModuleSource=false +# https://github.com/microsoft/pyright/issues/5950 +import lief.ELF + +from sqlelf import lief_ext from sqlelf._vendor.elftools.common.utils import bytes2str from sqlelf._vendor.elftools.dwarf.descriptions import describe_form_class from sqlelf._vendor.elftools.dwarf.die import DIE as DIE_t @@ -101,7 +107,7 @@ def register_generator( def register_dynamic_entries_generator( - binaries: list[lief.Binary], connection: apsw.Connection, cache_flags: CacheFlag + binaries: list[lief_ext.Binary], connection: apsw.Connection, cache_flags: CacheFlag ) -> None: """Create the .dynamic section virtual table.""" @@ -109,9 +115,13 @@ def dynamic_entries_generator() -> Iterator[dict[str, Any]]: for binary in binaries: # super important that these accessors are pulled out of the tight loop # as they can be costly - binary_name = binary.name - for entry in binary.dynamic_entries: # type: ignore - yield {"path": binary_name, "tag": entry.tag.name, "value": entry.value} + binary_name = binary.path + for entry in binary.dynamic_entries: + yield { + "path": binary_name, + "tag": entry.tag.__name__, + "value": entry.value, + } generator = Generator.make_generator( ["path", "tag", "value"], @@ -128,17 +138,17 @@ def dynamic_entries_generator() -> Iterator[dict[str, Any]]: def register_headers_generator( - binaries: list[lief.Binary], connection: apsw.Connection, cache_flags: CacheFlag + binaries: list[lief_ext.Binary], connection: apsw.Connection, cache_flags: CacheFlag ) -> None: """Create the ELF headers virtual table,""" def headers_generator() -> Iterator[dict[str, Any]]: for binary in binaries: yield { - "path": binary.name, - "type": binary.header.file_type.name, - "machine": binary.header.machine_type.name, - "version": binary.header.identity_version.name, + "path": binary.path, + "type": binary.header.file_type.__name__, + "machine": binary.header.machine_type.__name__, + "version": binary.header.identity_version.__name__, "entry": binary.header.entrypoint, "is_pie": binary.is_pie, } @@ -158,7 +168,7 @@ def headers_generator() -> Iterator[dict[str, Any]]: def register_instructions_generator( - binaries: list[lief.Binary], connection: apsw.Connection, cache_flags: CacheFlag + binaries: list[lief_ext.Binary], connection: apsw.Connection, cache_flags: CacheFlag ) -> None: """Create the instructions virtual table. @@ -168,7 +178,7 @@ def instructions_generator() -> Iterator[dict[str, Any]]: for binary in binaries: # super important that these accessors are pulled out of the tight loop # as they can be costly - binary_name = binary.name + binary_name = binary.path for section in binary.sections: if section.has(lief.ELF.SECTION_FLAGS.EXECINSTR): @@ -208,20 +218,20 @@ def instructions_generator() -> Iterator[dict[str, Any]]: ) -def mode(binary: lief.Binary) -> int: +def mode(binary: lief_ext.Binary) -> int: if binary.header.identity_class == lief.ELF.ELF_CLASS.CLASS64: return cast(int, capstone.CS_MODE_64) - raise RuntimeError(f"Unknown mode for {binary.name}") + raise RuntimeError(f"Unknown mode for {binary.path}") -def arch(binary: lief.Binary) -> int: +def arch(binary: lief_ext.Binary) -> int: if binary.header.machine_type == lief.ELF.ARCH.x86_64: return cast(int, capstone.CS_ARCH_X86) - raise RuntimeError(f"Unknown machine type for {binary.name}") + raise RuntimeError(f"Unknown machine type for {binary.path}") def register_sections_generator( - binaries: list[lief.Binary], connection: apsw.Connection, cache_flags: CacheFlag + binaries: list[lief_ext.Binary], connection: apsw.Connection, cache_flags: CacheFlag ) -> None: """Create the ELF sections virtual table.""" @@ -229,14 +239,14 @@ def sections_generator() -> Iterator[dict[str, Any]]: for binary in binaries: # super important that these accessors are pulled out of the tight loop # as they can be costly - binary_name = binary.name + binary_name = binary.path for section in binary.sections: yield { "path": binary_name, "name": section.name, "offset": section.offset, "size": section.size, - "type": section.type.name, + "type": section.type.__name__, "content": bytes(section.content), } @@ -262,7 +272,7 @@ def coerce_section_name(name: str | None) -> str | None: def register_strings_generator( - binaries: list[lief.Binary], connection: apsw.Connection, cache_flags: CacheFlag + binaries: list[lief_ext.Binary], connection: apsw.Connection, cache_flags: CacheFlag ) -> None: """Create the ELF strings virtual table. @@ -278,7 +288,7 @@ def strings_generator() -> Iterator[dict[str, Any]]: ] # super important that these accessors are pulled out of the tight loop # as they can be costly - binary_name = binary.name + binary_name = binary.path for strtab in strtabs: # The first byte is always the null byte in the STRTAB # Python also treats the final null in the string by creating @@ -330,7 +340,7 @@ def split_with_index(str: str, delimiter: str) -> list[tuple[int, str]]: def register_symbols_generator( - binaries: list[lief.Binary], connection: apsw.Connection, cache_flags: CacheFlag + binaries: list[lief_ext.Binary], connection: apsw.Connection, cache_flags: CacheFlag ) -> None: """Create the ELF symbols virtual table.""" @@ -338,13 +348,15 @@ def symbols_generator() -> Iterator[dict[str, Any]]: for binary in binaries: # super important that these accessors are pulled out of the tight loop # as they can be costly - binary_name = binary.name + binary_name = binary.path for symbol in symbols(binary): # The section index can be special numbers like 65521 or 65522 # that refer to special sections so they can't be indexed section_name: str | None = next( ( - section.name + # technically name can be bytes, for now avoid this possibility + # https://github.com/lief-project/LIEF/issues/965#issuecomment-1718702335 + cast(str, section.name) for shndx, section in enumerate(binary.sections) if shndx == symbol.shndx ), @@ -376,7 +388,7 @@ def symbols_generator() -> Iterator[dict[str, Any]]: and symbol.symbol_version.symbol_version_auxiliary else None ), - "type": symbol.type.name, + "type": symbol.type.__name__, "value": symbol.value, } @@ -412,7 +424,7 @@ def symbols_generator() -> Iterator[dict[str, Any]]: def register_version_requirements( - binaries: list[lief.Binary], connection: apsw.Connection, cache_flags: CacheFlag + binaries: list[lief_ext.Binary], connection: apsw.Connection, cache_flags: CacheFlag ) -> None: """Create the ELF version requirements virtual table. @@ -423,8 +435,8 @@ def version_requirements_generator() -> Iterator[dict[str, Any]]: for binary in binaries: # super important that these accessors are pulled out of the tight loop # as they can be costly - binary_name = binary.name - symbol_version_req = binary.symbols_version_requirement # type: ignore + binary_name = binary.path + symbol_version_req = binary.symbols_version_requirement for version_requirement in symbol_version_req: file = version_requirement.name for aux_requirement in version_requirement.get_auxiliary_symbols(): @@ -449,7 +461,7 @@ def version_requirements_generator() -> Iterator[dict[str, Any]]: def register_version_definitions( - binaries: list[lief.Binary], connection: apsw.Connection, cache_flags: CacheFlag + binaries: list[lief_ext.Binary], connection: apsw.Connection, cache_flags: CacheFlag ) -> None: """Create the ELF version requirements virtual table. @@ -460,8 +472,8 @@ def version_definitions_generator() -> Iterator[dict[str, Any]]: for binary in binaries: # super important that these accessors are pulled out of the tight loop # as they can be costly - binary_name = binary.name - symbol_version_def = binary.symbols_version_definition # type: ignore + binary_name = binary.path + symbol_version_def = binary.symbols_version_definition for version_definition in symbol_version_def: flags = version_definition.flags for aux_definition in version_definition.auxiliary_symbols: @@ -486,7 +498,7 @@ def version_definitions_generator() -> Iterator[dict[str, Any]]: def register_dwarf_dies( - binaries: list[lief.Binary], connection: apsw.Connection, cache_flags: CacheFlag + binaries: list[lief_ext.Binary], connection: apsw.Connection, cache_flags: CacheFlag ) -> None: """Create the DWARF DIE (Debugging Information Entry) virtual table.""" @@ -524,7 +536,7 @@ def dwarf_dies_generator() -> Iterator[dict[str, Any]]: for binary in binaries: # super important that these accessors are pulled out of the tight loop # as they can be costly - binary_name = binary.name + binary_name = binary.path # A bit annoying but we must re-open the file # since we are using a different library here with open(binary_name, "rb") as f: @@ -568,7 +580,7 @@ def dwarf_dies_generator() -> Iterator[dict[str, Any]]: def register_dwarf_dies_graph( - binaries: list[lief.Binary], connection: apsw.Connection, cache_flags: CacheFlag + binaries: list[lief_ext.Binary], connection: apsw.Connection, cache_flags: CacheFlag ) -> None: """Create the DWARF DIE (Debugging Information Entry) graph virtual table.""" @@ -576,7 +588,7 @@ def dwarf_dies_graph_generator() -> Iterator[dict[str, Any]]: for binary in binaries: # super important that these accessors are pulled out of the tight loop # as they can be costly - binary_name = binary.name + binary_name = binary.path # A bit annoying but we must re-open the file # since we are using a different library here with open(binary_name, "rb") as f: @@ -612,7 +624,7 @@ def dwarf_dies_graph_generator() -> Iterator[dict[str, Any]]: ) -def symbols(binary: lief.Binary) -> Sequence[lief.ELF.Symbol]: +def symbols(binary: lief_ext.Binary) -> Sequence[lief.ELF.Symbol]: """Use heuristic to either get static symbols or dynamic symbol table Always return the dynamic symbol table first and then the static symbols @@ -623,7 +635,7 @@ def symbols(binary: lief.Binary) -> Sequence[lief.ELF.Symbol]: will not include version information. """ static_symbols: Sequence[lief.ELF.Symbol] = binary.static_symbols # type: ignore - dynamic_symbols = list(binary.dynamic_symbols) # type: ignore + dynamic_symbols = list(binary.dynamic_symbols) dynamic_symbol_names = set(map(lambda s: s.name, dynamic_symbols)) all_symbols = dynamic_symbols + [ s for s in static_symbols if s.name not in dynamic_symbol_names @@ -633,7 +645,7 @@ def symbols(binary: lief.Binary) -> Sequence[lief.ELF.Symbol]: def register_virtual_tables( connection: apsw.Connection, - binaries: list[lief.Binary], + binaries: list[lief_ext.Binary], cache_flags: CacheFlag = CacheFlag.INSTRUCTIONS | CacheFlag.SYMBOLS, ) -> None: """Register the virtual table modules. diff --git a/sqlelf/lief_ext.py b/sqlelf/lief_ext.py new file mode 100644 index 0000000..d030461 --- /dev/null +++ b/sqlelf/lief_ext.py @@ -0,0 +1,38 @@ +# pyright: strict +from typing import TYPE_CHECKING, Any, Optional + +# ELF.pyi has no matching py file since it's a c extension +# pyright: reportMissingModuleSource=false +# https://github.com/microsoft/pyright/issues/5950 +import lief.ELF + +# Let's make sure type checking works for this proxy class +# https://stackoverflow.com/questions/71365594/how-to-make-a-proxy-object-with-typing-as-underlying-object-in-python +if TYPE_CHECKING: + base = lief.ELF.Binary +else: + base = object + + +class Binary(base): + """Proxy the lief.Binary object to add a path attribute. + + As of https://github.com/lief-project/LIEF/issues/839 the name + attribute in lief.Binary was removed. Rather than passing around + a tuple let's create a nice proxy class. + """ + + def __init__(self, path: str): + self.path = path + self.__binary: Optional[lief.ELF.Binary] = lief.ELF.parse( # pyright: ignore + path + ) + + if not TYPE_CHECKING: + + def __getattr__(self, attr: str) -> Any: + return getattr(self.__binary, attr) + + @staticmethod + def is_elf(path: str) -> bool: + return lief.is_elf(path) diff --git a/sqlelf/sql.py b/sqlelf/sql.py index 44f20d7..c55f732 100644 --- a/sqlelf/sql.py +++ b/sqlelf/sql.py @@ -10,7 +10,7 @@ import lief import sh # type: ignore -from sqlelf import elf +from sqlelf import elf, lief_ext @dataclass @@ -52,9 +52,9 @@ def execute( pass -def find_libraries(binary: lief.Binary) -> Dict[str, str]: +def find_libraries(binary: lief_ext.Binary) -> Dict[str, str]: """Use the interpreter in a binary to determine the path of each linked library""" - interpreter = binary.interpreter # type: ignore + interpreter = binary.interpreter # interpreter can be none/empty if it is a static linked binary # or a dynamic linked binary itself if not interpreter: @@ -66,7 +66,7 @@ def find_libraries(binary: lief.Binary) -> Dict[str, str]: # so we return an empty dictionary # This can happen if we are building binaries wth Nix return {} - resolution = interpreter_cmd("--list", binary.name) + resolution = interpreter_cmd("--list", binary.path) result = OrderedDict() # TODO: Figure out why `--list` and `ldd` produce different outcomes # specifically for the interpreter. @@ -98,8 +98,8 @@ def make_sql_engine( libraries needed by each binary cache_flags: bit flag that controls which tables to cache """ - binaries: list[lief.Binary] = [ - lief.parse(filename) for filename in filenames if lief.is_elf(filename) + binaries: list[lief_ext.Binary] = [ + lief_ext.Binary(filename) for filename in filenames if lief.is_elf(filename) ] connection = apsw.Connection(":memory:") @@ -117,7 +117,9 @@ def make_sql_engine( for library in sub_list ] ) - binaries = binaries + [lief.parse(library) for library in shared_libraries_set] + binaries = binaries + [ + lief_ext.Binary(library) for library in shared_libraries_set + ] elf.register_virtual_tables(connection, binaries, cache_flags) return SQLEngine(connection) diff --git a/tests/test_sql.py b/tests/test_sql.py index 2f22538..473f0f2 100644 --- a/tests/test_sql.py +++ b/tests/test_sql.py @@ -1,21 +1,21 @@ +from dataclasses import dataclass from unittest.mock import patch -import lief import sh # type: ignore -from sqlelf import sql +from sqlelf import lief_ext, sql def test_simple_binary_real() -> None: - binary = lief.parse("/bin/ls") + binary = lief_ext.Binary("/bin/ls") result = sql.find_libraries(binary) assert len(result) > 0 @patch("sh.Command") def test_simple_binary_mocked(Command: sh.Command) -> None: - binary = lief.parse("/bin/ls") - interpreter = binary.interpreter # type: ignore + binary = lief_ext.Binary("/bin/ls") + interpreter = binary.interpreter expected_return_value = """ linux-vdso.so.1 (0x00007ffc5d8ff000) /lib/x86_64-linux-gnu/libnss_cache.so.2 (0x00007f6995d92000) @@ -42,46 +42,61 @@ def test_simple_binary_mocked(Command: sh.Command) -> None: def test_find_libraries_no_interpreter() -> None: - binary = lief.parse("/bin/ls") - binary.interpreter = "" # type: ignore + binary = lief_ext.Binary("/bin/ls") + binary.interpreter = "" result = sql.find_libraries(binary) assert len(result) == 0 def test_find_libraries_missing_interpreter() -> None: - binary = lief.parse("/bin/ls") - binary.interpreter = "/nix/store/something/ld-linux.so.2" # type: ignore + binary = lief_ext.Binary("/bin/ls") + binary.interpreter = "/nix/store/something/ld-linux.so.2" result = sql.find_libraries(binary) assert len(result) == 0 -def test_simple_select_header() -> None: - # TODO(fzakaria): Figure out a better binary to be doing that we control - engine = sql.make_sql_engine(["/bin/ls"]) - result = list(engine.execute("SELECT * FROM elf_headers LIMIT 1")) - assert len(result) == 1 - assert "path" in result[0] - assert "type" in result[0] - assert "version" in result[0] - assert "machine" in result[0] - assert "entry" in result[0] - +def test_all_selects() -> None: + """This test gets all the tables that should be created by sqlelf + as they are prefixed with elf_ and tries to fetch all columns -def test_simple_select_version_requirements() -> None: - # TODO(fzakaria): Figure out a better binary to be doing that we control + This is a pretty good way to get a quick exhausting test over all + the functionality.""" + # Generate all the SELECT statements for us + select_all_sql = """SELECT 'SELECT * FROM ' || name || ' LIMIT 1' as 'sql' + FROM sqlite_schema where name LIKE 'elf_%' AND type = 'table'""" engine = sql.make_sql_engine(["/bin/ls"]) - result = list(engine.execute("SELECT * FROM elf_version_requirements LIMIT 1")) - assert len(result) == 1 - assert "path" in result[0] - assert "file" in result[0] - assert "name" in result[0] - - -def test_select_zero_rows() -> None: + results = list(engine.execute(select_all_sql)) + for result in results: + assert len(list(engine.execute(result["sql"]))) == 1 + + +@dataclass +class TestCase: + table: str + columns: list[str] + + +def test_simple_selects() -> None: + test_cases = [ + TestCase( + "elf_headers", ["path", "type", "version", "machine", "entry", "is_pie"] + ), + TestCase( + "elf_instructions", + ["path", "section", "mnemonic", "address", "operands", "size"], + ), + TestCase("elf_version_requirements", ["path", "file", "name"]), + ] # TODO(fzakaria): Figure out a better binary to be doing that we control engine = sql.make_sql_engine(["/bin/ls"]) - result = list(engine.execute("SELECT * FROM elf_headers LIMIT 0")) - assert len(result) == 0 + for test_case in test_cases: + result = list(engine.execute(f"SELECT * FROM {test_case.table} LIMIT 1")) + assert len(result) == 1 + assert all(column in result[0] for column in test_case.columns) + + # also test selecting a LIMIT of 0 as that can require special handling + result = list(engine.execute(f"SELECT * FROM {test_case.table} LIMIT 0")) + assert len(result) == 0 def test_non_existent_file() -> None: