Merge remote-tracking branch 'origin/master' into itrb-test
edeutsch committed Aug 14, 2023
2 parents 63a0814 + 3f5b6c7 commit 02aa7a6
Showing 5 changed files with 43 additions and 11 deletions.
5 changes: 3 additions & 2 deletions code/ARAX/ARAXQuery/ARAX_expander.py
@@ -697,8 +697,9 @@ def apply(self, response, input_parameters, mode: str = "ARAX"):
 
         # Map canonical curies back to the input curies in the QG (where applicable) #1622
         self._map_back_to_input_curies(message.knowledge_graph, query_graph, log)
-        eu.remove_semmeddb_edges_and_nodes_with_low_publications(message.knowledge_graph, response)
-        overarching_kg = eu.convert_standard_kg_to_qg_organized_kg(message.knowledge_graph)
+        if mode != "RTXKG2":
+            eu.remove_semmeddb_edges_and_nodes_with_low_publications(message.knowledge_graph, response)
+        overarching_kg = eu.convert_standard_kg_to_qg_organized_kg(message.knowledge_graph)
         # Return the response and done
         kg = message.knowledge_graph
         log.info(f"After Expand, the KG has {len(kg.nodes)} nodes and {len(kg.edges)} edges "
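The net effect of this hunk: the SemMedDB low-publication filter now runs only in ARAX mode, so the RTXKG2 endpoint returns its knowledge graph unfiltered. A minimal, self-contained sketch of the mode-gating pattern; the field names and publication threshold below are hypothetical, not the actual ARAX `eu.*` helpers:

```python
# Hypothetical sketch of mode-gated KG post-processing; "source" and
# "num_publications" and the threshold of 3 are illustrative assumptions.
from typing import Dict, List


def finalize_kg(kg: Dict[str, List[dict]], mode: str = "ARAX") -> Dict[str, List[dict]]:
    if mode != "RTXKG2":
        # Drop weakly supported SemMedDB edges for ARAX-mode queries only
        kg["edges"] = [edge for edge in kg["edges"]
                       if edge.get("source") != "SEMMEDDB"
                       or edge.get("num_publications", 0) >= 3]
    return kg


kg = {"edges": [{"source": "SEMMEDDB", "num_publications": 1},
                {"source": "infores:chembl", "num_publications": 12}]}
print(len(finalize_kg(dict(kg), mode="ARAX")["edges"]))    # 1 -- weak edge removed
print(len(finalize_kg(dict(kg), mode="RTXKG2")["edges"]))  # 2 -- unfiltered
```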
7 changes: 4 additions & 3 deletions code/ARAX/ARAXQuery/ARAX_query.py
@@ -536,10 +536,11 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'):
         operations = Operations.from_dict(input_operations_dict["operations"])
 
         #### Connect to the message store just once, even if we won't use it
-        response_cache = ResponseCache()
-        response_cache.connect()
+        response.debug(f"Connecting to ResponseCache")
+        response_cache = ResponseCache()  # also calls connect
 
         #### Create a messenger object for basic message processing
+        response.debug(f"Creating ARAXMessenger instance")
         messenger = ARAXMessenger()
 
         #### If there are URIs provided, try to load them
@@ -636,7 +637,7 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'):
 
         #### Multiple messages unsupported
         else:
-            response.debug(f"Multiple Messages were uploaded or imported by reference. However, proper merging code has not been implmented yet! Will use just the first Message for now.")
+            response.warning(f"Multiple Messages were uploaded or imported by reference. However, proper merging code has not been implemented yet! Will use just the first Message for now.")
            message = messages[0]
 
         #### Examine the options that were provided and act accordingly
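Two small changes here: the unimplemented-merging case is upgraded from a debug line to a warning (and its typo fixed), and `ResponseCache()` now connects inside its constructor, removing the easy-to-forget separate `connect()` call. A sketch of that connect-in-constructor pattern, using sqlite3 as a hypothetical stand-in for the real message store:

```python
import sqlite3


class ResponseCache:
    """Hypothetical stand-in: connecting in __init__ means every instance
    is immediately usable and a separate connect() step can't be skipped."""

    def __init__(self, db_path: str = "responses.sqlite"):
        self._conn = sqlite3.connect(db_path)  # also calls connect
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS responses (id TEXT PRIMARY KEY, blob TEXT)")

    def store(self, response_id: str, blob: str) -> None:
        with self._conn:  # commits on success
            self._conn.execute(
                "INSERT OR REPLACE INTO responses VALUES (?, ?)", (response_id, blob))


cache = ResponseCache()  # no cache.connect() needed anymore
cache.store("r1", '{"status": "OK"}')
```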
14 changes: 11 additions & 3 deletions code/ARAX/BiolinkHelper/biolink_helper.py
@@ -6,6 +6,7 @@
 import argparse
 import json
 import os
+import sys
 import pathlib
 import pickle
 from collections import defaultdict
@@ -15,10 +16,12 @@
 import yaml
 from treelib import Tree
 
+def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs)
 
 class BiolinkHelper:
 
     def __init__(self, biolink_version: Optional[str] = None, is_test: bool = False):
+        eprint("DEBUG: In BiolinkHelper init")
         self.biolink_version = biolink_version if biolink_version else self.get_current_arax_biolink_version()
         self.root_category = "biolink:NamedThing"
         self.root_predicate = "biolink:related_to"
@@ -112,7 +115,7 @@ def get_canonical_predicates(self, predicates: Union[str, List[str], Set[str]])
         valid_predicates = input_predicate_set.intersection(self.biolink_lookup_map["predicates"])
         invalid_predicates = input_predicate_set.difference(valid_predicates)
         if invalid_predicates:
-            print(f"WARNING: Provided predicate(s) {invalid_predicates} do not exist in Biolink {self.biolink_version}")
+            eprint(f"WARNING: Provided predicate(s) {invalid_predicates} do not exist in Biolink {self.biolink_version}")
         canonical_predicates = {self.biolink_lookup_map["predicates"][predicate]["canonical_predicate"]
                                 for predicate in valid_predicates}
         canonical_predicates.update(invalid_predicates)  # Go ahead and include those we don't have canonical info for
@@ -185,7 +188,12 @@ def get_current_arax_biolink_version() -> str:
 
     def _load_biolink_lookup_map(self, is_test: bool = False):
         lookup_map_file = pathlib.Path(self.biolink_lookup_map_path)
+
         if is_test or not lookup_map_file.exists():
+            if is_test:
+                eprint(f"DEBUG: in test mode")
+            else:
+                eprint(f"DEBUG: lookup map not here! {lookup_map_file}")
             # Parse the relevant Biolink yaml file and create/save local indexes
             return self._create_biolink_lookup_map()
         else:
@@ -195,8 +203,8 @@
             return biolink_lookup_map
 
     def _create_biolink_lookup_map(self) -> Dict[str, Dict[str, Dict[str, Union[str, List[str], bool]]]]:
-        print(f"INFO: Building local Biolink {self.biolink_version} ancestor/descendant lookup map because one "
-              f"doesn't yet exist")
+        eprint(f"INFO: Building local Biolink {self.biolink_version} ancestor/descendant lookup map because one "
+               f"doesn't yet exist")
         biolink_lookup_map = {"predicates": dict(), "categories": dict(),
                               "predicate_mixins": dict(), "category_mixins": dict(),
                               "aspects": dict(), "directions": dict()}
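The common thread in this file is routing all diagnostics through `eprint()`, i.e. to stderr, so DEBUG/WARNING/INFO chatter never contaminates stdout when the module's output is piped or parsed. A self-contained illustration of the same one-liner (the payload string below is made up for the demo):

```python
import sys


def eprint(*args, **kwargs):
    # Identical pattern to biolink_helper.py: diagnostics go to stderr
    print(*args, file=sys.stderr, **kwargs)


print('{"canonical_predicate": "biolink:treats"}')  # payload on stdout
eprint("DEBUG: this line goes to stderr")           # diagnostics on stderr

# Running `python demo.py > out.json` leaves out.json free of DEBUG lines;
# they still appear on the terminal via the stderr stream.
```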
26 changes: 24 additions & 2 deletions code/ARAX/ResponseCache/response_cache.py
@@ -19,6 +19,7 @@ def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs)
 import requests_cache
 from flask import Flask,redirect
 import copy
+import multiprocessing
 
 import boto3
 import timeit
@@ -46,6 +47,16 @@ def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs)
 biolink_version = '3.5.3'
 
 
+def validate_envelope(process_params):
+    validator = process_params['validator']
+    envelope = process_params['envelope']
+    try:
+        validator.check_compliance_of_trapi_response(envelope)
+    except:
+        eprint(f"ERROR: Validator crashed")
+    return(validator)
+
+
 Base = declarative_base()
 
 #### Define the database tables as classes
#### Define the database tables as classes
Expand Down Expand Up @@ -616,9 +627,20 @@ def get_response(self, response_id):
try:
if enable_validation:

#### Perform the validation
#### Set up the validator
validator = TRAPIResponseValidator(trapi_version=schema_version, biolink_version=biolink_version)
validator.check_compliance_of_trapi_response(envelope)

#### Enable multiprocessing to allow parallel processing of multiple envelopes when the GUI sends a bunch at once
#### There's only ever one here, but the GUI sends a bunch of requests which are all subject to the same GIL
enable_multiprocessing = True
if enable_multiprocessing:
pool = multiprocessing.Pool()
eprint("INFO: Launching validator via multiprocessing")
pool_results = pool.map(validate_envelope, [ { 'validator': validator, 'envelope': envelope} ] )
validator = pool_results[0]
else:
validator.check_compliance_of_trapi_response(envelope)

messages: Dict[str, List[Dict[str,str]]] = validator.get_messages()
validation_messages_text = validator.dumps()

Expand Down
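The validation call is now dispatched through a `multiprocessing.Pool`, so each CPU-bound `check_compliance_of_trapi_response()` runs in its own process with its own GIL, letting concurrent GUI requests validate in parallel instead of serializing on one interpreter lock. Note why the worker returns the validator: arguments are pickled into the child process, so mutations never propagate back automatically, which is why the commit reassigns `validator = pool_results[0]`. A minimal runnable sketch of the same hand-off, with a toy class standing in for the real `TRAPIResponseValidator`:

```python
import multiprocessing


class ToyValidator:
    """Hypothetical stand-in for TRAPIResponseValidator; assume CPU-bound work."""

    def __init__(self):
        self.messages = []

    def check_compliance_of_trapi_response(self, envelope: dict) -> None:
        self.messages.append(f"validated {len(envelope.get('results', []))} results")


def validate_envelope(process_params: dict) -> ToyValidator:
    validator = process_params['validator']
    try:
        validator.check_compliance_of_trapi_response(process_params['envelope'])
    except Exception:
        print("ERROR: Validator crashed")
    return validator  # pickled back to the parent with its messages attached


if __name__ == "__main__":  # guard required for spawn-based platforms
    validator = ToyValidator()
    with multiprocessing.Pool() as pool:
        pool_results = pool.map(validate_envelope,
                                [{'validator': validator, 'envelope': {'results': [1, 2]}}])
    validator = pool_results[0]  # replace the parent's stale copy
    print(validator.messages)    # ['validated 2 results']
```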
2 changes: 1 addition & 1 deletion requirements.txt
@@ -33,7 +33,7 @@ aiohttp==3.8.5
 boto3==1.24.59
 tornado==6.3.2
 MarkupSafe==2.1.2
-reasoner-validator==3.8.0
+reasoner-validator==3.8.1
 pronto==2.5.3
 pygit2==1.10.0
 tabulate==0.9.0
