diff --git a/.gitignore b/.gitignore
index df8fb74..f2c5f43 100644
--- a/.gitignore
+++ b/.gitignore
@@ -38,3 +38,4 @@ dictionaries
 dev*
 misc*
 *temp*
+*pilot*
diff --git a/filtering.R b/filtering.R
index a8397d0..aa92142 100644
--- a/filtering.R
+++ b/filtering.R
@@ -50,13 +50,17 @@ drop_cols_datasets <- function(dataset, columns=c(), input = AWS_PARQUET_DOWNLOA
     arrow::write_dataset(path = final_path,
                          max_rows_per_file = 100000,
                          partitioning = partitions,
-                         existing_data_behavior = 'delete_matching')
+                         existing_data_behavior = 'delete_matching',
+                         basename_template = paste0("part-0000{i}.", as.character("parquet")))
   }
 }
 
 
 # Filtering ---------------------------------------------------------------
-dob2age("dataset_enrolledparticipants", "DateOfBirth")
+dob2age(dataset = "dataset_enrolledparticipants",
+        column = "DateOfBirth",
+        input = AWS_PARQUET_DOWNLOAD_LOCATION,
+        partitions = "cohort")
 
 unlink(PARQUET_FILTERED_LOCATION, recursive = T, force = T)
 
@@ -67,7 +71,11 @@ pii_to_drop <- synGet('syn52523394')$path %>% read.csv()
 
 tmp <- lapply(seq_len(nrow(pii_to_drop)), function(i) {
   cat(i, "Dropping", pii_to_drop$column_to_be_dropped[[i]], "from", pii_to_drop$dataset[[i]], "\n")
-  drop_cols_datasets(dataset = pii_to_drop$dataset[[i]], columns = pii_to_drop$column_to_be_dropped[[i]])
+  drop_cols_datasets(dataset = pii_to_drop$dataset[[i]],
+                     columns = pii_to_drop$column_to_be_dropped[[i]],
+                     input = AWS_PARQUET_DOWNLOAD_LOCATION,
+                     output = PARQUET_FILTERED_LOCATION,
+                     partitions = "cohort")
 })
 
 rm(pii_to_drop)
diff --git a/sts_synindex_external.R b/sts_synindex_external.R
index da084ae..8d3a5ff 100644
--- a/sts_synindex_external.R
+++ b/sts_synindex_external.R
@@ -170,14 +170,14 @@ system(manifest_cmd)
 
 
 # Index files in Synapse --------------------------------------------------
 # Get a list of all files to upload and their synapse locations (parentId)
-STR_LEN_PARQUET_FINAL_LOCATION <- stringr::str_length(PARQUET_FINAL_LOCATION)
+STR_LEN_PARQUET_FINAL_LOCATION <- stringr::str_length(AWS_ARCHIVE_DOWNLOAD_LOCATION)
 
 ## List all local files present (from manifest)
 synapse_manifest <- read.csv('./current_manifest.tsv', sep = '\t', stringsAsFactors = F) %>%
   dplyr::filter(!grepl('owner.txt', path)) %>%
   dplyr::rowwise() %>%
-  dplyr::mutate(file_key = stringr::str_sub(string = path, start = STR_LEN_PARQUET_FINAL_LOCATION)) %>%
+  dplyr::mutate(file_key = stringr::str_sub(string = path, start = STR_LEN_PARQUET_FINAL_LOCATION+2)) %>%
   dplyr::mutate(s3_file_key = paste0(PARQUET_BUCKET_BASE_KEY_ARCHIVE, file_key)) %>%
   dplyr::mutate(md5_hash = as.character(tools::md5sum(path))) %>%
   dplyr::ungroup()
@@ -232,8 +232,8 @@ if(nrow(synapse_manifest_to_upload) > 0){
              name = new_fileName)
 
     f <- synStore(f,
-                  activity = "Indexing",
-                  activityDescription = "Indexing external parquet datasets",
+                  activityName = "Indexing",
+                  activityDescription = "Indexing external parquet datasets",
                   used = PARQUET_FOLDER_INTERNAL,
                   executed = latest_commit_tree_url)
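
A note on the basename_template added in filtering.R: arrow::write_dataset() replaces {i} in the template with a zero-based file index, so the template above yields names like part-00000.parquet. Deterministic file names are what let existing_data_behavior = 'delete_matching' overwrite a previous run's output rather than leave stale part files behind. (The as.character("parquet") wrapper is a no-op on a string literal; "part-0000{i}.parquet" is equivalent.) A minimal sketch, with a throwaway data frame and temp directory standing in for the pipeline's datasets:

library(arrow)

# Toy data; "cohort" mirrors the partition column used in the patch.
df <- data.frame(cohort = rep(c("adults_v1", "pediatric_v1"), each = 5),
                 value = 1:10)

out_dir <- file.path(tempdir(), "demo_dataset")
arrow::write_dataset(df,
                     path = out_dir,
                     partitioning = "cohort",
                     max_rows_per_file = 100000,
                     existing_data_behavior = "delete_matching",
                     basename_template = "part-0000{i}.parquet")

list.files(out_dir, recursive = TRUE)
# e.g. "cohort=adults_v1/part-00000.parquet" "cohort=pediatric_v1/part-00000.parquet"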
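
On the +2 offset in sts_synindex_external.R: stringr::str_sub() start positions are 1-based and inclusive, so starting at str_length(prefix) still keeps the prefix's last character, and +1 keeps the "/" separator; +2 lands on the first character of the relative file key that PARQUET_BUCKET_BASE_KEY_ARCHIVE gets prepended to. A quick check with a hypothetical download location (the real value is configured elsewhere in the script):

library(stringr)

AWS_ARCHIVE_DOWNLOAD_LOCATION <- "./archive"              # hypothetical value
n <- stringr::str_length(AWS_ARCHIVE_DOWNLOAD_LOCATION)   # 9

path <- "./archive/dataset_fitbitactivitylogs/part-00000.parquet"
stringr::str_sub(path, start = n)       # "e/dataset_..." keeps last prefix char and "/"
stringr::str_sub(path, start = n + 1)   # "/dataset_..." keeps the separator
stringr::str_sub(path, start = n + 2)   # "dataset_..."  the clean S3 file key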
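
On the synStore() fix: in synapser, the activity argument expects an Activity object, while activityName and activityDescription take plain strings, so the prior activity = "Indexing" would not have recorded the provenance name as intended. A hedged sketch of both forms, with hypothetical Synapse IDs and commit URL (assumes synLogin() succeeds with cached credentials):

library(synapser)
synLogin()

f <- File(path = "part-00000.parquet", parent = "syn00000000")   # hypothetical parent

# Form used in the patch: provenance supplied as strings.
f <- synStore(f,
              activityName = "Indexing",
              activityDescription = "Indexing external parquet datasets",
              used = "syn11111111",                                    # hypothetical
              executed = "https://github.com/org/repo/tree/abc1234")   # hypothetical

# Equivalent explicit form: activity takes a full Activity object.
act <- Activity(name = "Indexing",
                description = "Indexing external parquet datasets",
                used = "syn11111111",
                executed = "https://github.com/org/repo/tree/abc1234")
f <- synStore(f, activity = act)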