From a27457982575565506defd391ad8c017e6903636 Mon Sep 17 00:00:00 2001 From: Gennadiy Anisimov Date: Mon, 19 Dec 2022 13:20:49 +0300 Subject: [PATCH] Add ability to resume execution of DQD checks It happens that due to connection issues or other conditions executeDqChecks fails and progress is lost. When new resume argument is set to TRUE, file in outputFile path is used as source of check results instead of actual processing. Missing check results or results reporting error are re-processed. Note that outputFile must be set explicitly to make resuming take effect. New results overwrite file in outputFile path. Addresses #109 --- R/executeDqChecks.R | 7 ++++- R/runCheck.R | 73 +++++++++++++++++++++++++++++++++++++++------ 2 files changed, 70 insertions(+), 10 deletions(-) diff --git a/R/executeDqChecks.R b/R/executeDqChecks.R index 69dcf381..6aded2ae 100644 --- a/R/executeDqChecks.R +++ b/R/executeDqChecks.R @@ -43,6 +43,7 @@ #' @param tableCheckThresholdLoc The location of the threshold file for evaluating the table checks. If not specified the default thresholds will be applied. #' @param fieldCheckThresholdLoc The location of the threshold file for evaluating the field checks. If not specified the default thresholds will be applied. #' @param conceptCheckThresholdLoc The location of the threshold file for evaluating the concept checks. If not specified the default thresholds will be applied. 
+#' @param resume Boolean to indicate if processing will be resumed #' #' @return If sqlOnly = FALSE, a list object of results #' @@ -75,7 +76,8 @@ executeDqChecks <- function(connectionDetails, cdmVersion = "5.3", tableCheckThresholdLoc = "default", fieldCheckThresholdLoc = "default", - conceptCheckThresholdLoc = "default") { + conceptCheckThresholdLoc = "default", + resume = FALSE) { # Check input ------------------------------------------------------------------------------------------------------------------- if (!("connectionDetails" %in% class(connectionDetails))) { stop("connectionDetails must be an object of class 'connectionDetails'.") @@ -96,6 +98,7 @@ executeDqChecks <- function(connectionDetails, stopifnot(is.null(checkNames) | is.character(checkNames), is.null(tablesToExclude) | is.character(tablesToExclude)) stopifnot(is.character(cdmVersion)) + stopifnot(is.logical(resume)) # Warning if check names for determining NA is missing if (!length(checkNames) == 0) { @@ -248,7 +251,9 @@ executeDqChecks <- function(connectionDetails, cohortDatabaseSchema, cohortDefinitionId, outputFolder, + outputFile, sqlOnly, + resume, progressBar = TRUE ) ParallelLogger::stopCluster(cluster = cluster) diff --git a/R/runCheck.R b/R/runCheck.R index e7bb491b..778619ea 100644 --- a/R/runCheck.R +++ b/R/runCheck.R @@ -27,7 +27,9 @@ #' @param cohortDatabaseSchema The schema where the cohort table is located. #' @param cohortDefinitionId The cohort definition id for the cohort you wish to run the DQD on. The package assumes a standard OHDSI cohort table called 'Cohort' #' @param outputFolder The folder to output logs and SQL files to +#' @param outputFile (OPTIONAL) File to re-use results of previous execution if resume is set #' @param sqlOnly Should the SQLs be executed (FALSE) or just returned (TRUE)? 
+#' @param resume Boolean to indicate if processing will be resumed #' #' @import magrittr #' @@ -44,7 +46,9 @@ cohortDatabaseSchema, cohortDefinitionId, outputFolder, - sqlOnly) { + outputFile = "", + sqlOnly, + resume) { ParallelLogger::logInfo(sprintf("Processing check description: %s", checkDescription$checkName)) filterExpression <- sprintf( @@ -65,6 +69,21 @@ } if (nrow(checks) > 0) { + recoveredNumber <- 0 + checkResultsSaved <- NULL + if (resume && nchar(outputFile) > 0 && file.exists(file.path(outputFolder, outputFile))) { + dqdResults <- jsonlite::read_json( + path = file.path(outputFolder, outputFile) + ) + checkResultsSaved <- lapply( + dqdResults$CheckResults, + function(cr) { + cr[sapply(cr, is.null)] <- NA + as.data.frame(cr) + } + ) + checkResultsSaved <- do.call(plyr::rbind.fill, checkResultsSaved) + } dfs <- apply(X = checks, MARGIN = 1, function(check) { columns <- lapply(names(check), function(c) { setNames(check[c], c) @@ -92,16 +111,52 @@ ), append = TRUE) data.frame() } else { - .processCheck( - connection = connection, - connectionDetails = connectionDetails, - check = check, - checkDescription = checkDescription, - sql = sql, - outputFolder = outputFolder - ) + checkResult <- NULL + if (!is.null(checkResultsSaved)) { + currentCheckId <- .getCheckId( + checkLevel = checkDescription$checkLevel, + checkName = checkDescription$checkName, + cdmTableName = check["cdmTableName"], + cdmFieldName = check["cdmFieldName"], + conceptId = check["conceptId"], + unitConceptId = check["unitConceptId"] + ) + checkResultCandidates <- checkResultsSaved %>% dplyr::filter(checkId == currentCheckId & is.na(ERROR)) + if (1 == nrow(checkResultCandidates)) { + savedResult <- checkResultCandidates[1, ] + warning <- if (is.null(savedResult$WARNING)) { NA } else { savedResult$WARNING } + checkResult <- .recordResult( + result = savedResult, + check = check, + checkDescription = checkDescription, + sql = sql, + executionTime = savedResult$EXECUTION_TIME, + warning = 
warning, + error = NA + ) + recoveredNumber <<- recoveredNumber + 1 + } + } + + if (is.null(checkResult)) { + checkResult <- .processCheck( + connection = connection, + connectionDetails = connectionDetails, + check = check, + checkDescription = checkDescription, + sql = sql, + outputFolder = outputFolder + ) + } + + checkResult } }) + + if (recoveredNumber > 0) { + ParallelLogger::logInfo(sprintf("Recovered %s of %s results from %s", recoveredNumber, nrow(checks), outputFile)) + } + do.call(rbind, dfs) } else { ParallelLogger::logWarn(paste0("Warning: Evaluation resulted in no checks: ", filterExpression))