Skip to content

Commit

Permalink
Merge pull request #2287 from hongwei1/develop
Browse files Browse the repository at this point in the history
feature/added the HikariCP
  • Loading branch information
simonredfern authored Oct 9, 2023
2 parents 18b8340 + 1136da0 commit 860a33f
Show file tree
Hide file tree
Showing 12 changed files with 228 additions and 396 deletions.
5 changes: 5 additions & 0 deletions obp-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,11 @@
<artifactId>flexmark-util-options</artifactId>
<version>0.64.0</version>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>4.0.3</version>
</dependency>
<!-- ********** flexmark END ********** -->
<!--scala utils, for type scan-->
<dependency>
Expand Down
4 changes: 0 additions & 4 deletions obp-api/src/main/resources/props/sample.props.template
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,6 @@ write_connector_metrics=true
db.driver=org.h2.Driver
db.url=jdbc:h2:./lift_proto.db;NON_KEYWORDS=VALUE;DB_CLOSE_ON_EXIT=FALSE

# The default maximum database connection pool size is 30
db.maxPoolSize=30


#If you want to use the postgres , be sure to create your database and update the line below!
#db.driver=org.postgresql.Driver
#db.url=jdbc:postgresql://localhost:5432/dbname?user=dbusername&password=thepassword
Expand Down
312 changes: 140 additions & 172 deletions obp-api/src/main/scala/bootstrap/liftweb/Boot.scala

Large diffs are not rendered by default.

181 changes: 27 additions & 154 deletions obp-api/src/main/scala/bootstrap/liftweb/CustomDBVendor.scala
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package bootstrap.liftweb

import code.api.util.APIUtil
import java.sql.{Connection, DriverManager}
import com.zaxxer.hikari.pool.ProxyConnection
import com.zaxxer.hikari.{HikariConfig, HikariDataSource}

import net.liftweb.common.{Box, Failure, Full, Logger}
import java.sql.{Connection}
import net.liftweb.common.{Box, Full, Logger}
import net.liftweb.db.ConnectionManager
import net.liftweb.util.ConnectionIdentifier
import net.liftweb.util.Helpers.tryo

/**
* The standard DB vendor.
* The Custom DB vendor.
*
* @param driverName the name of the database driver
* @param dbUrl the URL for the JDBC data connection
Expand All @@ -23,170 +24,42 @@ class CustomDBVendor(driverName: String,

private val logger = Logger(classOf[CustomDBVendor])

def createOne: Box[Connection] = {
tryo{t:Throwable => logger.error("Cannot load database driver: %s".format(driverName), t)}{Class.forName(driverName);()}
object HikariDatasource {
val config = new HikariConfig()

(dbUser, dbPassword) match {
case (Full(user), Full(pwd)) =>
tryo{t:Throwable => logger.error("Unable to get database connection. url=%s, user=%s".format(dbUrl, user),t)}(DriverManager.getConnection(dbUrl, user, pwd))
config.setJdbcUrl(dbUrl)
config.setUsername(user)
config.setPassword(pwd)
case _ =>
tryo{t:Throwable => logger.error("Unable to get database connection. url=%s".format(dbUrl),t)}(DriverManager.getConnection(dbUrl))
config.setJdbcUrl(dbUrl)
}

config.addDataSourceProperty("cachePrepStmts", "true")
config.addDataSourceProperty("prepStmtCacheSize", "250")
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048")

val ds: HikariDataSource = new HikariDataSource(config)
}

def createOne: Box[Connection] = {
tryo{t:Throwable => logger.error("Cannot load database driver: %s".format(driverName), t)}{Class.forName(driverName);()}
tryo{t:Throwable => logger.error("Unable to get database connection. url=%s".format(dbUrl),t)}(HikariDatasource.ds.getConnection())
}

def closeAllConnections_!(): Unit = HikariDatasource.ds.close()
}

trait CustomProtoDBVendor extends ConnectionManager {
private val logger = Logger(classOf[CustomProtoDBVendor])
private var freePool: List[Connection] = Nil // no process use the connections, they are available for use
private var usedPool: List[Connection] = Nil // connections are already used, not available for use
// private var totalConnectionsCount = 0
// private var tempMaxSize = maxPoolSize

/**
* Override and set to false if the maximum freePool size can temporarily be expanded to avoid freePool starvation
*/
protected def allowTemporaryPoolExpansion = false

