diff --git a/code/ARAX/ARAXQuery/ARAX_expander.py b/code/ARAX/ARAXQuery/ARAX_expander.py
index 5f1ed5541..f3e1f2a98 100644
--- a/code/ARAX/ARAXQuery/ARAX_expander.py
+++ b/code/ARAX/ARAXQuery/ARAX_expander.py
@@ -697,8 +697,9 @@ def apply(self, response, input_parameters, mode: str = "ARAX"):
         # Map canonical curies back to the input curies in the QG (where applicable) #1622
         self._map_back_to_input_curies(message.knowledge_graph, query_graph, log)
-        eu.remove_semmeddb_edges_and_nodes_with_low_publications(message.knowledge_graph, response)
-        overarching_kg = eu.convert_standard_kg_to_qg_organized_kg(message.knowledge_graph)
+        if mode != "RTXKG2":
+            eu.remove_semmeddb_edges_and_nodes_with_low_publications(message.knowledge_graph, response)
+        overarching_kg = eu.convert_standard_kg_to_qg_organized_kg(message.knowledge_graph)
 
         # Return the response and done
         kg = message.knowledge_graph
         log.info(f"After Expand, the KG has {len(kg.nodes)} nodes and {len(kg.edges)} edges "
diff --git a/code/ARAX/ARAXQuery/ARAX_query.py b/code/ARAX/ARAXQuery/ARAX_query.py
index 514cd8ff2..e320a67d5 100644
--- a/code/ARAX/ARAXQuery/ARAX_query.py
+++ b/code/ARAX/ARAXQuery/ARAX_query.py
@@ -536,10 +536,11 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'):
             operations = Operations.from_dict(input_operations_dict["operations"])
 
         #### Connect to the message store just once, even if we won't use it
-        response_cache = ResponseCache()
-        response_cache.connect()
+        response.debug(f"Connecting to ResponseCache")
+        response_cache = ResponseCache()  # also calls connect
 
         #### Create a messenger object for basic message processing
+        response.debug(f"Creating ARAXMessenger instance")
         messenger = ARAXMessenger()
 
         #### If there are URIs provided, try to load them
@@ -636,7 +637,7 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'):
 
         #### Multiple messages unsupported
         else:
-            response.debug(f"Multiple Messages were uploaded or imported by reference. However, proper merging code has not been implmented yet! Will use just the first Message for now.")
+            response.warning(f"Multiple Messages were uploaded or imported by reference. However, proper merging code has not been implemented yet! Will use just the first Message for now.")
             message = messages[0]
 
         #### Examine the options that were provided and act accordingly
diff --git a/code/ARAX/BiolinkHelper/biolink_helper.py b/code/ARAX/BiolinkHelper/biolink_helper.py
index 29dbe6df6..c37172573 100644
--- a/code/ARAX/BiolinkHelper/biolink_helper.py
+++ b/code/ARAX/BiolinkHelper/biolink_helper.py
@@ -6,6 +6,7 @@
 import argparse
 import json
 import os
+import sys
 import pathlib
 import pickle
 from collections import defaultdict
@@ -15,10 +16,12 @@
 import yaml
 from treelib import Tree
 
+def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs)
 
 class BiolinkHelper:
 
     def __init__(self, biolink_version: Optional[str] = None, is_test: bool = False):
+        eprint("DEBUG: In BiolinkHelper init")
         self.biolink_version = biolink_version if biolink_version else self.get_current_arax_biolink_version()
         self.root_category = "biolink:NamedThing"
         self.root_predicate = "biolink:related_to"
@@ -112,7 +115,7 @@ def get_canonical_predicates(self, predicates: Union[str, List[str], Set[str]])
         valid_predicates = input_predicate_set.intersection(self.biolink_lookup_map["predicates"])
         invalid_predicates = input_predicate_set.difference(valid_predicates)
         if invalid_predicates:
-            print(f"WARNING: Provided predicate(s) {invalid_predicates} do not exist in Biolink {self.biolink_version}")
+            eprint(f"WARNING: Provided predicate(s) {invalid_predicates} do not exist in Biolink {self.biolink_version}")
         canonical_predicates = {self.biolink_lookup_map["predicates"][predicate]["canonical_predicate"]
                                 for predicate in valid_predicates}
         canonical_predicates.update(invalid_predicates)  # Go ahead and include those we don't have canonical info for
@@ -185,7 +188,12 @@ def get_current_arax_biolink_version() -> str:
 
 
     def _load_biolink_lookup_map(self, is_test: bool = False):
         lookup_map_file = pathlib.Path(self.biolink_lookup_map_path)
+        if is_test or not lookup_map_file.exists():
+            if is_test:
+                eprint(f"DEBUG: in test mode")
+            else:
+                eprint(f"DEBUG: lookup map not here! {lookup_map_file}")
             # Parse the relevant Biolink yaml file and create/save local indexes
             return self._create_biolink_lookup_map()
         else:
@@ -195,8 +203,8 @@
         return biolink_lookup_map
 
 
     def _create_biolink_lookup_map(self) -> Dict[str, Dict[str, Dict[str, Union[str, List[str], bool]]]]:
-        print(f"INFO: Building local Biolink {self.biolink_version} ancestor/descendant lookup map because one "
-              f"doesn't yet exist")
+        eprint(f"INFO: Building local Biolink {self.biolink_version} ancestor/descendant lookup map because one "
+               f"doesn't yet exist")
         biolink_lookup_map = {"predicates": dict(), "categories": dict(), "predicate_mixins": dict(),
                               "category_mixins": dict(), "aspects": dict(), "directions": dict()}
diff --git a/code/ARAX/ResponseCache/response_cache.py b/code/ARAX/ResponseCache/response_cache.py
index a4865d754..ecd369adf 100644
--- a/code/ARAX/ResponseCache/response_cache.py
+++ b/code/ARAX/ResponseCache/response_cache.py
@@ -19,6 +19,7 @@ def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs)
 import requests_cache
 from flask import Flask,redirect
 import copy
+import multiprocessing
 import boto3
 import timeit
 
@@ -46,6 +47,16 @@ def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs)
 
 biolink_version = '3.5.3'
 
 
+def validate_envelope(process_params):
+    validator = process_params['validator']
+    envelope = process_params['envelope']
+    try:
+        validator.check_compliance_of_trapi_response(envelope)
+    except:
+        eprint(f"ERROR: Validator crashed")
+    return(validator)
+
+
 Base = declarative_base()
 
 
#### Define the database tables as classes
@@ -616,9 +627,20 @@ def get_response(self, response_id):
 
         try:
             if enable_validation:
-                #### Perform the validation
+                #### Set up the validator
                 validator = TRAPIResponseValidator(trapi_version=schema_version, biolink_version=biolink_version)
-                validator.check_compliance_of_trapi_response(envelope)
+
+                #### Enable multiprocessing to allow parallel processing of multiple envelopes when the GUI sends a bunch at once
+                #### There's only ever one here, but the GUI sends a bunch of requests which are all subject to the same GIL
+                enable_multiprocessing = True
+                if enable_multiprocessing:
+                    pool = multiprocessing.Pool()
+                    eprint("INFO: Launching validator via multiprocessing")
+                    pool_results = pool.map(validate_envelope, [ { 'validator': validator, 'envelope': envelope} ] )
+                    validator = pool_results[0]
+                else:
+                    validator.check_compliance_of_trapi_response(envelope)
+
                 messages: Dict[str, List[Dict[str,str]]] = validator.get_messages()
                 validation_messages_text = validator.dumps()
 
diff --git a/requirements.txt b/requirements.txt
index 5b2c309f7..b7a5f90a7 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -33,7 +33,7 @@ aiohttp==3.8.5
 boto3==1.24.59
 tornado==6.3.2
 MarkupSafe==2.1.2
-reasoner-validator==3.8.0
+reasoner-validator==3.8.1
 pronto==2.5.3
 pygit2==1.10.0
 tabulate==0.9.0
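
Note on the print-to-eprint swap in biolink_helper.py: routing diagnostics through eprint() sends them to stderr, so stdout stays clean for anything a caller might capture or pipe. A minimal demonstration of the effect; the eprint() helper is copied from the diff, while the two example calls are illustrative and not part of the commit:

    import sys

    def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs)

    print('{"answer": 42}')                 # payload: safe to pipe or redirect
    eprint("DEBUG: In BiolinkHelper init")  # diagnostic: kept off stdout

Running this with stdout redirected to a file leaves only the JSON payload in the file; the DEBUG line still reaches the terminal via stderr.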
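Note on the _load_biolink_lookup_map() hunk: the new debug lines instrument a build-on-miss pickle cache, where the lookup map is loaded from disk when the pickle exists and otherwise rebuilt and saved by _create_biolink_lookup_map(). A minimal sketch of that pattern, with build_lookup_map() and lookup_map.pkl as hypothetical stand-ins for the real BiolinkHelper internals:

    import pathlib
    import pickle
    import sys

    def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs)

    def build_lookup_map() -> dict:
        # Hypothetical stand-in for the expensive Biolink yaml parse
        return {"predicates": dict(), "categories": dict()}

    def load_lookup_map(path: str = "lookup_map.pkl") -> dict:
        lookup_map_file = pathlib.Path(path)
        if not lookup_map_file.exists():
            eprint(f"DEBUG: lookup map not here! {lookup_map_file}")
            lookup_map = build_lookup_map()
            with open(lookup_map_file, "wb") as fp:
                pickle.dump(lookup_map, fp)  # cache for subsequent calls
            return lookup_map
        with open(lookup_map_file, "rb") as fp:
            return pickle.load(fp)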
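Note on the response_cache.py hunk: the validation call is CPU-bound, so running it in-process means concurrent Flask requests all contend for the parent interpreter's GIL, while pool.map() ships the work to a child process with its own GIL. Because arguments and results cross the process boundary by pickling, the worker's mutations are invisible to the parent, which is why validate_envelope() must return the validator to hand back its accumulated messages. A standalone sketch of the pattern, assuming a hypothetical SlowValidator in place of reasoner-validator's TRAPIResponseValidator, and, unlike the hunk, closing the pool with a context manager and logging the caught exception:

    import multiprocessing

    class SlowValidator:
        """Hypothetical stand-in for a CPU-bound TRAPI validator."""
        def __init__(self):
            self.messages = []

        def check_compliance(self, envelope: dict) -> None:
            # Stand-in for a long, CPU-bound validation pass
            self.messages.append(f"validated envelope with {len(envelope)} top-level keys")

    def validate_envelope(process_params: dict) -> SlowValidator:
        # Runs in a worker process; the returned validator is pickled back
        # to the parent, carrying the messages accumulated during validation
        validator = process_params['validator']
        try:
            validator.check_compliance(process_params['envelope'])
        except Exception as error:
            print(f"ERROR: Validator crashed: {error}")
        return validator

    if __name__ == "__main__":
        validator = SlowValidator()
        envelope = {"message": {}, "status": "Success"}
        with multiprocessing.Pool(processes=1) as pool:
            validator = pool.map(validate_envelope,
                                 [{'validator': validator, 'envelope': envelope}])[0]
        print(validator.messages)

Creating a pool per request adds fork/spawn overhead; the trade accepted here is paying that cost so one slow validation cannot stall every other request sharing the parent process.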