Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to resume execution of DQD checks #411

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion R/executeDqChecks.R
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#' @param tableCheckThresholdLoc The location of the threshold file for evaluating the table checks. If not specified the default thresholds will be applied.
#' @param fieldCheckThresholdLoc The location of the threshold file for evaluating the field checks. If not specified the default thresholds will be applied.
#' @param conceptCheckThresholdLoc The location of the threshold file for evaluating the concept checks. If not specified the default thresholds will be applied.
#' @param resume Boolean to indicate whether execution should resume from the results of a previous run, reusing any completed check results (default FALSE)
#'
#' @return If sqlOnly = FALSE, a list object of results
#'
Expand Down Expand Up @@ -75,7 +76,8 @@ executeDqChecks <- function(connectionDetails,
cdmVersion = "5.3",
tableCheckThresholdLoc = "default",
fieldCheckThresholdLoc = "default",
conceptCheckThresholdLoc = "default") {
conceptCheckThresholdLoc = "default",
resume = FALSE) {
# Check input -------------------------------------------------------------------------------------------------------------------
if (!("connectionDetails" %in% class(connectionDetails))) {
stop("connectionDetails must be an object of class 'connectionDetails'.")
Expand All @@ -96,6 +98,7 @@ executeDqChecks <- function(connectionDetails,

stopifnot(is.null(checkNames) | is.character(checkNames), is.null(tablesToExclude) | is.character(tablesToExclude))
stopifnot(is.character(cdmVersion))
stopifnot(is.logical(resume))

# Warning if check names for determining NA is missing
if (!length(checkNames) == 0) {
Expand Down Expand Up @@ -248,7 +251,9 @@ executeDqChecks <- function(connectionDetails,
cohortDatabaseSchema,
cohortDefinitionId,
outputFolder,
outputFile,
sqlOnly,
resume,
progressBar = TRUE
)
ParallelLogger::stopCluster(cluster = cluster)
Expand Down
73 changes: 64 additions & 9 deletions R/runCheck.R
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
#' @param cohortDatabaseSchema The schema where the cohort table is located.
#' @param cohortDefinitionId The cohort definition id for the cohort you wish to run the DQD on. The package assumes a standard OHDSI cohort table called 'Cohort'
#' @param outputFolder The folder to output logs and SQL files to
#' @param outputFile (OPTIONAL) Name of the JSON results file from a previous execution; completed check results are recovered from it when resume is TRUE
#' @param sqlOnly Should the SQLs be executed (FALSE) or just returned (TRUE)?
#' @param resume Boolean to indicate whether execution should resume from the results of a previous run, reusing any completed check results
#'
#' @import magrittr
#'
Expand All @@ -44,7 +46,9 @@
cohortDatabaseSchema,
cohortDefinitionId,
outputFolder,
sqlOnly) {
outputFile = "",
sqlOnly,
resume) {
ParallelLogger::logInfo(sprintf("Processing check description: %s", checkDescription$checkName))

filterExpression <- sprintf(
Expand All @@ -65,6 +69,21 @@
}

if (nrow(checks) > 0) {
recoveredNumber <- 0
checkResultsSaved <- NULL
if (resume && nchar(outputFile) > 0 && file.exists(file.path(outputFolder, outputFile))) {
dqdResults <- jsonlite::read_json(
path = file.path(outputFolder, outputFile)
)
checkResultsSaved <- lapply(
dqdResults$CheckResults,
function(cr) {
cr[sapply(cr, is.null)] <- NA
as.data.frame(cr)
}
)
checkResultsSaved <- do.call(plyr::rbind.fill, checkResultsSaved)
}
dfs <- apply(X = checks, MARGIN = 1, function(check) {
columns <- lapply(names(check), function(c) {
setNames(check[c], c)
Expand Down Expand Up @@ -92,16 +111,52 @@
), append = TRUE)
data.frame()
} else {
.processCheck(
connection = connection,
connectionDetails = connectionDetails,
check = check,
checkDescription = checkDescription,
sql = sql,
outputFolder = outputFolder
)
checkResult <- NULL
if (!is.null(checkResultsSaved)) {
currentCheckId <- .getCheckId(
checkLevel = checkDescription$checkLevel,
checkName = checkDescription$checkName,
cdmTableName = check["cdmTableName"],
cdmFieldName = check["cdmFieldName"],
conceptId = check["conceptId"],
unitConceptId = check["unitConceptId"]
)
checkResultCandidates <- checkResultsSaved %>% dplyr::filter(checkId == currentCheckId & is.na(ERROR))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assumes the ERROR field is present in the results object, which is not the case when none of the checks resulted in an error.

if (1 == nrow(checkResultCandidates)) {
savedResult <- checkResultCandidates[1, ]
warning <- if (is.null(savedResult$WARNING)) { NA } else { savedResult$WARNING }
checkResult <- .recordResult(
result = savedResult,
check = check,
checkDescription = checkDescription,
sql = sql,
executionTime = savedResult$EXECUTION_TIME,
warning = warning,
error = NA
)
recoveredNumber <<- recoveredNumber + 1
}
}

if (is.null(checkResult)) {
checkResult <- .processCheck(
connection = connection,
connectionDetails = connectionDetails,
check = check,
checkDescription = checkDescription,
sql = sql,
outputFolder = outputFolder
)
}

checkResult
}
})

if (recoveredNumber > 0) {
ParallelLogger::logInfo(sprintf("Recovered %s of %s results from %s", recoveredNumber, nrow(checks), outputFile))
}

do.call(rbind, dfs)
} else {
ParallelLogger::logWarn(paste0("Warning: Evaluation resulted in no checks: ", filterExpression))
Expand Down