From cf6520aa02967a9af0c2eb2f28f4373442f9885a Mon Sep 17 00:00:00 2001 From: Asa Price Date: Wed, 18 Dec 2024 12:14:32 -0500 Subject: [PATCH] Handle errors in SP validator; refactor; add type hints; add test --- lib/system_profile_validate.py | 20 ++++-- system_profile_validator.py | 124 ++++++++++++++++++++------------- tests/test_system_profile.py | 14 +++- 3 files changed, 101 insertions(+), 57 deletions(-) diff --git a/lib/system_profile_validate.py b/lib/system_profile_validate.py index 1bc4f4035..dca14641a 100644 --- a/lib/system_profile_validate.py +++ b/lib/system_profile_validate.py @@ -2,6 +2,7 @@ from datetime import datetime from datetime import timedelta +from confluent_kafka import Consumer from confluent_kafka import TopicPartition from marshmallow import ValidationError from requests import get @@ -23,22 +24,24 @@ def __init__(self): self.fail_count = 0 -def get_schema_from_url(url): +def get_schema_from_url(url: str) -> dict: response = get(url) if response.status_code != 200: raise ValueError(f"Schema not found at URL: {url}") return safe_load(get(url).content.decode("utf-8")) -def get_schema(fork, branch): +def get_schema(fork: str, branch: str) -> dict: return get_schema_from_url( f"https://raw.githubusercontent.com/{fork}/inventory-schemas/" f"{branch}/schemas/system_profile/v1.yaml" ) -def validate_sp_schemas(consumer, topics, schemas, days=1, max_messages=10000): +def validate_sp_schemas( + consumer: Consumer, topics: list[str], schemas: dict, days: int = 1, max_messages: int = 10000 +) -> dict[str, dict[str, TestResult]]: total_message_count = 0 - test_results = {branch: {} for branch in schemas.keys()} + test_results: dict[str, dict[str, TestResult]] = {branch: {} for branch in schemas.keys()} seek_date = datetime.now() + timedelta(days=(-1 * days)) logger.info("Validating messages from these topics:") @@ -104,8 +107,13 @@ def validate_sp_schemas(consumer, topics, schemas, days=1, max_messages=10000): def validate_sp_for_branch( - consumer, topics, repo_fork="RedHatInsights", repo_branch="master", days=1, max_messages=10000 -): + consumer: Consumer, + topics: list[str], + repo_fork: str = "RedHatInsights", + repo_branch: str = "master", + days: int = 1, + max_messages: int = 10000, +) -> dict[str, dict[str, TestResult]]: schemas = {"RedHatInsights/master": get_schema("RedHatInsights", "master")} schemas[f"{repo_fork}/{repo_branch}"] = get_schema(repo_fork, repo_branch) diff --git a/system_profile_validator.py b/system_profile_validator.py index 0269b8331..83a5791ec 100755 --- a/system_profile_validator.py +++ b/system_profile_validator.py @@ -1,20 +1,25 @@ #!/usr/bin/python import json import sys +from datetime import datetime from functools import partial from os import getenv +from typing import Union from confluent_kafka import Consumer as KafkaConsumer from dateutil import parser +from requests import Response from requests import get from requests import post from requests.auth import HTTPBasicAuth +from yaml import parser as yamlParser from app.config import Config from app.environment import RuntimeEnvironment from app.logging import configure_logging from app.logging import get_logger from app.logging import threadctx +from lib.system_profile_validate import TestResult from lib.system_profile_validate import get_schema from lib.system_profile_validate import get_schema_from_url from lib.system_profile_validate import validate_sp_schemas @@ -26,10 +31,12 @@ REPO_NAME = "inventory-schemas" SP_SPEC_PATH = "schemas/system_profile/v1.yaml" RUNTIME_ENVIRONMENT = RuntimeEnvironment.JOB -GIT_USER = getenv("GIT_USER") -GIT_TOKEN = getenv("GIT_TOKEN") +GIT_USER = getenv("GIT_USER", "") +GIT_TOKEN = getenv("GIT_TOKEN", "") VALIDATE_DAYS = int(getenv("VALIDATE_DAYS", 3)) +logger = get_logger(LOGGER_NAME) + def _init_config(): config = Config(RUNTIME_ENVIRONMENT) @@ -41,31 +48,34 @@ def _excepthook(logger, type, value, traceback): logger.exception("System Profile Validator failed", exc_info=value) -def _get_git_response(path): +def _get_git_response(path: str) -> dict: return json.loads( get(f"https://api.github.com{path}", auth=HTTPBasicAuth(GIT_USER, GIT_TOKEN)).content.decode("utf-8") ) -def _post_git_response(path, content): +def _post_git_response(path: str, content: str) -> Response: return post(f"https://api.github.com{path}", auth=HTTPBasicAuth(GIT_USER, GIT_TOKEN), json={"body": content}) -def _validation_results_plaintext(test_results): +def _validation_results_plaintext(test_results: dict[str, TestResult]) -> str: text = "" for reporter, result in test_results.items(): text += f"{reporter}:\n\tPass: {result.pass_count}\n\tFail: {result.fail_count}\n\n" return text -def _post_git_results_comment(pr_number, test_results): - content = ( +def _generate_comment_from_results(test_results: dict) -> str: + return ( f"Here are the System Profile validation results using Prod data.\n" f"Validating against the {REPO_OWNER}/{REPO_NAME} master spec:\n```\n" f"{_validation_results_plaintext(test_results[f'{REPO_OWNER}/{REPO_NAME}'])}\n```\n" f"Validating against this PR's spec:\n```\n" f"{_validation_results_plaintext(test_results['this'])}\n```\n" ) + + +def _post_git_comment_to_pr(pr_number: str, content: str) -> None: response = _post_git_response(f"/repos/{REPO_OWNER}/{REPO_NAME}/issues/{pr_number}/comments", content) if response.status_code >= 400: logger.error(f"Could not post a comment to PR #{pr_number}. Response: {response.text}") @@ -73,13 +83,13 @@ def _post_git_results_comment(pr_number, test_results): logger.info(f"Posted a comment to PR #{pr_number}, with response status {response.status_code}") -def _get_latest_commit_datetime_for_pr(owner, repo, pr_number): +def _get_latest_commit_datetime_for_pr(owner: str, repo: str, pr_number: str) -> datetime: pr_commits = _get_git_response(f"/repos/{owner}/{repo}/pulls/{pr_number}/commits") latest_commit = pr_commits[-1] return parser.isoparse(latest_commit["commit"]["author"]["date"]) -def _get_latest_self_comment_datetime_for_pr(owner, repo, pr_number): +def _get_latest_self_comment_datetime_for_pr(owner: str, repo: str, pr_number: str) -> Union[datetime, None]: pr_comments = _get_git_response(f"/repos/{owner}/{repo}/issues/{pr_number}/comments") for comment in reversed(pr_comments): if comment["user"]["login"] == GIT_USER: @@ -87,7 +97,7 @@ def _get_latest_self_comment_datetime_for_pr(owner, repo, pr_number): return None -def _does_pr_require_validation(owner, repo, pr_number): +def _does_pr_require_validation(owner: str, repo: str, pr_number: str) -> bool: latest_commit_datetime = _get_latest_commit_datetime_for_pr(owner, repo, pr_number) latest_self_comment_datetime = _get_latest_self_comment_datetime_for_pr(owner, repo, pr_number) sp_spec_modified = SP_SPEC_PATH in [ @@ -105,7 +115,7 @@ def _does_pr_require_validation(owner, repo, pr_number): return False -def _get_prs_that_require_validation(owner, repo): +def _get_prs_that_require_validation(owner: str, repo: str) -> list[str]: logger.info(f"Checking whether {owner}/{repo} PRs need schema validation...") prs_to_validate = [] @@ -116,6 +126,53 @@ def _get_prs_that_require_validation(owner, repo): return prs_to_validate +def _get_sp_spec_from_pr(pr_number: str) -> dict: + # Get spec file from PR + file_list = _get_git_response(f"/repos/{REPO_OWNER}/{REPO_NAME}/pulls/{pr_number}/files") + for file in file_list: + if file["filename"] == SP_SPEC_PATH: + logger.debug(f"Getting SP spec from {file['raw_url']}") + return get_schema_from_url(file["raw_url"]) + + raise FileNotFoundError() + + +def _validate_schema_for_pr_and_generate_comment(pr_number: str, config: Config) -> str: + consumer = KafkaConsumer( + { + "bootstrap.servers": config.bootstrap_servers, + **config.validator_kafka_consumer, + } + ) + + try: + schemas = { + f"{REPO_OWNER}/{REPO_NAME}": get_schema(REPO_OWNER, "master"), + "this": _get_sp_spec_from_pr(pr_number), + } + except yamlParser.ParserError as pe: + logger.error(pe) + return ( + "An error occurred while trying to parse the schema in this PR. " + "Please verify the syntax and formatting, and see the pod logs for further details." + ) + + try: + test_results = validate_sp_schemas( + consumer, + [config.kafka_consumer_topic, config.additional_validation_topic], + schemas, + VALIDATE_DAYS, + config.sp_validator_max_messages, + ) + consumer.close() + return _generate_comment_from_results(test_results) + except ValueError as ve: + logger.exception(ve) + consumer.close() + sys.exit(1) + + def main(logger): config = _init_config() @@ -128,48 +185,16 @@ def main(logger): # For each PR in prs_to_validate, validate the parsed hosts and leave a comment on the PR for pr_number in prs_to_validate: - consumer = KafkaConsumer( - { - "bootstrap.servers": config.bootstrap_servers, - **config.validator_kafka_consumer, - } - ) - - sp_spec = None - - # Get spec file from PR - file_list = _get_git_response(f"/repos/{REPO_OWNER}/{REPO_NAME}/pulls/{pr_number}/files") - for file in file_list: - if file["filename"] == SP_SPEC_PATH: - logger.debug(f"Getting SP spec from {file['raw_url']}") - sp_spec = get_schema_from_url(file["raw_url"]) - break - - # If the System Profile spec wasn't modified, skip to the next PR. - if not sp_spec: - continue - - schemas = {f"{REPO_OWNER}/{REPO_NAME}": get_schema(REPO_OWNER, "master")} - schemas["this"] = sp_spec - try: - test_results = validate_sp_schemas( - consumer, - [config.kafka_consumer_topic, config.additional_validation_topic], - schemas, - VALIDATE_DAYS, - config.sp_validator_max_messages, - ) - consumer.close() - except ValueError as ve: - logger.exception(ve) - consumer.close() - sys.exit(1) + message = _validate_schema_for_pr_and_generate_comment(pr_number, config) + except FileNotFoundError: + # System Profile not changed in PR, no need to validate + continue # Only post a comment if there still isn't one on the PR. - # This is needed because another validation job may have posted a comment in the meantime. + # This check is needed because another validation job may have posted a comment in the meantime. if _does_pr_require_validation(REPO_OWNER, REPO_NAME, pr_number): - _post_git_results_comment(pr_number, test_results) + _post_git_comment_to_pr(pr_number, message) logger.info("The validator has finished. Bye!") sys.exit(0) @@ -178,7 +203,6 @@ def main(logger): if __name__ == "__main__": configure_logging() - logger = get_logger(LOGGER_NAME) sys.excepthook = partial(_excepthook, logger) threadctx.request_id = None diff --git a/tests/test_system_profile.py b/tests/test_system_profile.py index 30bebe1cc..837fa59a3 100644 --- a/tests/test_system_profile.py +++ b/tests/test_system_profile.py @@ -1,10 +1,12 @@ import pytest +from yaml.parser import ParserError from app.config import Config from app.environment import RuntimeEnvironment from app.exceptions import ValidationException from lib.host_repository import find_hosts_by_staleness from lib.system_profile_validate import validate_sp_for_branch +from system_profile_validator import _validate_schema_for_pr_and_generate_comment from tests.helpers.api_utils import HOST_READ_ALLOWED_RBAC_RESPONSE_FILES from tests.helpers.api_utils import HOST_READ_PROHIBITED_RBAC_RESPONSE_FILES from tests.helpers.api_utils import HOST_URL @@ -256,8 +258,18 @@ def test_validate_sp_for_missing_branch_or_repo(mocker): assert "Schema not found at URL" in str(excinfo.value) +def test_validate_unparseable_sp(mocker): + # Mock schema fetch + get_schema_from_url_mock = mocker.patch("lib.system_profile_validate.get_schema_from_url") + get_schema_from_url_mock.side_effect = ParserError("Error parsing yaml") + config = Config(RuntimeEnvironment.SERVICE) + + comment_content = _validate_schema_for_pr_and_generate_comment("test", config) + assert "An error occurred while trying to parse the schema in this PR" in comment_content + + def test_validate_sp_for_invalid_days(api_post): - response_status, response_data = api_post( + response_status, _ = api_post( url=f"{SYSTEM_PROFILE_URL}/validate_schema?repo_branch=master&days=0", host_data=None )