diff --git a/.flake8 b/.flake8 new file mode 100644 index 00000000..f338d003 --- /dev/null +++ b/.flake8 @@ -0,0 +1,7 @@ +[flake8] +# Recommend matching the black line length (default 88), +# rather than using the flake8 default of 79: +max-line-length = 88 +extend-ignore = + # See https://github.com/PyCQA/pycodestyle/issues/373 + E203, diff --git a/scripts/src/owners/checkuser.py b/scripts/src/owners/checkuser.py index ac08853e..402c1873 100644 --- a/scripts/src/owners/checkuser.py +++ b/scripts/src/owners/checkuser.py @@ -1,13 +1,14 @@ """ -Used by a github action to determine if the owner of a PR is permitted to change the files -associated with publishing a release of the chart verifier. +Used by a github action to determine if the owner of a PR is permitted to change the +files associated with publishing a release of the chart verifier. parameters: --api-url : API URL for the pull request --user : user to be checked for authority to modify release files in a PR results: - exit code 1 if pull request contains restricted files and user is not authorized to modify them. + exit code 1 if pull request contains restricted files and user is not authorized + to modify them. """ import re @@ -16,10 +17,11 @@ import os import sys import yaml + try: - from yaml import CLoader as Loader, CDumper as Dumper + from yaml import CLoader as Loader except ImportError: - from yaml import Loader, Dumper + from yaml import Loader OWNERS_FILE = "OWNERS" @@ -41,45 +43,62 @@ def verify_user(username): print(f"[ERROR] {username} not auhtorized") return False + def check_for_restricted_file(api_url): - files_api_url = f'{api_url}/files' - headers = {'Accept': 'application/vnd.github.v3+json'} + files_api_url = f"{api_url}/files" + headers = {"Accept": "application/vnd.github.v3+json"} pattern_owners = re.compile(OWNERS_FILE) pattern_versionfile = re.compile(VERSION_FILE) pattern_thisfile = re.compile(THIS_FILE) page_number = 1 - max_page_size,page_size = 100,100 + max_page_size, page_size = 100, 100 - while (page_size == max_page_size): - - files_api_query = f'{files_api_url}?per_page={page_size}&page={page_number}' - r = requests.get(files_api_query,headers=headers) + while page_size == max_page_size: + files_api_query = f"{files_api_url}?per_page={page_size}&page={page_number}" + r = requests.get(files_api_query, headers=headers) files = r.json() page_size = len(files) page_number += 1 for f in files: - filename = f["filename"] - if pattern_versionfile.match(filename) or pattern_owners.match(filename) or pattern_thisfile.match(filename): - print(f"[INFO] restricted file found: {filename}") - return True - + filename = f["filename"] + if ( + pattern_versionfile.match(filename) + or pattern_owners.match(filename) + or pattern_thisfile.match(filename) + ): + print(f"[INFO] restricted file found: {filename}") + return True + return False def main(): parser = argparse.ArgumentParser() - parser.add_argument("-a", "--api-url", dest="api_url", type=str, required=True, - help="API URL for the pull request") - parser.add_argument("-u", "--user", dest="username", type=str, required=True, - help="user to be checked for authority to modify release files in a PR") + parser.add_argument( + "-a", + "--api-url", + dest="api_url", + type=str, + required=True, + help="API URL for the pull request", + ) + parser.add_argument( + "-u", + "--user", + dest="username", + type=str, + required=True, + help="user to be checked for authority to modify release files in a PR", + ) args = parser.parse_args() if check_for_restricted_file(args.api_url): - if verify_user(args.username): - print(f"[INFO] {args.username} is authorized to modify all files in the PR") + username = args.username + if verify_user(username): + print(f"[INFO] {username} is authorized to modify all files in the PR") else: - print(f"[INFO] {args.username} is not authorized to modify all files in the PR") + print(f"[INFO] {username} is not authorized to modify all files in the PR") sys.exit(1) else: - print(f"[INFO] no restricted files found in the PR") + print("[INFO] no restricted files found in the PR") diff --git a/scripts/src/release/releasebody.py b/scripts/src/release/releasebody.py index b9b4c7ef..33f2ce14 100644 --- a/scripts/src/release/releasebody.py +++ b/scripts/src/release/releasebody.py @@ -1,12 +1,13 @@ import sys -sys.path.append('./scripts/src/') -from release import releasechecker -from utils import utils +sys.path.append("./scripts/src/") +from release import releasechecker # noqa E402 + def get_release_body(version, image_name, release_info): """Generate the body of the GitHub release""" - body = f"Chart verifier version {version}

Docker Image:
- {image_name}:{version}

" + body = f"Chart verifier version {version}

" + body += f"Docker Image:
- {image_name}:{version}

