Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve ClinicalTrials.gov ingest #144

Merged
merged 15 commits into from
Dec 13, 2023
143 changes: 85 additions & 58 deletions src/indra_cogex/sources/clinicaltrials/__init__.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,4 @@
"""This module implements input for ClinicalTrials.gov data

NOTE: ClinicalTrials.gov are working on a more modern API that is currently
in Beta: https://beta.clinicaltrials.gov/data-about-studies/learn-about-api
Once this API is released, we should switch to using it. The instructions for
using the current/old API are below.

To obtain the custom download for ingest, do the following

1. Go to https://clinicaltrials.gov/api/gui/demo/simple_study_fields

2. Enter the following in the form:

expr=
fields=NCTId,BriefTitle,Condition,ConditionMeshTerm,ConditionMeshId,InterventionName,InterventionType,InterventionMeshTerm,InterventionMeshId
min_rnk=1
max_rnk=500000 # or any number larger than the current number of studies
fmt=csv

3. Send Request

4. Enter the captcha characters into the text box and then press enter
(make sure to use the enter key and not press any buttons).

5. The website will display "please wait… " for a couple of minutes, finally,
the Save to file button will be active.

6. Click the Save to file button to download the response as a txt file.

7. Rename the txt file to clinical_trials.csv and then compress it as
gzip clinical_trials.csv to get clinical_trials.csv.gz, then place
this file into <pystow home>/indra/cogex/clinicaltrials/
"""
"""This module implements input for ClinicalTrials.gov data."""

import logging
from collections import Counter
Expand All @@ -39,12 +7,12 @@

import gilda
import pandas as pd
import pystow
import tqdm

from indra.databases import mesh_client
from indra_cogex.sources.processor import Processor
from indra_cogex.representation import Node, Relation
from indra_cogex.sources.clinicaltrials.download import ensure_clinical_trials_df


logger = logging.getLogger(__name__)
Expand All @@ -55,19 +23,12 @@ class ClinicaltrialsProcessor(Processor):
node_types = ["BioEntity", "ClinicalTrial"]

def __init__(self, path: Union[str, Path, None] = None):
default_path = pystow.join(
"indra",
"cogex",
"clinicaltrials",
name="clinical_trials.csv.gz",
)

if not path:
path = default_path
elif isinstance(path, str):
path = Path(path)
if path is not None:
self.df = pd.read_csv(path, sep=",", skiprows=10)
else:
self.df = ensure_clinical_trials_df()
process_df(self.df)

self.df = pd.read_csv(path, sep=",", skiprows=10)
self.has_trial_cond_ns = []
self.has_trial_cond_id = []
self.has_trial_nct = []
Expand Down Expand Up @@ -95,7 +56,18 @@ def ground_drug(self, drug):
return None

def get_nodes(self):
for index, row in tqdm.tqdm(self.df.iterrows(), total=len(self.df)):
nctid_to_data = {}
for _, row in tqdm.tqdm(self.df.iterrows(), total=len(self.df)):
nctid_to_data[row["NCTId"]] = {
"study_type": or_na(row["StudyType"]), # observational, interventional
kkaris marked this conversation as resolved.
Show resolved Hide resolved
"randomized:boolean": row["randomized"],
"status": or_na(row["OverallStatus"]), # Completed, Active, Recruiting
"phase:int": row["Phase"],
"why_stopped": or_na(row["WhyStopped"]),
"start_year:int": row["start_year"],
"start_year_anticipated:boolean": row["start_year_anticipated"],
}

