From 1e8e89775282aed1e51f87910ca02bb01a4ce76f Mon Sep 17 00:00:00 2001 From: charly reux Date: Wed, 1 May 2024 10:30:13 +0200 Subject: [PATCH 1/2] Add tests for the propagation of vulnerabilities within git repositories --- osv/impact_git_test.py | 488 ++++++++++++++++++++++++++++++ osv/test_tools/test_repository.py | 158 ++++++++++ osv/testdata/.gitignore | 3 +- run_tests.sh | 1 + 4 files changed, 649 insertions(+), 1 deletion(-) create mode 100644 osv/impact_git_test.py create mode 100644 osv/test_tools/test_repository.py diff --git a/osv/impact_git_test.py b/osv/impact_git_test.py new file mode 100644 index 00000000000..7fefc5a52ae --- /dev/null +++ b/osv/impact_git_test.py @@ -0,0 +1,488 @@ +"""impact_git_test.py: Tests for the impact module using git repositories.""" + +from .test_tools.test_repository import TestRepository +import unittest +from . import impact + + +class GitImpactTest(unittest.TestCase): + """Tests for the impact module using git repositories.""" + + @classmethod + def setUpClass(cls): + cls.__repo_analyzer = impact.RepoAnalyzer(detect_cherrypicks=False) + + ######## 1rst : tests with only "introduced" and "fixed" + + def test_introduced_fixed_linear(self): + """Simple range, only two commits are vulnerable. """ + + repo = TestRepository("test_introduced_fixed_linear", debug=False) + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit(parents=[first]) + repo.add_empty_commit( + parents=[second], vulnerability=TestRepository.VulnerabilityType.FIXED) + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex, second.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + def test_introduced_fixed_branch_propagation(self): + """Ensures the detection of the propagation + of the vulnerability in created branches""" + repo = TestRepository( + "test_introduced_fixed_branch_propagation", debug=False) + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit(parents=[first]) + repo.add_empty_commit( + parents=[second], vulnerability=TestRepository.VulnerabilityType.FIXED) + fourth = repo.add_empty_commit(parents=[second]) + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex, second.hex, fourth.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + def test_introduced_fixed_merge(self): + """Ensures that a merge without a fix does not + affect the propagation of a vulnerability""" + repo = TestRepository("test_introduced_fixed_merge", debug=False) + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit() + third = repo.add_empty_commit(parents=[first, second]) + repo.add_empty_commit( + parents=[third], vulnerability=TestRepository.VulnerabilityType.FIXED) + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex, third.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + def test_introduced_fixed_two_linear(self): + """Ensures that multiple introduced commit + in the same branch are correctly handled""" + repo = TestRepository("test_introduced_fixed_two_linear", debug=False) + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit( + parents=[first], vulnerability=TestRepository.VulnerabilityType.FIXED) + third = repo.add_empty_commit( + parents=[second], + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + repo.add_empty_commit( + parents=[third], vulnerability=TestRepository.VulnerabilityType.FIXED) + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex, third.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + def test_introduced_fixed_merge_propagation(self): + """Ensures that a vulnerability is propagated from + a branch, in spite of the main branch having a fix.""" + + repo = TestRepository( + "test_introduced_fixed_merge_propagation", debug=False) + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit( + parents=[first], vulnerability=TestRepository.VulnerabilityType.FIXED) + third = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + fourth = repo.add_empty_commit(parents=[second, third]) + repo.add_empty_commit( + parents=[fourth], vulnerability=TestRepository.VulnerabilityType.FIXED) + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex, third.hex, fourth.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + def test_introduced_fixed_fix_propagation(self): + """Ensures that a fix gets propagated, in the case of a merge""" + repo = TestRepository("test_introduced_fixed_fix_propagation") + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.FIXED) + third = repo.add_empty_commit(parents=[first, second]) + repo.add_empty_commit( + parents=[third], vulnerability=TestRepository.VulnerabilityType.FIXED) + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + ######## 2nd : tests with "introduced" and "limit" + + def test_introduced_limit_linear(self): + """Ensures the basic behavior of limit + (the limit commit is considered unaffected).""" + repo = TestRepository("test_intoduced_limit_linear") + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit(parents=[first]) + repo.add_empty_commit( + parents=[second], vulnerability=TestRepository.VulnerabilityType.LIMIT) + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex, second.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + def test_introduced_limit_branch(self): + """Ensures that a limit commit does limit the vulnerability to a branch.""" + repo = TestRepository("test_intoduced_limit_branch") + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit(parents=[first]) + repo.add_empty_commit( + parents=[second], vulnerability=TestRepository.VulnerabilityType.LIMIT) + repo.add_empty_commit(parents=[second]) + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([ + first.hex, + second.hex, + ]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + def test_introduced_limit_merge(self): + """Ensures that a merge without a fix does + not affect the propagation of a vulnerability.""" + repo = TestRepository("test_intoduced_limit_merge", debug=False) + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit() + third = repo.add_empty_commit(parents=[first, second]) + repo.add_empty_commit( + parents=[third], vulnerability=TestRepository.VulnerabilityType.LIMIT) + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex, third.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + def test_introduced_limit_two_linear(self): + """Ensures that multiple introduced commit in + the same branch are correctly handled, wrt limit.""" + repo = TestRepository("test_introduced_limit_two_linear", debug=False) + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit( + parents=[first], vulnerability=TestRepository.VulnerabilityType.LIMIT) + third = repo.add_empty_commit( + parents=[second], + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + repo.add_empty_commit( + parents=[third], vulnerability=TestRepository.VulnerabilityType.LIMIT) + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex, third.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + ######## 2nd : tests with "introduced" and "last-affected" + + def test_introduced_last_affected_linear(self): + """Ensures the basic behavior of last_affected + commits (the las_affected commit is considered affected).""" + repo = TestRepository("test_introduced_last_affected_linear") + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit(parents=[first]) + third = repo.add_empty_commit( + parents=[second], + vulnerability=TestRepository.VulnerabilityType.LAST_AFFECTED, + ) + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex, second.hex, third.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + def test_introduced_last_affected_branch_propagation(self): + """Ensures that vulnerabilities are propagated to branches""" + repo = TestRepository( + "test_introduced_last_affected_branch_propagation", debug=False) + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit(parents=[first]) + third = repo.add_empty_commit( + parents=[second], + vulnerability=TestRepository.VulnerabilityType.LAST_AFFECTED, + ) + fourth = repo.add_empty_commit(parents=[second]) + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex, second.hex, third.hex, fourth.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + def test_introduced_last_affected_merge(self): + """Ensures that a merge without a fix does + not affect the propagation of a vulnerability.""" + repo = TestRepository("test_introduced_last_affected_merge", debug=False) + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit() + third = repo.add_empty_commit(parents=[first, second]) + fourth = repo.add_empty_commit( + parents=[third], + vulnerability=TestRepository.VulnerabilityType.LAST_AFFECTED, + ) + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex, third.hex, fourth.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + def test_introduced_last_affected_two_linear(self): + """Ensures that multiple introduced commit in + the same branch are correctly handled, wrt last_affected.""" + repo = TestRepository( + "test_introduced_last_affected_two_linear", debug=False) + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit( + parents=[first], + vulnerability=TestRepository.VulnerabilityType.LAST_AFFECTED, + ) + third = repo.add_empty_commit( + parents=[second], + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + fourth = repo.add_empty_commit( + parents=[third], + vulnerability=TestRepository.VulnerabilityType.LAST_AFFECTED, + ) + + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex, second.hex, third.hex, fourth.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + ######## 3nd : tests with "introduced", "limit", and "fixed" + + def test_introduced_limit_fixed_linear_lf(self): + """Ensures the behaviors of limit and fixed commits are not conflicting.""" + repo = TestRepository("test_introduced_limit_fixed_linear_lf") + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit( + parents=[first], vulnerability=TestRepository.VulnerabilityType.LIMIT) + repo.add_empty_commit( + parents=[second], vulnerability=TestRepository.VulnerabilityType.FIXED) + + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + def test_introduced_limit_fixed_linear_fl(self): + """Ensures the behaviors of limit and fixed commits are not conflicting""" + repo = TestRepository("test_introduced_limit_fixed_linear_lf") + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit( + parents=[first], vulnerability=TestRepository.VulnerabilityType.FIXED) + repo.add_empty_commit( + parents=[second], vulnerability=TestRepository.VulnerabilityType.LIMIT) + + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + def test_introduced_limit_branch_limit(self): + """Ensures the behaviors of limit and fixed + commits are not conflicting, in the case of a branch created.""" + repo = TestRepository("test_introduced_limit_fixed_linear_lf", debug=False) + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit( + parents=[first], vulnerability=TestRepository.VulnerabilityType.LIMIT) + repo.add_empty_commit(parents=[first]) + repo.add_empty_commit( + parents=[second], vulnerability=TestRepository.VulnerabilityType.FIXED) + + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) \ No newline at end of file diff --git a/osv/test_tools/test_repository.py b/osv/test_tools/test_repository.py new file mode 100644 index 00000000000..d12647ba41a --- /dev/null +++ b/osv/test_tools/test_repository.py @@ -0,0 +1,158 @@ +"""test_repository""" +import pygit2 +import json +from datetime import datetime +from enum import Enum +import os +import shutil +import uuid + + +class TestRepository: + """ Utilitary class to create a test repository for the git tests + """ + + class VulnerabilityType(Enum): + INTRODUCED = 1 + FIXED = 2 + LAST_AFFECTED = 3 + LIMIT = 4 + NONE = 5 + + _author = pygit2.Signature('John Smith', 'johnSmith@example.com') + _commiter = pygit2.Signature('John Smith', 'johnSmith@example.com') + + _initial_commit = None + + def __init__(self, name: str, debug: bool = False): + self.debug = debug + self.name = name + self.introduced = [] + self.fixed = [] + self.last_affected = [] + self.limit = [] + + if os.path.exists(f"osv/testdata/test_repositories/{name}"): + shutil.rmtree(f"osv/testdata/test_repositories/{name}") + self.repo = pygit2.init_repository( + f"osv/testdata/test_repositories/{name}", bare=False) + #empty initial commit usefull for the creation of the repository + tree = self.repo.TreeBuilder().write() + self._initial_commit = self.repo.create_commit('refs/heads/main', + self._author, self._commiter, + "message", tree, []) + self.create_branch(f"branch_{self._initial_commit.hex}", + self._initial_commit) + self.repo.references.create("refs/remotes/origin/main", + self._initial_commit) + + def create_branch(self, name: str, commit: pygit2.Oid): + self.repo.references.create(f'refs/heads/{name}', commit) + self.repo.references.create(f'refs/remotes/origin/{name}', commit) + + def add_empty_commit( + self, + parents: list[pygit2.Oid] = None, + vulnerability: VulnerabilityType = VulnerabilityType.NONE, + message: str = "Empty") -> pygit2.Oid: + """ + Adds a empty commit to the repository, tags it with the vulnerability + type and adds it to the vulnerability list if specified + """ + + tree = self.repo.TreeBuilder().write() + self._author = pygit2.Signature( + str(uuid.uuid1()), 'johnSmith@example.com' + ) #using a random uuid to avoid commits being the same + commit = None + + if not parents or len(parents) == 0: + self.repo.create_branch( + 'branch_temp', self.repo.revparse_single(self._initial_commit.hex)) + commit = self.repo.create_commit('refs/heads/branch_temp', self._author, + self._commiter, message, tree, + [self._initial_commit]) + + self.repo.branches.delete('branch_temp') + self.create_branch(f'branch_{commit.hex}', commit) + + else: + self.repo.create_branch('branch_temp', + self.repo.revparse_single(parents[0].hex)) + commit = self.repo.create_commit('refs/heads/branch_temp', self._author, + self._commiter, message, tree, parents) + self.repo.branches.delete('branch_temp') + self.create_branch(commit=commit, name=f'branch_{commit.hex}') + + self.repo.references.get('refs/remotes/{0}/{1}'.format( + "origin", "main")).set_target(commit) + self.repo.references.get('refs/heads/main').set_target(commit) + + if self.debug: + os.system("echo -------------------------------" + + "-----------------------------------") + os.system(f"git -C osv/testdata/test_repositories/{self.name}" + + " log --all --graph --decorate") + + #self.repo.branches.delete(created_branch.branch_name) + + match vulnerability: + case self.VulnerabilityType.INTRODUCED: + self.introduced.append(commit.hex) + case self.VulnerabilityType.FIXED: + self.fixed.append(commit.hex) + case self.VulnerabilityType.LAST_AFFECTED: + self.last_affected.append(commit.hex) + case self.VulnerabilityType.LIMIT: + self.limit.append(commit.hex) + case self.VulnerabilityType.NONE: + pass + case _: + raise ValueError("Invalid vulnerability type") + return commit + + def remove(self): + shutil.rmtree(f"osv/testdata/test_repositories/{self.name}/") + while os.path.exists( + f"osv/testdata/test_repositories/{self.name}/"): # check if it exists + pass + ##cleanup + self.introduced = [] + self.fixed = [] + self.last_affected = [] + self.limit = [] + + def get_ranges(self): + """ + return the ranges of the repository + """ + return (self.introduced, self.fixed, self.last_affected, self.limit) + + def print_commits(self): + """ prints the commits of the repository + """ + print(self.name) + commits = [] + for ref in self.repo.listall_reference_objects(): + print(ref.target) + for commit in self.repo.walk(ref.target, pygit2.GIT_SORT_TIME): + + current_commit = { + 'hash': + commit.hex, + 'message': + commit.message, + 'commit_date': + datetime.utcfromtimestamp(commit.commit_time + ).strftime('%Y-%m-%dT%H:%M:%SZ'), + 'author_name': + commit.author.name, + 'author_email': + commit.author.email, + 'parents': [c.hex for c in commit.parents], + } + if current_commit in commits: + break + commits.append(current_commit) + + print(json.dumps(commits, indent=2)) \ No newline at end of file diff --git a/osv/testdata/.gitignore b/osv/testdata/.gitignore index f7bc0fe8b7c..67746de30ce 100644 --- a/osv/testdata/.gitignore +++ b/osv/testdata/.gitignore @@ -1 +1,2 @@ -version_enum \ No newline at end of file +version_enum +test_repositories/** \ No newline at end of file diff --git a/run_tests.sh b/run_tests.sh index dd8af7aa902..dc8be557c4e 100755 --- a/run_tests.sh +++ b/run_tests.sh @@ -6,6 +6,7 @@ python3 -m pipenv run python -m unittest osv.bug_test python3 -m pipenv run python -m unittest osv.purl_helpers_test python3 -m pipenv run python -m unittest osv.request_helper_test python3 -m pipenv run python -m unittest osv.semver_index_test +python3 -m pipenv run python -m unittest osv.impact_git_test python3 -m pipenv run python -m unittest osv.impact_test # Run all osv.ecosystems tests From 05f77f86e9caf21163e15e1ea8028a9468a781d6 Mon Sep 17 00:00:00 2001 From: RomainLefeuvre <33905216+RomainLefeuvre@users.noreply.github.com> Date: Thu, 16 May 2024 12:18:05 -0400 Subject: [PATCH 2/2] Refactor test_repository.py and impact_git_test.py (#1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Refactoring – `test_repository.py ` * Make the repo generation cleaner, remove unnecessary branch that were generated * Use protobuf type for events ## Refactoring - `impact_git_test.py` * Move to a template architecture * Remove inconsistent tests * Support cherry picking test Note : `impact.py ` has been modified to handle repository that do not support caching. We are still trying to find a solution to enable the cache for synthetic repository created with pygit2. --- osv/impact.py | 12 +- osv/impact_git_test.py | 735 ++++++++++++++---------------- osv/test_tools/test_repository.py | 297 ++++++++---- 3 files changed, 550 insertions(+), 494 deletions(-) diff --git a/osv/impact.py b/osv/impact.py index f5f0f28edff..b5ee6744767 100644 --- a/osv/impact.py +++ b/osv/impact.py @@ -302,12 +302,16 @@ def _get_equivalent_commit(self, # Ignore commits without parents and merge commits with multiple parents. if not commit.parents or len(commit.parents) > 1: continue - - patch_id = repo.cache.get(commit.id) - if not patch_id: + #Handle repositories that do not support cache + if hasattr(repo, 'cache'): + patch_id = repo.cache.get(commit.id) + if not patch_id: + diff = repo.diff(commit.parents[0], commit) + patch_id = diff.patchid + repo.cache[commit.id] = patch_id + else: diff = repo.diff(commit.parents[0], commit) patch_id = diff.patchid - repo.cache[commit.id] = patch_id if patch_id == target_patch_id: return str(commit.id) diff --git a/osv/impact_git_test.py b/osv/impact_git_test.py index 7fefc5a52ae..81ad984a284 100644 --- a/osv/impact_git_test.py +++ b/osv/impact_git_test.py @@ -1,6 +1,7 @@ """impact_git_test.py: Tests for the impact module using git repositories.""" from .test_tools.test_repository import TestRepository + import unittest from . import impact @@ -12,477 +13,431 @@ class GitImpactTest(unittest.TestCase): def setUpClass(cls): cls.__repo_analyzer = impact.RepoAnalyzer(detect_cherrypicks=False) - ######## 1rst : tests with only "introduced" and "fixed" - + ######## 1st : tests with "introduced" and "fixed" def test_introduced_fixed_linear(self): - """Simple range, only two commits are vulnerable. """ - - repo = TestRepository("test_introduced_fixed_linear", debug=False) - - first = repo.add_empty_commit( - vulnerability=TestRepository.VulnerabilityType.INTRODUCED) - second = repo.add_empty_commit(parents=[first]) - repo.add_empty_commit( - parents=[second], vulnerability=TestRepository.VulnerabilityType.FIXED) - (all_introduced, all_fixed, all_last_affected, - all_limit) = repo.get_ranges() + """Simple range, only two commits are vulnerable. + Model : A->B->C->D """ + events = {"B": "introduced", "D": "fixed"} + expected_vulnerable = {"B", "C"} + self.template_four_linear(events, expected_vulnerable, + "test_introduced_fixed_linear") - result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, - all_fixed, all_limit, - all_last_affected) + ######## 2nd : tests with "introduced" and "limit" + def test_introduced_limit_linear(self): + """Ensures the basic behavior of limit + (the limit commit is considered unaffected). + Model : A->B->C->D """ + events = {"B": "introduced", "D": "limit"} + expected_vulnerable = {"B", "C"} + self.template_four_linear(events, expected_vulnerable, + "test_introduced_limit_linear") + + ######## 3nd : tests with "introduced" and "last-affected" + def test_introduced_last_affected_linear(self): + """Ensures the basic behavior of last_affected + commits (the last_affected commit is considered affected). + Model : A->B->C->D """ + events = {"B": "introduced", "D": "last_affected"} + expected_vulnerable = {"B", "C", "D"} + self.template_four_linear(events, expected_vulnerable, + "test_introduced_last_affected_linear") + + ######## 4nd : tests with "introduced", "limit", and "fixed" + def test_introduced_limit_fixed_linear_lf(self): + """Ensures the behaviors of limit and fixed commits are not conflicting. + Model : A->B->C->D """ + events = {"B": "introduced", "C": "limit", "D": "fixed"} + expected_vulnerable = {"B"} + self.template_four_linear(events, expected_vulnerable, + "test_introduced_limit_fixed_linear_lf") + + ######## 5nd : tests with "introduced", "limit", + # and "fixed" in a different order + def test_introduced_limit_fixed_linear_fl(self): + """Ensures the behaviors of limit and fixed commits are not conflicting. + Model : A->B->C->D """ + events = {"B": "introduced", "C": "fixed", "D": "limit"} + expected_vulnerable = {"B"} + self.template_four_linear(events, expected_vulnerable, + "test_introduced_limit_fixed_linear_fl") - expected = set([first.hex, second.hex]) - repo.remove() - self.assertEqual( - result.commits, - expected, - "Expected: %s, got: %s" % (expected, result.commits), - ) +######## 6nd : branch tests with "introduced", and "fixed" def test_introduced_fixed_branch_propagation(self): - """Ensures the detection of the propagation - of the vulnerability in created branches""" - repo = TestRepository( - "test_introduced_fixed_branch_propagation", debug=False) - - first = repo.add_empty_commit( - vulnerability=TestRepository.VulnerabilityType.INTRODUCED) - second = repo.add_empty_commit(parents=[first]) - repo.add_empty_commit( - parents=[second], vulnerability=TestRepository.VulnerabilityType.FIXED) - fourth = repo.add_empty_commit(parents=[second]) - (all_introduced, all_fixed, all_last_affected, - all_limit) = repo.get_ranges() + """Simple range, checking the propagation of the + vulnerability in created branch. + Model : A->B->C->D + |->E""" + events = { + "B": "introduced", + "D": "fixed", + } + expected_vulnerable = {"B", "C", "E"} + self.template_five_last_branch(events, expected_vulnerable, + "test_introduced_fixed_branch_propagation") + +######## 7nd : branch tests with "introduced" and "limit" - result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, - all_fixed, all_limit, - all_last_affected) + def test_introduced_limit_branch(self): + """ensures the basic behavior of limit commits in branches. + Model : A->B->C->D + |->E""" + events = { + "B": "introduced", + "D": "limit", + } + expected_vulnerable = {"B", "C"} + self.template_five_last_branch(events, expected_vulnerable, + "test_introduced_limit_branch") + +######## 8nd : branch tests with "introduced" and "last-affected" - expected = set([first.hex, second.hex, fourth.hex]) - repo.remove() - self.assertEqual( - result.commits, - expected, - "Expected: %s, got: %s" % (expected, result.commits), - ) + def test_introduced_last_affected_branch_propagation(self): + """ensures the basic behavior of last_affected commits when + the repository has a branch. + Model : A->B->C->D + |->E""" + events = { + "B": "introduced", + "D": "last_affected", + } + expected_vulnerable = {"B", "C", "D", "E"} + self.template_five_last_branch( + events, expected_vulnerable, + "test_introduced_last_affected_branch_propagation") + +######## 9nd : merge tests with "introduced" and "fixed" def test_introduced_fixed_merge(self): - """Ensures that a merge without a fix does not - affect the propagation of a vulnerability""" - repo = TestRepository("test_introduced_fixed_merge", debug=False) - - first = repo.add_empty_commit( - vulnerability=TestRepository.VulnerabilityType.INTRODUCED) - second = repo.add_empty_commit() - third = repo.add_empty_commit(parents=[first, second]) - repo.add_empty_commit( - parents=[third], vulnerability=TestRepository.VulnerabilityType.FIXED) - (all_introduced, all_fixed, all_last_affected, - all_limit) = repo.get_ranges() + """ Simple range, checking the non propagation of the + vulnerability in the created branch . + Model : A ->B-> D->E + |->C-/^""" + events = {"B": "introduced", "E": "fixed"} + expected_vulnerable = {"B", "D"} + self.template_five_second_branch_merge(events, expected_vulnerable, + "test_introduced_fixed_merge") - result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, - all_fixed, all_limit, - all_last_affected) - - expected = set([first.hex, third.hex]) - repo.remove() - self.assertEqual( - result.commits, - expected, - "Expected: %s, got: %s" % (expected, result.commits), - ) +######## 10nd : merge tests with "introduced" and "limit" - def test_introduced_fixed_two_linear(self): - """Ensures that multiple introduced commit - in the same branch are correctly handled""" - repo = TestRepository("test_introduced_fixed_two_linear", debug=False) - - first = repo.add_empty_commit( - vulnerability=TestRepository.VulnerabilityType.INTRODUCED) - second = repo.add_empty_commit( - parents=[first], vulnerability=TestRepository.VulnerabilityType.FIXED) - third = repo.add_empty_commit( - parents=[second], - vulnerability=TestRepository.VulnerabilityType.INTRODUCED) - repo.add_empty_commit( - parents=[third], vulnerability=TestRepository.VulnerabilityType.FIXED) - (all_introduced, all_fixed, all_last_affected, - all_limit) = repo.get_ranges() + def test_introduced_limit_merge(self): + """ Simple range, checking the non propagation of the + vulnerability in created branch with a limit commit. + Model : A ->B-> D->E + |->C-/^""" + events = {"B": "introduced", "E": "limit"} + expected_vulnerable = {"B", "D"} + self.template_five_second_branch_merge(events, expected_vulnerable, + "test_introduced_limit_merge") - result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, - all_fixed, all_limit, - all_last_affected) +######## 11nd : merge tests with "introduced" and "last-affected" - expected = set([first.hex, third.hex]) - repo.remove() - self.assertEqual( - result.commits, - expected, - "Expected: %s, got: %s" % (expected, result.commits), - ) + def test_introduced_last_affected_merge(self): + """ Simple range, checking the non propagation of the vulnerability + in the created branch with a last-affected commit. + Model : A ->B-> D->E + |->C-/^""" + events = {"B": "introduced", "E": "last_affected"} + expected_vulnerable = {"B", "D", "E"} + self.template_five_second_branch_merge( + events, expected_vulnerable, "test_introduced_last_affected_merge") + +######## 12nd : merge tests with "introduced", and two "fixed", +# one in the created branch and one in the main branch + + def test_introduced_fixed_merge_fix_propagation(self): + """ Srange with two fixed, checking the propagation of the fix + from the created branch to the main branch. + Model : A ->B-> D->E + |->C-/^""" + events = {"B": "introduced", "C": "fixed"} + expected_vulnerable = {"B"} + self.template_five_second_branch_merge( + events, expected_vulnerable, + "test_introduced_fixed_merge_fix_propagation") + +######## 13nd : linear tests with two "introduced" and two "fixed" intercalated - def test_introduced_fixed_merge_propagation(self): - """Ensures that a vulnerability is propagated from - a branch, in spite of the main branch having a fix.""" - - repo = TestRepository( - "test_introduced_fixed_merge_propagation", debug=False) - - first = repo.add_empty_commit( - vulnerability=TestRepository.VulnerabilityType.INTRODUCED) - second = repo.add_empty_commit( - parents=[first], vulnerability=TestRepository.VulnerabilityType.FIXED) - third = repo.add_empty_commit( - vulnerability=TestRepository.VulnerabilityType.INTRODUCED) - fourth = repo.add_empty_commit(parents=[second, third]) - repo.add_empty_commit( - parents=[fourth], vulnerability=TestRepository.VulnerabilityType.FIXED) - (all_introduced, all_fixed, all_last_affected, - all_limit) = repo.get_ranges() + def test_introduced_fixed_two_linear(self): + """ Srange with two fixed, checking the non propagation of the + fix from the created branch to the main branch. + Model : A->B->C->D->E """ + events = {"B": "introduced", "C": "fixed", "D": "introduced", "E": "fixed"} + expected_vulnerable = {"B", "D"} + self.template_five_linear(events, expected_vulnerable, + "test_introduced_fixed_two_linear") - result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, - all_fixed, all_limit, - all_last_affected) +######## 14nd : linear tests with two "introduced" and two "limit" intercalated - expected = set([first.hex, third.hex, fourth.hex]) - repo.remove() - self.assertEqual( - result.commits, - expected, - "Expected: %s, got: %s" % (expected, result.commits), - ) + def test_introduced_limit_two_linear(self): + """ Srange with two fixed, checking the non propagation of the + fix from the created branch to the main branch. + Model : A->B->C->D->E """ + events = {"B": "introduced", "C": "limit", "D": "introduced", "E": "limit"} + expected_vulnerable = {"B", "D"} + self.template_five_linear(events, expected_vulnerable, + "test_introduced_limit_two_linear") - def test_introduced_fixed_fix_propagation(self): - """Ensures that a fix gets propagated, in the case of a merge""" - repo = TestRepository("test_introduced_fixed_fix_propagation") - - first = repo.add_empty_commit( - vulnerability=TestRepository.VulnerabilityType.INTRODUCED) - second = repo.add_empty_commit( - vulnerability=TestRepository.VulnerabilityType.FIXED) - third = repo.add_empty_commit(parents=[first, second]) - repo.add_empty_commit( - parents=[third], vulnerability=TestRepository.VulnerabilityType.FIXED) - (all_introduced, all_fixed, all_last_affected, - all_limit) = repo.get_ranges() +######## 15nd : linear tests with two "introduced" and +# two "last_affected" intercalated - result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, - all_fixed, all_limit, - all_last_affected) - - expected = set([first.hex]) - repo.remove() - self.assertEqual( - result.commits, - expected, - "Expected: %s, got: %s" % (expected, result.commits), - ) + def test_introduced_last_affected_two_linear(self): + """ Srange with two fixed, checking the non propagation of the + fix from the created branch to the main branch. + Model : A->B->C->D->E """ + events = { + "B": "introduced", + "C": "last_affected", + "D": "introduced", + "E": "last_affected" + } + expected_vulnerable = {"C", "E", "B", "D"} + self.template_five_linear(events, expected_vulnerable, + "test_introduced_last_affected_two_linear") + + +######## 16nd : testing the behavior of limit with a branch - ######## 2nd : tests with "introduced" and "limit" + def test_introduced_limit_branch_limit(self): + """ range with. + Model : A ->B-> C->E + |-> D""" + events = {"B": "introduced", "D": "limit", "E": "fixed"} + expected_vulnerable = {"B"} + self.template_five_third_branch(events, expected_vulnerable, + "test_introduced_limit_branch_limit") + + ###### Utility Template methods + def template_four_linear(self, events: dict, expected, name): + """Linear template with 4 commits + A->B->C->D """ + repo = TestRepository(name, debug=False) + repo.add_commit( + message="B", + parents=[repo.get_head_hex()], + event_type=events.get("B", None)) + repo.add_commit( + message="C", + parents=[repo.get_head_hex()], + event_type=events.get("C", None)) + repo.add_commit( + message="D", + parents=[repo.get_head_hex()], + event_type=events.get("D", None)) + repo.create_remote_branch() - def test_introduced_limit_linear(self): - """Ensures the basic behavior of limit - (the limit commit is considered unaffected).""" - repo = TestRepository("test_intoduced_limit_linear") - - first = repo.add_empty_commit( - vulnerability=TestRepository.VulnerabilityType.INTRODUCED) - second = repo.add_empty_commit(parents=[first]) - repo.add_empty_commit( - parents=[second], vulnerability=TestRepository.VulnerabilityType.LIMIT) (all_introduced, all_fixed, all_last_affected, all_limit) = repo.get_ranges() + expected_commits = repo.get_commits_ids(expected) result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, all_fixed, all_limit, all_last_affected) - - expected = set([first.hex, second.hex]) - repo.remove() + result_commit_message = repo.get_message_by_commits_id(result.commits) + repo.clean() self.assertEqual( result.commits, - expected, - "Expected: %s, got: %s" % (expected, result.commits), + expected_commits, + "Expected: %s, got: %s" % (expected, result_commit_message), ) - def test_introduced_limit_branch(self): - """Ensures that a limit commit does limit the vulnerability to a branch.""" - repo = TestRepository("test_intoduced_limit_branch") - - first = repo.add_empty_commit( - vulnerability=TestRepository.VulnerabilityType.INTRODUCED) - second = repo.add_empty_commit(parents=[first]) - repo.add_empty_commit( - parents=[second], vulnerability=TestRepository.VulnerabilityType.LIMIT) - repo.add_empty_commit(parents=[second]) - (all_introduced, all_fixed, all_last_affected, - all_limit) = repo.get_ranges() - result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, - all_fixed, all_limit, - all_last_affected) + def template_five_linear(self, events, expected, name): + """Linear template with 5 commits + A->B->C->D """ + repo = TestRepository(name, debug=False) + repo.add_commit( + message="B", + parents=[repo.get_head_hex()], + event_type=events.get("B", None)) + repo.add_commit( + message="C", + parents=[repo.get_head_hex()], + event_type=events.get("C", None)) + repo.add_commit( + message="D", + parents=[repo.get_head_hex()], + event_type=events.get("D", None)) + repo.add_commit( + message="E", + parents=[repo.get_head_hex()], + event_type=events.get("E", None)) + + repo.create_remote_branch() - expected = set([ - first.hex, - second.hex, - ]) - repo.remove() - self.assertEqual( - result.commits, - expected, - "Expected: %s, got: %s" % (expected, result.commits), - ) - - def test_introduced_limit_merge(self): - """Ensures that a merge without a fix does - not affect the propagation of a vulnerability.""" - repo = TestRepository("test_intoduced_limit_merge", debug=False) - - first = repo.add_empty_commit( - vulnerability=TestRepository.VulnerabilityType.INTRODUCED) - second = repo.add_empty_commit() - third = repo.add_empty_commit(parents=[first, second]) - repo.add_empty_commit( - parents=[third], vulnerability=TestRepository.VulnerabilityType.LIMIT) (all_introduced, all_fixed, all_last_affected, all_limit) = repo.get_ranges() - result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, - all_fixed, all_limit, - all_last_affected) - - expected = set([first.hex, third.hex]) - repo.remove() - self.assertEqual( - result.commits, - expected, - "Expected: %s, got: %s" % (expected, result.commits), - ) - - def test_introduced_limit_two_linear(self): - """Ensures that multiple introduced commit in - the same branch are correctly handled, wrt limit.""" - repo = TestRepository("test_introduced_limit_two_linear", debug=False) - - first = repo.add_empty_commit( - vulnerability=TestRepository.VulnerabilityType.INTRODUCED) - second = repo.add_empty_commit( - parents=[first], vulnerability=TestRepository.VulnerabilityType.LIMIT) - third = repo.add_empty_commit( - parents=[second], - vulnerability=TestRepository.VulnerabilityType.INTRODUCED) - repo.add_empty_commit( - parents=[third], vulnerability=TestRepository.VulnerabilityType.LIMIT) - (all_introduced, all_fixed, all_last_affected, - all_limit) = repo.get_ranges() + expected_commits = repo.get_commits_ids(expected) result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, all_fixed, all_limit, all_last_affected) - - expected = set([first.hex, third.hex]) - repo.remove() + result_commit_message = repo.get_message_by_commits_id(result.commits) + repo.clean() self.assertEqual( result.commits, - expected, - "Expected: %s, got: %s" % (expected, result.commits), + expected_commits, + "Expected: %s, got: %s" % (expected, result_commit_message), ) - ######## 2nd : tests with "introduced" and "last-affected" + def template_five_last_branch(self, events, expected, name): + """Template with 5 commits, the last one in a different branch + + A->B->C->D + |->E """ + repo = TestRepository(name, debug=False) + repo.add_commit( + message="B", + parents=[repo.get_head_hex()], + event_type=events.get("B", None)) + c = repo.add_commit( + message="C", + parents=[repo.get_head_hex()], + event_type=events.get("C", None)) + repo.create_branch_if_needed_and_checkout("feature") + repo.add_commit(message="E", parents=[c], event_type=events.get("E", None)) + repo.checkout("main") + repo.add_commit( + message="D", + parents=[repo.get_head_hex()], + event_type=events.get("D", None)) + repo.create_remote_branch() - def test_introduced_last_affected_linear(self): - """Ensures the basic behavior of last_affected - commits (the las_affected commit is considered affected).""" - repo = TestRepository("test_introduced_last_affected_linear") - - first = repo.add_empty_commit( - vulnerability=TestRepository.VulnerabilityType.INTRODUCED) - second = repo.add_empty_commit(parents=[first]) - third = repo.add_empty_commit( - parents=[second], - vulnerability=TestRepository.VulnerabilityType.LAST_AFFECTED, - ) (all_introduced, all_fixed, all_last_affected, all_limit) = repo.get_ranges() + expected_commits = repo.get_commits_ids(expected) result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, all_fixed, all_limit, all_last_affected) - - expected = set([first.hex, second.hex, third.hex]) - repo.remove() + result_commit_message = repo.get_message_by_commits_id(result.commits) + repo.clean() self.assertEqual( result.commits, - expected, - "Expected: %s, got: %s" % (expected, result.commits), - ) - - def test_introduced_last_affected_branch_propagation(self): - """Ensures that vulnerabilities are propagated to branches""" - repo = TestRepository( - "test_introduced_last_affected_branch_propagation", debug=False) - - first = repo.add_empty_commit( - vulnerability=TestRepository.VulnerabilityType.INTRODUCED) - second = repo.add_empty_commit(parents=[first]) - third = repo.add_empty_commit( - parents=[second], - vulnerability=TestRepository.VulnerabilityType.LAST_AFFECTED, + expected_commits, + "Expected: %s, got: %s" % (expected, result_commit_message), ) - fourth = repo.add_empty_commit(parents=[second]) - (all_introduced, all_fixed, all_last_affected, - all_limit) = repo.get_ranges() - result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, - all_fixed, all_limit, - all_last_affected) - - expected = set([first.hex, second.hex, third.hex, fourth.hex]) - repo.remove() - self.assertEqual( - result.commits, - expected, - "Expected: %s, got: %s" % (expected, result.commits), - ) + def template_five_second_branch_merge(self, events, expected, name): + """Template with 5 commits, the second one in a different + branch and merged right after + + A->B->D->E + |->C-/^ """ + repo = TestRepository(name, debug=False) + repo.create_branch_if_needed_and_checkout("feature") + c = repo.add_commit( + message="C", + parents=[repo.get_head_hex()], + event_type=events.get("C", None)) + repo.checkout("main") + repo.add_commit( + message="B", + parents=[repo.get_head_hex()], + event_type=events.get("B", None)) + repo.merge(message="D", commit=c, event_type=events.get("D", None)) + repo.add_commit( + message="E", + parents=[repo.get_head_hex()], + event_type=events.get("E", None)) + repo.create_remote_branch() - def test_introduced_last_affected_merge(self): - """Ensures that a merge without a fix does - not affect the propagation of a vulnerability.""" - repo = TestRepository("test_introduced_last_affected_merge", debug=False) - - first = repo.add_empty_commit( - vulnerability=TestRepository.VulnerabilityType.INTRODUCED) - second = repo.add_empty_commit() - third = repo.add_empty_commit(parents=[first, second]) - fourth = repo.add_empty_commit( - parents=[third], - vulnerability=TestRepository.VulnerabilityType.LAST_AFFECTED, - ) (all_introduced, all_fixed, all_last_affected, all_limit) = repo.get_ranges() + expected_commits = repo.get_commits_ids(expected) result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, all_fixed, all_limit, all_last_affected) - - expected = set([first.hex, third.hex, fourth.hex]) - repo.remove() + result_commit_message = repo.get_message_by_commits_id(result.commits) + repo.clean() self.assertEqual( result.commits, - expected, - "Expected: %s, got: %s" % (expected, result.commits), + expected_commits, + "Expected: %s, got: %s" % (expected, result_commit_message), ) - def test_introduced_last_affected_two_linear(self): - """Ensures that multiple introduced commit in - the same branch are correctly handled, wrt last_affected.""" - repo = TestRepository( - "test_introduced_last_affected_two_linear", debug=False) - - first = repo.add_empty_commit( - vulnerability=TestRepository.VulnerabilityType.INTRODUCED) - second = repo.add_empty_commit( - parents=[first], - vulnerability=TestRepository.VulnerabilityType.LAST_AFFECTED, - ) - third = repo.add_empty_commit( - parents=[second], - vulnerability=TestRepository.VulnerabilityType.INTRODUCED) - fourth = repo.add_empty_commit( - parents=[third], - vulnerability=TestRepository.VulnerabilityType.LAST_AFFECTED, - ) + def template_six_second_branch_merge(self, events, expected, name): + """Template with 6 commits, the second one in a different branch and + merged after two commits in the main branch + + A->B->C->E->F + |-> D -/^ """ + repo = TestRepository(name, debug=False) + repo.create_branch_if_needed_and_checkout("feature") + d = repo.add_commit( + message="D", + parents=[repo.get_head_hex()], + event_type=events.get("D", None)) + repo.checkout("main") + repo.add_commit( + message="B", + parents=[repo.get_head_hex()], + event_type=events.get("B", None)) + repo.add_commit( + message="C", + parents=[repo.get_head_hex()], + event_type=events.get("C", None)) + repo.merge(message="E", commit=d, event_type=events.get("E", None)) + repo.add_commit( + message="F", + parents=[repo.get_head_hex()], + event_type=events.get("F", None)) + + repo.create_remote_branch() (all_introduced, all_fixed, all_last_affected, all_limit) = repo.get_ranges() + expected_commits = repo.get_commits_ids(expected) result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, all_fixed, all_limit, all_last_affected) - - expected = set([first.hex, second.hex, third.hex, fourth.hex]) - repo.remove() + result_commit_message = repo.get_message_by_commits_id(result.commits) + repo.clean() self.assertEqual( result.commits, - expected, - "Expected: %s, got: %s" % (expected, result.commits), + expected_commits, + "Expected: %s, got: %s" % (expected, result_commit_message), ) - ######## 3nd : tests with "introduced", "limit", and "fixed" - - def test_introduced_limit_fixed_linear_lf(self): - """Ensures the behaviors of limit and fixed commits are not conflicting.""" - repo = TestRepository("test_introduced_limit_fixed_linear_lf") - - first = repo.add_empty_commit( - vulnerability=TestRepository.VulnerabilityType.INTRODUCED) - second = repo.add_empty_commit( - parents=[first], vulnerability=TestRepository.VulnerabilityType.LIMIT) - repo.add_empty_commit( - parents=[second], vulnerability=TestRepository.VulnerabilityType.FIXED) - + def template_five_third_branch(self, events, expected, name): + """Template with 5 commits, the third one in a different branch, not merged + + A->B->C->E + |->D""" + repo = TestRepository(name, debug=False) + repo.add_commit( + message="B", + parents=[repo.get_head_hex()], + event_type=events.get("B", None)) + repo.create_branch_if_needed_and_checkout("feature") + repo.add_commit( + message="D", + parents=[repo.get_head_hex()], + event_type=events.get("D", None)) + repo.checkout("main") + repo.add_commit( + message="C", + parents=[repo.get_head_hex()], + event_type=events.get("C", None)) + repo.add_commit( + message="E", + parents=[repo.get_head_hex()], + event_type=events.get("E", None)) + + repo.create_remote_branch() (all_introduced, all_fixed, all_last_affected, all_limit) = repo.get_ranges() + expected_commits = repo.get_commits_ids(expected) result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, all_fixed, all_limit, all_last_affected) - - expected = set([first.hex]) - repo.remove() + result_commit_message = repo.get_message_by_commits_id(result.commits) + repo.clean() self.assertEqual( result.commits, - expected, - "Expected: %s, got: %s" % (expected, result.commits), + expected_commits, + "Expected: %s, got: %s" % (expected, result_commit_message), ) - - def test_introduced_limit_fixed_linear_fl(self): - """Ensures the behaviors of limit and fixed commits are not conflicting""" - repo = TestRepository("test_introduced_limit_fixed_linear_lf") - - first = repo.add_empty_commit( - vulnerability=TestRepository.VulnerabilityType.INTRODUCED) - second = repo.add_empty_commit( - parents=[first], vulnerability=TestRepository.VulnerabilityType.FIXED) - repo.add_empty_commit( - parents=[second], vulnerability=TestRepository.VulnerabilityType.LIMIT) - - (all_introduced, all_fixed, all_last_affected, - all_limit) = repo.get_ranges() - result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, - all_fixed, all_limit, - all_last_affected) - - expected = set([first.hex]) - repo.remove() - self.assertEqual( - result.commits, - expected, - "Expected: %s, got: %s" % (expected, result.commits), - ) - - def test_introduced_limit_branch_limit(self): - """Ensures the behaviors of limit and fixed - commits are not conflicting, in the case of a branch created.""" - repo = TestRepository("test_introduced_limit_fixed_linear_lf", debug=False) - - first = repo.add_empty_commit( - vulnerability=TestRepository.VulnerabilityType.INTRODUCED) - second = repo.add_empty_commit( - parents=[first], vulnerability=TestRepository.VulnerabilityType.LIMIT) - repo.add_empty_commit(parents=[first]) - repo.add_empty_commit( - parents=[second], vulnerability=TestRepository.VulnerabilityType.FIXED) - - (all_introduced, all_fixed, all_last_affected, - all_limit) = repo.get_ranges() - result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, - all_fixed, all_limit, - all_last_affected) - - expected = set([first.hex]) - repo.remove() - self.assertEqual( - result.commits, - expected, - "Expected: %s, got: %s" % (expected, result.commits), - ) \ No newline at end of file diff --git a/osv/test_tools/test_repository.py b/osv/test_tools/test_repository.py index d12647ba41a..3e8915f5904 100644 --- a/osv/test_tools/test_repository.py +++ b/osv/test_tools/test_repository.py @@ -1,121 +1,215 @@ -"""test_repository""" +""" Utility class to create a test repository for the git tests + +This module contains a class that creates a test repository for the git tests +It can be used to create a test repository and add commits tagged with different +vulnerability types. + +usage: + repo = TestRepository("test_introduced_fixed_linear", debug=False) + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit(parents=[first]) + repo.add_empty_commit( + parents=[second], vulnerability=TestRepository.VulnerabilityType.FIXED) +""" import pygit2 import json from datetime import datetime -from enum import Enum import os import shutil import uuid +import logging +from osv import vulnerability_pb2 -class TestRepository: - """ Utilitary class to create a test repository for the git tests +class CommitsInfo: + """Internal class to store the commits information """ - class VulnerabilityType(Enum): - INTRODUCED = 1 - FIXED = 2 - LAST_AFFECTED = 3 - LIMIT = 4 - NONE = 5 + class Messages: + """Single commit information + """ + + def __init__(self): + self._commit_to_message: dict[str, str] = {} + self._message_to_commit: dict[str, str] = {} + + def add_commit(self, commit_id, commit_message): + self._commit_to_message[commit_id] = commit_message + self._message_to_commit[commit_message] = commit_id + + def get_message(self, commit_id): + return self._commit_to_message.get(commit_id) + + def get_commit_id(self, commit_message): + return self._message_to_commit.get(commit_message) + + def get_commits_ids(self, commit_messages): + commit_ids = set() + for commit_message in commit_messages: + commit_id = self.get_commit_id(commit_message) + if commit_id is not None: + commit_ids.add(commit_id) + return commit_ids + + def get_messages(self, commits_id): + commit_messages = set() + for commit_id in commits_id: + commit_message = self.get_message(commit_id) + if commit_message is not None: + commit_messages.add(commit_message) + return commit_messages + + def existing_message(self, message): + return message in self._message_to_commit + + def __init__(self): + self.messages: CommitsInfo.Messages = CommitsInfo.Messages() + self._events: list[vulnerability_pb2.Event] = [] + + def add_commit(self, commit_id, commit_message, event_type: str = None): + """Adds a commit to the repository + + Args: + commit_id (str): The id of the commit + commit_message (str): The message of the commit + event_type (str, optional): the type of the event. Defaults to None. + + Raises: + ValueError: In the case of an invalid vulnerability type + """ + if not self.messages.existing_message(commit_message): + if event_type: + keys = vulnerability_pb2.Event.DESCRIPTOR.fields_by_name.keys() + if event_type not in keys: + raise ValueError("Invalid vulnerability type") + self._events.append(vulnerability_pb2.Event(**{event_type: commit_id})) + self.messages.add_commit(commit_id, commit_message) + else: + raise ValueError("Commit message already exists") + + def get_ranges(self): + """get the ranges of the repository, + each range containing the corresponding ids + + Raises: + ValueError: In the case of an invalid vulnerability type + + Returns: + tuple : a tuple containing the introduced, fixed, + last_affected and limit commits + """ + introduced = [] + fixed = [] + last_affected = [] + limit = [] + for event in self._events: + if event.introduced and event.introduced != '0': + introduced.append(event.introduced) + continue + + if event.last_affected: + last_affected.append(event.last_affected) + continue + + if event.fixed: + fixed.append(event.fixed) + continue + if event.limit: + limit.append(event.limit) + continue + return (introduced, fixed, last_affected, limit) + + +class TestRepository: + """ Utility class to create a test repository for the git tests + """ _author = pygit2.Signature('John Smith', 'johnSmith@example.com') _commiter = pygit2.Signature('John Smith', 'johnSmith@example.com') - _initial_commit = None - def __init__(self, name: str, debug: bool = False): + self.repo_path = f"osv/testdata/test_repositories/{name}" self.debug = debug self.name = name - self.introduced = [] - self.fixed = [] - self.last_affected = [] - self.limit = [] - - if os.path.exists(f"osv/testdata/test_repositories/{name}"): - shutil.rmtree(f"osv/testdata/test_repositories/{name}") - self.repo = pygit2.init_repository( - f"osv/testdata/test_repositories/{name}", bare=False) - #empty initial commit usefull for the creation of the repository - tree = self.repo.TreeBuilder().write() - self._initial_commit = self.repo.create_commit('refs/heads/main', - self._author, self._commiter, - "message", tree, []) - self.create_branch(f"branch_{self._initial_commit.hex}", - self._initial_commit) - self.repo.references.create("refs/remotes/origin/main", - self._initial_commit) - - def create_branch(self, name: str, commit: pygit2.Oid): - self.repo.references.create(f'refs/heads/{name}', commit) - self.repo.references.create(f'refs/remotes/origin/{name}', commit) - - def add_empty_commit( - self, - parents: list[pygit2.Oid] = None, - vulnerability: VulnerabilityType = VulnerabilityType.NONE, - message: str = "Empty") -> pygit2.Oid: - """ - Adds a empty commit to the repository, tags it with the vulnerability - type and adds it to the vulnerability list if specified + self.commits_info = CommitsInfo() + + #delete the repository if it already exists + if os.path.exists(self.repo_path): + self.clean() + #initialize the repository + self.repo: pygit2._pygit2.Repository = pygit2.init_repository( + self.repo_path, bare=False) + #create an initial commit + parent = [] + self.add_commit(message="A", parents=parent) + + def merge(self, message, commit, event_type: str = None): + """merge a commit into the repository + + Args: + commit (str): the hex of the commit to be merged + event_type (str, optional): the event associated with the commit. + Defaults to None. """ + self.repo.merge(commit) + self.add_commit(message, [self.get_head_hex(), commit], event_type) - tree = self.repo.TreeBuilder().write() - self._author = pygit2.Signature( - str(uuid.uuid1()), 'johnSmith@example.com' - ) #using a random uuid to avoid commits being the same - commit = None + def get_commits_ids(self, commit_messages): + return self.commits_info.messages.get_commits_ids(commit_messages) - if not parents or len(parents) == 0: - self.repo.create_branch( - 'branch_temp', self.repo.revparse_single(self._initial_commit.hex)) - commit = self.repo.create_commit('refs/heads/branch_temp', self._author, - self._commiter, message, tree, - [self._initial_commit]) + def add_commit(self, message, parents=None, event_type: str = None): + """Add a commit to the repository - self.repo.branches.delete('branch_temp') - self.create_branch(f'branch_{commit.hex}', commit) + Args: + message (str): the message of the commit + parents (List(str), optional): the list of parents + of the current repository . Defaults to None. + event (str, optional): the type of event corresponding + to the commit. Defaults to None. - else: - self.repo.create_branch('branch_temp', - self.repo.revparse_single(parents[0].hex)) - commit = self.repo.create_commit('refs/heads/branch_temp', self._author, - self._commiter, message, tree, parents) - self.repo.branches.delete('branch_temp') - self.create_branch(commit=commit, name=f'branch_{commit.hex}') - - self.repo.references.get('refs/remotes/{0}/{1}'.format( - "origin", "main")).set_target(commit) - self.repo.references.get('refs/heads/main').set_target(commit) - - if self.debug: - os.system("echo -------------------------------" + - "-----------------------------------") - os.system(f"git -C osv/testdata/test_repositories/{self.name}" + - " log --all --graph --decorate") - - #self.repo.branches.delete(created_branch.branch_name) - - match vulnerability: - case self.VulnerabilityType.INTRODUCED: - self.introduced.append(commit.hex) - case self.VulnerabilityType.FIXED: - self.fixed.append(commit.hex) - case self.VulnerabilityType.LAST_AFFECTED: - self.last_affected.append(commit.hex) - case self.VulnerabilityType.LIMIT: - self.limit.append(commit.hex) - case self.VulnerabilityType.NONE: - pass - case _: - raise ValueError("Invalid vulnerability type") - return commit - - def remove(self): - shutil.rmtree(f"osv/testdata/test_repositories/{self.name}/") - while os.path.exists( - f"osv/testdata/test_repositories/{self.name}/"): # check if it exists - pass + Returns: + str: the hex id of the commit + """ + if parents is None: + parents = [self.get_head_hex()] + random_str = str(uuid.uuid1()) + with open(f"{self.repo_path}/{ random_str}", "w") as f: + f.write(random_str) + index = self.repo.index + index.add_all() + tree = index.write_tree() + index.write() + commit_hex = self.repo.create_commit('HEAD', self._author, self._commiter, + message, tree, parents).hex + self.commits_info.add_commit(commit_hex, message, event_type) + return commit_hex + + def get_head_hex(self): + return self.get_head().hex + + def get_head(self): + return self.repo.revparse_single('HEAD') + + def checkout(self, branchname): + branch = self.repo.lookup_branch(branchname) + ref = self.repo.lookup_reference(branch.name) + self.repo.checkout(ref) + + def create_branch_if_needed_and_checkout(self, branchname): + if not self.repo.branches.get(branchname): + self.repo.create_branch(branchname, self.get_head()) + self.checkout(branchname) + + def create_remote_branch(self): + for branch_name in self.repo.branches: + branch = self.repo.branches.get(branch_name) + self.repo.references.create(f'refs/remotes/origin/{branch_name}', + branch.raw_target) + + def clean(self): + shutil.rmtree(self.repo_path) ##cleanup self.introduced = [] self.fixed = [] @@ -126,15 +220,18 @@ def get_ranges(self): """ return the ranges of the repository """ - return (self.introduced, self.fixed, self.last_affected, self.limit) + return self.commits_info.get_ranges() + + def get_message_by_commits_id(self, commits_id): + return self.commits_info.messages.get_messages(commits_id) def print_commits(self): """ prints the commits of the repository """ - print(self.name) + logging.debug(self.name) commits = [] for ref in self.repo.listall_reference_objects(): - print(ref.target) + logging.debug(ref.target) for commit in self.repo.walk(ref.target, pygit2.GIT_SORT_TIME): current_commit = { @@ -155,4 +252,4 @@ def print_commits(self): break commits.append(current_commit) - print(json.dumps(commits, indent=2)) \ No newline at end of file + logging.debug(json.dumps(commits, indent=2))