From bb7b8aedcf9d15c799ca6d7d97a50f0e3af0820f Mon Sep 17 00:00:00 2001
From: Devon Joseph Leadman
Date: Mon, 22 Jul 2024 16:47:22 -0400
Subject: [PATCH] gfkg and pakg bug fixes

---
 .gitignore                   |   5 +
 RDAS_GARD/methods.py         |   4 +-
 RDAS_GFKG/methods.py         |   6 +-
 RDAS_GFKG/prep_neo4j_data.py |  33 ++++--
 RDAS_GFKG/steps.py           |   9 +-
 RDAS_GFKG/update_grant.py    |   4 +-
 RDAS_PAKG/methods.py         | 204 ++++++++++++++++++++++++-----------
 RDAS_PAKG/update.py          |   2 +-
 config.ini                   |   3 +-
 9 files changed, 182 insertions(+), 88 deletions(-)

diff --git a/.gitignore b/.gitignore
index 38b4cb6..ad90dfc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -113,3 +113,8 @@ RDAS_CTKG/eligibility_extraction/
 RDAS_CTKG/metamap_cond_out.json
 RDAS_CTKG/metamap_cond.txt
 RDAS_GFKG/convert_csv_fields.py
+fix_normmap_file_sep.py
+project_check.py
+project_check_missing.csv
+project_check_new.csv
+project_check_old.csv
diff --git a/RDAS_GARD/methods.py b/RDAS_GARD/methods.py
index 3fe5280..57fb464 100755
--- a/RDAS_GARD/methods.py
+++ b/RDAS_GARD/methods.py
@@ -118,7 +118,7 @@ def create_disease_node(db, data, xrefs): # Include xrefs into GARD node instead
         "syns":data[6],
         "orpha":results['Orphanet'] if 'Orphanet' in results else None,
         "icd10":results['ICD-10'] if 'ICD-10' in results else None,
-        "umls":results['UMLS'] if 'UMLS' in results else None,
+        "umls":list(set(results['UMLS'])) if 'UMLS' in results else None,
         "omim":results['OMIM'] if 'OMIM' in results else None,
         "snomed":results['SNOMED-CT'] if 'SNOMED-CT' in results else None,
         "diseaseontology":results['DiseaseOntology'] if 'DiseaseOntology' in results else None,
@@ -315,7 +315,7 @@ def get_remaining_umls(db, umls_update=True):
     INSTANCE.form['SingLinePMID'] = True

     print('GATHERING GARD UMLS DATA')
-    db.run('MATCH (x:GARD) WHERE x.UMLS IS NOT NULL SET x.UMLS_Source = "DATALAKE"')
+    db.run('MATCH (x:GARD) WHERE x.UMLS IS NOT NULL SET x.UMLS_Source = "GARD"')
     res = db.run('MATCH (x:GARD) WHERE x.UMLS IS NULL SET x.UMLS_Source = "METAMAP" RETURN x.GardId AS gard_id, x.GardName as gard_name').data()
     gard_strs = [f"{i['gard_id'].replace('GARD:','')}|{normalize(i['gard_name'])}\n" for i in res if i['gard_name']]
diff --git a/RDAS_GFKG/methods.py b/RDAS_GFKG/methods.py
index 21063f2..472774d 100755
--- a/RDAS_GFKG/methods.py
+++ b/RDAS_GFKG/methods.py
@@ -764,7 +764,7 @@ def grad_id(title_, Public_health_relevance_statement, abstract_):
         else:
             return normalize_combined_dictionary(title_,title_,name,{},{},{},1,1,'title')
     if Public_health_relevance_statement and isinstance(Public_health_relevance_statement, str):
-        A, B, C,D = check_sen(Public_health_relevance_statement, nlp)
+        A, B, C,D = check_sen(Public_health_relevance_statement)
         name1 = get_gard_abstract_stem_exact(A)
         name2 = get_gard_abstract_stem_exact(B)
         name3 = get_gard_abstract_stem_exact(C)
@@ -773,7 +773,7 @@ def grad_id(title_, Public_health_relevance_statement, abstract_):
         if name and (name !={}):
             return name
     if abstract_ and isinstance(abstract_, str):
-        A, B, C , D = check_sen(abstract_, nlp)
+        A, B, C , D = check_sen(abstract_)
         name1 = get_gard_abstract_stem_exact(A)
         name2 = get_gard_abstract_stem_exact(B)
         name3 = get_gard_abstract_stem_exact(C)
@@ -781,7 +781,7 @@ def grad_id(title_, Public_health_relevance_statement, abstract_):
         name=normalize_combined_dictionary(abstract_,title_,name1,name2,name3,name4,0,0.7,'abstract')
         if name and (name !={}):
             return name
-def GardNameExtractor(project_title,phr_text,abstract_text, nlp):
+def GardNameExtractor(project_title,phr_text,abstract_text):
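+    # The spaCy model is no longer passed through these helpers; GARD name
+    # matching here appears to rely entirely on the stemmed exact-match
+    # lookups (get_gard_abstract_stem_exact) used inside grad_id/check_sen.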
     #Abstract1['Gard_name']=Abstract1.apply(lambda x: gard_id(x['project_title'],x['phr_text'],x['abstract_text']), axis=1)
     gard_ids = grad_id(project_title,phr_text,abstract_text)
     if gard_ids:
diff --git a/RDAS_GFKG/prep_neo4j_data.py b/RDAS_GFKG/prep_neo4j_data.py
index 025f13d..2e01e57 100755
--- a/RDAS_GFKG/prep_neo4j_data.py
+++ b/RDAS_GFKG/prep_neo4j_data.py
@@ -152,25 +152,32 @@ def run_normmap():
             print(abs_file, ' -merged- ',prj_file)
             tmp = pd.read_csv(('{filename}'.format(filename=abs_file)),index_col=False, encoding = "ISO-8859-1")
             tmp2 = pd.read_csv(('{filename}'.format(filename=prj_file)),index_col=False, usecols=['APPLICATION_ID','PHR', 'PROJECT_TITLE'], encoding = "ISO-8859-1", low_memory=False)
-            merged_df = pd.merge(tmp, tmp2, on=['APPLICATION_ID'])
+            merged_df = pd.merge(tmp, tmp2, on=['APPLICATION_ID'], how='outer', indicator='EXISTS_IN_ABSTRACT_FILE')
+            #merged_df.fillna('', inplace=True)
             merged_df['APPLICATION_ID'] = merged_df['APPLICATION_ID'].astype(int)
             merged_df.to_csv(data_raw(f'normmap/RePORTER_NORMMAP_{year}.csv'), index=False)
-
+
     norm_files = glob.glob(data_raw('normmap') + '/*.csv')
     norm_files = sorted(norm_files)
     for norm_file in norm_files:
         year = re.findall(r'\d+', norm_file)[0]
-        if os.path.exists(data_neo4j(f'normmap/normmap_results_{year}.csv')):
+
+        if os.path.exists(data_neo4j(f'normmap/normmap_results_{year}.csv')): #COMMENTED OUT FOR TESTING
             print(f'{year} Gard-Project mapping file already exists... bypassing')
             continue

         # Create CSV files headers
-        with open(data_neo4j(f'normmap/normmap_results_{year}.csv'), "w") as f:
+        with open(data_neo4j(f'normmap/normmap_results_{year}.csv'), "w") as f: #COMMENTED OUT FOR TESTING
             f.writelines(['ID|GARD_id|CONF_SCORE|SEM_SIM\n'])

         df = pd.read_csv(norm_file, index_col=False, low_memory=False)
-        chunk_size = int(len(df)/5)
         thread_list = list()
+
+        #df = df[df['EXISTS_IN_ABSTRACT_FILE']=='right_only'] #TEST
+        #df = df[['APPLICATION_ID', 'ABSTRACT_TEXT', 'PHR', 'PROJECT_TITLE']] #TEST
+
+        chunk_size = int(len(df)/5)
+
         list_df = [df[i:i+chunk_size] for i in range(0,len(df),chunk_size)]

         # Create threads to process results
@@ -185,10 +192,13 @@ def run_normmap():
     combine_normmap_results()
     print('GARD to Project connections made')

+
+
 def get_RD_project_ids():
     # Get GARD to Project mappings
     run_normmap()
     aggregate_disease_data()
+
     apps = pd.read_csv(data_neo4j("normmap_results.csv"), usecols=["ID"])

     # Drop duplicate results and sort by Application ID
@@ -275,7 +285,7 @@ def find_RD_apps(input_file, rd_ids):
     print('Finished ', output_file)

 def clean_pi (pi_info):
-    pi_info = pi_info[:len(pi_info)-1]
+    pi_info = pi_info.replace(";","")
     return pi_info

 def cleanup_project_IC_NAME_totalcost():
@@ -608,13 +618,14 @@ def annotate_grant_abstracts():

     # Annotate text with four scispaCy models
-    for model in MODELS:
+    for model in MODELS[2:]:
         print(f'*** Annotate with {model} model ***')
         nlp = load_model(model)

         for file in input_files:
             year = file[-8:-4]
-
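+            # Assumption: abstracts from before 2006 were already covered by the
+            # bc5cdr model in an earlier run, so that model/year pair is skipped.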
+            if int(year) < 2006 and model == 'en_ner_bc5cdr_md':
+                continue
             try:
                 text = pd.read_csv(file, encoding=ENCODING, dtype={'APPLICATION_ID':int, 'ABSTRACT_TEXT':str})
@@ -779,10 +790,8 @@ def prep_data(data_raw_path: str, data_neo4j_path: str) -> FilesToAdd:
     merge_project_funding()
     print("Running select_RD_projects")
     select_RD_projects()
-
     print("Running cleanup_project_IC_NAME_totalcost")
     cleanup_project_IC_NAME_totalcost()
-
     print("Running find_RD_core_projects")
     find_RD_core_projects()
     print("Running select_RD_patents")
@@ -797,7 +806,7 @@ def prep_data(data_raw_path: str, data_neo4j_path: str) -> FilesToAdd:
     cleanup_pub_country()
     print("Running select_RD_abstracts")
     select_RD_abstracts()
-
+    """
     # The below stages are extremely slow, so we will only run them for
     # years that have changed data.
@@ -807,6 +816,7 @@ def prep_data(data_raw_path: str, data_neo4j_path: str) -> FilesToAdd:
         and v in [pygit2.GIT_STATUS_WT_MODIFIED, pygit2.GIT_STATUS_WT_NEW]}
     '''
+    """

     print("Running annotation_preprocess_grant")
     annotation_preprocess_grant()
@@ -818,6 +828,7 @@ def prep_data(data_raw_path: str, data_neo4j_path: str) -> FilesToAdd:
     clean_annotation_source()
     print("Running map_semantic_types")
     map_semantic_types()
+
     print("Running fix_escaped_endings")
     fix_escaped_endings()
diff --git a/RDAS_GFKG/steps.py b/RDAS_GFKG/steps.py
index 7f5e824..837d4db 100755
--- a/RDAS_GFKG/steps.py
+++ b/RDAS_GFKG/steps.py
@@ -310,13 +310,10 @@
         """,
         "query": """
-            WITH split(data.PI_IDS, ';') as ids,
-                 split(data.PI_NAMEs, ';') as names, data
-            UNWIND [x in range(0, coalesce(size(ids) - 1, -1)) |
-                [trim(split(ids[x], '(')[0]), trim(split(names[x], '(')[0])]
-            ] as pi_data
+            WITH [data.PI_IDS] as ids, [data.PI_NAMEs] as names, data
+            UNWIND [x in range(0, coalesce(size(ids) - 1, -1)) | [trim(split(ids[x], '(')[0]), trim(split(names[x], '(')[0])]] as pi_data
             MERGE (p:PrincipalInvestigator {
-                pi_id: pi_data[0],
+                pi_id: coalesce(pi_data[0], ""),
                 pi_name: coalesce(pi_data[1], ""),
                 org_state: coalesce(data.ORG_STATE, ""),
                 org_name: coalesce(data.ORG_NAME, "")})
diff --git a/RDAS_GFKG/update_grant.py b/RDAS_GFKG/update_grant.py
index b85bf96..a9dd6fc 100755
--- a/RDAS_GFKG/update_grant.py
+++ b/RDAS_GFKG/update_grant.py
@@ -58,6 +58,6 @@ def main(db: AlertCypher, restart_raw=False, restart_processed=False):
     fta = prep_data(f"{sysvars.gnt_files_path}raw", f"{sysvars.gnt_files_path}processed")

     # run database upgrade steps on only new/modified files
-    for step in steps[10:]:
+    for step in steps[11:]:
         print("\n\n" + step["description"] + "...")
-        step_to_fn(**step)(db, fta)
+        step_to_fn(**step)(db, fta)
\ No newline at end of file
diff --git a/RDAS_PAKG/methods.py b/RDAS_PAKG/methods.py
index a4b1b05..0b94d71 100644
--- a/RDAS_PAKG/methods.py
+++ b/RDAS_PAKG/methods.py
@@ -668,7 +668,7 @@ def fetch_abstracts(pubmedIDs):



-def fetch_pubtator_annotations(pubmedId):
+def fetch_pubtator_annotations(pubmedIDs):
     """
     Fetch annotations from PubTator for a given PubMed ID.
@@ -685,29 +685,36 @@ def fetch_pubtator_annotations(pubmedId):
     >> print(annotations)
     {'documents': [{'infons': {}, 'passages': [...], 'annotations': [...], ...}]}
     """
-
+    # Splits pubmedIDs into batches of < 100 due to API limit
+    batches = [pubmedIDs[i * 99:(i + 1) * 99] for i in range((len(pubmedIDs) + 99 - 1) // 99 )]
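+    # e.g. 250 IDs -> ceil(250/99) = 3 batches of 99, 99, and 52 PMIDs,
+    # keeping each request under the API's 100-ID-per-call ceiling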
     try:
+        for batch_num, batch in enumerate(batches):
+            print('BATCH NUM::', str(batch_num))
+
+            str_batch = ",".join(batch)
         # Construct the PubTator API URL for the given PubMed ID
-        pubtatorUrl = "https://www.ncbi.nlm.nih.gov/research/pubtator-api/publications/export/biocjson?pmids=" + pubmedId
-
-        # Make a GET request to fetch PubTator annotations
-        r = requests.get(pubtatorUrl)
+            pubtatorUrl = "https://www.ncbi.nlm.nih.gov/research/pubtator-api/publications/export/biocjson?pmids=" + str_batch
+
+            # Make a GET request to fetch PubTator annotations
+            r = requests.get(pubtatorUrl)
+            #time.sleep(0.4)

-        # Check if the response is sucessful and not empty
-        if (not r or r is None or r ==''):
-            logging.error(f'Can not find PubTator for: {pubmedId}')
-            return None
-        else:
-            return r.json()
+            # Check if the response is successful and not empty
+            if (not r or r is None or r ==''):
+                logging.error(f'fetch_pubtator_annotations: API response empty or not successful')
+            else:
+                yield r.json()

     except TimeoutError as e:
         #Retry after a short delay if a timeout error occurs
-        time.sleep(1)
-        fetch_pubtator_annotations(pubmedId)
+        print(e)
+        exit()
     except ValueError as e:
         # Return None if theres an issue parsing the response as JSON
-        return None
+        print(e)
+        exit()
+        #return None
@@ -1049,13 +1056,16 @@ def create_keywords(tx, abstractDataRel, article_node):
     MERGE (k:Keyword {keyword:$keyword})
     MERGE (k)- [r:KEYWORD_FOR] -> (a)
     '''
-
-    for keyword in abstractDataRel:
-        if keyword:
-            tx.run(create_keyword_query, args={
-                "article_id":article_node,
-                "keyword": keyword
-            })
+    # Some articles have all the keywords in one field, so the text must be converted to a list if needed
+    for keyword_field in abstractDataRel:
+        if keyword_field:
+            #keyword_field_list = [x.strip() for x in keyword_field.split(', ')]
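+            # Assumes keyword_field is already an iterable of keyword strings; if a
+            # plain comma-separated string arrives, a split like the one sketched
+            # above may be needed, since iterating a str yields single characters.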
+            for keyword in keyword_field:
+                keyword = keyword.lower()
+                tx.run(create_keyword_query, args={
+                    "article_id":article_node,
+                    "keyword": keyword
+                })
@@ -1349,13 +1359,13 @@ def create_chemicals(tx, abstractDataRel, article_node):
     create_chemicals_query = '''
     MATCH (a:Article) WHERE id(a) = $article_id
-    MERGE (u:Substance {name:$name, registryNumber:$registryNumber}) - [r:SUBSTANCE_ANNOTATED_BY_PUBMED] -> (a)
+    MERGE (u:Substance {name:$name, registryNumber:$registryNumber}) MERGE (u)-[r:SUBSTANCE_ANNOTATED_BY_PUBMED]->(a)
     '''

     for chemical in abstractDataRel:
         tx.run(create_chemicals_query, args={
             "article_id":article_node,
-            "name": chemical['name'] if 'name' in chemical else '',
+            "name": chemical['name'].lower() if 'name' in chemical else '',
             "registryNumber": chemical['registryNumber'] if 'registryNumber' in chemical else '',
         })
@@ -1384,21 +1394,6 @@ def create_annotations(tx, pubtatorData, article_node, today):
     """
     if pubtatorData:
-        create_annotations_query = '''
-        MATCH(a:Article) WHERE id(a) = $article_id
-        MERGE (pa:PubtatorAnnotation {
-          text = $text
-        })
-        ON MATCH
-        SET LastUpdatedRDAS = $rdasupdated
-        ON CREATE
-        SET infons_identifier:$infons_identifier
-        SET DateCreatedRDAS = $rdascreated
-        SET LastUpdatedRDAS = $rdasupdated
-        SET infons_type = $infons_type
-        MERGE (pa)- [r:ANNOTATION_FOR { type: $type }] -> (a)
-        '''
-
         for passage in pubtatorData['passages']:
             type = passage['infons']['type'] if 'type' in passage['infons'] else ''
@@ -1420,10 +1415,72 @@ def create_annotations(tx, pubtatorData, article_node, today):
                 temp = temp.split(",")
             except:
                 pass
-            parameters['text'] = temp
+            parameters['text'] = [x.lower() for x in temp] # lowercase every element in the list
+
+            # Check for other connected pubtator annotation relationships and identify the type sources ('title', 'abstract', or 'title and abstract')
+            # e.g. r.type is a list of values; 'title and abstract' is stored as ['Title','Abstract']
+            check = tx.run('MATCH (pa:PubtatorAnnotation {{ text: {text}, infons_type: \'{infons_type}\', infons_identifier: \'{infons_identifier}\' }})-[r:ANNOTATION_FOR]->(a:Article) WHERE ID(a) = {article_id} RETURN DISTINCT r.type as rel_type, ID(r) as rel_id'
+                .format(text=parameters['text'],
+                    article_id=parameters['article_id'],
+                    infons_type=parameters['infons_type'],
+                    infons_identifier=parameters['infons_identifier'],
+                    type=parameters['type'])).data()
+
+            if len(check) > 0:
+                existing_type = check[0]['rel_type'] # is a list
+                incoming_type = parameters['type']
+                existing_id = check[0]['rel_id']
+
+                if existing_type == ['Abstract'] and incoming_type == 'abstract':
+                    continue
+                if existing_type == ['Title'] and incoming_type == 'title':
+                    continue
+                if existing_type == ['Title', 'Abstract'] and incoming_type == 'title and abstract':
+                    continue
+                if existing_type == ['Title', 'Abstract'] and incoming_type == 'title':
+                    continue
+                if existing_type == ['Title', 'Abstract'] and incoming_type == 'abstract':
+                    continue
+
+                if existing_type == ['Title'] and incoming_type == 'abstract':
+                    parameters['type'] = ['Title', 'Abstract']
+                elif existing_type == ['Abstract'] and incoming_type == 'title':
+                    parameters['type'] = ['Title', 'Abstract']
+
+                tx.run('MATCH ()-[r:ANNOTATION_FOR]->() WHERE ID(r) = {existing_id} SET r.type = {new_type}'.format(existing_id=existing_id, new_type=parameters['type']))
+                continue
+
+            else:
+                type_temp = parameters['type']
+                if type_temp == 'title and abstract':
+                    parameters['type'] = ['Title', 'Abstract']
+                elif type_temp == 'title':
+                    parameters['type'] = ['Title']
+                elif type_temp == 'abstract':
+                    parameters['type'] = ['Abstract']
+
+
+            # Develop Neo4j Query to Populate Annotations (New Node Only)
+            create_annotations_query = '''
+            MATCH (a:Article) WHERE id(a) = {article_id}
+            MERGE (pa:PubtatorAnnotation {{ text: {text}, infons_type: \'{infons_type}\', infons_identifier: \'{infons_identifier}\' }})
+            ON CREATE
+            SET pa.infons_identifier = \'{infons_identifier}\'
+            SET pa.DateCreatedRDAS = \'{rdascreated}\'
+            SET pa.LastUpdatedRDAS = \'{rdasupdated}\'
+            SET pa.text = {text}
+            SET pa.infons_type = \'{infons_type}\'
+            MERGE (pa)-[r:ANNOTATION_FOR {{ type: {type} }} ]-> (a)
+            '''.format(text=parameters['text'],
+                article_id=parameters['article_id'],
+                infons_type=parameters['infons_type'],
+                rdasupdated=parameters['rdasupdated'],
+                rdascreated=parameters['rdascreated'],
+                infons_identifier=parameters['infons_identifier'],
+                type=parameters['type'])

             # Execute the Cypher query to create PubtatorAnnotation nodes and associate them with the Article node
-            txout = tx.run(create_annotations_query, args=parameters)
+            txout = tx.run(create_annotations_query)
@@ -1877,37 +1934,56 @@ def gather_pubtator(db, today):
     Returns:
     None
     """
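+    # Resume support: when a previous update was interrupted (pubmed_in_progress
+    # is 'True'), restart from the article index saved under UPDATE_PROGRESS in
+    # config.ini rather than re-fetching every annotation from scratch.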
+    in_progress = db.getConf('UPDATE_PROGRESS', 'pubmed_in_progress')
+    if in_progress == 'True':
+        current_step = db.getConf('UPDATE_PROGRESS', 'pubmed_pubtator_article_progress')
+        if not current_step == '':
+            current_step = int(current_step)
+        else:
+            current_step = 0
+    else:
+        current_step = 0

     # Retrieve articles without Pubtator annotations
-    res = db.run('MATCH (x:Article) WHERE NOT (x)--(:PubtatorAnnotation) AND x.pubmed_id IS NOT NULL AND x.hasPubtatorAnnotation IS NULL RETURN x.pubmed_id AS pmid, ID(x) AS id').data()
+    #res = db.run('MATCH (x:Article) WHERE NOT (x)--(:PubtatorAnnotation) AND x.pubmed_id IS NOT NULL AND x.hasPubtatorAnnotation IS NULL RETURN x.pubmed_id AS pmid, ID(x) AS id').data()
+    #print(len(res))
+
+    res = db.run('MATCH (x:Article) WHERE x.pubmed_id IS NOT NULL RETURN x.pubmed_id AS pmid, ID(x) AS id').data()
     print(len(res))

     # Set OMIM only articles to hasPubtatorAnnotation = False since they dont have pubmed_id's
     db.run('MATCH (x:Article) WHERE x.pubmed_id IS NULL AND x.hasPubtatorAnnotation IS NULL SET x.hasPubtatorAnnotation = FALSE')
-
+
     # Iterate over the articles and fetch Pubtator annotations
-    for idx,r in enumerate(res):
-        print(idx)
+    res = res[current_step:]
+    pmids = [r['pmid'] for r in res]
+    pmid_to_id = {r['pmid']:r['id'] for r in res}

-        pmid = r['pmid']
-        ID = r['id']
-        try:
-            # Fetch Pubtator annotations for the article
-            print(ID)
-            annos = fetch_pubtator_annotations(pmid)
-
-            if annos:
-                # Create PubtatorAnnotation nodes in the database
-                create_annotations(db, annos, ID, today)
-                db.run(f'MATCH (a:Article) WHERE ID(a) = {ID} SET a.hasPubtatorAnnotation = TRUE')
-                print('annotations created') #TEST
-            else:
-                db.run(f'MATCH (a:Article) WHERE ID(a) = {ID} SET a.hasPubtatorAnnotation = FALSE')
-                print('annotation not created') #TEST
+    try:
+        # Fetch Pubtator annotations for the articles, one batch at a time
+        for batch in fetch_pubtator_annotations(pmids):
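+            # Assumed response shape: each yielded batch is the parsed BioC-JSON
+            # payload, with the per-article documents under its 'PubTator3' key.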
+            annos = batch['PubTator3']
+
+            for anno in annos:
+                cur_pmid = str(anno['pmid'])
+                article_id = pmid_to_id[cur_pmid]
+
+                if anno:
+                    # Create PubtatorAnnotation nodes in the database
+                    print('ARTICLE_ID::', article_id, 'CURRENT_STEP::', current_step)
+                    create_annotations(db, anno, article_id, today)
+                    db.run(f'MATCH (a:Article) WHERE ID(a) = {article_id} SET a.hasPubtatorAnnotation = TRUE')
+                else:
+                    db.run(f'MATCH (a:Article) WHERE ID(a) = {article_id} SET a.hasPubtatorAnnotation = FALSE')
+                current_step += 1
+                db.setConf('UPDATE_PROGRESS', 'pubmed_pubtator_article_progress', str(current_step))

-        except Exception as e:
-            logging.warning(f'\nException creating annotations for article {pmid}: {e}')
+    except Exception as e:
+        #logging.warning(f'\nException creating annotations for article {pmid}: {e}')
+        print('error in gather_pubtator')
@@ -2075,7 +2151,11 @@ def retrieve_articles(db, last_update, updating_to, today):

         # End of the pipeline, resets the config in_progress values
         db.setConf('UPDATE_PROGRESS', 'pubmed_current_step', '')
+        db.setConf('UPDATE_PROGRESS', 'pubmed_disease_article_progress', '')
+        db.setConf('UPDATE_PROGRESS', 'pubmed_omim_article_progress', '')
+        db.setConf('UPDATE_PROGRESS', 'pubmed_pubtator_article_progress', '')
         db.setConf('UPDATE_PROGRESS', 'pubmed_in_progress', 'False')
+
     else:
         print('Update in progress... bypassing save_gene')
diff --git a/RDAS_PAKG/update.py b/RDAS_PAKG/update.py
index a0290bd..a7df451 100644
--- a/RDAS_PAKG/update.py
+++ b/RDAS_PAKG/update.py
@@ -25,5 +25,5 @@ def main (update_from=False, update_to=False):

     RDAS_PAKG.init.main(update_from=update_from, update_to=update_to)

-#main() #TEST
+main() #TEST
 #get_node_counts()
diff --git a/config.ini b/config.ini
index c440a1b..3e36596 100644
--- a/config.ini
+++ b/config.ini
@@ -17,7 +17,8 @@ clinical_current_step =
 pubmed_in_progress = True
 pubmed_disease_article_progress = 12003
 pubmed_omim_article_progress = 12003
-pubmed_current_step = save_epi
+pubmed_pubtator_article_progress = 1306087
+pubmed_current_step = save_pubtator
 grant_in_progress = False
 ct_update = 02/06/24
 pm_update = 02/01/24