Skip to content

Commit

Permalink
More pool resolution (#69)
Browse files Browse the repository at this point in the history
* Added failing test

* Fixes for not checked out connections

* Package maintenance

* moving dbisvalid for pool

* revertd previous change

* Check for active connection prior to query

* doc strings
  • Loading branch information
azimov authored Aug 13, 2024
1 parent 14aed39 commit b295cb3
Show file tree
Hide file tree
Showing 38 changed files with 427 additions and 140 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Package: ResultModelManager
Title: Result Model Manager
Version: 0.5.9
Version: 0.5.10
Authors@R:
person("Jamie", "Gilbert", , "[email protected]", role = c("aut", "cre"))
Description: Database data model management utilities for R packages in the Observational Health Data Sciences and
Expand Down
9 changes: 9 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
# ResultModelManager 0.5.10

Bug fixes:

1. Resolved issue where failed queries were being aborted inside the wrong connection
in PooledConnectionHandler

2. Refactored pooled connection handler to better ensure checkout connections are returned

# ResultModelManager 0.5.9

Changes:
Expand Down
40 changes: 22 additions & 18 deletions R/ConnectionHandler.R
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# Limit row count is intended for web applications that may cause a denial of service
.limitRowCount <- function(sql, overrideRowLimit) {
limitRowCount <- as.integer(Sys.getenv("LIMIT_ROW_COUNT"))
if (!is.na(limitRowCount) &
limitRowCount > 0 &
!overrideRowLimit) {
sql <- SqlRender::render("SELECT TOP @limit_row_count * FROM (@query) result;",
query = gsub(";$", "", sql), # Remove last semi-colon
limit_row_count = limitRowCount
)
}
return(sql)
}

#' ConnectionHandler
#' @description
#' Class for handling DatabaseConnector:connection objects with consistent R6 interfaces for pooled and non-pooled connections.
Expand Down Expand Up @@ -111,10 +125,6 @@ ConnectionHandler <- R6::R6Class(
#' Connects automatically if it isn't yet loaded
#' @returns DatabaseConnector Connection instance
getConnection = function() {
if (is.null(self$con)) {
self$initConnection()
}

if (!self$dbIsValid()) {
self$initConnection()
}
Expand Down Expand Up @@ -168,15 +178,7 @@ ConnectionHandler <- R6::R6Class(
#' @param ... Additional query parameters
#' @returns boolean TRUE if connection is valid
queryDb = function(sql, snakeCaseToCamelCase = self$snakeCaseToCamelCase, overrideRowLimit = FALSE, ...) {
# Limit row count is intended for web applications that may cause a denial of service if they consume too many
# resources.
limitRowCount <- as.integer(Sys.getenv("LIMIT_ROW_COUNT"))
if (!is.na(limitRowCount) & limitRowCount > 0 & !overrideRowLimit) {
sql <- SqlRender::render("SELECT TOP @limit_row_count * FROM (@query) result;",
query = gsub(";$", "", sql), # Remove last semi-colon
limit_row_count = limitRowCount
)
}
sql <- .limitRowCount(sql, overrideRowLimit)
sql <- self$renderTranslateSql(sql, ...)

tryCatch(
Expand All @@ -203,7 +205,7 @@ ConnectionHandler <- R6::R6Class(

tryCatch(
{
data <- self$executeFunction(sql)
self$executeFunction(sql)
},
error = function(error) {
if (self$dbms() %in% c("postgresql", "redshift")) {
Expand All @@ -223,17 +225,19 @@ ConnectionHandler <- R6::R6Class(
#' Does not translate or render sql.
#' @param sql sql query string
#' @param snakeCaseToCamelCase (Optional) Boolean. return the results columns in camel case (default)
queryFunction = function(sql, snakeCaseToCamelCase = self$snakeCaseToCamelCase) {
DatabaseConnector::querySql(self$getConnection(), sql, snakeCaseToCamelCase = snakeCaseToCamelCase)
#' @param connection (Optional) connection object
queryFunction = function(sql, snakeCaseToCamelCase = self$snakeCaseToCamelCase, connection = self$getConnection()) {
DatabaseConnector::querySql(connection, sql, snakeCaseToCamelCase = snakeCaseToCamelCase)
},

#' execute Function
#' @description
#' exec query Function that can be overriden with subclasses (e.g. use different base function or intercept query)
#' Does not translate or render sql.
#' @param sql sql query string
executeFunction = function(sql) {
DatabaseConnector::executeSql(self$getConnection(), sql)
#' @param connection connection object
executeFunction = function(sql, connection = self$getConnection()) {
DatabaseConnector::executeSql(connection, sql)
}
)
)
52 changes: 26 additions & 26 deletions R/DataModel.R
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ checkAndFixColumnNames <-
expectedNames <- tableSpecs %>%
dplyr::select("columnName") %>%
dplyr::anti_join(dplyr::filter(optionalNames, !.data$columnName %in% observeredNames),
by = "columnName"
by = "columnName"
) %>%
dplyr::arrange("columnName") %>%
dplyr::pull()
Expand Down Expand Up @@ -181,7 +181,7 @@ checkAndFixDuplicateRows <-
specifications) {
primaryKeys <- specifications %>%
dplyr::filter(.data$tableName == !!tableName &
tolower(.data$primaryKey) == "yes") %>%
tolower(.data$primaryKey) == "yes") %>%
dplyr::select("columnName") %>%
dplyr::pull()
duplicatedRows <- duplicated(table[, primaryKeys])
Expand All @@ -194,7 +194,7 @@ checkAndFixDuplicateRows <-
sum(duplicatedRows)
)
)
return(table[!duplicatedRows,])
return(table[!duplicatedRows, ])
} else {
return(table)
}
Expand All @@ -220,7 +220,7 @@ appendNewRows <-
if (nrow(data) > 0) {
primaryKeys <- specifications %>%
dplyr::filter(.data$tableName == !!tableName &
tolower(.data$primaryKey) == "yes") %>%
tolower(.data$primaryKey) == "yes") %>%
dplyr::select("columnName") %>%
dplyr::pull()
newData <- newData %>%
Expand Down Expand Up @@ -250,10 +250,10 @@ formatDouble <- function(x) {

.truncateTable <- function(tableName, connection, schema, tablePrefix) {
DatabaseConnector::renderTranslateExecuteSql(connection,
"TRUNCATE TABLE @schema.@table_prefix@table;",
table_prefix = tablePrefix,
schema = schema,
table = tableName
"TRUNCATE TABLE @schema.@table_prefix@table;",
table_prefix = tablePrefix,
schema = schema,
table = tableName
)
invisible(NULL)
}
Expand Down Expand Up @@ -354,8 +354,8 @@ uploadChunk <- function(chunk, pos, env, specifications, resultsFolder, connecti
primaryKeyValuesInChunk <- unique(chunk[env$primaryKey])
duplicates <-
dplyr::inner_join(env$primaryKeyValuesInDb,
primaryKeyValuesInChunk,
by = env$primaryKey
primaryKeyValuesInChunk,
by = env$primaryKey
)

if (nrow(duplicates) != 0) {
Expand Down Expand Up @@ -386,7 +386,7 @@ uploadChunk <- function(chunk, pos, env, specifications, resultsFolder, connecti
# Remove duplicates we already dealt with:
env$primaryKeyValuesInDb <-
env$primaryKeyValuesInDb %>%
dplyr::anti_join(duplicates, by = env$primaryKey)
dplyr::anti_join(duplicates, by = env$primaryKey)
}
}
if (nrow(chunk) == 0) {
Expand Down Expand Up @@ -424,7 +424,7 @@ uploadTable <- function(tableName,
warnOnMissingTable) {
csvFileName <- paste0(tableName, ".csv")
specifications <- specifications %>%
dplyr::filter(.data$tableName == !!tableName)
dplyr::filter(.data$tableName == !!tableName)

if (csvFileName %in% list.files(resultsFolder)) {
rlang::inform(paste0("Uploading file: ", csvFileName, " to table: ", tableName))
Expand Down Expand Up @@ -485,11 +485,12 @@ uploadTable <- function(tableName,
convertType <- Vectorize(
function(type) {
switch(type,
varchar = "c",
bigint = "n",
int = "n",
date = "D",
"?") # default to guess if type not matched
varchar = "c",
bigint = "n",
int = "n",
date = "D",
"?"
) # default to guess if type not matched
}
)

Expand Down Expand Up @@ -592,10 +593,10 @@ uploadResults <- function(connection = NULL,
ParallelLogger::logInfo("Removing all records for tables within specification")

invisible(lapply(unique(specifications$tableName),
.truncateTable,
connection = connection,
schema = schema,
tablePrefix = tablePrefix
.truncateTable,
connection = connection,
schema = schema,
tablePrefix = tablePrefix
))
}

Expand Down Expand Up @@ -659,7 +660,6 @@ uploadResults <- function(connection = NULL,
#' @export
deleteAllRowsForPrimaryKey <-
function(connection, schema, tableName, keyValues) {

createSqlStatement <- function(i) {
sql <- paste0(
"DELETE FROM ",
Expand All @@ -668,7 +668,7 @@ deleteAllRowsForPrimaryKey <-
tableName,
"\nWHERE ",
paste(paste0(
colnames(keyValues), " = '", keyValues[i,], "'"
colnames(keyValues), " = '", keyValues[i, ], "'"
), collapse = " AND "),
";"
)
Expand Down Expand Up @@ -743,9 +743,9 @@ deleteAllRowsForDatabaseId <-
database_id = databaseId
)
DatabaseConnector::executeSql(connection,
sql,
progressBar = FALSE,
reportOverallTime = FALSE
sql,
progressBar = FALSE,
reportOverallTime = FALSE
)
}
}
Expand Down
Loading

0 comments on commit b295cb3

Please sign in to comment.