From 55bd1a6b2a9566b9f683cecb1199598958698878 Mon Sep 17 00:00:00 2001 From: Lauren Biggers Date: Wed, 21 Sep 2022 17:09:22 -0500 Subject: [PATCH] [DC-2692] synthetic script * changes required when running the synthetic script all the way through * the script did finish * more changes are expected --- data_steward/cdr_cleaner/clean_cdr.py | 3 +- data_steward/cdr_cleaner/clean_cdr_engine.py | 11 +- .../convert_pre_post_coordinated_concepts.py | 3 +- .../create_deid_questionnaire_response_map.py | 3 +- .../cleaning_rules/create_person_ext_table.py | 3 +- .../deid/questionnaire_response_id_map.py | 3 +- .../cleaning_rules/drop_orphaned_pids.py | 3 +- .../populate_survey_conduct_ext.py | 3 +- .../survey_version_info.py | 2 +- data_steward/tools/create_synthetic.py | 137 ++++++++++++++++-- data_steward/tools/import_rdr_omop.py | 2 + 11 files changed, 147 insertions(+), 26 deletions(-) diff --git a/data_steward/cdr_cleaner/clean_cdr.py b/data_steward/cdr_cleaner/clean_cdr.py index a93b0be69a..843057f22c 100644 --- a/data_steward/cdr_cleaner/clean_cdr.py +++ b/data_steward/cdr_cleaner/clean_cdr.py @@ -386,7 +386,8 @@ CONTROLLED_TIER_FITBIT_CLEANING_CLASSES, DataStage.SYNTHETIC.value: RDR_CLEANING_CLASSES + COMBINED_CLEANING_CLASSES + - REGISTERED_TIER_DEID_CLEANING_CLASSES, + REGISTERED_TIER_DEID_CLEANING_CLASSES + + REGISTERED_TIER_DEID_BASE_CLEANING_CLASSES, } diff --git a/data_steward/cdr_cleaner/clean_cdr_engine.py b/data_steward/cdr_cleaner/clean_cdr_engine.py index c677dce6f0..395b2732b4 100644 --- a/data_steward/cdr_cleaner/clean_cdr_engine.py +++ b/data_steward/cdr_cleaner/clean_cdr_engine.py @@ -217,15 +217,8 @@ def infer_rule(clazz, project_id, dataset_id, sandbox_dataset_id, table_namer, kwargs = get_custom_kwargs(clazz, **kwargs) # setting default values that won't raise errors - def query_function(): - """ - Imitates base class get_query_specs() - Can be removed once all classes are base classed - :return: list of query dicts generated by rule - """ - return [] - - setup_function = lambda: object + query_function = lambda: [] + setup_function = lambda client: object function_name = '' module_name = '' line_no = '' diff --git a/data_steward/cdr_cleaner/cleaning_rules/convert_pre_post_coordinated_concepts.py b/data_steward/cdr_cleaner/cleaning_rules/convert_pre_post_coordinated_concepts.py index 8e001c6316..0083378e6a 100644 --- a/data_steward/cdr_cleaner/cleaning_rules/convert_pre_post_coordinated_concepts.py +++ b/data_steward/cdr_cleaner/cleaning_rules/convert_pre_post_coordinated_concepts.py @@ -153,7 +153,8 @@ def __init__(self, SetConceptIdsForSurveyQuestionsAnswers, UpdateFamilyHistoryCodes ], - table_namer=table_namer) + table_namer=table_namer, + run_for_synthetic=True) def get_query_specs(self, *args, **keyword_args) -> query_spec_list: """ diff --git a/data_steward/cdr_cleaner/cleaning_rules/create_deid_questionnaire_response_map.py b/data_steward/cdr_cleaner/cleaning_rules/create_deid_questionnaire_response_map.py index e2af6a7a47..9cf034f7c3 100644 --- a/data_steward/cdr_cleaner/cleaning_rules/create_deid_questionnaire_response_map.py +++ b/data_steward/cdr_cleaner/cleaning_rules/create_deid_questionnaire_response_map.py @@ -53,7 +53,8 @@ def __init__(self, project_id=project_id, dataset_id=dataset_id, sandbox_dataset_id=sandbox_dataset_id, - table_namer=table_namer) + table_namer=table_namer, + run_for_synthetic=True) def get_query_specs(self, *args, **keyword_args): """ diff --git a/data_steward/cdr_cleaner/cleaning_rules/create_person_ext_table.py b/data_steward/cdr_cleaner/cleaning_rules/create_person_ext_table.py index 9c577a3c61..8d1ef7c0d9 100644 --- a/data_steward/cdr_cleaner/cleaning_rules/create_person_ext_table.py +++ b/data_steward/cdr_cleaner/cleaning_rules/create_person_ext_table.py @@ -104,7 +104,8 @@ def __init__(self, project_id=project_id, dataset_id=dataset_id, sandbox_dataset_id=sandbox_dataset_id, - table_namer=table_namer) + table_namer=table_namer, + run_for_synthetic=True) def get_query_specs(self): """ diff --git a/data_steward/cdr_cleaner/cleaning_rules/deid/questionnaire_response_id_map.py b/data_steward/cdr_cleaner/cleaning_rules/deid/questionnaire_response_id_map.py index 4a2f1f5c3d..9df92d60b4 100644 --- a/data_steward/cdr_cleaner/cleaning_rules/deid/questionnaire_response_id_map.py +++ b/data_steward/cdr_cleaner/cleaning_rules/deid/questionnaire_response_id_map.py @@ -107,7 +107,8 @@ def __init__(self, project_id=project_id, dataset_id=dataset_id, sandbox_dataset_id=sandbox_dataset_id, - table_namer=table_namer) + table_namer=table_namer, + run_for_synthetic=True) def get_query_specs(self, *args, **keyword_args): """ diff --git a/data_steward/cdr_cleaner/cleaning_rules/drop_orphaned_pids.py b/data_steward/cdr_cleaner/cleaning_rules/drop_orphaned_pids.py index 342fce0710..0f7427196b 100644 --- a/data_steward/cdr_cleaner/cleaning_rules/drop_orphaned_pids.py +++ b/data_steward/cdr_cleaner/cleaning_rules/drop_orphaned_pids.py @@ -96,7 +96,8 @@ def __init__(self, project_id=project_id, dataset_id=dataset_id, sandbox_dataset_id=sandbox_dataset_id, - table_namer=table_namer) + table_namer=table_namer, + run_for_synthetic=True) def get_query_specs(self): """ diff --git a/data_steward/cdr_cleaner/cleaning_rules/populate_survey_conduct_ext.py b/data_steward/cdr_cleaner/cleaning_rules/populate_survey_conduct_ext.py index 9dbfb54f28..3d2658a830 100644 --- a/data_steward/cdr_cleaner/cleaning_rules/populate_survey_conduct_ext.py +++ b/data_steward/cdr_cleaner/cleaning_rules/populate_survey_conduct_ext.py @@ -64,7 +64,8 @@ def __init__(self, dataset_id=dataset_id, sandbox_dataset_id=sandbox_dataset_id, depends_on=[GenerateExtTables, COPESurveyVersionTask], - table_namer=table_namer) + table_namer=table_namer, + run_for_synthetic=True) def get_query_specs(self): """ diff --git a/data_steward/cdr_cleaner/manual_cleaning_rules/survey_version_info.py b/data_steward/cdr_cleaner/manual_cleaning_rules/survey_version_info.py index ef351f19ca..1b026b8d8f 100644 --- a/data_steward/cdr_cleaner/manual_cleaning_rules/survey_version_info.py +++ b/data_steward/cdr_cleaner/manual_cleaning_rules/survey_version_info.py @@ -95,7 +95,7 @@ def __init__(self, project_id=project_id, dataset_id=dataset_id, sandbox_dataset_id=sandbox_dataset_id, - affected_tables=[OBSERVATION + '_ext'], + affected_tables=[f'{OBSERVATION}_ext'], depends_on=[GenerateExtTables], table_namer=table_namer, run_for_synthetic=True) diff --git a/data_steward/tools/create_synthetic.py b/data_steward/tools/create_synthetic.py index 5e0639c438..9d9a2a24c1 100644 --- a/data_steward/tools/create_synthetic.py +++ b/data_steward/tools/create_synthetic.py @@ -5,14 +5,16 @@ from datetime import datetime # Third party imports +from google.cloud import bigquery # Project imports from cdr_cleaner import clean_cdr from cdr_cleaner.args_parser import add_kwargs_to_args -from common import CDR_SCOPES +from common import CDR_SCOPES, FITBIT_TABLES from constants.cdr_cleaner import clean_cdr as consts from constants.tools import create_combined_backup_dataset as combine_consts from gcloud.bq import BigQueryClient +from resources import fields_for from tools import import_rdr_omop from tools.create_combined_backup_dataset import generate_combined_mapping_tables from tools.recreate_person import update_person @@ -112,6 +114,109 @@ def create_datasets(client: BigQueryClient, name: str, input_dataset: str, return datasets +def _fix_survey_conduct_records(client, project_id, staging_ds, sandbox_ds): + # sandbox and drop orphaned records + que = (f""" + CREATE OR REPLACE TABLE `{project_id}.{sandbox_ds}.rdr_dc2714_rm_orphan_survey_conduct` AS ( + SELECT * FROM `{project_id}.{staging_ds}.survey_conduct` sc + WHERE NOT EXISTS ( + SELECT 1 FROM `{project_id}.{staging_ds}.observation` o + WHERE sc.survey_conduct_id = o.questionnaire_response_id)); + + DELETE FROM `{project_id}.{staging_ds}.survey_conduct` sc + WHERE EXISTS ( + SELECT 1 FROM `{project_id}.{sandbox_ds}.rdr_dc2714_rm_orphan_survey_conduct` sb + WHERE sc.survey_conduct_id = sb.survey_conduct_id + );""") + + resp = client.query(que) + resp.result() + + # fix cope survey responses + que = (f""" + /* save all cope and minute survey responses */ + CREATE OR REPLACE TABLE `{project_id}.{sandbox_ds}.rdr_dc2713_survey_conduct` AS ( + SELECT * + FROM `{project_id}.{staging_ds}.survey_conduct` + WHERE REGEXP_CONTAINS(survey_source_value, r'(?i)(^cope$)|(^cope_)') + ); + + /* update cope and minute survey responses */ + UPDATE `{project_id}.{staging_ds}.survey_conduct` s + SET survey_concept_id = CASE WHEN m.cope_month = 'may' THEN 2100000002 + WHEN m.cope_month = 'june' THEN 2100000003 + WHEN m.cope_month = 'july' THEN 2100000004 + WHEN m.cope_month = 'nov' THEN 2100000005 + WHEN m.cope_month = 'dec' THEN 2100000006 + WHEN m.cope_month = 'feb' THEN 2100000007 + WHEN m.cope_month = 'vaccine1' THEN 905047 + WHEN m.cope_month = 'vaccine2' THEN 905055 + WHEN m.cope_month = 'vaccine3' THEN 765936 + WHEN m.cope_month = 'vaccine4' THEN 1741006 + ELSE s.survey_concept_id + END, + survey_source_concept_id = CASE + WHEN m.cope_month = 'may' THEN 2100000002 + WHEN m.cope_month = 'june' THEN 2100000003 + WHEN m.cope_month = 'july' THEN 2100000004 + WHEN m.cope_month = 'nov' THEN 2100000005 + WHEN m.cope_month = 'dec' THEN 2100000006 + WHEN m.cope_month = 'feb' THEN 2100000007 + WHEN m.cope_month = 'vaccine1' THEN 905047 + WHEN m.cope_month = 'vaccine2' THEN 905055 + WHEN m.cope_month = 'vaccine3' THEN 765936 + WHEN m.cope_month = 'vaccine4' THEN 1741006 + ELSE s.survey_concept_id + END + FROM `{project_id}.{staging_ds}.cope_survey_semantic_version_map` m + WHERE s.survey_conduct_id = m.questionnaire_response_id; + + /* save all records that will be changed */ + CREATE OR REPLACE TABLE `{project_id}.{sandbox_ds}.rdr_dc2713_survey_conduct_source_value` AS ( + SELECT * + FROM `{project_id}.{staging_ds}.survey_conduct` s + LEFT JOIN `{project_id}.{staging_ds}.concept` c + ON s.survey_concept_id = c.concept_id + AND survey_concept_id = 0 + ); + + /* update the survey_source_value field */ + UPDATE `{project_id}.{staging_ds}.survey_conduct` s + SET s.survey_source_value = c.concept_code + FROM `{project_id}.{staging_ds}.concept` c + WHERE s.survey_concept_id = c.concept_id + AND s.survey_concept_id > 0; + """) + + resp = client.query(que) + resp.result() + + +def _create_empty_fitbit_tables(bq_client, project_id, final_dataset_name): + for table in FITBIT_TABLES: + schema_list = fields_for(table) + ta = bigquery.Table(f'{project_id}.{final_dataset_name}.{table}', + schema_list) + bq_client.create_table(ta) + + +def _remove_mapping_tables(bq_client, project_id, final_dataset_name): + for table in bq_client.list_tables(f'{project_id}.{final_dataset_name}'): + if table.table_id.startswith('_mapping_'): + LOGGER.info(f"Removing table {table.full_table_id}") + bq_client.delete_table(table) + + +def update_domain_table_id(client: BigQueryClient, fq_table: str): + que = ( + f"UPDATE `{fq_table}` " + f"SET {fq_table.split('.')[-1]}_id = {fq_table.split('.')[-1]}_id + 1000000000000000 " + f"WHERE 1=1") + + resp = client.query(que) + resp.result() + + def create_tier(project_id: str, input_dataset: str, release_tag: str, run_as: str, **kwargs) -> dict: """ @@ -162,9 +267,22 @@ def create_tier(project_id: str, input_dataset: str, release_tag: str, update_person(bq_client, datasets[consts.STAGING]) # Snapshot the staging dataset to final dataset + LOGGER.info("Snapshotting the final dataset") bq_client.build_and_copy_contents(datasets[consts.STAGING], final_dataset_name) - + LOGGER.info( + f"Snapshot complete. Final dataset is at `{project_id}.{final_dataset_name}`" + ) + + LOGGER.info("Generating empty fitbit tables") + _create_empty_fitbit_tables(bq_client, project_id, final_dataset_name) + LOGGER.info("Empty fitbit table generation complete") + + LOGGER.info( + "Removing mapping tables because they are not part of the dataset released to researchers" + ) + _remove_mapping_tables(bq_client, project_id, final_dataset_name) + LOGGER.info("Done removing mapping tables") return datasets @@ -226,20 +344,21 @@ def main(raw_args=None) -> dict: clean_cdr.validate_custom_params(cleaning_classes, **kwargs) # load synthetic data from the bucket - import_rdr_omop.main([ + dataset_obj = import_rdr_omop.main([ '--rdr_bucket', args.bucket_name, '--run_as', args.target_principal, '--export_date', datetime.now().strftime("%Y-%m-%d"), '--curation_project', args.project_id, '--vocab_dataset', args.vocab_dataset, '--console_log' ]) - # # Creates synthetic dataset and runs a subset of cleaning rules marked for synthetic data - # datasets = create_tier(args.credentials_filepath, args.project_id, - # args.idataset, args.release_tag, - # args.target_principal, **kwargs) + dataset_obj = dataset_obj.dataset_id - # return datasets + # Creates synthetic dataset and runs a subset of cleaning rules marked for synthetic data + datasets = create_tier(args.project_id, dataset_obj, args.release_tag, + args.target_principal, **kwargs) + + return datasets if __name__ == '__main__': - main() + main() \ No newline at end of file diff --git a/data_steward/tools/import_rdr_omop.py b/data_steward/tools/import_rdr_omop.py index 4edd815ba1..037eae579c 100755 --- a/data_steward/tools/import_rdr_omop.py +++ b/data_steward/tools/import_rdr_omop.py @@ -262,6 +262,8 @@ def main(raw_args=None): copy_vocab_tables(bq_client, new_dataset_name, args.vocabulary) + return dataset_object + if __name__ == '__main__': main()