diff --git a/.gitignore b/.gitignore
index 3b96878..df8fb74 100644
--- a/.gitignore
+++ b/.gitignore
@@ -28,11 +28,13 @@ vignettes/*.pdf
 .Renviron
 *.Rproj
 # Misc files and dirs
-temp_aws_parquet
 *manifest*
 aws
 awscliv2.zip
-parquet_filtered/
-parquet_final/
 session-manager-plugin.deb
 dictionaries
+*.parquet
+*.json
+dev*
+misc*
+*temp*
diff --git a/params.R b/params.R
index 6457a75..445b40f 100644
--- a/params.R
+++ b/params.R
@@ -15,6 +15,7 @@ PARQUET_FOLDER_INTERNAL <- 'syn51406699'
 # Local location where parquet bucket files are synced to
 AWS_PARQUET_DOWNLOAD_LOCATION <- './temp_aws_parquet'
+AWS_ARCHIVE_DOWNLOAD_LOCATION <- './temp_aws_archive'
 
 PARQUET_FILTERED_LOCATION <- './parquet_filtered'
@@ -26,12 +27,12 @@ datasets_to_filter <- c("dataset_enrolledparticipants",
                         "dataset_healthkitv2workouts",
                         "dataset_symptomlog")
 
-cols_to_drop <- list(c("EmailAddress", "DateOfBirth", "CustomFields_DeviceOrderInfo", "FirstName", "LastName", "PostalCode", "MiddleName"),
+cols_to_drop <- list(c("EmailAddress", "DateOfBirth", "CustomFields_DeviceOrderInfo", "FirstName", "LastName", "PostalCode", "MiddleName", "MobilePhone"),
                      # c("name"),
                      # c("name"),
                      c("Source_Name"),
                      c("Source_Name", "Device_Name"),
-                     c("Source_Name", "Metadata_HKWorkoutBrandName", "Metadata_Coach", "Metadata_trackerMetadata", "Metadata_SWMetadataKeyCustomWorkoutTitle", "Metadata_location"),
+                     c("Source_Name", "Metadata_HKWorkoutBrandName", "Metadata_Coach", "Metadata_trackerMetadata", "Metadata_SWMetadataKeyCustomWorkoutTitle", "Metadata_location", "metadata_workout_name", "Metadata_name"),
                      c("Value_notes", "Properties"))
 
 PARQUET_FINAL_LOCATION <- './parquet_final'
diff --git a/sts_synindex_external.R b/sts_synindex_external.R
index 1acbec8..b8f1e80 100644
--- a/sts_synindex_external.R
+++ b/sts_synindex_external.R
@@ -4,16 +4,102 @@ library(dplyr)
 library(synapserutils)
 library(rjson)
 
+
+# Functions ---------------------------------------------------------------
+
+#' Duplicate a folder
+#'
+#' This function duplicates a folder from the source to the destination. It
+#' stops if the source folder does not exist, and warns that files may be
+#' overwritten if the destination folder already exists.
+#'
+#' @param source_folder The path to the source folder.
+#' @param destination_folder The path to the destination folder.
+#'
+#' @return The path to the destination folder.
+#'
+#' @examples
+#' source_folder <- "path/to/source_folder"
+#' destination_folder <- "path/to/destination_folder"
+#' duplicate_folder(source_folder, destination_folder)
+#'
+duplicate_folder <- function(source_folder, destination_folder) {
+  if (!dir.exists(source_folder)) {
+    stop("Source folder does not exist.")
+  }
+
+  if (dir.exists(destination_folder)) {
+    warning("Destination folder already exists. Files might be overwritten.")
+  } else {
+    dir.create(destination_folder)
+  }
+
+  system(glue::glue('cp -r {source_folder}/* {destination_folder}'))
+
+  return(destination_folder)
+}
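+
+# NOTE: the `cp -r {source_folder}/*` call assumes a Unix shell and skips
+# hidden files at the top level of source_folder. If portability ever
+# becomes a concern, a base-R equivalent for this use case would be:
+#   file.copy(list.files(source_folder, full.names = TRUE),
+#             destination_folder, recursive = TRUE)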
+
+#' Copy folders and reparent
+#'
+#' This function copies folders from the source folder to the destination
+#' folder while re-parenting them. It skips folders that already exist in the
+#' destination folder.
+#'
+#' @param source_folder The path to the source folder.
+#' @param destination_folder The path to the destination folder.
+#'
+#' @examples
+#' source_folder <- "path/to/source_folder"
+#' destination_folder <- "path/to/destination_folder"
+#' copy_folders_reparent(source_folder, destination_folder)
+#'
+copy_folders_reparent <- function(source_folder, destination_folder) {
+  folders_to_copy <-
+    setdiff(
+      list.dirs(source_folder, recursive = F, full.names = F),
+      list.dirs(destination_folder, recursive = F, full.names = F))
+
+  for (folder in folders_to_copy) {
+    source_path <- paste0(source_folder, '/', folder)
+    dest_path <- paste0(destination_folder, '/', folder)
+
+    if (!dir.exists(dest_path)) {
+      system(glue::glue('cp -r {source_path} {destination_folder}'))
+      cat("Copied:", folder, '\n')
+    } else {
+      cat("Skipped:", folder, "- Folder already exists in", destination_folder, '\n')
+    }
+  }
+}
+
+#' Replace equal sign with underscore
+#'
+#' This function renames a directory path by replacing equal signs with
+#' underscores. If a replacement is performed, it logs the change.
+#'
+#' @param directory_path The path of the directory to rename.
+#'
+#' @examples
+#' replace_equal_with_underscore("path_with=equals")
+#'
+replace_equal_with_underscore <- function(directory_path) {
+  new_directory_path <- gsub("=", "_", directory_path)
+  if (directory_path != new_directory_path) {
+    file.rename(directory_path, new_directory_path)
+    return(cat("Renamed:", directory_path, "to", new_directory_path, "\n"))
+  }
+}
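+
+# Context for the '=' handling below: the bucket uses hive-style partition
+# directories (e.g. cohort=adults), but '=' is apparently not accepted in
+# Synapse entity names, so directories are renamed to use '_' before the
+# Synapse manifest is generated; the '=' is restored in the S3 keys just
+# before indexing.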
Files might be overwritten.") - } - - system(glue::glue('cp -r {source_folder}/* {destination_folder}')) - - return(destination_folder) - } else { - stop("Source folder does not exist.") - } -} - unlink(PARQUET_FINAL_LOCATION, recursive = T, force = T) duplicate_folder(source_folder = PARQUET_FILTERED_LOCATION, destination_folder = PARQUET_FINAL_LOCATION) -copy_folders_reparent <- function(source_folder, destination_folder) { - folders_to_copy <- setdiff(list.dirs(source_folder, recursive = F, full.names = F), - list.dirs(destination_folder, recursive = F, full.names = F)) - - for (folder in folders_to_copy) { - source_path <- paste0(AWS_PARQUET_DOWNLOAD_LOCATION, '/', folder) - dest_path <- paste0(PARQUET_FINAL_LOCATION, '/', folder) - - if (!dir.exists(dest_path)) { - system(glue::glue('cp -r {source_path} {destination_folder}')) - - cat("Copied:", folder, '\n') - } else { - cat("Skipped:", folder, "- Folder already exists in", destination_folder, '\n') - } - } -} - copy_folders_reparent(AWS_PARQUET_DOWNLOAD_LOCATION, PARQUET_FINAL_LOCATION) # Remove intermediate folders @@ -86,109 +139,96 @@ unlink(PARQUET_FILTERED_LOCATION, recursive = T, force = T) # De-identify parquet datasets -------------------------------------------- source('~/recover-parquet-external/deidentification.R') -# Sync final parquets to bucket ------------------------------------------- +# Sync final parquets to bucket ------------------------------------------- date <- lubridate::today() sync_cmd <- glue::glue('aws s3 --profile service-catalog sync {PARQUET_FINAL_LOCATION} {base_s3_uri_archive}{date}/ --exclude "*owner.txt*" --exclude "*archive*"') system(sync_cmd) -# Upload parquet datasets directory tree to Synapse ------------------------ - -existing_dirs <- synGetChildren(PARQUET_FOLDER_ARCHIVE) %>% as.list() +# Recreate directory tree of parquet datasets bucket location in S -------- +# existing_dirs <- synGetChildren(PARQUET_FOLDER_ARCHIVE) %>% as.list() +# +# if(length(existing_dirs)>0) { +# for (i in seq_along(existing_dirs)) { +# synDelete(existing_dirs[[i]]$id) +# } +# } -if(length(existing_dirs)>0) { - for (i in seq_along(existing_dirs)) { - synDelete(existing_dirs[[i]]$id) - } -} +# Generate manifest of existing files +unlink(AWS_ARCHIVE_DOWNLOAD_LOCATION, recursive = T, force = T) +sync_cmd <- glue::glue('aws s3 --profile service-catalog sync {base_s3_uri_archive} {AWS_ARCHIVE_DOWNLOAD_LOCATION} --exclude "*owner.txt*" --exclude "*archive*"') +system(sync_cmd) # Modify cohort identifier in dir name -replace_equal_with_underscore <- function(directory_path) { - new_directory_path <- gsub("=", "_", directory_path) - if (directory_path != new_directory_path) { - file.rename(directory_path, new_directory_path) - cat("Renamed:", directory_path, "to", new_directory_path, "\n") - } +junk <- sapply(list.dirs(AWS_ARCHIVE_DOWNLOAD_LOCATION), replace_equal_with_underscore) + +SYNAPSE_AUTH_TOKEN <- Sys.getenv('SYNAPSE_AUTH_TOKEN') +manifest_cmd <- glue::glue('SYNAPSE_AUTH_TOKEN="{SYNAPSE_AUTH_TOKEN}" synapse manifest --parent-id {PARQUET_FOLDER_ARCHIVE} --manifest ./current_manifest.tsv {AWS_ARCHIVE_DOWNLOAD_LOCATION}') +system(manifest_cmd) + + +# Index files in Synapse -------------------------------------------------- +# Get a list of all files to upload and their synapse locations (parentId) +STR_LEN_PARQUET_FINAL_LOCATION <- stringr::str_length(PARQUET_FINAL_LOCATION) + +## List all local files present (from manifest) +synapse_manifest <- + read.csv('./current_manifest.tsv', sep = '\t', stringsAsFactors = F) 
+
+# List all files currently indexed in Synapse
+synapse_fileview <-
+  synapser::synTableQuery(paste0('SELECT * FROM ', SYNAPSE_FILEVIEW_ID))$filepath %>%
+  read.csv()
+
+# Find the files in the manifest that are not yet indexed in Synapse
+if (nrow(synapse_fileview)>0) {
+  synapse_manifest_to_upload <-
+    synapse_manifest %>%
+    dplyr::anti_join(
+      synapse_fileview %>%
+        dplyr::select(parent = parentId,
+                      s3_file_key = dataFileKey,
+                      md5_hash = dataFileMD5Hex))
+} else {
+  synapse_manifest_to_upload <- synapse_manifest
 }
-invisible(lapply(list.dirs(PARQUET_FINAL_LOCATION), replace_equal_with_underscore))
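+# Restore the '=' (replaced with '_' for the Synapse manifest) so the keys
+# match the actual objects in S3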
+synapse_manifest_to_upload <-
+  synapse_manifest_to_upload %>%
+  mutate(file_key = gsub("cohort_", "cohort=", file_key),
+         s3_file_key = gsub("cohort_", "cohort=", s3_file_key))
 
-# Generate manifest of existing files
-sync_cmd <- glue::glue('aws s3 --profile service-catalog sync {base_s3_uri_archive} ./archive --exclude "*owner.txt*" --exclude "*archive*"')
-system(sync_cmd)
+# Index each file in Synapse
+if(nrow(synapse_manifest_to_upload) > 0){
+  for(file_number in seq_len(nrow(synapse_manifest_to_upload))){
+    tmp <- synapse_manifest_to_upload[file_number, c("path", "parent", "s3_file_key")]
+
+    absolute_file_path <- tools::file_path_as_absolute(tmp$path)
 
-SYNAPSE_AUTH_TOKEN <- Sys.getenv('SYNAPSE_AUTH_TOKEN')
-manifest_cmd <- glue::glue('SYNAPSE_AUTH_TOKEN="{SYNAPSE_AUTH_TOKEN}" synapse manifest --parent-id {PARQUET_FOLDER_ARCHIVE} --manifest ./current_manifest.tsv ./archive')
-system(manifest_cmd)
+    temp_syn_obj <-
+      synapser::synCreateExternalS3FileHandle(
+        bucket_name = PARQUET_BUCKET_EXTERNAL,
+        s3_file_key = tmp$s3_file_key,
+        file_path = absolute_file_path,
+        parent = tmp$parent)
 
+    # Synapse does not accept ':' (colon) in file names, so replace it
+    new_fileName <- stringr::str_replace_all(temp_syn_obj$fileName, ':', '_colon_')
 
-# # Index files in Synapse folder -------------------------------------------
-#
-# ## Get a list of all files to upload and their synapse locations(parentId)
-# STR_LEN_PARQUET_FINAL_LOCATION <- stringr::str_length(PARQUET_FINAL_LOCATION)
-#
-# ## All files present locally from manifest
-# synapse_manifest <- read.csv('./current_manifest.tsv', sep = '\t', stringsAsFactors = F) %>%
-#   dplyr::filter(path != paste0(PARQUET_FINAL_LOCATION,'/owner.txt')) %>% # need not create a dataFileHandleId for owner.txt
-#   dplyr::rowwise() %>%
-#   dplyr::mutate(file_key = stringr::str_sub(string = path, start = STR_LEN_PARQUET_FINAL_LOCATION+2)) %>% # location of file from home folder of S3 bucket
-#   dplyr::mutate(s3_file_key = paste0('main/parquet/', file_key)) %>% # the namespace for files in the S3 bucket is S3::bucket/main/
-#   dplyr::mutate(md5_hash = as.character(tools::md5sum(path))) %>%
-#   dplyr::ungroup()
-#
-# ## All currently indexed files in Synapse
-# synapse_fileview <- synapser::synTableQuery(paste0('SELECT * FROM ', SYNAPSE_FILEVIEW_ID))$filepath %>% read.csv()
-# synapse_fileview <- synapser::synTableQuery(paste0('SELECT * FROM ', SYNAPSE_FILEVIEW_ID))$filepath %>% read.csv()
-#
-# ## find those files that are not in the fileview - files that need to be indexed
-# if (nrow(synapse_fileview)>0) {
-#   synapse_manifest_to_upload <-
-#     synapse_manifest %>%
-#     dplyr::anti_join(
-#       synapse_fileview %>%
-#         dplyr::select(parent = parentId,
-#                       s3_file_key = dataFileKey,
-#                       md5_hash = dataFileMD5Hex))
-#   synapse_manifest_to_upload <-
-#     synapse_manifest_to_upload %>%
-#     mutate(file_key = gsub("cohort_", "cohort=", file_key),
-#            s3_file_key = gsub("cohort_", "cohort=", s3_file_key))
-# } else {
-#   synapse_manifest_to_upload <-
-#     synapse_manifest %>%
-#     mutate(file_key = gsub("cohort_", "cohort=", file_key),
-#            s3_file_key = gsub("cohort_", "cohort=", s3_file_key))
-# }
-#
-# ## For each file index it in Synapse given a parent synapse folder
-# if(nrow(synapse_manifest_to_upload) > 0){ # there are some files to upload
-#   for(file_number in seq(nrow(synapse_manifest_to_upload))){
-#
-#     # file and related synapse parent id
-#     file_= synapse_manifest_to_upload$path[file_number]
-#     parent_id = synapse_manifest_to_upload$parent[file_number]
-#     s3_file_key = synapse_manifest_to_upload$s3_file_key[file_number]
-#     # this would be the location of the file in the S3 bucket, in the local it is at {AWS_PARQUET_DOWNLOAD_LOCATION}/
-#
-#     absolute_file_path <- tools::file_path_as_absolute(file_) # local absolute path
-#
-#     temp_syn_obj <- synapser::synCreateExternalS3FileHandle(
-#       bucket_name = PARQUET_BUCKET_EXTERNAL,
-#       s3_file_key = s3_file_key, #
-#       file_path = absolute_file_path,
-#       parent = parent_id
-#     )
-#
-#     # synapse does not accept ':' (colon) in filenames, so replacing it with '_colon_'
-#     new_fileName <- stringr::str_replace_all(temp_syn_obj$fileName, ':', '_colon_')
-#
-#     f <- File(dataFileHandleId=temp_syn_obj$id,
-#               parentId=parent_id,
-#               name = new_fileName) ## set the new file name
-#
-#     f <- synStore(f)
-#
-#   }
-# }
+    f <-
+      synStore(
+        File(dataFileHandleId = temp_syn_obj$id,
+             parentId = tmp$parent,
+             name = new_fileName))
+  }
+}
diff --git a/tests/testthat.R b/tests/testthat.R
new file mode 100644
index 0000000..e39dfed
--- /dev/null
+++ b/tests/testthat.R
@@ -0,0 +1,12 @@
+# This file is part of the standard setup for testthat.
+# It is recommended that you do not modify it.
+#
+# Where should you do additional test configuration?
+# Learn more about the roles of various files in:
+# * https://r-pkgs.org/testing-design.html#sec-tests-files-overview
+# * https://testthat.r-lib.org/articles/special-files.html
+
+library(testthat)
+
+# "recover-parquet-external" is not a loadable package name (R package names
+# cannot contain hyphens), so run the test directory directly rather than
+# calling library() and test_check(); paths are relative to tests/ when this
+# file is executed
+test_dir("testthat")
diff --git a/tests/testthat/test-sts_synindex_external.R b/tests/testthat/test-sts_synindex_external.R
new file mode 100644
index 0000000..31c3b25
--- /dev/null
+++ b/tests/testthat/test-sts_synindex_external.R
@@ -0,0 +1,58 @@
+library(testthat)
+
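+# These tests assume the helper functions under test (duplicate_folder,
+# copy_folders_reparent, replace_equal_with_underscore) are already defined
+# in the session; sts_synindex_external.R defines them but also runs the
+# pipeline when sourced, so extracting them into a standalone file that both
+# scripts source may be preferable.
+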
Files might be overwritten.") + + # Clean up the temporary directories + unlink(td, recursive = T) +}) + +# Test for copy_folders_reparent +test_that("copy_folders_reparent correctly copies folders", { + td <- "tmpdir" + dir.create(td) + source_folder <- file.path(td, "source_folder") + destination_folder <- file.path(td, "destination_folder") + dir.create(source_folder) + dir.create(destination_folder) + dir.create(file.path(source_folder, "folder1")) + file.create(file.path(source_folder, "folder1", "folder1.txt")) + dir.create(file.path(source_folder, "folder2")) + file.create(file.path(source_folder, "folder2", "folder2.txt")) + result <- copy_folders_reparent(source_folder, destination_folder) + expect_true(dir.exists(file.path(destination_folder, "folder1"))) + expect_true(dir.exists(file.path(destination_folder, "folder2"))) + expect_true(file.exists(file.path(destination_folder, "folder1", "folder1.txt"))) + expect_true(file.exists(file.path(destination_folder, "folder2", "folder2.txt"))) + # expect_output(result, "Copied.*") + unlink(td, recursive = T) +}) + +# Test for replace_equal_with_underscore +test_that("replace_equal_with_underscore correctly replaces equal sign with underscore", { + td <- "tmpdir" + dir.create(td) + original_path <- "path=with=equals" + dir.create(file.path(td, original_path)) + replace_equal_with_underscore(file.path(td, original_path)) + expect_equal(list.dirs(td, recursive = F, full.names = F), "path_with_equals") + expect_true(dir.exists(list.dirs(td, recursive = F, full.names = T))) + # expect_output(new_path, "Renamed.*") + unlink(td, recursive = T) +})