From 80ae584adcfe6fca79e84f6b3419cef1f803fb61 Mon Sep 17 00:00:00 2001 From: Henri Rosten Date: Wed, 29 Nov 2023 15:05:16 +0200 Subject: [PATCH] Allow repology usage as library - Make it possible to call repology-related code from the repology package. - Change vulnxscan and nix_outdated so they call the repology-related code from the repology package instead of creating a new process with exec_cmd to invoke repology_cli or repology_cve. Signed-off-by: Henri Rosten --- src/common/utils.py | 8 ++-- src/nixupdate/nix_outdated.py | 23 +++++---- src/repology/exceptions.py | 25 ++++++++++ src/repology/repology_cli.py | 86 ++++++++++++++++++++-------------- src/repology/repology_cve.py | 23 +++++---- src/vulnxscan/vulnxscan_cli.py | 54 ++++++++++----------- 6 files changed, 131 insertions(+), 88 deletions(-) create mode 100644 src/repology/exceptions.py diff --git a/src/common/utils.py b/src/common/utils.py index e7c3a60..cfca26b 100644 --- a/src/common/utils.py +++ b/src/common/utils.py @@ -200,14 +200,14 @@ def version_distance(v1, v2): re_vsplit = re.compile(r".*?(?P<ver_beg>[0-9][0-9]*)(?P<ver_end>.*)$") match = re.match(re_vsplit, v1_clean) if not match: - LOG.warning("Unexpected v1 version '%s'", v1) + LOG.debug("Unexpected v1 version '%s'", v1) return 0.0 v1_major = match.group("ver_beg") v1_minor = match.group("ver_end").replace(".", "") v1_float = float(v1_major + "."
+ v1_minor) match = re.match(re_vsplit, v2_clean) if not match: - LOG.warning("Unexpected v2 version '%s'", v2) + LOG.debug("Unexpected v2 version '%s'", v2) return 0.0 v2_major = match.group("ver_beg") v2_minor = match.group("ver_end").replace(".", "") @@ -226,7 +226,7 @@ def parse_version(ver_str): re_ver = re.compile(r".*?(?P<ver_beg>[0-9][0-9.]*)(?P<ver_end>.*)$") match = re_ver.match(ver_str) if not match: - LOG.warning("Unable to parse version '%s'", ver_str) + LOG.debug("Unable to parse version '%s'", ver_str) return None ver_beg = match.group("ver_beg").rstrip(".") ver_end = match.group("ver_end") @@ -240,7 +240,7 @@ def parse_version(ver_str): ver = re.sub(r"\.+", ".", ver) LOG.log(LOG_SPAM, "%s --> %s", ver_str, ver) if not ver: - LOG.warning("Invalid version '%s'", ver_str) + LOG.debug("Invalid version '%s'", ver_str) return None return packaging.version.parse(ver) diff --git a/src/nixupdate/nix_outdated.py b/src/nixupdate/nix_outdated.py index 92a04a7..774ad31 100755 --- a/src/nixupdate/nix_outdated.py +++ b/src/nixupdate/nix_outdated.py @@ -15,6 +15,7 @@ from argparse import ArgumentParser from tabulate import tabulate from sbomnix.sbomdb import SbomDb +import repology.repology_cli from common.utils import ( LOG, LOG_SPAM, @@ -86,15 +87,15 @@ def _generate_sbom(target_path, runtime=True, buildtime=False): def _run_repology_cli(sbompath): LOG.info("Running repology_cli") - prefix = "repology_" - suffix = ".csv" - with NamedTemporaryFile(delete=False, prefix=prefix, suffix=suffix) as f: - cmd = ( - "repology_cli " - f"--sbom_cdx={sbompath} --repository=nix_unstable --out={f.name}" - ) - exec_cmd(cmd.split()) - return f.name + repology_cli = repology.repology_cli.Repology() + args = [] + args.append("--repository=nix_unstable") + args.append(f"--sbom_cdx={sbompath}") + return repology_cli.query( + repology.repology_cli.getargs(args), + stdout_report=False, + file_report=False, + ) def _run_nix_visualize(targt_path): @@ -258,9 +259,7 @@ def main(): sbom_path =
_generate_sbom(target_path_abs, runtime, args.buildtime) LOG.info("Using SBOM '%s'", sbom_path) - repology_out_path = _run_repology_cli(sbom_path) - LOG.info("Using repology out: '%s'", repology_out_path) - df_repology = df_from_csv_file(repology_out_path) + df_repology = _run_repology_cli(sbom_path) df_log(df_repology, LOG_SPAM) if not args.buildtime: diff --git a/src/repology/exceptions.py b/src/repology/exceptions.py new file mode 100644 index 0000000..f23fe1f --- /dev/null +++ b/src/repology/exceptions.py @@ -0,0 +1,25 @@ +# SPDX-FileCopyrightText: 2023 Technology Innovation Institute (TII) +# +# SPDX-License-Identifier: Apache-2.0 + +# pylint: disable=unnecessary-pass + +"""Repology exceptions""" + + +class RepologyError(Exception): + """Base class for exceptions raised in the repology modules""" + + pass + + +class RepologyNoMatchingPackages(RepologyError): + """Raised when no matching repology packages found""" + + pass + + +class RepologyUnexpectedResponse(RepologyError): + """Raised when repology sends unexpected response""" + + pass diff --git a/src/repology/repology_cli.py b/src/repology/repology_cli.py index 533e0bf..2d3676c 100755 --- a/src/repology/repology_cli.py +++ b/src/repology/repology_cli.py @@ -11,7 +11,6 @@ """ Command-line interface to repology.org """ import os -import sys import pathlib import json import re @@ -24,6 +23,7 @@ import numpy as np import pandas as pd from tabulate import tabulate +import repology.exceptions from common.utils import ( LOG, LOG_SPAM, @@ -43,8 +43,14 @@ def _pkg_str(str_obj): raise ArgumentTypeError("Value must be a non-empty string") -def getargs(): - """Parse command line arguments""" +def getargs(args=None): + """ + Parse arguments: by default parses the sys.argv if `args` is not + specified, otherwise, parses arguments from the `args` list of strings. + + This is simply a wrapper for function ArgumentParser.parse_args(), + returning argument attributes in argparse.Namespace object. 
+ """ desc = "Command line client to query repology.org for package information." epil = ( f"Example: ./{os.path.basename(__file__)} --pkg_search 'firef' " @@ -90,6 +96,8 @@ def getargs(): optional.add_argument("--verbose", help=helps, type=int, default=1) helps = "Path to output report file (default: ./repology_report.csv)" optional.add_argument("--out", help=helps, default="repology_report.csv") + if args: + return parser.parse_args(args) return parser.parse_args() @@ -174,20 +182,20 @@ def _sbom_fields(self): self.df.drop("name", axis=1, inplace=True) def _get_resp(self, query): - LOG.info("GET: %s", query) + LOG.debug("GET: %s", query) resp = self.session.get(query, headers=self.headers) LOG.debug("resp.status_code: %s", resp.status_code) if resp.status_code == 404: LOG.fatal("No matching packages found") - sys.exit(1) + raise repology.exceptions.RepologyNoMatchingPackages resp.raise_for_status() return resp - def _report(self, args): + def _report(self, args, console_report=True): """Generate result report to console and to csv file""" if self.df.empty: - LOG.warning("No matching packages found") - sys.exit(1) + LOG.debug("No matching packages found") + raise repology.exceptions.RepologyNoMatchingPackages if self.df_sbom is not None: self._sbom_fields() self.df["sbom_version_classify"] = self.df.apply(_sbom_row_classify, axis=1) @@ -201,26 +209,28 @@ def _report(self, args): df = df[~df.status.isin(["IGNORED", "NO_VERSION"])] df = df.drop_duplicates(keep="first") # Write the console report - table = tabulate( - df, - headers="keys", - tablefmt="orgtbl", - numalign="center", - showindex=False, - ) - LOG.info( - "Repology package info, packages:%s\n\n%s\n\n" - "For more details, see: %s\n", - df.shape[0], - table, - self.urlq, - ) - if args.stats: - self._stats_repology() - if self.df_sbom is not None: - self._stats_sbom() - # Write the full report to csv file - df_to_csv_file(self.df, args.out) + if console_report: + table = tabulate( + df, + headers="keys", + 
tablefmt="orgtbl", + numalign="center", + showindex=False, + ) + LOG.info( + "Repology package info, packages:%s\n\n%s\n\n" + "For more details, see: %s\n", + df.shape[0], + table, + self.urlq, + ) + if args.stats: + self._stats_repology() + if self.df_sbom is not None: + self._stats_sbom() + if args.out is not None: + # Write the full report to csv file + df_to_csv_file(self.df, args.out) def _stats_sbom(self): df = self.df.copy() @@ -332,8 +342,8 @@ def _parse_pkg_search_resp(self, resp, repo, pkg_stop=None): for idx, header in enumerate(projects_table.thead.find_all("th")): headers[header.text] = idx if not headers: - LOG.fatal("Unexpected response") - sys.exit(1) + LOG.fatal("Unexpected response, missing headers") + raise repology.exceptions.RepologyUnexpectedResponse LOG.log(LOG_SPAM, headers) projects_table_rows = projects_table.tbody.find_all("tr") rows = 0 @@ -475,7 +485,7 @@ def _query_sbom_cdx(self, args): LOG.debug("Package: %s", cmp) if not cmp.name: LOG.fatal("Missing package name: %s", cmp) - sys.exit(1) + raise repology.exceptions.RepologyUnexpectedResponse pkg_id = f"{args.repository}:{cmp.name}" if pkg_id in self.processed: LOG.debug("Package '%s' in sbom already processed", cmp.name) @@ -513,8 +523,10 @@ def _query_sbom_cdx(self, args): self._packages_to_df(args, re_pkg_internal=cmp.name) self.urlq = self.url_projects - def query(self, args): + def query(self, args, stdout_report=True, file_report=True): """Query package information from repology.org""" + if not file_report: + args.out = None if args.pkg_search: self._query_pkg_search(args) elif args.pkg_exact: @@ -522,7 +534,8 @@ def query(self, args): elif args.sbom_cdx: self._query_sbom_cdx(args) self._packages_to_df(args, re_pkg_internal=args.pkg_exact) - self._report(args) + self._report(args, console_report=stdout_report) + return self.df.copy(deep=True) ################################################################################ @@ -555,8 +568,11 @@ def main(): """main entry point""" 
args = getargs() set_log_verbosity(args.verbose) - repology = Repology() - repology.query(args) + repology_cli = Repology() + try: + repology_cli.query(args) + except repology.exceptions.RepologyNoMatchingPackages: + LOG.warning("No matching packages found") ################################################################################ diff --git a/src/repology/repology_cve.py b/src/repology/repology_cve.py index 23c9418..194f6e5 100755 --- a/src/repology/repology_cve.py +++ b/src/repology/repology_cve.py @@ -20,6 +20,7 @@ import numpy as np import pandas as pd from tabulate import tabulate +import repology.exceptions from common.utils import ( LOG, LOG_SPAM, @@ -77,7 +78,7 @@ def _parse_cve_resp(resp, pkg_name, pkg_version): headers[header.text] = idx if not headers or "CVE ID" not in headers: LOG.fatal("Unexpected response") - sys.exit(1) + raise repology.exceptions.RepologyUnexpectedResponse LOG.log(LOG_SPAM, headers) cve_table_rows = cve_table.tbody.find_all("tr") cve_dict = {} @@ -118,7 +119,7 @@ def _is_affected(version, affected_ver_str): version_local = parse_version(version) if not version_local: LOG.fatal("Unexpected local version string: %s", version) - sys.exit(1) + raise repology.exceptions.RepologyError # Pad with spaces to simplify regexps affected_ver_str = f" {affected_ver_str} " # Match version group @@ -131,7 +132,7 @@ def _is_affected(version, affected_ver_str): for impacted_group in matches: if len(impacted_group) != 4: LOG.fatal("Unexpected version group: %s", affected_ver_str) - sys.exit(1) + raise repology.exceptions.RepologyUnexpectedResponse # impacted_group[0] = beg beg_ind = impacted_group[0] # impacted_group[1] = begver @@ -168,7 +169,7 @@ def _is_affected(version, affected_ver_str): def _report(df): - if df.empty: + if df is None or df.empty: LOG.warning("No matching vulnerabilities found") sys.exit(0) # Write the console report @@ -182,7 +183,11 @@ def _report(df): LOG.info("Repology affected CVE(s)\n\n%s\n\n", table) -def 
_query_cve(pkg_name, pkg_version): +def query_cve(pkg_name, pkg_version): + """ + Return vulnerabilities known to repology that impact the given package name + and version. Results are returned in pandas dataframe. + """ session = CachedLimiterSession(per_second=1, expire_after=7200) ua_product = "repology_cli/0" ua_comment = "(https://github.com/tiiuae/sbomnix/tree/main/scripts/repology)" @@ -190,12 +195,12 @@ def _query_cve(pkg_name, pkg_version): pkg = urllib.parse.quote(pkg_name) ver = urllib.parse.quote(pkg_version) query = f"https://repology.org/project/{pkg}/cves?version={ver}" - LOG.info("GET: %s", query) + LOG.debug("GET: %s", query) resp = session.get(query, headers=headers) LOG.debug("resp.status_code: %s", resp.status_code) if resp.status_code == 404: - LOG.fatal("Package '%s' not found", pkg_name) - sys.exit(1) + LOG.warning("Repology package '%s' not found", pkg_name) + return None resp.raise_for_status() return _parse_cve_resp(resp, pkg_name, pkg_version) @@ -207,7 +212,7 @@ def main(): """main entry point""" args = getargs() set_log_verbosity(args.verbose) - df = _query_cve(args.PKG_NAME, args.PKG_VERSION) + df = query_cve(args.PKG_NAME, args.PKG_VERSION) _report(df) df_to_csv_file(df, args.out) diff --git a/src/vulnxscan/vulnxscan_cli.py b/src/vulnxscan/vulnxscan_cli.py index 7bd62df..082dde2 100755 --- a/src/vulnxscan/vulnxscan_cli.py +++ b/src/vulnxscan/vulnxscan_cli.py @@ -34,6 +34,9 @@ from tabulate import tabulate from vulnxscan.osv import OSV from sbomnix.sbomdb import SbomDb +import repology.repology_cli +import repology.repology_cve +import repology.exceptions from common.utils import ( LOG, LOG_SPAM, @@ -396,7 +399,6 @@ class CachedLimiterSession(CacheMixin, LimiterMixin, Session): _repology_cve_dfs = {} _repology_cli_dfs = {} -_repology_nix_repo = "nix_unstable" # Rate-limited and cached session. 
For github api rate limits, see: # https://docs.github.com/en/rest/search?apiVersion=latest#rate-limit _session = CachedLimiterSession(per_minute=9, per_second=1, expire_after=7200) @@ -422,23 +424,25 @@ def _run_repology_cli(pname, match_type="--pkg_exact"): LOG.log(LOG_SPAM, "Using cached repology_cli results") df_repology_cli = _repology_cli_dfs[pname] else: - prefix = "repology_cli_" - suffix = ".csv" - with NamedTemporaryFile(delete=True, prefix=prefix, suffix=suffix) as f: - repo = f"--repository {_repology_nix_repo}" - status = "--re_status=outdated|newest|devel|unique" - out = f"--out={f.name}" - search = f"{match_type}={pname}" - cmd = f"repology_cli {repo} {status} {search} {out} " - ret = exec_cmd(cmd.split(), raise_on_error=False, return_error=True) - if ret and ret.stderr and "No matching packages" in ret.stderr: - return None - df_repology_cli = df_from_csv_file(f.name, exit_on_error=False) - if df_repology_cli is None: - return None - df_repology_cli = _select_newest(df_repology_cli) - _repology_cli_dfs[pname] = df_repology_cli - df_log(df_repology_cli, LOG_SPAM) + repology_cli = repology.repology_cli.Repology() + args = [] + args.append("--repository=nix_unstable") + args.append("--re_status=outdated|newest|devel|unique") + args.append(f"{match_type}={pname}") + try: + df_repology_cli = repology_cli.query( + repology.repology_cli.getargs(args), + stdout_report=False, + file_report=False, + ) + except repology.exceptions.RepologyNoMatchingPackages: + pass + if df_repology_cli is None or df_repology_cli.empty: + LOG.debug("No results from repology_cli") + return None + df_repology_cli = _select_newest(df_repology_cli) + _repology_cli_dfs[pname] = df_repology_cli + df_log(df_repology_cli, LOG_SPAM) return df_repology_cli @@ -553,16 +557,10 @@ def _pkg_is_vulnerable(repo_pkg_name, pkg_version, cve_id=None): LOG.log(LOG_SPAM, "Using cached repology_cve results") df = _repology_cve_dfs[key] else: - prefix = "repology_cve_" - suffix = ".csv" - with 
NamedTemporaryFile(delete=True, prefix=prefix, suffix=suffix) as f: - args = f"{repo_pkg_name} {pkg_version}" - cmd = f"repology_cve --out={f.name} {args}" - exec_cmd(cmd.split(), raise_on_error=False) - df = df_from_csv_file(f.name, exit_on_error=False) - if df is None: - df = pd.DataFrame() - df_log(df, LOG_SPAM) + df = repology.repology_cve.query_cve(str(repo_pkg_name), str(pkg_version)) + if df is None: + df = pd.DataFrame() + df_log(df, LOG_SPAM) _repology_cve_dfs[key] = df if cve_id and not df.empty: df = df[df["cve"] == cve_id]