Merge pull request #88 from ohdsi-studies/perf_opt
Various performance optimizations and fixes
keesvanbochove authored Oct 1, 2021
2 parents 8e40ba3 + 56c2a8b commit 574bf2d
Showing 10 changed files with 59 additions and 48 deletions.
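The recurring change across these files is swapping readr's CSV reader and writer for data.table's fread() and fwrite(). A minimal sketch of the pattern, using an invented file name and toy data:

    # Toy sketch of the readr -> data.table swap applied throughout this commit.
    # The file name and data here are invented for illustration.
    library(data.table)

    counts <- data.frame(cohortId = c(1, 2), cohortEntries = c(100, 250))
    data.table::fwrite(counts, "example_counts.csv")   # in place of readr::write_csv()
    counts <- data.table::fread("example_counts.csv")  # in place of readr::read_csv(..., col_types = readr::cols())
    counts <- counts[counts$cohortId == 1, ]           # base-style subsetting still works on a data.table

fread() returns a data.table, which is also a data.frame, so most downstream data.frame code keeps working unchanged.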
2 changes: 1 addition & 1 deletion DESCRIPTION
@@ -1,7 +1,7 @@
Package: PioneerWatchfulWaiting
Type: Package
Title: PIONEER / EHDEN / OHDSI prostate cancer study
Version: 0.4.3.2
Version: 0.4.4
Author: Anthony G. Sena, Artem Gorbachev, Kees van Bochove
Authors@R: c(
person("Anthony G.", "Sena", email = "[email protected]", role = c("aut")),
4 changes: 2 additions & 2 deletions R/CohortConstruction.R
@@ -202,7 +202,7 @@ getInclusionStatisticsFromFiles <- function(cohortId,

fetchStats <- function(file) {
ParallelLogger::logDebug("- Fetching data from ", file)
stats <- readr::read_csv(file, col_types = readr::cols())
stats <- data.table::fread(file)
stats <- stats[stats$cohortDefinitionId == cohortId, ]
return(stats)
}
@@ -459,7 +459,7 @@ saveAndDropTempInclusionStatsTables <- function(connection,
if (incremental) {
saveIncremental(data, fullFileName, cohortDefinitionId = cohortIds)
} else {
readr::write_csv(data, fullFileName)
data.table::fwrite(data, fullFileName)
}
}
fetchStats("#cohort_inclusion", "cohortInclusion.csv")
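fetchStats() above reads an inclusion-statistics CSV and keeps only the rows for the requested cohort; a self-contained sketch of that pattern, with a hypothetical helper name and file argument:

    # Standalone sketch of the fetchStats() pattern shown above (hypothetical names).
    library(data.table)

    fetchStatsForCohort <- function(file, cohortId) {
      stats <- data.table::fread(file)                         # fast CSV read
      stats <- stats[stats$cohortDefinitionId == cohortId, ]   # keep one cohort's rows
      return(stats)
    }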
1 change: 1 addition & 0 deletions R/GenerateSurvival.R
@@ -23,6 +23,7 @@ generateSurvival <- function(connection, cohortDatabaseSchema, cohortTable, targ
id = dplyr::row_number()) %>%
dplyr::select(id, timeToEvent, event)

# TODO: Change to Cyclops
surv_info <- survival::survfit(survival::Surv(timeToEvent, event) ~ 1, data = km_proc)

surv_info <- survminer::surv_summary(surv_info)
12 changes: 6 additions & 6 deletions R/Incremental.R
@@ -20,7 +20,7 @@ computeChecksum <- function(column) {

isTaskRequired <- function(..., checksum, recordKeepingFile, verbose = TRUE) {
if (file.exists(recordKeepingFile)) {
recordKeeping <- readr::read_csv(recordKeepingFile, col_types = readr::cols())
recordKeeping <- data.table::fread(recordKeepingFile)
task <- recordKeeping[getKeyIndex(list(...), recordKeeping), ]
if (nrow(task) == 0) {
return(TRUE)
@@ -46,7 +46,7 @@ isTaskRequired <- function(..., checksum, recordKeepingFile, verbose = TRUE) {
getRequiredTasks <- function(..., checksum, recordKeepingFile) {
tasks <- list(...)
if (file.exists(recordKeepingFile) && length(tasks[[1]]) > 0) {
recordKeeping <- readr::read_csv(recordKeepingFile, col_types = readr::cols())
recordKeeping <- data.table::fread(recordKeepingFile)
tasks$checksum <- checksum
tasks <- tibble::as_tibble(tasks)
if (all(names(tasks) %in% names(recordKeeping))) {
@@ -83,7 +83,7 @@ recordTasksDone <- function(..., checksum, recordKeepingFile, incremental = TRUE
return()
}
if (file.exists(recordKeepingFile)) {
recordKeeping <- readr::read_csv(recordKeepingFile, col_types = readr::cols())
recordKeeping <- data.table::fread(recordKeepingFile)
idx <- getKeyIndex(list(...), recordKeeping)
if (length(idx) > 0) {
recordKeeping <- recordKeeping[-idx, ]
@@ -95,20 +95,20 @@ recordTasksDone <- function(..., checksum, recordKeepingFile, incremental = TRUE
newRow$checksum <- checksum
newRow$timeStamp <- Sys.time()
recordKeeping <- dplyr::bind_rows(recordKeeping, newRow)
readr::write_csv(recordKeeping, recordKeepingFile)
data.table::fwrite(recordKeeping, recordKeepingFile)
}

saveIncremental <- function(data, fileName, ...) {
if (length(list(...)[[1]]) == 0) {
return()
}
if (file.exists(fileName)) {
previousData <- readr::read_csv(fileName, col_types = readr::cols())
previousData <- data.table::fread(fileName)
idx <- getKeyIndex(list(...), previousData)
if (length(idx) > 0) {
previousData <- previousData[-idx, ]
}
data <- dplyr::bind_rows(previousData, data)
}
readr::write_csv(data, fileName)
data.table::fwrite(data, fileName)
}
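saveIncremental() behaves like an upsert on a CSV: rows matching the key columns passed through ... are replaced rather than duplicated. A self-contained sketch of the same idea, keyed on a single hypothetical cohortId column instead of the package's getKeyIndex() helper:

    # Sketch of the incremental-save idea, keyed on a hypothetical cohortId column.
    library(data.table)
    library(dplyr)

    saveIncrementalSketch <- function(data, fileName, cohortIds) {
      if (file.exists(fileName)) {
        previousData <- data.table::fread(fileName)
        # drop previously saved rows for these cohorts so they are replaced, not appended twice
        previousData <- previousData[!(previousData$cohortId %in% cohortIds), ]
        data <- dplyr::bind_rows(previousData, data)
      }
      data.table::fwrite(data, fileName)
    }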
4 changes: 2 additions & 2 deletions R/ResourceFiles.R
@@ -41,7 +41,7 @@ getCohortsToCreate <- function(cohortGroups = getCohortGroups()) {
packageName <- getThisPackageName()
cohorts <- data.frame()
for(i in 1:nrow(cohortGroups)) {
c <- readr::read_csv(system.file(cohortGroups$fileName[i], package = packageName, mustWork = TRUE), col_types = readr::cols())
c <- data.table::fread(system.file(cohortGroups$fileName[i], package = packageName, mustWork = TRUE))
c <- c[c('name', 'atlasName', 'atlasId', 'cohortId')]
c$cohortType <- cohortGroups$cohortGroup[i]
cohorts <- rbind(cohorts, c)
@@ -104,7 +104,7 @@ getThisPackageName <- function() {
readCsv <- function(resourceFile) {
packageName <- getThisPackageName()
pathToCsv <- system.file(resourceFile, package = packageName, mustWork = TRUE)
fileContents <- readr::read_csv(pathToCsv, col_types = readr::cols())
fileContents <- data.table::fread(pathToCsv)
return(fileContents)
}

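readCsv() resolves a file that ships inside the installed package via system.file() before reading it; a short sketch of that pattern (the resource path below is hypothetical):

    # Reading a CSV bundled inside an installed R package (hypothetical resource path).
    # mustWork = TRUE makes system.file() error immediately if the file is missing.
    pathToCsv <- system.file("settings/CohortsToCreate.csv",
                             package = "PioneerWatchfulWaiting",
                             mustWork = TRUE)
    fileContents <- data.table::fread(pathToCsv)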
20 changes: 10 additions & 10 deletions R/RunStudy.R
@@ -219,12 +219,12 @@ runStudy <- function(connectionDetails = NULL,
DistribAnalyses <- c('AgeAtDiagnosis', 'YearOfDiagnosis', 'CharlsonAtDiagnosis', 'PsaAtDiagnosis', outcomeBasedAnalyses)
outcomes <- getFeatures()

metricsDistribution <- data.frame()
metricsDistribution <- data.table()

for(analysis in DistribAnalyses){
outcome <- gsub("TimeTo", "", analysis)
outcome <- substring(SqlRender::camelCaseToTitleCase(outcome), 2)
outcomeId <- outcomes[tolower(outcomes$name) == tolower(outcome), "cohortId"][[1]]
outcomeId <- outcomes[tolower(outcomes$name) == tolower(outcome), "cohortId"]

if (length(outcomeId) == 0 & analysis %in% outcomeBasedAnalyses){
next
@@ -260,7 +260,7 @@ runStudy <- function(connectionDetails = NULL,
writeToCsv(counts, file.path(exportFolder, "cohort_count.csv"), incremental = incremental, cohortId = counts$cohortId)

# Read in the cohort counts
counts <- readr::read_csv(file.path(exportFolder, "cohort_count.csv"), col_types = readr::cols())
counts <- data.table::fread(file.path(exportFolder, "cohort_count.csv"))
colnames(counts) <- SqlRender::snakeCaseToCamelCase(colnames(counts))

# Export the cohorts from the study
@@ -411,9 +411,9 @@ exportResults <- function(exportFolder, databaseId, cohortIdsToExcludeFromResult
# Censor out the cohorts based on the IDs passed in
for(i in 1:length(filesWithCohortIds)) {
fileName <- file.path(tempFolder, filesWithCohortIds[i])
fileContents <- readr::read_csv(fileName, col_types = readr::cols())
fileContents <- data.table::fread(fileName)
fileContents <- fileContents[!(fileContents$cohort_id %in% cohortIdsToExcludeFromResultsExport),]
readr::write_csv(fileContents, fileName)
data.table::fwrite(fileContents, fileName)
}

# Zip the results and copy to the main export folder
@@ -591,7 +591,7 @@ writeToCsv <- function(data, fileName, incremental = FALSE, ...) {
params$fileName = fileName
do.call(saveIncremental, params)
} else {
readr::write_csv(data, fileName)
data.table::fwrite(data, fileName)
}
}

@@ -672,7 +672,7 @@ recordTasksDone <- function(..., checksum, recordKeepingFile, incremental = TRUE
return()
}
if (file.exists(recordKeepingFile)) {
recordKeeping <- readr::read_csv(recordKeepingFile, col_types = readr::cols())
recordKeeping <- data.table::fread(recordKeepingFile)
idx <- getKeyIndex(list(...), recordKeeping)
if (length(idx) > 0) {
recordKeeping <- recordKeeping[-idx, ]
@@ -684,20 +684,20 @@ recordTasksDone <- function(..., checksum, recordKeepingFile, incremental = TRUE
newRow$checksum <- checksum
newRow$timeStamp <- Sys.time()
recordKeeping <- dplyr::bind_rows(recordKeeping, newRow)
readr::write_csv(recordKeeping, recordKeepingFile)
data.table::fwrite(recordKeeping, recordKeepingFile)
}

saveIncremental <- function(data, fileName, ...) {
if (length(list(...)[[1]]) == 0) {
return()
}
if (file.exists(fileName)) {
previousData <- readr::read_csv(fileName, col_types = readr::cols())
previousData <- data.table::fread(fileName)
idx <- getKeyIndex(list(...), previousData)
if (length(idx) > 0) {
previousData <- previousData[-idx, ]
}
data <- dplyr::bind_rows(previousData, data)
}
readr::write_csv(data, fileName)
data.table::fwrite(data, fileName)
}
1 change: 1 addition & 0 deletions R/Shiny.R
@@ -24,6 +24,7 @@ launchShinyApp <- function(outputFolder,
ensure_installed("shinydashboard")
ensure_installed("shinyWidgets")
ensure_installed("DT")
ensure_installed("data.table")
ensure_installed("VennDiagram")
ensure_installed("htmltools")
ensure_installed("pool")
4 changes: 2 additions & 2 deletions codemeta.json
@@ -7,7 +7,7 @@
"codeRepository": "https://github.com/ohdsi-studies/PioneerWatchfulWaiting",
"issueTracker": "https://github.com/ohdsi-studies/PioneerWatchfulWaiting/issues",
"license": "https://spdx.org/licenses/Apache-2.0",
"version": "0.4.3.2",
"version": "0.4.4",
"programmingLanguage": {
"@type": "ComputerLanguage",
"name": "R",
@@ -309,5 +309,5 @@
"SystemRequirements": null
},
"isPartOf": "https://ohdsi.org",
"fileSize": "39823.066KB"
"fileSize": "1877409.878KB"
}
40 changes: 24 additions & 16 deletions inst/shiny/PioneerWatchfulWaitingExplorer/global.R
@@ -1,6 +1,7 @@
library(shiny)
library(pool)
library(DatabaseConnector)
library(data.table)
source("DataPulls.R")

connPool <- NULL # Will be initialized if using a DB
@@ -72,7 +73,7 @@ if (dataStorage == "database") {
# print(file)
tableName <- gsub(".csv$", "", file)
camelCaseName <- SqlRender::snakeCaseToCamelCase(tableName)
data <- readr::read_csv(file.path(folder, file), col_types = readr::cols(), guess_max = 1e7, locale = readr::locale(encoding = "UTF-8"))
data <- data.table::fread(file.path(folder, file))
colnames(data) <- SqlRender::snakeCaseToCamelCase(colnames(data))

if (!overwrite && exists(camelCaseName, envir = .GlobalEnv)) {
@@ -138,21 +139,21 @@ if (exists("covariate")) {
}

# Setup filters
domain <- data.frame()
domain <- rbind(domain,data.frame(name = "All", covariateAnalysisId = c(1:10000)))
domain <- rbind(domain,data.frame(name = "Cohort", covariateAnalysisId = c(10000)))
domain <- rbind(domain,data.frame(name = "Demographics", covariateAnalysisId = c(1:99)))
domain <- rbind(domain,data.frame(name = "Drug", covariateAnalysisId = c(412)))
domain <- rbind(domain,data.frame(name = "Condition", covariateAnalysisId = c(212)))
domain <- rbind(domain,data.frame(name = 'Procedure', covariateAnalysisId = c(712)))
domain <- data.table()
domain <- rbind(domain,data.table(name = "All", covariateAnalysisId = c(1:10000)))
domain <- rbind(domain,data.table(name = "Cohort", covariateAnalysisId = c(10000)))
domain <- rbind(domain,data.table(name = "Demographics", covariateAnalysisId = c(1:99)))
domain <- rbind(domain,data.table(name = "Drug", covariateAnalysisId = c(412)))
domain <- rbind(domain,data.table(name = "Condition", covariateAnalysisId = c(212)))
domain <- rbind(domain,data.table(name = 'Procedure', covariateAnalysisId = c(712)))
domain$name <- as.character(domain$name)
domainName <- "All"

# This must match the featureTimeWindow.csv from the Pioneer study
timeWindow <- data.frame(windowId=c(1:4), name=c("-365 to index", "index to 365", "366d to 730d", "731d+"))
timeWindow <- data.table(windowId=c(1:4), name=c("-365 to index", "index to 365", "366d to 730d", "731d+"))
timeWindow$name <- as.character(timeWindow$name)

cohortXref <- readr::read_csv("./cohortXref.csv", col_types = readr::cols())
cohortXref <- data.table::fread("./cohortXref.csv")
targetCohort <- cohortXref[,c("targetId","targetName")]
targetCohort <- unique(targetCohort)
targetCohort <- targetCohort[order(targetCohort$targetName),]
@@ -177,7 +178,7 @@ strataName <- cohortXref[cohortXref$cohortId == initCharCohortId,c("strataName")
comparatorName <- cohortXref[cohortXref$cohortId == initCharCompareCohortId,c("targetName")][1]
comparatorStrataName <- cohortXref[cohortXref$cohortId == initCharCompareCohortId,c("strataName")][1]

cohortInfo <- readr::read_csv("./cohorts.csv", col_types = readr::cols())
cohortInfo <- data.table::fread("./cohorts.csv")
cohortInfo <- cohortInfo[order(cohortInfo$name),]

# Read in the database terms of use
@@ -188,12 +189,19 @@ database <- database[order(database$databaseId),]


# Add Time to Event names and ids

# Gather unique outcome ids in time to event table
ids <- unique(cohortTimeToEvent$outcomeId)
names <- unique(cohortStagingCount$name[cohortStagingCount$cohortId %in% ids])
if(length(cohortStagingCount$name[cohortStagingCount$cohortId == max(ids)]) == 0){
names <- c(names, 'Symptomatic progr. free surv.')
}
# Find corresponding cohort names
names <- sapply(ids,function(id){ cohortStagingCount$name[cohortStagingCount$cohortId == id ][1]})

# hack/fix which I don't understand
#if(length(cohortStagingCount$name[cohortStagingCount$cohortId == max(ids)]) == 0){
# names <- c(names, 'Symptomatic progr. free surv.')
#}

KMIds <- data.frame(id = ids,
KMIds <- data.table(id = ids,
name = names)

#Filter out NA value in name which leads to problems with computations and plotting
KMIds <- KMIds[!is.na(KMIds$name)]
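The rewritten block above looks up one name per outcome id so that ids and names stay aligned position by position, then drops ids with no matching cohort. A toy illustration with invented ids and names:

    # Toy illustration (invented data) of why per-id lookup keeps ids and names aligned.
    library(data.table)

    cohortStagingCount <- data.table(cohortId = c(101, 102, 104),
                                     name = c("Death", "Metastasis", "Castration resistance"))
    ids <- c(101, 102, 103, 104)   # 103 has no matching cohort row

    # one name per id, NA where no cohort row exists, so positions stay aligned with ids
    names <- sapply(ids, function(id) cohortStagingCount$name[cohortStagingCount$cohortId == id][1])

    KMIds <- data.table(id = ids, name = names)
    KMIds <- KMIds[!is.na(KMIds$name)]   # drop ids without a name before plotting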
19 changes: 10 additions & 9 deletions inst/shiny/PioneerWatchfulWaitingExplorer/server.R
@@ -2,6 +2,7 @@ library(shiny)
library(shinydashboard)
library(DT)
library(htmltools)
library(data.table)
source("PlotsAndTables.R")
source("utilities.R")
source("survplot_core.R")
@@ -302,7 +303,7 @@ shinyServer(function(input, output, session) {
output$TimeToEventDeath <- renderPlot({
target_id <- cohortCount[cohortCount$databaseId %in% input$databasesTimeToEvent &
cohortCount$cohortId %in% cohortIdTimeToEvent(), ][[1]]
target_id_entries_num <- cohortCount[cohortCount$cohortId == target_id, "cohortEntries"][[1]]
target_id_entries_num <- sum(cohortCount[cohortCount$cohortId == target_id, "cohortEntries"])

if (length(target_id) == 0 | target_id_entries_num <= 100 | is.null(input$KMPlot)){
plot <- ggplot2::ggplot()
@@ -312,7 +313,7 @@ shinyServer(function(input, output, session) {
targetIdTimeToEventData <- cohortTimeToEvent %>% dplyr::filter(targetId == target_id,
databaseId == input$databasesTimeToEvent)

accumulatedData <- data.frame(time = c(), surv = c(), n.censor = c(),
accumulatedData <- data.table(time = c(), surv = c(), n.censor = c(),
n.event = c(), upper = c(), lower = c())
for(plotName in input$KMPlot){
oId <- KMIds$id[KMIds$name == plotName]
@@ -330,7 +331,6 @@ shinyServer(function(input, output, session) {
color_map <- c("#000000", "#E69F00", "#56B4E9", "#009E73", "#F0E442", "#0072B2", "#D55E00", "#CC79A7")
names(color_map) <- KMIds$name


plot <- ggsurvplot_core(accumulatedData,
risk.table = "nrisk_cumcensor",
palette = color_map,
@@ -354,12 +354,13 @@


getMetricsTable <- reactive ({
target_id <- cohortCount[cohortCount$databaseId %in% input$databasesMetricsDistribution
& cohortCount$cohortId %in% cohortIdMetricsDistribution(), ][[1]]
metricsTable <- metricsDistribution %>% dplyr::filter(cohortDefinitionId == target_id,
databaseId == input$databasesMetricsDistribution)
names(metricsTable)[names(metricsTable) == 'iqr'] <- 'IQR'
return(metricsTable[c('analysisName', 'IQR', 'minimum', 'q1', 'median', 'q3', 'maximum')])
return(metricsTable[,c('analysisName', 'IQR', 'minimum', 'q1', 'median', 'q3', 'maximum')])
})


@@ -386,11 +387,11 @@ shinyServer(function(input, output, session) {
data <- getCohortCountsTable()
databaseIds <- unique(data$databaseId)
databaseIds <- sort(databaseIds)
table <- data[data$databaseId == databaseIds[1], columnsToInclude]
table <- data[data$databaseId == databaseIds[1], ..columnsToInclude]
colnames(table)[subjectIndex] <- paste(colnames(table)[2], databaseIds[1], sep = "_")
if (length(databaseIds) > 1) {
for (i in 2:length(databaseIds)) {
temp <- data[data$databaseId == databaseIds[i], columnsToInclude]
temp <- data[data$databaseId == databaseIds[i], ..columnsToInclude]
colnames(temp)[subjectIndex] <- paste(colnames(temp)[subjectIndex], databaseIds[i], sep = "_")
table <- merge(table, temp, all = TRUE)
}
@@ -498,11 +499,11 @@ shinyServer(function(input, output, session) {
databaseIds <- unique(data$databaseId)
databaseIdsWithCounts <- merge(databaseIds, counts, by.x="x", by.y="databaseId")
databaseIdsWithCounts <- dplyr::rename(databaseIdsWithCounts, databaseId="x")
table <- data[data$databaseId == databaseIdsWithCounts$databaseId[1], columnsToInclude]
table <- data[data$databaseId == databaseIdsWithCounts$databaseId[1], ..columnsToInclude]
colnames(table)[meanColumnIndex] <- paste(colnames(table)[meanColumnIndex], databaseIdsWithCounts$databaseId[1], sep = "_")
if (nrow(databaseIdsWithCounts) > 1) {
for (i in 2:nrow(databaseIdsWithCounts)) {
temp <- data[data$databaseId == databaseIdsWithCounts$databaseId[i], columnsToInclude]
temp <- data[data$databaseId == databaseIdsWithCounts$databaseId[i], ..columnsToInclude]
colnames(temp)[meanColumnIndex] <- paste(colnames(temp)[meanColumnIndex], databaseIdsWithCounts$databaseId[i], sep = "_")
table <- merge(table, temp, all = TRUE)
}
@@ -585,7 +586,7 @@ shinyServer(function(input, output, session) {

computeBalance <- reactive({
if (cohortId() == comparatorCohortId()) {
return(data.frame())
return(data.table())
}
covariateFiltered <- getFilteredCovariates()
covariateValue <- getCovariateDataSubset(cohortId(), input$database, comparatorCohortId())
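In data.table, a bare character vector inside [ , ] is treated as a column expression, so the columnsToInclude selections above gain the ".." prefix, which tells [.data.table to look the vector up in the calling scope. A small self-contained illustration with invented columns:

    # Illustration (invented data) of data.table's ".." prefix for selecting columns
    # named in a character vector, as used with columnsToInclude above.
    library(data.table)

    data <- data.table(databaseId = c("A", "A", "B"),
                       cohortId = c(1, 2, 1),
                       cohortSubjects = c(10, 20, 30))
    columnsToInclude <- c("cohortId", "cohortSubjects")

    # With a plain data.frame, data[, columnsToInclude] already selects the named columns;
    # with a data.table, the ".." prefix is needed to get the same column selection.
    table <- data[data$databaseId == "A", ..columnsToInclude]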
