diff --git a/doc/usage.rst b/doc/usage.rst index 9d423d50b..bae067843 100644 --- a/doc/usage.rst +++ b/doc/usage.rst @@ -872,3 +872,14 @@ like this: $ labgrid-client -p example allow sirius/john To remove the allow it is currently necessary to unlock and lock the place. + +Internal console +^^^^^^^^^^^^^^^^ + +Labgrid uses microcom as its console by default. For situations where this is +not suitable, an internal console is provided. To use this, provide the +``--internal`` flag to the ``labgrid client` command. + +When the internal console is used, the console transitions cleanly between use +within a strategy or driver, and interactive use for the user. The console is +not closed and therefore there is no loss of data. diff --git a/labgrid/remote/client.py b/labgrid/remote/client.py index 5ab4f0683..5a1578e3e 100755 --- a/labgrid/remote/client.py +++ b/labgrid/remote/client.py @@ -14,7 +14,6 @@ import signal import sys import shlex -import shutil import json import itertools from textwrap import indent @@ -42,7 +41,7 @@ from ..exceptions import NoDriverFoundError, NoResourceFoundError, InvalidConfigError from .generated import labgrid_coordinator_pb2, labgrid_coordinator_pb2_grpc from ..resource.remote import RemotePlaceManager, RemotePlace -from ..util import diff_dict, flat_dict, dump, atomic_replace, labgrid_version, Timeout +from ..util import diff_dict, flat_dict, dump, atomic_replace, labgrid_version, Timeout, term from ..util.proxy import proxymanager from ..util.helper import processwrapper from ..driver import Mode, ExecutionError @@ -674,7 +673,7 @@ async def acquire(self): try: await self.stub.AcquirePlace(request) await self.sync_with_coordinator() - print(f"acquired place {place.name}") + logging.info("acquired place %s", place.name) except grpc.aio.AioRpcError as e: # check potential failure causes for exporter, groups in sorted(self.resources.items()): @@ -699,6 +698,8 @@ async def release(self): """Release a previously acquired place""" place = self.get_place() if not place.acquired: + if self.args.auto: + return raise UserError(f"place {place.name} is not acquired") _, user = place.acquired.split("/") if user != self.getuser(): @@ -716,7 +717,7 @@ async def release(self): except grpc.aio.AioRpcError as e: raise ServerError(e.details()) - print(f"released place {place.name}") + logging.info("released place %s", place.name) async def release_from(self): """Release a place, but only if acquired by a specific user""" @@ -786,6 +787,21 @@ def _prepare_manager(self): manager.session = self manager.loop = self.loop + def set_initial_state(self, target): + if self.args.state: + strategy = target.get_driver("Strategy") + if self.args.initial_state: + print(f"Setting initial state to {self.args.initial_state}") + strategy.force(self.args.initial_state) + logging.info("Transitioning into state %s", self.args.state) + strategy.transition(self.args.state) + + def set_end_state(self, target): + if self.args.end_state: + strategy = target.get_driver("Strategy") + logging.info("Transitioning into state %s", self.args.end_state) + strategy.transition(self.args.end_state) + def _get_target(self, place): self._prepare_manager() target = None @@ -796,19 +812,7 @@ def _get_target(self, place): print(f"Selected role {self.role} from configuration file") target = self.env.get_target(self.role) if target: - if self.args.state: - strategy = target.get_driver("Strategy") - if self.args.initial_state: - print(f"Setting initial state to {self.args.initial_state}") - strategy.force(self.args.initial_state) - print(f"Transitioning into state {self.args.state}") - strategy.transition(self.args.state) - # deactivate console drivers so we are able to connect with microcom later - try: - con = target.get_active_driver("ConsoleProtocol") - target.deactivate(con) - except NoDriverFoundError: - pass + self.set_initial_state(target) else: target = Target(place.name, env=self.env) RemotePlace(target, name=place.name) @@ -925,78 +929,56 @@ def digital_io(self): drv.set(False) async def _console(self, place, target, timeout, *, logfile=None, loop=False, listen_only=False): - name = self.args.name - from ..resource import NetworkSerialPort - - resource = target.get_resource(NetworkSerialPort, name=name, wait_avail=False) + from ..protocol import ConsoleProtocol - # async await resources - timeout = Timeout(timeout) - while True: - target.update_resources() - if resource.avail or (not loop and timeout.expired): - break - await asyncio.sleep(0.1) - - # use zero timeout to prevent blocking sleeps - target.await_resources([resource], timeout=0.0) + name = self.args.name if not place.acquired: print("place released") return 255 - host, port = proxymanager.get_host_and_port(resource) - - # check for valid resources - assert port is not None, "Port is not set" - - microcom_bin = shutil.which("microcom") - - if microcom_bin is not None: - call = [microcom_bin, "-s", str(resource.speed), "-t", f"{host}:{port}"] - - if listen_only: - call.append("--listenonly") - - if logfile: - call.append(f"--logfile={logfile}") + if self.args.internal or os.environ.get('LG_CONSOLE') == 'internal': + console = target.get_driver(ConsoleProtocol, name=name) + returncode = await term.internal(lambda: self.is_allowed(place), + console, logfile, listen_only) else: - call = ["telnet", host, str(port)] + from ..resource import NetworkSerialPort - logging.info("microcom not available, using telnet instead") + # deactivate console drivers so we are able to connect with microcom + try: + con = target.get_active_driver("ConsoleProtocol") + target.deactivate(con) + except NoDriverFoundError: + pass - if listen_only: - logging.warning("--listenonly option not supported by telnet, ignoring") + resource = target.get_resource(NetworkSerialPort, name=name, + wait_avail=False) - if logfile: - logging.warning("--logfile option not supported by telnet, ignoring") + # async await resources + timeout = Timeout(timeout) + while True: + target.update_resources() + if resource.avail or (not loop and timeout.expired): + break + await asyncio.sleep(0.1) - print(f"connecting to {resource} calling {' '.join(call)}") - try: - p = await asyncio.create_subprocess_exec(*call) - except FileNotFoundError as e: - raise ServerError(f"failed to execute remote console command: {e}") - while p.returncode is None: - try: - await asyncio.wait_for(p.wait(), 1.0) - except asyncio.TimeoutError: - # subprocess is still running - pass + # use zero timeout to prevent blocking sleeps + target.await_resources([resource], timeout=0.0) + host, port = proxymanager.get_host_and_port(resource) + # check for valid resources + assert port is not None, "Port is not set" try: - self._check_allowed(place) - except UserError: - p.terminate() - try: - await asyncio.wait_for(p.wait(), 1.0) - except asyncio.TimeoutError: - # try harder - p.kill() - await asyncio.wait_for(p.wait(), 1.0) - raise - if p.returncode: - print("connection lost", file=sys.stderr) - return p.returncode + returncode = await term.external(lambda: self.is_allowed(place), + host, port, resource, logfile, + listen_only) + except FileNotFoundError as e: + raise ServerError(f"failed to execute remote console command: {e}") + + # Raise an exception if the place was released + self._check_allowed(place) + return returncode + async def console(self, place, target): while True: @@ -1008,7 +990,7 @@ async def console(self, place, target): break if not self.args.loop: if res: - exc = InteractiveCommandError("microcom error") + exc = InteractiveCommandError("console error") exc.exitcode = res raise exc break @@ -1673,6 +1655,7 @@ def main(): place = os.environ.get("LG_PLACE", place) state = os.environ.get("STATE", None) state = os.environ.get("LG_STATE", state) + end_state = os.environ.get('LG_END_STATE', state) initial_state = os.environ.get("LG_INITIAL_STATE", None) token = os.environ.get("LG_TOKEN", None) @@ -1684,6 +1667,13 @@ def main(): type=str, help="coordinator HOST[:PORT] (default: value from env variable LG_COORDINATOR, otherwise 127.0.0.1:20408)", ) + parser.add_argument( + '-a', + '--acquire', + action='store_true', + default=False, + help="acquire place before starting and release after finishing" + ) parser.add_argument("-c", "--config", type=str, default=os.environ.get("LG_ENV"), help="config file") parser.add_argument("-p", "--place", type=str, default=place, help="place name/alias") parser.add_argument("-s", "--state", type=str, default=state, help="strategy state to switch into before command") @@ -1694,6 +1684,13 @@ def main(): default=initial_state, help="strategy state to force into before switching to desired state", ) + parser.add_argument( + '-e', + '--end-state', + type=str, + default=end_state, + help="strategy state to switch into after command" + ) parser.add_argument( "-d", "--debug", action="store_true", default=False, help="enable debug mode (show python tracebacks)" ) @@ -1781,6 +1778,8 @@ def main(): subparser.set_defaults(func=ClientSession.acquire) subparser = subparsers.add_parser("release", aliases=("unlock",), help="release a place") + subparser.add_argument('-a', '--auto', action='store_true', + help="don't raise an error if the place is not acquired") subparser.add_argument( "-k", "--kick", action="store_true", help="release a place even if it is acquired by a different user" ) @@ -1813,6 +1812,8 @@ def main(): subparser.set_defaults(func=ClientSession.digital_io) subparser = subparsers.add_parser("console", aliases=("con",), help="connect to the console") + subparser.add_argument('-i', '--internal', action='store_true', + help="use an internal console instead of microcom") subparser.add_argument( "-l", "--loop", action="store_true", help="keep trying to connect if the console is unavailable" ) @@ -2067,7 +2068,7 @@ def main(): if not role: print(f"RemotePlace {args.place} not found in configuration file", file=sys.stderr) exit(1) - print(f"Selected role {role} from configuration file") + logging.info("Selected role %s from configuration file", role) else: role, args.place = find_any_role_with_place(env.config.get_targets()) if not role: @@ -2101,13 +2102,26 @@ def main(): try: if asyncio.iscoroutinefunction(args.func): + auto_release = False + target = None if getattr(args.func, "needs_target", False): + if args.acquire: + place = session.get_place(args.place) + if not place.acquired: + args.allow_unmatched = True + coro = session.acquire() + session.loop.run_until_complete(coro) + auto_release = True place = session.get_acquired_place() target = session._get_target(place) coro = args.func(session, place, target) else: coro = args.func(session) session.loop.run_until_complete(coro) + session.set_end_state(target) + if auto_release: + coro = session.release() + session.loop.run_until_complete(coro) else: args.func(session) finally: @@ -2142,7 +2156,10 @@ def main(): "This is likely caused by an error in the environment configuration or invalid\nresource information provided by the coordinator.", file=sys.stderr, ) # pylint: disable=line-too-long - + except subprocess.CalledProcessError as exc: + print(f"Command failure: {' '.join(exc.cmd)}") + for line in exc.output.splitlines(): + print(line.decode('utf-8')) exitcode = 1 except ServerError as e: print(f"Server error: {e}", file=sys.stderr) diff --git a/labgrid/util/term.py b/labgrid/util/term.py new file mode 100644 index 000000000..3fbbe8a40 --- /dev/null +++ b/labgrid/util/term.py @@ -0,0 +1,196 @@ +"""Terminal handling, using microcom, telnet or an internal function""" + +import asyncio +import collections +import logging +import os +import sys +import shutil +import termios +import time + +from pexpect import TIMEOUT +from serial.serialutil import SerialException + +EXIT_CHAR = 0x1d # FS (Ctrl + ]) + +async def external(check_allowed, host, port, resource, logfile, listen_only): + """Start an external terminal sessions + + This uses microcom if available, otherwise falls back to telnet. + + Args: + check_allowed (lambda): Function to call to make sure the terminal is + still accessible. No args. Returns True if allowed, False if not. + host (str): Host name to connect to + port (int): Port number to connect to + resource (str): Serial resource to connect to (used to get speed / name) + logfile (str): Logfile to write output too, or None. This is ignored if + telnet is used + listen_only (bool): True to ignore keyboard input (ignored with telnet) + + Returns: + int: Return code from tool + """ + microcom_bin = shutil.which("microcom") + + if microcom_bin is not None: + call = [microcom_bin, "-s", str(resource.speed), "-t", f"{host}:{port}"] + + if listen_only: + call.append("--listenonly") + + if logfile: + call.append(f"--logfile={logfile}") + else: + call = ["telnet", host, str(port)] + + logging.info("microcom not available, using telnet instead") + + if listen_only: + logging.warning("--listenonly option not supported by telnet, ignoring") + + if logfile: + logging.warning("--logfile option not supported by telnet, ignoring") + + if logfile: + call.append(f"--logfile={logfile}") + logging.info("connecting to %s calling %s", resource, ' '.join(call)) + p = await asyncio.create_subprocess_exec(*call) + while p.returncode is None: + try: + await asyncio.wait_for(p.wait(), 1.0) + except asyncio.TimeoutError: + # subprocess is still running + pass + + if check_allowed(): + p.terminate() + try: + await asyncio.wait_for(p.wait(), 1.0) + except asyncio.TimeoutError: + # try harder + p.kill() + await asyncio.wait_for(p.wait(), 1.0) + break + if p.returncode: + print("connection lost", file=sys.stderr) + return p.returncode + + +BUF_SIZE = 1024 + +async def run(check_allowed, cons, log_fd, listen_only): + prev = collections.deque(maxlen=2) + + deadline = None + to_cons = b'' + next_cons = time.monotonic() + txdelay = cons.txdelay + + # Show a message to indicate we are waiting for output from the board + msg = 'Terminal ready...press Ctrl-] twice to exit' + sys.stdout.write(msg) + sys.stdout.flush() + erase_msg = '\b' * len(msg) + ' ' * len(msg) + '\b' * len(msg) + have_output = False + + while True: + activity = bool(to_cons) + try: + data = cons.read(size=BUF_SIZE, timeout=0.001) + if data: + activity = True + if not have_output: + # Erase our message + sys.stdout.write(erase_msg) + sys.stdout.flush() + have_output = True + sys.stdout.buffer.write(data) + sys.stdout.buffer.flush() + if log_fd: + log_fd.write(data) + log_fd.flush() + + except TIMEOUT: + pass + + except SerialException: + break + + if not listen_only: + data = os.read(sys.stdin.fileno(), BUF_SIZE) + if data: + activity = True + if not deadline: + deadline = time.monotonic() + .5 # seconds + prev.extend(data) + count = prev.count(EXIT_CHAR) + if count == 2: + break + + to_cons += data + + if to_cons and time.monotonic() > next_cons: + cons._write(to_cons[:1]) + to_cons = to_cons[1:] + if txdelay: + next_cons += txdelay + + if deadline and time.monotonic() > deadline: + prev.clear() + deadline = None + if check_allowed(): + break + if not activity: + time.sleep(.001) + + # Blank line to move past any partial output + print() + + +async def internal(check_allowed, cons, logfile, listen_only): + """Start an external terminal sessions + + This uses microcom if available, otherwise falls back to telnet. + + Args: + check_allowed (lambda): Function to call to make sure the terminal is + still accessible. No args. Returns True if allowed, False if not. + cons (str): ConsoleProtocol device to read/write + logfile (str): Logfile to write output too, or None + listen_only (bool): True to ignore keyboard input + + Return: + int: Result code + """ + returncode = 0 + old = None + try: + if not listen_only and os.isatty(sys.stdout.fileno()): + fd = sys.stdin.fileno() + old = termios.tcgetattr(fd) + new = termios.tcgetattr(fd) + new[3] = new[3] & ~(termios.ICANON | termios.ECHO | termios.ISIG) + new[6][termios.VMIN] = 0 + new[6][termios.VTIME] = 0 + termios.tcsetattr(fd, termios.TCSANOW, new) + + log_fd = None + if logfile: + log_fd = open(logfile, 'wb') + + logging.info('Console start:') + await run(check_allowed, cons, log_fd, listen_only) + + except OSError as err: + print('error', err) + returncode = 1 + + finally: + if old: + termios.tcsetattr(fd, termios.TCSAFLUSH, old) + if log_fd: + log_fd.close() + + return returncode diff --git a/man/labgrid-client.1 b/man/labgrid-client.1 index 150e7be3c..b02d86ee1 100644 --- a/man/labgrid-client.1 +++ b/man/labgrid-client.1 @@ -65,6 +65,9 @@ file and strategy strategy state to force into before switching to desired state, requires a desired state (\fB\-s\fP/\fB\-\-state\fP/\fBLG_STATE\fP) .TP +.B \-a\fP,\fB \-\-acquire +acquire the place before performing the requested operation +.TP .B \-d\fP,\fB \-\-debug enable debugging .TP diff --git a/man/labgrid-client.rst b/man/labgrid-client.rst index 43b76f663..c6660e4d4 100644 --- a/man/labgrid-client.rst +++ b/man/labgrid-client.rst @@ -48,6 +48,8 @@ OPTIONS -i INITIAL_STATE, --initial-state INITIAL_STATE strategy state to force into before switching to desired state, requires a desired state (``-s``/``--state``/``LG_STATE``) +-a, --acquire + acquire the place before performing the requested operation -d, --debug enable debugging -v, --verbose