/**
   * Override this method if you want something other than the default maximum number of connections
   * across the freePool and usedPool (freePool.size + usedPool.size <= maxPoolSize, default 30)
*/
val dbMaxPoolSize = APIUtil.getPropsAsIntValue("db.maxPoolSize",30)
protected def maxPoolSize = dbMaxPoolSize

/**
* The absolute maximum that this freePool can extend to
* The default is 40. Override this method to change.
*/
protected def doNotExpandBeyond = 40

/**
* The logic for whether we can expand the freePool beyond the current size. By
* default, the logic tests allowTemporaryPoolExpansion &amp;&amp; totalConnectionsCount &lt;= doNotExpandBeyond
*/
// protected def canExpand_? : Boolean = allowTemporaryPoolExpansion && totalConnectionsCount <= doNotExpandBeyond

/**
* How is a connection created?
*/
def createOne: Box[Connection]

/**
* Test the connection. By default, setAutoCommit(false),
* but you can do a real query on your RDBMS to see if the connection is alive
*/
protected def testConnection(conn: Connection) {
conn.setAutoCommit(false)
}

// Tail Recursive function in order to avoid Stack Overflow
// PLEASE NOTE: Changing this function you can break the above named feature
def newConnection(name: ConnectionIdentifier): Box[Connection] = {
val (connection: Box[Connection], needRecursiveAgain: Boolean) = commonPart(name)
needRecursiveAgain match {
case true => newConnection(name)
case false => connection
}
createOne
}

def releaseConnection(conn: Connection): Unit = {conn.asInstanceOf[ProxyConnection].close()}

def commonPart(name: ConnectionIdentifier): (Box[Connection], Boolean) =
synchronized {
freePool match {
case Nil if (freePool.size + usedPool.size) < maxPoolSize =>{
val ret = createOne // get one connection from JDBC; it is not in the freePool yet — we add it to the pool when we release it.
try {
ret.head.setAutoCommit(false) // we test the connection status, if it is success, we return it back.
usedPool = ret.head :: usedPool
logger.trace(s"Created connection is good, detail is $ret ")
} catch {
case e: Exception =>
logger.trace(s"Created connection is bad, detail is $e")
}

//Note: we may return the invalid connection
(ret, false)
}

case Nil => //freePool is empty and we are at maxPoolSize limit
wait(50L)
logger.error(s"The (freePool.size + usedPool.size) is at the limit ($maxPoolSize) and there are no free connections.")
(
Failure(s"The (freePool.size + usedPool.size) is at the limit ($maxPoolSize) and there are no free connections."),
false
)

case freeHead :: freeTail =>//if freePool is not empty, we just get connection from freePool, no need to create new connection from JDBC.
logger.trace("Found connection in freePool, name=%s freePool size =%s".format(name, freePool.size))

freePool = freeTail // remove the head from freePool
//TODO check if we need add head or tail
usedPool = freeHead :: usedPool // we added connection to usedPool

try {
this.testConnection(freeHead) // we test the connection status, if it is success, we return it back.
(Full(freeHead),false)
} catch {
case e: Exception => try {
logger.error(s"testConnection failed, try to close it and call newConnection(name), detail is $e")
tryo(freeHead.close) // call JDBC to close this connection
(
Failure(s"testConnection failed, try to close it and call newConnection(name), detail is $e"),
true
)
} catch {
case e: Exception =>{
logger.error(s"could not close connection and call newConnection(name), detail is $e")
(
Failure(s"could not close connection and call newConnection(name), detail is $e"),
true
)
}
}
}
}
}

def releaseConnection(conn: Connection): Unit = synchronized {
usedPool = usedPool.filterNot(_ ==conn)
logger.trace(s"Released connection. removed connection from usedPool size is ${usedPool.size}")
//TODO check if we need add head or tail
freePool = conn :: freePool
logger.trace(s"Released connection. added connection to freePool size is ${freePool.size}")
notifyAll
}

def closeAllConnections_!(): Unit = _closeAllConnections_!(0)


private def _closeAllConnections_!(cnt: Int): Unit = synchronized {
logger.trace(s"Closing all connections, try the $cnt time")
if (cnt > 10) ()//we only try this 10 times,
else {
freePool.foreach {c => tryo(c.close);}
usedPool.foreach {c => tryo(c.close);}
freePool = Nil
usedPool = Nil

if (usedPool.length > 0 || freePool.length > 0) wait(250)

_closeAllConnections_!(cnt + 1)
}
}


