diff --git a/src/indra_cogex/apps/utils.py b/src/indra_cogex/apps/utils.py
index 287c45f80..7d4aa0b49 100644
--- a/src/indra_cogex/apps/utils.py
+++ b/src/indra_cogex/apps/utils.py
@@ -1,4 +1,3 @@
-import codecs
 import json
 import numpy
 import logging
@@ -21,6 +20,7 @@
 from indra.assemblers.html.assembler import _format_evidence_text, _format_stmt_text
 from indra.statements import Statement
 from indra.util.statement_presentation import _get_available_ev_source_counts
+from indra_cogex.util import unicode_escape, UnicodeEscapeError
 from indra_cogex.apps.constants import VUE_SRC_JS, VUE_SRC_CSS, sources_dict
 from indra_cogex.apps.curation_cache.curation_cache import Curations
 from indra_cogex.apps.proxies import curation_cache
@@ -127,38 +127,6 @@ def render_statements(
     )
 
 
-class UnicodeEscapeError(Exception):
-    pass
-
-
-def unicode_escape(s: str, attempt: int = 1, max_attempts: int = 5) -> str:
-    """Remove extra escapes from unicode characters in a string
-
-    Parameters
-    ----------
-    s :
-        A string to remove extra escapes in unicode characters from
-    attempt :
-        The current attempt number.
-    max_attempts :
-        The maximum number of attempts to remove extra escapes.
-
-    Returns
-    -------
-    :
-        The string with extra escapes removed.
-    """
-    escaped = codecs.escape_decode(s)[0].decode()
-    # No more escaping needed
-    if escaped.count('\\\\u') == 0:
-        return bytes(escaped, "utf-8").decode("unicode_escape")
-    # Too many attempts, return the input
-    if attempt >= max_attempts:
-        raise UnicodeEscapeError(f"Could not remove extra escapes from {s}")
-    # Try again
-    return unicode_escape(escaped, attempt + 1, max_attempts)
-
-
 def format_stmts(
     stmts: Iterable[Statement],
     evidence_counts: Optional[Mapping[int, int]] = None,
diff --git a/src/indra_cogex/sources/indra_db/__init__.py b/src/indra_cogex/sources/indra_db/__init__.py
index 1522bf32e..014560077 100644
--- a/src/indra_cogex/sources/indra_db/__init__.py
+++ b/src/indra_cogex/sources/indra_db/__init__.py
@@ -2,7 +2,6 @@
 
 """Processor for the INDRA database."""
 
-import codecs
 import csv
 import gzip
 import json
@@ -15,7 +14,6 @@
 from pathlib import Path
 from typing import Iterable, Optional, Tuple, Union
 
-from indra.databases.identifiers import ensure_prefix_if_needed
 from indra.statements import (
     Agent,
     default_ns_order,
@@ -37,6 +35,7 @@
     processed_stmts_fname,
     stmts_from_json,
 )
+from indra_cogex.util import load_stmt_json_str
 
 logger = logging.getLogger(__name__)
 
@@ -86,7 +85,7 @@ def get_nodes(self):  # noqa:D102
             batch_iter(reader, batch_size=batch_size, return_func=list),
             desc="Getting BioEntity nodes",
         ):
-            sj_list = [load_statement_json(sjs) for _, sjs in batch]
+            sj_list = [load_stmt_json_str(sjs) for _, sjs in batch]
             stmts = stmts_from_json(sj_list)
             for stmt in stmts:
                 for agent in stmt.real_agent_list():
@@ -125,7 +124,7 @@ def get_relations(self, max_complex_members: int = 3):  # noqa:D102
                     f"statement hash {stmt_hash}. Are the source files updated?"
                 )
                 continue
