Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions examples/rshell_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Copyright (C) 2025 Intel Corporation
# SPDX-License-Identifier: MIT
import logging
logging.basicConfig(level=logging.DEBUG)
from mfd_connect.rshell import RShellConnection

# LINUX
conn = RShellConnection(ip="10.10.10.10") # start and connect to rshell server
# conn = RShellConnection(ip="10.10.10.10", server_ip="10.10.10.11") # connect to rshell server
conn.execute_command("ls")
# print(conn.execute_command("end")) # stop client
conn.disconnect(True)
234 changes: 234 additions & 0 deletions mfd_connect/rshell.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
# Copyright (C) 2025 Intel Corporation
# SPDX-License-Identifier: MIT
import logging
import sys
import time
import typing
from ipaddress import IPv4Address, IPv6Address
from subprocess import CalledProcessError

import requests
from mfd_common_libs import add_logging_level, log_levels, TimeoutCounter
from mfd_typing.cpu_values import CPUArchitecture
from mfd_typing.os_values import OSBitness, OSName, OSType

from mfd_connect.local import LocalConnection
from mfd_connect.pathlib.path import CustomPath, custom_path_factory
from mfd_connect.process.base import RemoteProcess

from .base import Connection, ConnectionCompletedProcess

if typing.TYPE_CHECKING:
from pydantic import (
BaseModel, # from pytest_mfd_config.models.topology import ConnectionModel
)


logger = logging.getLogger(__name__)
add_logging_level(level_name="MODULE_DEBUG", level_value=log_levels.MODULE_DEBUG)
add_logging_level(level_name="CMD", level_value=log_levels.CMD)
add_logging_level(level_name="OUT", level_value=log_levels.OUT)


class RShellConnection(Connection):
def __init__(
self,
ip: str | IPv4Address | IPv6Address,
server_ip: str | IPv4Address | IPv6Address | None = "127.0.0.1",
model: "BaseModel | None" = None,
cache_system_data: bool = True,
connection_timeout: int = 60,
):
"""
Initialize RShellConnection.

:param ip: The IP address of the RShell server.
:param server_ip: The IP address of the server to connect to (optional).
:param model: The Pydantic model to use for the connection (optional).
:param cache_system_data: Whether to cache system data (default: True).
"""
super().__init__(model=model, cache_system_data=cache_system_data)
self._ip = ip
self.server_ip = server_ip if server_ip else "127.0.0.1"
self.server_process = None
if server_ip == "127.0.0.1":
# start Rshell server
self.server_process = self._run_server()
time.sleep(5)
timeout = TimeoutCounter(connection_timeout)
while not timeout:
logger.log(level=log_levels.MODULE_DEBUG, msg="Checking RShell server health")
status_code = requests.get(f"http://{self.server_ip}/health/{self._ip}", proxies={"no_proxy":"*"}).status_code
if status_code == 200:
logger.log(level=log_levels.MODULE_DEBUG, msg="RShell server is healthy")
break
time.sleep(5)
else:
raise TimeoutError("Connection of Client to RShell server timed out")


def disconnect(self, stop_client: bool = False) -> None:
"""
Disconnect connection.

Stop local RShell server if established.

:param stop_client: Whether to stop the RShell client (default: False).
"""
if stop_client:
logger.log(level=log_levels.MODULE_DEBUG, msg="Stopping RShell client")
self.execute_command("end")
if self.server_process:
logger.log(level=log_levels.MODULE_DEBUG, msg="Stopping RShell server")
self.server_process.kill()
logger.log(level=log_levels.MODULE_DEBUG, msg="RShell server stopped")
logger.log(level=log_levels.MODULE_DEBUG, msg=self.server_process.stdout_text)

def _run_server(self) -> RemoteProcess:
"""Run RShell server locally."""
conn = LocalConnection()
server_file = conn.path(__file__).parent / "rshell_server.py"
return conn.start_process(f"{conn.modules().sys.executable} {server_file}")

