Skip to content

Commit

Permalink
Merge pull request #231 from SmartAPI/add-feature-expand
Browse files Browse the repository at this point in the history
Pathfinder feature upgrade-expand and predicate filter
  • Loading branch information
NikkiBytes authored Mar 20, 2024
2 parents a09330b + b94b8fb commit a910f0b
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 74 deletions.
116 changes: 96 additions & 20 deletions src/handlers/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from utils.metakg.export import edges2graphml
from utils.metakg.path_finder import MetaKGPathFinder
from utils.metakg.cytoscape_formatter import CytoscapeDataFormatter
from utils.metakg.biolink_helpers import get_expanded_values
from utils.notification import SlackNewAPIMessage, SlackNewTranslatorAPIMessage

logger = logging.getLogger("smartAPI")
Expand Down Expand Up @@ -433,20 +434,6 @@ def initialize(self, *args, **kwargs):
self.pipeline = MetaKGQueryPipeline(ns=self.biothings)
self.biolink_model_toolkit = bmt.Toolkit()

def get_expanded_values(self, value: Union[str, List[str]]) -> List[str]:
    """Return the expanded value list for the given Biolink class name(s).

    A single string is treated as a one-element list. Each name is replaced
    by the entries returned by the toolkit's ``get_descendants`` (queried with
    ``reflexive=True, formatted=True``), keeping only the text after the last
    ``:`` of each entry. A name for which the toolkit raises ``ValueError``
    is passed through unchanged.
    """
    names = [value] if isinstance(value, str) else value
    expanded = []
    for name in names:
        try:
            descendants = self.biolink_model_toolkit.get_descendants(name, reflexive=True, formatted=True)
        except ValueError:
            # The toolkit rejected this name: keep it exactly as given.
            expanded.append(name)
            continue
        # Drop the "biolink:" prefix from every returned entry.
        expanded.extend([entry.split(":")[-1] for entry in descendants])
    return expanded

@capture_exceptions
async def get(self, *args, **kwargs):
expanded_fields = {"subject": False, "object": False, "predicate": False, "node": False}
Expand All @@ -463,7 +450,7 @@ async def get(self, *args, **kwargs):
value_list = getattr(self.args, field)
if not value_list:
continue
value_list = self.get_expanded_values(value_list) if expanded_fields[field] else value_list
value_list = get_expanded_values(value_list, self.biolink_model_toolkit) if expanded_fields[field] else value_list
setattr(self.args, field, value_list)

await super().get(*args, **kwargs)
Expand Down Expand Up @@ -539,22 +526,111 @@ class MetaKGPathFinderHandler(QueryHandler):
**QUERY_KWARGS.get("GET", {}),
"subject": {"type": str, "required": True, "max": 1000},
"object": {"type": str, "required": True, "max": 1000},
"predicate": {"type": list, "max": 10, "default": []},
"cutoff": {"type": int, "default": 3, "max": 5},
"api_details": {"type": bool, "default": False},
"expand": {
"type": list,
"max": 6,
"default": [],
"enum": ["subject", "object", "predicate", "node", "edge", "all"]
},
"rawquery": {"type": bool, "default": False},
},
}

def initialize(self, *args, **kwargs):
    """Run the parent ``initialize`` and attach this handler's own helpers.

    After the parent set-up, ``self.pipeline`` is replaced by a
    ``MetaKGQueryPipeline`` built on ``self.biothings`` and
    ``self.biolink_model_toolkit`` is set to a new ``bmt.Toolkit``.
    """
    super().initialize(*args, **kwargs)
    # Use the MetaKG pipeline instead of the default self.biothings.pipeline.
    metakg_pipeline = MetaKGQueryPipeline(ns=self.biothings)
    self.pipeline = metakg_pipeline
    # Toolkit passed to get_expanded_values when query fields are expanded.
    self.biolink_model_toolkit = bmt.Toolkit()

