From 1e5f6129823a1ef2d8e8092a22fed6293085a61a Mon Sep 17 00:00:00 2001 From: Eric Deutsch Date: Thu, 10 Aug 2023 06:20:16 +0000 Subject: [PATCH 1/6] added multiprocessing to the TRAPI validator from ARS to offload the work onto different CPUs --- code/ARAX/ResponseCache/response_cache.py | 25 +++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/code/ARAX/ResponseCache/response_cache.py b/code/ARAX/ResponseCache/response_cache.py index a4865d754..445d95053 100644 --- a/code/ARAX/ResponseCache/response_cache.py +++ b/code/ARAX/ResponseCache/response_cache.py @@ -19,6 +19,7 @@ def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) import requests_cache from flask import Flask,redirect import copy +import multiprocessing import boto3 import timeit @@ -46,6 +47,16 @@ def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) biolink_version = '3.5.3' +def validate_envelope(process_params): + validator = process_params['validator'] + envelope = process_params['envelope'] + try: + validator.check_compliance_of_trapi_response(envelope) + return(True) + except: + return(False) + + Base = declarative_base() #### Define the database tables as classes @@ -616,9 +627,19 @@ def get_response(self, response_id): try: if enable_validation: - #### Perform the validation + #### Set up the validator validator = TRAPIResponseValidator(trapi_version=schema_version, biolink_version=biolink_version) - validator.check_compliance_of_trapi_response(envelope) + + #### Enable multiprocessing to allow parallel processing of multiple envelopes when the GUI sends a bunch at once + #### There's only ever one here, but the GUI sends a bunch of requests which are all subject to the same GIL + enable_multiprocessing = True + if enable_multiprocessing: + pool = multiprocessing.Pool() + eprint("INFO: Launching validator via multiprocessing") + pool_results = pool.map(validate_envelope, [ { 'validator': validator, 'envelope': envelope} ] ) + else: + validator.check_compliance_of_trapi_response(envelope) + messages: Dict[str, List[Dict[str,str]]] = validator.get_messages() validation_messages_text = validator.dumps() From ed205fea413ca5e1a393ccdf1a78acd017b406d6 Mon Sep 17 00:00:00 2001 From: Eric Deutsch Date: Thu, 10 Aug 2023 16:18:36 +0000 Subject: [PATCH 2/6] Need to explcitly return the validator object from the child process in order to report results --- code/ARAX/ResponseCache/response_cache.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/code/ARAX/ResponseCache/response_cache.py b/code/ARAX/ResponseCache/response_cache.py index 445d95053..ecd369adf 100644 --- a/code/ARAX/ResponseCache/response_cache.py +++ b/code/ARAX/ResponseCache/response_cache.py @@ -52,9 +52,9 @@ def validate_envelope(process_params): envelope = process_params['envelope'] try: validator.check_compliance_of_trapi_response(envelope) - return(True) except: - return(False) + eprint(f"ERROR: Validator crashed") + return(validator) Base = declarative_base() @@ -637,6 +637,7 @@ def get_response(self, response_id): pool = multiprocessing.Pool() eprint("INFO: Launching validator via multiprocessing") pool_results = pool.map(validate_envelope, [ { 'validator': validator, 'envelope': envelope} ] ) + validator = pool_results[0] else: validator.check_compliance_of_trapi_response(envelope) From 03b9b4586a0a7880a7e81e4a2a1cd9b33083d766 Mon Sep 17 00:00:00 2001 From: Kevin Vizhalil Date: Thu, 10 Aug 2023 16:38:48 -0400 Subject: [PATCH 3/6] filtering semmeddb only when run in ARAX --- code/ARAX/ARAXQuery/ARAX_expander.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/code/ARAX/ARAXQuery/ARAX_expander.py b/code/ARAX/ARAXQuery/ARAX_expander.py index 5f1ed5541..f3e1f2a98 100644 --- a/code/ARAX/ARAXQuery/ARAX_expander.py +++ b/code/ARAX/ARAXQuery/ARAX_expander.py @@ -697,8 +697,9 @@ def apply(self, response, input_parameters, mode: str = "ARAX"): # Map canonical curies back to the input curies in the QG (where applicable) #1622 self._map_back_to_input_curies(message.knowledge_graph, query_graph, log) - eu.remove_semmeddb_edges_and_nodes_with_low_publications(message.knowledge_graph, response) - overarching_kg = eu.convert_standard_kg_to_qg_organized_kg(message.knowledge_graph) + if mode != "RTXKG2": + eu.remove_semmeddb_edges_and_nodes_with_low_publications(message.knowledge_graph, response) + overarching_kg = eu.convert_standard_kg_to_qg_organized_kg(message.knowledge_graph) # Return the response and done kg = message.knowledge_graph log.info(f"After Expand, the KG has {len(kg.nodes)} nodes and {len(kg.edges)} edges " From 140cd4b3a3c40d91fcf16f08aad40472e672aefb Mon Sep 17 00:00:00 2001 From: isbluis Date: Thu, 10 Aug 2023 22:00:00 +0000 Subject: [PATCH 4/6] Add more debug logging to help resolve #2093 Also promote multiple messages log from debug to warning --- code/ARAX/ARAXQuery/ARAX_query.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/code/ARAX/ARAXQuery/ARAX_query.py b/code/ARAX/ARAXQuery/ARAX_query.py index 514cd8ff2..00d962d86 100644 --- a/code/ARAX/ARAXQuery/ARAX_query.py +++ b/code/ARAX/ARAXQuery/ARAX_query.py @@ -536,10 +536,12 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'): operations = Operations.from_dict(input_operations_dict["operations"]) #### Connect to the message store just once, even if we won't use it + response.debug(f"Connecting to ResponseCache") response_cache = ResponseCache() response_cache.connect() #### Create a messenger object for basic message processing + response.debug(f"Creating ARAXMessenger instance") messenger = ARAXMessenger() #### If there are URIs provided, try to load them @@ -636,7 +638,7 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'): #### Multiple messages unsupported else: - response.debug(f"Multiple Messages were uploaded or imported by reference. However, proper merging code has not been implmented yet! Will use just the first Message for now.") + response.warning(f"Multiple Messages were uploaded or imported by reference. However, proper merging code has not been implemented yet! Will use just the first Message for now.") message = messages[0] #### Examine the options that were provided and act accordingly From f0a4ea1df8b475a63c6a1874bb66f926932fe645 Mon Sep 17 00:00:00 2001 From: isbluis Date: Fri, 11 Aug 2023 22:55:56 +0000 Subject: [PATCH 5/6] Add debug diagnostic messages to BiolinkHelper; send all output to STDERR #2093 ARAX_query does not need to call connect on ResponseCache --- code/ARAX/ARAXQuery/ARAX_query.py | 3 +-- code/ARAX/BiolinkHelper/biolink_helper.py | 14 +++++++++++--- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/code/ARAX/ARAXQuery/ARAX_query.py b/code/ARAX/ARAXQuery/ARAX_query.py index 00d962d86..e320a67d5 100644 --- a/code/ARAX/ARAXQuery/ARAX_query.py +++ b/code/ARAX/ARAXQuery/ARAX_query.py @@ -537,8 +537,7 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'): #### Connect to the message store just once, even if we won't use it response.debug(f"Connecting to ResponseCache") - response_cache = ResponseCache() - response_cache.connect() + response_cache = ResponseCache() # also calls connect #### Create a messenger object for basic message processing response.debug(f"Creating ARAXMessenger instance") diff --git a/code/ARAX/BiolinkHelper/biolink_helper.py b/code/ARAX/BiolinkHelper/biolink_helper.py index 29dbe6df6..c37172573 100644 --- a/code/ARAX/BiolinkHelper/biolink_helper.py +++ b/code/ARAX/BiolinkHelper/biolink_helper.py @@ -6,6 +6,7 @@ import argparse import json import os +import sys import pathlib import pickle from collections import defaultdict @@ -15,10 +16,12 @@ import yaml from treelib import Tree +def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) class BiolinkHelper: def __init__(self, biolink_version: Optional[str] = None, is_test: bool = False): + eprint("DEBUG: In BiolinkHelper init") self.biolink_version = biolink_version if biolink_version else self.get_current_arax_biolink_version() self.root_category = "biolink:NamedThing" self.root_predicate = "biolink:related_to" @@ -112,7 +115,7 @@ def get_canonical_predicates(self, predicates: Union[str, List[str], Set[str]]) valid_predicates = input_predicate_set.intersection(self.biolink_lookup_map["predicates"]) invalid_predicates = input_predicate_set.difference(valid_predicates) if invalid_predicates: - print(f"WARNING: Provided predicate(s) {invalid_predicates} do not exist in Biolink {self.biolink_version}") + eprint(f"WARNING: Provided predicate(s) {invalid_predicates} do not exist in Biolink {self.biolink_version}") canonical_predicates = {self.biolink_lookup_map["predicates"][predicate]["canonical_predicate"] for predicate in valid_predicates} canonical_predicates.update(invalid_predicates) # Go ahead and include those we don't have canonical info for @@ -185,7 +188,12 @@ def get_current_arax_biolink_version() -> str: def _load_biolink_lookup_map(self, is_test: bool = False): lookup_map_file = pathlib.Path(self.biolink_lookup_map_path) + if is_test or not lookup_map_file.exists(): + if is_test: + eprint(f"DEBUG: in test mode") + else: + eprint(f"DEBUG: lookup map not here! {lookup_map_file}") # Parse the relevant Biolink yaml file and create/save local indexes return self._create_biolink_lookup_map() else: @@ -195,8 +203,8 @@ def _load_biolink_lookup_map(self, is_test: bool = False): return biolink_lookup_map def _create_biolink_lookup_map(self) -> Dict[str, Dict[str, Dict[str, Union[str, List[str], bool]]]]: - print(f"INFO: Building local Biolink {self.biolink_version} ancestor/descendant lookup map because one " - f"doesn't yet exist") + eprint(f"INFO: Building local Biolink {self.biolink_version} ancestor/descendant lookup map because one " + f"doesn't yet exist") biolink_lookup_map = {"predicates": dict(), "categories": dict(), "predicate_mixins": dict(), "category_mixins": dict(), "aspects": dict(), "directions": dict()} From 3f5b6c7cd3099619fa3520732a06fde091ba7bbc Mon Sep 17 00:00:00 2001 From: Eric Deutsch Date: Sat, 12 Aug 2023 03:06:49 +0000 Subject: [PATCH 6/6] upgrade validator to 3.8.1 --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 5b2c309f7..b7a5f90a7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -33,7 +33,7 @@ aiohttp==3.8.5 boto3==1.24.59 tornado==6.3.2 MarkupSafe==2.1.2 -reasoner-validator==3.8.0 +reasoner-validator==3.8.1 pronto==2.5.3 pygit2==1.10.0 tabulate==0.9.0