Skip to content

Commit

Permalink
Merge pull request #16827 from LabNConsulting/chopps/fix-on-error-with-xdist
Browse files Browse the repository at this point in the history

improvements for xdist mode
  • Loading branch information
Jafaral authored Sep 14, 2024
2 parents f80b967 + 98aaeab commit 84c5035
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 26 deletions.
38 changes: 27 additions & 11 deletions tests/topotests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@
import lib.fixtures
import pytest
from lib.common_config import generate_support_bundle
from lib.micronet_compat import Mininet
from lib.topogen import diagnose_env, get_topogen
from lib.topolog import get_test_logdir, logger
from lib.topotest import json_cmp_result
from munet import cli
from munet.base import Commander, proc_error
from munet.base import BaseMunet, Commander, proc_error
from munet.cleanup import cleanup_current, cleanup_previous
from munet.config import ConfigOptionsProxy
from munet.testing.util import pause_test
Expand Down Expand Up @@ -86,7 +85,7 @@ def pytest_addoption(parser):
parser.addoption(
"--cli-on-error",
action="store_true",
help="Mininet cli on test failure",
help="Munet cli on test failure",
)

parser.addoption(
Expand Down Expand Up @@ -711,7 +710,7 @@ def pytest_runtest_makereport(item, call):
wait_for_procs = []
# Really would like something better than using this global here.
# Not all tests use topogen though so get_topogen() won't work.
for node in Mininet.g_mnet_inst.hosts.values():
for node in BaseMunet.g_unet.hosts.values():
pause = True

if is_tmux:
Expand All @@ -720,13 +719,15 @@ def pytest_runtest_makereport(item, call):
if not isatty
else None
)
Commander.tmux_wait_gen += 1
wait_for_channels.append(channel)
# If we don't have a tty to pause on, wait for the tmux windows to exit
if channel is not None:
Commander.tmux_wait_gen += 1
wait_for_channels.append(channel)

pane_info = node.run_in_window(
error_cmd,
new_window=win_info is None,
background=True,
background=not isatty,
title="{} ({})".format(title, node.name),
name=title,
tmux_target=win_info,
Expand All @@ -737,9 +738,13 @@ def pytest_runtest_makereport(item, call):
win_info = pane_info
elif is_xterm:
assert isinstance(pane_info, subprocess.Popen)
wait_for_procs.append(pane_info)
# If we don't have a tty to pause on, wait for the xterm procs to exit
if not isatty:
wait_for_procs.append(pane_info)

# Now wait on any channels
if wait_for_channels or wait_for_procs:
logger.info("Pausing for error command windows to exit")
for channel in wait_for_channels:
logger.debug("Waiting on TMUX channel %s", channel)
commander.cmd_raises([commander.get_exec_path("tmux"), "wait", channel])
Expand All @@ -752,10 +757,10 @@ def pytest_runtest_makereport(item, call):
if error and item.config.option.cli_on_error:
# Really would like something better than using this global here.
# Not all tests use topogen though so get_topogen() won't work.
if Mininet.g_mnet_inst:
cli.cli(Mininet.g_mnet_inst, title=title, background=False)
if BaseMunet.g_unet:
cli.cli(BaseMunet.g_unet, title=title, background=False)
else:
logger.error("Could not launch CLI b/c no mininet exists yet")
logger.error("Could not launch CLI b/c no munet exists yet")

if pause and isatty:
pause_test()
Expand Down Expand Up @@ -800,9 +805,20 @@ def coverage_finish(terminalreporter, config):
def pytest_terminal_summary(terminalreporter, exitstatus, config):
# Only run if we are the top level test runner
is_xdist_worker = "PYTEST_XDIST_WORKER" in os.environ
is_xdist = os.environ["PYTEST_XDIST_MODE"] != "no"
if config.option.cov_topotest and not is_xdist_worker:
coverage_finish(terminalreporter, config)

if (
is_xdist
and not is_xdist_worker
and (
bool(config.getoption("--pause"))
or bool(config.getoption("--pause-at-end"))
)
):
pause_test("pause-at-end")


#
# Add common fixtures available to all tests as parameters
Expand Down
56 changes: 52 additions & 4 deletions tests/topotests/munet/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ def flush(self):
await writer.drain()


async def remote_cli(unet, prompt, title, background):
async def remote_cli(unet, prompt, title, background, remote_wait=False):
"""Open a CLI in a new window."""
try:
if not unet.cli_sockpath:
Expand All @@ -756,6 +756,13 @@ async def remote_cli(unet, prompt, title, background):
unet.cli_sockpath = sockpath
logging.info("server created on :\n%s\n", sockpath)

if remote_wait:
wait_tmux = bool(os.getenv("TMUX", ""))
wait_x11 = not wait_tmux and bool(os.getenv("DISPLAY", ""))
else:
wait_tmux = False
wait_x11 = False

# Open a new window with a new CLI
python_path = await unet.async_get_exec_path(["python3", "python"])
us = os.path.realpath(__file__)
Expand All @@ -765,7 +772,32 @@ async def remote_cli(unet, prompt, title, background):
if prompt:
cmd += f" --prompt='{prompt}'"
cmd += " " + unet.cli_sockpath
unet.run_in_window(cmd, title=title, background=False)

channel = None
if wait_tmux:
from .base import Commander # pylint: disable=import-outside-toplevel

channel = "{}-{}".format(os.getpid(), Commander.tmux_wait_gen)
logger.info("XXX channel is %s", channel)
# If we don't have a tty to pause on, wait for the tmux windows to exit
if channel is not None:
Commander.tmux_wait_gen += 1

pane_info = unet.run_in_window(
cmd, title=title, background=False, wait_for=channel
)

if wait_tmux and channel:
from .base import commander # pylint: disable=import-outside-toplevel

logger.debug("Waiting on TMUX CLI window")
await commander.async_cmd_raises(
[commander.get_exec_path("tmux"), "wait", channel]
)
elif wait_x11 and isinstance(pane_info, subprocess.Popen):
logger.debug("Waiting on xterm CLI process %s", pane_info)
if hasattr(asyncio, "to_thread"):
await asyncio.to_thread(pane_info.wait) # pylint: disable=no-member
except Exception as error:
logging.error("cli server: unexpected exception: %s", error)

Expand Down Expand Up @@ -906,8 +938,22 @@ def cli(
prompt=None,
background=True,
):
# When there is no tty a remote_cli is used, and we want it to wait for the
# spawned cli.py script to finish; otherwise it returns here and exits the async
# loop, which kills the server-side CLI socket operation.
remote_wait = not sys.stdin.isatty()

asyncio.run(
async_cli(unet, histfile, sockpath, force_window, title, prompt, background)
async_cli(
unet,
histfile,
sockpath,
force_window,
title,
prompt,
background,
remote_wait=remote_wait,
)
)


Expand All @@ -919,12 +965,14 @@ async def async_cli(
title=None,
prompt=None,
background=True,
remote_wait=False,
):
if prompt is None:
prompt = "munet> "

if force_window or not sys.stdin.isatty():
await remote_cli(unet, prompt, title, background)
await remote_cli(unet, prompt, title, background, remote_wait)
return

if not unet:
logger.debug("client-cli using sockpath %s", sockpath)
Expand Down
4 changes: 2 additions & 2 deletions tests/topotests/munet/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -2733,7 +2733,7 @@ def __init__(
),
"format": "stdout HOST [HOST ...]",
"help": "tail -f on the stdout of the qemu/cmd for this node",
"new-window": True,
"new-window": {"background": True, "ns_only": True},
},
{
"name": "stderr",
Expand All @@ -2743,7 +2743,7 @@ def __init__(
),
"format": "stderr HOST [HOST ...]",
"help": "tail -f on the stdout of the qemu/cmd for this node",
"new-window": True,
"new-window": {"background": True, "ns_only": True},
},
]
}
Expand Down
26 changes: 17 additions & 9 deletions tests/topotests/munet/testing/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,22 @@ def pause_test(desc=""):
asyncio.run(async_pause_test(desc))


