Skip to content

Commit

Permalink
Unified command execution acress session types (#18)
Browse files Browse the repository at this point in the history
- Added `execute_in_shell` method to each session
- Removed payload option from `upgrade_to_meterpreter`
- Bumped version
  • Loading branch information
SadParad1se authored May 20, 2024
1 parent 42714ab commit 9bf3259
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 23 deletions.
8 changes: 4 additions & 4 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "snek-sploit"
version = "0.6.0"
version = "0.7.0"
description = "Python RPC client for Metasploit Framework"
authors = [
"Jiří Rája <[email protected]>"
Expand Down
61 changes: 43 additions & 18 deletions snek_sploit/lib/rpc/sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,18 @@ def execute(
) -> str:
pass

@abstractmethod
def execute_in_shell(
self,
executable: str,
arguments: List[str],
minimal_execution_time: float,
timeout: float,
success_flags: List[str],
reading_delay: float,
) -> str:
pass

def gather_output(
self,
minimal_execution_time: float = 3,
Expand Down Expand Up @@ -503,16 +515,8 @@ def write(self, data: str) -> bool:
def read(self) -> str:
return self._rpc.shell_read(self.id)

def upgrade_to_meterpreter(self, local_host: str, local_port: int, payload: str = None) -> bool:
# TODO: Custom update_to_meterpreter implementation? https://github.com/rapid7/metasploit-framework/issues/8800
# The current one fails since its unable to use x64 instead of x86 architecture, we could add an optional
# payload argument, which would mitigate this issue partially
if payload is None:
return self._rpc.shell_upgrade(self.id, local_host, local_port)

# TODO: set PAYLOAD_OVERRIDE: linux/x64/meterpreter/reverse_tcp,
# module: post/multi/manage/shell_to_meterpreter
return False
def upgrade_to_meterpreter(self, local_host: str, local_port: int) -> bool:
return self._rpc.shell_upgrade(self.id, local_host, local_port)

def execute(
self,
Expand All @@ -527,12 +531,21 @@ def execute(

return self.gather_output(minimal_execution_time, timeout, success_flags, reading_delay)

def execute_in_shell(
self,
executable: str,
arguments: List[str],
minimal_execution_time: float = 3,
timeout: float = None,
success_flags: List[str] = None,
reading_delay: float = 1,
) -> str:
command = " ".join([executable, *arguments])

return self.execute(command, minimal_execution_time, timeout, success_flags, reading_delay)


class SessionMeterpreter(Session):
# TODO: execute and execute_in_shell are partially finished, since they have to be tested with the x64 payload.
# If it works seamlessly, then good. However, even if it works, try to play with the run_single and channel
# combination a bit more. Also, make sure to remove the process ID, or save it into memory or return it. Right
# now it poisons the command output.
@property
def directory_separator(self):
return self._rpc.meterpreter_directory_separator(self.id)
Expand Down Expand Up @@ -573,17 +586,16 @@ def execute(

def execute_in_shell(
self,
command: str,
executable: str,
arguments: List[str],
minimal_execution_time: float = 3,
timeout: float = None,
success_flags: List[str] = None,
reading_delay: float = 1,
) -> str:
# TODO: Remove the process ID from the output? eg. add postprocessing?
self.clear_buffer()

# TODO: test with custom x64 payload
self.run_single(f"execute -f {command} -c -i -a {' '.join(arguments)}")
self.run_single(f"execute -f {executable} -c -i -a {' '.join(arguments)}")

return self.gather_output(minimal_execution_time, timeout, success_flags, reading_delay)

Expand All @@ -609,6 +621,19 @@ def execute(

return self.gather_output(minimal_execution_time, timeout, success_flags, reading_delay)

def execute_in_shell(
self,
executable: str,
arguments: List[str],
minimal_execution_time: float = 3,
timeout: float = None,
success_flags: List[str] = None,
reading_delay: float = 1,
) -> str:
command = " ".join([executable, *arguments])

return self.execute(command, minimal_execution_time, timeout, success_flags, reading_delay)


class Sessions(ContextBase):
def __init__(self, context: Context):
Expand Down

0 comments on commit 9bf3259

Please sign in to comment.