diff --git a/src/handlers/api.py b/src/handlers/api.py index 192c9d92..be546788 100644 --- a/src/handlers/api.py +++ b/src/handlers/api.py @@ -21,6 +21,7 @@ from utils.metakg.export import edges2graphml from utils.metakg.path_finder import MetaKGPathFinder from utils.metakg.cytoscape_formatter import CytoscapeDataFormatter +from utils.metakg.biolink_helpers import get_expanded_values from utils.notification import SlackNewAPIMessage, SlackNewTranslatorAPIMessage logger = logging.getLogger("smartAPI") @@ -433,20 +434,6 @@ def initialize(self, *args, **kwargs): self.pipeline = MetaKGQueryPipeline(ns=self.biothings) self.biolink_model_toolkit = bmt.Toolkit() - def get_expanded_values(self, value: Union[str, List[str]]) -> List[str]: - """return exapnded value list for a given biolink class name""" - if isinstance(value, str): - value = [value] - _out = [] - for v in value: - try: - v = self.biolink_model_toolkit.get_descendants(v, reflexive=True, formatted=True) - v = [x.split(":")[-1] for x in v] # remove biolink: prefix - except ValueError: - v = [v] - _out.extend(v) - return _out - @capture_exceptions async def get(self, *args, **kwargs): expanded_fields = {"subject": False, "object": False, "predicate": False, "node": False} @@ -463,7 +450,7 @@ async def get(self, *args, **kwargs): value_list = getattr(self.args, field) if not value_list: continue - value_list = self.get_expanded_values(value_list) if expanded_fields[field] else value_list + value_list = get_expanded_values(value_list, self.biolink_model_toolkit) if expanded_fields[field] else value_list setattr(self.args, field, value_list) await super().get(*args, **kwargs) @@ -539,22 +526,111 @@ class MetaKGPathFinderHandler(QueryHandler): **QUERY_KWARGS.get("GET", {}), "subject": {"type": str, "required": True, "max": 1000}, "object": {"type": str, "required": True, "max": 1000}, + "predicate": {"type": list, "max": 10, "default": []}, "cutoff": {"type": int, "default": 3, "max": 5}, "api_details": {"type": bool, "default": False}, + "expand": { + "type": list, + "max": 6, + "default": [], + "enum": ["subject", "object", "predicate", "node", "edge", "all"] + }, + "rawquery": {"type": bool, "default": False}, }, } + def initialize(self, *args, **kwargs): + super().initialize(*args, **kwargs) + # change the default query pipeline from self.biothings.pipeline + self.pipeline = MetaKGQueryPipeline(ns=self.biothings) + self.biolink_model_toolkit = bmt.Toolkit() + + def setup_pathfinder_rawquery(self, expanded_fields): + # JSON-structured summary of operations and criteria applied + operations_summary = { + "input_parameters": {}, + "expansion_logic": {}, + "search_criteria": [] + } + + # Include original query parameters + operations_summary["input_parameters"] = { + "subject": self.args.subject, + "object": self.args.object, + "predicate": getattr(self.args, 'predicate', None) # Including predicate if provided + } + + # Detail the expansion logic in a way that explains what expansions are applied + operations_summary["expansion_logic"] = { + "expand_subject": "subject" in self.args.expand or "all" in self.args.expand or "node" in self.args.expand, + "expand_object": "object" in self.args.expand or "all" in self.args.expand or "node" in self.args.expand, + "expand_predicate": "predicate" in self.args.expand, + } + + # Summarize the search criteria based on expanded fields + for field, values in expanded_fields.items(): + if values: # Ensure values exist for the field before adding + operations_summary["search_criteria"].append({ + "field": field, + "description": f"Expanding '{field}' to include {len(values)} variant(s)", + "values": values + }) + + # The operations_summary is already in a format that can be directly returned as JSON + return operations_summary + @capture_exceptions async def get(self, *args, **kwargs): query_data = {"q": self.args.q} - pathfinder = MetaKGPathFinder(query_data=query_data) + + # Initialize with the original subject and object, and setup for expansion + expanded_fields = { + "subject": [self.args.subject], + "object": [self.args.object], + } + + # Check if expansion is requested + if self.args.expand: + # Define a set for fields affected by 'node' and 'all' for simpler updates + common_fields = {"subject", "object"} + + # Initialize expandable_fields based on 'node' or 'all' presence + expandable_fields = set() + if "node" in self.args.expand or "all" in self.args.expand: + expandable_fields.update(common_fields) + if "edge" in self.args.expand or "all" in self.args.expand: + expandable_fields.add("predicate") + + # Add specific fields if mentioned explicitly + expandable_fields.update({field for field in ["subject", "object", "predicate"] if field in self.args.expand}) + + # Expand the fields as required + for field in expandable_fields: + # Use the built-in utility function, get_expanded_values, to expand the fields + expanded_fields[field] = get_expanded_values(getattr(self.args, field), self.biolink_model_toolkit) + + # Initalize pathfinder + pathfinder = MetaKGPathFinder(query_data=query_data, expanded_fields=expanded_fields) + + # Initialize the pathfinder results list + paths_with_edges = [] + + # Run get_paths method to retrieve paths and edges paths_with_edges = pathfinder.get_paths( - subject=self.args.subject, - object=self.args.object, + expanded_fields=expanded_fields, cutoff=self.args.cutoff, api_details=self.args.api_details, + predicate_filter=self.args.predicate ) - # Return the result in JSON format - res = {"paths_with_edges": paths_with_edges} + + # Check if rawquery parameter is true -- respond with correct output + if self.args.rawquery: + raw_query_output = self.setup_pathfinder_rawquery(expanded_fields) + self.write(raw_query_output) + return + res = { + "total": len(paths_with_edges), + "paths": paths_with_edges, + } await asyncio.sleep(0.01) self.finish(res) diff --git a/src/utils/metakg/biolink_helpers.py b/src/utils/metakg/biolink_helpers.py new file mode 100644 index 00000000..9089911d --- /dev/null +++ b/src/utils/metakg/biolink_helpers.py @@ -0,0 +1,20 @@ +from typing import Union, List +import bmt + +# Initialize the Biolink Model Toolkit instance globally if it's used frequently +# or pass it as a parameter to functions that require it. +toolkit = bmt.Toolkit() + +def get_expanded_values(value: Union[str, List[str]], toolkit_instance=toolkit) -> List[str]: + """Return expanded value list for a given Biolink class name.""" + if isinstance(value, str): + value = [value] + _out = [] + for v in value: + try: + v = toolkit_instance.get_descendants(v, reflexive=True, formatted=True) + v = [x.split(":")[-1] for x in v] # Remove 'biolink:' prefix + except ValueError: + v = [v] + _out.extend(v) + return _out diff --git a/src/utils/metakg/path_finder.py b/src/utils/metakg/path_finder.py index 3699ed6a..7309276d 100644 --- a/src/utils/metakg/path_finder.py +++ b/src/utils/metakg/path_finder.py @@ -5,19 +5,18 @@ class MetaKGPathFinder: - def __init__(self, query_data=None): + def __init__(self, query_data=None, expanded_fields=None): """ Initialize the MetaKGPathFinder class. - This class is responsible for creating a network graph from indexed - documents and providing functionalities to find paths between two nodes - in the graph. - Parameters: - query_data: dict (default=None) Optional data to filter which documents to use while creating the graph. + - expanded_fields: dict (default=None) + Optional fields to expand subjects and objects in the graph. """ self.predicates = {} + self.expanded_fields = expanded_fields or {"subject": [], "object": []} self.get_graph(query_data=query_data) def get_graph(self, query_data=None): @@ -60,60 +59,82 @@ def get_graph(self, query_data=None): return self.G - def get_paths(self, subject, object, cutoff=3, api_details=False): + def build_edge_results(self, paths_data, data, api_details, source_node, target_node): """ - Find all simple paths between two nodes in the graph. + Adds edge details between two nodes to the paths data structure. + + Parameters: + - paths_data (dict): The paths data structure being built up. + - data (dict): Data about the edge, including the predicate and APIs. + - api_details (bool): If True, include full API details; otherwise, include minimal API information. + - source_node (str): Identifier for the source node of the edge. + - target_node (str): Identifier for the target node of the edge. - This method retrieves all possible paths between a given subject and - object in the graph, up to a specified cutoff length. + Returns: + - dict: The updated paths_data structure with the new edge added. + """ + # Case: Give full api results in response + if api_details: + api_content = data["api"] + else: + api_content = [{"name": item.get("name", None), "smartapi": {"id": item["smartapi"]["id"]}} for item in data["api"]] + paths_data["edges"].append( + { + "subject": source_node, + "object": target_node, + "predicate": data["predicate"], + "api": api_content, + } + ) + return paths_data + + def get_paths(self, expanded_fields, cutoff=3, api_details=False, predicate_filter=None): + """ + Find all simple paths between expanded subjects and objects in the graph. Parameters: - - subject: str - The starting node in the graph. - - object: str - The ending node in the graph. - - cutoff: int (default=3) - The maximum length for any path returned. - - api_details: bool (default=False) - If True, the full details of the 'api' are included in the result. - If False, only the 'name' attribute of each 'api' entry is retained. + - expanded_fields: (dict) The expanded fields containing lists of subjects and objects. + - cutoff: (int, default=3) The maximum length for any path returned. + - api_details: (bool, default=False) If True, includes full details of the 'api' in the result. + - predicate_filter: (list, default=None) A list of predicates to filter the results by. Returns: - - paths_with_edges: list of dict - A list containing paths and their edge information. + - all_paths_with_edges: (list of dict) A list containing paths and their edge information for all subject-object pairs. """ - paths_with_edges = [] - - if nx.has_path(self.G, subject, object): - raw_paths = list(nx.all_simple_paths(self.G, source=subject, target=object, cutoff=cutoff)) - for path in raw_paths: - paths_data = {"path": path, "edges": []} - - for i in range(len(path) - 1): - source_node = path[i] - target_node = path[i + 1] - edge_key = f"{source_node}-{target_node}" - edge_data = self.predicates.get(edge_key, []) - - for data in edge_data: - # if api_details add full api list, else add selected keys only - if api_details: - api_content = data["api"] - else: - api_content = [ - {"name": item.get("name", None), "smartapi": {"id": item["smartapi"]["id"]}} - for item in data["api"] - ] - paths_data["edges"].append( - { - "subject": source_node, - "object": target_node, - "predicate": data["predicate"], - "api": api_content, - } - ) - - paths_with_edges.append(paths_data) - - return paths_with_edges + all_paths_with_edges = [] + + # Convert predicate_filter to a set for faster lookups if it's not None + predicate_filter_set = set(predicate_filter) if predicate_filter else None + # Add predicates from expanded_fields['predicate'] if it exists and is not None + if 'predicate' in expanded_fields and expanded_fields['predicate']: + predicate_filter_set.update(expanded_fields['predicate']) + + # Iterate over all combinations of subjects and objects + for subject in expanded_fields["subject"]: + for object in expanded_fields["object"]: + try: + # Check if a path exists between the subject and object + if nx.has_path(self.G, subject, object): + raw_paths = nx.all_simple_paths(self.G, source=subject, target=object, cutoff=cutoff) + for path in raw_paths: + paths_data = {"path": path, "edges": []} + edge_added = False # Flag to track if any edge has been added + for i in range(len(path) - 1): + source_node = path[i] + target_node = path[i + 1] + edge_key = f"{source_node}-{target_node}" + edge_data = self.predicates.get(edge_key, []) + + for data in edge_data: + # Case: Filter edges based on predicate + if predicate_filter_set and data["predicate"] not in predicate_filter_set: + continue # Skip this edge + paths_data = self.build_edge_results(paths_data, data, api_details, source_node, target_node) + edge_added = True # Mark that we've added at least one edge + if edge_added: # Only add paths_data if at least one edge was added + all_paths_with_edges.append(paths_data) + except Exception as e: + continue # Explicitly continue to the next subject-object pair + + return all_paths_with_edges