found_disease_gilda = False
for condition in str(row["Condition"]).split("|"):
cond_term = self.ground_condition(condition)
Expand All @@ -108,16 +80,20 @@ def get_nodes(self):
)
found_disease_gilda = True
if not found_disease_gilda and not pd.isna(row["ConditionMeshId"]):
for mesh_id, mesh_term in zip(row["ConditionMeshId"].split("|"),
row["ConditionMeshTerm"].split("|")):
for mesh_id, mesh_term in zip(
row["ConditionMeshId"].split("|"),
row["ConditionMeshTerm"].split("|"),
):
correct_mesh_id = get_correct_mesh_id(mesh_id, mesh_term)
if not correct_mesh_id:
self.problematic_mesh_ids.append((mesh_id, mesh_term))
continue
self.has_trial_nct.append(row["NCTId"])
self.has_trial_cond_ns.append("MESH")
self.has_trial_cond_id.append(correct_mesh_id)
yield Node(db_ns="MESH", db_id=correct_mesh_id, labels=["BioEntity"])
yield Node(
db_ns="MESH", db_id=correct_mesh_id, labels=["BioEntity"]
)

# We first try grounding the names with Gilda, if any match, we
# use it, if there are no matches, we go by provided MeSH ID
Expand All @@ -138,22 +114,33 @@ def get_nodes(self):
found_drug_gilda = True
# If there is no Gilda match but there are some MeSH IDs given
if not found_drug_gilda and not pd.isna(row["InterventionMeshId"]):
for mesh_id, mesh_term in zip(row["InterventionMeshId"].split("|"),
row["InterventionMeshTerm"].split("|")):
for mesh_id, mesh_term in zip(
row["InterventionMeshId"].split("|"),
row["InterventionMeshTerm"].split("|"),
):
correct_mesh_id = get_correct_mesh_id(mesh_id, mesh_term)
if not correct_mesh_id:
self.problematic_mesh_ids.append((mesh_id, mesh_term))
continue
self.tested_in_int_ns.append("MESH")
self.tested_in_int_id.append(correct_mesh_id)
self.tested_in_nct.append(row["NCTId"])
yield Node(db_ns="MESH", db_id=correct_mesh_id, labels=["BioEntity"])
yield Node(
db_ns="MESH", db_id=correct_mesh_id, labels=["BioEntity"]
)

for nctid in set(self.tested_in_nct) | set(self.has_trial_nct):
yield Node(db_ns="CLINICALTRIALS", db_id=nctid, labels=["ClinicalTrial"])
yield Node(
db_ns="CLINICALTRIALS",
db_id=nctid,
labels=["ClinicalTrial"],
data=nctid_to_data[nctid],
)

logger.info('Problematic MeSH IDs: %s' % str(
Counter(self.problematic_mesh_ids).most_common()))
logger.info(
"Problematic MeSH IDs: %s"
% str(Counter(self.problematic_mesh_ids).most_common())
)

def get_relations(self):
added = set()
Expand Down Expand Up @@ -203,9 +190,49 @@ def get_correct_mesh_id(mesh_id, mesh_term=None):
# ID lookup - done here as grounding just to not have to assume
# perfect / up to date naming conventions in the source data.
if mesh_term:
matches = gilda.ground(mesh_term, namespaces=['MESH'])
matches = gilda.ground(mesh_term, namespaces=["MESH"])
if len(matches) == 1:
for k, v in matches[0].get_groundings():
if k == 'MESH':
if k == "MESH":
return v
return None


def _get_phase(phase_string: str) -> int:
if pd.notna(phase_string) and phase_string[-1].isdigit():
return int(phase_string[-1])
return -1