def setup_pathfinder_rawquery(self, expanded_fields):
    """
    Build a JSON-serialisable summary of how a path-finder query was set up.

    Parameters:
    - expanded_fields: dict mapping a field name ("subject", "object",
      optionally "predicate") to the list of values that will be searched.

    Returns:
    - dict with three keys:
      "input_parameters": the subject, object and predicate as received;
      "expansion_logic": booleans telling which fields the `expand`
      argument causes to be expanded;
      "search_criteria": one entry per field of `expanded_fields` that has
      at least one value.
    """
    requested = set(self.args.expand or ())
    # "node" and "all" both imply subject and object expansion.
    expand_nodes = bool(requested & {"node", "all"})

    expansion_logic = {
        "expand_subject": "subject" in requested or expand_nodes,
        "expand_object": "object" in requested or expand_nodes,
        # The GET handler expands the predicate for "edge" and "all" as well
        # as for "predicate", so all three must be reported here.
        "expand_predicate": bool(requested & {"predicate", "edge", "all"}),
    }

    # Only fields that actually carry values are listed.
    search_criteria = [
        {
            "field": field,
            "description": f"Expanding '{field}' to include {len(values)} variant(s)",
            "values": values,
        }
        for field, values in expanded_fields.items()
        if values
    ]

    return {
        "input_parameters": {
            "subject": self.args.subject,
            "object": self.args.object,
            # The predicate is optional; report None when it is absent.
            "predicate": getattr(self.args, "predicate", None),
        },
        "expansion_logic": expansion_logic,
        "search_criteria": search_criteria,
    }

@capture_exceptions
async def get(self, *args, **kwargs):
    """
    Handle a path-finder GET request.

    The subject and object always start as single-element search lists.
    The `expand` argument selects which fields are widened with
    get_expanded_values: "subject", "object" and "predicate" expand that
    field, "node" expands subject and object, "edge" expands the
    predicate, and "all" expands all three.

    When `rawquery` is true, only the summary produced by
    setup_pathfinder_rawquery is written and no path search is run.
    Otherwise the response is {"total": <number of paths>, "paths": [...]}.
    """
    query_data = {"q": self.args.q}

    # Start from the original subject and object; expansion may replace them.
    expanded_fields = {
        "subject": [self.args.subject],
        "object": [self.args.object],
    }

    requested = set(self.args.expand or ())
    if requested:
        # Fields named explicitly in `expand`.
        expandable_fields = requested & {"subject", "object", "predicate"}
        # Group aliases: "node" covers both endpoints, "edge" the predicate,
        # and "all" covers everything.
        if requested & {"node", "all"}:
            expandable_fields |= {"subject", "object"}
        if requested & {"edge", "all"}:
            expandable_fields.add("predicate")

        for field in expandable_fields:
            expanded_fields[field] = get_expanded_values(
                getattr(self.args, field), self.biolink_model_toolkit
            )

    # The raw-query summary depends only on the request arguments and the
    # expanded fields, so answer it before building the graph.
    if self.args.rawquery:
        self.write(self.setup_pathfinder_rawquery(expanded_fields))
        return

    # Build the path finder once; constructing it builds the graph.
    pathfinder = MetaKGPathFinder(query_data=query_data, expanded_fields=expanded_fields)

    # get_paths takes the subject/object lists through expanded_fields only.
    paths_with_edges = pathfinder.get_paths(
        expanded_fields=expanded_fields,
        cutoff=self.args.cutoff,
        api_details=self.args.api_details,
        predicate_filter=self.args.predicate,
    )

    res = {
        "total": len(paths_with_edges),
        "paths": paths_with_edges,
    }
    await asyncio.sleep(0.01)
    self.finish(res)
20 changes: 20 additions & 0 deletions src/utils/metakg/biolink_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from typing import Union, List
import bmt

# Shared Biolink Model Toolkit instance, created once when this module is
# imported. get_expanded_values uses it as the default `toolkit_instance`,
# so callers only need to pass a toolkit when they already hold their own.
toolkit = bmt.Toolkit()

def get_expanded_values(value: Union[str, List[str]], toolkit_instance=toolkit) -> List[str]:
    """Return the expanded value list for the given Biolink class name(s).

    A single string is treated as a one-element list. Each name is replaced
    by the entries returned by ``toolkit_instance.get_descendants`` (queried
    with ``reflexive=True, formatted=True``), keeping only the text after the
    last ``:`` of each entry. A name for which the toolkit raises
    ``ValueError`` is passed through unchanged.
    """
    names = [value] if isinstance(value, str) else value
    expanded = []
    for name in names:
        try:
            descendants = toolkit_instance.get_descendants(name, reflexive=True, formatted=True)
        except ValueError:
            # The toolkit rejected this name: keep it exactly as given.
            expanded.append(name)
            continue
        # Remove the 'biolink:' prefix from every returned entry.
        expanded.extend([entry.split(":")[-1] for entry in descendants])
    return expanded
129 changes: 75 additions & 54 deletions src/utils/metakg/path_finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,18 @@


