diff --git a/libvirt/tests/cfg/migration/migration_with_vtpm/migration_with_external_tpm.cfg b/libvirt/tests/cfg/migration/migration_with_vtpm/migration_with_external_tpm.cfg new file mode 100644 index 0000000000..7807974850 --- /dev/null +++ b/libvirt/tests/cfg/migration/migration_with_vtpm/migration_with_external_tpm.cfg @@ -0,0 +1,50 @@ +- migration_with_vtpm.migration_with_external_tpm: + type = migration_with_external_tpm + migration_setup = 'yes' + storage_type = 'nfs' + setup_local_nfs = 'yes' + disk_type = "file" + disk_source_protocol = "netfs" + mnt_path_name = ${nfs_mount_dir} + # Console output can only be monitored via virsh console output + only_pty = True + take_regular_screendumps = no + # Extra options to pass after + virsh_migrate_extra = "" + # SSH connection time out + ssh_timeout = 60 + start_vm = "no" + virsh_migrate_dest_state = running + virsh_migrate_src_state = running + # Local URI + virsh_migrate_connect_uri = "qemu:///system" + server_ip = "${migrate_dest_host}" + server_user = "root" + server_pwd = "${migrate_dest_pwd}" + client_ip = "${migrate_source_host}" + client_user = "root" + client_pwd = "${migrate_source_pwd}" + status_error = "no" + transport_type = "ssh" + virsh_migrate_desturi = "qemu+ssh://${migrate_dest_host}/system" + # vtpm setting + func_supported_since_libvirt_ver = (9, 0, 0) + no pseries, s390-virtio + q35: + tpm_dict = {'tpm_model': 'tpm-crb', 'backend': {'backend_type': 'external', 'source': {'type': 'unix', 'mode': 'connect', 'path': '/var/tmp/guest-swtpm.sock'}}} + aarch64: + tpm_dict = {'tpm_model': 'tpm-tis', 'backend': {'backend_type': 'external', 'source': {'type': 'unix', 'mode': 'connect', 'path': '/var/tmp/guest-swtpm.sock'}}} + statedir = "/var/tmp/mytpm" + swtpm_setup_path = '/usr/bin/swtpm_setup' + swtpm_path = '/usr/bin/swtpm' + tpm_cmd = "tpm2_getrandom --hex 16" + tpm_security_contexts= "user_tmp_t" + + variants: + - persistent_and_p2p: + virsh_migrate_options = "--live --p2p --verbose --undefinesource --persistent" + - persistent_and_non_p2p: + virsh_migrate_options = "--live --verbose" + - transient_and_non_p2p: + virsh_migrate_options = "--live --verbose" + transient_vm = "yes" diff --git a/libvirt/tests/src/migration/migration_with_vtpm/migration_with_external_tpm.py b/libvirt/tests/src/migration/migration_with_vtpm/migration_with_external_tpm.py new file mode 100644 index 0000000000..943baae6c6 --- /dev/null +++ b/libvirt/tests/src/migration/migration_with_vtpm/migration_with_external_tpm.py @@ -0,0 +1,233 @@ +import os +import shutil + +from avocado.utils import process + +from virttest import remote +from virttest import utils_package +from virttest import utils_misc +from virttest import virsh + +from virttest.libvirt_xml import vm_xml +from virttest.utils_libvirt import libvirt_vmxml +from virttest.utils_test import libvirt + +from provider.migration import base_steps + + +def check_tpm_security_context(params, test, expected_contexts, on_remote=False): + """ + Check TPM state file security contexts + + :param params: dict, test parameters + :param test: test object + :param expected_contexts: expected tpm security context + :param on_remote: True to check context on remote + """ + statedir = params.get("statedir") + + test.log.debug("Check tpm security context: (on_remote: %s)", on_remote) + cmd = "ls -hZR %s/tpm2-00.permall" % statedir + if on_remote: + cmd_result = remote.run_remote_cmd(cmd, params) + else: + cmd_result = process.run(cmd, ignore_status=True, shell=True) + if cmd_result.exit_status: + test.error("Fail to run '%s'." % cmd) + if expected_contexts not in cmd_result.stdout_text: + test.fail("Fail to find '%s' in '%s'." % (expected_contexts, cmd_result.stdout)) + + +def check_vtpm_func(params, vm, test, on_remote=False): + """ + Check vtpm function + + :param params: dict, test parameters + :param vm: VM object + :param test: test object + :param on_remote: True to check vtpm function on remote + """ + tpm_cmd = params.get("tpm_cmd") + dest_uri = params.get("virsh_migrate_desturi") + src_uri = params.get("virsh_migrate_connect_uri") + test.log.debug("Check vtpm func: (on_remote: %s).", on_remote) + if on_remote: + if vm.serial_console is not None: + vm.cleanup_serial_console() + vm.connect_uri = dest_uri + if vm.serial_console is None: + vm.create_serial_console() + vm_session = vm.wait_for_serial_login(timeout=240) + if not utils_package.package_install("tpm2-tools", vm_session): + test.error("Failed to install tpm2-tools in vm") + cmd_result = vm_session.cmd_status(tpm_cmd) + vm_session.close() + if on_remote: + if vm.serial_console is not None: + vm.cleanup_serial_console() + vm.connect_uri = src_uri + if cmd_result: + test.fail("Fail to run '%s': %s." % (tpm_cmd, cmd_result)) + + +def launch_external_swtpm(params, test, skip_setup=False, on_remote=False): + """ + Launch externally swtpm + + :param params: dict, test parameters + :param test: test object + :param skip_setup: whether skip swtpm_setup steps + :param on_remote: True to execute on remote + """ + tpm_dict = eval(params.get('tpm_dict', '{}')) + source_socket = tpm_dict['backend']['source']['path'] + statedir = params.get("statedir") + swtpm_setup_path = params.get("swtpm_setup_path") + swtpm_path = params.get("swtpm_path") + + test.log.info("Launch external swtpm process: (on_remote: %s)", on_remote) + if not skip_setup: + cmd1 = 'chcon -t virtd_exec_t %s' % swtpm_setup_path + cmd2 = 'chcon -t virtd_exec_t %s' % swtpm_path + if on_remote: + remote.run_remote_cmd("rm -rf %s" % statedir, params) + remote.run_remote_cmd("mkdir %s" % statedir, params) + remote.run_remote_cmd(cmd1, params) + remote.run_remote_cmd(cmd2, params) + else: + if os.path.exists(statedir): + shutil.rmtree(statedir) + os.mkdir(statedir) + process.run(cmd1, ignore_status=False, shell=True) + process.run(cmd2, ignore_status=False, shell=True) + cmd3 = "systemd-run %s --tpm2 --tpmstate %s --create-ek-cert --create-platform-cert --overwrite" % (swtpm_setup_path, statedir) + cmd4 = "systemd-run %s socket --ctrl type=unixio,path=%s,mode=0600 --tpmstate dir=%s,mode=0600 --tpm2 --terminate" % (swtpm_path, source_socket, statedir) + try: + if not skip_setup: + if on_remote: + remote.run_remote_cmd(cmd3, params) + else: + process.run(cmd3, ignore_status=False, shell=True) + if on_remote: + remote.run_remote_cmd(cmd4, params) + remote.run_remote_cmd('chcon -t svirt_image_t %s' % source_socket, params) + remote.run_remote_cmd('chown qemu:qemu %s' % source_socket, params) + else: + process.run(cmd4, ignore_status=False, shell=True) + # Make sure the socket is created + utils_misc.wait_for(lambda: os.path.isdir(source_socket), timeout=3) + process.run('chcon -t svirt_image_t %s' % source_socket, ignore_status=False, shell=True) + process.run('chown qemu:qemu %s' % source_socket, ignore_status=False, shell=True) + except Exception as err: + if on_remote: + remote.run_remote_cmd('pkill swtpm', params) + else: + process.run("pkill swtpm", shell=True) + test.error("{}".format(err)) + + +def setup_vtpm(params, test, vm): + """ + Setup vTPM device in guest xml + + :param params: dict, test parameters + :param vm: VM object + :param test: test object + """ + vm_name = params.get("migrate_main_vm") + transient_vm = "yes" == params.get("transient_vm", "no") + tpm_dict = eval(params.get('tpm_dict', '{}')) + + test.log.info("Setup vTPM device in guest xml.") + if vm.is_alive(): + vm.destroy(gracefully=False) + vmxml = vm_xml.VMXML.new_from_inactive_dumpxml(vm_name) + # Remove all existing tpm devices + vmxml.remove_all_device_by_type('tpm') + + libvirt_vmxml.modify_vm_device(vmxml, 'tpm', tpm_dict) + + if transient_vm: + virsh.undefine(vm_name, options='--nvram', debug=True, ignore_status=False) + virsh.create(vmxml.xml, ignore_status=False, debug=True) + else: + vm.start() + vm.wait_for_login().close() + + +def run(test, params, env): + """ + Test migration with external vtpm device. + + :param test: test object + :param params: Dictionary with the test parameters + :param env: Dictionary with test environment. + """ + + def setup_test(): + """ + Setup steps + + """ + tpm_security_contexts = params.get("tpm_security_contexts") + + libvirt.set_vm_disk(vm, params) + launch_external_swtpm(params, test) + launch_external_swtpm(params, test, skip_setup=False, on_remote=True) + setup_vtpm(params, test, vm) + check_tpm_security_context(params, test, tpm_security_contexts) + check_vtpm_func(params, vm, test) + + def verify_test(): + """ + Verify steps + + """ + check_vtpm_func(params, vm, test, on_remote=True) + + def verify_test_again(): + """ + Verify steps for migration back + + """ + tpm_security_contexts = params.get("tpm_security_contexts") + + check_vtpm_func(params, vm, test) + vm.shutdown() + vm.wait_for_shutdown() + check_tpm_security_context(params, test, tpm_security_contexts) + + def cleanup_test(): + """ + Cleanup steps + + """ + swtpm_setup_path = params.get("swtpm_setup_path") + swtpm_path = params.get("swtpm_path") + statedir = params.get("statedir") + + cmd1 = "restorecon %s" % swtpm_setup_path + cmd2 = "restorecon %s" % swtpm_path + process.run(cmd1, ignore_status=False, shell=True) + process.run(cmd2, ignore_status=False, shell=True) + if os.path.exists(statedir): + shutil.rmtree(statedir) + remote.run_remote_cmd(cmd1, params) + remote.run_remote_cmd(cmd2, params) + remote.run_remote_cmd("rm -rf %s" % statedir, params) + migration_obj.cleanup_connection() + + vm_name = params.get("migrate_main_vm") + + vm = env.get_vm(vm_name) + migration_obj = base_steps.MigrationBase(test, vm, params) + + try: + setup_test() + migration_obj.run_migration() + verify_test() + launch_external_swtpm(params, test, skip_setup=True) + migration_obj.run_migration_back() + verify_test_again() + finally: + cleanup_test()