def process_df(df: pd.DataFrame):
    """Clean up values in the ClinicalTrials.gov DataFrame in place.

    Adds ``start_year``, ``randomized``, and ``start_year_anticipated``
    columns, maps ``Phase`` to an integer, and rewrites ``ReferencePMID``
    into a Neo4j-compatible ``;``-separated list of PUBMED CURIEs.
    """

    def _trailing_year(date_str):
        # StartDate looks like "November 1, 2023" or "May 1984";
        # the year is always the last four characters.
        return None if pd.isna(date_str) else int(date_str[-4:])

    def _flag(value, expected):
        # Neo4j boolean columns are serialized as the strings "true"/"false".
        return "true" if pd.notna(value) and value == expected else "false"

    def _pmid_curies(pmids):
        return ";".join("PUBMED:%s" % pubmed_id for pubmed_id in pmids.split("|"))

    # Create start year column from StartDate (nullable integer dtype)
    df["start_year"] = df["StartDate"].map(_trailing_year).astype("Int64")

    # DesignAllocation is "Randomized" or "Non-Randomized"
    df["randomized"] = df["DesignAllocation"].map(
        lambda value: _flag(value, "Randomized")
    )

    # Indicate if the start_year is anticipated or not
    df["start_year_anticipated"] = df["StartDateType"].map(
        lambda value: _flag(value, "Anticipated")
    )

    # Map the phase info for trial to integer (-1 for unknown)
    df["Phase"] = df["Phase"].apply(_get_phase)

    # Create a Neo4j compatible list of references; NaN rows pass through
    df["ReferencePMID"] = df["ReferencePMID"].map(_pmid_curies, na_action="ignore")


def or_na(x):
    """Normalize missing values: map NaN/NA to None, pass everything else through."""
    if pd.isna(x):
        return None
    return x
156 changes: 156 additions & 0 deletions src/indra_cogex/sources/clinicaltrials/download.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
"""
NOTE: ClinicalTrials.gov are working on a more modern API that is currently
in Beta: https://beta.clinicaltrials.gov/data-about-studies/learn-about-api
Once this API is released, we should switch to using it. The instructions for
using the current/old API are below.

Downloading the clinical trials data is now fully automated, but for posterity,
here are the instructions for getting the file manually:

To obtain the custom download for ingest, do the following::

1. Go to https://clinicaltrials.gov/api/gui/demo/simple_study_fields

2. Enter the following in the form:

expr=
fields=NCTId,BriefTitle,Condition,ConditionMeshTerm,ConditionMeshId,InterventionName,InterventionType,InterventionMeshTerm,InterventionMeshId,StudyType
min_rnk=1
max_rnk=500000 # or any number larger than the current number of studies
fmt=csv

3. Send Request

4. Enter the captcha characters into the text box and then press enter
(make sure to use the enter key and not press any buttons).

5. The website will display "please wait… " for a couple of minutes, finally,
the Save to file button will be active.

6. Click the Save to file button to download the response as a txt file.

7. Rename the txt file to clinical_trials.csv and then compress it as
gzip clinical_trials.csv to get clinical_trials.csv.gz, then place
this file into <pystow home>/indra/cogex/clinicaltrials/
"""

from typing import Optional, List

import pystow
import requests
from tqdm.auto import tqdm, trange
import pandas as pd
import io

#: Public API of this module.
__all__ = [
    "CLINICAL_TRIALS_PATH",
    "ensure_clinical_trials_df",
    "get_clinical_trials_df",
]

#: Cached location of the downloaded trials table
#: (<pystow home>/indra/cogex/clinicaltrials/clinical_trials.tsv).
CLINICAL_TRIALS_PATH = pystow.join(
    "indra",
    "cogex",
    "clinicaltrials",
    name="clinical_trials.tsv",
)

#: The fields that are used by default. A full list can be found
#: here: https://classic.clinicaltrials.gov/api/info/study_fields_list
DEFAULT_FIELDS = [
    "NCTId",
    "BriefTitle",
    "Condition",
    "ConditionMeshTerm",
    "ConditionMeshId",
    "InterventionName",
    "InterventionType",
    "InterventionMeshTerm",
    "InterventionMeshId",
    "StudyType",
    "DesignAllocation",
    "OverallStatus",
    "Phase",
    "WhyStopped",
    "SecondaryIdType",
    "SecondaryId",
    "StartDate",  # Month [day], year: "November 1, 2023", "May 1984" or NaN
    "StartDateType",  # "Actual" or "Anticipated" (or NaN)
    "ReferencePMID",  # these are tagged as relevant by the author, but not necessarily about the trial
]


