From a2620d5dc9a352deaac712766ee701b676318572 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Wed, 27 Nov 2024 10:16:55 +0100 Subject: [PATCH 1/2] fix: stream cve data during cve_lookup install in order to fix RAM issues on the CI --- src/install/requirements_pre_install.txt | 2 + src/plugins/analysis/cve_lookup/install.py | 3 +- .../cve_lookup/internal/data_parsing.py | 37 ++++++++++++------- .../cve_lookup/internal/database/db_setup.py | 21 ++++++----- 4 files changed, 38 insertions(+), 25 deletions(-) diff --git a/src/install/requirements_pre_install.txt b/src/install/requirements_pre_install.txt index 3c251b550..fbc78ee1d 100644 --- a/src/install/requirements_pre_install.txt +++ b/src/install/requirements_pre_install.txt @@ -9,6 +9,8 @@ requests==2.32.2 pydantic==2.4.0 werkzeug~=3.0.3 toml==0.10.2 +# needed during installation of cve_lookup plugin +ijson==3.3.0 git+https://github.com/fkie-cad/common_helper_files.git diff --git a/src/plugins/analysis/cve_lookup/install.py b/src/plugins/analysis/cve_lookup/install.py index 351f53688..e26ea7398 100755 --- a/src/plugins/analysis/cve_lookup/install.py +++ b/src/plugins/analysis/cve_lookup/install.py @@ -30,11 +30,10 @@ def install_files(self): Install files for the CVE lookup plugin. """ os.chdir('internal') - cve_list = parse_data() connection = DbConnection() connection.drop_tables() db = DbSetup(connection) - db.add_cve_items(cve_list) + db.add_cve_items(parse_data()) os.chdir(self.base_path) diff --git a/src/plugins/analysis/cve_lookup/internal/data_parsing.py b/src/plugins/analysis/cve_lookup/internal/data_parsing.py index c743226a9..15e48f181 100644 --- a/src/plugins/analysis/cve_lookup/internal/data_parsing.py +++ b/src/plugins/analysis/cve_lookup/internal/data_parsing.py @@ -1,35 +1,40 @@ from __future__ import annotations -import json -import lzma import re -from typing import TYPE_CHECKING +from pathlib import Path +from shlex import split +from subprocess import run +import ijson import requests from requests.adapters import HTTPAdapter, Retry -if TYPE_CHECKING: - from requests.models import Response - from ..internal.helper_functions import CveEntry FILE_NAME = 'CVE-all.json.xz' CVE_URL = f'https://github.com/fkie-cad/nvd-json-data-feeds/releases/latest/download/{FILE_NAME}' +DB_DIR = Path(__file__).parent / 'database' +OUTPUT_FILE = DB_DIR / FILE_NAME -def _retrieve_url(download_url: str) -> Response: +def _retrieve_url(download_url: str, target: Path): adapter = HTTPAdapter(max_retries=Retry(total=5, backoff_factor=0.1)) with requests.Session() as session: session.mount('http://', adapter) - return session.get(download_url) + with requests.get(download_url, stream=True) as request: + request.raise_for_status() + with target.open('wb') as fp: + for chunk in request.iter_content(chunk_size=65_536): + fp.write(chunk) -def download_and_decompress_data() -> bytes: +def download_and_decompress_file() -> Path: """ - Downloads data from a URL, saves it to a file, decompresses it, and returns the decompressed data. + Downloads data from a URL, saves it to a file, decompresses it, and returns the path. """ - response = _retrieve_url(CVE_URL) - return lzma.decompress(response.content) + _retrieve_url(CVE_URL, OUTPUT_FILE) + run(split(f'unxz --force {OUTPUT_FILE.name}'), cwd=DB_DIR, check=True) + return DB_DIR / OUTPUT_FILE.stem # the .xz suffix was removed during extraction def extract_english_summary(descriptions: list) -> str: @@ -87,8 +92,12 @@ def parse_data() -> list[CveEntry]: """ Parse the data from the JSON file and return a list of CveEntry objects. """ - cve_json = json.loads(download_and_decompress_data()) - return [extract_data_from_cve(cve_item) for cve_item in cve_json.get('cve_items', [])] + cve_path = download_and_decompress_file() + with cve_path.open('rb') as fp: + # the file is huge, so we use ijson to stream the data + for cve_item in ijson.items(fp, 'cve_items.item'): + yield extract_data_from_cve(cve_item) + cve_path.unlink() # remove the temporary file after we are done if __name__ == '__main__': diff --git a/src/plugins/analysis/cve_lookup/internal/database/db_setup.py b/src/plugins/analysis/cve_lookup/internal/database/db_setup.py index 84a1d2125..fe2364645 100644 --- a/src/plugins/analysis/cve_lookup/internal/database/db_setup.py +++ b/src/plugins/analysis/cve_lookup/internal/database/db_setup.py @@ -51,20 +51,18 @@ def create_cpe(self, cpe_id: str): update=update, ) - def add_cve_items(self, cve_list: list[CveEntry]): + def add_cve_items(self, cve_list: list[CveEntry], chunk_size: int = 2**12): """ Add CVE items to the database. """ existing_cve_ids = set() existing_cpe_ids = set() - cves = [] - associations = [] - cpes = [] + db_objects = [] for cve_item in cve_list: if cve_item.cve_id not in existing_cve_ids: - cves.append(self.create_cve(cve_item)) + db_objects.append(self.create_cve(cve_item)) for cpe_entry in cve_item.cpe_entries: ( cpe_id, @@ -74,9 +72,9 @@ def add_cve_items(self, cve_list: list[CveEntry]): version_end_excluding, ) = cpe_entry if cpe_id not in existing_cpe_ids: - cpes.append(self.create_cpe(cpe_id)) + db_objects.append(self.create_cpe(cpe_id)) existing_cpe_ids.add(cpe_id) - associations.append( + db_objects.append( Association( cve_id=cve_item.cve_id, cpe_id=cpe_id, @@ -87,5 +85,10 @@ def add_cve_items(self, cve_list: list[CveEntry]): ) ) existing_cve_ids.add(cve_item.cve_id) - self.session.bulk_save_objects(cves + associations + cpes) - self.session.commit() + if len(db_objects) >= chunk_size: + self.session.bulk_save_objects(db_objects) + self.session.commit() + db_objects.clear() + if db_objects: + self.session.bulk_save_objects(db_objects) + self.session.commit() From 573d6be393274eda32249124d7ca972d65e9592b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Wed, 27 Nov 2024 11:20:24 +0100 Subject: [PATCH 2/2] chore: refactor bulky add_cve_items method which got even larger because of the RAM fix --- .../cve_lookup/internal/database/db_setup.py | 83 ++++++++++--------- 1 file changed, 46 insertions(+), 37 deletions(-) diff --git a/src/plugins/analysis/cve_lookup/internal/database/db_setup.py b/src/plugins/analysis/cve_lookup/internal/database/db_setup.py index fe2364645..c3912f486 100644 --- a/src/plugins/analysis/cve_lookup/internal/database/db_setup.py +++ b/src/plugins/analysis/cve_lookup/internal/database/db_setup.py @@ -2,10 +2,10 @@ import re from pathlib import Path -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Iterable from ..helper_functions import CveEntry, replace_wildcards -from .schema import Association, Cpe, Cve +from .schema import Association, Base, Cpe, Cve if TYPE_CHECKING: from .db_connection import DbConnection @@ -20,6 +20,8 @@ def __init__(self, connection: DbConnection): self.connection = connection self.connection.create_tables() self.session = self.connection.create_session() + self.existing_cve_ids = set() + self.existing_cpe_ids = set() def create_cve(self, cve_item: CveEntry) -> Cve: """ @@ -51,44 +53,51 @@ def create_cpe(self, cpe_id: str): update=update, ) - def add_cve_items(self, cve_list: list[CveEntry], chunk_size: int = 2**12): + def create_association(self, cve_id: str, cpe_entry: tuple[str, str, str, str, str]) -> Association: """ - Add CVE items to the database. + Create an Association object from a CVE ID and a CPE entry. """ - existing_cve_ids = set() - existing_cpe_ids = set() + ( + cpe_id, + version_start_including, + version_start_excluding, + version_end_including, + version_end_excluding, + ) = cpe_entry + return Association( + cve_id=cve_id, + cpe_id=cpe_id, + version_start_including=version_start_including, + version_start_excluding=version_start_excluding, + version_end_including=version_end_including, + version_end_excluding=version_end_excluding, + ) - db_objects = [] + def add_cve_items(self, cve_list: Iterable[CveEntry], chunk_size: int = 2**12): + """ + Add CVE items to the database chunk-wise. + """ + + db_objects: list[Base] = [] for cve_item in cve_list: - if cve_item.cve_id not in existing_cve_ids: - db_objects.append(self.create_cve(cve_item)) - for cpe_entry in cve_item.cpe_entries: - ( - cpe_id, - version_start_including, - version_start_excluding, - version_end_including, - version_end_excluding, - ) = cpe_entry - if cpe_id not in existing_cpe_ids: - db_objects.append(self.create_cpe(cpe_id)) - existing_cpe_ids.add(cpe_id) - db_objects.append( - Association( - cve_id=cve_item.cve_id, - cpe_id=cpe_id, - version_start_including=version_start_including, - version_start_excluding=version_start_excluding, - version_end_including=version_end_including, - version_end_excluding=version_end_excluding, - ) - ) - existing_cve_ids.add(cve_item.cve_id) - if len(db_objects) >= chunk_size: - self.session.bulk_save_objects(db_objects) - self.session.commit() - db_objects.clear() + if cve_item.cve_id not in self.existing_cve_ids: + db_objects.extend(self._create_db_objects_for_cve(cve_item)) + if len(db_objects) >= chunk_size: + self._save_objects(db_objects) + db_objects.clear() if db_objects: - self.session.bulk_save_objects(db_objects) - self.session.commit() + self._save_objects(db_objects) + + def _create_db_objects_for_cve(self, cve_item: CveEntry) -> Iterable[Base]: + yield self.create_cve(cve_item) + for cpe_entry in cve_item.cpe_entries: + if (cpe_id := cpe_entry[0]) not in self.existing_cpe_ids: + yield self.create_cpe(cpe_id) + self.existing_cpe_ids.add(cpe_id) + yield self.create_association(cve_item.cve_id, cpe_entry) + self.existing_cve_ids.add(cve_item.cve_id) + + def _save_objects(self, objects: list[Base]): + self.session.bulk_save_objects(objects) + self.session.commit()