Skip to content

Commit

Permalink
Merge pull request #30 from pranavanba/main
Browse files Browse the repository at this point in the history
Change data fetch method
  • Loading branch information
pranavanba authored Jul 12, 2024
2 parents 2be17d0 + 03f46af commit 21f3e7a
Show file tree
Hide file tree
Showing 15 changed files with 71 additions and 85 deletions.
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,10 @@ Parameter | Definition | Example
---|---|---
| `ontologyFileID` | A Synapse ID for the i2b2 concepts map ontology file stored in Synapse. | syn12345678
| `parquetDirID` | A Synapse ID for a folder entity in Synapse where the input data is stored. This should be the folder housing the post-ETL parquet data. | syn12345678
| `deleteExistingDir` | Boolean representing if you want the `downloadLocation` folder to be removed before syncing input data there. Setting this to `TRUE` ensures that there is a new, clean location to sync the input data to and keep it isolated from input data that is old, unwanted, modified, etc. | `TRUE`
| `concept_replacements` | A named vector of strings and their replacements. The names must be valid values of the `concept_filter_col` column of the `concept_map` data frame. For RECOVER, `concept_map` is the ontology file data frame. | R Example<br>c('mins' = 'minutes', 'avghr' = 'averageheartrate', 'spo2' = 'spo2\_', 'hrv' = 'hrv_dailyrmssd', 'restinghr' = 'restingheartrate', 'sleepbrth' = 'sleepsummarybreath') | concept_cd
| `synFolderID` | A Synapse ID for a folder entity in Synapse where you want to store the final output files. | syn12345678
| `s3bucket` | The name of the S3 bucket containing input data | recover-bucket
| `s3basekey` | The base key of the S3 bucket containing input data. | main/archive/2024-.../
| `downloadLocation` | The location to sync input files to. | ./parquet
| `selectedVarsFileID` | A Synapse ID for the CSV file listing which datasets and variables have been selected for use in this pipeline | syn12345678
| `outputConceptsDir` | The location to save intermediate and final i2b2 summary files to | ./output-concepts

4 changes: 0 additions & 4 deletions config/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ default:
staging:
ontologyFileID: syn52050046
parquetDirID: syn61250818
# dataset_name_filter: !expr c("fitbit","healthkit")
deleteExistingDir: FALSE
concept_replacements: !expr c("mins" = "minutes",
"avghr" = "averageheartrate",
"spo2" = "spo2_",
Expand All @@ -16,10 +14,8 @@ staging:
"sleependtime" = "enddate")
concept_filter_col: CONCEPT_CD
synFolderID: syn52504335
# method: sts
s3bucket: recover-main-project
s3basekey: main/archive/2024-06-13/
downloadLocation: ./temp-parquet
selectedVarsFileID: syn53503994
outputConceptsDir: ./temp-output-concepts

Expand Down
8 changes: 5 additions & 3 deletions pipeline/run-pipeline.R
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# Get config variables
list2env(x = config::get(file = "config/config.yml",
config = "staging"),
envir = .GlobalEnv)
list2env(
x = config::get(file = "config/config.yml",
config = "staging"),
envir = .GlobalEnv
)

