-
Notifications
You must be signed in to change notification settings - Fork 198
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(frontend): Upstream and Downstream hierarchy display on frontend #3208
base: master
Are you sure you want to change the base?
Changes from all commits
e9971ad
3c1e7e7
d31ddb0
1b5b526
983f26f
1fe98e3
b123bf5
2850e0d
4dabe6c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,7 @@ | |
from flask import send_from_directory | ||
from werkzeug.security import safe_join | ||
from werkzeug import exceptions | ||
from collections import OrderedDict | ||
from google.cloud import ndb | ||
from cvss import CVSS2, CVSS3, CVSS4 | ||
|
||
|
@@ -302,9 +303,26 @@ def bug_to_response(bug, detailed=True): | |
if detailed: | ||
add_links(response) | ||
add_source_info(bug, response) | ||
add_stream_info(bug, response) | ||
|
||
return response | ||
|
||
|
||
def add_stream_info(bug, response): | ||
"""Add upstream hierarchy information to `response`.""" | ||
# Check whether there are upstreams | ||
if bug.upstream_raw: | ||
upstream_hierarchy_string = get_upstreams_of_vulnerability( | ||
bug.db_id, response['upstream']) | ||
response['upstream_hierarchy'] = upstream_hierarchy_string | ||
# Check whether there are downstreams | ||
downstreams = _get_downstreams_of_bug_query(bug.db_id) | ||
if downstreams: | ||
downstream_hierarchy_string = compute_downstream_hierarchy( | ||
bug.db_id, downstreams) | ||
response['downstream_hierarchy'] = downstream_hierarchy_string | ||
|
||
|
||
def calculate_severity_details( | ||
severity: dict) -> tuple[float | None, str | None]: | ||
"""Calculate score and rating of severity""" | ||
|
@@ -786,3 +804,250 @@ def cvss_calculator_url(severity): | |
def relative_time(timestamp: str) -> str: | ||
"""Convert the input to a human-readable relative time.""" | ||
return utils.relative_time(timestamp) | ||
|
||
|
||
def construct_hierarchy_string(target_bug_id: str, root_nodes: set[str], | ||
graph: dict[str, set[str]]) -> str: | ||
"""Constructs a hierarchy string for display. | ||
|
||
Args: | ||
target_bug_id: The ID of the target bug. | ||
root_nodes: a list of root_nodes | ||
graph: A dictionary representing the tree to build. | ||
|
||
Returns: | ||
A string representing the hierarchy for display by the frontend. | ||
""" | ||
output_lines = [] | ||
|
||
def print_subtree(vuln_id: str) -> None: | ||
""" | ||
Recursively formats the subtree starting from the given vuln_id. | ||
|
||
Args: | ||
vuln_id (str): The starting vuln_id for printing the subtree. | ||
""" | ||
if vuln_id != target_bug_id: | ||
if osv_has_vuln(vuln_id): | ||
output_lines.append("<li><a href=\"/vulnerability/" + vuln_id + "\">" + | ||
vuln_id + " </a></li>") | ||
else: | ||
output_lines.append("<li>" + vuln_id + "</li>") | ||
|
||
if vuln_id in graph: | ||
for child in graph[vuln_id]: | ||
if child != target_bug_id: | ||
output_lines.append("<ul>") | ||
print_subtree(child) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does this handle cycles? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just added a change that will prevent the cycle from displaying, and will log an error, but won't prevent the whole vuln from being displayed. |
||
output_lines.append("</ul>") | ||
|
||
for root in root_nodes: | ||
output_lines.append("<ul class=\"aliases\">") | ||
print_subtree(root) | ||
output_lines.append("</ul>") | ||
|
||
final_string = "".join(output_lines) | ||
return final_string | ||
|
||
|
||
def get_upstreams_of_vulnerability(target_bug_id: str, | ||
transitive_upstreams: list[str]) -> str: | ||
"""Gets the upstream hierarchy of a vulnerability. | ||
|
||
Args: | ||
target_bug_id: The ID of the target bug. | ||
|
||
Returns: | ||
A string representing the upstream hierarchy for display by | ||
the frontend. | ||
""" | ||
|
||
bugs_group_dict = {b_id: [] for b_id in transitive_upstreams} | ||
bug_groups_keys = [ | ||
ndb.Key(osv.UpstreamGroup, id) for id in transitive_upstreams | ||
] | ||
bug_groups_upstream = ndb.get_multi(bug_groups_keys) | ||
if bug_groups_upstream is None: | ||
return None | ||
for bug in bug_groups_upstream: | ||
if bug is not None: | ||
bugs_group_dict[bug.db_id] = bug.upstream_ids | ||
|
||
bugs_group_dict[target_bug_id] = transitive_upstreams | ||
|
||
upstream_hierarchy = _compute_upstream_hierarchy(target_bug_id, | ||
bugs_group_dict) | ||
reversed_graph = reverse_tree(upstream_hierarchy) | ||
if has_cycle(reversed_graph): | ||
logging.error("Cycle detected in upstream hierarchy for %s", target_bug_id) | ||
|
||
return None | ||
|
||
all_children = set() | ||
for children in upstream_hierarchy.values(): | ||
all_children.update(children) | ||
|
||
root_nodes = set(all_children - set(upstream_hierarchy.keys())) | ||
|
||
upstream_hierarchy_string = construct_hierarchy_string( | ||
target_bug_id, root_nodes, reversed_graph) | ||
return upstream_hierarchy_string | ||
|
||
|
||
def _compute_upstream_hierarchy( | ||
target_bug_id: str, bug_groups: dict[str, | ||
list[str]]) -> dict[str, set[str]]: | ||
"""Computes all upstream vulnerabilities for the given bug ID. | ||
The returned list contains all of the bug IDs that are upstream of the | ||
target bug ID, including transitive upstreams in a map hierarchy. | ||
bug_group: | ||
{ db_id: bug id | ||
upstream_ids: str[bug_ids] | ||
last_modified_date} | ||
""" | ||
visited = set() | ||
upstream_map = {} | ||
to_visit = set([target_bug_id]) | ||
while to_visit: | ||
bug_id = to_visit.pop() | ||
if bug_id in visited: | ||
continue | ||
visited.add(bug_id) | ||
upstreams = set(bug_groups.get(bug_id, [])) | ||
if not upstreams: | ||
continue | ||
for upstream in upstreams: | ||
if upstream not in visited and upstream not in to_visit: | ||
to_visit.add(upstream) | ||
else: | ||
if bug_id not in upstream_map: | ||
upstream_map[bug_id] = set([upstream]) | ||
else: | ||
upstream_map[bug_id].add(upstream) | ||
upstream_map[bug_id] = upstreams | ||
to_visit.update(upstreams - visited) | ||
for k, v in upstream_map.items(): | ||
if k is target_bug_id: | ||
continue | ||
upstream_map[target_bug_id] = upstream_map[target_bug_id] - v | ||
return upstream_map | ||
|
||
|
||
def _get_downstreams_of_bug_query(bug_id: str) -> dict[str, list[str]]: | ||
"""Returns a list of all downstream bugs of the given bug ID by querying the | ||
database.""" | ||
downstreams = {} | ||
for bug in osv.UpstreamGroup.query(osv.UpstreamGroup.upstream_ids == bug_id): | ||
downstreams[bug.db_id] = bug.upstream_ids | ||
return downstreams | ||
|
||
|
||
def _get_downstreams_of_bug(bug_id: str, bugs: dict[str, | ||
list[str]]) -> list[str]: | ||
"""Returns a list of all downstream bugs of the given bug ID | ||
given a list of bugs.""" | ||
downstreams = [] | ||
for bug in bugs: | ||
if bug_id in bugs[bug]: | ||
downstreams.append(bug) | ||
return downstreams | ||
|
||
|
||
def compute_downstream_hierarchy(target_bug_id: str, | ||
downstreams: dict[str, list[str]]) -> str: | ||
"""Computes the hierarchy of all downstream vulnerabilities for the given | ||
bug ID. Returns a constructed string of the downstreams formatted for the | ||
frontend | ||
""" | ||
|
||
downstream_map: dict[str, set[str]] = {} | ||
transitive_downstreams = downstreams | ||
|
||
# Sort downstreams by number of upstreams | ||
transitive_downstreams = OrderedDict( | ||
sorted(transitive_downstreams.items(), key=lambda item: len(item[1]))) | ||
|
||
leaf_bugs: set[str] = set() | ||
|
||
for bug_id, _ in transitive_downstreams.items(): | ||
immediate_downstreams = _get_downstreams_of_bug(bug_id, | ||
transitive_downstreams) | ||
if not immediate_downstreams: | ||
leaf_bugs.add(bug_id) | ||
else: | ||
downstream_map[bug_id] = set(immediate_downstreams) | ||
|
||
root_leaves = leaf_bugs.copy() | ||
for bug_id, downstream_bugs in downstream_map.items(): | ||
for leaf in leaf_bugs: | ||
if leaf in downstream_bugs: | ||
root_leaves.discard(leaf) | ||
root_leaves.add(bug_id) | ||
|
||
downstream_map[target_bug_id] = root_leaves | ||
|
||
hierarchy_string = construct_hierarchy_string(target_bug_id, root_leaves, | ||
downstream_map) | ||
return hierarchy_string | ||
|
||
|
||
def reverse_tree(graph: dict[str, set[str]]) -> dict[str, set[str]]: | ||
""" | ||
Reverses a graph represented as a dictionary | ||
""" | ||
|
||
reversed_graph = {} | ||
for node, children in graph.items(): | ||
for child in children: | ||
if child not in reversed_graph: | ||
reversed_graph[child] = set() | ||
reversed_graph[child].add(node) | ||
|
||
return reversed_graph | ||
|
||
|
||
def has_cycle(graph: dict[str, set[str]]) -> bool: | ||
""" | ||
Determines whether there are any cycles in a directed graph represented | ||
as an adjacency list. | ||
|
||
Args: | ||
graph: A dictionary representing the graph, where keys are nodes and | ||
values are sets of their neighbors. | ||
|
||
Returns: | ||
True if the graph contains a cycle, False otherwise. | ||
""" | ||
|
||
visited = set() | ||
recursion_stack = set() | ||
|
||
def dfs(node: str) -> bool: | ||
""" | ||
Performs Depth-First Search to detect cycles. | ||
|
||
Args: | ||
node: The current node being visited. | ||
|
||
Returns: | ||
True if a cycle is detected, False otherwise. | ||
""" | ||
visited.add(node) | ||
recursion_stack.add(node) | ||
|
||
for neighbor in graph.get(node, set()): | ||
if neighbor in recursion_stack: | ||
return True # Cycle detected | ||
if neighbor not in visited: | ||
if dfs(neighbor): | ||
return True | ||
|
||
recursion_stack.remove(node) | ||
return False | ||
|
||
for node in graph: | ||
if node not in visited: | ||
if dfs(node): | ||
return True | ||
|
||
return False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how long does this take, relative to the other add info functions?