Skip to content

Commit

Permalink
Merge pull request #2247 from constantine2nd/develop
Browse files Browse the repository at this point in the history
Request Timeout, ScalikeJDBC
  • Loading branch information
simonredfern committed Aug 16, 2023
2 parents 2a353ee + ac53cb9 commit 4a5e61e
Show file tree
Hide file tree
Showing 11 changed files with 368 additions and 153 deletions.
6 changes: 4 additions & 2 deletions obp-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,12 @@
<artifactId>postgresql</artifactId>
<version>42.4.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8-production -->
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc8</artifactId>
<version>21.5.0.0</version>
<artifactId>ojdbc8-production</artifactId>
<version>23.2.0.0</version>
<type>pom</type>
</dependency>
<!-- https://mvnrepository.com/artifact/com.h2database/h2 -->
<dependency>
Expand Down
8 changes: 8 additions & 0 deletions obp-api/src/main/resources/props/sample.props.template
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,14 @@ read_authentication_type_validation_requires_role=false
## enable logging all the database queries in log file
#logging.database.queries.enable=true

## enable logging all the database queries in log file
#database_query_timeout_in_seconds=

## Define endpoint timeouts in miliseconds
short_endpoint_timeout = 1000
medium_endpoint_timeout = 7000
long_endpoint_timeout = 60000

##Added Props property_name_prefix, default is OBP_. This adds the prefix only for the system environment property name, eg: db.driver --> OBP_db.driver
#system_environment_property_name_prefix=OBP_

Expand Down
52 changes: 41 additions & 11 deletions obp-api/src/main/scala/bootstrap/liftweb/Boot.scala
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ import com.openbankproject.commons.util.Functions.Implicits._
import com.openbankproject.commons.util.{ApiVersion, Functions}
import javax.mail.internet.MimeMessage
import net.liftweb.common._
import net.liftweb.db.DBLogEntry
import net.liftweb.db.{DB, DBLogEntry}
import net.liftweb.http.LiftRules.DispatchPF
import net.liftweb.http._
import net.liftweb.http.provider.HTTPCookie
Expand All @@ -149,7 +149,7 @@ import net.liftweb.util.Helpers._
import net.liftweb.util.{DefaultConnectionIdentifier, Helpers, Props, Schedule, _}
import org.apache.commons.io.FileUtils

import scala.concurrent.ExecutionContext
import scala.concurrent.{ExecutionContext, Future}