//This is only for debugging
def logAllConnectionsStatus = {
logger.trace(s"Hello from logAllConnectionsStatus: usedPool.size is ${usedPool.length}, freePool.size is ${freePool.length}")
for {
usedConnection <- usedPool
} yield {
logger.trace(s"usedConnection (${usedConnection.toString}): isClosed-${usedConnection.isClosed}, getWarnings-${usedConnection.getWarnings}")
}
for {
freeConnection <- freePool
} yield {
logger.trace(s"freeConnection (${freeConnection.toString}): isClosed-${freeConnection.isClosed}, getWarnings-${freeConnection.getWarnings}")
}
}
}
}
28 changes: 28 additions & 0 deletions obp-api/src/main/scala/code/api/util/APIUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ TESOBE (http://www.tesobe.com/)

package code.api.util

import bootstrap.liftweb.CustomDBVendor
import java.io.InputStream
import java.net.URLDecoder
import java.nio.charset.Charset
Expand Down Expand Up @@ -4783,5 +4784,32 @@ object APIUtil extends MdcLoggable with CustomJsonFormats{
val (serviceNameCounterLatest, serviceNameOpenFuturesCounterLatest) = serviceNameCountersMap.getOrDefault(serviceName, (0, 1))
logger.debug(s"decrementFutureCounter says: serviceName is $serviceName, serviceNameCounterLatest is $serviceNameCounterLatest, serviceNameOpenFuturesCounterLatest is ${serviceNameOpenFuturesCounterLatest}")
}

val driver =
Props.mode match {
case Props.RunModes.Production | Props.RunModes.Staging | Props.RunModes.Development => APIUtil.getPropsValue("db.driver") openOr "org.h2.Driver"
case Props.RunModes.Test => APIUtil.getPropsValue("db.driver") openOr "org.h2.Driver"
case _ => "org.h2.Driver"
}

val vendor =
Props.mode match {
case Props.RunModes.Production | Props.RunModes.Staging | Props.RunModes.Development =>
new CustomDBVendor(driver,
APIUtil.getPropsValue("db.url") openOr "jdbc:h2:lift_proto.db;AUTO_SERVER=TRUE",
APIUtil.getPropsValue("db.user"), APIUtil.getPropsValue("db.password"))
case Props.RunModes.Test =>
new CustomDBVendor(
driver,
APIUtil.getPropsValue("db.url") openOr Constant.h2DatabaseDefaultUrlValue,
APIUtil.getPropsValue("db.user").orElse(Empty),
APIUtil.getPropsValue("db.password").orElse(Empty)
)
case _ =>
new CustomDBVendor(
driver,
h2DatabaseDefaultUrlValue,
Empty, Empty)
}

}
27 changes: 19 additions & 8 deletions obp-api/src/main/scala/code/api/util/migration/Migration.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package code.api.util.migration

import bootstrap.liftweb.CustomDBVendor

import java.sql.{ResultSet, SQLException}
import java.text.SimpleDateFormat
import java.util.Date

import code.api.util.APIUtil.{getPropsAsBoolValue, getPropsValue}
import code.api.util.ErrorMessages.DatabaseConnectionClosedError
import code.api.util.{APIUtil, ApiPropsWithAlias}
import code.api.v4_0_0.DatabaseInfoJson
import code.consumer.Consumers
Expand All @@ -13,9 +15,9 @@ import code.customer.CustomerX
import code.migration.MigrationScriptLogProvider
import code.util.Helper.MdcLoggable
import com.github.dwickern.macros.NameOf.nameOf
import com.zaxxer.hikari.pool.ProxyConnection
import net.liftweb.mapper.Schemifier.getDefaultSchemaName
import net.liftweb.mapper.{BaseMetaMapper, DB, SuperConnection}
import net.liftweb.util.DefaultConnectionIdentifier