def execute_command(
self,
command: str,
*,
input_data: str | None = None,
cwd: str | None = None,
timeout: int | None = None,
env: dict | None = None,
stderr_to_stdout: bool = False,
discard_stdout: bool = False,
discard_stderr: bool = False,
skip_logging: bool = False,
expected_return_codes=...,
shell: bool = False,
custom_exception: type[CalledProcessError] | None = None,
) -> ConnectionCompletedProcess:
"""
Execute a command on the remote server.

:param command: The command to execute.
:param timeout: The timeout for the command execution (optional).
:return: The result of the command execution.
"""
if input_data is not None:
logger.log(
level=log_levels.MODULE_DEBUG,
msg="Input data is not supported for RShellConnection and will be ignored.",
)

if cwd is not None:
logger.log(
level=log_levels.MODULE_DEBUG,
msg="CWD is not supported for RShellConnection and will be ignored.",
)

if env is not None:
logger.log(
level=log_levels.MODULE_DEBUG,
msg="Environment variables are not supported for RShellConnection and will be ignored.",
)

if env is not None:
logger.log(
level=log_levels.MODULE_DEBUG,
msg="Environment variables are not supported for RShellConnection and will be ignored.",
)
Comment on lines +134 to +138
Copy link

Copilot AI Sep 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check for env is not None is duplicated. The second occurrence (lines 134-138) should be removed as it's identical to lines 128-132.

Suggested change
if env is not None:
logger.log(
level=log_levels.MODULE_DEBUG,
msg="Environment variables are not supported for RShellConnection and will be ignored.",
)

Copilot uses AI. Check for mistakes.


if stderr_to_stdout:
logger.log(
level=log_levels.MODULE_DEBUG,
msg="Redirecting stderr to stdout is not supported for RShellConnection and will be ignored.",
)

if discard_stdout:
logger.log(
level=log_levels.MODULE_DEBUG,
msg="Discarding stdout is not supported for RShellConnection and will be ignored.",
)

if discard_stderr:
logger.log(
level=log_levels.MODULE_DEBUG,
msg="Discarding stderr is not supported for RShellConnection and will be ignored.",
)

if skip_logging:
logger.log(
level=log_levels.MODULE_DEBUG,
msg="Skipping logging is not supported for RShellConnection and will be ignored.",
)

if expected_return_codes is not None:
Copy link

Copilot AI Sep 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parameter expected_return_codes uses ellipsis (...) as default value but is checked against None. This will always be False since ellipsis is not None.

Copilot uses AI. Check for mistakes.

logger.log(
level=log_levels.MODULE_DEBUG,
msg="Expected return codes are not supported for RShellConnection and will be ignored.",
)

if shell:
logger.log(
level=log_levels.MODULE_DEBUG,
msg="Shell execution is not supported for RShellConnection and will be ignored.",
)

if custom_exception:
logger.log(
level=log_levels.MODULE_DEBUG,
msg="Custom exceptions are not supported for RShellConnection and will be ignored.",
)
timeout_string = " " if timeout is None else f" with timeout {timeout} seconds"
logger.log(level=log_levels.CMD, msg=f"Executing >{self._ip}> '{command}', {timeout_string}")
Comment on lines +181 to +182
Copy link

Copilot AI Sep 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The log message formatting is incorrect. It should be f\"Executing >{self._ip}> '{command}'{timeout_string}\" to properly concatenate the timeout information.

Suggested change
timeout_string = " " if timeout is None else f" with timeout {timeout} seconds"
logger.log(level=log_levels.CMD, msg=f"Executing >{self._ip}> '{command}', {timeout_string}")
logger.log(level=log_levels.CMD, msg=f"Executing >{self._ip}> '{command}'{timeout_string}")

Copilot uses AI. Check for mistakes.


