From 166dc78c89e529e373f066c1bb3955f5cca930bf Mon Sep 17 00:00:00 2001 From: Pranav Anbarasu Date: Tue, 16 Apr 2024 20:51:34 +0000 Subject: [PATCH 01/10] Sync latest internal parquet archive instead of top-level parquet data (this allows for provenance tracking) --- sts_synindex_external.R | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/sts_synindex_external.R b/sts_synindex_external.R index 3d4a149..051a178 100644 --- a/sts_synindex_external.R +++ b/sts_synindex_external.R @@ -115,10 +115,19 @@ Sys.setenv('AWS_ACCESS_KEY_ID'=token$accessKeyId, 'AWS_SECRET_ACCESS_KEY'=token$secretAccessKey, 'AWS_SESSION_TOKEN'=token$sessionToken) +s3 <- arrow::S3FileSystem$create(access_key = token$accessKeyId, + secret_key = token$secretAccessKey, + session_token = token$sessionToken, + region="us-east-1") + +archive_uri <- paste0(token$bucket, '/', token$baseKey, '/archive/') +archive_list <- s3$GetFileInfo(arrow::FileSelector$create(archive_uri, recursive=F)) +latest_archive <- archive_list[[length(archive_list)]]$path + # Sync bucket to local dir ------------------------------------------------ unlink(AWS_PARQUET_DOWNLOAD_LOCATION, recursive = T, force = T) -sync_cmd <- glue::glue('aws s3 sync {base_s3_uri} {AWS_PARQUET_DOWNLOAD_LOCATION} --exclude "*owner.txt*" --exclude "*archive*"') +sync_cmd <- glue::glue('aws s3 sync s3://{latest_archive}/ {AWS_PARQUET_DOWNLOAD_LOCATION} --exclude "*owner.txt*" --exclude "*archive*"') system(sync_cmd) From e35f08339bbc58befc9646aea7f134969d6aa194 Mon Sep 17 00:00:00 2001 From: Pranav Anbarasu Date: Tue, 16 Apr 2024 21:30:05 +0000 Subject: [PATCH 02/10] Update file provenance to use latest archive s3 path instead of internal parquet bucket uri; store provenance in separate synapser Activity method variable --- sts_synindex_external.R | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sts_synindex_external.R b/sts_synindex_external.R index 051a178..c6cdb9c 100644 --- 
a/sts_synindex_external.R +++ b/sts_synindex_external.R @@ -215,6 +215,11 @@ if (nrow(synapse_fileview)>0) { latest_commit <- gh::gh("/repos/:owner/:repo/commits/main", owner = "Sage-Bionetworks", repo = "recover-parquet-external") latest_commit_tree_url <- latest_commit$html_url %>% stringr::str_replace("commit", "tree") +act <- synapser::Activity(name = "Indexing", + description = "Indexing external parquet datasets", + used = paste0("s3://", latest_archive), + executed = latest_commit_tree_url) + if(nrow(synapse_manifest_to_upload) > 0){ for(file_number in seq_len(nrow(synapse_manifest_to_upload))){ tmp <- synapse_manifest_to_upload[file_number, c("path", "parent", "s3_file_key")] @@ -234,11 +239,7 @@ if(nrow(synapse_manifest_to_upload) > 0){ parentId = tmp$parent, name = new_fileName) - f <- synStore(f, - activityName = "Indexing", - activityDescription = "Indexing external parquet datasets", - used = PARQUET_FOLDER_INTERNAL, - executed = latest_commit_tree_url) + f <- synStore(f, activity = act) } } From e4fff85aa632f46d80fbf71324fd9be65175dca9 Mon Sep 17 00:00:00 2001 From: Pranav Anbarasu Date: Tue, 16 Apr 2024 21:31:28 +0000 Subject: [PATCH 03/10] Use specific script URL instead of repo url at latest commit for file provenance --- sts_synindex_external.R | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sts_synindex_external.R b/sts_synindex_external.R index c6cdb9c..c60cfba 100644 --- a/sts_synindex_external.R +++ b/sts_synindex_external.R @@ -213,12 +213,13 @@ if (nrow(synapse_fileview)>0) { # Index each file in Synapse latest_commit <- gh::gh("/repos/:owner/:repo/commits/main", owner = "Sage-Bionetworks", repo = "recover-parquet-external") -latest_commit_tree_url <- latest_commit$html_url %>% stringr::str_replace("commit", "tree") +# latest_commit_tree_url <- latest_commit$html_url %>% stringr::str_replace("commit", "tree") +latest_commit_this_file <- paste0(latest_commit$html_url %>% stringr::str_replace("commit", "blob"), 
"/sts_synindex_external.R") act <- synapser::Activity(name = "Indexing", description = "Indexing external parquet datasets", used = paste0("s3://", latest_archive), - executed = latest_commit_tree_url) + executed = latest_commit_this_file) if(nrow(synapse_manifest_to_upload) > 0){ for(file_number in seq_len(nrow(synapse_manifest_to_upload))){ From d1e89cdab5c6396faa526faa00dd19627c7eeeb3 Mon Sep 17 00:00:00 2001 From: Pranav Anbarasu Date: Tue, 16 Apr 2024 21:56:22 +0000 Subject: [PATCH 04/10] Update formatting of msgs printed during filtering step --- filtering.R | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/filtering.R b/filtering.R index 917c037..5c22fbb 100644 --- a/filtering.R +++ b/filtering.R @@ -78,7 +78,10 @@ cols_to_drop <- lapply(datasets_to_filter, function(x) { tmp <- lapply(seq_along(datasets_to_filter), function(i) { - cat(i, "Dropping", cols_to_drop[[i]], "from", datasets_to_filter[[i]], "\n\n") + cat(i) + cat(": Dropping from [", datasets_to_filter[[i]], "]", "\n") + cat(cols_to_drop[[i]], sep = ", ") + cat("\n\n") drop_cols_datasets(dataset = datasets_to_filter[[i]], columns = cols_to_drop[[i]], input = AWS_PARQUET_DOWNLOAD_LOCATION, From 4ab9b29ebd1292151916b448c43541f6608bfd8d Mon Sep 17 00:00:00 2001 From: Pranav Anbarasu Date: Wed, 17 Apr 2024 18:43:11 +0000 Subject: [PATCH 05/10] Remove unused libraries --- sts_synindex_external.R | 2 -- 1 file changed, 2 deletions(-) diff --git a/sts_synindex_external.R b/sts_synindex_external.R index c60cfba..373f1b7 100644 --- a/sts_synindex_external.R +++ b/sts_synindex_external.R @@ -1,8 +1,6 @@ library(synapser) library(arrow) library(dplyr) -library(synapserutils) -library(rjson) # Functions --------------------------------------------------------------- From ac3d5a1f986e1d59492d82048a969b779d8ade33 Mon Sep 17 00:00:00 2001 From: Pranav Anbarasu Date: Wed, 17 Apr 2024 18:44:04 +0000 Subject: [PATCH 06/10] Use date from archive timestamp instead of getting date of current day --- 
sts_synindex_external.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sts_synindex_external.R b/sts_synindex_external.R index 373f1b7..124dc82 100644 --- a/sts_synindex_external.R +++ b/sts_synindex_external.R @@ -152,7 +152,7 @@ source('~/recover-parquet-external/deidentification.R') # Sync final parquets to bucket ------------------------------------------- -date <- lubridate::today() +date <- latest_archive %>% stringr::str_extract("[0-9]{4}_[0-9]{2}_[0-9]{2}") sync_cmd <- glue::glue('aws s3 --profile service-catalog sync {PARQUET_FINAL_LOCATION} {base_s3_uri_archive}{date}/ --exclude "*owner.txt*" --exclude "*archive*"') system(sync_cmd) From b5a5353d66322b9e5cb808450b234aceaedd77ad Mon Sep 17 00:00:00 2001 From: Pranav Anbarasu Date: Wed, 17 Apr 2024 18:44:22 +0000 Subject: [PATCH 07/10] Add param specifying location of Dictionaries folder in Synapse --- config.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/config.yml b/config.yml index 21bf26e..284eaa5 100644 --- a/config.yml +++ b/config.yml @@ -25,6 +25,7 @@ prod: STAGING_TO_ARCHIVE_DOWNLOAD_LOCATION: ./temp_staging_to_archive/ ARCHIVE_TO_CURRENT_DOWNLOAD_LOCATION: ./temp_archive_to_current/ POST_WITHDRAW_LOCATION: ./temp_post_withdraw + DICTIONARIES_FOLDER: syn52316269 staging: inherits: prod From eb72b8227dfa8a37cbaa4dac41693e6cc2177c1b Mon Sep 17 00:00:00 2001 From: Pranav Anbarasu Date: Wed, 17 Apr 2024 18:44:33 +0000 Subject: [PATCH 08/10] Remove numbers from dictionary file names; update provenance with latest git commit of this script --- deidentification.R | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/deidentification.R b/deidentification.R index efbf8cd..74534e0 100644 --- a/deidentification.R +++ b/deidentification.R @@ -10,6 +10,11 @@ unlink('./dictionaries/', recursive = T, force = T) # Get dictionaries -------------------------------------------------------- system('synapse get -r syn52316269 --downloadLocation ./dictionaries/ 
--manifest suppress') +list.files("./dictionaries", full.names = T) %>% lapply(function(x) { + y <- x %>% stringr::str_remove_all("[0-9]") + file.rename(from = x, to = y) +}) + junk <- lapply(list.files("./dictionaries/", full.names = T), function(f) { lines <- readLines(f) @@ -122,15 +127,15 @@ for (i in seq_along(deidentified_results$values_to_review)) { # Index each file in Synapse latest_commit <- gh::gh("/repos/:owner/:repo/commits/main", owner = "Sage-Bionetworks", repo = "recover-parquet-external") -latest_commit_file_url <- latest_commit$files[[1]]$blob_url +latest_commit_file_url <- paste0(latest_commit$html_url %>% stringr::str_replace("commit", "blob"), "/deidentification.R") for (i in seq_along(list.files('./dictionaries/new_to_review/'))) { - synStore(File(path = list.files('./dictionaries/new_to_review', full.names = T)[i], + synStore(File(path = list.files('./dictionaries/new_to_review', full.names = T)[i], parent = DEID_VALS_TO_REVIEW), activityName = "Indexing", activityDescription = "Indexing files containing new PII values to review for deidentification step", - used = c((synGetChildren('syn52316269') %>% as.list())[[i]]$id, - synFindEntityId(names(deidentified_results$deidentified_datasets)[i], + used = c((synGetChildren(DICTIONARIES_FOLDER) %>% as.list())[[i]]$id, + synFindEntityId(names(deidentified_results$deidentified_datasets)[i], parent = PARQUET_FOLDER_INTERNAL)), executed = latest_commit_file_url ) From b06509fbc6c367fc8ce1049dcac48127a043f4ba Mon Sep 17 00:00:00 2001 From: Pranav Anbarasu Date: Wed, 17 Apr 2024 19:19:22 +0000 Subject: [PATCH 09/10] Remove commented code --- sts_synindex_external.R | 1 - 1 file changed, 1 deletion(-) diff --git a/sts_synindex_external.R b/sts_synindex_external.R index 124dc82..b91b469 100644 --- a/sts_synindex_external.R +++ b/sts_synindex_external.R @@ -211,7 +211,6 @@ if (nrow(synapse_fileview)>0) { # Index each file in Synapse latest_commit <- gh::gh("/repos/:owner/:repo/commits/main", owner = 
"Sage-Bionetworks", repo = "recover-parquet-external") -# latest_commit_tree_url <- latest_commit$html_url %>% stringr::str_replace("commit", "tree") latest_commit_this_file <- paste0(latest_commit$html_url %>% stringr::str_replace("commit", "blob"), "/sts_synindex_external.R") act <- synapser::Activity(name = "Indexing", From f461d3c0ca1d73ec97bcb8f9fd58852c6b27e779 Mon Sep 17 00:00:00 2001 From: Pranav Anbarasu Date: Wed, 17 Apr 2024 20:23:56 +0000 Subject: [PATCH 10/10] Move scripts to new scripts folder; update max_rows_per_file param for write_dataset() calls; specify max_open_files param for write_dataset() calls --- .../deidentification/deidentification.R | 3 ++- filtering.R => scripts/filtering/filtering.R | 6 ++++-- archive-to-current.R => scripts/main/archive-to-current.R | 0 staging_to_archive.R => scripts/main/staging_to_archive.R | 0 .../main/sts_synindex_external.R | 7 +++---- .../withdrawal/remove_withdrawn_participants.R | 1 + 6 files changed, 10 insertions(+), 7 deletions(-) rename deidentification.R => scripts/deidentification/deidentification.R (98%) rename filtering.R => scripts/filtering/filtering.R (94%) rename archive-to-current.R => scripts/main/archive-to-current.R (100%) rename staging_to_archive.R => scripts/main/staging_to_archive.R (100%) rename sts_synindex_external.R => scripts/main/sts_synindex_external.R (97%) rename remove_withdrawn_participants.R => scripts/withdrawal/remove_withdrawn_participants.R (99%) diff --git a/deidentification.R b/scripts/deidentification/deidentification.R similarity index 98% rename from deidentification.R rename to scripts/deidentification/deidentification.R index 74534e0..5074050 100644 --- a/deidentification.R +++ b/scripts/deidentification/deidentification.R @@ -108,7 +108,8 @@ for (i in seq_along(deidentified_results$deidentified_datasets)) { arrow::write_dataset(dataset = deidentified_results$deidentified_datasets[[i]], path = file.path(PARQUET_FINAL_LOCATION, 
names(deidentified_results$deidentified_datasets)[[i]]), - max_rows_per_file = 1000000, + max_open_files = 2048, + max_rows_per_file = 5000000, partitioning = c('cohort'), existing_data_behavior = 'delete_matching', basename_template = paste0("part-0000{i}.", as.character("parquet"))) diff --git a/filtering.R b/scripts/filtering/filtering.R similarity index 94% rename from filtering.R rename to scripts/filtering/filtering.R index 5c22fbb..1b20dba 100644 --- a/filtering.R +++ b/scripts/filtering/filtering.R @@ -22,7 +22,8 @@ dob2age <- function(dataset, column, input = AWS_PARQUET_DOWNLOAD_LOCATION, part dplyr::collect() %>% dplyr::mutate(age = ifelse(age>89, "90+", age)) %>% arrow::write_dataset(path = input_path, - max_rows_per_file = 100000, + max_open_files = 2048, + max_rows_per_file = 1000000, partitioning = partitions, existing_data_behavior = 'delete_matching') } @@ -51,7 +52,8 @@ drop_cols_datasets <- function(dataset, columns=c(), input = AWS_PARQUET_DOWNLOA arrow::open_dataset(sources = input_path) %>% dplyr::select(!dplyr::any_of(columns)) %>% arrow::write_dataset(path = final_path, - max_rows_per_file = 1000000, + max_open_files = 2048, + max_rows_per_file = 5000000, partitioning = partitions, existing_data_behavior = 'delete_matching', basename_template = paste0("part-0000{i}.", as.character("parquet"))) diff --git a/archive-to-current.R b/scripts/main/archive-to-current.R similarity index 100% rename from archive-to-current.R rename to scripts/main/archive-to-current.R diff --git a/staging_to_archive.R b/scripts/main/staging_to_archive.R similarity index 100% rename from staging_to_archive.R rename to scripts/main/staging_to_archive.R diff --git a/sts_synindex_external.R b/scripts/main/sts_synindex_external.R similarity index 97% rename from sts_synindex_external.R rename to scripts/main/sts_synindex_external.R index b91b469..073c318 100644 --- a/sts_synindex_external.R +++ b/scripts/main/sts_synindex_external.R @@ -130,11 +130,11 @@ system(sync_cmd) 
# Remove withdrawn participants ------------------------------------------- -source("~/recover-parquet-external/remove_withdrawn_participants.R") +source("~/recover-parquet-external/scripts/withdrawal/remove_withdrawn_participants.R") # Filter parquet datasets ------------------------------------------------- -source('~/recover-parquet-external/filtering.R') +source('~/recover-parquet-external/scripts/filtering/filtering.R') # Copy unfiltered parquet datasets to new location with filtered parquet datasets unlink(PARQUET_FINAL_LOCATION, recursive = T, force = T) @@ -148,7 +148,7 @@ unlink(PARQUET_FILTERED_LOCATION, recursive = T, force = T) # De-identify parquet datasets -------------------------------------------- -source('~/recover-parquet-external/deidentification.R') +source('~/recover-parquet-external/scripts/deidentification/deidentification.R') # Sync final parquets to bucket ------------------------------------------- @@ -187,7 +187,6 @@ synapse_manifest <- dplyr::mutate(file_key = gsub("cohort_", "cohort=", file_key), s3_file_key = gsub("cohort_", "cohort=", s3_file_key)) - # List all files currently indexed in Synapse synapse_fileview <- synapser::synTableQuery(paste0('SELECT * FROM ', SYNAPSE_FILEVIEW_ID))$filepath %>% diff --git a/remove_withdrawn_participants.R b/scripts/withdrawal/remove_withdrawn_participants.R similarity index 99% rename from remove_withdrawn_participants.R rename to scripts/withdrawal/remove_withdrawn_participants.R index 3bf642b..b400cc7 100644 --- a/remove_withdrawn_participants.R +++ b/scripts/withdrawal/remove_withdrawn_participants.R @@ -89,6 +89,7 @@ if (length(participants_to_withdraw) > 0) { d %>% arrow::write_dataset( path = file.path(POST_WITHDRAW_LOCATION, basename(x)), + max_open_files = 2048, max_rows_per_file = 5000000, partitioning = "cohort", existing_data_behavior = 'delete_matching',