Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Test Automation for disable keyrotation operation #10966

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 167 additions & 0 deletions ocs_ci/helpers/keyrotation_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ class PVKeyrotation(KeyRotation):
def __init__(self, sc_obj):
self.sc_obj = sc_obj
self.kms = get_kms_details()
self.all_pvc_key_data = None

def annotate_storageclass_key_rotation(self, schedule="@weekly"):
"""
Expand Down Expand Up @@ -420,3 +421,169 @@ def wait_till_keyrotation(self, device_handle):
assert False

return True

def set_keyrotation_state_by_annotation(self, enable: bool):
"""
Enables or disables key rotation by annotating the StorageClass.
"""
state = "true" if enable else "false"
annotation = f"keyrotation.csiaddons.openshift.io/enable={state}"
self.sc_obj.annotate(annotation=annotation)
log.info(
f"Key rotation {'enabled' if enable else 'disabled'} for the StorageClass."
)

def set_keyrotation_state_by_rbac_user(self, pvc_obj, suspend_state=True):
"""
Updates key rotation CronJob state for a PVC.
"""
cron_job = self.get_keyrotation_cronjob_for_pvc(pvc_obj)
state = "unmanaged" if suspend_state else "managed"
cron_job.annotate(f"csiaddons.openshift.io/state={state}", overwrite=True)

log.info(f"Updated CronJob annotation for PVC '{pvc_obj.name}' to '{state}'")

suspend_patch = (
'[{"op": "add", "path": "/spec/suspend", "value": true}]'
if suspend_state
else '[{"op": "remove", "path": "/spec/suspend"}]'
)
cron_job.patch(params=suspend_patch, format_type="json")
log.info(f"'suspend' {'enabled' if suspend_state else 'removed'} for CronJob.")

def get_keyrotation_cronjob_for_pvc(self, pvc_obj):
"""
Retrieves the key rotation CronJob associated with a PVC.

Args:
pvc_obj (object): The PVC object for which to retrieve the CronJob.

Returns:
object: The CronJob object associated with the PVC.

Raises:
ValueError: If the PVC lacks the key rotation CronJob annotation.
"""
# Ensure annotations are loaded in the PVC object
if "annotations" not in pvc_obj.data["metadata"]:
pvc_obj.reload()

# Extract the cronjob name from PVC annotations
cron_job_name = (
pvc_obj.data["metadata"]
.get("annotations", {})
.get("keyrotation.csiaddons.openshift.io/cronjob")
)

if not cron_job_name:
log.error(f"PVC '{pvc_obj.name}' lacks keyrotation cronjob annotation.")
raise ValueError(f"Missing keyrotation cronjob for PVC '{pvc_obj.name}'")

log.info(f"Found CronJob '{cron_job_name}' for PVC '{pvc_obj.name}'.")

cronjob_obj = OCP(
kind=constants.ENCRYPTIONKEYROTATIONCRONJOB,
namespace=pvc_obj.namespace,
resource_name=cron_job_name,
)

if not cronjob_obj.is_exist():
log.error(
f"cronjob {cron_job_name} is not exists for the PVC: {pvc_obj.name}"
)
raise ValueError(
f"Missing keyrotation cronjob Object for PVC '{pvc_obj.name}'"
)

return OCP(
kind=constants.ENCRYPTIONKEYROTATIONCRONJOB,
namespace=pvc_obj.namespace,
resource_name=cron_job_name,
)

def get_pvc_keys_data(self, pvc_objs):
"""
Retrieves key data for PVCs.
"""
return {
pvc.name: {
"device_handle": pvc.get_pv_volume_handle_name,
"vault_key": self.kms.get_pv_secret(pvc.get_pv_volume_handle_name),
}
for pvc in pvc_objs
}

@retry(UnexpectedBehaviour, tries=5, delay=20)
def wait_till_all_pv_keyrotation_on_vault_kms(self, pvc_objs):
"""
Waits for all PVC keys to be rotated in the Vault KMS.
"""
if not self.all_pvc_key_data:
self.all_pvc_key_data = self.get_pvc_keys_data(pvc_objs)
raise UnexpectedBehaviour("Initializing PVC vault key data")

new_pvc_keys = self.get_pvc_keys_data(pvc_objs)
if self.all_pvc_key_data == new_pvc_keys:
raise UnexpectedBehaviour("PVC keys have not rotated yet.")

log.info("PVC keys rotated successfully.")
return True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is reduandant


def change_pvc_keyrotation_cronjob_state(self, pvc_objs, disable=True):
"""
Modify the key rotation state of PVCs by annotating and patching their associated cronjobs.

