Skip to content

Commit

Permalink
[DC-32271] Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Schmidt committed Nov 7, 2023
2 parents 0fd38e3 + 39e85f5 commit 0910aa3
Show file tree
Hide file tree
Showing 109 changed files with 5,036 additions and 2,062 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# -*- coding: utf-8 -*-
# ---
# jupyter:
# jupytext:
# text_representation:
# extension: .py
# format_name: light
# format_version: '1.5'
# jupytext_version: 1.7.1
# kernelspec:
# display_name: Python 3
# language: python
# name: python3
# ---

# Purpose: Use this notebook to search for ids in sandbox datasets

# + tags=["parameters"]
project_id = ''  # Google Cloud project containing the sandbox dataset
sandbox_dataset_id = ''  # Sandbox dataset to search in for the problem ids
search_field = ''  # field in the sandbox tables expected to contain the ids. Example: observation_id
run_as = ''  # account to impersonate for BigQuery access (passed to auth.get_impersonation_credentials)

# +
from utils import auth
import pandas as pd
from gcloud.bq import BigQueryClient
from common import JINJA_ENV
from analytics.cdr_ops.notebook_utils import execute, IMPERSONATION_SCOPES, render_message

pd.set_option('display.max_rows', None)
# -

# Build impersonation credentials for the `run_as` account with the
# notebook's standard scopes, then create the BigQuery client used by
# every query cell below.
impersonation_creds = auth.get_impersonation_credentials(
    run_as, target_scopes=IMPERSONATION_SCOPES)

client = BigQueryClient(project_id, credentials=impersonation_creds)

# # Create list of ids to search
# Run the following cell to create a list of ids to search for. Recommend using a LIMIT if the list is quite large.<br>
# OR <br>
# Manually create a list of ids called ids_list

# +
# NOTE(review): `{INSERT QUERY HERE}` is a placeholder — replace it with a real
# query (or skip this cell and assign `ids_list` manually) before running;
# executing the cell as-is will fail.
tpl = JINJA_ENV.from_string('''
{INSERT QUERY HERE}
''')
query = tpl.render()
ids = execute(client, query)

# Flatten the result column into a plain Python list of ids for the later cells.
ids_list = ids[search_field].to_list()


# -

# # Get the tables that contain the search_field, from the sandbox dataset

# +
# Query INFORMATION_SCHEMA to find every table in the sandbox dataset that
# has a column named `search_field`; those are the tables worth searching.
columns_tpl = JINJA_ENV.from_string('''
SELECT
  *
FROM
  `{{project_id}}.{{sandbox_dataset_id}}.INFORMATION_SCHEMA.COLUMNS`
WHERE
  column_name = '{{search_field}}'
ORDER BY table_name
''')
columns_query = columns_tpl.render(project_id=project_id,
                                   sandbox_dataset_id=sandbox_dataset_id,
                                   search_field=search_field)
tables_in_dataset = execute(client, columns_query)

tables_list = tables_in_dataset['table_name'].to_list()
# Display the matching table names as this cell's output.
tables_list
# -

# # Search in each sandbox table and print results

# Build one small COUNT query per sandbox table and combine them with
# UNION ALL so every table is searched in a single BigQuery job.
#
# The template is compiled once here (loop-invariant) instead of once per
# table. Note: `ids_list` is interpolated via its Python list repr, which
# relies on that repr being a valid BigQuery array literal (true for ints
# and single-quoted strings).
search_tpl = JINJA_ENV.from_string('''
SELECT
  '{{table}}' as table,
  COUNT(*) AS n_{{search_field}}s_found
FROM
  `{{project_id}}.{{sandbox_dataset_id}}.{{table}}`
WHERE {{search_field}} IN UNNEST ({{ids_list}})
''')
queries = [
    search_tpl.render(sandbox_dataset_id=sandbox_dataset_id,
                      project_id=project_id,
                      table=table,
                      ids_list=ids_list,
                      search_field=search_field) for table in tables_list
]
# Last expression of the cell: the combined result is displayed in the notebook.
execute(client, '\nUNION ALL\n'.join(queries))

814 changes: 445 additions & 369 deletions data_steward/analytics/cdr_ops/clean_rdr_export_qc.py

Large diffs are not rendered by default.

87 changes: 67 additions & 20 deletions data_steward/analytics/cdr_ops/combined.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

from common import JINJA_ENV, MAPPED_CLINICAL_DATA_TABLES
from cdr_cleaner.cleaning_rules.negative_ages import date_fields
from cdr_cleaner.cleaning_rules.remove_ehr_data_without_consent import EHR_UNCONSENTED_PARTICIPANTS_LOOKUP_TABLE as UNCONSENTED
from utils import auth
from gcloud.bq import BigQueryClient
from analytics.cdr_ops.notebook_utils import execute, IMPERSONATION_SCOPES, render_message
Expand All @@ -50,7 +51,7 @@

