RHEL Provider: Hydra and CSAF as data sources #772

Draft: wants to merge 18 commits into main
20 changes: 19 additions & 1 deletion poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -59,6 +59,7 @@ pytest-snapshot = "^0.9.0"
mashumaro = "^3.10"
iso8601 = "^2.1.0"
zstandard = ">=0.22,<0.24"
packageurl-python = "^0.16.0"

[tool.poetry.group.dev.dependencies]
pytest = ">=7.2.2,<9.0.0"
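The packageurl-python dependency added above is what csaf_parser.py (below) uses to pull the name, epoch, and version out of RPM purls. A minimal sketch of that parsing, with an illustrative purl string:

from packageurl import PackageURL

# parse an RPM purl the way the CSAF parser does; the purl below is made up for illustration
purl = PackageURL.from_string("pkg:rpm/redhat/kernel@5.14.0-70.el9?arch=src&epoch=0")
epoch = purl.qualifiers.get("epoch", "0") if isinstance(purl.qualifiers, dict) else "0"
print(purl.name, f"{epoch}:{purl.version}")  # -> kernel 0:5.14.0-70.el9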
4 changes: 3 additions & 1 deletion src/vunnel/providers/rhel/__init__.py
@@ -22,8 +22,9 @@ class Config:
)
request_timeout: int = 125
parallelism: int = 4
full_sync_interval: int = 2
full_sync_interval: int = 2 # in days
skip_namespaces: list[str] = field(default_factory=lambda: ["rhel:3", "rhel:4"])
rhsa_source: str = "CSAF" # "CSAF" or "OVAL"


class Provider(provider.Provider):
@@ -43,6 +44,7 @@ def __init__(self, root: str, config: Config | None = None):
download_timeout=self.config.request_timeout,
max_workers=self.config.parallelism,
full_sync_interval=self.config.full_sync_interval,
rhsa_provider_type=self.config.rhsa_source,
skip_namespaces=self.config.skip_namespaces,
logger=self.logger,
)
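A minimal sketch of how the new rhsa_source setting could be exercised from code; the root path below is illustrative and not taken from this PR:

from vunnel.providers.rhel import Config, Provider

# choose CSAF (rather than the legacy OVAL path) as the source of RHSA fix information
config = Config(rhsa_source="CSAF")
provider = Provider(root="/tmp/vunnel-rhel", config=config)  # root is an illustrative workspace directory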
167 changes: 167 additions & 0 deletions src/vunnel/providers/rhel/csaf_client.py
@@ -0,0 +1,167 @@
import concurrent.futures
import contextlib
import csv
import functools
import logging
import os
from datetime import UTC, datetime

from vunnel.utils import http
from vunnel.utils.archive import extract
from vunnel.utils.csaf_types import CSAFDoc
from vunnel.utils.csaf_types import from_path as csaf_from_path
from vunnel.workspace import Workspace

RH_URL_PREFIX = "https://access.redhat.com/errata/"


class RedHatAdvisoryID:
def __init__(self, rhsa: str):
rhsa = rhsa.upper()
rhsa = rhsa.removeprefix(RH_URL_PREFIX)
if "-" and ":" in rhsa:
self.year = rhsa.split("-")[1].split(":")[0]
self.rhsa = rhsa
else:
raise ValueError(f"Invalid RHSA ID: {rhsa}; expected a form like RHSA-2021:1234")

def advisory_url(self) -> str:
return f"{RH_URL_PREFIX}{self.rhsa}"

def advisory_year(self) -> str:
return self.year

def advisory_id(self) -> str:
return self.rhsa

def advisory_kind(self) -> str:
if self.rhsa.startswith("RHSA"):
return "security"
if self.rhsa.startswith("RHBA"):
return "bugfix"
if self.rhsa.startswith("RHEA"):
return "enhancement"
return "unknown"


class CSAFClient:
def __init__(self, workspace: Workspace, latest_url: str, logger: logging.Logger):
self.workspace = workspace
self.latest_url = latest_url
self.latest_filename = "archive_latest.txt"
self.latest_archive_url: str | None = None
self.archive_date: datetime | None = None
self.logger = logger
# self.csaf_path = os.path.join(self.workspace.input_path, "csaf")
self.advisories_path = os.path.join(self.workspace.input_path, "advisories")
self._download_and_update_archive()

def _changes_url(self) -> str:
return self.latest_url.replace(self.latest_filename, "changes.csv")

def _deletions_url(self) -> str:
return self.latest_url.replace(self.latest_filename, "deletions.csv")

def _archive_url(self) -> str:
if not self.latest_archive_url:
latest_resp = http.get(self.latest_url, logger=self.logger)
latest_name = latest_resp.text.strip()
self.latest_archive_url = self.latest_url.replace(self.latest_filename, latest_name)
date_part = latest_name.removeprefix("csaf_advisories_").removesuffix(".tar.zst")
self.archive_date = datetime.strptime(date_part, "%Y-%m-%d").replace(tzinfo=UTC)
return self.latest_archive_url

def _local_archive_path(self) -> str:
return os.path.join(self.workspace.input_path, self._archive_url().split("/")[-1])

def _local_changes_path(self) -> str:
return os.path.join(self.workspace.input_path, "changes.csv")

def _local_deletions_path(self) -> str:
return os.path.join(self.workspace.input_path, "deletions.csv")

def _download_stream(self, url: str, path: str) -> None:
with http.get(url, logger=self.logger, stream=True) as response, open(path, "wb") as fh:
for chunk in response.iter_content(chunk_size=65536): # 64k chunks
if chunk:
fh.write(chunk)

def process_changes_and_deletions(self) -> None:
"""process the changes and deletions. deletions.csv is the list of CSAF JSON
files that have been deleted. Download it and loop over it, deleting all
referenced files. changes.csv is a date-sorted list of when each CSAF JSON
file changed. Download it, and loop over the rows, until we get back to the
date of the archive, keeping a list of unique files, to get the set of files
that have changed since the archive was published. Re-download all of them, over-writing
whatever data was in the archive."""
changes_path = self._local_changes_path()
deletions_path = self._local_deletions_path()
with open(deletions_path, newline="") as fh:
reader = csv.reader(fh)
for row in reader:
deleted_fragment = row[0]
# suppress FileNotFound because deleting the same file twice
# should no-op rather than raise an error
with contextlib.suppress(FileNotFoundError):
os.remove(os.path.join(self.advisories_path, deleted_fragment))
seen_files = set()
with open(changes_path, newline="") as fh:
reader = csv.reader(fh)
for row in reader:
# row is like "2021/cve-2021-47265.json","2024-11-08T18:28:22+00:00"
changed_file = row[0]
date_str = row[1]
change_date = datetime.fromisoformat(date_str)
if self.archive_date and change_date < self.archive_date:
break
if changed_file in seen_files:
continue
seen_files.add(changed_file)
with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
futures = {
executor.submit(
self._download_stream,
url=self.latest_url.replace("archive_latest.txt", changed_file),
path=os.path.join(self.advisories_path, changed_file),
): changed_file
for changed_file in seen_files
}
concurrent.futures.wait(futures.keys())
for future, changed_file in futures.items():
if future.exception() is not None:
self.logger.warning(f"Failed to download {changed_file}: {future.exception()}")

def _download_and_update_archive(self) -> None:
if not self.latest_archive_url:
self.latest_archive_url = self._archive_url()
archive_path = self._local_archive_path()
print(f"archive_path: {archive_path}")
if not os.path.exists(self.advisories_path):
os.makedirs(self.advisories_path)
# if there's a new one, the paths won't match and we need to download it
if not os.path.exists(archive_path):
self._download_stream(self._archive_url(), archive_path)
extract(archive_path, self.advisories_path)
# always download and process changes and deletions
# because this is how the API tells us about updates between
# publications of the main archive.
self._download_stream(self._changes_url(), os.path.join(self.workspace.input_path, "changes.csv"))
self._download_stream(self._deletions_url(), os.path.join(self.workspace.input_path, "deletions.csv"))
self.process_changes_and_deletions()

def path_from_rhsa_id(self, rhsa_id: RedHatAdvisoryID) -> str:
# https://security.access.redhat.com/data/csaf/v2/advisories/2024/rhba-2024_0599.json
return os.path.join(self.advisories_path, rhsa_id.advisory_year(), rhsa_id.advisory_id().lower().replace(":", "_") + ".json")

def csaf_doc_for_rhsa(self, rhsa: str) -> CSAFDoc | None:
"""Get the CSAF document for a given RHSA ID"""
doc_path = self.path_from_rhsa_id(RedHatAdvisoryID(rhsa))
if os.path.exists(doc_path):
return _csaf_doc_from_path(doc_path)
self.logger.info(f"CSAF document not found for {rhsa}")
return None


@functools.lru_cache(maxsize=1024)
def _csaf_doc_from_path(doc_path: str) -> CSAFDoc:
return csaf_from_path(doc_path)
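A rough usage sketch for the client above (not part of the diff); ws is assumed to be an existing vunnel Workspace with a writable input_path, and the advisory ID is illustrative:

import logging

from vunnel.providers.rhel.csaf_client import CSAFClient

logger = logging.getLogger("rhel-csaf")
# constructing the client downloads the latest archive and replays changes.csv / deletions.csv
client = CSAFClient(
    ws,  # assumed: an existing vunnel Workspace
    "https://security.access.redhat.com/data/csaf/v2/advisories/archive_latest.txt",
    logger,
)
doc = client.csaf_doc_for_rhsa("RHSA-2021:1234")  # illustrative RHSA ID
if doc is None:
    logger.info("advisory not present in the local CSAF mirror")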
111 changes: 111 additions & 0 deletions src/vunnel/providers/rhel/csaf_parser.py
@@ -0,0 +1,111 @@
import logging
import os

from packageurl import PackageURL

from vunnel.providers.rhel.csaf_client import CSAFClient
from vunnel.utils.csaf_types import CSAFDoc
from vunnel.workspace import Workspace

ADVISORIES_LATEST_URL = "https://security.access.redhat.com/data/csaf/v2/advisories/archive_latest.txt"


class CSAFParser:
def __init__(self, workspace: Workspace, logger: logging.Logger | None = None, download_timeout: int = 125):
self.download_timeout = download_timeout
self.workspace = workspace

if not logger:
logger = logging.getLogger(self.__class__.__name__)

self.logger = logger
self.advisory_download_path = os.path.join(self.workspace.input_path, "advisory_archive.tar.zst")
self.advisories_path = os.path.join(self.workspace.input_path, "advisories")
self._urls: set[str] = set()
self.csaf_client = CSAFClient(self.workspace, ADVISORIES_LATEST_URL, self.logger)

@property
def urls(self) -> list[str]:
return list(self._urls)

def platform_module_name_version_from_fpi(self, doc: CSAFDoc, fpi: str) -> tuple[str | None, str | None, str | None, str | None]:
module = None
plat_or_module = doc.product_tree.parent(fpi)
if not plat_or_module:
return None, None, None, None
plat = doc.product_tree.parent(plat_or_module)
if plat:
module = plat_or_module.removeprefix(f"{plat}:")
package = fpi.removeprefix(f"{plat}:{module}:")
else:
plat = plat_or_module
package = fpi.removeprefix(f"{plat}:")

version = None
name = None
purl = doc.product_tree.purl_for_product_id(package)

if purl:
if purl.startswith("pkg:rpmmod"):
return None, None, None, None
parsed_purl = PackageURL.from_string(purl)
epoch = parsed_purl.qualifiers.get("epoch", "0") if isinstance(parsed_purl.qualifiers, dict) else "0"
version = f"{epoch}:{parsed_purl.version}"
name = parsed_purl.name
else:
self.logger.trace(f"no purl for {package} from {fpi}") # type: ignore[attr-defined]

platform_product_node = next((p for p in doc.product_tree.branches[0].product_name_branches() if p.product_id() == plat), None)
platform_cpe = platform_product_node.cpe() if platform_product_node else None
if not platform_cpe:
self.logger.trace(f"no platform cpe for {plat} from {fpi}") # type: ignore[attr-defined]
return None, None, None, None

if module:
mod_purl = doc.product_tree.purl_for_product_id(module)
if mod_purl:
parsed_mod_purl = PackageURL.from_string(mod_purl)
mod_version = parsed_mod_purl.version or ""
if mod_version and ":" in mod_version:
mod_version = mod_version.split(":")[0]
module = f"{parsed_mod_purl.name}:{mod_version}"
self.logger.trace(f"module: {module} for {fpi} by {mod_purl}") # type: ignore[attr-defined]
else:
self.logger.trace(f"no module purl for {module} from {fpi}") # type: ignore[attr-defined]

return platform_cpe, module, name, version

def get_fix_info(self, cve_id: str, ar: dict[str, str | None], normalized_pkg_name: str | None) -> tuple[str | None, str | None]: # noqa: PLR0911
"""
Get fix information for an RHSA from the CSAF data.

The `ar` dict is expected to carry the advisory ID under "advisory" (or "rhsa_id") and the
platform CPE under "cpe" (or "platform_cpe").
"""
fix_id = ar.get("advisory") or ar.get("rhsa_id")
if not fix_id:
print("no advisory")
return None, None
doc = self.csaf_client.csaf_doc_for_rhsa(fix_id)
if not doc:
return None, None
vuln = next((v for v in doc.vulnerabilities if v.cve == cve_id), None)
if not vuln:
self.logger.trace(f"{cve_id}: {fix_id} CSAF doc does not claim to fix this CVE") # type: ignore[attr-defined]
return None, None
remediation = next((r for r in vuln.remediations if r.category == "vendor_fix" and r.url and r.url.endswith(fix_id)), None)
if not remediation:
self.logger.trace(f"{cve_id} no remediation obj for {fix_id}") # type: ignore[attr-defined]
return None, None
candidate_full_product_ids = remediation.product_ids
ar_plat_cpe = ar.get("cpe") or ar.get("platform_cpe")
if not ar_plat_cpe:
print("no platform cpe")
return None, None
self.logger.trace(f"{cve_id} searching {fix_id} based on {ar_plat_cpe} and {normalized_pkg_name}") # type: ignore[attr-defined]
for fpi in candidate_full_product_ids:
plat, module, name, version = self.platform_module_name_version_from_fpi(doc, fpi)
if name == normalized_pkg_name and plat and plat.startswith(ar_plat_cpe):
self.logger.trace(f"found match for {fpi}, {name}, {plat} against {ar_plat_cpe}: {normalized_pkg_name}") # type: ignore[attr-defined]
return version, module
self.logger.trace(f"{cve_id} no match for {fix_id} against {ar_plat_cpe}: {normalized_pkg_name}") # type: ignore[attr-defined]
return None, None
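A hedged sketch of calling get_fix_info for a single affected release; the ar keys mirror what the method reads, while the advisory ID, CPE, and package name are made up for illustration:

# assumes `ws` is a vunnel Workspace and `logger` a vunnel-configured logger
# (the parser relies on a custom trace log level)
parser = CSAFParser(workspace=ws, logger=logger)
ar = {
    "rhsa_id": "RHSA-2024:0599",                # the method also accepts an "advisory" key
    "cpe": "cpe:/a:redhat:enterprise_linux:9",  # or a "platform_cpe" key
}
fixed_version, module = parser.get_fix_info("CVE-2024-1234", ar, "kernel")
# returns (fixed version such as "0:...", module such as "name:stream"), or (None, None) when no match is found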