Args:
pvc_objs (list): List of PVC objects to modify.
disable (bool): If True, disables the key rotation. If False, enables it. Defaults to True.

Returns:
bool: True if the operation succeeds.
"""
state_value = "unmanaged" if disable else "managed"

for pvc in pvc_objs:
# Retrieve the cronjob associated with the PVC
cronjob = self.get_keyrotation_cronjob_for_pvc(pvc)
if not cronjob:
log.warning(
f"No KeyRotationCronjob found for PVC '{pvc.name}'. Skipping."
)
continue

# Annotate the cronjob to reflect the new state
state_annotation = f"csiaddons.openshift.io/state={state_value}"
cronjob.annotate(state_annotation, overwrite=True)
log.info(
f"Annotated KeyRotationCronjob for PVC '{pvc.name}' with state: {state_value}."
)

# Prepare the patch for suspending or resuming the cronjob
if disable:
suspend_patch = (
'[{"op": "add", "path": "/spec/suspend", "value": true}]'
)
log.info(
f"'suspend' set to True in KeyRotationCronjob for PVC '{pvc.name}'."
)
else:
suspend_patch = '[{"op": "remove", "path": "/spec/suspend"}]'
log.info(
f"'suspend' removed from KeyRotationCronjob for PVC '{pvc.name}'."
)

# Apply the patch to the cronjob
try:
cronjob.patch(params=suspend_patch, format_type="json")
log.info(
f"Successfully patched KeyRotationCronjob for PVC '{pvc.name}'."
)
except Exception as e:
log.error(
f"Failed to patch KeyRotationCronjob for PVC '{pvc.name}': {e}"
)
raise
pvc.reload()

log.info("Completed key rotation state changes for all specified PVCs.")
return True
2 changes: 2 additions & 0 deletions ocs_ci/ocs/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@
IMAGECONTENTSOURCEPOLICY_KIND = "ImageContentSourcePolicy"
NOOBAA_ACCOUNT = "NoobaaAccount"
EXTERNAL_CLUSTER_SCRIPT_CONFIG = "rook-ceph-external-cluster-script-config"
ENCRYPTIONKEYROTATIONCRONJOB = "encryptionkeyrotationcronjobs.csiaddons.openshift.io"
ENCRYPTIONKEYROTATIONJOB = "encryptionkeyrotationjobs.csiaddons.openshift.io"

# Provisioners
AWS_EFS_PROVISIONER = "openshift.org/aws-efs"
Expand Down
185 changes: 185 additions & 0 deletions tests/functional/pv/pv_encryption/test_disable_pv_keyrotation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
import logging
import pytest
from ocs_ci.framework.testlib import config, tier1
from ocs_ci.ocs import constants
from ocs_ci.helpers.keyrotation_helper import PVKeyrotation
from ocs_ci.helpers.helpers import create_pods
from ocs_ci.framework.pytest_customization.marks import green_squad

log = logging.getLogger(__name__)

# Constants
kmsprovider = constants.VAULT_KMS_PROVIDER

# Parametrize test cases based on environment
argnames = ["kv_version", "kms_provider", "use_vault_namespace"]
if config.ENV_DATA.get("vault_hcp"):
argvalues = [
pytest.param("v1", kmsprovider, True),
pytest.param("v2", kmsprovider, True),
]
else:
argvalues = [
pytest.param("v1", kmsprovider, False),
pytest.param("v2", kmsprovider, False),
]


class PVKeyrotationTestBase:
"""
Base class to reuse common setup and utility methods for PV key rotation tests.
"""

@pytest.fixture()
def setup_common(
self,
kv_version,
kms_provider,
pv_encryption_kms_setup_factory,
project_factory,
storageclass_factory,
multi_pvc_factory,
pod_factory,
use_vault_namespace,
):
"""
Common setup for CSI-KMS connection details, storage class, and PVCs.
"""
log.info(
"Starting setup: Configuring CSI-KMS connection details and resources."
)

# Set up KMS configuration
self.kms = pv_encryption_kms_setup_factory(kv_version, use_vault_namespace)
log.info("KMS setup successful.")

# Create a project
self.proj_obj = project_factory()
log.info(f"Project {self.proj_obj.namespace} created.")

# Key rotation annotations
keyrotation_annotations = {
"keyrotation.csiaddons.openshift.io/schedule": "* * * * *"
}

# Create an encryption-enabled storage class
self.sc_obj = storageclass_factory(
interface=constants.CEPHBLOCKPOOL,
encrypted=True,
encryption_kms_id=self.kms.kmsid,
annotations=keyrotation_annotations,
)
log.info("Encryption-enabled storage class created.")

# Create Vault CSI KMS token in tenant namespace
self.kms.vault_path_token = self.kms.generate_vault_token()
self.kms.create_vault_csi_kms_token(namespace=self.proj_obj.namespace)
log.info("Vault CSI KMS token created.")

# Create PVCs with encryption enabled
self.pvc_objs = multi_pvc_factory(
size=5,
num_of_pvc=3,
storageclass=self.sc_obj,
access_modes=[
f"{constants.ACCESS_MODE_RWX}-Block",
f"{constants.ACCESS_MODE_RWO}-Block",
],
wait_each=True,
project=self.proj_obj,
)
log.info("PVCs created successfully.")

# Create pods for the PVCs
self.pod_objs = create_pods(
self.pvc_objs,
pod_factory,
constants.CEPHBLOCKPOOL,
pods_for_rwx=1,
status=constants.STATUS_RUNNING,
)
log.info("Pods created and running.")

# Initialize the PVKeyrotation helper
self.pv_keyrotation_obj = PVKeyrotation(self.sc_obj)


@tier1
@green_squad
@pytest.mark.parametrize(
argnames=argnames,
argvalues=argvalues,
)
class TestDisablePVKeyrotationOperation(PVKeyrotationTestBase):
@pytest.mark.polarion_id("OCS-6323")
def test_disable_pv_keyrotation_globally(self, setup_common):
"""
Test disabling PV key rotation globally by annotating the storage class.

