diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml index 230e70b..2e20521 100644 --- a/.github/workflows/python-app.yml +++ b/.github/workflows/python-app.yml @@ -25,18 +25,18 @@ jobs: uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} - - name: Lint with flake8 + - name: Install dependencies run: | - python -m pip install flake8 - flake8 . --max-complexity=14 --max-line-length=127 - - name: Verify sorted imports + python3 -m pip install --upgrade pip + python3 -m pip install pre-commit + - name: Run pre-commit hooks run: | - python -m pip install isort - isort . -m HANGING_INDENT -l 120 --check-only + pre-commit run --all-files - name: Test install run: | - python -m pip install --upgrade pip - python -m pip install -U . - - name: Test show usage + xcrun python3 -m pip install --upgrade pip + xcrun python3 -m pip install -r requirements.txt + xcrun python3 -m pip install -e ".[test]" + - name: Run pytest run: | - python -m hilda + sudo xcrun python3 -m pytest -k "not set_implementation" \ No newline at end of file diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..7805f42 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,15 @@ +# .pre-commit-config.yaml +repos: + - repo: https://github.com/pre-commit/mirrors-isort + rev: v5.9.3 + hooks: + - id: isort + args: [ '-m', 'HANGING_INDENT', '-l', '120','--check-only' ] + files: \.py$ + + - repo: https://github.com/pycqa/flake8 + rev: "7.0.0" + hooks: + - id: flake8 + args: [ '--max-complexity=14', '--max-line-length=127' ] + files: \.py$ \ No newline at end of file diff --git a/README.md b/README.md index e4cee24..ffe3b54 100644 --- a/README.md +++ b/README.md @@ -1,18 +1,29 @@ - [Description](#description) - [Installation](#installation) - [How to use](#how-to-use) - * [Starting a Hilda shell](#starting-a-hilda-shell) - + [Bare mode](#bare-mode) - + [Remote mode](#remote-mode) - * [Usage](#usage) - * [Symbol objects](#symbol-objects) - * [Globalized symbols](#globalized-symbols) + - [Starting a Hilda shell](#starting-a-hilda-shell) + - [Attach mode](#attach-mode) + - [Launch mode](#launch-mode) + - [Bare mode](#bare-mode) + - [Remote mode](#remote-mode) + - [The connected device is connected via network](#the-connected-device-is-connected-via-network) + - [Startup Files](#startup-files) + - [Usage](#usage) + - [Magic functions](#magic-functions) + - [Shortcuts](#shortcuts) + - [Configurables](#configurables) + - [Attributes](#attributes) + - [Example Usage](#example-usage) + - [UI Configuration](#ui-configuration) + - [Symbol objects](#symbol-objects) + - [Globalized symbols](#globalized-symbols) - [Searching for the right symbol](#searching-for-the-right-symbol) + - [Objective-C Classes](#objective-c-classes) + - [Objective-C Objects](#objective-c-objects) + - [Using snippets](#using-snippets) +- [Contributing](#contributing) - + [Objective-C Classes](#objective-c-classes) - * [Objective-C Objects](#objective-c-objects) - * [Using snippets](#using-snippets) - * [Contributing](#contributing) +Would you like any further adjustments? # Description @@ -66,7 +77,21 @@ Use the attach sub-command in order to start an LLDB shell attached to given pro hilda attach [-p pid] [-n process-name] ``` -After attaching, simply execute `hilda` command to enter the hilda shell. +### Launch mode + +Use the attach sub-command in order to launch given process. + +```shell +hilda launch /path/to/executable \ + --argv arg1 --argv arg2 \ + --envp NAME=Alice --envp AGE=30 \ + --stdin /path/to/input.txt \ + --stdout /path/to/output.txt \ + --stderr /path/to/error.txt \ + --wd /path/to/working/directory \ + --flags 0x01 \ + --stop-at-entry + ``` ### Bare mode @@ -120,6 +145,23 @@ Run the following command: hilda remote HOSTNAME PORT ``` +## Startup Files + +Each command can accept startup files to execute on start. As opposed to snippets, the startup files can accept Hilda +syntax. + +#### Startup File Example + +```python +cfg.objc_verbose_monitor = True +p.bp(ADDRESS) +p.cont() +``` + +```shell +hilda remote HOSTNAME PORT -f startupfile1 -f startupfile2 +``` + ## Usage Upon starting Hilda shell, you are greeted with: @@ -270,17 +312,6 @@ Here is a gist of methods you can access from `p`: evaluate_expression(f'[[{currentDevice} systemName] hasPrefix:@"2"]') - `import_module` - Import & reload given python module (intended mainly for external snippets) -- `set_evaluation_unwind` - - Set whether LLDB will attempt to unwind the stack whenever an expression evaluation error occurs. - Use unwind() to restore when an error is raised in this case. -- `get_evaluation_unwind` - - Get evaluation unwind state. - When this value is True, LLDB will attempt unwinding the stack on evaluation errors. - Otherwise, the stack frame will remain the same on errors to help you investigate the error. -- `set_evaluation_ignore_breakpoints` - - Set whether to ignore breakpoints while evaluating expressions -- `get_evaluation_ignore_breakpoints` - - Get evaluation "ignore-breakpoints" state. - `unwind` - Unwind the stack (useful when get_evaluation_unwind() == False) @@ -293,6 +324,32 @@ Sometimes accessing the python API can be tiring, so we added some magic functio - `%fbp ` - Equivalent to: `p.file_symbol(addressInHex, filename).bp()` +## Shortcuts + +- **F7**: Step Into +- **F8**: Step Over +- **F9**: Continue +- **F10**: Stop + +## Configurables + +The global `cfg` used to configure various settings for evaluation and monitoring. + +### Attributes + +- `evaluation_unwind_on_error`: Whether to unwind on error during evaluation. (Default: `False`) +- `evaluation_ignore_breakpoints`: Whether to ignore breakpoints during evaluation. (Default: `False`) +- `nsobject_exclusion`: Whether to exclude `NSObject` during evaluation, reducing IPython autocomplete results. ( + Default: `False`) +- `objc_verbose_monitor`: When set to `True`, using `monitor()` will automatically print Objective-C method arguments. ( + Default: `False`) + +### Example Usage + +```python +cfg.objc_verbose_monitor = True +``` + ## UI Configuration Hilda contains minimal UI for examining the target state. @@ -650,9 +707,5 @@ This will monitor all XPC related traffic in the given process. Please run the tests as follows before submitting a PR: ```shell -xcrun python3 -m tests aggregated - -# wait for lldb shell prompt - -run_tests +xcrun python3 -m pytest ``` diff --git a/hilda/__main__.py b/hilda/__main__.py index b4d0409..4efb81d 100644 --- a/hilda/__main__.py +++ b/hilda/__main__.py @@ -3,3 +3,7 @@ def main(): cli() + + +if __name__ == '__main__': + main() diff --git a/hilda/cli.py b/hilda/cli.py new file mode 100644 index 0000000..707a246 --- /dev/null +++ b/hilda/cli.py @@ -0,0 +1,89 @@ +import logging +from pathlib import Path +from typing import List, Mapping, Optional + +import click +import coloredlogs + +from hilda import launch_lldb +from hilda._version import version + +DEFAULT_HILDA_PORT = 1234 + +coloredlogs.install(level=logging.DEBUG) + + +@click.group() +def cli(): + pass + + +startup_files_option = click.option('-f', '--startup_files', multiple=True, default=None, help='Files to run on start') + + +def parse_envp(ctx: click.Context, param: click.Parameter, value: List[str]) -> List[str]: + env_list = [] + for item in value: + try: + key, val = item.split('=', 1) + env_list.append(f'{key}={val}') + except ValueError: + raise click.BadParameter(f'Invalid format for --envp: {item}. Expected KEY=VALUE.') + return env_list + + +@cli.command('remote') +@click.argument('hostname', default='localhost') +@click.argument('port', type=click.INT, default=DEFAULT_HILDA_PORT) +@startup_files_option +def remote(hostname: str, port: int, startup_files: Optional[List[str]] = None) -> None: + """ Connect to remote debugserver at given address """ + launch_lldb.remote(hostname, port, startup_files) + + +@cli.command('attach') +@click.option('-n', '--name', help='process name to attach') +@click.option('-p', '--pid', type=click.INT, help='pid to attach') +@startup_files_option +def attach(name: str, pid: int, startup_files: Optional[List[str]] = None) -> None: + """ Attach to given process and start a lldb shell """ + launch_lldb.attach(name=name, pid=pid, startup_files=startup_files) + + +@cli.command('launch') +@click.argument('exec_path') +@click.option('--argv', multiple=True, help='Command line arguments to pass to the process') +@click.option('--envp', multiple=True, callback=parse_envp, help='Environment variables in the form KEY=VALUE') +@click.option('--stdin', type=Path, help='Redirect stdin from this file path') +@click.option('--stdout', type=Path, help='Redirect stdout to this file path') +@click.option('--stderr', type=Path, help='Redirect stderr to this file path') +@click.option('--cwd', type=Path, help='Set the working directory for the process') +@click.option('--flags', type=click.INT, default=0, help='Launch flags (bitmask)') +@click.option('--stop-at-entry', is_flag=True, help='Stop the process at the entry point') +@startup_files_option +def launch(exec_path: str, argv: Optional[List] = None, envp: Optional[Mapping] = None, + stdin: Optional[Path] = None, + stdout: Optional[Path] = None, stderr: Optional[Path] = None, cwd: Optional[Path] = None, + flags: Optional[int] = 0, stop_at_entry: Optional[bool] = False, + startup_files: Optional[List[str]] = None) -> None: + """ Attach to given process and start a lldb shell """ + if not argv: + argv = None + if not envp: + envp = None + launch_lldb.launch(exec_path, argv, envp, stdin, stdout, stderr, cwd, flags, stop_at_entry, + startup_files) + + +@cli.command('bare') +def cli_bare(): + """ Just start a lldb shell """ + commands = [f'command script import {Path(__file__).resolve().parent / "lldb_entrypoint.py"}'] + commands = '\n'.join(commands) + launch_lldb.execute(f'lldb --one-line "{commands}"') + + +@cli.command('version') +def cli_version(): + """Show the version information.""" + click.echo(version) diff --git a/hilda/exceptions.py b/hilda/exceptions.py index 728ee3b..f81bae6 100644 --- a/hilda/exceptions.py +++ b/hilda/exceptions.py @@ -1,7 +1,7 @@ __all__ = ['HildaException', 'SymbolAbsentError', 'EvaluatingExpressionError', 'CreatingObjectiveCSymbolError', 'ConvertingToNsObjectError', 'ConvertingFromNSObjectError', 'DisableJetsamMemoryChecksError', 'GettingObjectiveCClassError', 'AccessingRegisterError', 'AccessingMemoryError', - 'BrokenLocalSymbolsJarError', 'AddingLldbSymbolError'] + 'BrokenLocalSymbolsJarError', 'AddingLldbSymbolError', 'LLDBException'] class HildaException(Exception): @@ -9,6 +9,14 @@ class HildaException(Exception): pass +class LLDBException(Exception): + """ A domain exception for lldb errors. """ + + def __init__(self, message: str): + super().__init__() + self.message = message + + class SymbolAbsentError(HildaException): """ Raise when trying to get a symbol that doesn't exist. """ pass diff --git a/hilda/hilda_client.py b/hilda/hilda_client.py index c0b39d2..ba52a1c 100644 --- a/hilda/hilda_client.py +++ b/hilda/hilda_client.py @@ -1,4 +1,3 @@ -import ast import base64 import builtins import importlib @@ -8,18 +7,19 @@ import os import pickle import struct +import sys import time import typing from collections import namedtuple from contextlib import contextmanager, suppress +from dataclasses import dataclass, field from datetime import datetime, timezone -from functools import cached_property +from functools import cached_property, wraps from pathlib import Path from typing import Any, Callable, List, Optional, Union import hexdump import IPython -import lldb from humanfriendly import prompts from humanfriendly.terminal.html import html_to_ansi from IPython.core.magic import register_line_magic # noqa: F401 @@ -49,44 +49,32 @@ lldb.KEYSTONE_SUPPORT = False print('failed to import keystone. disabling some features') -with open(os.path.join(Path(__file__).resolve().parent, 'hilda_ascii_art.html'), 'r') as f: - hilda_art = f.read() +hilda_art = Path(__file__).resolve().parent.joinpath('hilda_ascii_art.html').read_text() GREETING = f""" {hilda_art} Hilda has been successfully loaded! 😎 -Use the p global to access all features. +Usage: + p Global to access all features. + F7 Step Into. + F8 Step Over. + F9 Continue. + F10 Stop. + Have a nice flight ✈️! Starting an IPython shell... """ -MAGIC_FUNCTIONS = """ -import shlex -from IPython.core.magic import register_line_magic, needs_local_scope - -@register_line_magic -@needs_local_scope -def objc(line, local_ns=None): - p = local_ns['p'] - className = line.strip() - if not className: - p.log_error("Error: className is required.") - return - try: - local_ns[className] = p.objc_get_class(className) - p.log_info(f'{className} class loaded successfully') - except Exception: - p.log_error(f'Error loading class {className}') - - -@register_line_magic -@needs_local_scope -def fbp(line, local_ns=None): - p = local_ns['p'] - module_name, address = shlex.split(line.strip()) - address = int(address, 16) - p.file_symbol(address, module_name).bp() -""" + +def disable_logs() -> None: + logging.getLogger('asyncio').disabled = True + logging.getLogger('parso.cache').disabled = True + logging.getLogger('parso.cache.pickle').disabled = True + logging.getLogger('parso.python.diff').disabled = True + logging.getLogger('humanfriendly.prompts').disabled = True + logging.getLogger('blib2to3.pgen2.driver').disabled = True + logging.getLogger('hilda.launch_lldb').disabled = True + SerializableSymbol = namedtuple('SerializableSymbol', 'address type_ filename') @@ -118,6 +106,20 @@ def __str__(self): return config_str +def stop_is_needed(func: Callable): + """Decorator to check if the process must be stopped before proceeding.""" + + @wraps(func) + def wrapper(self, *args, **kwargs): + is_running = self.process.GetState() == lldb.eStateRunning + if is_running: + self.logger.error(f'Cannot {func.__name__.replace("_", "-")}: Process must be stopped first.') + return + return func(self, *args, **kwargs) + + return wrapper + + class HildaClient: Breakpoint = namedtuple('Breakpoint', 'address options forced callback') @@ -165,7 +167,7 @@ def lsof(self) -> dict: # convert FDs into int return {int(k): v for k, v in result.items()} - def bt(self, should_print=True, depth: Optional[int] = None) -> List: + def bt(self, should_print: bool = False, depth: Optional[int] = None) -> List[Union[str, lldb.SBFrame]]: """ Print an improved backtrace. """ backtrace = [] for i, frame in enumerate(self.thread.frames): @@ -272,6 +274,7 @@ def rebind_symbols(self, image_range=None, filename_expr=''): globals()['symbols'] = self.symbols self._symbols_loaded = True + @stop_is_needed def poke(self, address, buf: bytes): """ Write data at given address @@ -286,6 +289,7 @@ def poke(self, address, buf: bytes): return retval + @stop_is_needed def poke_text(self, address: int, code: str) -> int: """ Write instructions to address. @@ -297,6 +301,7 @@ def poke_text(self, address: int, code: str) -> int: bytecode, count = self._ks.asm(code, as_bytes=True) return self.poke(address, bytecode) + @stop_is_needed def peek(self, address, size: int) -> bytes: """ Read data at given address @@ -315,6 +320,7 @@ def peek(self, address, size: int) -> bytes: return retval + @stop_is_needed def peek_str(self, address: Symbol) -> str: """ Peek a buffer till null termination @@ -323,7 +329,7 @@ def peek_str(self, address: Symbol) -> str: """ return address.po('char *')[1:-1] # strip the "" - def stop(self): + def stop(self, *args) -> None: """ Stop process. """ self.debugger.SetAsync(False) @@ -334,8 +340,10 @@ def stop(self): if not self.process.Stop().Success(): self.log_critical('failed to stop process') + else: + self.log_info('Process Stopped') - def cont(self): + def cont(self, *args) -> None: """ Continue process. """ is_running = self.process.GetState() == lldb.eStateRunning @@ -349,6 +357,8 @@ def cont(self): if not self.process.Continue().Success(): self.log_critical('failed to continue process') + else: + self.log_info('Process Continued') def detach(self): """ @@ -359,8 +369,12 @@ def detach(self): """ if not self.process.Detach().Success(): self.log_critical('failed to detach') + else: + self.log_info('Process Detached') - def disass(self, address, buf, flavor='intel', should_print=True) -> lldb.SBInstructionList: + @stop_is_needed + def disass(self, address: int, buf: bytes, flavor: str = 'intel', + should_print: bool = False) -> lldb.SBInstructionList: """ Print disassembly from a given address :param flavor: @@ -549,14 +563,16 @@ def finish(self): self.thread.StepOutOfFrame(self.frame) self._bp_frame = None - def step_into(self): + @stop_is_needed + def step_into(self, *args): """ Step into current instruction. """ with self.sync_mode(): self.thread.StepInto() if self.ui_manager.active: self.ui_manager.show() - def step_over(self): + @stop_is_needed + def step_over(self, *args): """ Step over current instruction. """ with self.sync_mode(): self.thread.StepOver() @@ -1008,28 +1024,32 @@ def add_lldb_symbol(self, symbol: lldb.SBSymbol) -> Symbol: return value - def interact(self, additional_namespace: typing.Mapping = None) -> None: + def interact(self, additional_namespace: Optional[typing.Mapping] = None, + startup_files: Optional[List[str]] = None) -> None: """ Start an interactive Hilda shell """ if not self._dynamic_env_loaded: self.init_dynamic_environment() print('\n') self.log_info(html_to_ansi(GREETING)) + ipython_config = Config() + ipython_config.IPCompleter.use_jedi = True + ipython_config.BaseIPythonApplication.profile = 'hilda' + ipython_config.InteractiveShellApp.extensions = ['hilda.ipython_extensions.magics', + 'hilda.ipython_extensions.events', + 'hilda.ipython_extensions.keybindings'] + ipython_config.InteractiveShellApp.exec_lines = ["disable_logs()"] + if startup_files is not None: + ipython_config.InteractiveShellApp.exec_files = startup_files + self.log_debug(f'Startup files - {startup_files}') - config = Config() - config.IPCompleter.use_jedi = True - config.InteractiveShellApp.exec_lines = [ - """disable_logs()""", - """IPython.get_ipython().events.register('pre_run_cell', self._ipython_run_cell_hook)""", - MAGIC_FUNCTIONS, - ] - config.BaseIPythonApplication.profile = 'hilda' namespace = globals() namespace.update(locals()) namespace['p'] = self if additional_namespace is not None: namespace.update(additional_namespace) - - IPython.start_ipython(config=config, user_ns=namespace) + sys.argv = ['a'] + IPython.start_ipython(config=ipython_config, user_ns=namespace) + self.detach() @staticmethod def _add_global(name: str, value: Any, reserved_names=None): @@ -1100,35 +1120,6 @@ def _generate_call_expression(self, address, params): return f'((intptr_t(*)({args_type}))({address}))({args_conv})' - def _ipython_run_cell_hook(self, info): - """ - Enable lazy loading for symbols - :param info: IPython's CellInfo object - """ - if info.raw_cell[0] in ['!', '%'] or info.raw_cell.endswith('?'): - return - - for node in ast.walk(ast.parse(info.raw_cell)): - if not isinstance(node, ast.Name): - # we are only interested in names - continue - - if node.id in locals() or node.id in globals() or node.id in dir(builtins): - # That are undefined - continue - - if not hasattr(SymbolsJar, node.id): - # ignore SymbolsJar properties - try: - symbol = getattr(self.symbols, node.id) - except SymbolAbsentError: - pass - else: - self._add_global( - node.id, - symbol if symbol.type_ != lldb.eSymbolTypeObjCMetaClass else self.objc_get_class(node.id) - ) - @staticmethod def _std_string(value): if struct.unpack("b", (value + 23).peek(1))[0] >= 0: diff --git a/hilda/ipython_extensions/events.py b/hilda/ipython_extensions/events.py new file mode 100644 index 0000000..047640f --- /dev/null +++ b/hilda/ipython_extensions/events.py @@ -0,0 +1,53 @@ +import ast +import builtins + +from IPython.terminal.interactiveshell import TerminalInteractiveShell + +from hilda.exceptions import EvaluatingExpressionError, SymbolAbsentError +from hilda.hilda_client import HildaClient +from hilda.lldb_importer import lldb +from hilda.symbols_jar import SymbolsJar + + +class HIEvents: + def __init__(self, ip: TerminalInteractiveShell): + self.shell = ip + self.hilda_client: HildaClient = self.shell.user_ns['p'] + + def pre_run_cell(self, info): + """ + Enable lazy loading for symbols + :param info: IPython's CellInfo object + """ + if info.raw_cell[0] in ['!', '%'] or info.raw_cell.endswith('?'): + return + for node in ast.walk(ast.parse(info.raw_cell)): + if not isinstance(node, ast.Name): + # we are only interested in names + continue + + if node.id in locals() or node.id in globals() or node.id in dir(builtins): + # That are undefined + continue + + if not hasattr(SymbolsJar, node.id): + # ignore SymbolsJar properties + try: + symbol = getattr(self.hilda_client.symbols, node.id) + except SymbolAbsentError: + pass + else: + try: + self.hilda_client._add_global( + node.id, + symbol if symbol.type_ != lldb.eSymbolTypeObjCMetaClass else self.hilda_client.objc_get_class( + node.id) + ) + except EvaluatingExpressionError: + self.hilda_client.log_warning( + f'Process is running. Pause execution in order to resolve "{node.id}"') + + +def load_ipython_extension(ip: TerminalInteractiveShell): + hie = HIEvents(ip) + ip.events.register('pre_run_cell', hie.pre_run_cell) diff --git a/hilda/ipython_extensions/keybindings.py b/hilda/ipython_extensions/keybindings.py new file mode 100644 index 0000000..6f9f828 --- /dev/null +++ b/hilda/ipython_extensions/keybindings.py @@ -0,0 +1,22 @@ +from prompt_toolkit.enums import DEFAULT_BUFFER +from prompt_toolkit.filters import EmacsInsertMode, HasFocus, HasSelection, ViInsertMode +from prompt_toolkit.keys import Keys + + +def load_ipython_extension(ipython): + def register_keybindings(): + hilda = ipython.user_ns['p'] + keys_mapping = {Keys.F7: hilda.step_into, + Keys.F8: hilda.step_over, + Keys.F9: hilda.cont, + Keys.F10: hilda.stop} + + insert_mode = ViInsertMode() | EmacsInsertMode() + registry = ipython.pt_app.key_bindings + + for key, callback in keys_mapping.items(): + registry.add_binding(key, filter=(HasFocus(DEFAULT_BUFFER) & ~HasSelection() & insert_mode))( + callback) + + register_keybindings() + ipython.events.register('shell_initialized', register_keybindings) diff --git a/hilda/ipython_extensions/magics.py b/hilda/ipython_extensions/magics.py new file mode 100644 index 0000000..a21631c --- /dev/null +++ b/hilda/ipython_extensions/magics.py @@ -0,0 +1,40 @@ +import shlex + +from IPython.core.magic import Magics, line_magic, magics_class, needs_local_scope + + +@magics_class +class HIMagics(Magics): + + @line_magic + @needs_local_scope + def objc(self, line, local_ns=None): + """Load an Objective-C class by name into the IPython session.""" + p = local_ns.get('p') + class_name = line.strip() + if not class_name: + p.log_error("Error: className is required.") + return + try: + local_ns[class_name] = p.objc_get_class(class_name) + p.log_info(f'{class_name} class loaded successfully') + except Exception as e: + p.log_error(f'Error loading class {class_name}: {str(e)}') + + @line_magic + @needs_local_scope + def fbp(self, line, local_ns=None): + """Set a file breakpoint in the debugger.""" + p = local_ns.get('p') + try: + module_name, address = shlex.split(line.strip()) + address = int(address, 16) + p.file_symbol(address, module_name).bp() + except ValueError as ve: + p.log_error(f"Error parsing arguments: {str(ve)}") + except Exception as e: + p.log_error(f"Error setting breakpoint: {str(e)}") + + +def load_ipython_extension(ipython): + ipython.register_magics(HIMagics) diff --git a/hilda/launch_lldb.py b/hilda/launch_lldb.py index 93cd88b..973b086 100644 --- a/hilda/launch_lldb.py +++ b/hilda/launch_lldb.py @@ -1,85 +1,189 @@ import logging import os -from pathlib import Path -from typing import Optional +from abc import ABC, abstractmethod +from threading import Thread +from typing import List, Optional -import click -import coloredlogs +from hilda.exceptions import LLDBException +from hilda.hilda_client import HildaClient +from hilda.lldb_importer import lldb -coloredlogs.install(level=logging.DEBUG) +TIMEOUT = 1 +lldb.hilda_client = None +logger = logging.getLogger(__name__) -def disable_logs() -> None: - logging.getLogger('asyncio').disabled = True - logging.getLogger('parso.cache').disabled = True - logging.getLogger('parso.cache.pickle').disabled = True - logging.getLogger('parso.python.diff').disabled = True - logging.getLogger('humanfriendly.prompts').disabled = True - logging.getLogger('blib2to3.pgen2.driver').disabled = True +def hilda(debugger, startup_files: Optional[List[str]] = None): + if lldb.hilda_client is None: + lldb.hilda_client = HildaClient(debugger) -def execute(cmd: str) -> None: - logging.debug(f'executing: {cmd}') - return os.system(cmd) - - -@click.group() -def cli(): - pass - - -def start_remote(hostname: str, port: int, rc_script: str) -> None: - # connect local LLDB client - commands = [f'process connect connect://{hostname}:{port}', - f'command script import {rc_script}'] - commands = '\n'.join(commands) - execute(f'lldb --one-line "{commands}"') - - -def attach(name: Optional[str] = None, pid: Optional[int] = None, rc_script: Optional[str] = None) -> None: - """ Attach to given process and start an lldb shell """ - commands = [] - if name is not None: - commands.append(f'process attach -n {name}') - elif pid is not None: - commands.append(f'process attach -p {pid}') - else: - print('missing either process name or pid for attaching') - return + additional_namespace = {'ui': lldb.hilda_client.ui_manager, 'cfg': lldb.hilda_client.configs} + lldb.hilda_client.interact(additional_namespace=additional_namespace, startup_files=startup_files) - commands.append(f'command script import {os.path.join(Path(__file__).resolve().parent, "lldb_entrypoint.py")}') - if rc_script is not None: - commands.append(f'command script import {rc_script}') - commands = '\n'.join(commands) - execute(f'lldb --one-line "{commands}"') - - -@cli.command('remote') -@click.argument('hostname', default='localhost') -@click.argument('port', type=click.INT, default=1234) -def remote(hostname: str, port: int) -> None: - """ Connect to remote debugserver at given address """ - start_remote(hostname, port, Path(__file__).resolve().parent / 'lldb_entrypoint.py') - - -@cli.command('bare') -def cli_bare(): - """ Just start an lldb shell """ - # connect local LLDB client - commands = [f'command script import {Path(__file__).resolve().parent / "lldb_entrypoint.py"}'] - commands = '\n'.join(commands) - execute(f'lldb --one-line "{commands}"') - - -@cli.command('attach') -@click.option('-n', '--name', help='process name to attach') -@click.option('-p', '--pid', type=click.INT, help='pid to attach') -def cli_attach(name: str, pid: int): - """ Attach to given process and start an lldb shell """ - attach(name=name, pid=pid) +def execute(cmd: str) -> int: + logging.debug(f'executing: {cmd}') + return os.system(cmd) -if __name__ == '__main__': - disable_logs() - cli() +class LLDBListenerThread(Thread, ABC): + + def __init__(self): + super().__init__() + lldb.SBDebugger.Initialize() + self.debugger: lldb.SBDebugger = lldb.SBDebugger.Create() + self.listener: lldb.SBListener = self.debugger.GetListener() + self.error: lldb.SBError = lldb.SBError() + self.debugger.SetAsync(True) + self.target: lldb.SBTarget = self._create_target() + self.process: lldb.SBProcess = self._create_process() + self._check_success() + self.should_quit = False + + @abstractmethod + def _create_target(self) -> lldb.SBTarget: + pass + + @abstractmethod + def _create_process(self) -> lldb.SBProcess: + pass + + def _check_success(self): + if self.error.Success(): + return + raise LLDBException(self.error.description) + + def run(self): + event = lldb.SBEvent() + last_state = lldb.eStateStopped + while not self.should_quit: + if not self.listener.WaitForEvent(TIMEOUT, event): + continue + if not lldb.SBProcess.EventIsProcessEvent(event): + continue + state = self.process.GetStateFromEvent(event) + if state == lldb.eStateDetached: + logger.debug('Process Detached') + self.should_quit = True + elif state == lldb.eStateExited: + logger.debug(f'Process Exited with status {self.process.GetExitStatus()}') + self.should_quit = True + elif state == lldb.eStateRunning and last_state == lldb.eStateStopped: + logger.debug("Process Continued") + elif state == lldb.eStateStopped and last_state == lldb.eStateRunning: + logger.debug('Process Stopped') + last_state = state + + +class LLDBRemote(LLDBListenerThread): + def __init__(self, address: str, port: int = 1234): + self.url_connect = f'connect://{address}:{port}' + super().__init__() + + def _create_target(self) -> lldb.SBTarget: + return self.debugger.CreateTarget('') + + def _create_process(self) -> lldb.SBProcess: + logger.debug(f'Connecting to "{self.url_connect}"') + return self.target.ConnectRemote(self.listener, self.url_connect, None, self.error) + + +class LLDBAttachPid(LLDBListenerThread): + + def __init__(self, pid: int): + self.pid = pid + super().__init__() + + def _create_target(self) -> lldb.SBTarget: + return self.debugger.CreateTargetWithFileAndArch(None, None) + + def _create_process(self) -> lldb.SBProcess: + logger.debug(f'Attaching to {self.pid}') + return self.target.AttachToProcessWithID(self.listener, self.pid, self.error) + + +class LLDBAttachName(LLDBListenerThread): + + def __init__(self, proc_name: str, wait_for: bool = False): + self.proc_name = proc_name + self.wait_for = wait_for + super().__init__() + + def _create_target(self) -> lldb.SBTarget: + return self.debugger.CreateTargetWithFileAndArch(None, None) + + def _create_process(self) -> lldb.SBProcess: + logger.debug(f'Attaching to {self.name}') + return self.target.AttachToProcessWithName(self.listener, self.proc_name, self.wait_for, self.error) + + +class LLDBLaunch(LLDBListenerThread): + def __init__(self, exec_path: str, argv: Optional[List[str]] = None, envp: Optional[List[str]] = None, + stdin: Optional[str] = None, + stdout: Optional[str] = None, stderr: Optional[str] = None, wd: Optional[str] = None, + flags: Optional[int] = 0, stop_at_entry: Optional[bool] = False): + self.exec_path = exec_path + self.stdout = stdout + self.stdin = stdin + self.stderr = stderr + self.flags = flags + self.stop_at_entry = stop_at_entry + self.argv = argv + self.envp = envp + self.working_directory = wd + super().__init__() + + def _create_target(self) -> lldb.SBTarget: + return self.debugger.CreateTargetWithFileAndArch(self.exec_path, lldb.LLDB_ARCH_DEFAULT) + + def _create_process(self) -> lldb.SBProcess: + # Launch(SBTarget self, SBListener listener, char const ** argv, char const ** envp, + # char const * stdin_path, char const * stdout_path, char const * stderr_path, char const * working_directory, + # uint32_t launch_flags, bool stop_at_entry, SBError error) -> SBProcess + logger.debug(f'Lunching process {self.exec_path}') + return self.target.Launch(self.listener, self.argv, self.envp, + self.stdin, self.stdout, self.stderr, self.working_directory, + self.flags, self.stop_at_entry, + self.error) + + +def remote(hostname: str, port: int, startup_files: Optional[List[str]] = None) -> None: + """ Connect to remote process """ + try: + lldb_t = LLDBRemote(hostname, port) + lldb_t.start() + hilda(lldb_t.debugger, startup_files) + except LLDBException as e: + logger.error(e.message) + + +def attach(name: Optional[str] = None, pid: Optional[int] = None, wait_for: bool = False, + startup_files: Optional[List[str]] = None) -> None: + """ Attach to given process and start a lldb shell """ + if (name is not None and pid is not None) or (name is None and pid is None): + raise ValueError('Provide either a process name or a PID, but not both.') + + try: + if name is not None: + lldb_t = LLDBAttachName(name, wait_for) + else: + lldb_t = LLDBAttachPid(pid) + lldb_t.start() + hilda(lldb_t.debugger, startup_files) + except LLDBException as e: + logger.error(e.message) + + +def launch(exec_path: str, argv: Optional[List] = None, envp: Optional[List] = None, + stdin: Optional[str] = None, + stdout: Optional[str] = None, stderr: Optional[str] = None, wd: Optional[str] = None, + flags: Optional[int] = 0, stop_at_entry: Optional[bool] = False, + startup_files: Optional[List[str]] = None) -> None: + """ Launch to given process and start a lldb shell """ + try: + lldb_t = LLDBLaunch(exec_path, argv, envp, stdin, stdout, stderr, wd, flags, stop_at_entry) + lldb_t.start() + hilda(lldb_t.debugger, startup_files) + except LLDBException as e: + logger.error(e.message) diff --git a/hilda/lldb_entrypoint.py b/hilda/lldb_entrypoint.py index a299510..aa92026 100644 --- a/hilda/lldb_entrypoint.py +++ b/hilda/lldb_entrypoint.py @@ -7,12 +7,12 @@ from hilda.hilda_client import HildaClient -coloredlogs.install(level=logging.DEBUG) - lldb.hilda_client = None def hilda(debugger, command, result, internal_dict): + coloredlogs.install(level=logging.DEBUG) + if lldb.hilda_client is None: lldb.hilda_client = HildaClient(debugger) diff --git a/tests/__main__.py b/tests/__main__.py deleted file mode 100644 index 4d72d93..0000000 --- a/tests/__main__.py +++ /dev/null @@ -1,4 +0,0 @@ -from .hilda_tests import main - -if __name__ == '__main__': - main() diff --git a/tests/conftest.py b/tests/conftest.py index cbc5e1c..58834bf 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,8 +1,27 @@ import pytest +from hilda.exceptions import LLDBException from hilda.hilda_client import HildaClient +from hilda.launch_lldb import LLDBAttachName from hilda.lldb_importer import lldb +PROCESS = 'sysmond' + + +@pytest.fixture(scope='session') +def lldb_debugger(request): + lldb_t = None + try: + lldb_t = LLDBAttachName(PROCESS) + lldb_t.start() + yield lldb_t.debugger + except LLDBException as e: + pytest.exit(f'{e.message} - Try use sudo') + finally: + if lldb_t: + lldb_t.process.Detach() + lldb_t.join() + @pytest.fixture(scope='function') def hilda_client(lldb_debugger): diff --git a/tests/hilda_tests.py b/tests/hilda_tests.py deleted file mode 100644 index 96432eb..0000000 --- a/tests/hilda_tests.py +++ /dev/null @@ -1,17 +0,0 @@ -from pathlib import Path - -import click - -from hilda import launch_lldb - -PROCESS = 'sysmond' - - -@click.command() -def main(): - """ Start debugserver at remote device and connect using lldb """ - launch_lldb.attach(name=PROCESS, rc_script=str(Path(__file__).resolve().parent / 'lldb_entrypoint.py')) - - -if __name__ == '__main__': - launch_lldb.remote() diff --git a/tests/lldb_entrypoint.py b/tests/lldb_entrypoint.py deleted file mode 100644 index 4bd853d..0000000 --- a/tests/lldb_entrypoint.py +++ /dev/null @@ -1,33 +0,0 @@ -#!/usr/bin/xcrun python3 - -import logging - -import lldb -import pytest - -logging.getLogger('parso.cache').disabled = True -logging.getLogger('parso.cache.pickle').disabled = True -logging.getLogger('parso.python.diff').disabled = True -logging.getLogger('humanfriendly.prompts').disabled = True - -lldb.hilda_client = None - - -class AddDebuggerPlugin: - def __init__(self, debugger): - self.__name__ = 'AddDebuggerPlugin' - self.debugger = debugger - - @pytest.fixture(scope='session') - def lldb_debugger(self, request): - return request.config.pluginmanager.get_plugin('AddDebuggerPlugin').debugger - - -def run_tests(debugger, command, result, internal_dict): - pytest.main(command.split(), plugins=[AddDebuggerPlugin(debugger)]) - - -def __lldb_init_module(debugger, internal_dict): - debugger.SetAsync(True) - debugger.HandleCommand('command script add -f lldb_entrypoint.run_tests run_tests') - print('Use "run_tests" command')