class MetaKGPathFinder:
def __init__(self, query_data=None, expanded_fields=None):
    """
    Initialize the MetaKGPathFinder class.

    This class is responsible for creating a network graph from indexed
    documents and providing functionalities to find paths between two nodes
    in the graph. The graph is built immediately by calling get_graph.

    Parameters:
    - query_data: dict (default=None)
        Optional data to filter which documents to use while creating the graph.
    - expanded_fields: dict (default=None)
        Optional fields to expand subjects and objects in the graph.
        When omitted (or empty), empty "subject" and "object" lists are used.
    """
    # Edge details looked up by get_paths under "<source>-<target>" keys.
    # NOTE(review): presumably filled in by get_graph -- confirm.
    self.predicates = {}
    self.expanded_fields = expanded_fields or {"subject": [], "object": []}
    self.get_graph(query_data=query_data)

def get_graph(self, query_data=None):
Expand Down Expand Up @@ -60,60 +59,82 @@ def get_graph(self, query_data=None):

return self.G

def get_paths(self, subject, object, cutoff=3, api_details=False):
def build_edge_results(self, paths_data, data, api_details, source_node, target_node):
    """
    Append the details of one edge to a path's "edges" list.

    Parameters:
    - paths_data (dict): The path record being built; must have an "edges" list.
    - data (dict): Edge data holding the "predicate" and the "api" list.
    - api_details (bool): If True, the "api" list is attached as-is; otherwise
      each API entry is reduced to its "name" and its "smartapi" id.
    - source_node (str): Identifier for the source node of the edge.
    - target_node (str): Identifier for the target node of the edge.

    Returns:
    - dict: The same paths_data object, with the new edge appended.
    """
    apis = data["api"]
    if not api_details:
        # Minimal view: keep only what is needed to identify each API.
        apis = [
            {"name": entry.get("name", None), "smartapi": {"id": entry["smartapi"]["id"]}}
            for entry in apis
        ]

    edges = paths_data["edges"]
    edges.append(
        {
            "subject": source_node,
            "object": target_node,
            "predicate": data["predicate"],
            "api": apis,
        }
    )
    return paths_data

def get_paths(self, expanded_fields, cutoff=3, api_details=False, predicate_filter=None):
    """
    Find all simple paths between expanded subjects and objects in the graph.

    Every subject/object combination from expanded_fields is searched. A
    path is reported only if at least one of its edges survives the
    predicate filter.

    Parameters:
    - expanded_fields: (dict) Lists of node names under "subject" and
      "object"; an optional "predicate" list is merged into the predicate
      filter.
    - cutoff: (int, default=3) The maximum length for any path returned.
    - api_details: (bool, default=False) If True, includes full details of
      the 'api' in the result; otherwise only each API's name and
      smartapi id are kept.
    - predicate_filter: (list, default=None) Predicates to filter the
      edges by. With no filter and no expanded predicates, every edge is
      kept.

    Returns:
    - all_paths_with_edges: (list of dict) One {"path": [...], "edges": [...]}
      entry per reported path, for all subject-object pairs.
    """
    # Merge the explicit filter with any expanded predicates. Starting from
    # an empty set (rather than None) keeps the merge safe when only
    # expanded predicates are supplied.
    allowed_predicates = set(predicate_filter or ())
    allowed_predicates.update(expanded_fields.get("predicate") or ())

    all_paths_with_edges = []

    for subject in expanded_fields["subject"]:
        for target in expanded_fields["object"]:
            try:
                connected = nx.has_path(self.G, subject, target)
            except nx.NodeNotFound:
                # An expanded name that is not a node in the graph cannot
                # take part in any path; move on to the next pair.
                continue
            if not connected:
                continue

            for path in nx.all_simple_paths(self.G, source=subject, target=target, cutoff=cutoff):
                paths_data = {"path": path, "edges": []}
                edge_added = False
                # Walk consecutive node pairs along the path.
                for source_node, target_node in zip(path, path[1:]):
                    edge_key = f"{source_node}-{target_node}"
                    for data in self.predicates.get(edge_key, []):
                        # An empty filter set means "keep everything".
                        if allowed_predicates and data["predicate"] not in allowed_predicates:
                            continue
                        paths_data = self.build_edge_results(
                            paths_data, data, api_details, source_node, target_node
                        )
                        edge_added = True
                # Paths whose edges were all filtered out are dropped.
                if edge_added:
                    all_paths_with_edges.append(paths_data)

    return all_paths_with_edges

0 comments on commit a910f0b

Please sign in to comment.