-            stmt_json = load_statement_json(stmt_json_str)
+            stmt_json = load_stmt_json_str(stmt_json_str)
             if stmt_json["evidence"][0]["source_api"] == "medscan":
                 stmt_json["evidence"] = []
             data = {
@@ -237,11 +236,7 @@ def get_nodes(self, num_rows: Optional[int] = None) -> Iterable[Node]:
             stmt_hash = int(stmt_hash_str)
             if stmt_hash not in included_hashes:
                 continue
-            try:
-                stmt_json = load_statement_json(stmt_json_str)
-            except StatementJSONDecodeError as e:
-                logger.warning(e)
-                continue
+            stmt_json = load_stmt_json_str(stmt_json_str)
 
             # Loop all evidences
             # NOTE: there should be a single evidence for each
@@ -367,10 +362,6 @@ def _get_node_paths(cls, node_type: str) -> Path:
     )
 
 
-class StatementJSONDecodeError(Exception):
-    pass
-
-
 def get_ag_ns_id(ag: Agent) -> Tuple[str, str]:
     """Return a namespace, identifier tuple for a given agent.
 
@@ -390,20 +381,6 @@ def get_ag_ns_id(ag: Agent) -> Tuple[str, str]:
     return None, None
 
 
-def load_statement_json(json_str: str, attempt: int = 1, max_attempts: int = 5) -> json:
-    try:
-        return json.loads(json_str)
-    except json.JSONDecodeError:
-        if attempt < max_attempts:
-            json_str = codecs.escape_decode(json_str)[0].decode()
-            return load_statement_json(
-                json_str, attempt=attempt + 1, max_attempts=max_attempts
-            )
-        raise StatementJSONDecodeError(
-            f"Could not decode statement JSON after " f"{attempt} attempts: {json_str}"
-        )
-
-
 def load_text_refs_for_reading_dict(fname: str):
     text_refs = {}
     for line in tqdm(
diff --git a/src/indra_cogex/sources/indra_db/assembly.py b/src/indra_cogex/sources/indra_db/assembly.py
index b49245829..798373911 100644
--- a/src/indra_cogex/sources/indra_db/assembly.py
+++ b/src/indra_cogex/sources/indra_db/assembly.py
@@ -2,7 +2,6 @@
 import gzip
 import logging
 import math
-import json
 import pickle
 import itertools
 from pathlib import Path
@@ -11,9 +10,7 @@
 import networkx as nx
 import numpy as np
 import tqdm
-import codecs
 import pystow
-import sqlite3
 from collections import defaultdict, Counter
 
 from indra.belief import BeliefEngine
@@ -27,6 +24,7 @@
     unique_stmts_fname,
     source_counts_fname,
 )
+from indra_cogex.util import load_stmt_json_str
 
 StmtList = List[Statement]
 
@@ -36,10 +34,6 @@
 refinement_cycles_fname = base_folder.join(name="refinement_cycles.pkl")
 
 
-class StatementJSONDecodeError(Exception):
-    pass
-
-
 logger = logging.getLogger(__name__)
 
 
@@ -86,7 +80,7 @@ def get_refinement_graph() -> nx.DiGraph:
             try:
                 _, sjs = next(reader1)
                 stmt = stmt_from_json(
-                    load_statement_json(sjs, remove_evidence=True)
+                    load_stmt_json_str(sjs, remove_evidence=True)
                 )
                 stmts1.append(stmt)
             except StopIteration:
@@ -118,7 +112,8 @@ def get_refinement_graph() -> nx.DiGraph:
                 for _, sjs in batch:
                     try:
                         stmt = stmt_from_json(
-                            load_statement_json(sjs, remove_evidence=True)
+                            load_stmt_json_str(sjs,
+                                               remove_evidence=True)
                         )
                         stmts2.append(stmt)
                     except StopIteration:
@@ -173,37 +168,6 @@ def get_refinement_graph() -> nx.DiGraph:
     return ref_graph
 
 
-def load_statement_json(
-    json_str: str,
-    attempt: int = 1,
-    max_attempts: int = 5,
-    remove_evidence: bool = False,
-):
-    try:
-        return json.loads(json_str)
-    except json.JSONDecodeError:
-        if attempt < max_attempts:
-            json_str = codecs.escape_decode(json_str)[0].decode()
-            sj = load_statement_json(
-                json_str, attempt=attempt + 1, max_attempts=max_attempts
-            )
-            if remove_evidence:
-                sj["evidence"] = []
-            return sj
-        raise StatementJSONDecodeError(
-            f"Could not decode statement JSON after " f"{attempt} attempts: {json_str}"
-        )
-
-
-def get_stmts(db, limit, offset):
-    cur = db.execute("select * from processed limit %s offset %s" % (limit, offset))
-    stmts = [
-        stmt_from_json(load_statement_json(sjs, remove_evidence=True))
-        for _, sjs in tqdm.tqdm(cur.fetchall(), total=limit, desc="Loading statements")
-    ]
-    return stmts
-
-
 def get_related(stmts: StmtList) -> Set[Tuple[int, int]]:
     stmts_by_type = defaultdict(list)
     for stmt in stmts:
@@ -232,34 +196,6 @@ def get_related_split(stmts1: StmtList, stmts2: StmtList) -> Set[Tuple[int, int]
     return refinements
 
 
-def sqlite_approach():
-    """
-    Assembly notes:
-
-    Step 1: Create a SQLITE DB
-
-    sqlite3 -batch statements.db "create table processed (hash integer, stmt text);"
-    zcat < unique_statements.tsv.gz | sqlite3 -cmd ".mode tabs" -batch statements.db ".import '|cat -' processed"
-    sqlite3 -batch statements.db "create index processed_idx on processed (hash);"
-    """
-    db = sqlite3.connect(base_folder.join(name="statements.db"))
-
-    cur = db.execute("select count(1) from processed")
-    num_rows = cur.fetchone()[0]
-
-    offset0 = 0
-    num_batches = math.ceil(num_rows / batch_size)
-    refinements = set()
-    for i in tqdm.tqdm(range(num_batches)):
-        offset1 = i * batch_size
-        stmts1 = get_stmts(db, batch_size, offset1)
-        refinements |= get_related(stmts1)
-        for j in tqdm.tqdm(range(i + 1, num_batches)):
-            offset2 = j * batch_size
-            stmts2 = get_stmts(db, batch_size, offset2)
-            refinements |= get_related_split(stmts1, stmts2)
-
-
 def sample_unique_stmts(
     num: int = 100000, n_rows: Optional[int] = None
 ) -> List[Tuple[int, Statement]]:
@@ -293,7 +229,7 @@ def sample_unique_stmts(
         reader = csv.reader(f, delimiter="\t")
         for index, (sh, sjs) in enumerate(reader):
             if index in indices:
-                stmts.append((int(sh), stmt_from_json(load_statement_json(sjs))))
+                stmts.append((int(sh), stmt_from_json(load_stmt_json_str(sjs))))
                 t.update()
                 if len(stmts) == num:
                     break
@@ -390,7 +326,7 @@ def _add_belief_scores_for_batch(batch: List[Tuple[int, Statement]]):
             try:
                 stmt_hash_string, statement_json_string = next(reader)
                 statement = stmt_from_json(
-                    load_statement_json(
+                    load_stmt_json_str(
                         statement_json_string, remove_evidence=True
                     )
                 )
diff --git a/src/indra_cogex/sources/indra_db/raw_export.py b/src/indra_cogex/sources/indra_db/raw_export.py
index 2b17de16b..2a2ba3fcf 100644
--- a/src/indra_cogex/sources/indra_db/raw_export.py
+++ b/src/indra_cogex/sources/indra_db/raw_export.py
@@ -11,8 +11,9 @@
 import pystow
 from adeft.download import get_available_models
 from indra.util import batch_iter
-from indra.statements import stmts_from_json
+from indra.statements import stmts_from_json, stmt_from_json
 from indra.tools import assemble_corpus as ac
+from indra_cogex.util import load_stmt_json_str
 
 base_folder = pystow.module("indra", "db")
 reading_text_content_fname = base_folder.join(name="reading_text_content_meta.tsv.gz")
@@ -30,24 +31,6 @@
 logger = logging.getLogger(__name__)
 
 
-class StatementJSONDecodeError(Exception):
-    pass
-
-
-def load_statement_json(json_str: str, attempt: int = 1, max_attempts: int = 5):
-    try:
-        return json.loads(json_str)
-    except json.JSONDecodeError:
-        if attempt < max_attempts:
-            json_str = codecs.escape_decode(json_str)[0].decode()
-            return load_statement_json(
-                json_str, attempt=attempt + 1, max_attempts=max_attempts
-            )
-        raise StatementJSONDecodeError(
-            f"Could not decode statement JSON after " f"{attempt} attempts: {json_str}"
-        )
-
-
 def reader_prioritize(reader_contents):
     drop = set()
     # We first organize the contents by source/text type
@@ -322,7 +305,7 @@ def get_update(start_date):
             text_ref_id = reading_id_to_text_ref_id.get(int(reading_id))
             if text_ref_id:
                 refs = text_refs.get(text_ref_id)
-            stmt_json = load_statement_json(stmt_json_raw)
+            stmt_json = load_stmt_json_str(stmt_json_raw)
             if refs:
                 stmt_json["evidence"][0]["text_refs"] = refs
                 if refs.get("PMID"):
@@ -366,7 +349,7 @@
     for sh, stmt_json_str in tqdm.tqdm(
         reader, total=60405451, desc="Gathering grounded and unique statements"
     ):
-        stmt = stmts_from_json([load_statement_json(stmt_json_str)])[0]
+        stmt = stmt_from_json(load_stmt_json_str(stmt_json_str))
         if len(stmt.real_agent_list()) < 2:
             continue
         if all(
diff --git a/src/indra_cogex/util.py b/src/indra_cogex/util.py
new file mode 100644
index 000000000..166a6f9e6
--- /dev/null
+++ b/src/indra_cogex/util.py
@@ -0,0 +1,123 @@
+import codecs
+import json
+from typing import Any, Dict
+
+
+def unicode_escape(s: str, attempt: int = 1, max_attempts: int = 5) -> str:
+    """Remove extra escapes from unicode characters in a string
+
+    Parameters
+    ----------
+    s :
+        A string to remove extra escapes in unicode characters from
+    attempt :
+        The current attempt number.
+    max_attempts :
+        The maximum number of attempts to remove extra escapes.
+
+    Returns
+    -------
+    :
+        The string with extra escapes removed.
+    """
+    escaped = codecs.escape_decode(s)[0].decode()
+    # No more escaping needed
+    if escaped.count('\\\\u') == 0:
+        return bytes(escaped, "utf-8").decode("unicode_escape")
+    # Too many attempts, return the input
+    if attempt >= max_attempts:
+        raise UnicodeEscapeError(f"Could not remove extra escapes from {s}")
+    # Try again
+    return unicode_escape(escaped, attempt + 1, max_attempts)
+
+
+class UnicodeEscapeError(Exception):
+    pass
+
+
+def clean_stmt_json_str(stmt_json_str: str) -> str:
+    """Cleans up a stmt json string by removing double escapes
+
+    Parameters
+    ----------
+    stmt_json_str :
+        A json string to clean up
+
+    Returns
+    -------
+    :
+        The cleaned json string
+    """
+    escaped_str = stmt_json_str.replace("\\\\", "\\")
+    return escaped_str
+
+
+def load_stmt_json_str(
+    stmt_json_str: str,
+    remove_evidence: bool = False
+) -> Dict[str, Any]:
+    """Removes extra escapes in a statement json string if necessary
+
+    Parameters
+    ----------
+    stmt_json_str :
+        A statement json string to load.
+    remove_evidence :
+        If True, remove the evidence from the statement json. Default: False.
+
+    Returns
+    -------
+    :
+        The loaded json object
+    """
+    # The logic in this function comes from looking at two aspects of
+    # de-serializing the raw statement json string dumped from the principal
+    # database:
+    # 1. Can the loaded statement reproduce the original matches hash of the
+    #    raw statement json with stmt.get_hash(refresh=True) after being
+    #    initialized via `indra.statements.io.stmt_from_json`?
+    # 2. Does json.loads error?
+    # Denoting a matching hash as T or F for matching or not, and an error
+    # as 'error' the following table is observed:
+    #
+    # | # | json.loads       | cleanup + json.loads | pick                 |
+    # |   | > stmt_from_json | > stmt_from_json     |                      |
+    # |---|------------------|----------------------|----------------------|
+    # | 1 | T                | T                    | cleanup + json.loads |
+    # | 2 | F                | T                    | cleanup + json.loads |
+    # | 3 | error            | T                    | cleanup + json.loads |
+    # | 4 | T                | error                | json.loads           |
+    #
+    # This means the json string has to be loaded twice, once without
+    # cleanup and once with cleanup, to check both conditions before
+    # returning the correct json object.
+    #
+    # NOTE: F | F is also possible, and has happened in a few cases (<100 out
+    # of >75 M raw statements). On inspection, none of these had any escaped
+    # characters in the json string, so the reason for the mismatch with the
+    # matches hash is unknown, but is at least not related to the issue of
+    # doubly escaped characters which this function is meant to address.
+    # All other combinations of T, F and error have not been observed.
+    if not stmt_json_str:
+        raise ValueError("Empty json string")
+
+    # Try clean+load first. If there is no error (this is the vast majority
+    # of cases), return the cleaned json (case 1, 2 and 3 above). Otherwise,
+    # return the uncleaned json (case 4 above).
+
+    # Cleaned load
+    try:
+        cleaned_str = clean_stmt_json_str(stmt_json_str)
+        stmt_json = json.loads(cleaned_str)
+    except (json.JSONDecodeError, UnicodeDecodeError):
+        # Uncleaned load
+        try:
+            stmt_json = json.loads(stmt_json_str)
+        except Exception as err:
+            raise UnicodeEscapeError(
+                f"Could not load statement json string:{err}"
+            ) from err
+
+    if remove_evidence:
+        stmt_json["evidence"] = []
+    return stmt_json
diff --git a/tests/test_doubly_escaped_json_str.py b/tests/test_doubly_escaped_json_str.py
new file mode 100644
index 000000000..6c1147aac
--- /dev/null
+++ b/tests/test_doubly_escaped_json_str.py
@@ -0,0 +1,151 @@
+import json
+
+import pytest
+
+from indra.statements import stmt_from_json
+from indra.tools import assemble_corpus as ac
+from indra_cogex.util import load_stmt_json_str
+
+
+def test_escaped_unicode():
+    """Test that doubly escaped unicode is handled correctly."""
+    source_hash = 8921534277374933489
+    sjs = (
+        '{"type": "Complex", "members": [{"name": "PPP1CA", "db_refs": {'
+        '"UP": "P62136", "TEXT": "PP1\\u03b1", "HGNC": "9281"}}, '
+        '{"name": "PPP1", "db_refs": {"TEXT": "PP1", "NXPFA": "03001", '
+        '"FPLX": "PPP1"}}], "belief": 1.0, "evidence": [{"source_api": '
+        '"sparser", "text": "These results suggest that multiple PC1 '
+        'sites are involved in PP1\\u03b1 binding and that PP1\\u03b1 '
+        'interacts with the conserved PP1-binding motif plus additional '
+        'elements within the membrane distal portion of the PC1 '
+        'C-tail.", "annotations": {"found_by": "INTERACT"}, "text_refs": '
+        '{"PMID": "PMC18307576"}, "source_hash": 8921534277374933489}], '
+        '"id": "eaf7529d-fd65-45b7-86ff-84dbeb764550"}'
+    )
+    sj = load_stmt_json_str(sjs)
+    stmt = stmt_from_json(sj)
+    assert stmt.evidence[0].source_hash == source_hash
+
+    # Check that the statement survives a round trip to json.dumps
+    sjs2 = json.dumps(stmt.to_json())
+    sj3 = load_stmt_json_str(sjs2)
+    stmt3 = stmt_from_json(sj3)
+    assert stmt3.evidence[0].source_hash == source_hash
+
+
+def test_quadruple_escaped_chemical_name_doubly_escaped_unicode():
+    matches_hash = 16637653806582621
+    sjs = (
+        '{"type": "Activation", "subj": {"name": "N-[2-hydroxy-5-('
+        '1-hydroxy-2-\\\\{[1-('
+        '4-methoxyphenyl)propan-2-yl]amino\\\\}ethyl)phenyl'
+        ']formamide", "db_refs": {"CHEBI": "CHEBI:63082", "HMDB": '
+        '"HMDB0015118", "PUBCHEM": "3410", "DRUGBANK": "DB00983", "CHEMBL": '
+        '"CHEMBL1256786", "CAS": "73573-87-2"}}, "obj": {"name": "ADRB2", '
+        '"db_refs": {"UP": "P07550", "HGNC": "286", "EGID": "154"}}, '
+        '"obj_activity": "activity", "belief": 1, "evidence": [{'
+        '"source_api": "signor", "pmid": "20590599", "source_id": '
+        '"SIGNOR-257853", "text": "Thus, overall, salmeterol is a highly '
+        'selective \\u03b22-adrenoceptor agonist because of its higher '
+        '\\u03b22-affinity and not because of higher \\u03b22-intrinsic '
+        'efficacy. A similar reasoning can be applied to formoterol, although '
+        'this agonist has higher intrinsic efficacy at all three receptors '
+        '(rank 6, 8 and 5 at \\u03b21, \\u03b22\\u00a0and \\u03b23).", '
+        '"annotations": {"SEQUENCE": null, "MODULATOR_COMPLEX": null, '
+        '"TARGET_COMPLEX": null, "MODIFICATIONA": null, "MODASEQ": null, '
+        '"MODIFICATIONB": null, "MODBSEQ": null, "NOTES": null, "ANNOTATOR": '
+        '"Luana"}, "epistemics": {"direct": true}, "context": {"cell_type": '
+        '{"name": null, "db_refs": {"BTO": "BTO:0000457"}}, "species": '
+        '{"name": null, "db_refs": {"TAXONOMY": "10030"}}, "type": "bio"}, '
+        '"text_refs": {"PMID": "20590599"}, "source_hash": '
+        '-4455644815662527647}], "id": '
+        '"4697a750-f01c-4d06-80b7-416143e33dd1", "matches_hash": '
+        '"16637653806582621"}'
+    )
+    sj = load_stmt_json_str(sjs)
+    stmt = stmt_from_json(sj)
+    assert stmt.evidence[0].source_hash == -4455644815662527647
+    assert stmt.get_hash(refresh=True) == matches_hash
+
+    # Check that the statement survives a round trip to json.dumps
+    sjs2 = json.dumps(stmt.to_json())
+    sj3 = load_stmt_json_str(sjs2)
+    stmt3 = stmt_from_json(sj3)
+    assert stmt3.evidence[0].source_hash == -4455644815662527647
+    assert stmt3.get_hash(refresh=True) == matches_hash
+
+
+def test_quad_escaped_unicode():
+    sjs = (
+        '{"type": "Inhibition", "subj": {"name": "\\\\u0394", "db_refs": {'
+        '"TEXT": "\\\\u0394"}}, "obj": {"name": "Infections", "db_refs": {'
+        '"MESH": "D007239", "TEXT": "infection", "EFO": "0000544"}}, '
+        '"obj_activity": "activity", "belief": 1, "evidence": [{'
+        '"source_api": "reach", "text": "A previous study demonstrated that '
+        'Syn61\\\\u03943 resists infection by multiple bacteriophages, '
+        'including Enterobacteria phage T6 .", "annotations": {"found_by": '
+        '"Negative_activation_syntax_1_verb", "agents": {"coords": [[40, '
+        '41], [51, 60]]}}, "epistemics": {"direct": false, "section_type": '
+        'null}, "text_refs": {"PMID": "78437624"}, "source_hash": '
+        '-803868470175671675}], "id": '
+        '"0652bc92-7078-4c46-989e-b1a0bebbe348", "matches_hash": '
+        '"-24102351504334505"}'
+    )
+    sj = load_stmt_json_str(sjs)
+    stmt = stmt_from_json(sj)
+    assert stmt.evidence[0].source_hash == -803868470175671675
+    assert stmt.get_hash(refresh=True) == -24102351504334505
+
+    # Check that the statement survives a round trip to json.dumps
+    sjs2 = json.dumps(stmt.to_json())
+    sj3 = load_stmt_json_str(sjs2)
+    stmt3 = stmt_from_json(sj3)
+    assert stmt3.evidence[0].source_hash == -803868470175671675
+    assert stmt3.get_hash(refresh=True) == -24102351504334505
+
+
+@pytest.mark.slow
+def test_escaped_db_refs_grounding_mapping():
+    sjs = (
+        '{"type": "Activation", "subj": {"name": "TGFB1", "db_refs": {'
+        '"TEXT": "TGF-\\\\u03b21"}}, "obj": {"name": "NOX4", "db_refs": {'
+        '"HGNC": "7891", "UP": "Q9NPH5", "TEXT": "Nox4"}}, "obj_activity": '
+        '"activity", "belief": 1, "evidence": [{"source_api": "medscan", '
+        '"pmid": "28063381", "source_id": "info:pmid/28063381", "text": '
+        '"Moreover, Nox4, which is constitutively active in renal cells and '
+        'is involvedin the generation of hydrogen peroxide, was up-regulated '
+        'during ureteral obstruction-mediated fibrosis and induced by '
+        'TGF-\\\\u03b21 in HK-2 cells, and this up-regulation could be '
+        'blunted by Brd4 inhibition.", "annotations": {"verb": '
+        '"UnknownRegulation-positive", "last_verb": "TK{induce}", "agents": '
+        '{"coords": [[196, 202], [10, 14]]}}, "epistemics": {"direct": '
+        'false}, "text_refs": {"PMID": "28063381"}, "source_hash": '
+        '4793198277843896406}], "id": "66d48a98-12d4-4a68-8485-cc57d37f677e"}'
+    )
+    sj = load_stmt_json_str(sjs)
+    stmt = stmt_from_json(sj)
+
+    # Check that the statement survives a round trip to json.dumps
+    sjs2 = json.dumps(stmt.to_json())
+    sj2 = json.loads(sjs2)
+    stmt2 = stmt_from_json(sj2)
+    assert stmt2.get_hash(refresh=True) == stmt.get_hash(refresh=True)
+
+    # Check that the cleaning allows for grounding mapping
+    unesc_sj = json.loads(sjs)
+    unesc_stmt = stmt_from_json(unesc_sj)
+    unesc_stmts = ac.fix_invalidities([unesc_stmt], in_place=True)
+    unesc_stmts = ac.map_grounding(unesc_stmts)
+    mapped_unesc_stmt = ac.map_sequence(unesc_stmts)[0]
+    unesc_subj_db_refs = mapped_unesc_stmt.subj.db_refs
+
+    esc_stmt = stmt
+    esc_stmts = ac.fix_invalidities([esc_stmt], in_place=True)
+    esc_stmts = ac.map_grounding(esc_stmts)
+    mapped_esc_stmt = ac.map_sequence(esc_stmts)[0]
+    esc_subj_db_refs = mapped_esc_stmt.subj.db_refs
+
+    # Relies on that the assemble_corpus pipeline doesn't fix the escaped
+    # characters
+    assert unesc_subj_db_refs != esc_subj_db_refs
diff --git a/tests/test_web_service_helpers.py b/tests/test_web_service_helpers.py
index cfe5ee49b..d8eaff50a 100644
--- a/tests/test_web_service_helpers.py
+++ b/tests/test_web_service_helpers.py
@@ -4,7 +4,8 @@
 import json
 
 from indra.statements import Evidence, Agent, Activation
-from indra_cogex.apps.utils import unicode_escape, _stmt_to_row
+from indra_cogex.apps.utils import _stmt_to_row
+from indra_cogex.util import unicode_escape
 
 
 def test_unicode_double_escape():