From 004989b55e598041292258c06b101f5aabdf218c Mon Sep 17 00:00:00 2001 From: Michael Kedar Date: Thu, 26 Sep 2024 16:01:34 +1000 Subject: [PATCH 1/6] set rest last_update_date from response --- docker/importer/importer.py | 42 ++++++++++++++++----------- docker/importer/importer_test.py | 17 +++++++---- docker/mock_test/mock_test_handler.py | 2 +- 3 files changed, 38 insertions(+), 23 deletions(-) diff --git a/docker/importer/importer.py b/docker/importer/importer.py index 4db19d77912..cb568b3387c 100755 --- a/docker/importer/importer.py +++ b/docker/importer/importer.py @@ -704,35 +704,44 @@ def _process_updates_rest(self, source_repo: osv.SourceRepository): """Process updates from REST API.""" logging.info('Begin processing REST: %s', source_repo.name) - ignore_last_import_time = ( - source_repo.ignore_last_import_time or not source_repo.last_update_date) - if ignore_last_import_time: + last_update_date = source_repo.last_update_date or datetime.datetime.min + if source_repo.ignore_last_import_time: + last_update_date = datetime.datetime.min source_repo.ignore_last_import_time = False source_repo.put() - import_time_now = utcnow() + request = requests.head(source_repo.rest_api_url, timeout=_TIMEOUT_SECONDS) if request.status_code != 200: logging.error('Failed to fetch REST API: %s', request.status_code) return - if 'Last-Modified' in request.headers: - last_modified = datetime.datetime.strptime( - request.headers['Last-Modified'], _HTTP_LAST_MODIFIED_FORMAT) - # Check whether endpoint has been modified since last update - if not ignore_last_import_time and (last_modified - < source_repo.last_update_date): - logging.info('No changes since last update.') - return + request_last_modified = None + if last_modified := request.headers.get('Last-Modified'): + try: + request_last_modified = datetime.datetime.strptime( + last_modified, _HTTP_LAST_MODIFIED_FORMAT) + # Check whether endpoint has been modified since last update + if request_last_modified <= last_update_date: + logging.info('No changes since last update.') + return + except ValueError: + logging.error('Invalid Last-Modified header: "%s"', last_modified) request = requests.get(source_repo.rest_api_url, timeout=_TIMEOUT_SECONDS) # Parse vulns into Vulnerability objects from the REST API request. vulns = osv.parse_vulnerabilities_from_data( request.text, source_repo.extension, strict=self._strict_validation) + + vulns_last_modified = datetime.datetime.min # Create tasks for changed files. for vuln in vulns: import_failure_logs = [] - if not ignore_last_import_time and vuln.modified.ToDatetime( - ) < source_repo.last_update_date: + vuln_modified = vuln.modified.ToDatetime() + if request_last_modified and vuln_modified > request_last_modified: + logging.warning('%s was modified (%s) after Last-Modified header (%s)', + vuln.id, vuln_modified, request_last_modified) + vulns_last_modified = max(vulns_last_modified, vuln_modified) + if vuln_modified <= last_update_date: continue try: # TODO(jesslowe): Use a ThreadPoolExecutor to parallelize this @@ -754,14 +763,13 @@ def _process_updates_rest(self, source_repo: osv.SourceRepository): except Exception as e: logging.exception('Failed to parse %s: error type: %s, details: %s', vuln.id, e.__class__.__name__, e) - import_failure_logs.append('Failed to parse vulnerability "' + vuln.id + - '"') + import_failure_logs.append(f'Failed to parse vulnerability "{vuln.id}"') continue replace_importer_log(storage.Client(), source_repo.name, self._public_log_bucket, import_failure_logs) - source_repo.last_update_date = import_time_now + source_repo.last_update_date = request_last_modified or vulns_last_modified source_repo.put() logging.info('Finished processing REST: %s', source_repo.name) diff --git a/docker/importer/importer_test.py b/docker/importer/importer_test.py index 32c748c957a..4be35de3be2 100644 --- a/docker/importer/importer_test.py +++ b/docker/importer/importer_test.py @@ -849,17 +849,19 @@ def test_all_updated(self, unused_mock_time: mock.MagicMock, mock_publish: mock.MagicMock): """Testing basic rest endpoint import""" data_handler = MockDataHandler + data_handler.last_modified = 'Mon, 01 Jan 2024 00:00:00 GMT' data_handler.load_file(data_handler, 'rest_test.json') self.httpd = http.server.HTTPServer(SERVER_ADDRESS, data_handler) thread = threading.Thread(target=self.httpd.serve_forever) thread.start() self.source_repo.last_update_date = datetime.datetime(2020, 1, 1) - self.source_repo.put() + repo = self.source_repo.put() imp = importer.Importer('fake_public_key', 'fake_private_key', self.tmp_dir, importer.DEFAULT_PUBLIC_LOGGING_BUCKET, 'bucket', False, False) imp.run() self.assertEqual(mock_publish.call_count, data_handler.cve_count) + self.assertEqual(repo.get().last_update_date, datetime.datetime(2024, 1, 1)) @mock.patch('google.cloud.pubsub_v1.PublisherClient.publish') @mock.patch('time.time', return_value=12345.0) @@ -867,18 +869,20 @@ def test_last_update_ignored(self, unused_mock_time: mock.MagicMock, mock_publish: mock.MagicMock): """Testing last update ignored""" data_handler = MockDataHandler + data_handler.last_modified = 'Mon, 01 Jan 2024 00:00:00 GMT' data_handler.load_file(data_handler, 'rest_test.json') self.httpd = http.server.HTTPServer(SERVER_ADDRESS, data_handler) thread = threading.Thread(target=self.httpd.serve_forever) thread.start() self.source_repo.last_update_date = datetime.datetime(2023, 6, 6) self.source_repo.ignore_last_import_time = True - self.source_repo.put() + repo = self.source_repo.put() imp = importer.Importer('fake_public_key', 'fake_private_key', self.tmp_dir, importer.DEFAULT_PUBLIC_LOGGING_BUCKET, 'bucket', False, False) imp.run() self.assertEqual(mock_publish.call_count, data_handler.cve_count) + self.assertEqual(repo.get().last_update_date, datetime.datetime(2024, 1, 1)) @mock.patch('google.cloud.pubsub_v1.PublisherClient.publish') @mock.patch('time.time', return_value=12345.0) @@ -889,8 +893,8 @@ def test_no_updates(self, unused_mock_time: mock.MagicMock, self.httpd = http.server.HTTPServer(SERVER_ADDRESS, MockDataHandler) thread = threading.Thread(target=self.httpd.serve_forever) thread.start() - self.source_repo.last_update_date = datetime.datetime(2024, 1, 1) - self.source_repo.put() + self.source_repo.last_update_date = datetime.datetime(2024, 2, 1) + repo = self.source_repo.put() imp = importer.Importer('fake_public_key', 'fake_private_key', self.tmp_dir, importer.DEFAULT_PUBLIC_LOGGING_BUCKET, 'bucket', True, False) @@ -898,6 +902,7 @@ def test_no_updates(self, unused_mock_time: mock.MagicMock, imp.run() mock_publish.assert_not_called() self.assertIn('INFO:root:No changes since last update.', logs.output[1]) + self.assertEqual(repo.get().last_update_date, datetime.datetime(2024, 2, 1)) @mock.patch('google.cloud.pubsub_v1.PublisherClient.publish') @mock.patch('time.time', return_value=12345.0) @@ -905,11 +910,12 @@ def test_few_updates(self, unused_mock_time: mock.MagicMock, mock_publish: mock.MagicMock): """Testing from date between entries - only entries after 6/6/2023 should be called""" + MockDataHandler.last_modified = 'Mon, 01 Jan 2024 00:00:00 GMT' self.httpd = http.server.HTTPServer(SERVER_ADDRESS, MockDataHandler) thread = threading.Thread(target=self.httpd.serve_forever) thread.start() self.source_repo.last_update_date = datetime.datetime(2023, 6, 6) - self.source_repo.put() + repo = self.source_repo.put() imp = importer.Importer('fake_public_key', 'fake_private_key', self.tmp_dir, importer.DEFAULT_PUBLIC_LOGGING_BUCKET, 'bucket', False, False) @@ -976,6 +982,7 @@ def test_few_updates(self, unused_mock_time: mock.MagicMock, deleted='false', req_timestamp='12345') ]) + self.assertEqual(repo.get().last_update_date, datetime.datetime(2024, 1, 1)) @mock.patch('importer.utcnow', lambda: datetime.datetime(2024, 1, 1)) diff --git a/docker/mock_test/mock_test_handler.py b/docker/mock_test/mock_test_handler.py index 7c9ca954da1..a6c41f7d974 100644 --- a/docker/mock_test/mock_test_handler.py +++ b/docker/mock_test/mock_test_handler.py @@ -8,7 +8,7 @@ class MockDataHandler(http.server.BaseHTTPRequestHandler): """Mock data handler for testing.""" - last_modified = 'Tue, 13 Jun 2023 00:00:00 GMT' + last_modified = 'Mon, 01 Jan 2024 00:00:00 GMT' file_path = 'rest_test.json' cve_count = -1 data = None From 10a81229e0909dea1fe4c7b6c85a888120817d6a Mon Sep 17 00:00:00 2001 From: Michael Kedar Date: Thu, 26 Sep 2024 16:26:00 +1000 Subject: [PATCH 2/6] small logic fix --- docker/importer/importer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/importer/importer.py b/docker/importer/importer.py index cb568b3387c..8ab34ce37db 100755 --- a/docker/importer/importer.py +++ b/docker/importer/importer.py @@ -732,7 +732,7 @@ def _process_updates_rest(self, source_repo: osv.SourceRepository): vulns = osv.parse_vulnerabilities_from_data( request.text, source_repo.extension, strict=self._strict_validation) - vulns_last_modified = datetime.datetime.min + vulns_last_modified = last_update_date # Create tasks for changed files. for vuln in vulns: import_failure_logs = [] From 9ce7a86c2256ff17c84cdd21959c6e8bb4f89282 Mon Sep 17 00:00:00 2001 From: Michael Kedar Date: Fri, 27 Sep 2024 14:44:43 +1000 Subject: [PATCH 3/6] assertion messages --- docker/importer/importer_test.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/docker/importer/importer_test.py b/docker/importer/importer_test.py index 4be35de3be2..2554cd0f994 100644 --- a/docker/importer/importer_test.py +++ b/docker/importer/importer_test.py @@ -861,7 +861,10 @@ def test_all_updated(self, unused_mock_time: mock.MagicMock, False, False) imp.run() self.assertEqual(mock_publish.call_count, data_handler.cve_count) - self.assertEqual(repo.get().last_update_date, datetime.datetime(2024, 1, 1)) + self.assertEqual( + repo.get().last_update_date, + datetime.datetime(2024, 1, 1), + msg='Expected last_update_date to equal REST Last-Modified date') @mock.patch('google.cloud.pubsub_v1.PublisherClient.publish') @mock.patch('time.time', return_value=12345.0) @@ -882,7 +885,10 @@ def test_last_update_ignored(self, unused_mock_time: mock.MagicMock, False, False) imp.run() self.assertEqual(mock_publish.call_count, data_handler.cve_count) - self.assertEqual(repo.get().last_update_date, datetime.datetime(2024, 1, 1)) + self.assertEqual( + repo.get().last_update_date, + datetime.datetime(2024, 1, 1), + msg='Expected last_update_date to equal REST Last-Modified date') @mock.patch('google.cloud.pubsub_v1.PublisherClient.publish') @mock.patch('time.time', return_value=12345.0) @@ -902,7 +908,10 @@ def test_no_updates(self, unused_mock_time: mock.MagicMock, imp.run() mock_publish.assert_not_called() self.assertIn('INFO:root:No changes since last update.', logs.output[1]) - self.assertEqual(repo.get().last_update_date, datetime.datetime(2024, 2, 1)) + self.assertEqual( + repo.get().last_update_date, + datetime.datetime(2024, 2, 1), + msg='last_update_date should not have been updated') @mock.patch('google.cloud.pubsub_v1.PublisherClient.publish') @mock.patch('time.time', return_value=12345.0) @@ -982,7 +991,10 @@ def test_few_updates(self, unused_mock_time: mock.MagicMock, deleted='false', req_timestamp='12345') ]) - self.assertEqual(repo.get().last_update_date, datetime.datetime(2024, 1, 1)) + self.assertEqual( + repo.get().last_update_date, + datetime.datetime(2024, 1, 1), + msg='Expected last_update_date to equal REST Last-Modified date') @mock.patch('importer.utcnow', lambda: datetime.datetime(2024, 1, 1)) From 2841c894b9213bdada5469c8ca1349aefbf547f5 Mon Sep 17 00:00:00 2001 From: Michael Kedar Date: Fri, 27 Sep 2024 15:19:29 +1000 Subject: [PATCH 4/6] docstring --- docker/importer/importer.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/docker/importer/importer.py b/docker/importer/importer.py index 8ab34ce37db..823a7ec3ecd 100755 --- a/docker/importer/importer.py +++ b/docker/importer/importer.py @@ -701,7 +701,18 @@ def _process_deletions_bucket(self, self._public_log_bucket, import_failure_logs) def _process_updates_rest(self, source_repo: osv.SourceRepository): - """Process updates from REST API.""" + """Process updates from REST API. + + To find new updates, first makes a HEAD request to check the 'Last-Modified' + header, and skips processing if it's before the source's last_modified_date + (and ignore_last_import_time isn't set). + + Otherwise, GETs the list of vulnerabilities and requests updates for + vulnerabilities modified after last_modified_date. + + last_modified_date is updated to the HEAD's 'Last-Modified' time, or the + latest vulnerability's modified date if 'Last-Modified' was missing/invalid. + """ logging.info('Begin processing REST: %s', source_repo.name) last_update_date = source_repo.last_update_date or datetime.datetime.min From 085e16141dc9abf0db2a8cc1264f54a7c6c08f69 Mon Sep 17 00:00:00 2001 From: Michael Kedar Date: Fri, 27 Sep 2024 17:35:39 +1000 Subject: [PATCH 5/6] Give retries a go --- docker/importer/importer.py | 23 ++++++++++++++--- docker/importer/importer_test.py | 44 +++++++++++++++++++------------- 2 files changed, 46 insertions(+), 21 deletions(-) diff --git a/docker/importer/importer.py b/docker/importer/importer.py index 823a7ec3ecd..46a63dd8f97 100755 --- a/docker/importer/importer.py +++ b/docker/importer/importer.py @@ -25,9 +25,11 @@ import logging import os import requests +from requests.adapters import HTTPAdapter import shutil import threading import time +from urllib3.util import Retry import atexit from typing import List, Tuple, Optional @@ -721,7 +723,18 @@ def _process_updates_rest(self, source_repo: osv.SourceRepository): source_repo.ignore_last_import_time = False source_repo.put() - request = requests.head(source_repo.rest_api_url, timeout=_TIMEOUT_SECONDS) + s = requests.Session() + adapter = HTTPAdapter( + max_retries=Retry( + total=3, status_forcelist=[502, 503, 504], backoff_factor=1)) + s.mount(source_repo.rest_api_url, adapter) + s.mount(source_repo.link, adapter) + + try: + request = s.head(source_repo.rest_api_url, timeout=_TIMEOUT_SECONDS) + except Exception: + logging.exception('Exception querying REST API:') + return if request.status_code != 200: logging.error('Failed to fetch REST API: %s', request.status_code) return @@ -738,7 +751,11 @@ def _process_updates_rest(self, source_repo: osv.SourceRepository): except ValueError: logging.error('Invalid Last-Modified header: "%s"', last_modified) - request = requests.get(source_repo.rest_api_url, timeout=_TIMEOUT_SECONDS) + try: + request = s.get(source_repo.rest_api_url, timeout=_TIMEOUT_SECONDS) + except Exception: + logging.exception('Exception querying REST API:') + return # Parse vulns into Vulnerability objects from the REST API request. vulns = osv.parse_vulnerabilities_from_data( request.text, source_repo.extension, strict=self._strict_validation) @@ -756,7 +773,7 @@ def _process_updates_rest(self, source_repo: osv.SourceRepository): continue try: # TODO(jesslowe): Use a ThreadPoolExecutor to parallelize this - single_vuln = requests.get( + single_vuln = s.get( source_repo.link + vuln.id + source_repo.extension, timeout=_TIMEOUT_SECONDS) # Validate the individual request diff --git a/docker/importer/importer_test.py b/docker/importer/importer_test.py index 2554cd0f994..c6d570524c1 100644 --- a/docker/importer/importer_test.py +++ b/docker/importer/importer_test.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. """Importer tests.""" +import contextlib import datetime import os import shutil @@ -22,6 +23,7 @@ import threading from unittest import mock +from urllib3.exceptions import SystemTimeWarning import warnings from google.cloud import ndb @@ -66,6 +68,7 @@ def setUp(self): self.tmp_dir = tempfile.mkdtemp() tests.mock_datetime(self) + warnings.filterwarnings('ignore', category=SystemTimeWarning) self.mock_repo = tests.mock_repository(self) storage_patcher = mock.patch('google.cloud.storage.Client') @@ -407,6 +410,7 @@ def setUp(self): self.tmp_dir = tempfile.mkdtemp() tests.mock_datetime(self) + warnings.filterwarnings('ignore', category=SystemTimeWarning) self.source_repo = osv.SourceRepository( type=osv.SourceRepositoryType.BUCKET, @@ -821,7 +825,7 @@ def setUp(self): self.tmp_dir = tempfile.mkdtemp() tests.mock_datetime(self) - warnings.filterwarnings("ignore", "unclosed", ResourceWarning) + warnings.filterwarnings('ignore', category=SystemTimeWarning) storage_patcher = mock.patch('google.cloud.storage.Client') self.addCleanup(storage_patcher.stop) @@ -841,7 +845,19 @@ def setUp(self): def tearDown(self): shutil.rmtree(self.tmp_dir, ignore_errors=True) - self.httpd.shutdown() + + @contextlib.contextmanager + def server(self, handler_class): + """REST mock server context manager.""" + httpd = http.server.HTTPServer(SERVER_ADDRESS, handler_class) + thread = threading.Thread(target=httpd.serve_forever) + thread.start() + try: + yield httpd + finally: + httpd.shutdown() + httpd.server_close() + thread.join() @mock.patch('google.cloud.pubsub_v1.PublisherClient.publish') @mock.patch('time.time', return_value=12345.0) @@ -851,15 +867,13 @@ def test_all_updated(self, unused_mock_time: mock.MagicMock, data_handler = MockDataHandler data_handler.last_modified = 'Mon, 01 Jan 2024 00:00:00 GMT' data_handler.load_file(data_handler, 'rest_test.json') - self.httpd = http.server.HTTPServer(SERVER_ADDRESS, data_handler) - thread = threading.Thread(target=self.httpd.serve_forever) - thread.start() self.source_repo.last_update_date = datetime.datetime(2020, 1, 1) repo = self.source_repo.put() imp = importer.Importer('fake_public_key', 'fake_private_key', self.tmp_dir, importer.DEFAULT_PUBLIC_LOGGING_BUCKET, 'bucket', False, False) - imp.run() + with self.server(data_handler): + imp.run() self.assertEqual(mock_publish.call_count, data_handler.cve_count) self.assertEqual( repo.get().last_update_date, @@ -874,16 +888,14 @@ def test_last_update_ignored(self, unused_mock_time: mock.MagicMock, data_handler = MockDataHandler data_handler.last_modified = 'Mon, 01 Jan 2024 00:00:00 GMT' data_handler.load_file(data_handler, 'rest_test.json') - self.httpd = http.server.HTTPServer(SERVER_ADDRESS, data_handler) - thread = threading.Thread(target=self.httpd.serve_forever) - thread.start() self.source_repo.last_update_date = datetime.datetime(2023, 6, 6) self.source_repo.ignore_last_import_time = True repo = self.source_repo.put() imp = importer.Importer('fake_public_key', 'fake_private_key', self.tmp_dir, importer.DEFAULT_PUBLIC_LOGGING_BUCKET, 'bucket', False, False) - imp.run() + with self.server(data_handler): + imp.run() self.assertEqual(mock_publish.call_count, data_handler.cve_count) self.assertEqual( repo.get().last_update_date, @@ -896,15 +908,12 @@ def test_no_updates(self, unused_mock_time: mock.MagicMock, mock_publish: mock.MagicMock): """Testing none last modified""" MockDataHandler.last_modified = 'Fri, 01 Jan 2021 00:00:00 GMT' - self.httpd = http.server.HTTPServer(SERVER_ADDRESS, MockDataHandler) - thread = threading.Thread(target=self.httpd.serve_forever) - thread.start() self.source_repo.last_update_date = datetime.datetime(2024, 2, 1) repo = self.source_repo.put() imp = importer.Importer('fake_public_key', 'fake_private_key', self.tmp_dir, importer.DEFAULT_PUBLIC_LOGGING_BUCKET, 'bucket', True, False) - with self.assertLogs() as logs: + with self.assertLogs() as logs, self.server(MockDataHandler): imp.run() mock_publish.assert_not_called() self.assertIn('INFO:root:No changes since last update.', logs.output[1]) @@ -920,15 +929,13 @@ def test_few_updates(self, unused_mock_time: mock.MagicMock, """Testing from date between entries - only entries after 6/6/2023 should be called""" MockDataHandler.last_modified = 'Mon, 01 Jan 2024 00:00:00 GMT' - self.httpd = http.server.HTTPServer(SERVER_ADDRESS, MockDataHandler) - thread = threading.Thread(target=self.httpd.serve_forever) - thread.start() self.source_repo.last_update_date = datetime.datetime(2023, 6, 6) repo = self.source_repo.put() imp = importer.Importer('fake_public_key', 'fake_private_key', self.tmp_dir, importer.DEFAULT_PUBLIC_LOGGING_BUCKET, 'bucket', False, False) - imp.run() + with self.server(MockDataHandler): + imp.run() mock_publish.assert_has_calls([ mock.call( self.tasks_topic, @@ -1005,6 +1012,7 @@ def setUp(self): tests.reset_emulator() tests.mock_datetime(self) + warnings.filterwarnings('ignore', category=SystemTimeWarning) def test_add_finding(self): """Test that creating an import finding works.""" From 65d911a77093f0e33f73cd261e754112d094dd1f Mon Sep 17 00:00:00 2001 From: Michael Kedar Date: Mon, 30 Sep 2024 13:23:18 +1000 Subject: [PATCH 6/6] mount retry adapter for all http(s) --- docker/importer/importer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker/importer/importer.py b/docker/importer/importer.py index 46a63dd8f97..6da950e8f79 100755 --- a/docker/importer/importer.py +++ b/docker/importer/importer.py @@ -727,8 +727,8 @@ def _process_updates_rest(self, source_repo: osv.SourceRepository): adapter = HTTPAdapter( max_retries=Retry( total=3, status_forcelist=[502, 503, 504], backoff_factor=1)) - s.mount(source_repo.rest_api_url, adapter) - s.mount(source_repo.link, adapter) + s.mount('http://', adapter) + s.mount('https://', adapter) try: request = s.head(source_repo.rest_api_url, timeout=_TIMEOUT_SECONDS)