/**
* A class that's instantiated early and run. It allows the application
Expand Down Expand Up @@ -279,6 +279,25 @@ class Boot extends MdcLoggable {
}
}
}

// Database query timeout
APIUtil.getPropsValue("database_query_timeout_in_seconds").map { timeoutInSeconds =>
tryo(timeoutInSeconds.toInt).isDefined match {
case true =>
DB.queryTimeout = Full(timeoutInSeconds.toInt)
logger.info(s"Query timeout database_query_timeout_in_seconds is set to ${timeoutInSeconds} seconds")
case false =>
logger.error(
s"""
|------------------------------------------------------------------------------------
|Query timeout database_query_timeout_in_seconds [${timeoutInSeconds}] is not an integer value.
|Actual DB.queryTimeout value: ${DB.queryTimeout}
|------------------------------------------------------------------------------------""".stripMargin)
}

}


implicit val formats = CustomJsonFormats.formats
LiftRules.statelessDispatch.prepend {
case _ if tryo(DB.use(DefaultConnectionIdentifier){ conn => conn}.isClosed).isEmpty =>
Expand Down Expand Up @@ -835,15 +854,26 @@ class Boot extends MdcLoggable {

// create Hydra client if exists active consumer but missing Hydra client
def createHydraClients() = {
import scala.concurrent.ExecutionContext.Implicits.global
// exists hydra clients id
val oAuth2ClientIds = HydraUtil.hydraAdmin.listOAuth2Clients(Long.MaxValue, 0L).stream()
.map[String](_.getClientId)
.collect(Collectors.toSet())

Consumers.consumers.vend.getConsumersFuture().foreach{ consumers =>
consumers.filter(consumer => consumer.isActive.get && !oAuth2ClientIds.contains(consumer.key.get))
.foreach(HydraUtil.createHydraClient(_))
try {
import scala.concurrent.ExecutionContext.Implicits.global
// exists hydra clients id
val oAuth2ClientIds = HydraUtil.hydraAdmin.listOAuth2Clients(Long.MaxValue, 0L).stream()
.map[String](_.getClientId)
.collect(Collectors.toSet())

Consumers.consumers.vend.getConsumersFuture().foreach{ consumers =>
consumers.filter(consumer => consumer.isActive.get && !oAuth2ClientIds.contains(consumer.key.get))
.foreach(HydraUtil.createHydraClient(_))
}
} catch {
case e: Exception =>
if(HydraUtil.integrateWithHydra) {
logger.error("------------------------------ Mirror consumer in hydra issue ------------------------------")
e.printStackTrace()
} else {
logger.warn("------------------------------ Mirror consumer in hydra issue ------------------------------")
logger.warn(e)
}
}
}

Expand Down
5 changes: 5 additions & 0 deletions obp-api/src/main/scala/code/api/constant/constant.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ object Constant extends MdcLoggable {
final val limit = 500
}

final val shortEndpointTimeoutInMillis = APIUtil.getPropsAsLongValue(nameOfProperty = "short_endpoint_timeout", 1L * 1000L)
final val mediumEndpointTimeoutInMillis = APIUtil.getPropsAsLongValue(nameOfProperty = "medium_endpoint_timeout", 7L * 1000L)
final val longEndpointTimeoutInMillis = APIUtil.getPropsAsLongValue(nameOfProperty = "long_endpoint_timeout", 60L * 1000L)

final val h2DatabaseDefaultUrlValue = "jdbc:h2:mem:OBPTest_H2_v2.1.214;NON_KEYWORDS=VALUE;DB_CLOSE_DELAY=10"

final val HostName = APIUtil.getPropsValue("hostname").openOrThrowException(ErrorMessages.HostnameNotSpecified)
Expand Down Expand Up @@ -102,6 +106,7 @@ object ResponseHeader {
final lazy val `WWW-Authenticate` = "WWW-Authenticate"
final lazy val ETag = "ETag"
final lazy val `Cache-Control` = "Cache-Control"
final lazy val Connection = "Connection"
}

object BerlinGroup extends Enumeration {
Expand Down
10 changes: 8 additions & 2 deletions obp-api/src/main/scala/code/api/util/APIUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ import org.apache.commons.lang3.StringUtils
import java.security.AccessControlException
import java.util.regex.Pattern

import code.api.util.FutureUtil.EndpointTimeout
import code.etag.MappedETag
import code.users.Users
import net.liftweb.mapper.By
Expand Down Expand Up @@ -807,6 +808,9 @@ object APIUtil extends MdcLoggable with CustomJsonFormats{
def check401(message: String): Boolean = {
message.contains(extractErrorMessageCode(UserNotLoggedIn))
}
def check408(message: String): Boolean = {
message.contains(extractErrorMessageCode(requestTimeout))
}
val (code, responseHeaders) =
message match {
case msg if check401(msg) =>
Expand All @@ -816,6 +820,8 @@ object APIUtil extends MdcLoggable with CustomJsonFormats{
(401, getHeaders() ::: headers.list ::: addHeader)
case msg if check403(msg) =>
(403, getHeaders() ::: headers.list)
case msg if check408(msg) =>
(408, getHeaders() ::: headers.list ::: List((ResponseHeader.Connection, "close")))
case _ =>
(httpCode, getHeaders() ::: headers.list)
}
Expand Down Expand Up @@ -2950,8 +2956,8 @@ object APIUtil extends MdcLoggable with CustomJsonFormats{
* @tparam T
* @return
*/
implicit def scalaFutureToBoxedJsonResponse[T](scf: OBPReturnType[T])(implicit m: Manifest[T]): Box[JsonResponse] = {
futureToBoxedResponse(scalaFutureToLaFuture(scf))
implicit def scalaFutureToBoxedJsonResponse[T](scf: OBPReturnType[T])(implicit t: EndpointTimeout, m: Manifest[T]): Box[JsonResponse] = {
futureToBoxedResponse(scalaFutureToLaFuture(FutureUtil.futureWithTimeout(scf)))
}


Expand Down
4 changes: 4 additions & 0 deletions obp-api/src/main/scala/code/api/util/ErrorMessages.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ object ErrorMessages {
val NoValidElasticsearchIndicesConfigured = "OBP-00011: No elasticsearch indices are allowed on this instance. Please set es.warehouse.allowed.indices = index1,index2 (or = ALL for all). "
val CustomerFirehoseNotAllowedOnThisInstance = "OBP-00012: Customer firehose is not allowed on this instance. Please set allow_customer_firehose = true in props files. "

// Exceptions (OBP-01XXX) ------------------------------------------------>
val requestTimeout = "OBP-01000: Request Timeout. The OBP API decided to return a timeout. This is probably because a backend service did not respond in time. "
// <------------------------------------------------ Exceptions (OBP-01XXX)

// WebUiProps Exceptions (OBP-08XXX)
val InvalidWebUiProps = "OBP-08001: Incorrect format of name."
val WebUiPropsNotFound = "OBP-08002: WebUi props not found. Please specify a valid value for WEB_UI_PROPS_ID."
Expand Down
64 changes: 64 additions & 0 deletions obp-api/src/main/scala/code/api/util/FutureUtil.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package code.api.util

import java.util.concurrent.TimeoutException
import java.util.{Timer, TimerTask}

import code.api.Constant

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.language.postfixOps

object FutureUtil {

// All Future's that use futureWithTimeout will use the same Timer object
// it is thread safe and scales to thousands of active timers
// The true parameter ensures that timeout timers are daemon threads and do not stop
// the program from shutting down

val timer: Timer = new Timer(true)

case class EndpointTimeout(inMillis: Long)

implicit val defaultTimeout: EndpointTimeout = EndpointTimeout(Constant.longEndpointTimeoutInMillis)

/**
* Returns the result of the provided future within the given time or a timeout exception, whichever is first
* This uses Java Timer which runs a single thread to handle all futureWithTimeouts and does not block like a
* Thread.sleep would
* @param future Caller passes a future to execute
* @param timeout Time before we return a Timeout exception instead of future's outcome
* @return Future[T]
*/
def futureWithTimeout[T](future : Future[T])(implicit timeout : EndpointTimeout, ec: ExecutionContext): Future[T] = {

// Promise will be fulfilled with either the callers Future or the timer task if it times out
var p = Promise[T]

// and a Timer task to handle timing out

val timerTask = new TimerTask() {
def run() : Unit = {
p.tryFailure(new TimeoutException(ErrorMessages.requestTimeout))
}
}

// Set the timeout to check in the future
timer.schedule(timerTask, timeout.inMillis)

future.map {
a =>
if(p.trySuccess(a)) {
timerTask.cancel()
}
}
.recover {
case e: Exception =>
if(p.tryFailure(e)) {
timerTask.cancel()
}
}

p.future
}

}
52 changes: 47 additions & 5 deletions obp-api/src/main/scala/code/api/v5_1_0/APIMethods510.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package code.api.v5_1_0


import code.api.Constant
import code.api.ResourceDocs1_4_0.SwaggerDefinitionsJSON.{apiCollectionJson400, apiCollectionsJson400, apiInfoJson400, postApiCollectionJson400, revokedConsentJsonV310, _}
import code.api.util.APIUtil._
import code.api.util.ApiRole._
import code.api.util.ApiTag._
import code.api.util.ErrorMessages.{$UserNotLoggedIn, BankNotFound, ConsentNotFound, InvalidJsonFormat, UnknownError, UserNotFoundByUserId, UserNotLoggedIn, _}
import code.api.util.FutureUtil.EndpointTimeout
import code.api.util.NewStyle.HttpCode
import code.api.util._
import code.api.v2_0_0.{EntitlementJSONs, JSONFactory200}
import code.api.v3_0_0.JSONFactory300
import code.api.v3_0_0.JSONFactory300.createAggregateMetricJson
import code.api.v3_1_0.ConsentJsonV310
Expand All @@ -24,12 +27,13 @@ import code.transactionrequests.TransactionRequests.TransactionRequestTypes.{app
import code.userlocks.UserLocksProvider
import code.users.Users
import code.util.Helper
import code.views.Views
import code.views.system.{AccountAccess, ViewDefinition}
import com.github.dwickern.macros.NameOf.nameOf
import com.openbankproject.commons.ExecutionContext.Implicits.global
import com.openbankproject.commons.dto.CustomerAndAttribute
import com.openbankproject.commons.model.enums.{AtmAttributeType, UserAttributeType}
import com.openbankproject.commons.model.{AtmId, AtmT, BankId}
import com.openbankproject.commons.model.{AtmId, AtmT, BankId, Permission}
import com.openbankproject.commons.util.{ApiVersion, ScannedApiVersion}
import net.liftweb.common.{Box, Full}
import net.liftweb.http.S
Expand Down Expand Up @@ -110,13 +114,14 @@ trait APIMethods510 {
lazy val waitingForGodot: OBPEndpoint = {
case "waiting-for-godot" :: Nil JsonGet _ => {
cc =>
implicit val timeout = EndpointTimeout(Constant.mediumEndpointTimeoutInMillis) // Set endpoint timeout explicitly
for {
httpParams <- NewStyle.function.extractHttpParamsFromUrl(cc.url)
} yield {
val sleep: String = httpParams.filter(_.name == "sleep").headOption
sleep: String = httpParams.filter(_.name == "sleep").headOption
.map(_.values.headOption.getOrElse("0")).getOrElse("0")
val sleepInMillis: Long = tryo(sleep.trim.toLong).getOrElse(0)
Thread.sleep(sleepInMillis)
sleepInMillis: Long = tryo(sleep.trim.toLong).getOrElse(0)
_ <- Future(Thread.sleep(sleepInMillis))
} yield {
(JSONFactory510.waitingForGodot(sleepInMillis), HttpCode.`200`(cc.callContext))
}
}
Expand Down Expand Up @@ -285,6 +290,43 @@ trait APIMethods510 {
}
}
}



staticResourceDocs += ResourceDoc(
getEntitlementsAndPermissions,
implementedInApiVersion,
"getEntitlementsAndPermissions",
"GET",
"/users/USER_ID/entitlements-and-permissions",
"Get Entitlements and Permissions for a User",
s"""
|
|
""".stripMargin,
EmptyBody,
userJsonV300,
List(
$UserNotLoggedIn,
UserNotFoundByUserId,
UserHasMissingRoles,
UnknownError),
List(apiTagRole, apiTagEntitlement, apiTagUser),
Some(List(canGetEntitlementsForAnyUserAtAnyBank)))


lazy val getEntitlementsAndPermissions: OBPEndpoint = {
case "users" :: userId :: "entitlements-and-permissions" :: Nil JsonGet _ => {
cc =>
for {
(user, callContext) <- NewStyle.function.getUserByUserId(userId, cc.callContext)
entitlements <- NewStyle.function.getEntitlementsByUserId(userId, callContext)
} yield {
val permissions: Option[Permission] = Views.views.vend.getPermissionForUser(user).toOption
(JSONFactory300.createUserInfoJSON (user, entitlements, permissions), HttpCode.`200`(callContext))
}
}
}


staticResourceDocs += ResourceDoc(
Expand Down
Loading

0 comments on commit 4a5e61e

Please sign in to comment.