def ensure_clinical_trials_df(*, refresh: bool = False) -> pd.DataFrame:
    """Download and parse the ClinicalTrials.gov dataframe or load
    it, if it's already available.

    If refresh is set to true, it will overwrite the existing file.
    """
    # Serve the cached TSV unless the caller forces a re-download.
    if not refresh and CLINICAL_TRIALS_PATH.is_file():
        return pd.read_csv(CLINICAL_TRIALS_PATH, sep="\t")
    dataframe = get_clinical_trials_df()
    dataframe.to_csv(CLINICAL_TRIALS_PATH, sep="\t", index=False)
    return dataframe


def get_clinical_trials_df(
    page_size: int = 1_000, fields: Optional[List[str]] = None
) -> pd.DataFrame:
    """Download the ClinicalTrials.gov dataframe.

    Download takes about 10 minutes and is shown with a progress bar.

    Parameters
    ----------
    page_size :
        Number of studies per request; the API caps this at 1,000, so
        larger values are clamped.
    fields :
        Fields to request for each study. If None, defaults to
        :data:`DEFAULT_FIELDS`.

    Returns
    -------
    :
        A dataframe with one row per study and one column per field.

    Raises
    ------
    requests.HTTPError
        If an API request returns an error status.
    ValueError
        If the total number of studies cannot be parsed from the response.
    """
    # The classic API rejects page sizes above 1,000
    if page_size > 1_000:
        page_size = 1_000
    if fields is None:
        fields = DEFAULT_FIELDS
    base_params = {
        "expr": "",
        "min_rnk": 1,
        "max_rnk": page_size,
        "fmt": "csv",
        "fields": ",".join(fields),
    }
    url = "https://classic.clinicaltrials.gov/api/query/study_fields"

    #: This is the number of dummy rows at the beginning of the document
    #: before the actual CSV starts
    skiprows = 9

    beginning = '"NStudiesAvail: '
    res = requests.get(url, params=base_params, timeout=60)
    # Fail fast on HTTP errors instead of trying to parse an error page
    res.raise_for_status()
    for line in res.text.splitlines()[:skiprows]:
        if line.startswith(beginning):
            total = int(line.removeprefix(beginning).strip('"'))
            break
    else:
        raise ValueError("could not parse total trials")

    # Ceiling division. The previous ``1 + total // page_size`` requested
    # one empty extra page whenever total was an exact multiple of page_size.
    pages = (total + page_size - 1) // page_size

    tqdm.write(
        f"There are {total:,} clinical trials available, iterable in {pages:,} pages of size {page_size:,}."
    )

    first_page_df = pd.read_csv(io.StringIO(res.text), skiprows=skiprows)

    dfs = [first_page_df]

    # start on page "1" because we already did page 0 above. Note that we're
    # zero-indexed, so "1" is actually the second page
    for page in trange(1, pages, unit="page", desc="Downloading ClinicalTrials.gov"):
        min_rnk = page_size * page + 1
        max_rnk = page_size * (page + 1)
        res = requests.get(
            url,
            params={**base_params, "min_rnk": min_rnk, "max_rnk": max_rnk},
            timeout=60,
        )
        res.raise_for_status()
        page_df = pd.read_csv(io.StringIO(res.text), skiprows=skiprows)
        dfs.append(page_df)

    return pd.concat(dfs)


if __name__ == "__main__":
ensure_clinical_trials_df(refresh=True)
1 change: 0 additions & 1 deletion src/indra_cogex/sources/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ def _dump_nodes_to_path(self, nodes, nodes_path, sample_path=None, write_mode="w

def _dump_edges(self) -> Path:
sample_path = self.module.join(name="edges_sample.tsv")
logger.info(f"Dumping into {self.edges_path}...")
rels = self.get_relations()
return self._dump_edges_to_path(rels, self.edges_path, sample_path)

Expand Down
Loading
Loading