diff --git a/gcp/website/frontend3/src/templates/vulnerability.html b/gcp/website/frontend3/src/templates/vulnerability.html index bd528c82f9f..30a4ae9e919 100644 --- a/gcp/website/frontend3/src/templates/vulnerability.html +++ b/gcp/website/frontend3/src/templates/vulnerability.html @@ -90,6 +90,14 @@

{{ vulnerability.published }}
Modified
{{ vulnerability.modified }}
+ {%- if vulnerability.upstream_hierarchy -%} +
Upstream
+
{{ vulnerability.upstream_hierarchy | safe }}
+ {%- endif -%} + {%- if vulnerability.downstream_hierarchy -%} +
Downstream
+
{{ vulnerability.downstream_hierarchy | safe }}
+ {%- endif -%} {%- if vulnerability.severity -%}
Severity
@@ -115,7 +123,7 @@

Details
- {{ vulnerability.details|markdown|safe -}} + {{ vulnerability.details | markdown | safe -}}
{% if vulnerability.database_specific -%}
@@ -387,4 +395,4 @@

}); }); -{% endblock -%} +{% endblock -%} \ No newline at end of file diff --git a/gcp/website/frontend_handlers.py b/gcp/website/frontend_handlers.py index cba784a57c4..b893309d525 100644 --- a/gcp/website/frontend_handlers.py +++ b/gcp/website/frontend_handlers.py @@ -30,6 +30,7 @@ from flask import send_from_directory from werkzeug.security import safe_join from werkzeug import exceptions +from collections import OrderedDict from google.cloud import ndb from cvss import CVSS2, CVSS3, CVSS4 @@ -302,9 +303,26 @@ def bug_to_response(bug, detailed=True): if detailed: add_links(response) add_source_info(bug, response) + add_stream_info(bug, response) + return response +def add_stream_info(bug, response): + """Add upstream hierarchy information to `response`.""" + # Check whether there are upstreams + if bug.upstream_raw: + upstream_hierarchy_string = get_upstreams_of_vulnerability( + bug.db_id, response['upstream']) + response['upstream_hierarchy'] = upstream_hierarchy_string + # Check whether there are downstreams + downstreams = _get_downstreams_of_bug_query(bug.db_id) + if downstreams: + downstream_hierarchy_string = compute_downstream_hierarchy( + bug.db_id, downstreams) + response['downstream_hierarchy'] = downstream_hierarchy_string + + def calculate_severity_details( severity: dict) -> tuple[float | None, str | None]: """Calculate score and rating of severity""" @@ -786,3 +804,250 @@ def cvss_calculator_url(severity): def relative_time(timestamp: str) -> str: """Convert the input to a human-readable relative time.""" return utils.relative_time(timestamp) + + +def construct_hierarchy_string(target_bug_id: str, root_nodes: set[str], + graph: dict[str, set[str]]) -> str: + """Constructs a hierarchy string for display. + + Args: + target_bug_id: The ID of the target bug. + root_nodes: a list of root_nodes + graph: A dictionary representing the tree to build. + + Returns: + A string representing the hierarchy for display by the frontend. + """ + output_lines = [] + + def print_subtree(vuln_id: str) -> None: + """ + Recursively formats the subtree starting from the given vuln_id. + + Args: + vuln_id (str): The starting vuln_id for printing the subtree. + """ + if vuln_id != target_bug_id: + if osv_has_vuln(vuln_id): + output_lines.append("
  • " + + vuln_id + "
  • ") + else: + output_lines.append("
  • " + vuln_id + "
  • ") + + if vuln_id in graph: + for child in graph[vuln_id]: + if child != target_bug_id: + output_lines.append("
      ") + print_subtree(child) + output_lines.append("
    ") + + for root in root_nodes: + output_lines.append("
      ") + print_subtree(root) + output_lines.append("
    ") + + final_string = "".join(output_lines) + return final_string + + +def get_upstreams_of_vulnerability(target_bug_id: str, + transitive_upstreams: list[str]) -> str: + """Gets the upstream hierarchy of a vulnerability. + + Args: + target_bug_id: The ID of the target bug. + + Returns: + A string representing the upstream hierarchy for display by + the frontend. + """ + + bugs_group_dict = {b_id: [] for b_id in transitive_upstreams} + bug_groups_keys = [ + ndb.Key(osv.UpstreamGroup, id) for id in transitive_upstreams + ] + bug_groups_upstream = ndb.get_multi(bug_groups_keys) + if bug_groups_upstream is None: + return None + for bug in bug_groups_upstream: + if bug is not None: + bugs_group_dict[bug.db_id] = bug.upstream_ids + + bugs_group_dict[target_bug_id] = transitive_upstreams + + upstream_hierarchy = _compute_upstream_hierarchy(target_bug_id, + bugs_group_dict) + reversed_graph = reverse_tree(upstream_hierarchy) + if has_cycle(reversed_graph): + logging.error("Cycle detected in upstream hierarchy for %s", target_bug_id) + + return None + + all_children = set() + for children in upstream_hierarchy.values(): + all_children.update(children) + + root_nodes = set(all_children - set(upstream_hierarchy.keys())) + + upstream_hierarchy_string = construct_hierarchy_string( + target_bug_id, root_nodes, reversed_graph) + return upstream_hierarchy_string + + +def _compute_upstream_hierarchy( + target_bug_id: str, bug_groups: dict[str, + list[str]]) -> dict[str, set[str]]: + """Computes all upstream vulnerabilities for the given bug ID. + The returned list contains all of the bug IDs that are upstream of the + target bug ID, including transitive upstreams in a map hierarchy. + bug_group: + { db_id: bug id + upstream_ids: str[bug_ids] + last_modified_date} + """ + visited = set() + upstream_map = {} + to_visit = set([target_bug_id]) + while to_visit: + bug_id = to_visit.pop() + if bug_id in visited: + continue + visited.add(bug_id) + upstreams = set(bug_groups.get(bug_id, [])) + if not upstreams: + continue + for upstream in upstreams: + if upstream not in visited and upstream not in to_visit: + to_visit.add(upstream) + else: + if bug_id not in upstream_map: + upstream_map[bug_id] = set([upstream]) + else: + upstream_map[bug_id].add(upstream) + upstream_map[bug_id] = upstreams + to_visit.update(upstreams - visited) + for k, v in upstream_map.items(): + if k is target_bug_id: + continue + upstream_map[target_bug_id] = upstream_map[target_bug_id] - v + return upstream_map + + +def _get_downstreams_of_bug_query(bug_id: str) -> dict[str, list[str]]: + """Returns a list of all downstream bugs of the given bug ID by querying the + database.""" + downstreams = {} + for bug in osv.UpstreamGroup.query(osv.UpstreamGroup.upstream_ids == bug_id): + downstreams[bug.db_id] = bug.upstream_ids + return downstreams + + +def _get_downstreams_of_bug(bug_id: str, bugs: dict[str, + list[str]]) -> list[str]: + """Returns a list of all downstream bugs of the given bug ID + given a list of bugs.""" + downstreams = [] + for bug in bugs: + if bug_id in bugs[bug]: + downstreams.append(bug) + return downstreams + + +def compute_downstream_hierarchy(target_bug_id: str, + downstreams: dict[str, list[str]]) -> str: + """Computes the hierarchy of all downstream vulnerabilities for the given + bug ID. Returns a constructed string of the downstreams formatted for the + frontend + """ + + downstream_map: dict[str, set[str]] = {} + transitive_downstreams = downstreams + + # Sort downstreams by number of upstreams + transitive_downstreams = OrderedDict( + sorted(transitive_downstreams.items(), key=lambda item: len(item[1]))) + + leaf_bugs: set[str] = set() + + for bug_id, _ in transitive_downstreams.items(): + immediate_downstreams = _get_downstreams_of_bug(bug_id, + transitive_downstreams) + if not immediate_downstreams: + leaf_bugs.add(bug_id) + else: + downstream_map[bug_id] = set(immediate_downstreams) + + root_leaves = leaf_bugs.copy() + for bug_id, downstream_bugs in downstream_map.items(): + for leaf in leaf_bugs: + if leaf in downstream_bugs: + root_leaves.discard(leaf) + root_leaves.add(bug_id) + + downstream_map[target_bug_id] = root_leaves + + hierarchy_string = construct_hierarchy_string(target_bug_id, root_leaves, + downstream_map) + return hierarchy_string + + +def reverse_tree(graph: dict[str, set[str]]) -> dict[str, set[str]]: + """ + Reverses a graph represented as a dictionary + """ + + reversed_graph = {} + for node, children in graph.items(): + for child in children: + if child not in reversed_graph: + reversed_graph[child] = set() + reversed_graph[child].add(node) + + return reversed_graph + + +def has_cycle(graph: dict[str, set[str]]) -> bool: + """ + Determines whether there are any cycles in a directed graph represented + as an adjacency list. + + Args: + graph: A dictionary representing the graph, where keys are nodes and + values are sets of their neighbors. + + Returns: + True if the graph contains a cycle, False otherwise. + """ + + visited = set() + recursion_stack = set() + + def dfs(node: str) -> bool: + """ + Performs Depth-First Search to detect cycles. + + Args: + node: The current node being visited. + + Returns: + True if a cycle is detected, False otherwise. + """ + visited.add(node) + recursion_stack.add(node) + + for neighbor in graph.get(node, set()): + if neighbor in recursion_stack: + return True # Cycle detected + if neighbor not in visited: + if dfs(neighbor): + return True + + recursion_stack.remove(node) + return False + + for node in graph: + if node not in visited: + if dfs(node): + return True + + return False