Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various minor enhancements for the 'console' command #1571

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
11 changes: 11 additions & 0 deletions doc/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -872,3 +872,14 @@ like this:
$ labgrid-client -p example allow sirius/john

To remove the allow it is currently necessary to unlock and lock the place.

Internal console
^^^^^^^^^^^^^^^^

Labgrid uses microcom as its console by default. For situations where this is
not suitable, an internal console is provided. To use this, provide the
``--internal`` flag to the ``labgrid client` command.

When the internal console is used, the console transitions cleanly between use
within a strategy or driver, and interactive use for the user. The console is
not closed and therefore there is no loss of data.
175 changes: 96 additions & 79 deletions labgrid/remote/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import signal
import sys
import shlex
import shutil
import json
import itertools
from textwrap import indent
Expand Down Expand Up @@ -42,7 +41,7 @@
from ..exceptions import NoDriverFoundError, NoResourceFoundError, InvalidConfigError
from .generated import labgrid_coordinator_pb2, labgrid_coordinator_pb2_grpc
from ..resource.remote import RemotePlaceManager, RemotePlace
from ..util import diff_dict, flat_dict, dump, atomic_replace, labgrid_version, Timeout
from ..util import diff_dict, flat_dict, dump, atomic_replace, labgrid_version, Timeout, term
from ..util.proxy import proxymanager
from ..util.helper import processwrapper
from ..driver import Mode, ExecutionError
Expand Down Expand Up @@ -674,7 +673,7 @@ async def acquire(self):
try:
await self.stub.AcquirePlace(request)
await self.sync_with_coordinator()
print(f"acquired place {place.name}")
logging.info("acquired place %s", place.name)
except grpc.aio.AioRpcError as e:
# check potential failure causes
for exporter, groups in sorted(self.resources.items()):
Expand All @@ -699,6 +698,8 @@ async def release(self):
"""Release a previously acquired place"""
place = self.get_place()
if not place.acquired:
if self.args.auto:
return
raise UserError(f"place {place.name} is not acquired")
_, user = place.acquired.split("/")
if user != self.getuser():
Expand All @@ -716,7 +717,7 @@ async def release(self):
except grpc.aio.AioRpcError as e:
raise ServerError(e.details())

print(f"released place {place.name}")
logging.info("released place %s", place.name)

async def release_from(self):
"""Release a place, but only if acquired by a specific user"""
Expand Down Expand Up @@ -786,6 +787,21 @@ def _prepare_manager(self):
manager.session = self
manager.loop = self.loop

def set_initial_state(self, target):
if self.args.state:
strategy = target.get_driver("Strategy")
if self.args.initial_state:
print(f"Setting initial state to {self.args.initial_state}")
strategy.force(self.args.initial_state)
logging.info("Transitioning into state %s", self.args.state)
strategy.transition(self.args.state)

def set_end_state(self, target):
if self.args.end_state:
strategy = target.get_driver("Strategy")
logging.info("Transitioning into state %s", self.args.end_state)
strategy.transition(self.args.end_state)

def _get_target(self, place):
self._prepare_manager()
target = None
Expand All @@ -796,19 +812,7 @@ def _get_target(self, place):
print(f"Selected role {self.role} from configuration file")
target = self.env.get_target(self.role)
if target:
if self.args.state:
strategy = target.get_driver("Strategy")
if self.args.initial_state:
print(f"Setting initial state to {self.args.initial_state}")
strategy.force(self.args.initial_state)
print(f"Transitioning into state {self.args.state}")
strategy.transition(self.args.state)
# deactivate console drivers so we are able to connect with microcom later
try:
con = target.get_active_driver("ConsoleProtocol")
target.deactivate(con)
except NoDriverFoundError:
pass
self.set_initial_state(target)
else:
target = Target(place.name, env=self.env)
RemotePlace(target, name=place.name)
Expand Down Expand Up @@ -925,78 +929,56 @@ def digital_io(self):
drv.set(False)

async def _console(self, place, target, timeout, *, logfile=None, loop=False, listen_only=False):
name = self.args.name
from ..resource import NetworkSerialPort

resource = target.get_resource(NetworkSerialPort, name=name, wait_avail=False)
from ..protocol import ConsoleProtocol

# async await resources
timeout = Timeout(timeout)
while True:
target.update_resources()
if resource.avail or (not loop and timeout.expired):
break
await asyncio.sleep(0.1)

# use zero timeout to prevent blocking sleeps
target.await_resources([resource], timeout=0.0)
name = self.args.name

if not place.acquired:
print("place released")
return 255

host, port = proxymanager.get_host_and_port(resource)

# check for valid resources
assert port is not None, "Port is not set"

microcom_bin = shutil.which("microcom")

if microcom_bin is not None:
call = [microcom_bin, "-s", str(resource.speed), "-t", f"{host}:{port}"]

if listen_only:
call.append("--listenonly")

if logfile:
call.append(f"--logfile={logfile}")
if self.args.internal or os.environ.get('LG_CONSOLE') == 'internal':
console = target.get_driver(ConsoleProtocol, name=name)
returncode = await term.internal(lambda: self.is_allowed(place),
console, logfile, listen_only)
else:
call = ["telnet", host, str(port)]
from ..resource import NetworkSerialPort

logging.info("microcom not available, using telnet instead")
# deactivate console drivers so we are able to connect with microcom
try:
con = target.get_active_driver("ConsoleProtocol")
target.deactivate(con)
except NoDriverFoundError:
pass

if listen_only:
logging.warning("--listenonly option not supported by telnet, ignoring")
resource = target.get_resource(NetworkSerialPort, name=name,
wait_avail=False)

if logfile:
logging.warning("--logfile option not supported by telnet, ignoring")
# async await resources
timeout = Timeout(timeout)
while True:
target.update_resources()
if resource.avail or (not loop and timeout.expired):
break
await asyncio.sleep(0.1)

print(f"connecting to {resource} calling {' '.join(call)}")
try:
p = await asyncio.create_subprocess_exec(*call)
except FileNotFoundError as e:
raise ServerError(f"failed to execute remote console command: {e}")
while p.returncode is None:
try:
await asyncio.wait_for(p.wait(), 1.0)
except asyncio.TimeoutError:
# subprocess is still running
pass
# use zero timeout to prevent blocking sleeps
target.await_resources([resource], timeout=0.0)
host, port = proxymanager.get_host_and_port(resource)

# check for valid resources
assert port is not None, "Port is not set"
try:
self._check_allowed(place)
except UserError:
p.terminate()
try:
await asyncio.wait_for(p.wait(), 1.0)
except asyncio.TimeoutError:
# try harder
p.kill()
await asyncio.wait_for(p.wait(), 1.0)
raise
if p.returncode:
print("connection lost", file=sys.stderr)
return p.returncode
returncode = await term.external(lambda: self.is_allowed(place),
host, port, resource, logfile,
listen_only)
except FileNotFoundError as e:
raise ServerError(f"failed to execute remote console command: {e}")

# Raise an exception if the place was released
self._check_allowed(place)
return returncode


async def console(self, place, target):
while True:
Expand All @@ -1008,7 +990,7 @@ async def console(self, place, target):
break
if not self.args.loop:
if res:
exc = InteractiveCommandError("microcom error")
exc = InteractiveCommandError("console error")
exc.exitcode = res
raise exc
break
Expand Down Expand Up @@ -1673,6 +1655,7 @@ def main():
place = os.environ.get("LG_PLACE", place)
state = os.environ.get("STATE", None)
state = os.environ.get("LG_STATE", state)
end_state = os.environ.get('LG_END_STATE', state)
initial_state = os.environ.get("LG_INITIAL_STATE", None)
token = os.environ.get("LG_TOKEN", None)

Expand All @@ -1684,6 +1667,13 @@ def main():
type=str,
help="coordinator HOST[:PORT] (default: value from env variable LG_COORDINATOR, otherwise 127.0.0.1:20408)",
)
parser.add_argument(
'-a',
'--acquire',
action='store_true',
default=False,
help="acquire place before starting and release after finishing"
)
parser.add_argument("-c", "--config", type=str, default=os.environ.get("LG_ENV"), help="config file")
parser.add_argument("-p", "--place", type=str, default=place, help="place name/alias")
parser.add_argument("-s", "--state", type=str, default=state, help="strategy state to switch into before command")
Expand All @@ -1694,6 +1684,13 @@ def main():
default=initial_state,
help="strategy state to force into before switching to desired state",
)
parser.add_argument(
'-e',
'--end-state',
type=str,
default=end_state,
help="strategy state to switch into after command"
)
parser.add_argument(
"-d", "--debug", action="store_true", default=False, help="enable debug mode (show python tracebacks)"
)
Expand Down Expand Up @@ -1781,6 +1778,8 @@ def main():
subparser.set_defaults(func=ClientSession.acquire)

subparser = subparsers.add_parser("release", aliases=("unlock",), help="release a place")
subparser.add_argument('-a', '--auto', action='store_true',
help="don't raise an error if the place is not acquired")
subparser.add_argument(
"-k", "--kick", action="store_true", help="release a place even if it is acquired by a different user"
)
Expand Down Expand Up @@ -1813,6 +1812,8 @@ def main():
subparser.set_defaults(func=ClientSession.digital_io)

subparser = subparsers.add_parser("console", aliases=("con",), help="connect to the console")
subparser.add_argument('-i', '--internal', action='store_true',
help="use an internal console instead of microcom")
subparser.add_argument(
"-l", "--loop", action="store_true", help="keep trying to connect if the console is unavailable"
)
Expand Down Expand Up @@ -2067,7 +2068,7 @@ def main():
if not role:
print(f"RemotePlace {args.place} not found in configuration file", file=sys.stderr)
exit(1)
print(f"Selected role {role} from configuration file")
logging.info("Selected role %s from configuration file", role)
else:
role, args.place = find_any_role_with_place(env.config.get_targets())
if not role:
Expand Down Expand Up @@ -2101,13 +2102,26 @@ def main():

try:
if asyncio.iscoroutinefunction(args.func):
auto_release = False
target = None
if getattr(args.func, "needs_target", False):
if args.acquire:
place = session.get_place(args.place)
if not place.acquired:
args.allow_unmatched = True
coro = session.acquire()
session.loop.run_until_complete(coro)
auto_release = True
place = session.get_acquired_place()
target = session._get_target(place)
coro = args.func(session, place, target)
else:
coro = args.func(session)
session.loop.run_until_complete(coro)
session.set_end_state(target)
if auto_release:
coro = session.release()
session.loop.run_until_complete(coro)
else:
args.func(session)
finally:
Expand Down Expand Up @@ -2142,7 +2156,10 @@ def main():
"This is likely caused by an error in the environment configuration or invalid\nresource information provided by the coordinator.",
file=sys.stderr,
) # pylint: disable=line-too-long

except subprocess.CalledProcessError as exc:
print(f"Command failure: {' '.join(exc.cmd)}")
for line in exc.output.splitlines():
print(line.decode('utf-8'))
exitcode = 1
except ServerError as e:
print(f"Server error: {e}", file=sys.stderr)
Expand Down
Loading
Loading