From dd22150ff33af79b5679528d1b383ac27ce4d80c Mon Sep 17 00:00:00 2001 From: Parag Kamble Date: Tue, 3 Dec 2024 11:18:14 +0530 Subject: [PATCH] Added Test Automation for disable keyrotation operation Signed-off-by: Parag Kamble --- ocs_ci/helpers/keyrotation_helper.py | 167 ++++++++++++++++ ocs_ci/ocs/constants.py | 2 + .../test_disable_pv_keyrotation.py | 185 ++++++++++++++++++ 3 files changed, 354 insertions(+) create mode 100644 tests/functional/pv/pv_encryption/test_disable_pv_keyrotation.py diff --git a/ocs_ci/helpers/keyrotation_helper.py b/ocs_ci/helpers/keyrotation_helper.py index ebea7388c81..4b71de8ef09 100644 --- a/ocs_ci/helpers/keyrotation_helper.py +++ b/ocs_ci/helpers/keyrotation_helper.py @@ -369,6 +369,7 @@ class PVKeyrotation(KeyRotation): def __init__(self, sc_obj): self.sc_obj = sc_obj self.kms = get_kms_details() + self.all_pvc_key_data = None def annotate_storageclass_key_rotation(self, schedule="@weekly"): """ @@ -420,3 +421,169 @@ def wait_till_keyrotation(self, device_handle): assert False return True + + def set_keyrotation_state_by_annotation(self, enable: bool): + """ + Enables or disables key rotation by annotating the StorageClass. + """ + state = "true" if enable else "false" + annotation = f"keyrotation.csiaddons.openshift.io/enable={state}" + self.sc_obj.annotate(annotation=annotation) + log.info( + f"Key rotation {'enabled' if enable else 'disabled'} for the StorageClass." + ) + + def set_keyrotation_state_by_rbac_user(self, pvc_obj, suspend_state=True): + """ + Updates key rotation CronJob state for a PVC. + """ + cron_job = self.get_keyrotation_cronjob_for_pvc(pvc_obj) + state = "unmanaged" if suspend_state else "managed" + cron_job.annotate(f"csiaddons.openshift.io/state={state}", overwrite=True) + + log.info(f"Updated CronJob annotation for PVC '{pvc_obj.name}' to '{state}'") + + suspend_patch = ( + '[{"op": "add", "path": "/spec/suspend", "value": true}]' + if suspend_state + else '[{"op": "remove", "path": "/spec/suspend"}]' + ) + cron_job.patch(params=suspend_patch, format_type="json") + log.info(f"'suspend' {'enabled' if suspend_state else 'removed'} for CronJob.") + + def get_keyrotation_cronjob_for_pvc(self, pvc_obj): + """ + Retrieves the key rotation CronJob associated with a PVC. + + Args: + pvc_obj (object): The PVC object for which to retrieve the CronJob. + + Returns: + object: The CronJob object associated with the PVC. + + Raises: + ValueError: If the PVC lacks the key rotation CronJob annotation. + """ + # Ensure annotations are loaded in the PVC object + if "annotations" not in pvc_obj.data["metadata"]: + pvc_obj.reload() + + # Extract the cronjob name from PVC annotations + cron_job_name = ( + pvc_obj.data["metadata"] + .get("annotations", {}) + .get("keyrotation.csiaddons.openshift.io/cronjob") + ) + + if not cron_job_name: + log.error(f"PVC '{pvc_obj.name}' lacks keyrotation cronjob annotation.") + raise ValueError(f"Missing keyrotation cronjob for PVC '{pvc_obj.name}'") + + log.info(f"Found CronJob '{cron_job_name}' for PVC '{pvc_obj.name}'.") + + cronjob_obj = OCP( + kind=constants.ENCRYPTIONKEYROTATIONCRONJOB, + namespace=pvc_obj.namespace, + resource_name=cron_job_name, + ) + + if not cronjob_obj.is_exist(): + log.error( + f"cronjob {cron_job_name} is not exists for the PVC: {pvc_obj.name}" + ) + raise ValueError( + f"Missing keyrotation cronjob Object for PVC '{pvc_obj.name}'" + ) + + return OCP( + kind=constants.ENCRYPTIONKEYROTATIONCRONJOB, + namespace=pvc_obj.namespace, + resource_name=cron_job_name, + ) + + def get_pvc_keys_data(self, pvc_objs): + """ + Retrieves key data for PVCs. + """ + return { + pvc.name: { + "device_handle": pvc.get_pv_volume_handle_name, + "vault_key": self.kms.get_pv_secret(pvc.get_pv_volume_handle_name), + } + for pvc in pvc_objs + } + + @retry(UnexpectedBehaviour, tries=5, delay=20) + def wait_till_all_pv_keyrotation_on_vault_kms(self, pvc_objs): + """ + Waits for all PVC keys to be rotated in the Vault KMS. + """ + if not self.all_pvc_key_data: + self.all_pvc_key_data = self.get_pvc_keys_data(pvc_objs) + raise UnexpectedBehaviour("Initializing PVC vault key data") + + new_pvc_keys = self.get_pvc_keys_data(pvc_objs) + if self.all_pvc_key_data == new_pvc_keys: + raise UnexpectedBehaviour("PVC keys have not rotated yet.") + + log.info("PVC keys rotated successfully.") + return True + + def change_pvc_keyrotation_cronjob_state(self, pvc_objs, disable=True): + """ + Modify the key rotation state of PVCs by annotating and patching their associated cronjobs. + + Args: + pvc_objs (list): List of PVC objects to modify. + disable (bool): If True, disables the key rotation. If False, enables it. Defaults to True. + + Returns: + bool: True if the operation succeeds. + """ + state_value = "unmanaged" if disable else "managed" + + for pvc in pvc_objs: + # Retrieve the cronjob associated with the PVC + cronjob = self.get_keyrotation_cronjob_for_pvc(pvc) + if not cronjob: + log.warning( + f"No KeyRotationCronjob found for PVC '{pvc.name}'. Skipping." + ) + continue + + # Annotate the cronjob to reflect the new state + state_annotation = f"csiaddons.openshift.io/state={state_value}" + cronjob.annotate(state_annotation, overwrite=True) + log.info( + f"Annotated KeyRotationCronjob for PVC '{pvc.name}' with state: {state_value}." + ) + + # Prepare the patch for suspending or resuming the cronjob + if disable: + suspend_patch = ( + '[{"op": "add", "path": "/spec/suspend", "value": true}]' + ) + log.info( + f"'suspend' set to True in KeyRotationCronjob for PVC '{pvc.name}'." + ) + else: + suspend_patch = '[{"op": "remove", "path": "/spec/suspend"}]' + log.info( + f"'suspend' removed from KeyRotationCronjob for PVC '{pvc.name}'." + ) + + # Apply the patch to the cronjob + try: + cronjob.patch(params=suspend_patch, format_type="json") + log.info( + f"Successfully patched KeyRotationCronjob for PVC '{pvc.name}'." + ) + except Exception as e: + log.error( + f"Failed to patch KeyRotationCronjob for PVC '{pvc.name}': {e}" + ) + raise + pvc.reload() + + log.info("Completed key rotation state changes for all specified PVCs.") + return True diff --git a/ocs_ci/ocs/constants.py b/ocs_ci/ocs/constants.py index 3331cfb3f9a..2dceec57a8c 100644 --- a/ocs_ci/ocs/constants.py +++ b/ocs_ci/ocs/constants.py @@ -240,6 +240,8 @@ IMAGECONTENTSOURCEPOLICY_KIND = "ImageContentSourcePolicy" NOOBAA_ACCOUNT = "NoobaaAccount" EXTERNAL_CLUSTER_SCRIPT_CONFIG = "rook-ceph-external-cluster-script-config" +ENCRYPTIONKEYROTATIONCRONJOB = "encryptionkeyrotationcronjobs.csiaddons.openshift.io" +ENCRYPTIONKEYROTATIONJOB = "encryptionkeyrotationjobs.csiaddons.openshift.io" # Provisioners AWS_EFS_PROVISIONER = "openshift.org/aws-efs" diff --git a/tests/functional/pv/pv_encryption/test_disable_pv_keyrotation.py b/tests/functional/pv/pv_encryption/test_disable_pv_keyrotation.py new file mode 100644 index 00000000000..b95337beb24 --- /dev/null +++ b/tests/functional/pv/pv_encryption/test_disable_pv_keyrotation.py @@ -0,0 +1,185 @@ +import logging +import pytest +from ocs_ci.framework.testlib import config, tier1 +from ocs_ci.ocs import constants +from ocs_ci.helpers.keyrotation_helper import PVKeyrotation +from ocs_ci.helpers.helpers import create_pods +from ocs_ci.framework.pytest_customization.marks import green_squad + +log = logging.getLogger(__name__) + +# Constants +kmsprovider = constants.VAULT_KMS_PROVIDER + +# Parametrize test cases based on environment +argnames = ["kv_version", "kms_provider", "use_vault_namespace"] +if config.ENV_DATA.get("vault_hcp"): + argvalues = [ + pytest.param("v1", kmsprovider, True), + pytest.param("v2", kmsprovider, True), + ] +else: + argvalues = [ + pytest.param("v1", kmsprovider, False), + pytest.param("v2", kmsprovider, False), + ] + + +class PVKeyrotationTestBase: + """ + Base class to reuse common setup and utility methods for PV key rotation tests. + """ + + @pytest.fixture() + def setup_common( + self, + kv_version, + kms_provider, + pv_encryption_kms_setup_factory, + project_factory, + storageclass_factory, + multi_pvc_factory, + pod_factory, + use_vault_namespace, + ): + """ + Common setup for CSI-KMS connection details, storage class, and PVCs. + """ + log.info( + "Starting setup: Configuring CSI-KMS connection details and resources." + ) + + # Set up KMS configuration + self.kms = pv_encryption_kms_setup_factory(kv_version, use_vault_namespace) + log.info("KMS setup successful.") + + # Create a project + self.proj_obj = project_factory() + log.info(f"Project {self.proj_obj.namespace} created.") + + # Key rotation annotations + keyrotation_annotations = { + "keyrotation.csiaddons.openshift.io/schedule": "* * * * *" + } + + # Create an encryption-enabled storage class + self.sc_obj = storageclass_factory( + interface=constants.CEPHBLOCKPOOL, + encrypted=True, + encryption_kms_id=self.kms.kmsid, + annotations=keyrotation_annotations, + ) + log.info("Encryption-enabled storage class created.") + + # Create Vault CSI KMS token in tenant namespace + self.kms.vault_path_token = self.kms.generate_vault_token() + self.kms.create_vault_csi_kms_token(namespace=self.proj_obj.namespace) + log.info("Vault CSI KMS token created.") + + # Create PVCs with encryption enabled + self.pvc_objs = multi_pvc_factory( + size=5, + num_of_pvc=3, + storageclass=self.sc_obj, + access_modes=[ + f"{constants.ACCESS_MODE_RWX}-Block", + f"{constants.ACCESS_MODE_RWO}-Block", + ], + wait_each=True, + project=self.proj_obj, + ) + log.info("PVCs created successfully.") + + # Create pods for the PVCs + self.pod_objs = create_pods( + self.pvc_objs, + pod_factory, + constants.CEPHBLOCKPOOL, + pods_for_rwx=1, + status=constants.STATUS_RUNNING, + ) + log.info("Pods created and running.") + + # Initialize the PVKeyrotation helper + self.pv_keyrotation_obj = PVKeyrotation(self.sc_obj) + + +@tier1 +@green_squad +@pytest.mark.parametrize( + argnames=argnames, + argvalues=argvalues, +) +class TestDisablePVKeyrotationOperation(PVKeyrotationTestBase): + @pytest.mark.polarion_id("OCS-6323") + def test_disable_pv_keyrotation_globally(self, setup_common): + """ + Test disabling PV key rotation globally by annotating the storage class. + + Steps: + 1. Add annotation to the storage class to disable key rotation. + 2. Verify key rotation jobs are deleted. + 3. Remove the annotation from the storage class. + 4. Verify key rotation cronjobs are recreated. + """ + log.info("Starting test: Disable PV key rotation globally.") + + # Disable key rotation globally + self.pv_keyrotation_obj.set_keyrotation_state_by_annotation(False) + log.info("Key rotation disabled globally via storage class annotation.") + + # Verify key rotation cronjobs are deleted + for pvc_obj in self.pvc_objs: + with pytest.raises(ValueError): + self.pv_keyrotation_obj.get_keyrotation_cronjob_for_pvc(pvc_obj) + log.info("Verified key rotation cronjobs are removed.") + + # Enable key rotation globally + self.pv_keyrotation_obj.set_keyrotation_state_by_annotation(True) + log.info("Key rotation re-enabled globally via storage class annotation.") + + # Verify key rotation cronjobs are recreated + assert self.pv_keyrotation_obj.wait_till_all_pv_keyrotation_on_vault_kms( + self.pvc_objs + ), "Failed to re-enable PV key rotation." + log.info("Key rotation successfully re-enabled globally.") + + @pytest.mark.polarion_id("OCS-6324") + def test_disable_pv_keyrotation_by_rbac_user(self, setup_common): + """ + Test disabling specific PV key rotation by RBAC user permissions. + + Steps: + 1. Disable key rotation for specific PVCs. + 2. Verify key rotation cronjobs has state suspent = True. + 3. Re-enable key rotation for specific PVCs. + 4. Verify key rotation cronjobs are recreated. + """ + log.info("Starting test: Disable PV key rotation by RBAC user.") + + # Disable key rotation for specific PVCs + self.pv_keyrotation_obj.change_pvc_keyrotation_cronjob_state( + self.pvc_objs, disable=True + ) + log.info("Key rotation disabled for specific PVCs.") + + # Verify Keyrotation is disabled for the PVC. + for pvc in self.pvc_objs: + cron_obj = self.pv_keyrotation_obj.get_keyrotation_cronjob_for_pvc(pvc) + assert cron_obj.data["spec"].get( + "suspend", False + ), "PVC keyrotation cronjob is not in 'suspend' state." + + log.info("Keyrotation is Disabled for all PVC") + + # Re-enable key rotation for specific PVCs + self.pv_keyrotation_obj.change_pvc_keyrotation_cronjob_state( + self.pvc_objs, disable=False + ) + log.info("Key rotation re-enabled for specific PVCs.") + + # Verify key rotation cronjobs are recreated + assert self.pv_keyrotation_obj.wait_till_all_pv_keyrotation_on_vault_kms( + self.pvc_objs + ), "Failed to re-enable PV key rotation for specific PVCs." + log.info("Key rotation successfully re-enabled for specific PVCs.")