" body += "This version includes:
" for info in release_info: if info.startswith("<"): @@ -15,7 +16,12 @@ def get_release_body(version, image_name, release_info): body += f"- {info}
" return body + def main(): version_info = releasechecker.get_version_info() - release_body = get_release_body(version_info["version"],version_info["quay-image"],version_info["release-info"]) + release_body = get_release_body( + version_info["version"], + version_info["quay-image"], + version_info["release-info"], + ) print(release_body) diff --git a/scripts/src/release/releasechecker.py b/scripts/src/release/releasechecker.py index c8152f90..c91f9f2c 100644 --- a/scripts/src/release/releasechecker.py +++ b/scripts/src/release/releasechecker.py @@ -1,6 +1,7 @@ """ Used by a github action -1. To determine if the contents of pull request contain only the file which contains the chart verifier release. +1. To determine if the contents of pull request contain only the file which contains + the chart verifier release. 2. To determine if the release has been updated. parameters: @@ -15,10 +16,11 @@ PR_includes_release : Set to true if the PR contains the version file. PR_release_body : Body of text to be used to describe the release. if --version only is specified, output variables are set: - updated : set to true if the version specified is later than the version in the version file - from the main branch. + updated : set to true if the version specified is later than the version in the + version file from the main branch. if neither parameters are specified, output variables are set: - PR_version : The chart verifier version read from the version file from main branch. + PR_version : The chart verifier version read from the version file from main + branch. PR_release_image : The name of the image from the version file from main branch. """ @@ -30,27 +32,28 @@ import requests import semver import sys -sys.path.append('./scripts/src/') -from release import tarfile_asset, releasebody -from utils import utils -VERSION_FILE = 'pkg/chartverifier/version/version_info.json' +sys.path.append("./scripts/src/") +from release import tarfile_asset, releasebody # noqa 402 +from utils import utils # noqa 402 + +VERSION_FILE = "pkg/chartverifier/version/version_info.json" + def check_if_only_version_file_is_modified(api_url): - # api_url https://api.github.com/repos///pulls/ + # Example format for api_url: + # https://api.github.com/repos///pulls/ - files_api_url = f'{api_url}/files' - headers = {'Accept': 'application/vnd.github.v3+json'} + files_api_url = f"{api_url}/files" + headers = {"Accept": "application/vnd.github.v3+json"} pattern_versionfile = re.compile(r"pkg/chartverifier/version/version_info.json") page_number = 1 - max_page_size,page_size = 100,100 - + max_page_size, page_size = 100, 100 version_file_found = False - while (page_size == max_page_size): - - files_api_query = f'{files_api_url}?per_page={page_size}&page={page_number}' - r = requests.get(files_api_query,headers=headers) + while page_size == max_page_size: + files_api_query = f"{files_api_url}?per_page={page_size}&page={page_number}" + r = requests.get(files_api_query, headers=headers) files = r.json() page_size = len(files) page_number += 1 @@ -64,12 +67,14 @@ def check_if_only_version_file_is_modified(api_url): return version_file_found + def get_version_info(): data = {} with open(VERSION_FILE) as json_file: data = json.load(json_file) return data + def release_exists(version): g = Github(os.environ.get("GITHUB_TOKEN")) releases = g.get_repo(os.environ.get("GITHUB_REPOSITORY")).get_releases() @@ -81,40 +86,65 @@ def release_exists(version): def main(): parser = argparse.ArgumentParser() - parser.add_argument("-a", "--api-url", dest="api_url", type=str, required=False, - help="API URL for the pull request") - parser.add_argument("-v", "--version", dest="version", type=str, required=False, - help="Version to compare") + parser.add_argument( + "-a", + "--api-url", + dest="api_url", + type=str, + required=False, + help="API URL for the pull request", + ) + parser.add_argument( + "-v", + "--version", + dest="version", + type=str, + required=False, + help="Version to compare", + ) args = parser.parse_args() if args.api_url: version_info = get_version_info() asset_file = tarfile_asset.create(version_info["version"]) - print(f'[INFO] Verifier tarball created : {asset_file}.') - utils.add_output("PR_tarball_name",asset_file) + print(f"[INFO] Verifier tarball created : {asset_file}.") + utils.add_output("PR_tarball_name", asset_file) if check_if_only_version_file_is_modified(args.api_url): - ## should be on PR branch + # should be on PR branch print(f'[INFO] Release found in PR files : {version_info["version"]}.') - utils.add_output("PR_version",version_info["version"]) - utils.add_output("PR_release_image",version_info["quay-image"]) - utils.add_output("PR_release_info",version_info["release-info"]) - utils.add_output("PR_includes_release","true") - release_body = releasebody.get_release_body(version_info["version"],version_info["quay-image"],version_info["release-info"]) - utils.add_output("PR_release_body",release_body) + utils.add_output("PR_version", version_info["version"]) + utils.add_output("PR_release_image", version_info["quay-image"]) + utils.add_output("PR_release_info", version_info["release-info"]) + utils.add_output("PR_includes_release", "true") + release_body = releasebody.get_release_body( + version_info["version"], + version_info["quay-image"], + version_info["release-info"], + ) + utils.add_output("PR_release_body", release_body) else: version_info = get_version_info() if args.version: # should be on main branch - version_compare = semver.compare(args.version,version_info["version"]) - if version_compare > 0 : - print(f'[INFO] Release {args.version} found in PR files is newer than: {version_info["version"]}.') - utils.add_output("updated","true") + version_compare = semver.compare(args.version, version_info["version"]) + if version_compare > 0: + print( + f"[INFO] Release {args.version} found in PR files is newer than: " + f'{version_info["version"]}.' + ) + utils.add_output("updated", "true") elif version_compare == 0 and not release_exists(args.version): - print(f'[INFO] Release {args.version} found in PR files is not new but no release exists yet.') - utils.add_output("updated","true") + print( + f"[INFO] Release {args.version} found in PR files is not new but " + "no release exists yet." + ) + utils.add_output("updated", "true") else: - print(f'[INFO] Release found in PR files is not new : {version_info["version"]} already exists.') + print( + "[INFO] Release found in PR files is not new: " + f'{version_info["version"]} already exists.' + ) else: - utils.add_output("PR_version",version_info["version"]) - utils.add_output("PR_release_image",version_info["quay-image"]) + utils.add_output("PR_version", version_info["version"]) + utils.add_output("PR_release_image", version_info["quay-image"]) print("[INFO] PR contains non-release files.") diff --git a/scripts/src/release/tarfile_asset.py b/scripts/src/release/tarfile_asset.py index 07923ec7..9c4c3655 100644 --- a/scripts/src/release/tarfile_asset.py +++ b/scripts/src/release/tarfile_asset.py @@ -3,31 +3,38 @@ import tarfile from utils import utils -tar_content_files = [ {"name": "out/chart-verifier", "arc_name": "chart-verifier"} ] +tar_content_files = [{"name": "out/chart-verifier", "arc_name": "chart-verifier"}] def create(release): - tgz_name = f"chart-verifier-{release}.tgz" - utils.add_output("tarball_base_name",tgz_name) + utils.add_output("tarball_base_name", tgz_name) if os.path.exists(tgz_name): os.remove(tgz_name) with tarfile.open(tgz_name, "x:gz") as tar: for tar_content_file in tar_content_files: - tar.add(os.path.join(os.getcwd(),tar_content_file["name"]),arcname=tar_content_file["arc_name"]) + tar.add( + os.path.join(os.getcwd(), tar_content_file["name"]), + arcname=tar_content_file["arc_name"], + ) + return os.path.join(os.getcwd(), tgz_name) - return os.path.join(os.getcwd(),tgz_name) def main(): parser = argparse.ArgumentParser() - parser.add_argument("-r", "--release", dest="release", type=str, required=True, - help="Release name for the tar file") + parser.add_argument( + "-r", + "--release", + dest="release", + type=str, + required=True, + help="Release name for the tar file", + ) args = parser.parse_args() tarfile = create(args.release) - print(f'[INFO] Verifier tarball created : {tarfile}.') - utils.add_output("tarball_full_name",tarfile) - + print(f"[INFO] Verifier tarball created : {tarfile}.") + utils.add_output("tarball_full_name", tarfile) diff --git a/scripts/src/saforcharttesting/saforcharttesting.py b/scripts/src/saforcharttesting/saforcharttesting.py index 354b1e7b..d138745f 100644 --- a/scripts/src/saforcharttesting/saforcharttesting.py +++ b/scripts/src/saforcharttesting/saforcharttesting.py @@ -115,6 +115,7 @@ namespace: ${name} """ + def apply_config(tmpl, **values): with tempfile.TemporaryDirectory(prefix="sa-for-chart-testing-") as tmpdir: content = Template(tmpl).substitute(values) @@ -130,6 +131,7 @@ def apply_config(tmpl, **values): return stdout, stderr + def delete_config(tmpl, **values): with tempfile.TemporaryDirectory(prefix="sa-for-chart-testing-") as tmpdir: content = Template(tmpl).substitute(values) @@ -145,6 +147,7 @@ def delete_config(tmpl, **values): return stdout, stderr + def create_namespace(namespace): print("creating Namespace:", namespace) stdout, stderr = apply_config(namespace_template, name=namespace) @@ -152,6 +155,7 @@ def create_namespace(namespace): if stderr.strip(): print("[ERROR] creating Namespace:", stderr) + def create_serviceaccount(namespace): print("creating ServiceAccount:", namespace) stdout, stderr = apply_config(serviceaccount_template, name=namespace) @@ -159,6 +163,7 @@ def create_serviceaccount(namespace): if stderr.strip(): print("[ERROR] creating ServiceAccount:", stderr) + def create_role(namespace): print("creating Role:", namespace) stdout, stderr = apply_config(role_template, name=namespace) @@ -166,6 +171,7 @@ def create_role(namespace): if stderr.strip(): print("[ERROR] creating Role:", stderr) + def create_rolebinding(namespace): print("creating RoleBinding:", namespace) stdout, stderr = apply_config(rolebinding_template, name=namespace) @@ -173,6 +179,7 @@ def create_rolebinding(namespace): if stderr.strip(): print("[ERROR] creating RoleBinding:", stderr) + def create_clusterrole(namespace): print("creating ClusterRole:", namespace) stdout, stderr = apply_config(clusterrole_template, name=namespace) @@ -180,6 +187,7 @@ def create_clusterrole(namespace): if stderr.strip(): print("[ERROR] creating ClusterRole:", stderr) + def create_clusterrolebinding(namespace): print("creating ClusterRoleBinding:", namespace) stdout, stderr = apply_config(clusterrolebinding_template, name=namespace) @@ -187,6 +195,7 @@ def create_clusterrolebinding(namespace): if stderr.strip(): print("[ERROR] creating ClusterRoleBinding:", stderr) + def delete_namespace(namespace): print("deleting Namespace:", namespace) stdout, stderr = delete_config(namespace_template, name=namespace) @@ -195,6 +204,7 @@ def delete_namespace(namespace): print("[ERROR] deleting Namespace:", namespace, stderr) sys.exit(1) + def delete_clusterrole(name): print("deleting ClusterRole:", name) stdout, stderr = delete_config(clusterrole_template, name=name) @@ -203,6 +213,7 @@ def delete_clusterrole(name): print("[ERROR] deleting ClusterRole:", name, stderr) sys.exit(1) + def delete_clusterrolebinding(name): print("deleting ClusterRoleBinding:", name) stdout, stderr = delete_config(clusterrolebinding_template, name=name) @@ -211,11 +222,15 @@ def delete_clusterrolebinding(name): print("[ERROR] deleting ClusterRoleBinding:", name, stderr) sys.exit(1) + def write_sa_token(namespace, token): secret_found = False secrets = [] for i in range(7): - out = subprocess.run(["./oc", "get", "serviceaccount", namespace, "-n", namespace, "-o", "json"], capture_output=True) + out = subprocess.run( + ["./oc", "get", "serviceaccount", namespace, "-n", namespace, "-o", "json"], + capture_output=True, + ) stdout = out.stdout.decode("utf-8") if out.returncode != 0: stderr = out.stderr.decode("utf-8") @@ -229,15 +244,18 @@ def write_sa_token(namespace, token): secret_found = True break else: - pattern = r'Tokens:\s+([A-Za-z0-9-]+)' - dout = subprocess.run(["./oc", "describe", "serviceaccount", namespace, "-n", namespace], capture_output=True) + pattern = r"Tokens:\s+([A-Za-z0-9-]+)" + dout = subprocess.run( + ["./oc", "describe", "serviceaccount", namespace, "-n", namespace], + capture_output=True, + ) dstdout = dout.stdout.decode("utf-8") match = re.search(pattern, dstdout) if match: - token_name = match.group(1) + token_name = match.group(1) else: - print("[ERROR] Token not found, Exiting") - sys.exit(1) + print("[ERROR] Token not found, Exiting") + sys.exit(1) secrets.append({"name": token_name}) secret_found = True break @@ -248,7 +266,10 @@ def write_sa_token(namespace, token): sys.exit(1) for secret in secrets: - out = subprocess.run(["./oc", "get", "secret", secret["name"], "-n", namespace, "-o", "json"], capture_output=True) + out = subprocess.run( + ["./oc", "get", "secret", secret["name"], "-n", namespace, "-o", "json"], + capture_output=True, + ) stdout = out.stdout.decode("utf-8") if out.returncode != 0: stderr = out.stderr.decode("utf-8") @@ -262,10 +283,14 @@ def write_sa_token(namespace, token): with open(token, "w") as fd: fd.write(base64.b64decode(content).decode("utf-8")) + def switch_project_context(namespace, token, api_server): tkn = open(token).read() for i in range(7): - out = subprocess.run(["./oc", "login", "--token", tkn, "--server", api_server], capture_output=True) + out = subprocess.run( + ["./oc", "login", "--token", tkn, "--server", api_server], + capture_output=True, + ) stdout = out.stdout.decode("utf-8") print(stdout) out = subprocess.run(["./oc", "project", namespace], capture_output=True) @@ -280,19 +305,45 @@ def switch_project_context(namespace, token, api_server): time.sleep(10) # This exit will happen if there is an infra failure - print("""[ERROR] There is an error creating the namespace and service account. It happens due to some infrastructure failure. It is not directly related to the changes in the pull request. You can wait for some time and try to re-run the job. To re-run the job change the PR into a draft and remove the draft state.""") + print( + """[ERROR] There is an error creating the namespace and service account. It + happens due to some infrastructure failure. It is not directly related to the + changes in the pull request. You can wait for some time and try to re-run the + job. To re-run the job change the PR into a draft and remove the draft + state.""" + ) sys.exit(1) + def main(): parser = argparse.ArgumentParser() - parser.add_argument("-c", "--create", dest="create", type=str, required=False, - help="create service account and namespace for chart testing") - parser.add_argument("-t", "--token", dest="token", type=str, required=False, - help="service account token for chart testing") - parser.add_argument("-d", "--delete", dest="delete", type=str, required=False, - help="delete service account and namespace used for chart testing") - parser.add_argument("-s", "--server", dest="server", type=str, required=False, - help="API server URL") + parser.add_argument( + "-c", + "--create", + dest="create", + type=str, + required=False, + help="create service account and namespace for chart testing", + ) + parser.add_argument( + "-t", + "--token", + dest="token", + type=str, + required=False, + help="service account token for chart testing", + ) + parser.add_argument( + "-d", + "--delete", + dest="delete", + type=str, + required=False, + help="delete service account and namespace used for chart testing", + ) + parser.add_argument( + "-s", "--server", dest="server", type=str, required=False, help="API server URL" + ) args = parser.parse_args() if args.create: diff --git a/scripts/src/utils/utils.py b/scripts/src/utils/utils.py index ac148a56..6280e957 100644 --- a/scripts/src/utils/utils.py +++ b/scripts/src/utils/utils.py @@ -1,13 +1,14 @@ import os -def add_output(key,value): + +def add_output(key, value): """This function prints the key/value pair to stdout in a "key=value" format. If called from a GitHub workflow, it also sets an output parameter. """ - print(f'{key}={value}') + print(f"{key}={value}") if "GITHUB_OUTPUT" in os.environ: - with open(os.environ['GITHUB_OUTPUT'],'a') as fh: - print(f'{key}={value}',file=fh) + with open(os.environ["GITHUB_OUTPUT"], "a") as fh: + print(f"{key}={value}", file=fh) diff --git a/tests/tests/functional/chart_test.py b/tests/tests/functional/chart_test.py index 1c6534f2..31826c06 100644 --- a/tests/tests/functional/chart_test.py +++ b/tests/tests/functional/chart_test.py @@ -1,32 +1,35 @@ -"""This file contains the tests for chart-verifier. It follows behavioral driven development using pytest-bdd. +"""This file contains the tests for chart-verifier. It follows behavioral driven +development using pytest-bdd. Following environment variables are expected to be set in order to run the tests: - VERIFIER_TARBALL_NAME: Path to the tarball to use for the tarball tests. - PODMAN_IMAGE_TAG: Tag of the container image to use to run the podman tests. -- KUBECONFIG: Path to the kubeconfig file to use to connect to the OCP cluster to deploy the charts on. +- KUBECONFIG: Path to the kubeconfig file to use to connect to the OCP cluster to + deploy the charts on. """ from pytest_bdd import scenario, given, when, then, parsers import os import subprocess -import sys import pytest import json import tarfile from deepdiff import DeepDiff import yaml + try: - from yaml import CLoader as Loader, CDumper as Dumper + from yaml import CLoader as Loader except ImportError: - from yaml import Loader, Dumper + from yaml import Loader REPORT_ANNOTATIONS = "annotations" REPORT_RESULTS = "results" REPORT_DIGESTS = "digests" REPORT_METADATA = "metadata" + @scenario( "features/chart_good.feature", "A chart provider verifies their chart using the chart verifier", @@ -34,6 +37,7 @@ def test_chart_source(): pass + @scenario( "features/chart_good.feature", "A chart provider verifies their signed chart using the chart verifier", @@ -41,27 +45,50 @@ def test_chart_source(): def test_chart_signed(): pass -@given(parsers.parse("I would like to use the profile"),target_fixture="profile_type") + +@given( + parsers.parse("I would like to use the profile"), + target_fixture="profile_type", +) def profile_type(type): return type -@given(parsers.parse("I will provide a of a "),target_fixture="chart_location") -def chart_location(location,helm_chart): - return os.path.join(location,helm_chart) -@given(parsers.parse("I will provide a of an expected "),target_fixture="report_info_location") -def report_info_location(location,report_info): - return os.path.join(location,report_info) +@given( + parsers.parse("I will provide a of a "), + target_fixture="chart_location", +) +def chart_location(location, helm_chart): + return os.path.join(location, helm_chart) + -@given(parsers.parse("I will provide a of a to verify the signature"),target_fixture="public_key_location") -def public_key_location(location,public_key): - return os.path.join(location,public_key) +@given( + parsers.parse("I will provide a of an expected "), + target_fixture="report_info_location", +) +def report_info_location(location, report_info): + return os.path.join(location, report_info) -@given(parsers.parse("I will use the chart verifier image"),target_fixture="image_type") + +@given( + parsers.parse( + "I will provide a of a to verify the signature" + ), + target_fixture="public_key_location", +) +def public_key_location(location, public_key): + return os.path.join(location, public_key) + + +@given( + parsers.parse("I will use the chart verifier image"), + target_fixture="image_type", +) def image_type(image_type): return image_type -@given('The chart verifier version value',target_fixture='verifier_version') + +@given("The chart verifier version value", target_fixture="verifier_version") def verifier_version(image_type): """Get the version of the chart verifier tool used to produce and verify reports. @@ -83,25 +110,53 @@ def verifier_version(image_type): image_tag = os.environ.get("PODMAN_IMAGE_TAG") if not image_tag: image_tag = "main" - image_name = "quay.io/redhat-certification/chart-verifier" + image_name = "quay.io/redhat-certification/chart-verifier" print(f"\nRun version using podman image {image_name}:{image_tag}") - return run_version_podman_image(image_name,image_tag) + return run_version_podman_image(image_name, image_tag) -@when(parsers.parse("I run the chart-verifier verify command against the chart to generate a report"),target_fixture="run_verify") -def run_verify(image_type, profile_type, chart_location): - print(f"\nrun {image_type} verifier verify with profile : {profile_type}, and chart: {chart_location}") - return run_verifier(image_type, profile_type, chart_location,"verify") -@when(parsers.parse("I run the chart-verifier verify command against the signed chart to generate a report"),target_fixture="run_signed_verify") +@when( + parsers.parse( + "I run the chart-verifier verify command against the chart to generate a report" + ), + target_fixture="run_verify", +) +def run_verify(image_type, profile_type, chart_location): + print( + f"\nrun {image_type} verifier verify with profile : {profile_type}," + f" and chart: {chart_location}" + ) + return run_verifier(image_type, profile_type, chart_location, "verify") + + +@when( + parsers.parse( + "I run the chart-verifier verify command against the signed chart to generate " + "a report" + ), + target_fixture="run_signed_verify", +) def run_signed_verify(image_type, profile_type, chart_location, public_key_location): - print(f"\nrun {image_type} verifier verify with profile : {profile_type}, and signed chart: {chart_location}") - return run_verifier(image_type, profile_type, chart_location,"verify",public_key_location) + print( + f"\nrun {image_type} verifier verify with profile : {profile_type}, " + f"and signed chart: {chart_location}" + ) + return run_verifier( + image_type, profile_type, chart_location, "verify", public_key_location + ) + def run_report(image_type, profile_type, report_location): - print(f"\nrun {image_type} verifier report with profile : {profile_type}, and chart: {report_location}") - return run_verifier(image_type, profile_type, report_location,"report") + print( + f"\nrun {image_type} verifier report with profile : {profile_type}, " + f"and chart: {report_location}" + ) + return run_verifier(image_type, profile_type, report_location, "report") -def run_verifier(image_type, profile_type, target_location, command, pgp_key_location=None): + +def run_verifier( + image_type, profile_type, target_location, command, pgp_key_location=None +): """Run chart-verifier command and capture output Args: @@ -109,7 +164,8 @@ def run_verifier(image_type, profile_type, target_location, command, pgp_key_loc profile_type (str): Profile to use. Options: partner, redhat, community. target_location (str): Path to the Helm chart to run verify against. command (str): Command to run, either verify or report. - pgp_key_location (str, optional): Path to the GPG public key of the key used to sign the chart. + pgp_key_location (str, optional): Path to the GPG public key of the key used to + sign the chart. Returns: str: The output provided by the command. @@ -118,27 +174,37 @@ def run_verifier(image_type, profile_type, target_location, command, pgp_key_loc tarball_name = os.environ.get("VERIFIER_TARBALL_NAME") print(f"\nRun {command} using tarball: {tarball_name}") if command == "verify": - return run_verify_tarball_image(tarball_name,profile_type,target_location,pgp_key_location) + return run_verify_tarball_image( + tarball_name, profile_type, target_location, pgp_key_location + ) else: - return run_report_tarball_image(tarball_name,profile_type,target_location) + return run_report_tarball_image(tarball_name, profile_type, target_location) # Podman image_tag = os.environ.get("PODMAN_IMAGE_TAG") if not image_tag: image_tag = "main" - image_name = "quay.io/redhat-certification/chart-verifier" + image_name = "quay.io/redhat-certification/chart-verifier" print(f"\nRun {command} using podman image {image_name}:{image_tag}") if command == "verify": - return run_verify_podman_image(image_name,image_tag,profile_type,target_location,pgp_key_location) + return run_verify_podman_image( + image_name, image_tag, profile_type, target_location, pgp_key_location + ) else: - return run_report_podman_image(image_name,image_tag,profile_type,target_location) + return run_report_podman_image( + image_name, image_tag, profile_type, target_location + ) + def run_version_tarball_image(tarball_name): tar = tarfile.open(tarball_name, "r:gz") tar.extractall(path="./test_verifier") - out = subprocess.run(["./test_verifier/chart-verifier","version", "--as-data"],capture_output=True) + out = subprocess.run( + ["./test_verifier/chart-verifier", "version", "--as-data"], capture_output=True + ) return normalize_version(out.stdout.decode("utf-8")) + def normalize_version(version): """Extract normalized version from JSON data @@ -148,16 +214,30 @@ def normalize_version(version): Returns: str: a normalized semver like 0.0.0. """ - print(f'version input to normalize_version function is: {version}') + print(f"version input to normalize_version function is: {version}") version_dict = json.loads(version) return version_dict["version"] -def run_version_podman_image(verifier_image_name,verifier_image_tag): + +def run_version_podman_image(verifier_image_name, verifier_image_tag): """Run chart verifier's version command in Podman.""" - out = subprocess.run(["podman", "run", "--rm", f"{verifier_image_name}:{verifier_image_tag}", "version", "--as-data"], capture_output=True) + out = subprocess.run( + [ + "podman", + "run", + "--rm", + f"{verifier_image_name}:{verifier_image_tag}", + "version", + "--as-data", + ], + capture_output=True, + ) return normalize_version(out.stdout.decode("utf-8")) -def run_verify_tarball_image(tarball_name,profile_type, chart_location,pgp_key_location=None): + +def run_verify_tarball_image( + tarball_name, profile_type, chart_location, pgp_key_location=None +): print(f"Run tarball image from {tarball_name}") tar = tarfile.open(tarball_name, "r:gz") @@ -165,72 +245,216 @@ def run_verify_tarball_image(tarball_name,profile_type, chart_location,pgp_key_l tar.extractall(path="./test_verifier") if pgp_key_location: - out = subprocess.run(["./test_verifier/chart-verifier","verify","--set",f"profile.vendorType={profile_type}","--pgp-public-key",pgp_key_location,chart_location],capture_output=True) + out = subprocess.run( + [ + "./test_verifier/chart-verifier", + "verify", + "--set", + f"profile.vendorType={profile_type}", + "--pgp-public-key", + pgp_key_location, + chart_location, + ], + capture_output=True, + ) else: - out = subprocess.run(["./test_verifier/chart-verifier","verify","--set",f"profile.vendorType={profile_type}",chart_location],capture_output=True) + out = subprocess.run( + [ + "./test_verifier/chart-verifier", + "verify", + "--set", + f"profile.vendorType={profile_type}", + chart_location, + ], + capture_output=True, + ) return out.stdout.decode("utf-8") -def run_report_tarball_image(tarball_name,profile_type, chart_location): + +def run_report_tarball_image(tarball_name, profile_type, chart_location): print(f"Run tarball image from {tarball_name}") tar = tarfile.open(tarball_name, "r:gz") tar.extractall(path="./test_verifier") - out = subprocess.run(["./test_verifier/chart-verifier","report","all","--set",f"profile.vendorType={profile_type}",chart_location],capture_output=True) + out = subprocess.run( + [ + "./test_verifier/chart-verifier", + "report", + "all", + "--set", + f"profile.vendorType={profile_type}", + chart_location, + ], + capture_output=True, + ) return out.stdout.decode("utf-8") -def run_verify_podman_image(verifier_image_name,verifier_image_tag,profile_type, chart_location,pgp_key_location=None): +def run_verify_podman_image( + verifier_image_name, + verifier_image_tag, + profile_type, + chart_location, + pgp_key_location=None, +): print(f"Run podman image - {verifier_image_name}:{verifier_image_tag}") kubeconfig = os.environ.get("KUBECONFIG") if not kubeconfig: return "FAIL: missing KUBECONFIG environment variable" - if chart_location.startswith('http:/') or chart_location.startswith('https:/'): - if pgp_location: - out = subprocess.run(["podman", "run", "-v", f"{kubeconfig}:/kubeconfig:z", "-e", "KUBECONFIG=/kubeconfig", "--rm", - f"{verifier_image_name}:{verifier_image_tag}", "verify", "--set", f"profile.vendortype={profile_type}","--pgp-public-key",public_key_location,chart_location], capture_output=True) + if chart_location.startswith("http:/") or chart_location.startswith("https:/"): + if pgp_key_location: + out = subprocess.run( + [ + "podman", + "run", + "-v", + f"{kubeconfig}:/kubeconfig:z", + "-e", + "KUBECONFIG=/kubeconfig", + "--rm", + f"{verifier_image_name}:{verifier_image_tag}", + "verify", + "--set", + f"profile.vendortype={profile_type}", + "--pgp-public-key", + public_key_location, + chart_location, + ], + capture_output=True, + ) else: - out = subprocess.run(["podman", "run", "-v", f"{kubeconfig}:/kubeconfig:z", "-e", "KUBECONFIG=/kubeconfig", "--rm", - f"{verifier_image_name}:{verifier_image_tag}", "verify", "--set", f"profile.vendortype={profile_type}", chart_location], capture_output=True) + out = subprocess.run( + [ + "podman", + "run", + "-v", + f"{kubeconfig}:/kubeconfig:z", + "-e", + "KUBECONFIG=/kubeconfig", + "--rm", + f"{verifier_image_name}:{verifier_image_tag}", + "verify", + "--set", + f"profile.vendortype={profile_type}", + chart_location, + ], + capture_output=True, + ) else: chart_directory = os.path.dirname(os.path.abspath(chart_location)) chart_name = os.path.basename(os.path.abspath(chart_location)) if pgp_key_location: pgp_key_name = os.path.basename(os.path.abspath(pgp_key_location)) - out = subprocess.run(["podman", "run", "-v", f"{chart_directory}:/charts:z", "-v", f"{kubeconfig}:/kubeconfig:z", "-e", "KUBECONFIG=/kubeconfig", "--rm", - f"{verifier_image_name}:{verifier_image_tag}", "verify", "--set", f"profile.vendortype={profile_type}","--pgp-public-key",f"/charts/{pgp_key_name}",f"/charts/{chart_name}"], capture_output=True) + out = subprocess.run( + [ + "podman", + "run", + "-v", + f"{chart_directory}:/charts:z", + "-v", + f"{kubeconfig}:/kubeconfig:z", + "-e", + "KUBECONFIG=/kubeconfig", + "--rm", + f"{verifier_image_name}:{verifier_image_tag}", + "verify", + "--set", + f"profile.vendortype={profile_type}", + "--pgp-public-key", + f"/charts/{pgp_key_name}", + f"/charts/{chart_name}", + ], + capture_output=True, + ) else: - out = subprocess.run(["podman", "run", "-v", f"{chart_directory}:/charts:z", "-v", f"{kubeconfig}:/kubeconfig:z", "-e", "KUBECONFIG=/kubeconfig", "--rm", - f"{verifier_image_name}:{verifier_image_tag}", "verify", "--set", f"profile.vendortype={profile_type}", f"/charts/{chart_name}"], capture_output=True) + out = subprocess.run( + [ + "podman", + "run", + "-v", + f"{chart_directory}:/charts:z", + "-v", + f"{kubeconfig}:/kubeconfig:z", + "-e", + "KUBECONFIG=/kubeconfig", + "--rm", + f"{verifier_image_name}:{verifier_image_tag}", + "verify", + "--set", + f"profile.vendortype={profile_type}", + f"/charts/{chart_name}", + ], + capture_output=True, + ) return out.stdout.decode("utf-8") -def run_report_podman_image(verifier_image_name,verifier_image_tag,profile_type, report_location): +def run_report_podman_image( + verifier_image_name, verifier_image_tag, profile_type, report_location +): print(f"Run podman image - {verifier_image_name}:{verifier_image_tag}") report_directory = os.path.dirname(os.path.abspath(report_location)) report_name = os.path.basename(os.path.abspath(report_location)) - out = subprocess.run(["podman", "run", "-v", f"{report_directory}:/reports:z", "--rm", - f"{verifier_image_name}:{verifier_image_tag}", "report", "all", "--set", f"profile.vendortype={profile_type}", f"/reports/{report_name}"], capture_output=True) + out = subprocess.run( + [ + "podman", + "run", + "-v", + f"{report_directory}:/reports:z", + "--rm", + f"{verifier_image_name}:{verifier_image_tag}", + "report", + "all", + "--set", + f"profile.vendortype={profile_type}", + f"/reports/{report_name}", + ], + capture_output=True, + ) return out.stdout.decode("utf-8") -@then("I should see the report-info from the report for the signed chart matching the expected report-info") -def signed_chart_report(run_signed_verify, profile_type, report_info_location, image_type, verifier_version): - check_report(run_signed_verify, profile_type, report_info_location, image_type, verifier_version) +@then( + "I should see the report-info from the report for the signed chart matching the " + "expected report-info" +) +def signed_chart_report( + run_signed_verify, profile_type, report_info_location, image_type, verifier_version +): + check_report( + run_signed_verify, + profile_type, + report_info_location, + image_type, + verifier_version, + ) + + +@then( + "I should see the report-info from the generated report matching the expected " + "report-info" +) +def chart_report( + run_verify, profile_type, report_info_location, image_type, verifier_version +): + check_report( + run_verify, profile_type, report_info_location, image_type, verifier_version + ) -@then("I should see the report-info from the generated report matching the expected report-info") -def chart_report(run_verify, profile_type, report_info_location, image_type, verifier_version): - check_report(run_verify, profile_type, report_info_location, image_type, verifier_version) -def check_report(verify_result, profile_type, report_info_location, image_type, verifier_version): - """Compares the output of chart-verifier against a pre-generated report, containing the expected results. +def check_report( + verify_result, profile_type, report_info_location, image_type, verifier_version +): + """Compares the output of chart-verifier against a pre-generated report, containing + the expected results. Fails the tests if a difference is found. @@ -242,7 +466,7 @@ def check_report(verify_result, profile_type, report_info_location, image_type, verifier_version (str): Normalized version as given by chart-verifier. """ if verify_result.startswith("FAIL"): - pytest.fail(f'FAIL some tests failed: {verify_result}') + pytest.fail(f"FAIL some tests failed: {verify_result}") print(f"Report data:\n{verify_result}\ndone") @@ -250,39 +474,49 @@ def check_report(verify_result, profile_type, report_info_location, image_type, test_passed = True - report_verifier_version = report_data['metadata']['tool']['verifier-version'] + report_verifier_version = report_data["metadata"]["tool"]["verifier-version"] if report_verifier_version != verifier_version: - print(f"FAIL: verifier-version found in report does not match tool version. Expected {verifier_version}, but report has {report_verifier_version}") + print( + "FAIL: verifier-version found in report does not match tool version. " + f"Expected {verifier_version}, but report has {report_verifier_version}" + ) test_passed = False report_vendor_type = report_data["metadata"]["tool"]["profile"]["VendorType"] if report_vendor_type != profile_type: - print(f"FAIL: profiles do not match. Expected {profile_type}, but report has {report_vendor_type}") + print( + "FAIL: profiles do not match. " + f"Expected {profile_type}, but report has {report_vendor_type}" + ) test_passed = False - chart_name = report_data["metadata"]["chart"]["name"] + chart_name = report_data["metadata"]["chart"]["name"] chart_version = report_data["metadata"]["chart"]["version"] - report_name = f'{profile_type}-{chart_name}-{chart_version}-report.yaml' - report_dir = os.path.join(os.getcwd(),"test-reports") - report_path = os.path.join(report_dir,report_name) + report_name = f"{profile_type}-{chart_name}-{chart_version}-report.yaml" + report_dir = os.path.join(os.getcwd(), "test-reports") + report_path = os.path.join(report_dir, report_name) if not os.path.isdir(report_dir): os.makedirs(report_dir) - print(f'Report path : {report_path}') + print(f"Report path : {report_path}") with open(report_path, "w") as fd: fd.write(verify_result) - expected_reports_file = open(report_info_location,) + expected_reports_file = open( + report_info_location, + ) expected_reports = json.load(expected_reports_file) test_report_data = run_report(image_type, profile_type, report_path) print(f"test_report_data : \n{test_report_data}") test_report = json.loads(test_report_data) - results_diff = DeepDiff(expected_reports[REPORT_RESULTS],test_report[REPORT_RESULTS],ignore_order=True) + results_diff = DeepDiff( + expected_reports[REPORT_RESULTS], test_report[REPORT_RESULTS], ignore_order=True + ) if results_diff: print(f"difference found in results : {results_diff}") test_passed = False @@ -295,26 +529,38 @@ def check_report(verify_result, profile_type, report_info_location, image_type, for report_annotation in test_report[REPORT_ANNOTATIONS]: tested_annotations[report_annotation["name"]] = report_annotation["value"] - missing_annotations = set(expected_annotations.keys()) - set(tested_annotations.keys()) + missing_annotations = set(expected_annotations.keys()) - set( + tested_annotations.keys() + ) if missing_annotations: test_passed = False print(f"Missing annotations: {missing_annotations}") - extra_annotations = set(tested_annotations.keys()) - set(expected_annotations.keys()) + extra_annotations = set(tested_annotations.keys()) - set( + expected_annotations.keys() + ) if extra_annotations: test_passed = False print(f"extra annotations: {extra_annotations}") - differing_annotations = {"charts.openshift.io/lastCertifiedTimestamp", - "charts.openshift.io/testedOpenShiftVersion"} + differing_annotations = { + "charts.openshift.io/lastCertifiedTimestamp", + "charts.openshift.io/testedOpenShiftVersion", + } for annotation in expected_annotations.keys(): - if not annotation in differing_annotations: + if annotation not in differing_annotations: if expected_annotations[annotation] != tested_annotations[annotation]: test_passed = False - print(f"{annotation} has different content, expected: {expected_annotations[annotation]}, got: {tested_annotations[annotation]}") - - digests_diff = DeepDiff(expected_reports[REPORT_DIGESTS],test_report[REPORT_DIGESTS],ignore_order=True) + print( + f"{annotation} has different content, " + f"expected: {expected_annotations[annotation]}, " + f"got: {tested_annotations[annotation]}" + ) + + digests_diff = DeepDiff( + expected_reports[REPORT_DIGESTS], test_report[REPORT_DIGESTS], ignore_order=True + ) if digests_diff: print(f"difference found in digests : {digests_diff}") test_passed = False @@ -323,12 +569,13 @@ def check_report(verify_result, profile_type, report_info_location, image_type, expected_metadata = expected_reports[REPORT_METADATA] test_metadata = test_report[REPORT_METADATA] for metadata in expected_metadata.keys(): - if not metadata in differing_metadata: - metadata_diff = DeepDiff(expected_metadata[metadata],test_metadata[metadata],ignore_order=True) + if metadata not in differing_metadata: + metadata_diff = DeepDiff( + expected_metadata[metadata], test_metadata[metadata], ignore_order=True + ) if metadata_diff: test_passed = False print(f"difference found in {metadata} metadata : {metadata_diff}") if not test_passed: - pytest.fail('FAIL differences found in reports') - + pytest.fail("FAIL differences found in reports")