import scala.collection.immutable
import scala.collection.mutable.HashMap
Expand Down Expand Up @@ -524,12 +526,21 @@ object Migration extends MdcLoggable {
/**
* The purpose is to provide info about the database in mapper mode.
*/
def mapperDatabaseInfo(): DatabaseInfoJson = {
val connection = DB.use(DefaultConnectionIdentifier){ conn => conn}
val md = connection.getMetaData
val productName = md.getDatabaseProductName()
val productVersion = md.getDatabaseProductVersion()
DatabaseInfoJson(product_name = productName, product_version = productVersion)
def mapperDatabaseInfo(vendor: CustomDBVendor): DatabaseInfoJson = {
val connection = vendor.createOne.openOrThrowException(DatabaseConnectionClosedError)
try {
val md = connection.getMetaData
val productName = md.getDatabaseProductName()
val productVersion = md.getDatabaseProductVersion()
DatabaseInfoJson(product_name = productName, product_version = productVersion)
} finally {
try {
connection.asInstanceOf[ProxyConnection].close()
} catch {
case t: Throwable => logger.error(s"mapperDatabaseInfo.close connection throw exception, detail is: $t")
}
}

}

/**
Expand Down
2 changes: 1 addition & 1 deletion obp-api/src/main/scala/code/api/v4_0_0/APIMethods400.scala
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ trait APIMethods400 {
cc =>
implicit val ec = EndpointContext(Some(cc))
Future {
(Migration.DbFunction.mapperDatabaseInfo(), HttpCode.`200`(cc.callContext))
(Migration.DbFunction.mapperDatabaseInfo(APIUtil.vendor), HttpCode.`200`(cc.callContext))
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions obp-api/src/main/scala/code/bankconnectors/Connector.scala
Original file line number Diff line number Diff line change
Expand Up @@ -955,12 +955,12 @@ trait Connector extends MdcLoggable {
def getStatus(challengeThresholdAmount: BigDecimal, transactionRequestCommonBodyAmount: BigDecimal, transactionRequestType: TransactionRequestType): Future[TransactionRequestStatus.Value] = {
Future(
if (transactionRequestCommonBodyAmount < challengeThresholdAmount) {
// For any connector != mapped we should probably assume that transaction_status_scheduler_delay will be > 0
// For any connector != mapped we should probably assume that transaction_request_status_scheduler_delay will be > 0
// so that getTransactionRequestStatusesImpl needs to be implemented for all connectors except mapped.
// i.e. if we are certain that saveTransaction will be honored immediately by the backend, then transaction_status_scheduler_delay
// i.e. if we are certain that saveTransaction will be honored immediately by the backend, then transaction_request_status_scheduler_delay
// can be empty in the props file. Otherwise, the status will be set to STATUS_PENDING
// and getTransactionRequestStatusesImpl needs to be run periodically to update the transaction request status.
if (APIUtil.getPropsAsLongValue("transaction_status_scheduler_delay").isEmpty || (transactionRequestType.value ==REFUND.toString))
if (APIUtil.getPropsAsLongValue("transaction_request_status_scheduler_delay").isEmpty || (transactionRequestType.value ==REFUND.toString))
TransactionRequestStatus.COMPLETED
else
TransactionRequestStatus.PENDING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4851,12 +4851,12 @@ object LocalMappedConnector extends Connector with MdcLoggable {
override def getStatus(challengeThresholdAmount: BigDecimal, transactionRequestCommonBodyAmount: BigDecimal, transactionRequestType: TransactionRequestType): Future[TransactionRequestStatus.Value] = {
Future(
if (transactionRequestCommonBodyAmount < challengeThresholdAmount && transactionRequestType.value != REFUND.toString) {
// For any connector != mapped we should probably assume that transaction_status_scheduler_delay will be > 0
// For any connector != mapped we should probably assume that transaction_request_status_scheduler_delay will be > 0
// so that getTransactionRequestStatusesImpl needs to be implemented for all connectors except mapped.
// i.e. if we are certain that saveTransaction will be honored immediately by the backend, then transaction_status_scheduler_delay
// i.e. if we are certain that saveTransaction will be honored immediately by the backend, then transaction_request_status_scheduler_delay
// can be empty in the props file. Otherwise, the status will be set to STATUS_PENDING
// and getTransactionRequestStatusesImpl needs to be run periodically to update the transaction request status.
if (APIUtil.getPropsAsLongValue("transaction_status_scheduler_delay").isEmpty)
if (APIUtil.getPropsAsLongValue("transaction_request_status_scheduler_delay").isEmpty)
TransactionRequestStatus.COMPLETED
else
TransactionRequestStatus.PENDING
Expand Down
Loading

0 comments on commit 860a33f

Please sign in to comment.