def _line_program_filename(header: Any, file_index: int) -> bytes | None:
    """Resolve a line-program file index to its directory-joined file name.

    Args:
        header: The line program header. It is read for its ``version``,
            ``file_entry`` and ``include_directory`` fields.
        file_index: The ``file`` register of a line program state.

    Returns:
        The file name as bytes, prefixed with its include directory when one
        is recorded, or ``None`` when the index means "no file" (index 0 in
        DWARF versions before 5).
    """
    file_entries = header["file_entry"]
    directory_entries = header["include_directory"]

    # Before DWARF 5 the file and directory tables are 1-indexed and index 0
    # means "none". DWARF 5 made both tables 0-indexed: file 0 is the primary
    # source file and directory 0 is the compilation directory, so neither
    # may be skipped or shifted.
    if header["version"] >= 5:
        file_entry = file_entries[file_index]
        directory = directory_entries[file_entry["dir_index"]]
    else:
        if file_index == 0:
            return None
        file_entry = file_entries[file_index - 1]
        dir_index = file_entry["dir_index"]
        directory = directory_entries[dir_index - 1] if dir_index > 0 else b""

    # Joining with an empty directory yields the bare file name.
    return os.path.join(directory, file_entry.name)


def register_dwarf_debug_lines(
    binaries: list[lief_ext.Binary], connection: apsw.Connection, cache_flags: CacheFlag
) -> None:
    """Create the DWARF debug_lines virtual table.

    The table is registered as ``dwarf_debug_lines`` and yields one row per
    line program entry that carries a state and refers to a source file.
    Columns: ``path`` (the binary), ``filename`` (source file, including its
    include directory when recorded), ``address``, ``line``, ``column`` and
    ``cu_offset`` (offset of the owning compilation unit).

    Args:
        binaries: The binaries whose line programs are exposed.
        connection: The connection the table is registered on.
        cache_flags: When ``CacheFlag.DWARF_DEBUG_LINES`` is set, an index on
            ``cu_offset`` is created as well.
    """

    def dwarf_debug_lines_generator() -> Iterator[dict[str, Any]]:
        for binary in binaries:
            # super important that these accessors are pulled out of the tight
            # loop as they can be costly
            binary_name = binary.path
            # A bit annoying but we must re-open the file
            # since we are using a different library here
            with open(binary_name, "rb") as f:
                elf_file = ELFFile(f)
                if not elf_file.has_dwarf_info():
                    continue
                # get_dwarf_info returns a DWARFInfo context object, which is
                # the starting point for all DWARF-based processing in
                # pyelftools.
                dwarf_info = elf_file.get_dwarf_info()
                for CU in dwarf_info.iter_CUs():
                    debug_lines = dwarf_info.line_program_for_CU(CU)
                    if debug_lines is None:
                        continue
                    header = debug_lines.header
                    cu_offset = CU.cu_offset
                    # Many entries share the same file, so resolve and decode
                    # each file index only once per line program. ``None``
                    # marks an index that does not name a file.
                    filenames: dict[int, str | None] = {}
                    for lpe in debug_lines.get_entries():
                        state = lpe.state
                        # Entries for commands that do not emit a row of the
                        # line table carry no state.
                        if state is None:
                            continue

                        file_index = state.file
                        if file_index not in filenames:
                            raw_name = _line_program_filename(header, file_index)
                            filenames[file_index] = (
                                None if raw_name is None else bytes2str(raw_name)
                            )
                        filename = filenames[file_index]
                        # We skip LPEs that don't have an associated file.
                        # This can happen if instructions in the compiled
                        # binary don't correspond directly to any original
                        # source file.
                        if filename is None:
                            continue

                        yield {
                            "path": binary_name,
                            "filename": filename,
                            "address": state.address,
                            "line": state.line,
                            "column": state.column,
                            "cu_offset": cu_offset,
                        }

    generator = Generator.make_generator(
        ["path", "filename", "address", "line", "column", "cu_offset"],
        dwarf_debug_lines_generator,
    )

    register_generator(
        connection,
        generator,
        "dwarf_debug_lines",
        CacheFlag.DWARF_DEBUG_LINES,
        cache_flags,
    )

    if CacheFlag.DWARF_DEBUG_LINES in cache_flags:
        connection.execute(
            """CREATE INDEX dwarf_debug_lines_cu_offset_idx
                ON dwarf_debug_lines (cu_offset);"""
        )
register_table_functions: register_function(binaries, connection, cache_flags) diff --git a/tests/test_sql.py b/tests/test_sql.py index 473f0f2..f0cc2b5 100644 --- a/tests/test_sql.py +++ b/tests/test_sql.py @@ -63,29 +63,32 @@ def test_all_selects() -> None: the functionality.""" # Generate all the SELECT statements for us select_all_sql = """SELECT 'SELECT * FROM ' || name || ' LIMIT 1' as 'sql' - FROM sqlite_schema where name LIKE 'elf_%' AND type = 'table'""" + FROM sqlite_schema + WHERE (name LIKE 'elf_%' OR name LIKE 'dwarf_%') + AND type = 'table'""" engine = sql.make_sql_engine(["/bin/ls"]) results = list(engine.execute(select_all_sql)) + assert len(results) > 0 for result in results: assert len(list(engine.execute(result["sql"]))) == 1 @dataclass -class TestCase: +class SimpleSQLTestCase: table: str columns: list[str] def test_simple_selects() -> None: test_cases = [ - TestCase( + SimpleSQLTestCase( "elf_headers", ["path", "type", "version", "machine", "entry", "is_pie"] ), - TestCase( + SimpleSQLTestCase( "elf_instructions", ["path", "section", "mnemonic", "address", "operands", "size"], ), - TestCase("elf_version_requirements", ["path", "file", "name"]), + SimpleSQLTestCase("elf_version_requirements", ["path", "file", "name"]), ] # TODO(fzakaria): Figure out a better binary to be doing that we control engine = sql.make_sql_engine(["/bin/ls"])