def retry(retry_timeout, initial_wait=0, expected=True):
def retry(retry_timeout, initial_wait=0, retry_sleep=2, expected=True):
"""decorator: retry while the function's return is not None or it raises an exception.
* `retry_timeout`: Retry for at least this many seconds; after waiting
initial_wait seconds
* `initial_wait`: Sleeps for this many seconds before first executing function
* `retry_sleep`: The time to sleep between retries.
* `expected`: if False then the return logic is inverted, except for exceptions,
(i.e., a non None ends the retry loop, and returns that value)
"""

def _retry(func):
@functools.wraps(func)
def func_retry(*args, **kwargs):
retry_sleep = 2

# Allow the wrapped function's args to override the fixtures
_retry_sleep = float(kwargs.pop("retry_sleep", retry_sleep))
_retry_timeout = kwargs.pop("retry_timeout", retry_timeout)
_expected = kwargs.pop("expected", expected)
_initial_wait = kwargs.pop("initial_wait", initial_wait)
Expand All @@ -82,13 +82,21 @@ def func_retry(*args, **kwargs):
while True:
seconds_left = (retry_until - datetime.datetime.now()).total_seconds()
try:
ret = func(*args, **kwargs)
if _expected and ret is None:
try:
ret = func(*args, seconds_left=seconds_left, **kwargs)
except TypeError as error:
if "seconds_left" not in str(error):
raise
ret = func(*args, **kwargs)

logging.debug("Function returned %s", ret)

positive_result = ret is None
if _expected == positive_result:
logging.debug("Function succeeds")
return ret
logging.debug("Function returned %s", ret)
except Exception as error:
logging.info("Function raised exception: %s", str(error))
logging.info('Function raised exception: "%s"', error)
ret = error

if seconds_left < 0:
Expand All @@ -99,10 +107,10 @@ def func_retry(*args, **kwargs):

logging.info(
"Sleeping %ds until next retry with %.1f retry time left",
retry_sleep,
_retry_sleep,
seconds_left,
)
time.sleep(retry_sleep)
time.sleep(_retry_sleep)

func_retry._original = func # pylint: disable=W0212
return func_retry
Expand Down

0 comments on commit 84c5035

Please sign in to comment.