Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Align chart testing logic with helm cert pipeline #458

Merged
merged 1 commit into from
Jul 31, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 56 additions & 59 deletions scripts/src/saforcharttesting/saforcharttesting.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import sys
import time
import os
import argparse
import base64
import json
import argparse
import os
import subprocess
import sys
import tempfile
import re
import time
from string import Template

namespace_template = """\
Expand All @@ -24,6 +23,17 @@
namespace: ${name}
"""

token_template = """\
apiVersion: v1
kind: Secret
type: kubernetes.io/service-account-token
metadata:
name: ${name}
namespace: ${name}
annotations:
kubernetes.io/service-account.name: ${name}
"""

role_template = """\
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
Expand Down Expand Up @@ -122,7 +132,7 @@ def apply_config(tmpl, **values):
config_path = os.path.join(tmpdir, "config.yaml")
with open(config_path, "w") as fd:
fd.write(content)
out = subprocess.run(["./oc", "apply", "-f", config_path], capture_output=True)
out = subprocess.run(["oc", "apply", "-f", config_path], capture_output=True)
stdout = out.stdout.decode("utf-8")
if out.returncode != 0:
stderr = out.stderr.decode("utf-8")
Expand All @@ -138,7 +148,7 @@ def delete_config(tmpl, **values):
config_path = os.path.join(tmpdir, "config.yaml")
with open(config_path, "w") as fd:
fd.write(content)
out = subprocess.run(["./oc", "delete", "-f", config_path], capture_output=True)
out = subprocess.run(["oc", "delete", "-f", config_path], capture_output=True)
stdout = out.stdout.decode("utf-8")
if out.returncode != 0:
stderr = out.stderr.decode("utf-8")
Expand All @@ -164,6 +174,14 @@ def create_serviceaccount(namespace):
print("[ERROR] creating ServiceAccount:", stderr)


def create_tokensecret(namespace):
print("creating token Secret:", namespace)
stdout, stderr = apply_config(token_template, name=namespace)
print("stdout:\n", stdout, sep="")
if stderr.strip():
print("[ERROR] creating token Secret:", stderr)


def create_role(namespace):
print("creating Role:", namespace)
stdout, stderr = apply_config(role_template, name=namespace)
Expand Down Expand Up @@ -223,80 +241,58 @@ def delete_clusterrolebinding(name):
sys.exit(1)


def write_sa_token(namespace, token):
secret_found = False
secrets = []
def write_sa_token(namespace, token_file):
"""Write's the service account token to token_file."""
token_found = False
for i in range(7):
# On retry, wait a little extra time before starting to give the cluster
# time to process the resources created before this.
if i > 0:
time.sleep(5)
print(f"[INFO] looking for service account token (retry {i})")
out = subprocess.run(
["./oc", "get", "serviceaccount", namespace, "-n", namespace, "-o", "json"],
["oc", "get", "secret", namespace, "-n", namespace, "-o", "json"],
capture_output=True,
)
stdout = out.stdout.decode("utf-8")
if out.returncode != 0:
stderr = out.stderr.decode("utf-8")
if stderr.strip():
print("[ERROR] retrieving ServiceAccount:", namespace, stderr)
time.sleep(10)
else:
sa = json.loads(stdout)
if len(sa["secrets"]) >= 2:
secrets = sa["secrets"]
secret_found = True
break
else:
pattern = r"Tokens:\s+([A-Za-z0-9-]+)"
dout = subprocess.run(
["./oc", "describe", "serviceaccount", namespace, "-n", namespace],
capture_output=True,
)
dstdout = dout.stdout.decode("utf-8")
match = re.search(pattern, dstdout)
if match:
token_name = match.group(1)
else:
print("[ERROR] Token not found, Exiting")
sys.exit(1)
secrets.append({"name": token_name})
secret_found = True
break
time.sleep(10)
print("[ERROR] retrieving token secret:", namespace, stderr)
continue

if not secret_found:
print("[ERROR] retrieving ServiceAccount:", namespace, stderr)
sys.exit(1)
secret = json.loads(stdout)
token = secret.get("data", {}).get("token", None)

for secret in secrets:
out = subprocess.run(
["./oc", "get", "secret", secret["name"], "-n", namespace, "-o", "json"],
capture_output=True,
if not token:
print("[ERROR] token not yet found in secret:", namespace)
continue

token_found = True
break

if not token_found:
print(
"[ERROR] all attempts to find service account token have failed:", namespace
)
stdout = out.stdout.decode("utf-8")
if out.returncode != 0:
stderr = out.stderr.decode("utf-8")
if stderr.strip():
print("[ERROR] retrieving secret:", secret["name"], stderr)
continue
else:
sec = json.loads(stdout)
if sec["type"] == "kubernetes.io/service-account-token":
content = sec["data"]["token"]
with open(token, "w") as fd:
fd.write(base64.b64decode(content).decode("utf-8"))
sys.exit(1)

with open(token_file, "w") as fd:
fd.write(base64.b64decode(token).decode("utf-8"))


def switch_project_context(namespace, token, api_server):
tkn = open(token).read()
for i in range(7):
out = subprocess.run(
["./oc", "login", "--token", tkn, "--server", api_server],
capture_output=True,
["oc", "login", "--token", tkn, "--server", api_server], capture_output=True
)
stdout = out.stdout.decode("utf-8")
print(stdout)
out = subprocess.run(["./oc", "project", namespace], capture_output=True)
out = subprocess.run(["oc", "project", namespace], capture_output=True)
stdout = out.stdout.decode("utf-8")
print(stdout)
out = subprocess.run(["./oc", "config", "current-context"], capture_output=True)
out = subprocess.run(["oc", "config", "current-context"], capture_output=True)
stdout = out.stdout.decode("utf-8").strip()
print(stdout)
if stdout.endswith(":".join((namespace, namespace))):
Expand Down Expand Up @@ -349,6 +345,7 @@ def main():
if args.create:
create_namespace(args.create)
create_serviceaccount(args.create)
create_tokensecret(args.create)
create_role(args.create)
create_rolebinding(args.create)
create_clusterrole(args.create)
Expand Down