[DC-2692] more changes
* changed f-string usage to jinja2 templates
* used pre-defined variable for constant value
* removed redundant code to reuse existing dataset copy utility
* removed leftover merge-conflict code
lrwb-aou committed Feb 27, 2023
1 parent 3cc0f63 commit cdbebb2
Showing 2 changed files with 40 additions and 68 deletions.
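
Note on the main change: the f-string SQL in these files is replaced with jinja2 templates rendered through the shared JINJA_ENV imported from common. A minimal sketch of the before/after pattern (assuming JINJA_ENV is a module-level jinja2.Environment, which is how the diffs below use it):

    from jinja2 import Environment

    JINJA_ENV = Environment()  # stand-in for the shared environment in common

    project_id, dataset_id = 'my-project', 'my_dataset'  # hypothetical values

    # Before: values are interpolated the moment the f-string is evaluated.
    que = f"SELECT COUNT(*) FROM `{project_id}.{dataset_id}.person`"

    # After: the template stays inert until render() supplies the values,
    # so the SQL can be declared once and rendered per dataset.
    que = JINJA_ENV.from_string(
        "SELECT COUNT(*) FROM `{{project_id}}.{{dataset_id}}.person`"
    ).render(project_id=project_id, dataset_id=dataset_id)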
40 changes: 2 additions & 38 deletions data_steward/tools/create_rdr_snapshot.py
@@ -101,11 +101,8 @@ def main(raw_args=None):
     datasets = create_datasets(bq_client, args.rdr_dataset, args.release_tag)

     # copy raw data into staging dataset
-    copy_raw_rdr_tables(bq_client, args.rdr_dataset, datasets.get('staging'))
-    LOGGER.info("going to sleep")
-    import time
-    time.sleep(30)
-    LOGGER.info("done sleeping")
+    bq_client.copy_dataset(args.rdr_dataset, datasets.get('staging'))

     # clean the RDR staging dataset
     cleaning_args = [
         '-p', args.curation_project_id, '-d',
@@ -140,39 +137,6 @@ def main(raw_args=None):
         f'`{bq_client.project}.{datasets.get("clean")}`, is complete.')


-def copy_raw_rdr_tables(client, rdr_dataset, rdr_staging):
-    LOGGER.info(
-        f'Beginning COPY of raw rdr tables from `{rdr_dataset}` to `{rdr_staging}`'
-    )
-    # get list of tables
-    src_tables = client.list_tables(rdr_dataset)
-
-    # create a copy job config
-    job_config = bigquery.job.CopyJobConfig(
-        write_disposition=bigquery.job.WriteDisposition.WRITE_EMPTY)
-
-    for table_item in src_tables:
-        job_config.labels = {
-            'table_name': table_item.table_id,
-            'copy_from': rdr_dataset,
-            'copy_to': rdr_staging
-        }
-
-        destination_table = f'{client.project}.{rdr_staging}.{table_item.table_id}'
-        # job_id defined to the second precision
-        job_id = (f'rdr_staging_copy_{table_item.table_id.lower()}_'
-                  f'{datetime.now().strftime("%Y%m%d_%H%M%S")}')
-        # copy each table to rdr dataset
-        client.copy_table(table_item.reference,
-                          destination_table,
-                          job_id=job_id,
-                          job_config=job_config)
-    LOGGER.info(
-        f'RDR raw table COPY from `{rdr_dataset}` to `{rdr_staging}` is complete'
-    )
 def create_datasets(client, rdr_dataset, release_tag):
     rdr_clean = f'{release_tag}_rdr'
     rdr_staging = f'{rdr_clean}_staging'
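
Note: the removed copy_raw_rdr_tables helper above duplicated the client's existing dataset copy utility, which the new code calls instead. That utility's implementation lives in gcloud/bq.py and is not part of this diff; a rough sketch of what such a method does, with the signature inferred from the call site:

    from google.cloud import bigquery

    def copy_dataset(self, source_dataset: str, destination_dataset: str):
        """Sketch: copy every table of source_dataset into destination_dataset."""
        job_config = bigquery.CopyJobConfig(
            write_disposition=bigquery.WriteDisposition.WRITE_EMPTY)
        for table in self.list_tables(source_dataset):
            destination = f'{self.project}.{destination_dataset}.{table.table_id}'
            self.copy_table(table.reference, destination, job_config=job_config)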
68 changes: 38 additions & 30 deletions data_steward/tools/create_synthetic.py
@@ -10,7 +10,7 @@
 # Project imports
 from cdr_cleaner import clean_cdr
 from cdr_cleaner.args_parser import add_kwargs_to_args
-from common import CDR_SCOPES, FITBIT_TABLES
+from common import CDR_SCOPES, FITBIT_TABLES, ID_CONSTANT_FACTOR, JINJA_ENV
 from constants.cdr_cleaner import clean_cdr as consts
 from constants.tools import create_combined_backup_dataset as combine_consts
 from gcloud.bq import BigQueryClient
@@ -116,33 +116,36 @@ def create_datasets(client: BigQueryClient, name: str, input_dataset: str,

 def _fix_survey_conduct_records(client, project_id, staging_ds, sandbox_ds):
     # sandbox and drop orphaned records
-    que = (f"""
-    CREATE OR REPLACE TABLE `{project_id}.{sandbox_ds}.rdr_dc2714_rm_orphan_survey_conduct` AS (
-    SELECT * FROM `{project_id}.{staging_ds}.survey_conduct` sc
+    que = JINJA_ENV.from_string("""
+    CREATE OR REPLACE TABLE `{{project_id}}.{{sandbox_ds}}.rdr_dc2714_rm_orphan_survey_conduct` AS (
+    SELECT * FROM `{{project_id}}.{{staging_ds}}.survey_conduct` sc
     WHERE NOT EXISTS (
-        SELECT 1 FROM `{project_id}.{staging_ds}.observation` o
+        SELECT 1 FROM `{{project_id}}.{{staging_ds}}.observation` o
         WHERE sc.survey_conduct_id = o.questionnaire_response_id));
-    DELETE FROM `{project_id}.{staging_ds}.survey_conduct` sc
+    DELETE FROM `{{project_id}}.{{staging_ds}}.survey_conduct` sc
     WHERE EXISTS (
-        SELECT 1 FROM `{project_id}.{sandbox_ds}.rdr_dc2714_rm_orphan_survey_conduct` sb
+        SELECT 1 FROM `{{project_id}}.{{sandbox_ds}}.rdr_dc2714_rm_orphan_survey_conduct` sb
         WHERE sc.survey_conduct_id = sb.survey_conduct_id
     );""")
+    que = que.render(project_id=project_id,
+                     staging_ds=staging_ds,
+                     sandbox_ds=sandbox_ds)

     resp = client.query(que)
     resp.result()

     # fix cope survey responses
-    que = (f"""
-    /* save all cope and minute survey responses */
-    CREATE OR REPLACE TABLE `{project_id}.{sandbox_ds}.rdr_dc2713_survey_conduct` AS (
+    que = JINJA_ENV.from_string("""
+    -- save all cope and minute survey responses --
+    CREATE OR REPLACE TABLE `{{project_id}}.{{sandbox_ds}}.rdr_dc2713_survey_conduct` AS (
     SELECT *
-    FROM `{project_id}.{staging_ds}.survey_conduct`
+    FROM `{{project_id}}.{{staging_ds}}.survey_conduct`
     WHERE REGEXP_CONTAINS(survey_source_value, r'(?i)(^cope$)|(^cope_)')
     );
-    /* update cope and minute survey responses */
-    UPDATE `{project_id}.{staging_ds}.survey_conduct` s
+    -- update cope and minute survey responses --
+    UPDATE `{{project_id}}.{{staging_ds}}.survey_conduct` s
     SET survey_concept_id = CASE WHEN m.cope_month = 'may' THEN 2100000002
                                  WHEN m.cope_month = 'june' THEN 2100000003
                                  WHEN m.cope_month = 'july' THEN 2100000004
@@ -168,26 +171,28 @@ def _fix_survey_conduct_records(client, project_id, staging_ds, sandbox_ds):
                                  WHEN m.cope_month = 'vaccine4' THEN 1741006
                                  ELSE s.survey_concept_id
                             END
-    FROM `{project_id}.{staging_ds}.cope_survey_semantic_version_map` m
+    FROM `{{project_id}}.{{staging_ds}}.cope_survey_semantic_version_map` m
     WHERE s.survey_conduct_id = m.questionnaire_response_id;
-    /* save all records that will be changed */
-    CREATE OR REPLACE TABLE `{project_id}.{sandbox_ds}.rdr_dc2713_survey_conduct_source_value` AS (
+    -- save all records that will be changed --
+    CREATE OR REPLACE TABLE `{{project_id}}.{{sandbox_ds}}.rdr_dc2713_survey_conduct_source_value` AS (
     SELECT *
-    FROM `{project_id}.{staging_ds}.survey_conduct` s
-    LEFT JOIN `{project_id}.{staging_ds}.concept` c
+    FROM `{{project_id}}.{{staging_ds}}.survey_conduct` s
+    LEFT JOIN `{{project_id}}.{{staging_ds}}.concept` c
     ON s.survey_concept_id = c.concept_id
     AND survey_concept_id = 0
     );
-    /* update the survey_source_value field */
-    UPDATE `{project_id}.{staging_ds}.survey_conduct` s
+    -- update the survey_source_value field --
+    UPDATE `{{project_id}}.{{staging_ds}}.survey_conduct` s
     SET s.survey_source_value = c.concept_code
-    FROM `{project_id}.{staging_ds}.concept` c
+    FROM `{{project_id}}.{{staging_ds}}.concept` c
     WHERE s.survey_concept_id = c.concept_id
     AND s.survey_concept_id > 0;
     """)

+    que = que.render(project_id=project_id,
+                     staging_ds=staging_ds,
+                     sandbox_ds=sandbox_ds)
     resp = client.query(que)
     resp.result()
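
Note: the REGEXP_CONTAINS filter in the hunk above keeps the plain COPE survey rows as well as the underscore-suffixed COPE variants. A quick illustration with hypothetical survey_source_value strings, using Python's re module as a stand-in for BigQuery's RE2 engine:

    import re

    pattern = re.compile(r'(?i)(^cope$)|(^cope_)')
    for value in ['cope', 'COPE', 'cope_feb', 'cope_vaccine1', 'scope', 'copenhagen']:
        print(f'{value}: {bool(pattern.search(value))}')
    # keeps: cope, COPE, cope_feb, cope_vaccine1 -- drops: scope, copenhagen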

@@ -208,11 +213,12 @@ def _remove_mapping_tables(bq_client, project_id, final_dataset_name):


 def _update_domain_table_id(client: BigQueryClient, fq_table: str):
-    que = (
-        f"UPDATE `{fq_table}` "
-        f"SET {fq_table.split('.')[-1]}_id = {fq_table.split('.')[-1]}_id + 1000000000000000 "
-        f"WHERE 1=1")
+    que = JINJA_ENV.from_string(
+        "UPDATE `{{fq_table}}` "
+        "SET {{fq_table.split('.')[-1]}}_id = {{fq_table.split('.')[-1]}}_id + {{constant_factor}} "
+        "WHERE 1=1")

+    que = que.render(fq_table=fq_table, constant_factor=ID_CONSTANT_FACTOR)
     resp = client.query(que)
     resp.result()
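
Note: jinja2 expressions may call string methods, so the table name is derived from the fully qualified id at render time. For a hypothetical person table, and given that ID_CONSTANT_FACTOR carries the 1000000000000000 offset the removed f-string hard-coded, the template renders as follows:

    from jinja2 import Environment

    tpl = Environment().from_string(
        "UPDATE `{{fq_table}}` "
        "SET {{fq_table.split('.')[-1]}}_id = {{fq_table.split('.')[-1]}}_id + {{constant_factor}} "
        "WHERE 1=1")
    print(tpl.render(fq_table='my-project.my_dataset.person',
                     constant_factor=1000000000000000))
    # UPDATE `my-project.my_dataset.person` SET person_id = person_id + 1000000000000000 WHERE 1=1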

@@ -260,6 +266,9 @@ def create_tier(project_id: str, input_dataset: str, release_tag: str,
         bq_client,
         f'{project_id}.{datasets[consts.STAGING]}.{domain_table}')

+    LOGGER.warning(
+        "Is `_fix_survey_conduct_records` still needed for generating synthetic data? "
+        "If unnecessary, remove the function and the call line.")
     _fix_survey_conduct_records(bq_client, project_id, datasets[consts.STAGING],
                                 datasets[consts.SANDBOX])

@@ -278,11 +287,14 @@ def create_tier(project_id: str, input_dataset: str, release_tag: str,
     # run synthetic data rules. will run synthetic extension table generation too.
     clean_cdr.main(args=synthetic_cleaning_args)

     # TODO:
     # 2. mimic publishing guidelines so the person table looks correct. publish internally first to
     # verify all required datatypes exist. Afterward, can copy to the correct dev environment.
     update_person(bq_client, datasets[consts.STAGING])

+    LOGGER.info("Generating empty fitbit tables")
+    _create_empty_fitbit_tables(bq_client, project_id, datasets[consts.STAGING])
+    LOGGER.info("Empty fitbit table generation complete")

     # Snapshot the staging dataset to final dataset
     LOGGER.info("Snapshotting the final dataset")
     bq_client.build_and_copy_contents(datasets[consts.STAGING],
@@ -291,10 +303,6 @@ def create_tier(project_id: str, input_dataset: str, release_tag: str,
f"Snapshot complete. Final dataset is at `{project_id}.{final_dataset_name}`"
)

LOGGER.info("Generating empty fitbit tables")
_create_empty_fitbit_tables(bq_client, project_id, final_dataset_name)
LOGGER.info("Empty fitbit table generation complete")

LOGGER.info(
"Removing mapping tables because they are not part of the dataset released to researchers"
)
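
Note: the empty FitBit table generation now runs against the staging dataset before the snapshot rather than against the final dataset after it, so the snapshot carries the tables along and the duplicated post-snapshot block is dropped. The real _create_empty_fitbit_tables is defined elsewhere in this file and is not shown in the diff; a rough sketch of the idea, with get_table_schema as an assumed schema helper:

    from google.cloud import bigquery
    from common import FITBIT_TABLES  # FitBit table names, per the import diff above

    def _create_empty_fitbit_tables(bq_client, project_id, dataset_id):
        # Sketch: create each FitBit table with its schema and zero rows.
        for table_name in FITBIT_TABLES:
            table = bigquery.Table(
                f'{project_id}.{dataset_id}.{table_name}',
                schema=bq_client.get_table_schema(table_name))  # assumed helper
            bq_client.create_table(table, exists_ok=True)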
