
RMHDR-252 Update external parquet pipeline to use the internal parquet archive #26

Merged
1 change: 1 addition & 0 deletions config.yml
@@ -25,6 +25,7 @@ prod:
STAGING_TO_ARCHIVE_DOWNLOAD_LOCATION: ./temp_staging_to_archive/
ARCHIVE_TO_CURRENT_DOWNLOAD_LOCATION: ./temp_archive_to_current/
POST_WITHDRAW_LOCATION: ./temp_post_withdraw
DICTIONARIES_FOLDER: syn52316269

staging:
inherits: prod
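
The new DICTIONARIES_FOLDER entry records the Synapse folder that holds the de-identification dictionaries, so scripts can reference it by name (as deidentification.R now does) instead of hard-coding syn52316269. A minimal sketch of reading it, assuming the pipeline loads config.yml with the R config package (the inherits: prod key suggests this, though the loading code is not part of this diff):

library(config)

# Select the active environment ("prod" or "staging") via R_CONFIG_ACTIVE,
# falling back to prod; staging inherits any values it does not override.
cfg <- config::get(config = Sys.getenv("R_CONFIG_ACTIVE", "prod"), file = "config.yml")
DICTIONARIES_FOLDER <- cfg$DICTIONARIES_FOLDER  # "syn52316269"
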
16 changes: 11 additions & 5 deletions deidentification.R → scripts/deidentification/deidentification.R
@@ -10,6 +10,11 @@ unlink('./dictionaries/', recursive = T, force = T)
# Get dictionaries --------------------------------------------------------
system('synapse get -r syn52316269 --downloadLocation ./dictionaries/ --manifest suppress')

list.files("./dictionaries", full.names = T) %>% lapply(function(x) {
y <- x %>% stringr::str_remove_all("[0-9]")
file.rename(from = x, to = y)
})

junk <- lapply(list.files("./dictionaries/", full.names = T), function(f) {
lines <- readLines(f)

@@ -103,7 +108,8 @@ for (i in seq_along(deidentified_results$deidentified_datasets)) {

arrow::write_dataset(dataset = deidentified_results$deidentified_datasets[[i]],
path = file.path(PARQUET_FINAL_LOCATION, names(deidentified_results$deidentified_datasets)[[i]]),
max_rows_per_file = 1000000,
max_open_files = 2048,
max_rows_per_file = 5000000,
partitioning = c('cohort'),
existing_data_behavior = 'delete_matching',
basename_template = paste0("part-0000{i}.", as.character("parquet")))
@@ -122,15 +128,15 @@ for (i in seq_along(deidentified_results$values_to_review)) {

# Index each file in Synapse
latest_commit <- gh::gh("/repos/:owner/:repo/commits/main", owner = "Sage-Bionetworks", repo = "recover-parquet-external")
latest_commit_file_url <- latest_commit$files[[1]]$blob_url
latest_commit_file_url <- paste0(latest_commit$html_url %>% stringr::str_replace("commit", "blob"), "/deidentification.R")

for (i in seq_along(list.files('./dictionaries/new_to_review/'))) {
synStore(File(path = list.files('./dictionaries/new_to_review', full.names = T)[i],
synStore(File(path = list.files('./dictionaries/new_to_review', full.names = T)[i],
parent = DEID_VALS_TO_REVIEW),
activityName = "Indexing",
activityDescription = "Indexing files containing new PII values to review for deidentification step",
used = c((synGetChildren('syn52316269') %>% as.list())[[i]]$id,
synFindEntityId(names(deidentified_results$deidentified_datasets)[i],
used = c((synGetChildren(DICTIONARIES_FOLDER) %>% as.list())[[i]]$id,
synFindEntityId(names(deidentified_results$deidentified_datasets)[i],
parent = PARQUET_FOLDER_INTERNAL)),
executed = latest_commit_file_url
)
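
A note on the new rename block above: after the dictionaries are downloaded, every digit is stripped from each file path, presumably so downstream code can look the files up by stable, version-free names. A small illustration with a hypothetical file name:

# hypothetical downloaded dictionary file
x <- "./dictionaries/pii_dictionary_v2.csv"
y <- stringr::str_remove_all(x, "[0-9]")  # "./dictionaries/pii_dictionary_v.csv"
file.rename(from = x, to = y)
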
11 changes: 8 additions & 3 deletions filtering.R → scripts/filtering/filtering.R
@@ -22,7 +22,8 @@ dob2age <- function(dataset, column, input = AWS_PARQUET_DOWNLOAD_LOCATION, part
dplyr::collect() %>%
dplyr::mutate(age = ifelse(age>89, "90+", age)) %>%
arrow::write_dataset(path = input_path,
max_rows_per_file = 100000,
max_open_files = 2048,
max_rows_per_file = 1000000,
partitioning = partitions,
existing_data_behavior = 'delete_matching')
}
@@ -51,7 +52,8 @@ drop_cols_datasets <- function(dataset, columns=c(), input = AWS_PARQUET_DOWNLOA
arrow::open_dataset(sources = input_path) %>%
dplyr::select(!dplyr::any_of(columns)) %>%
arrow::write_dataset(path = final_path,
max_rows_per_file = 1000000,
max_open_files = 2048,
max_rows_per_file = 5000000,
partitioning = partitions,
existing_data_behavior = 'delete_matching',
basename_template = paste0("part-0000{i}.", as.character("parquet")))
@@ -78,7 +80,10 @@ cols_to_drop <- lapply(datasets_to_filter, function(x) {

tmp <-
lapply(seq_along(datasets_to_filter), function(i) {
cat(i, "Dropping", cols_to_drop[[i]], "from", datasets_to_filter[[i]], "\n\n")
cat(i)
cat(": Dropping from [", datasets_to_filter[[i]], "]", "\n")
cat(cols_to_drop[[i]], sep = ", ")
cat("\n\n")
drop_cols_datasets(dataset = datasets_to_filter[[i]],
columns = cols_to_drop[[i]],
input = AWS_PARQUET_DOWNLOAD_LOCATION,
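
The write_dataset changes in this file follow one pattern: raise max_rows_per_file so fewer, larger parquet files are written, and set max_open_files above Arrow's default so writes across many partitions are not forced to close and reopen output files (which would leave extra file fragments per partition). A standalone sketch of the same call, with hypothetical paths:

library(dplyr)

arrow::open_dataset("./parquet/dataset_fitbitactivitylogs") %>%      # hypothetical input
  arrow::write_dataset(path = "./parquet_filtered/dataset_fitbitactivitylogs",
                       max_open_files = 2048,         # cap on concurrently open output files
                       max_rows_per_file = 5000000,   # start a new file every 5M rows
                       partitioning = c("cohort"),
                       existing_data_behavior = "delete_matching")
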
File renamed without changes.
File renamed without changes.
35 changes: 21 additions & 14 deletions sts_synindex_external.R → scripts/main/sts_synindex_external.R
@@ -1,8 +1,6 @@
library(synapser)
library(arrow)
library(dplyr)
library(synapserutils)
library(rjson)


# Functions ---------------------------------------------------------------
@@ -115,19 +113,28 @@ Sys.setenv('AWS_ACCESS_KEY_ID'=token$accessKeyId,
'AWS_SECRET_ACCESS_KEY'=token$secretAccessKey,
'AWS_SESSION_TOKEN'=token$sessionToken)

s3 <- arrow::S3FileSystem$create(access_key = token$accessKeyId,
secret_key = token$secretAccessKey,
session_token = token$sessionToken,
region="us-east-1")

archive_uri <- paste0(token$bucket, '/', token$baseKey, '/archive/')
archive_list <- s3$GetFileInfo(arrow::FileSelector$create(archive_uri, recursive=F))
latest_archive <- archive_list[[length(archive_list)]]$path


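With this change the script lists the internal parquet archive and syncs from its newest entry instead of from the live bucket prefix. S3 lists keys in lexicographic order, so the date-named folders come back oldest to newest and the last entry is the latest archive. An illustration with hypothetical bucket and key names:

# archive_uri:    "recover-bucket/main/archive/"
# archive_list:   .../archive/2023_09_21/, .../archive/2023_10_05/, ...
# latest_archive: "recover-bucket/main/archive/2023_10_05"  (last, i.e. newest, entry)
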
# Sync bucket to local dir ------------------------------------------------
unlink(AWS_PARQUET_DOWNLOAD_LOCATION, recursive = T, force = T)
sync_cmd <- glue::glue('aws s3 sync {base_s3_uri} {AWS_PARQUET_DOWNLOAD_LOCATION} --exclude "*owner.txt*" --exclude "*archive*"')
sync_cmd <- glue::glue('aws s3 sync s3://{latest_archive}/ {AWS_PARQUET_DOWNLOAD_LOCATION} --exclude "*owner.txt*" --exclude "*archive*"')
system(sync_cmd)


# Remove withdrawn participants -------------------------------------------
source("~/recover-parquet-external/remove_withdrawn_participants.R")
source("~/recover-parquet-external/scripts/withdrawal/remove_withdrawn_participants.R")


# Filter parquet datasets -------------------------------------------------
source('~/recover-parquet-external/filtering.R')
source('~/recover-parquet-external/scripts/filtering/filtering.R')

# Copy unfiltered parquet datasets to new location with filtered parquet datasets
unlink(PARQUET_FINAL_LOCATION, recursive = T, force = T)
@@ -141,11 +148,11 @@ unlink(PARQUET_FILTERED_LOCATION, recursive = T, force = T)


# De-identify parquet datasets --------------------------------------------
source('~/recover-parquet-external/deidentification.R')
source('~/recover-parquet-external/scripts/deidentification/deidentification.R')


# Sync final parquets to bucket -------------------------------------------
date <- lubridate::today()
date <- latest_archive %>% stringr::str_extract("[0-9]{4}_[0-9]{2}_[0-9]{2}")
sync_cmd <- glue::glue('aws s3 --profile service-catalog sync {PARQUET_FINAL_LOCATION} {base_s3_uri_archive}{date}/ --exclude "*owner.txt*" --exclude "*archive*"')
system(sync_cmd)
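
The destination folder name is now taken from the archive that was processed rather than from lubridate::today(), so the external copy keeps the internal archive's folder name even when the job runs on a later day. For example, with a hypothetical path:

date <- "recover-bucket/main/archive/2023_10_05" %>%
  stringr::str_extract("[0-9]{4}_[0-9]{2}_[0-9]{2}")   # "2023_10_05"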

@@ -180,7 +187,6 @@ synapse_manifest <-
dplyr::mutate(file_key = gsub("cohort_", "cohort=", file_key),
s3_file_key = gsub("cohort_", "cohort=", s3_file_key))


# List all files currently indexed in Synapse
synapse_fileview <-
synapser::synTableQuery(paste0('SELECT * FROM ', SYNAPSE_FILEVIEW_ID))$filepath %>%
@@ -204,7 +210,12 @@ if (nrow(synapse_fileview)>0) {

# Index each file in Synapse
latest_commit <- gh::gh("/repos/:owner/:repo/commits/main", owner = "Sage-Bionetworks", repo = "recover-parquet-external")
latest_commit_tree_url <- latest_commit$html_url %>% stringr::str_replace("commit", "tree")
latest_commit_this_file <- paste0(latest_commit$html_url %>% stringr::str_replace("commit", "blob"), "/sts_synindex_external.R")
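
For reference, the provenance URL built here turns the commit page URL into a file URL pinned to that commit, roughly as follows (the commit SHA below is hypothetical):

# latest_commit$html_url:  https://github.com/Sage-Bionetworks/recover-parquet-external/commit/abc1234
# latest_commit_this_file: https://github.com/Sage-Bionetworks/recover-parquet-external/blob/abc1234/sts_synindex_external.R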

act <- synapser::Activity(name = "Indexing",
description = "Indexing external parquet datasets",
used = paste0("s3://", latest_archive),
executed = latest_commit_this_file)

if(nrow(synapse_manifest_to_upload) > 0){
for(file_number in seq_len(nrow(synapse_manifest_to_upload))){
@@ -225,11 +236,7 @@ if(nrow(synapse_manifest_to_upload) > 0){
parentId = tmp$parent,
name = new_fileName)

f <- synStore(f,
activityName = "Indexing",
activityDescription = "Indexing external parquet datasets",
used = PARQUET_FOLDER_INTERNAL,
executed = latest_commit_tree_url)
f <- synStore(f, activity = act)

}
}
@@ -89,6 +89,7 @@ if (length(participants_to_withdraw) > 0) {
d %>%
arrow::write_dataset(
path = file.path(POST_WITHDRAW_LOCATION, basename(x)),
max_open_files = 2048,
max_rows_per_file = 5000000,
partitioning = "cohort",
existing_data_behavior = 'delete_matching',