Skip to content

Commit

Permalink
Merge pull request #2278 from hongwei1/develop
Browse files Browse the repository at this point in the history
feauter/implement our own database logic
  • Loading branch information
simonredfern authored Sep 22, 2023
2 parents 3b1a50a + cb41c63 commit bd074dc
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 63 deletions.
3 changes: 3 additions & 0 deletions obp-api/src/main/resources/props/sample.props.template
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ write_connector_metrics=true
db.driver=org.h2.Driver
db.url=jdbc:h2:./lift_proto.db;NON_KEYWORDS=VALUE;DB_CLOSE_ON_EXIT=FALSE

#the default max database connection pool size is 10
db.maxPoolSize=10


#If you want to use the postgres , be sure to create your database and update the line below!
#db.driver=org.postgresql.Driver
Expand Down
8 changes: 5 additions & 3 deletions obp-api/src/main/scala/bootstrap/liftweb/Boot.scala
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ import code.metrics.{MappedConnectorMetric, MappedMetric, MetricArchive}
import code.migration.MigrationScriptLog
import code.model.dataAccess._
import code.model.dataAccess.internalMapping.AccountIdMapping
import code.model.{Consumer, _}
import code.model._
import code.obp.grpc.HelloWorldServer
import code.productAttributeattribute.MappedProductAttribute
import code.productcollection.MappedProductCollection
Expand All @@ -108,7 +108,7 @@ import code.productfee.ProductFee
import code.products.MappedProduct
import code.ratelimiting.RateLimiting
import code.remotedata.RemotedataActors
import code.scheduler.{DatabaseDriverScheduler, JobScheduler, MetricsArchiveScheduler}
import code.scheduler.{DatabaseDriverScheduler, JobScheduler, MetricsArchiveScheduler, DatabaseConnectionPoolScheduler}
import code.scope.{MappedScope, MappedUserScope}
import code.snippet.{OAuthAuthorisation, OAuthWorkedThanks}
import code.socialmedia.MappedSocialMedia
Expand Down Expand Up @@ -146,7 +146,7 @@ import net.liftweb.mapper._
import net.liftweb.sitemap.Loc._
import net.liftweb.sitemap._
import net.liftweb.util.Helpers._
import net.liftweb.util.{DefaultConnectionIdentifier, Helpers, Props, Schedule, _}
import net.liftweb.util.{DefaultConnectionIdentifier, _}
import org.apache.commons.io.FileUtils

import scala.concurrent.{ExecutionContext, Future}
Expand Down Expand Up @@ -263,6 +263,8 @@ class Boot extends MdcLoggable {
LiftRules.unloadHooks.append(vendor.closeAllConnections_! _)

DB.defineConnectionManager(net.liftweb.util.DefaultConnectionIdentifier, vendor)
DatabaseConnectionPoolScheduler.start(vendor, 10)// 10 seconds
// logger.debug("ThreadPoolConnectionsScheduler.start(vendor, 10)")
}

