Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding kms policy name for same kms encryption keys #11497

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 61 additions & 23 deletions ocs_ci/utility/kms.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def __init__(self):
"VAULT_BACKEND", defaults.VAULT_DEFAULT_BACKEND_VERSION
)
self.kmsid = None
self.vault_policy_name = None
self.vault_policy_name = self.kms_vault_policy_name
self.vault_kube_auth_path = "kubernetes"
self.vault_kube_auth_role = constants.VAULT_KUBERNETES_AUTH_ROLE
self.vault_kube_auth_namespace = None
Expand All @@ -147,6 +147,16 @@ def vault_path_token(self, value):
# For setting values in test cases
Vault._vault_path_token = value

@property
def kms_vault_policy_name(self):
cluster_name = config.ENV_DATA.get("cluster_name")
if config.multicluster:
with config.RunWithPrimaryConfigContext():
cluster_name = config.ENV_DATA.get("cluster_name")
policy_name = f"kpn_{cluster_name}"
logger.info(f"Vault policy name will be {policy_name}")
return policy_name

def deploy(self):
"""
This function delegates the deployment of vault
Expand Down Expand Up @@ -555,7 +565,7 @@ def vault_create_backend_path(self, backend_path=None, kv_version=None):
f"Failed to create path f{self.vault_backend_path}"
)
if not backend_path:
self.vault_create_policy()
self.vault_create_policy(policy_name=self.vault_policy_name)

def vault_create_policy(self, policy_name=None):
"""
Expand All @@ -565,17 +575,40 @@ def vault_create_policy(self, policy_name=None):
VaultOperationError exception
"""
policy = (
f'path "{self.vault_backend_path}/*" {{\n'
f' capabilities = ["create", "read", "update","delete"]'
f"\n}}\n"
f'path "sys/mounts" {{\n'
f'capabilities = ["read"]\n'
f"}}"
)
vault_hcl = tempfile.NamedTemporaryFile(mode="w+", prefix="test", delete=False)
with open(vault_hcl.name, "w") as hcl:
hcl.write(policy)
# Check if policy still exists
cmd_list_policy = "vault policy list --format=json"

out = subprocess.check_output(shlex.split(cmd_list_policy))
json_out = json.loads(out)
if self.vault_policy_name in json_out:
# if policy already exists append the secondary cluster backend path to the policy
poilcy_data = (
f"\n}}\n"
f'path "{self.vault_backend_path}/*" {{\n'
f' capabilities = ["create", "read", "update","delete"]'
)
Comment on lines +582 to +589
Copy link
Member

@petr-balogh petr-balogh Mar 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo in poilcy_data
and you are repeating this unnecesarly. Should be defined once only.
This is simplified how you can do this:

Suggested change
json_out = json.loads(out)
if self.vault_policy_name in json_out:
# if policy already exists append the secondary cluster backend path to the policy
poilcy_data = (
f"\n}}\n"
f'path "{self.vault_backend_path}/*" {{\n'
f' capabilities = ["create", "read", "update","delete"]'
)
existing_policy_data = None
read_policy_cmd = f"vault policy read {self.vault_policy_name}"
try:
existing_policy_data = subprocess.check_output(shlex.split(read_policy_cmd))
logger.info(f"Existing policy found!:\n{existing_policy_data}")
expect Exception:
logger.info("No existing policy found!")
policy_backend_path = (
f'path "{self.vault_backend_path}/*" {{\n'
f' capabilities = ["create", "read", "update","delete"]'
f"\n}}\n"
)
policy_sys_mount = (
f'path "sys/mounts" {{\n'
f'capabilities = ["read"]\n'
f"}}"
)
if existing_policy_data is None:
policy = policy_backend_path + policy_sys_mount
else:
policy = policy_backend_path + existing_policy_data
vault_hcl = tempfile.NamedTemporaryFile(
mode="a+", prefix="test", delete=False
)
logger.info(
f"Creating or updating policy: {self.vault_policy_name} with content:\n"
f"{policy}"
)
with open(vault_hcl.name, "a") as hcl:
hcl.write(policy)

vault_hcl = tempfile.NamedTemporaryFile(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and the rest from this line can be deleted if you will follow the suggestion from code above

mode="a+", prefix="test", delete=False
)
logger.info(
f"Appending secondary cluster backend path to policy: {self.vault_policy_name}"
)
with open(vault_hcl.name, "a") as hcl:
hcl.write(poilcy_data)
else:
policy = (
f'path "{self.vault_backend_path}/*" {{\n'
f' capabilities = ["create", "read", "update","delete"]'
f"\n}}\n"
f'path "sys/mounts" {{\n'
f'capabilities = ["read"]\n'
f"}}"
)
vault_hcl = tempfile.NamedTemporaryFile(
mode="w+", prefix="test", delete=False
)
with open(vault_hcl.name, "w") as hcl:
hcl.write(policy)

if policy_name:
self.vault_policy_name = policy_name
Expand Down Expand Up @@ -808,20 +841,25 @@ def remove_vault_policy(self, vault_namespace=None):
Args:
vault namespace (str): Namespace in Vault, if exists, where the backend path is created
"""

if vault_namespace:
cmd = f"vault policy delete -namespace={vault_namespace} {self.vault_policy_name} "
else:
cmd = f"vault policy delete {self.vault_policy_name}"
subprocess.check_output(shlex.split(cmd))

# Check if policy still exists
if vault_namespace:
cmd = f"vault policy list -namespace={vault_namespace} --format=json"
cmd_list_policy = (
f"vault policy list -namespace={vault_namespace} --format=json"
)
else:
cmd = "vault policy list --format=json"
cmd_list_policy = "vault policy list --format=json"

out = subprocess.check_output(shlex.split(cmd))
out = subprocess.check_output(shlex.split(cmd_list_policy))
json_out = json.loads(out)
if self.vault_policy_name in json_out:
if vault_namespace:
cmd = f"vault policy delete -namespace={vault_namespace} {self.vault_policy_name} "
else:
cmd = f"vault policy delete {self.vault_policy_name}"
subprocess.check_output(shlex.split(cmd))

# Check if policy still exists
out = subprocess.check_output(shlex.split(cmd_list_policy))
json_out = json.loads(out)
if self.vault_policy_name in json_out:
raise KMSResourceCleaneupError(
Expand Down
Loading