# ## Check for duplicates across all unique identifier fields.
# This query gathers any duplicates of the {table}_id from each OMOP table listed.
# The OMOP tables `death` and `fact_relationship` are excluded from the check because they do not have primary key fields.
# The OMOP tables `death` and `fact_relationship` are excluded from the check because they do not have primary key fields. `aou_death` is also excluded since it uses 'uuid'.
# The output of this query should be empty. If any duplicates are found there may be a bug in the pipeline.
#
# Specific to duplicates in observation:<br>
Expand All @@ -60,6 +61,7 @@
# If any duplicates are found there may be a bug in the pipeline-
# particularly in `ehr_union.move_ehr_person_to_observation`.


query = f"""
DECLARE i INT64 DEFAULT 0;
DECLARE tables ARRAY<STRING>;
Expand All @@ -68,7 +70,7 @@
"care_site", "condition_era", "device_cost", "device_exposure", "dose_era",
"drug_exposure", "location", "measurement", "note", "note_nlp", "person",
"procedure_cost", "procedure_occurrence", "provider", "specimen",
"survey_conduct", "visit_cost", "visit_detail", "visit_occurrence", "aou_death"];
"survey_conduct", "visit_cost", "visit_detail", "visit_occurrence"];
CREATE TEMPORARY TABLE non_unique_primary_keys(table_name STRING, key_column int64);
Expand Down Expand Up @@ -120,33 +122,36 @@
SELECT
"{{table_name}}" AS table_name
,"{{date_field}}" AS date_field
,t.{{date_field}} AS date_value
,DATE(t.{{date_field}}) AS date_value
,p.birth_datetime AS birth_datetime
FROM `{{dataset_id}}.{{table_name}}` t
JOIN `{{dataset_id}}.person` p
USING (person_id)
WHERE
(
-- age <= 0y --
t.{{date_field}} < DATE(p.birth_datetime)
DATE(t.{{date_field}}) < DATE(p.birth_datetime)
-- age >= 150y --
OR {{PIPELINE_TABLES}}.calculate_age(t.{{date_field}}, EXTRACT(DATE FROM p.birth_datetime)) >= 150
OR pipeline_tables.calculate_age(DATE(t.{{date_field}}), EXTRACT(DATE FROM p.birth_datetime)) >= 150
)
AND
p.birth_datetime IS NOT NULL
AND
t.{{date_field}} IS NOT NULL
{% if not loop.last -%}
UNION ALL
{% endif %}
{% endfor %}
''')
query = tpl.render(dataset_id=DATASET_ID, date_fields=date_fields)
query = tpl.render(dataset_id=DATASET_ID, date_fields=date_fields, PIPELINE_TABLES="pipeline_tables")
execute(client, query)

# ## PPI records should never follow death date
# Make sure no one could die before the program began or have PPI records after their death.

# +
query = JINJA_ENV.from_string("""
query = f'''
WITH
ppi_concept AS
Expand Down Expand Up @@ -251,7 +256,7 @@
|| 'USING (' || table_name ||'_id) '
|| 'LEFT JOIN consented c '
|| ' USING (person_id)'
|| 'WHERE m.src_hpo_id <> "rdr" AND c.person_id IS NULL)'
|| 'WHERE m.src_hpo_id NOT IN (\\"ce\\", \\"vibrent\\", \\"healthpro\\") AND c.person_id IS NULL)'
, ' UNION ALL ')
FROM `{{DATASET_ID}}.INFORMATION_SCHEMA.COLUMNS` c
JOIN `{{DATASET_ID}}.__TABLES__` t
Expand Down Expand Up @@ -307,7 +312,7 @@
(SELECT
d.table_schema AS table_schema
,d.table_name AS table_name
,pk.column_name AS key_field
,CASE WHEN pk.column_name = 'aou_death_id' THEN '0' ELSE pk.column_name END AS key_field
,d.column_name AS date_field
,ts.column_name AS timestamp_field
FROM `{DATASET_ID}.INFORMATION_SCHEMA.COLUMNS` d
Expand Down Expand Up @@ -456,7 +461,7 @@
cols = 3
rows = math.ceil(total_plots / cols)

fig, axes = plt.subplots(rows, cols, figsize=(5, 5), squeeze=False)
fig, axes = plt.subplots(rows, cols, figsize=(10, 10), squeeze=False)

k = 0
while k < total_plots:
Expand Down Expand Up @@ -767,21 +772,21 @@ def verify_dataset_labels(dataset):
# +
query = JINJA_ENV.from_string("""
WITH qc_aou_death AS (
SELECT
aou_death_id,
SELECT
aou_death_id,
CASE WHEN aou_death_id IN (
SELECT aou_death_id FROM `{{project_id}}.{{dataset_id}}.aou_death`
SELECT aou_death_id FROM `{{project_id}}.{{dataset}}.aou_death`
WHERE death_date IS NOT NULL -- NULL death_date records must not become primary --
QUALIFY RANK() OVER (
PARTITION BY person_id
PARTITION BY person_id
ORDER BY
LOWER(src_id) NOT LIKE '%healthpro%' DESC, -- EHR records are chosen over HealthPro ones --
death_date ASC, -- Earliest death_date records are chosen over later ones --
death_datetime ASC NULLS LAST, -- Earliest non-NULL death_datetime records are chosen over later or NULL ones --
src_id ASC -- EHR site that alphabetically comes first is chosen --
) = 1
) = 1
) THEN TRUE ELSE FALSE END AS primary_death_record
FROM `{{project}}.{{dataset}}.aou_death`
FROM `{{project}}.{{dataset}}.aou_death`
)
SELECT ad.aou_death_id
FROM `{{project_id}}.{{dataset}}.aou_death` ad
Expand All @@ -793,7 +798,7 @@ def verify_dataset_labels(dataset):

success_msg = 'All death records have the correct `primary_death_record` values.'
failure_msg = '''
<b>{code_count}</b> records do not have the correct `primary_death_record` values.
<b>{code_count}</b> records do not have the correct `primary_death_record` values.
Investigate and confirm if (a) any logic is incorrect, (b) the requirement has changed, or (c) something else.
'''
render_message(df,
Expand Down Expand Up @@ -828,9 +833,9 @@ def verify_dataset_labels(dataset):
ext_template = JINJA_ENV.from_string("""
SELECT
table_id
FROM
FROM
`{{project_id}}.{{dataset}}.__TABLES__`
WHERE
WHERE
table_id LIKE '%_ext%'
""")
ext_tables_query = ext_template.render(project_id=PROJECT_ID,
Expand All @@ -847,7 +852,7 @@ def verify_dataset_labels(dataset):
`{{project_id}}.{{dataset}}.{{table_name}}`
WHERE NOT
REGEXP_CONTAINS(src_id, r'(?i)(Portal)|(EHR site)')
OR
OR
src_id IS NULL
GROUP BY 1,2
""")
Expand All @@ -857,3 +862,45 @@ def verify_dataset_labels(dataset):
result.append(query)
results = '\nUNION ALL\n'.join(result)
execute(client, results)

# ## Verify no participant in the pdr_ehr_dup_report list has EHR data.
#
# Curation will receive a table of unconsented PID's stored in the table `...combined_sandbox._ehr_unconsented_pids` from the PDR. This check will verify that every mapped EHR table will not contain these PID's. If any of these PID's are found, a failure is raised, otherwise this check ends with a pass. See [DC-3435](https://precisionmedicineinitiative.atlassian.net/browse/DC-3435)

unconsented_records_tpl = JINJA_ENV.from_string("""
SELECT \'{{domain_table}}\' AS domain_table, person_id
FROM
`{{project}}.{{dataset}}.{{domain_table}}` d
JOIN
`{{project}}.{{dataset}}.{{mapping_domain_table}}` md
USING
({{domain_table}}_id)
WHERE
person_id IN (
SELECT
person_id
FROM
`{{project}}.{{sandbox_dataset}}.{{unconsented_lookup}}`)
AND src_dataset_id LIKE '%ehr%'
""")

# +
success_msg_if_empty = "All PID's with EHR data are found consenting"
failure_msg_if_empty = "EHR data is found for PIDs who have not consented to contribute EHR data."

sub_queries = []
for table in MAPPED_CLINICAL_DATA_TABLES:
query = unconsented_records_tpl.render(
project=PROJECT_ID,
dataset=DATASET_ID,
domain_table=table,
mapping_domain_table=f'_mapping_{table}',
sandbox_dataset=f'{DATASET_ID}_sandbox',
unconsented_lookup=UNCONSENTED)

sub_queries.append(query)

full_query = '\nUNION ALL\n'.join(sub_queries)
result = execute(client, full_query)
render_message(result, success_msg_if_empty, failure_msg_if_empty)
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ There are 2 notebooks for the controlled tier QC process.
1. check_controlled_tier.py
This notebook is the main notebook for the controlled tier QC. It covers the quality checks from the following tickets:
[DC-1370](https://precisionmedicineinitiative.atlassian.net/browse/DC-1370), [DC-1377](https://precisionmedicineinitiative.atlassian.net/browse/DC-1377), [DC-1346](https://precisionmedicineinitiative.atlassian.net/browse/DC-1346), [DC-1348](https://precisionmedicineinitiative.atlassian.net/browse/DC-1348), [DC-1355](https://precisionmedicineinitiative.atlassian.net/browse/DC-1355), [DC-1357](https://precisionmedicineinitiative.atlassian.net/browse/DC-1357), [DC-1359](https://precisionmedicineinitiative.atlassian.net/browse/DC-1359), [DC-1362](https://precisionmedicineinitiative.atlassian.net/browse/DC-1362), [DC-1364](https://precisionmedicineinitiative.atlassian.net/browse/DC-1364), [DC-1366](https://precisionmedicineinitiative.atlassian.net/browse/DC-1366),
[DC-1368](https://precisionmedicineinitiative.atlassian.net/browse/DC-1368), [DC-1373](https://precisionmedicineinitiative.atlassian.net/browse/DC-1373), [DC-1382](https://precisionmedicineinitiative.atlassian.net/browse/DC-1382), [DC-1388](https://precisionmedicineinitiative.atlassian.net/browse/DC-1388), [DC-1496](https://precisionmedicineinitiative.atlassian.net/browse/DC-1496), [DC-1527](https://precisionmedicineinitiative.atlassian.net/browse/DC-1527), [DC-1535](https://precisionmedicineinitiative.atlassian.net/browse/DC-1535), [DC-2112](https://precisionmedicineinitiative.atlassian.net/browse/DC-2112)
[DC-1368](https://precisionmedicineinitiative.atlassian.net/browse/DC-1368), [DC-1373](https://precisionmedicineinitiative.atlassian.net/browse/DC-1373), [DC-1388](https://precisionmedicineinitiative.atlassian.net/browse/DC-1388), [DC-1496](https://precisionmedicineinitiative.atlassian.net/browse/DC-1496), [DC-1535](https://precisionmedicineinitiative.atlassian.net/browse/DC-1535), [DC-2112](https://precisionmedicineinitiative.atlassian.net/browse/DC-2112)

2. check_controlled_tier_covid_concept_no_suppression.py
This notebook is for [DC-2119](https://precisionmedicineinitiative.atlassian.net/browse/DC-2119). DC-2119 is not included in `check_controlled_tier.py` because of the following reasons:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@
pd.set_option('display.width', None)

# If you want to ignore specific QC rule(s): Remove those rules from to_include
to_include = ['DC-1370', 'DC-1377', 'DC-1346', 'DC-1348', 'DC-1355', 'DC-1357', 'DC-1359',
'DC-1362', 'DC-1364', 'DC-1366', 'DC-1368', 'DC-1373', 'DC-1382', 'DC-1388',
'DC-1496', 'DC-1527', 'DC-1535', 'DC-2112']
to_include = [
'DC-1370', 'DC-1377', 'DC-1346', 'DC-1348', 'DC-1355', 'DC-1357', 'DC-1359',
'DC-1362', 'DC-1364', 'DC-1366', 'DC-1368', 'DC-1373', 'DC-1388', 'DC-1496',
'DC-1535', 'DC-2112'
]
checks = run_qc(project_id,
post_deid_dataset,
pre_deid_dataset,
Expand Down Expand Up @@ -96,10 +98,6 @@

display_check_detail_of_rule(checks, 'DC-1373', to_include)

# # [DC-1382: Record Suppression of some cancer condition](https://precisionmedicineinitiative.atlassian.net/browse/DC-1382)

display_check_detail_of_rule(checks, 'DC-1382', to_include)

# # [DC-1388: Free Text survey response are suppressed](https://precisionmedicineinitiative.atlassian.net/browse/DC-1388)

display_check_detail_of_rule(checks, 'DC-1388', to_include)
Expand All @@ -108,10 +106,6 @@

display_check_detail_of_rule(checks, 'DC-1496', to_include)

# # [DC-1527: Suppression of organ transplant rows](https://precisionmedicineinitiative.atlassian.net/browse/DC-1527)

display_check_detail_of_rule(checks, 'DC-1527', to_include)

# # [DC-1535: Suppression of geolocation records](https://precisionmedicineinitiative.atlassian.net/browse/DC-1535)

display_check_detail_of_rule(checks, 'DC-1535', to_include)
Expand Down
Loading

0 comments on commit 0910aa3

Please sign in to comment.