Merge pull request #26 from pranavanba/use-internal-archive-for-data-sync

RMHDR-252 Update external parquet pipeline to use the internal parquet archive
pranavanba authored Apr 17, 2024
2 parents 1574e66 + f461d3c commit 4f0e58d
Showing 7 changed files with 42 additions and 22 deletions.
1 change: 1 addition & 0 deletions config.yml
@@ -25,6 +25,7 @@ prod:
STAGING_TO_ARCHIVE_DOWNLOAD_LOCATION: ./temp_staging_to_archive/
ARCHIVE_TO_CURRENT_DOWNLOAD_LOCATION: ./temp_archive_to_current/
POST_WITHDRAW_LOCATION: ./temp_post_withdraw
DICTIONARIES_FOLDER: syn52316269

staging:
inherits: prod
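The new DICTIONARIES_FOLDER key replaces the Synapse ID that deidentification.R previously hard-coded (see the next file). Below is a minimal sketch of how a value like this would be read with the R config package, which the prod/staging "inherits: prod" layout above suggests; the loading code itself is not part of this diff, so the names here are assumptions:

# Sketch only: load the active configuration and pull the new key
# (assumes config.yml is read with the `config` package, as its prod/staging layout suggests)
cfg <- config::get(config = "staging", file = "config.yml")  # "staging" inherits from "prod"
DICTIONARIES_FOLDER <- cfg$DICTIONARIES_FOLDER               # "syn52316269" in prod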
16 changes: 11 additions & 5 deletions deidentification.R → scripts/deidentification/deidentification.R
@@ -10,6 +10,11 @@ unlink('./dictionaries/', recursive = T, force = T)
# Get dictionaries --------------------------------------------------------
system('synapse get -r syn52316269 --downloadLocation ./dictionaries/ --manifest suppress')

list.files("./dictionaries", full.names = T) %>% lapply(function(x) {
y <- x %>% stringr::str_remove_all("[0-9]")
file.rename(from = x, to = y)
})

junk <- lapply(list.files("./dictionaries/", full.names = T), function(f) {
lines <- readLines(f)

@@ -103,7 +108,8 @@ for (i in seq_along(deidentified_results$deidentified_datasets)) {

arrow::write_dataset(dataset = deidentified_results$deidentified_datasets[[i]],
path = file.path(PARQUET_FINAL_LOCATION, names(deidentified_results$deidentified_datasets)[[i]]),
max_rows_per_file = 1000000,
max_open_files = 2048,
max_rows_per_file = 5000000,
partitioning = c('cohort'),
existing_data_behavior = 'delete_matching',
basename_template = paste0("part-0000{i}.", as.character("parquet")))
@@ -122,15 +128,15 @@ for (i in seq_along(deidentified_results$values_to_review)) {

# Index each file in Synapse
latest_commit <- gh::gh("/repos/:owner/:repo/commits/main", owner = "Sage-Bionetworks", repo = "recover-parquet-external")
latest_commit_file_url <- latest_commit$files[[1]]$blob_url
latest_commit_file_url <- paste0(latest_commit$html_url %>% stringr::str_replace("commit", "blob"), "/deidentification.R")

for (i in seq_along(list.files('./dictionaries/new_to_review/'))) {
synStore(File(path = list.files('./dictionaries/new_to_review', full.names = T)[i],
synStore(File(path = list.files('./dictionaries/new_to_review', full.names = T)[i],
parent = DEID_VALS_TO_REVIEW),
activityName = "Indexing",
activityDescription = "Indexing files containing new PII values to review for deidentification step",
used = c((synGetChildren('syn52316269') %>% as.list())[[i]]$id,
synFindEntityId(names(deidentified_results$deidentified_datasets)[i],
used = c((synGetChildren(DICTIONARIES_FOLDER) %>% as.list())[[i]]$id,
synFindEntityId(names(deidentified_results$deidentified_datasets)[i],
parent = PARQUET_FOLDER_INTERNAL)),
executed = latest_commit_file_url
)
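The rename step added near the top of this script strips every digit from the downloaded dictionary file names before they are read back in. A small illustration of what that stringr call does; the file name below is hypothetical and only the pattern matters:

# Hypothetical dictionary file name, digits removed the same way as in the lapply above
library(stringr)
path <- "./dictionaries/coded_col_dictionary_v2_2024.csv"
str_remove_all(path, "[0-9]")
#> [1] "./dictionaries/coded_col_dictionary_v_.csv"

file.rename() is then called with that digit-free name, presumably so later code can refer to each dictionary by a stable name regardless of any version or date digits in the uploaded files.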
11 changes: 8 additions & 3 deletions filtering.R → scripts/filtering/filtering.R
@@ -22,7 +22,8 @@ dob2age <- function(dataset, column, input = AWS_PARQUET_DOWNLOAD_LOCATION, part
dplyr::collect() %>%
dplyr::mutate(age = ifelse(age>89, "90+", age)) %>%
arrow::write_dataset(path = input_path,
max_rows_per_file = 100000,
max_open_files = 2048,
max_rows_per_file = 1000000,
partitioning = partitions,
existing_data_behavior = 'delete_matching')
}
@@ -51,7 +52,8 @@ drop_cols_datasets <- function(dataset, columns=c(), input = AWS_PARQUET_DOWNLOA
arrow::open_dataset(sources = input_path) %>%
dplyr::select(!dplyr::any_of(columns)) %>%
arrow::write_dataset(path = final_path,
max_rows_per_file = 1000000,
max_open_files = 2048,
max_rows_per_file = 5000000,
partitioning = partitions,
existing_data_behavior = 'delete_matching',
basename_template = paste0("part-0000{i}.", as.character("parquet")))
@@ -78,7 +80,10 @@ cols_to_drop <- lapply(datasets_to_filter, function(x) {

tmp <-
lapply(seq_along(datasets_to_filter), function(i) {
cat(i, "Dropping", cols_to_drop[[i]], "from", datasets_to_filter[[i]], "\n\n")
cat(i)
cat(": Dropping from [", datasets_to_filter[[i]], "]", "\n")
cat(cols_to_drop[[i]], sep = ", ")
cat("\n\n")
drop_cols_datasets(dataset = datasets_to_filter[[i]],
columns = cols_to_drop[[i]],
input = AWS_PARQUET_DOWNLOAD_LOCATION,
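Both write paths in filtering.R now pass max_open_files = 2048 and a larger max_rows_per_file to arrow::write_dataset, matching the changes in the de-identification and withdrawal scripts; the effect should be fewer, larger part files per cohort partition, at the cost of more memory and open file handles while writing. A self-contained sketch of the same call pattern on synthetic data (the path and column names here are invented for illustration):

library(arrow)

# Synthetic stand-in for one filtered dataset, partitioned by cohort
df <- data.frame(
  participantidentifier = sprintf("P%04d", 1:1000),
  cohort = sample(c("adults_v1", "pediatric_v1"), 1000, replace = TRUE),
  value = rnorm(1000)
)

arrow::write_dataset(
  dataset = df,
  path = "./temp_parquet_demo/dataset_demo",
  partitioning = c("cohort"),
  max_open_files = 2048,         # raise the writer's open-file cap (subject to OS limits)
  max_rows_per_file = 5000000,   # allow larger part files, hence fewer files per partition
  existing_data_behavior = "delete_matching",
  basename_template = "part-0000{i}.parquet"
)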
File renamed without changes.
File renamed without changes.
35 changes: 21 additions & 14 deletions sts_synindex_external.R → scripts/main/sts_synindex_external.R
@@ -1,8 +1,6 @@
library(synapser)
library(arrow)
library(dplyr)
library(synapserutils)
library(rjson)


# Functions ---------------------------------------------------------------
@@ -115,19 +113,28 @@ Sys.setenv('AWS_ACCESS_KEY_ID'=token$accessKeyId,
'AWS_SECRET_ACCESS_KEY'=token$secretAccessKey,
'AWS_SESSION_TOKEN'=token$sessionToken)

s3 <- arrow::S3FileSystem$create(access_key = token$accessKeyId,
secret_key = token$secretAccessKey,
session_token = token$sessionToken,
region="us-east-1")

archive_uri <- paste0(token$bucket, '/', token$baseKey, '/archive/')
archive_list <- s3$GetFileInfo(arrow::FileSelector$create(archive_uri, recursive=F))
latest_archive <- archive_list[[length(archive_list)]]$path


# Sync bucket to local dir ------------------------------------------------
unlink(AWS_PARQUET_DOWNLOAD_LOCATION, recursive = T, force = T)
sync_cmd <- glue::glue('aws s3 sync {base_s3_uri} {AWS_PARQUET_DOWNLOAD_LOCATION} --exclude "*owner.txt*" --exclude "*archive*"')
sync_cmd <- glue::glue('aws s3 sync s3://{latest_archive}/ {AWS_PARQUET_DOWNLOAD_LOCATION} --exclude "*owner.txt*" --exclude "*archive*"')
system(sync_cmd)


# Remove withdrawn participants -------------------------------------------
source("~/recover-parquet-external/remove_withdrawn_participants.R")
source("~/recover-parquet-external/scripts/withdrawal/remove_withdrawn_participants.R")


# Filter parquet datasets -------------------------------------------------
source('~/recover-parquet-external/filtering.R')
source('~/recover-parquet-external/scripts/filtering/filtering.R')

# Copy unfiltered parquet datasets to new location with filtered parquet datasets
unlink(PARQUET_FINAL_LOCATION, recursive = T, force = T)
@@ -141,11 +148,11 @@ unlink(PARQUET_FILTERED_LOCATION, recursive = T, force = T)


# De-identify parquet datasets --------------------------------------------
source('~/recover-parquet-external/deidentification.R')
source('~/recover-parquet-external/scripts/deidentification/deidentification.R')


# Sync final parquets to bucket -------------------------------------------
date <- lubridate::today()
date <- latest_archive %>% stringr::str_extract("[0-9]{4}_[0-9]{2}_[0-9]{2}")
sync_cmd <- glue::glue('aws s3 --profile service-catalog sync {PARQUET_FINAL_LOCATION} {base_s3_uri_archive}{date}/ --exclude "*owner.txt*" --exclude "*archive*"')
system(sync_cmd)

@@ -180,7 +187,6 @@ synapse_manifest <-
dplyr::mutate(file_key = gsub("cohort_", "cohort=", file_key),
s3_file_key = gsub("cohort_", "cohort=", s3_file_key))


# List all files currently indexed in Synapse
synapse_fileview <-
synapser::synTableQuery(paste0('SELECT * FROM ', SYNAPSE_FILEVIEW_ID))$filepath %>%
@@ -204,7 +210,12 @@ if (nrow(synapse_fileview)>0) {

# Index each file in Synapse
latest_commit <- gh::gh("/repos/:owner/:repo/commits/main", owner = "Sage-Bionetworks", repo = "recover-parquet-external")
latest_commit_tree_url <- latest_commit$html_url %>% stringr::str_replace("commit", "tree")
latest_commit_this_file <- paste0(latest_commit$html_url %>% stringr::str_replace("commit", "blob"), "/sts_synindex_external.R")

act <- synapser::Activity(name = "Indexing",
description = "Indexing external parquet datasets",
used = paste0("s3://", latest_archive),
executed = latest_commit_this_file)

if(nrow(synapse_manifest_to_upload) > 0){
for(file_number in seq_len(nrow(synapse_manifest_to_upload))){
@@ -225,11 +236,7 @@ if(nrow(synapse_manifest_to_upload) > 0){
parentId = tmp$parent,
name = new_fileName)

f <- synStore(f,
activityName = "Indexing",
activityDescription = "Indexing external parquet datasets",
used = PARQUET_FOLDER_INTERNAL,
executed = latest_commit_tree_url)
f <- synStore(f, activity = act)

}
}
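The indexing loop now creates one synapser::Activity up front, pointing at the latest internal archive as used and at this script's blob URL as executed, and reuses it for every synStore call instead of repeating activityName, activityDescription, used, and executed per file. A condensed sketch of that pattern follows; the Synapse ID, local path, and URL are placeholders, not values from this repository:

library(synapser)
synLogin()

# One provenance record shared by every file indexed in this run (placeholder values)
act <- synapser::Activity(
  name = "Indexing",
  description = "Indexing external parquet datasets",
  used = "s3://example-bucket/main/archive/2024_04_15/",
  executed = "https://github.com/Sage-Bionetworks/recover-parquet-external/blob/<sha>/sts_synindex_external.R"
)

f <- synapser::File(path = "./temp_aws_parquet/dataset_example/part-00000.parquet",
                    parent = "syn00000000")  # placeholder parent folder
f <- synapser::synStore(f, activity = act)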
1 change: 1 addition & 0 deletions remove_withdrawn_participants.R → scripts/withdrawal/remove_withdrawn_participants.R
@@ -89,6 +89,7 @@ if (length(participants_to_withdraw) > 0) {
d %>%
arrow::write_dataset(
path = file.path(POST_WITHDRAW_LOCATION, basename(x)),
max_open_files = 2048,
max_rows_per_file = 5000000,
partitioning = "cohort",
existing_data_behavior = 'delete_matching',
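The post-withdrawal rewrite picks up the same max_open_files = 2048 setting. One operational note, offered as an assumption rather than something stated in this PR: arrow can only keep that many part files open if the process's file-descriptor limit allows it, so a quick check before the partitioned writes can save a confusing failure:

# Sketch: warn if the soft open-file limit looks too low for max_open_files = 2048
fd_limit <- suppressWarnings(as.integer(system("ulimit -n", intern = TRUE)))  # Linux/macOS shells
if (!is.na(fd_limit) && fd_limit < 2048) {
  warning("Open-file limit is ", fd_limit,
          "; partitioned writes with max_open_files = 2048 may fail with 'too many open files'.")
}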