if (APIUtil.getPropsAsBoolValue("logging.database.queries.enable", false)) {
Expand Down
163 changes: 103 additions & 60 deletions obp-api/src/main/scala/bootstrap/liftweb/CustomDBVendor.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package bootstrap.liftweb

import code.api.util.APIUtil
import java.sql.{Connection, DriverManager}

import net.liftweb.common.{Box, Failure, Full, Logger}
Expand All @@ -22,7 +23,7 @@ class CustomDBVendor(driverName: String,

private val logger = Logger(classOf[CustomDBVendor])

protected def createOne: Box[Connection] = {
def createOne: Box[Connection] = {
tryo{t:Throwable => logger.error("Cannot load database driver: %s".format(driverName), t)}{Class.forName(driverName);()}

(dbUser, dbPassword) match {
Expand All @@ -36,37 +37,39 @@ class CustomDBVendor(driverName: String,

trait CustomProtoDBVendor extends ConnectionManager {
private val logger = Logger(classOf[CustomProtoDBVendor])
private var pool: List[Connection] = Nil
private var poolSize = 0
private var tempMaxSize = maxPoolSize
private var freePool: List[Connection] = Nil // no process use the connections, they are available for use
private var usedPool: List[Connection] = Nil // connections are already used, not available for use
// private var totalConnectionsCount = 0
// private var tempMaxSize = maxPoolSize

/**
* Override and set to false if the maximum pool size can temporarily be expanded to avoid pool starvation
* Override and set to false if the maximum freePool size can temporarily be expanded to avoid freePool starvation
*/
protected def allowTemporaryPoolExpansion = true
protected def allowTemporaryPoolExpansion = false

/**
* Override this method if you want something other than
* 20 connections in the pool
* Override this method if you want something other than 10 connections in the freePool and usedPool
* freePool.size + usedPool.size <=10
*/
protected def maxPoolSize = 20
val dbMaxPoolSize = APIUtil.getPropsAsIntValue("db.maxPoolSize",10)
protected def maxPoolSize = dbMaxPoolSize

/**
* The absolute maximum that this pool can extend to
* The absolute maximum that this freePool can extend to
* The default is 40. Override this method to change.
*/
protected def doNotExpandBeyond = 30
protected def doNotExpandBeyond = 40

/**
* The logic for whether we can expand the pool beyond the current size. By
* default, the logic tests allowTemporaryPoolExpansion &amp;&amp; poolSize &lt;= doNotExpandBeyond
* The logic for whether we can expand the freePool beyond the current size. By
* default, the logic tests allowTemporaryPoolExpansion &amp;&amp; totalConnectionsCount &lt;= doNotExpandBeyond
*/
protected def canExpand_? : Boolean = allowTemporaryPoolExpansion && poolSize <= doNotExpandBeyond
// protected def canExpand_? : Boolean = allowTemporaryPoolExpansion && totalConnectionsCount <= doNotExpandBeyond

/**
* How is a connection created?
*/
protected def createOne: Box[Connection]
def createOne: Box[Connection]

/**
* Test the connection. By default, setAutoCommit(false),
Expand All @@ -76,74 +79,114 @@ trait CustomProtoDBVendor extends ConnectionManager {
conn.setAutoCommit(false)
}

def newConnection(name: ConnectionIdentifier): Box[Connection] =
// Tail Recursive function in order to avoid Stack Overflow
// PLEASE NOTE: Changing this function you can break the above named feature
def newConnection(name: ConnectionIdentifier): Box[Connection] = {
val (connection: Box[Connection], needRecursiveAgain: Boolean) = commonPart(name)
needRecursiveAgain match {
case true => newConnection(name)
case false => connection
}
}


def commonPart(name: ConnectionIdentifier): (Box[Connection], Boolean) =
synchronized {
pool match {
case Nil if poolSize < tempMaxSize =>
val ret = createOne
ret.foreach(_.setAutoCommit(false))
poolSize = poolSize + 1
logger.debug("Created new pool entry. name=%s, poolSize=%d".format(name, poolSize))
ret

case Nil =>
val curSize = poolSize
logger.trace("No connection left in pool, waiting...")
wait(500L)
// if we've waited 50 ms and the pool is still empty, temporarily expand it
if (pool.isEmpty && poolSize == curSize && canExpand_?) {
tempMaxSize += 1
logger.debug("Temporarily expanding pool. name=%s, tempMaxSize=%d".format(name, tempMaxSize))
newConnection(name)
}else{
logger.error(s"The poolSize is expanding to tempMaxSize ($tempMaxSize), we can not create new connection, need to restart OBP now.")
Failure(s"Database may be down, please check database connection! OBP already create $tempMaxSize connections, because all connections are occupied!")
freePool match {
case Nil if (freePool.size + usedPool.size) < maxPoolSize =>{ //we set maxPoolSize 4.
val ret = createOne // get oneConnection from JDBC, not in the freePool yet, we add ot the Pool when we release it .
try {
ret.head.setAutoCommit(false) // we test the connection status, if it is success, we return it back.
usedPool = ret.head :: usedPool
logger.debug(s"Created connection is good, detail is $ret ")
} catch {
case e: Exception =>
logger.debug(s"Created connection is bad, detail is $e")
}

case x :: xs =>
logger.trace("Found connection in pool, name=%s".format(name))
pool = xs

//Note: we may return the invalid connection
(ret, false)
}

case Nil => //freePool is empty and we are at maxPoolSize limit
wait(50L)
logger.error(s"The (freePool.size + usedPool.size) is at the limit ($maxPoolSize) and there are no free connections.")
(
Failure(s"The (freePool.size + usedPool.size) is at the limit ($maxPoolSize) and there are no free connections."),
true
)

case freeHead :: freeTail =>//if freePool is not empty, we just get connection from freePool, no need to create new connection from JDBC.
logger.trace("Found connection in freePool, name=%s freePool size =%s".format(name, freePool.size))

freePool = freeTail // remove the head from freePool
//TODO check if we need add head or tail
usedPool = freeHead :: usedPool // we added connection to usedPool

try {
this.testConnection(x)
Full(x)
this.testConnection(freeHead) // we test the connection status, if it is success, we return it back.
(Full(freeHead),false)
} catch {
case e: Exception => try {
logger.debug("Test connection failed, removing connection from pool, name=%s".format(name))
poolSize = poolSize - 1
tryo(x.close)
newConnection(name)
logger.error(s"testConnection failed, try to close it and call newConnection(name), detail is $e")
tryo(freeHead.close) // call JDBC to close this connection
(
Failure(s"testConnection failed, try to close it and call newConnection(name), detail is $e"),
true
)
} catch {
case e: Exception => newConnection(name)
case e: Exception =>{
logger.error(s"could not close connection and call newConnection(name), detail is $e")
(
Failure(s"could not close connection and call newConnection(name), detail is $e"),
true
)
}
}
}
}
}

def releaseConnection(conn: Connection): Unit = synchronized {
if (tempMaxSize > maxPoolSize) {
tryo {conn.close()}
tempMaxSize -= 1
poolSize -= 1
} else {
pool = conn :: pool
}
logger.debug("Released connection. poolSize=%d".format(poolSize))
usedPool = usedPool.filterNot(_ ==conn)
logger.debug(s"Released connection. removed connection from usedPool size is ${usedPool.size}")
//TODO check if we need add head or tail
freePool = conn :: freePool
logger.debug(s"Released connection. added connection to freePool size is ${freePool.size}")
notifyAll
}

def closeAllConnections_!(): Unit = _closeAllConnections_!(0)


private def _closeAllConnections_!(cnt: Int): Unit = synchronized {
logger.info("Closing all connections")
if (poolSize <= 0 || cnt > 10) ()
logger.debug(s"Closing all connections, try the $cnt time")
if (cnt > 10) ()//we only try this 10 times,
else {
pool.foreach {c => tryo(c.close); poolSize -= 1}
pool = Nil
freePool.foreach {c => tryo(c.close);}
usedPool.foreach {c => tryo(c.close);}
freePool = Nil
usedPool = Nil

if (poolSize > 0) wait(250)
if (usedPool.length > 0 || freePool.length > 0) wait(250)

_closeAllConnections_!(cnt + 1)
}
}


//This is only for debugging
def logAllConnectionsStatus = {
logger.debug(s"Hello from logAllConnectionsStatus: usedPool.size is ${usedPool.length}, freePool.size is ${freePool.length}")
for {
usedConnection <- usedPool
} yield {
logger.debug(s"usedConnection (${usedConnection.toString}): isClosed-${usedConnection.isClosed}, getWarnings-${usedConnection.getWarnings}")
}
for {
freeConnection <- freePool
} yield {
logger.debug(s"freeConnection (${freeConnection.toString}): isClosed-${freeConnection.isClosed}, getWarnings-${freeConnection.getWarnings}")
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package code.scheduler

import bootstrap.liftweb.CustomProtoDBVendor
import code.actorsystem.ObpLookupSystem
import code.util.Helper.MdcLoggable
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._


object DatabaseConnectionPoolScheduler extends MdcLoggable {

private lazy val actorSystem = ObpLookupSystem.obpLookupSystem
implicit lazy val executor = actorSystem.dispatcher
private lazy val scheduler = actorSystem.scheduler

def start(vendor: CustomProtoDBVendor, interval: Long): Unit = {
scheduler.schedule(
initialDelay = Duration(interval, TimeUnit.SECONDS),
interval = Duration(interval, TimeUnit.SECONDS),
runnable = new Runnable {
def run(): Unit = {
clearAllConnections(vendor)
// vendor.logAllConnectionsStatus //This is only to be used for debugging .
}
}
)
}

def clearAllConnections(vendor: CustomProtoDBVendor) = {
val connectionBox = vendor.createOne
try {
if (connectionBox.isEmpty) {
vendor.closeAllConnections_!()
logger.debug("ThreadPoolConnectionsScheduler.clearAllConnections")
}
} catch {
case e => logger.debug(s"ThreadPoolConnectionsScheduler.clearAllConnections() method throwed exception, details is $e")
}finally {
try {
if (connectionBox.isDefined)
connectionBox.head.close()
} catch {
case e =>logger.debug(s"ThreadPoolConnectionsScheduler.clearAllConnections.close method throwed exception, details is $e")
}
}
}


}
1 change: 1 addition & 0 deletions release_notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Most recent changes at top of file
```
Date Commit Action
22/09/2023 752ff04b Added props db.maxPoolSize, default is 10.
24/08/2023 bcb8fcfd Added props expectedOpenFuturesPerService, default is 100.
16/08/2023 4d8dfa66 Added props short_endpoint_timeout, default is 1.
Added props medium_endpoint_timeout, default is 7.
Expand Down

0 comments on commit bd074dc

Please sign in to comment.