response = requests.post(
f"http://{self.server_ip}/execute_command",
data={"command": command, "timeout": timeout, "ip": self._ip},proxies={"no_proxy":"*"},
)
completed_process = ConnectionCompletedProcess(
args=command,
stdout=response.text,
return_code=int(response.headers.get("rc", -1)),
)
logger.log(
level=log_levels.MODULE_DEBUG,
msg=f"Finished executing '{command}', rc={completed_process.return_code}",
)
if skip_logging:
return completed_process

stdout = completed_process.stdout
if stdout:
logger.log(level=log_levels.OUT, msg=f"stdout>>\n{stdout}")

return completed_process


def path(self, *args, **kwargs) -> CustomPath:
"""Path represents a filesystem path."""
if sys.version_info >= (3, 12):
kwargs["owner"] = self
return custom_path_factory(*args, **kwargs)

return CustomPath(*args, owner=self, **kwargs)

def get_os_name(self) -> OSName:
raise NotImplementedError

def get_os_type(self) -> OSType:
raise NotImplementedError

def get_os_bitness(self) -> OSBitness:
raise NotImplementedError

def get_cpu_architecture(self) -> CPUArchitecture:
raise NotImplementedError

def restart_platform(self) -> None:
raise NotImplementedError

def shutdown_platform(self) -> None:
raise NotImplementedError

def wait_for_host(self, timeout: int = 60) -> None:
raise NotImplementedError
118 changes: 118 additions & 0 deletions mfd_connect/rshell_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# Copyright (C) 2025 Intel Corporation
# SPDX-License-Identifier: MIT
"""This is a sample demonstration HTTP client application
that can run on UEFI shell with the help of Python UEFI
interpreter.
Make sure that the Python UEFI interpreter is compiled with
Socket module support.
"""

__version__ = "1.0.0"

try:
import httplib as client
except ImportError:
from http import client
import sys
import os
import time

# get http server ip
http_server = sys.argv[1]
if len(sys.argv) >2:
source_address = sys.argv[2]
else:
source_address = None

os_name = os.name


def _sleep(interval):
"""This API simulates the sleep function for EFI shell
as the sleep API from time module is not working on
EFI shell
:param interval time period the system to be in idle
"""
start_ts = time.time()
while time.time() < start_ts + interval:
pass


time.sleep = _sleep


def _get_command():
# construct the list of tests by interacting with server
conn.request("GET", "getCommandToExecute")
rsp = conn.getresponse()
status = rsp.status
_id = rsp.getheader("CommandID")
if status == 204:
return None

print("Waiting for command from server: ")
data_received = rsp.read()
print(data_received)
test_list = data_received.split(b",")

return test_list[0], _id # return only the first command


while True:
# Connect to server
source_address_parameter = (source_address, 80) if source_address else None
conn = client.HTTPConnection(http_server, source_address=source_address_parameter)
# get the command from server
_command = _get_command()
if not _command:
conn.close()
time.sleep(5)
continue
cmd_str, _id = _command
cmd_str = cmd_str.decode("utf-8")
cmd_name = cmd_str.split(" ")[0]
if cmd_name == "end":
print("No more commands available to run")
conn.close()
exit(0)

print("Executing", cmd_str)

out = cmd_name + ".txt"
cmd = cmd_str + " > " + out

time.sleep(5)
rc = os.system(cmd) # execute command on machine
print("Executed the command")
time.sleep(5)

print("Posting the results to server")
# send response to server
try:
if os_name == "edk2":
encoding = "utf-16"
else:
encoding = "utf-8"

f = open(out, "r", encoding=encoding)

conn.request(
"POST",
"post_result",
body=f.read(),
headers={"Content-Type": "text/plain", "Connection": "keep-alive", "CommandID": _id, "rc": rc},
)
f.close()
os.system("del " + out)
except Exception as exp:
conn.request(
"POST",
"exception",
body=cmd + str(exp),
headers={"Content-Type": "text/plain", "Connection": "keep-alive", "CommandID": _id},
)

print("output posted to server")
conn.close()
print("closed the connection")
time.sleep(1)
Loading