Update indexing code and section names based on recover-parquet-external repo indexing code
pranavanba committed Apr 9, 2024
1 parent e2703fa commit 20d2975
Showing 1 changed file with 68 additions and 55 deletions.
123 changes: 68 additions & 55 deletions sts_synindex_internal.R
@@ -20,12 +20,15 @@ replace_equal_with_underscore <- function(directory_path) {
}
}


+# Setup -------------------------------------------------------------------

synapser::synLogin(authToken = Sys.getenv('SYNAPSE_AUTH_TOKEN'))
source('~/recover-parquet-internal/sts_params_internal.R')

-#### Get STS token for bucket in order to sync to local dir ####

-# Get STS credentials
+# Get STS credentials for processed data bucket ---------------------------

token <- synapser::synGetStsStorageToken(
entity = PARQUET_FOLDER,
permission = "read_only",
@@ -37,43 +40,59 @@ if (PARQUET_BUCKET==token$bucket && PARQUET_BUCKET_BASE_KEY==token$baseKey) {
base_s3_uri <- paste0('s3://', PARQUET_BUCKET, '/', PARQUET_BUCKET_BASE_KEY)
}

-# configure the environment with AWS token

+# Configure the environment with AWS token --------------------------------

Sys.setenv('AWS_ACCESS_KEY_ID'=token$accessKeyId,
'AWS_SECRET_ACCESS_KEY'=token$secretAccessKey,
'AWS_SESSION_TOKEN'=token$sessionToken)

-#### Sync bucket to local dir ####

+# Sync bucket to local dir ------------------------------------------------

unlink(AWS_PARQUET_DOWNLOAD_LOCATION, recursive = T, force = T)
sync_cmd <- glue::glue('aws s3 sync {base_s3_uri} {AWS_PARQUET_DOWNLOAD_LOCATION} --exclude "*owner.txt*" --exclude "*archive*"')
system(sync_cmd)


+# Modify cohort identifier in dir name ------------------------------------

junk <- invisible(lapply(list.dirs(AWS_PARQUET_DOWNLOAD_LOCATION), replace_equal_with_underscore))

-# Generate manifest of existing files

+# Generate manifest of existing files -------------------------------------

SYNAPSE_AUTH_TOKEN <- Sys.getenv('SYNAPSE_AUTH_TOKEN')
manifest_cmd <- glue::glue('SYNAPSE_AUTH_TOKEN="{SYNAPSE_AUTH_TOKEN}" synapse manifest --parent-id {SYNAPSE_PARENT_ID} --manifest ./current_manifest.tsv {AWS_PARQUET_DOWNLOAD_LOCATION}')
system(manifest_cmd)


-#### Index S3 Objects in Synapse ####
+# Index files in Synapse --------------------------------------------------

-## Get a list of all files to upload and their synapse locations(parentId)
+# Get a list of all files to upload and their synapse locations (parentId)
STR_LEN_AWS_PARQUET_DOWNLOAD_LOCATION <- stringr::str_length(AWS_PARQUET_DOWNLOAD_LOCATION)

-## All files present locally from manifest
-synapse_manifest <- read.csv('./current_manifest.tsv', sep = '\t', stringsAsFactors = F) %>%
-dplyr::filter(path != paste0(AWS_PARQUET_DOWNLOAD_LOCATION,'/owner.txt')) %>% # need not create a dataFileHandleId for owner.txt
+# List all local files present (from manifest)
+synapse_manifest <-
+read.csv('./current_manifest.tsv', sep = '\t', stringsAsFactors = F) %>%
+dplyr::filter(!grepl('owner.txt', path)) %>%
dplyr::rowwise() %>%
-dplyr::mutate(file_key = stringr::str_sub(string = path, start = STR_LEN_AWS_PARQUET_DOWNLOAD_LOCATION+2)) %>% # location of file from home folder of S3 bucket
-dplyr::mutate(s3_file_key = paste0('main/parquet/', file_key)) %>% # the namespace for files in the S3 bucket is S3::bucket/main/
+dplyr::mutate(file_key = stringr::str_sub(string = path, start = STR_LEN_AWS_PARQUET_DOWNLOAD_LOCATION+2)) %>%
+dplyr::mutate(s3_file_key = paste0(PARQUET_BUCKET_BASE_KEY, file_key)) %>%
dplyr::mutate(md5_hash = as.character(tools::md5sum(path))) %>%
-dplyr::ungroup()

-## All currently indexed files in Synapse
-synapse_fileview <- synapser::synTableQuery(paste0('SELECT * FROM ', SYNAPSE_FILEVIEW_ID))$filepath %>% read.csv()
-synapse_fileview <- synapser::synTableQuery(paste0('SELECT * FROM ', SYNAPSE_FILEVIEW_ID))$filepath %>% read.csv()

-## find those files that are not in the fileview - files that need to be indexed
+dplyr::ungroup() %>%
+dplyr::mutate(file_key = gsub("cohort_", "cohort=", file_key),
+s3_file_key = gsub("cohort_", "cohort=", s3_file_key))

+# List all files currently indexed in Synapse
+synapse_fileview <-
+synapser::synTableQuery(paste0('SELECT * FROM ', SYNAPSE_FILEVIEW_ID))$filepath %>%
+read.csv()
+synapse_fileview <-
+synapser::synTableQuery(paste0('SELECT * FROM ', SYNAPSE_FILEVIEW_ID))$filepath %>%
+read.csv()

+# Find the files in the manifest that are not yet indexed in Synapse
if (nrow(synapse_fileview)>0) {
synapse_manifest_to_upload <-
synapse_manifest %>%
@@ -82,45 +101,39 @@ if (nrow(synapse_fileview)>0) {
dplyr::select(parent = parentId,
s3_file_key = dataFileKey,
md5_hash = dataFileMD5Hex))
-synapse_manifest_to_upload <-
-synapse_manifest_to_upload %>%
-mutate(file_key = gsub("cohort_", "cohort=", file_key),
-s3_file_key = gsub("cohort_", "cohort=", s3_file_key))
} else {
-synapse_manifest_to_upload <-
-synapse_manifest %>%
-mutate(file_key = gsub("cohort_", "cohort=", file_key),
-s3_file_key = gsub("cohort_", "cohort=", s3_file_key))
+synapse_manifest_to_upload <- synapse_manifest
}

-## Index in Synapse
-## For each file index it in Synapse given a parent synapse folder
-if(nrow(synapse_manifest_to_upload) > 0){ # there are some files to upload
-for(file_number in seq(nrow(synapse_manifest_to_upload))){

-# file and related synapse parent id
-file_= synapse_manifest_to_upload$path[file_number]
-parent_id = synapse_manifest_to_upload$parent[file_number]
-s3_file_key = synapse_manifest_to_upload$s3_file_key[file_number]
-# this would be the location of the file in the S3 bucket, in the local it is at {AWS_PARQUET_DOWNLOAD_LOCATION}/

-absolute_file_path <- tools::file_path_as_absolute(file_) # local absolute path

-temp_syn_obj <- synapser::synCreateExternalS3FileHandle(
-bucket_name = PARQUET_BUCKET,
-s3_file_key = s3_file_key, #
-file_path = absolute_file_path,
-parent = parent_id
-)

-# synapse does not accept ':' (colon) in filenames, so replacing it with '_colon_'
+# Get script details for SynStore() provenance
+latest_commit <- gh::gh("/repos/:owner/:repo/commits/main", owner = "Sage-Bionetworks", repo = "recover-parquet-internal")
+latest_commit_tree_url <- latest_commit$html_url %>% stringr::str_replace("commit", "tree")

+# Index each file in Synapse
+if(nrow(synapse_manifest_to_upload) > 0){
+for(file_number in seq_len(nrow(synapse_manifest_to_upload))){
+tmp <- synapse_manifest_to_upload[file_number, c("path", "parent", "s3_file_key")]

+absolute_file_path <- tools::file_path_as_absolute(tmp$path)

+temp_syn_obj <-
+synapser::synCreateExternalS3FileHandle(
+bucket_name = PARQUET_BUCKET,
+s3_file_key = tmp$s3_file_key,
+file_path = absolute_file_path,
+parent = tmp$parent)

new_fileName <- stringr::str_replace_all(temp_syn_obj$fileName, ':', '_colon_')

-f <- File(dataFileHandleId=temp_syn_obj$id,
-parentId=parent_id,
-name = new_fileName) ## set the new file name

-f <- synStore(f)


+f <- File(dataFileHandleId = temp_syn_obj$id,
+parentId = tmp$parent,
+name = new_fileName)

+f <- synStore(f,
+activityName = "Indexing",
+activityDescription = "Indexing internal parquet datasets",
+used = PARQUET_FOLDER,
+executed = latest_commit_tree_url)

}
}
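For readers unfamiliar with the synapser calls this commit settles on, below is a minimal standalone sketch of the indexing pattern: register an object that already exists in S3 as an external file handle, wrap it in a File, and store it with provenance. The bucket name, object key, local path, and Synapse IDs are illustrative placeholders, not values from this repo; the real script reads its parameters from sts_params_internal.R.

library(synapser)

synLogin(authToken = Sys.getenv("SYNAPSE_AUTH_TOKEN"))

# Illustrative placeholders (the real script reads these from sts_params_internal.R)
bucket        <- "example-processed-data-bucket"                     # hypothetical bucket
s3_key        <- "main/parquet/dataset_example/part-00000.parquet"   # hypothetical object key
local_copy    <- "./parquet/dataset_example/part-00000.parquet"      # local copy from `aws s3 sync`
dest_folder   <- "syn00000001"   # hypothetical Synapse folder to index into
source_folder <- "syn00000002"   # hypothetical folder recorded as provenance ("used")
code_url      <- "https://github.com/Sage-Bionetworks/recover-parquet-internal/tree/main"

# Register the S3 object that is already in the bucket (no re-upload)
fh <- synCreateExternalS3FileHandle(
  bucket_name = bucket,
  s3_file_key = s3_key,
  file_path   = local_copy,
  parent      = dest_folder
)

# Synapse file names cannot contain ':', so it is replaced with '_colon_'
new_name <- stringr::str_replace_all(fh$fileName, ":", "_colon_")

f <- File(dataFileHandleId = fh$id,
          parentId = dest_folder,
          name = new_name)

# Store the entity with provenance pointing at the source folder and the code version
f <- synStore(f,
              activityName = "Indexing",
              activityDescription = "Indexing internal parquet datasets",
              used = source_folder,
              executed = code_url)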