Steps:
1. Add annotation to the storage class to disable key rotation.
2. Verify key rotation jobs are deleted.
3. Remove the annotation from the storage class.
4. Verify key rotation cronjobs are recreated.
"""
log.info("Starting test: Disable PV key rotation globally.")

# Disable key rotation globally
self.pv_keyrotation_obj.set_keyrotation_state_by_annotation(False)
log.info("Key rotation disabled globally via storage class annotation.")

# Verify key rotation cronjobs are deleted
for pvc_obj in self.pvc_objs:
with pytest.raises(ValueError):
self.pv_keyrotation_obj.get_keyrotation_cronjob_for_pvc(pvc_obj)
log.info("Verified key rotation cronjobs are removed.")

# Enable key rotation globally
self.pv_keyrotation_obj.set_keyrotation_state_by_annotation(True)
log.info("Key rotation re-enabled globally via storage class annotation.")

# Verify key rotation cronjobs are recreated
assert self.pv_keyrotation_obj.wait_till_all_pv_keyrotation_on_vault_kms(
self.pvc_objs
), "Failed to re-enable PV key rotation."
log.info("Key rotation successfully re-enabled globally.")

@pytest.mark.polarion_id("OCS-6324")
def test_disable_pv_keyrotation_by_rbac_user(self, setup_common):
"""
Test disabling specific PV key rotation by RBAC user permissions.

Steps:
1. Disable key rotation for specific PVCs.
2. Verify key rotation cronjobs has state suspent = True.
3. Re-enable key rotation for specific PVCs.
4. Verify key rotation cronjobs are recreated.
"""
log.info("Starting test: Disable PV key rotation by RBAC user.")

# Disable key rotation for specific PVCs
self.pv_keyrotation_obj.change_pvc_keyrotation_cronjob_state(
self.pvc_objs, disable=True
)
log.info("Key rotation disabled for specific PVCs.")

# Verify Keyrotation is disabled for the PVC.
for pvc in self.pvc_objs:
cron_obj = self.pv_keyrotation_obj.get_keyrotation_cronjob_for_pvc(pvc)
assert cron_obj.data["spec"].get(
"suspend", False
), "PVC keyrotation cronjob is not in 'suspend' state."

log.info("Keyrotation is Disabled for all PVC")

# Re-enable key rotation for specific PVCs
self.pv_keyrotation_obj.change_pvc_keyrotation_cronjob_state(
self.pvc_objs, disable=False
)
log.info("Key rotation re-enabled for specific PVCs.")

# Verify key rotation cronjobs are recreated
assert self.pv_keyrotation_obj.wait_till_all_pv_keyrotation_on_vault_kms(
self.pvc_objs
), "Failed to re-enable PV key rotation for specific PVCs."
log.info("Key rotation successfully re-enabled for specific PVCs.")
Loading