# Fetch data
tictoc::tic(msg = "INFO: Fetch data")
Expand Down
7 changes: 4 additions & 3 deletions scripts/egress/egress.R
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
cat("Beginning egress: storing output concepts, input concept map, and input variable list in Synapse\n")
cat("\n----Beginning egress: storing output concepts,
input concept map, and input variable list in Synapse----\n")

synapser::synLogin()
login <- synapser::synLogin()

# Write the following to Synapse: 1) the final output concepts data, 2) the input data used in this pipeline
latest_commit <-
Expand Down Expand Up @@ -37,4 +38,4 @@ rm(latest_commit,
file_name
)

cat("Finished egress\n\n")
cat("\n----Finished egress----\n")
51 changes: 29 additions & 22 deletions scripts/fetch-data/fetch_data.R
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
library(synapser)
library(recoverutils)
library(dplyr)
library(tidyverse)

cat("Fetching data\n")
cat("\n----Fetching data and connecting to S3 bucket----\n")

synLogin()
login <- synapser::synLogin()

# Get input files from synapse
concept_map <-
Expand All @@ -28,25 +26,33 @@ token <- synapser::synGetStsStorageToken(
permission = "read_only",
output_format = "json")

if (s3bucket==token$bucket && s3basekey==token$baseKey) {
base_s3_uri <- paste0('s3://', token$bucket, '/', token$baseKey)
} else {
base_s3_uri <- paste0('s3://', s3bucket, '/', s3basekey)
}
bucket_path <-
paste0(
token$bucket, '/',
token$baseKey, '/',
stringr::str_extract(s3basekey, stringr::regex("[\\d]{4}-[\\d]{2}-[\\d]{2}")), '/'
)

Sys.setenv('AWS_ACCESS_KEY_ID'=token$accessKeyId,
'AWS_SECRET_ACCESS_KEY'=token$secretAccessKey,
'AWS_SESSION_TOKEN'=token$sessionToken)
s3 <-
arrow::S3FileSystem$create(
access_key = token$accessKeyId,
secret_key = token$secretAccessKey,
session_token = token$sessionToken,
region="us-east-1"
)

if (deleteExistingDir==TRUE) {
unlink(downloadLocation, recursive = T, force = T)
}
dataset_list <-
s3$GetFileInfo(
arrow::FileSelector$create(
base_dir = bucket_path,
recursive = FALSE
)
)

# Only sync the bucket folders containing the datasets we need
inclusions <- paste0("--include \"*",dataset_name_filter,"*\"", collapse = " ")
sync_cmd <- glue::glue('aws s3 sync {base_s3_uri} {downloadLocation} --exclude "*" {inclusions}')
system(sync_cmd)
rm(sync_cmd)
dataset_paths <- character()
for (dataset in dataset_list) {
dataset_paths <- c(dataset_paths, dataset$path)
}

# For use in process-data steps
concept_replacements_reversed <- recoverutils::vec_reverse(concept_replacements)
Expand All @@ -55,4 +61,5 @@ if (!dir.exists(outputConceptsDir)) {
dir.create(outputConceptsDir)
}

cat("Finished fetching data\n\n")
cat("\n----Finished----\n")

8 changes: 3 additions & 5 deletions scripts/process-data/fitbitactivitylogs.R
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
library(dplyr)

dataset <- "fitbitactivitylogs"

cat(glue::glue("Transforming data for {dataset}"),"\n")
cat(paste0("\n----", glue::glue("Transforming data for {dataset}"), "----\n"))

# Get variables for this dataset
vars <-
Expand All @@ -12,7 +10,7 @@ vars <-

# Load the desired subset of this dataset in memory
df <-
arrow::open_dataset(file.path(downloadLocation, glue::glue("dataset_{dataset}"))) %>%
arrow::open_dataset(s3$path(str_subset(dataset_paths, dataset))) %>%
select(all_of(vars)) %>%
collect()

Expand Down Expand Up @@ -95,7 +93,7 @@ output_concepts %>%
write.csv(file.path(outputConceptsDir, glue::glue("{dataset}.csv")), row.names = F)
cat(glue::glue("output_concepts written to {file.path(outputConceptsDir, paste0(dataset, '.csv'))}"),"\n")

cat(glue::glue("Finished transforming data for {dataset}"),"\n\n")
cat(paste0("\n----", glue::glue("Finished transforming data for {dataset}"),"\n"))

# Remove objects created here from the global environment
rm(dataset,
Expand Down
8 changes: 3 additions & 5 deletions scripts/process-data/fitbitdailydata.R
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
library(dplyr)

dataset <- "fitbitdailydata"

cat(glue::glue("Transforming data for {dataset}"),"\n")
cat(paste0("\n----", glue::glue("Transforming data for {dataset}"), "----\n"))

# Get variables for this dataset
vars <-
Expand All @@ -12,7 +10,7 @@ vars <-

# Load the desired subset of this dataset in memory
df <-
arrow::open_dataset(file.path(downloadLocation, glue::glue("dataset_{dataset}"))) %>%
arrow::open_dataset(s3$path(str_subset(dataset_paths, dataset))) %>%
mutate(Steps = as.numeric(Steps),
HeartRateIntradayMinuteCount = as.numeric(HeartRateIntradayMinuteCount)) %>%
select(all_of(c(vars, "HeartRateIntradayMinuteCount"))) %>%
Expand Down Expand Up @@ -114,7 +112,7 @@ output_concepts %>%
write.csv(file.path(outputConceptsDir, glue::glue("{dataset}.csv")), row.names = F)
cat(glue::glue("output_concepts written to {file.path(outputConceptsDir, paste0(dataset, '.csv'))}"), "\n")

cat(glue::glue("Finished transforming data for {dataset}"),"\n\n")
cat(paste0("\n----", glue::glue("Finished transforming data for {dataset}"),"\n"))

# Remove objects created here from the global environment
rm(dataset,
Expand Down
12 changes: 5 additions & 7 deletions scripts/process-data/fitbitecg.R
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,9 @@ ecg_stat_summarize <- function(df) {
return(result)
}

library(dplyr)
dataset <- "fitbitecg$"

dataset <- "fitbitecg"

cat(glue::glue("Transforming data for {dataset}"),"\n")
cat(paste0("\n----", glue::glue("Transforming data for {dataset}"), "----\n"))

# Get variables for this dataset
vars <-
Expand All @@ -114,11 +112,11 @@ vars <-

# Load the desired subset of this dataset in memory and do some feature engineering for derived variables
df <-
arrow::open_dataset(file.path(downloadLocation, glue::glue("dataset_{dataset}"))) %>%
arrow::open_dataset(s3$path(str_subset(dataset_paths, dataset))) %>%
select(all_of(c(vars))) %>%
filter(ResultClassification %in% c("Normal Sinus Rhythm", "Atrial Fibrillation")) %>%
rename(StartDate = StartTime) %>%
mutate(EndDate = base_s3_uri %>% stringr::str_extract("[0-9]{4}-[0-9]{2}-[0-9]{2}")) %>%
mutate(EndDate = bucket_path %>% stringr::str_extract("[0-9]{4}-[0-9]{2}-[0-9]{2}")) %>%
collect()

colnames(df) <- tolower(colnames(df))
Expand Down Expand Up @@ -195,7 +193,7 @@ output_concepts %>%
write.csv(file.path(outputConceptsDir, glue::glue("{dataset}.csv")), row.names = F)
cat(glue::glue("output_concepts written to {file.path(outputConceptsDir, paste0(dataset, '.csv'))}"), "\n")

cat(glue::glue("Finished transforming data for {dataset}"),"\n\n")
cat(paste0("\n----", glue::glue("Finished transforming data for {dataset}"),"\n"))

# Remove objects created here from the global environment
rm(ecg_stat_summarize,
Expand Down
8 changes: 3 additions & 5 deletions scripts/process-data/fitbitintradaycombined.R
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
library(dplyr)

dataset <- "fitbitintradaycombined"

cat(glue::glue("Transforming data for {dataset}"),"\n")
cat(paste0("\n----", glue::glue("Transforming data for {dataset}"), "----\n"))

# Get variables for this dataset
vars <-
Expand All @@ -12,7 +10,7 @@ vars <-

# Load the desired subset of this dataset in memory
df <-
arrow::open_dataset(file.path(downloadLocation, glue::glue("dataset_{dataset}"))) %>%
arrow::open_dataset(s3$path(str_subset(dataset_paths, dataset))) %>%
select(all_of(vars)) %>%
mutate(
DeepSleepSummaryBreathRate = as.numeric(DeepSleepSummaryBreathRate),
Expand Down Expand Up @@ -128,7 +126,7 @@ output_concepts %>%
write.csv(file.path(outputConceptsDir, glue::glue("{dataset}.csv")), row.names = F)
cat(glue::glue("output_concepts written to {file.path(outputConceptsDir, paste0(dataset, '.csv'))}"), "\n")

cat(glue::glue("Finished transforming data for {dataset}"),"\n\n")
cat(paste0("\n----", glue::glue("Finished transforming data for {dataset}"),"\n"))

# Remove objects created here from the global environment
rm(dataset,
Expand Down
12 changes: 5 additions & 7 deletions scripts/process-data/fitbitsleeplogs.R
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,9 @@ sleeplogs_stat_summarize <- function(df) {
return(result)
}

library(dplyr)
dataset <- "fitbitsleeplogs$"

dataset <- "fitbitsleeplogs"

cat(glue::glue("Transforming data for {dataset}"),"\n")
cat(paste0("\n----", glue::glue("Transforming data for {dataset}"), "----\n"))

# Get variables for this dataset
vars <-
Expand All @@ -140,7 +138,7 @@ vars <-

# Load the desired subset of this dataset in memory and do some feature engineering for derived variables
df <-
arrow::open_dataset(file.path(downloadLocation, glue::glue("dataset_{dataset}"))) %>%
arrow::open_dataset(s3$path(str_subset(dataset_paths, dataset))) %>%
select(all_of(c(vars, "LogId"))) %>%
collect() %>%
distinct() %>%
Expand Down Expand Up @@ -212,7 +210,7 @@ sleeplogsdetails_vars <-
pull(Variable)

sleeplogsdetails_df <-
arrow::open_dataset(file.path(downloadLocation, "dataset_fitbitsleeplogs_sleeplogdetails")) %>%
arrow::open_dataset(s3$path(str_subset(dataset_paths, "sleeplogdetails"))) %>%
select(all_of(sleeplogsdetails_vars)) %>%
collect() %>%
distinct() %>%
Expand Down Expand Up @@ -434,7 +432,7 @@ output_concepts %>%
write.csv(file.path(outputConceptsDir, glue::glue("{dataset}.csv")), row.names = F)
cat(glue::glue("output_concepts written to {file.path(outputConceptsDir, paste0(dataset, '.csv'))}"),"\n")

cat(glue::glue("Finished transforming data for {dataset}"),"\n\n")
cat(paste0("\n----", glue::glue("Finished transforming data for {dataset}"),"\n"))

# Remove objects created here from the global environment
rm(sleeplogs_stat_summarize,
Expand Down
10 changes: 4 additions & 6 deletions scripts/process-data/healthkitv2electrocardiogram.R
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,9 @@ ecg_stat_summarize <- function(df) {
return(result)
}

library(dplyr)
dataset <- "healthkitv2electrocardiogram$"

dataset <- "healthkitv2electrocardiogram"

cat(glue::glue("Transforming data for {dataset}"),"\n")
cat(paste0("\n----", glue::glue("Transforming data for {dataset}"), "----\n"))

# Get variables for this dataset
vars <-
Expand All @@ -119,7 +117,7 @@ participants_to_exclude <-

# Load the desired subset of this dataset in memory and do some feature engineering for derived variables
df <-
arrow::open_dataset(file.path(downloadLocation, glue::glue("dataset_{dataset}"))) %>%
arrow::open_dataset(s3$path(str_subset(dataset_paths, dataset))) %>%
select(all_of(c(vars))) %>%
dplyr::filter(!(ParticipantIdentifier %in% participants_to_exclude)) %>%
filter(Classification %in% c("SinusRhythm", "AtrialFibrillation")) %>%
Expand Down Expand Up @@ -198,7 +196,7 @@ output_concepts %>%
write.csv(file.path(outputConceptsDir, glue::glue("{dataset}.csv")), row.names = F)
cat(glue::glue("output_concepts written to {file.path(outputConceptsDir, paste0(dataset, '.csv'))}"),"\n")

cat(glue::glue("Finished transforming data for {dataset}"),"\n\n")
cat(paste0("\n----", glue::glue("Finished transforming data for {dataset}"),"\n"))

# Remove objects created here from the global environment
rm(ecg_stat_summarize,
Expand Down
8 changes: 3 additions & 5 deletions scripts/process-data/healthkitv2samples.R
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
library(dplyr)

dataset <- "healthkitv2samples"

cat(glue::glue("Transforming data for {dataset}"),"\n")
cat(paste0("\n----", glue::glue("Transforming data for {dataset}"), "----\n"))

# Get variables for this dataset
vars <-
Expand All @@ -17,7 +15,7 @@ participants_to_exclude <-

# Load the desired subset of this dataset in memory
df <-
arrow::open_dataset(file.path(downloadLocation, glue::glue("dataset_{dataset}"))) %>%
arrow::open_dataset(s3$path(str_subset(dataset_paths, dataset))) %>%
select(all_of(vars)) %>%
dplyr::filter(!(ParticipantIdentifier %in% participants_to_exclude)) %>%
dplyr::filter(
Expand Down Expand Up @@ -118,7 +116,7 @@ output_concepts %>%
write.csv(file.path(outputConceptsDir, glue::glue("{dataset}.csv")), row.names = F)
cat(glue::glue("output_concepts written to {file.path(outputConceptsDir, paste0(dataset, '.csv'))}"),"\n")

cat(glue::glue("Finished transforming data for {dataset}"),"\n\n")
cat(paste0("\n----", glue::glue("Finished transforming data for {dataset}"),"\n"))

# Remove objects created here from the global environment
rm(dataset,
Expand Down
8 changes: 3 additions & 5 deletions scripts/process-data/healthkitv2statistics.R
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
library(dplyr)

dataset <- "healthkitv2statistics"

cat(glue::glue("Transforming data for {dataset}"),"\n")
cat(paste0("\n----", glue::glue("Transforming data for {dataset}"), "----\n"))

# Get variables for this dataset
vars <-
Expand All @@ -17,7 +15,7 @@ participants_to_exclude <-

# Load the desired subset of this dataset in memory
df <-
arrow::open_dataset(file.path(downloadLocation, glue::glue("dataset_{dataset}"))) %>%
arrow::open_dataset(s3$path(str_subset(dataset_paths, dataset))) %>%
select(all_of(vars)) %>%
dplyr::filter(Type=="DailySteps") %>%
dplyr::filter(!(ParticipantIdentifier %in% participants_to_exclude)) %>%
Expand Down Expand Up @@ -98,7 +96,7 @@ output_concepts %>%
write.csv(file.path(outputConceptsDir, glue::glue("{dataset}.csv")), row.names = F)
cat(glue::glue("output_concepts written to {file.path(outputConceptsDir, paste0(dataset, '.csv'))}"),"\n")

cat(glue::glue("Finished transforming data for {dataset}"),"\n\n")
cat(paste0("\n----", glue::glue("Finished transforming data for {dataset}"),"\n"))

# Remove objects created here from the global environment
rm(dataset,
Expand Down
Loading

0 comments on commit 21f3e7a

Please sign in to comment.