diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 0e97b1a90..be750d709 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -69,7 +69,7 @@ jobs: uses: actions/setup-python@v4 - name: Install test dependencies run: | - sudo apt install -y podman python3-pytest flake8 qemu-system-x86 + sudo apt install -y podman python3-pytest python3-paramiko flake8 qemu-system-x86 - name: Run tests run: | # podman needs (parts of) the environment but will break when diff --git a/plans/all.fmf b/plans/all.fmf index b815afaf4..80ad68476 100644 --- a/plans/all.fmf +++ b/plans/all.fmf @@ -11,6 +11,7 @@ prepare: - podman - pytest - python3-flake8 + - python3-paramiko execute: how: tmt script: pytest -s -vv diff --git a/test/test_smoke.py b/test/test_smoke.py index 0d3855f7f..a25d60011 100644 --- a/test/test_smoke.py +++ b/test/test_smoke.py @@ -65,6 +65,8 @@ def test_smoke(output_path, config_json): generated_img = pathlib.Path(output_path) / "qcow2/disk.qcow2" assert generated_img.exists(), f"output file missing, dir content: {os.listdir(os.fspath(output_path))}" with VM(generated_img) as test_vm: - # TODO: replace with 'test_vm.run("true")' once user creation via - # blueprints works - test_vm.wait_ssh_ready() + exit_status, _ = test_vm.run("true", user="test", password="password") + assert exit_status == 0 + exit_status, output = test_vm.run("echo hello") + assert exit_status == 0 + assert output == "hello" diff --git a/test/vm.py b/test/vm.py index 78938e13e..51bb7682a 100644 --- a/test/vm.py +++ b/test/vm.py @@ -1,10 +1,14 @@ import pathlib import subprocess import sys +from io import StringIO from testutil import get_free_port, wait_ssh_ready +from paramiko.client import AutoAddPolicy, SSHClient + + class VM: MEM = "2000" QEMU = "qemu-system-x86_64" @@ -64,3 +68,25 @@ def __enter__(self): def __exit__(self, type, value, tb): self.force_stop() + + def run(self, cmd, user, password): + if not self._qemu_p: + self.start() + client = SSHClient() + client.set_missing_host_key_policy(AutoAddPolicy) + client.connect( + "localhost", self._ssh_port, user, password, + allow_agent=False, look_for_keys=False) + chan = client.get_transport().open_session() + chan.get_pty() + chan.exec_command(cmd) + stdout_f = chan.makefile() + output = StringIO() + while True: + out = stdout_f.readline() + if not out: + break + self._log(out) + output.write(out) + exit_status = stdout_f.channel.recv_exit_status() + return exit_status, output.getvalue()