Skip to content

Commit

Permalink
Merge pull request #2250 from hongwei1/feature/addedFutureWithLimits
Browse files Browse the repository at this point in the history
Feature/added futureWithLimits method
  • Loading branch information
simonredfern authored Aug 24, 2023
2 parents 5247b4e + ba71328 commit 5cff441
Show file tree
Hide file tree
Showing 12 changed files with 145 additions and 69 deletions.
6 changes: 5 additions & 1 deletion obp-api/src/main/resources/props/sample.props.template
Original file line number Diff line number Diff line change
Expand Up @@ -1239,4 +1239,8 @@ retain_metrics_move_limit = 50000
retain_metrics_scheduler_interval_in_seconds = 3600

#if same session used for different ip address, we can show this warning, default is false.
show_ip_address_change_warning=false
show_ip_address_change_warning=false


#the default expected Open Futures Per Service for the BackOffFactor parameter
expectedOpenFuturesPerService=100
2 changes: 1 addition & 1 deletion obp-api/src/main/scala/code/api/OBPRestHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ trait OBPRestHelper extends RestHelper with MdcLoggable {
failIfBadJSON(r, handler)
}
val endTime = Helpers.now
logAPICall(startTime, endTime.getTime - startTime.getTime, rd)
writeEndpointMetric(startTime, endTime.getTime - startTime.getTime, rd)
response
}
def isDefinedAt(r : Req) = {
Expand Down
56 changes: 47 additions & 9 deletions obp-api/src/main/scala/code/api/util/APIUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ object APIUtil extends MdcLoggable with CustomJsonFormats{
}
}

def logAPICall(callContext: Option[CallContextLight]) = {
def writeEndpointMetric(callContext: Option[CallContextLight]) = {
callContext match {
case Some(cc) =>
if(getPropsAsBoolValue("write_metrics", false)) {
Expand Down Expand Up @@ -353,7 +353,7 @@ object APIUtil extends MdcLoggable with CustomJsonFormats{
}
}

def logAPICall(date: TimeSpan, duration: Long, rd: Option[ResourceDoc]) = {
def writeEndpointMetric(date: TimeSpan, duration: Long, rd: Option[ResourceDoc]) = {
val authorization = S.request.map(_.header("Authorization")).flatten
val directLogin: Box[String] = S.request.map(_.header("DirectLogin")).flatten
if(getPropsAsBoolValue("write_metrics", false)) {
Expand Down Expand Up @@ -2465,7 +2465,7 @@ object APIUtil extends MdcLoggable with CustomJsonFormats{
}


def saveConnectorMetric[R](blockOfCode: => R)(nameOfFunction: String = "")(implicit nameOfConnector: String): R = {
def writeMetricEndpointTiming[R](blockOfCode: => R)(nameOfFunction: String = "")(implicit nameOfConnector: String): R = {
val t0 = System.currentTimeMillis()
// call-by-name
val result = blockOfCode
Expand All @@ -2480,7 +2480,7 @@ object APIUtil extends MdcLoggable with CustomJsonFormats{
result
}

def logEndpointTiming[R](callContext: Option[CallContextLight])(blockOfCode: => R): R = {
def writeMetricEndpointTiming[R](callContext: Option[CallContextLight])(blockOfCode: => R): R = {
val result = blockOfCode
// call-by-name
val endTime = Helpers.now
Expand All @@ -2491,7 +2491,7 @@ object APIUtil extends MdcLoggable with CustomJsonFormats{
case _ =>
// There are no enough information for logging
}
logAPICall(callContext.map(_.copy(endTime = Some(endTime))))
writeEndpointMetric(callContext.map(_.copy(endTime = Some(endTime))))
result
}

Expand Down Expand Up @@ -2834,7 +2834,7 @@ object APIUtil extends MdcLoggable with CustomJsonFormats{
def futureToResponse[T](in: LAFuture[(T, Option[CallContext])]): JsonResponse = {
RestContinuation.async(reply => {
in.onSuccess(
t => logEndpointTiming(t._2.map(_.toLight))(reply.apply(successJsonResponseNewStyle(cc = t._1, t._2)(getHeadersNewStyle(t._2.map(_.toLight)))))
t => writeMetricEndpointTiming(t._2.map(_.toLight))(reply.apply(successJsonResponseNewStyle(cc = t._1, t._2)(getHeadersNewStyle(t._2.map(_.toLight)))))
)
in.onFail {
case Failure(_, Full(JsonResponseException(jsonResponse)), _) =>
Expand All @@ -2847,7 +2847,7 @@ object APIUtil extends MdcLoggable with CustomJsonFormats{
extractAPIFailureNewStyle(msg) match {
case Some(af) =>
val callContextLight = af.ccl.map(_.copy(httpCode = Some(af.failCode)))
logEndpointTiming(callContextLight)(reply.apply(errorJsonResponse(af.failMsg, af.failCode, callContextLight)(getHeadersNewStyle(af.ccl))))
writeMetricEndpointTiming(callContextLight)(reply.apply(errorJsonResponse(af.failMsg, af.failCode, callContextLight)(getHeadersNewStyle(af.ccl))))
case _ =>
val errorResponse: JsonResponse = errorJsonResponse(msg)
reply.apply(errorResponse)
Expand Down Expand Up @@ -2884,7 +2884,7 @@ object APIUtil extends MdcLoggable with CustomJsonFormats{
case (Full(jsonResponse: JsonResponse), _: Option[_]) =>
reply(jsonResponse)
case t => Full(
logEndpointTiming(t._2.map(_.toLight))(
writeMetricEndpointTiming(t._2.map(_.toLight))(
reply.apply(successJsonResponseNewStyle(t._1, t._2)(getHeadersNewStyle(t._2.map(_.toLight))))
)
)
Expand All @@ -2908,7 +2908,7 @@ object APIUtil extends MdcLoggable with CustomJsonFormats{
extractAPIFailureNewStyle(msg) match {
case Some(af) =>
val callContextLight = af.ccl.map(_.copy(httpCode = Some(af.failCode)))
Full(logEndpointTiming(callContextLight)(reply.apply(errorJsonResponse(af.failMsg, af.failCode, callContextLight)(getHeadersNewStyle(af.ccl)))))
Full(writeMetricEndpointTiming(callContextLight)(reply.apply(errorJsonResponse(af.failMsg, af.failCode, callContextLight)(getHeadersNewStyle(af.ccl)))))
case _ =>
val errorResponse: JsonResponse = errorJsonResponse(msg)
Full((reply.apply(errorResponse)))
Expand Down Expand Up @@ -4665,5 +4665,43 @@ object APIUtil extends MdcLoggable with CustomJsonFormats{
* @return
*/
def `checkIfContains::::` (value: String) = value.contains("::::")

val expectedOpenFuturesPerService = APIUtil.getPropsAsIntValue("expectedOpenFuturesPerService", 100)
def getBackOffFactor (openCalls: Int) = openCalls match {
case x if x < expectedOpenFuturesPerService*1 => 1 // i.e. every call will get passed through
case x if x < expectedOpenFuturesPerService*2 => 2
case x if x < expectedOpenFuturesPerService*3 => 4
case x if x < expectedOpenFuturesPerService*4 => 8
case x if x < expectedOpenFuturesPerService*5 => 16
case x if x < expectedOpenFuturesPerService*6 => 32
case x if x < expectedOpenFuturesPerService*7 => 64
case x if x < expectedOpenFuturesPerService*8 => 128
case x if x < expectedOpenFuturesPerService*9 => 256
case _ => 1024 // the default, catch-all
}

type serviceNameOpenCallsCounterInt = Int
type serviceNameCounterInt = Int
val serviceNameCountersMap = new ConcurrentHashMap[String, (serviceNameCounterInt, serviceNameOpenCallsCounterInt)]

def canOpenFuture(serviceName :String) = {
val (serviceNameCounter, serviceNameOpenCallsCounter) = serviceNameCountersMap.getOrDefault(serviceName,(0,0))
serviceNameCounter % getBackOffFactor(serviceNameOpenCallsCounter) == 0
}

def incrementFutureCounter(serviceName:String) = {
val (serviceNameCounter, serviceNameOpenCallsCounter) = serviceNameCountersMap.getOrDefault(serviceName,(0,0))
serviceNameCountersMap.put(serviceName,(serviceNameCounter + 1,serviceNameOpenCallsCounter+1))
if(serviceNameOpenCallsCounter>=expectedOpenFuturesPerService) {
logger.warn(s"incrementFutureCounter says: current service($serviceName) open future is ${serviceNameOpenCallsCounter+1}, which is over expectedOpenFuturesPerService($expectedOpenFuturesPerService)")
}
logger.debug(s"incrementFutureCounter says: serviceName is $serviceName, serviceNameCounter is $serviceNameCounter, serviceNameOpenCallsCounter is $serviceNameOpenCallsCounter")
}

def decrementFutureCounter(serviceName:String) = {
val (serviceNameCounter, serviceNameOpenCallsCounter) = serviceNameCountersMap.getOrDefault(serviceName, (0, 1))
serviceNameCountersMap.put(serviceName, (serviceNameCounter, serviceNameOpenCallsCounter - 1))
logger.debug(s"decrementFutureCounter says: serviceName is $serviceName, serviceNameCounter is $serviceNameCounter, serviceNameOpenCallsCounter is $serviceNameOpenCallsCounter")
}

}
1 change: 1 addition & 0 deletions obp-api/src/main/scala/code/api/util/ErrorMessages.scala
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ object ErrorMessages {
val InvalidJsonValue = "OBP-10035: Incorrect json value."
val InvalidHttpMethod = "OBP-10037: Incorrect http_method."
val InvalidHttpProtocol = "OBP-10038: Incorrect http_protocol."
val ServiceIsTooBusy = "OBP-10040: The Service is too busy, please try it later."

// General Sort and Paging
val FilterSortDirectionError = "OBP-10023: obp_sort_direction parameter can only take two values: DESC or ASC!" // was OBP-20023
Expand Down
23 changes: 23 additions & 0 deletions obp-api/src/main/scala/code/api/util/FutureUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import java.util.{Timer, TimerTask}

import code.api.{APIFailureNewStyle, Constant}
import net.liftweb.json.{Extraction, JsonAST}
import code.api.util.APIUtil.{decrementFutureCounter, incrementFutureCounter}

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.language.postfixOps
Expand Down Expand Up @@ -72,4 +73,26 @@ object FutureUtil {
p.future
}

def futureWithLimits[T](future: Future[T], serviceName: String)(implicit ec: ExecutionContext): Future[T] = {

incrementFutureCounter(serviceName)

// Promise will be fulfilled with either the callers Future
val p = Promise[T]

future.map {
result =>
if (p.trySuccess(result)) {
decrementFutureCounter(serviceName)
}
}.recover {
case e: Exception =>
if (p.tryFailure(e)) {
decrementFutureCounter(serviceName)
}
}

p.future
}

}
8 changes: 7 additions & 1 deletion obp-api/src/main/scala/code/api/v3_0_0/APIMethods300.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2378,7 +2378,13 @@ trait APIMethods300 {
cc =>
for {
(_, callContext) <- anonymousAccess(cc)
(banks, callContext) <- NewStyle.function.getBanks(callContext)
(banks, callContext) <- if(canOpenFuture("NewStyle.function.getBanks")) {
FutureUtil.futureWithLimits(NewStyle.function.getBanks(callContext), "NewStyle.function.getBanks")
} else {
Future {
throw new RuntimeException(ServiceIsTooBusy +"Current Service(NewStyle.function.getBanks) ")
}
}
} yield
(JSONFactory300.createBanksJson(banks), HttpCode.`200`(callContext))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ object LocalMappedConnector extends Connector with MdcLoggable {
}

//gets a particular bank handled by this connector
override def getBankLegacy(bankId: BankId, callContext: Option[CallContext]): Box[(Bank, Option[CallContext])] = saveConnectorMetric {
override def getBankLegacy(bankId: BankId, callContext: Option[CallContext]): Box[(Bank, Option[CallContext])] = writeMetricEndpointTiming {
getMappedBank(bankId).map(bank => (bank, callContext))
}("getBank")

Expand All @@ -526,7 +526,7 @@ object LocalMappedConnector extends Connector with MdcLoggable {
}


override def getBanksLegacy(callContext: Option[CallContext]): Box[(List[Bank], Option[CallContext])] = saveConnectorMetric {
override def getBanksLegacy(callContext: Option[CallContext]): Box[(List[Bank], Option[CallContext])] = writeMetricEndpointTiming {
Full(MappedBank
.findAll()
.map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ import java.util.Date
import java.util.UUID.randomUUID
import _root_.akka.stream.StreamTcpException
import akka.http.scaladsl.model.headers.RawHeader
import akka.http.scaladsl.model.{HttpProtocol, _}
import akka.http.scaladsl.model._
import akka.util.ByteString
import code.api.APIFailureNewStyle
import code.api.ResourceDocs1_4_0.MessageDocsSwaggerDefinitions
import code.api.cache.Caching
import code.api.dynamic.endpoint.helper.MockResponseHolder
import code.api.util.APIUtil.{AdapterImplementation, MessageDoc, OBPReturnType, saveConnectorMetric, _}
import code.api.util.APIUtil._
import code.api.util.ErrorMessages._
import code.api.util.ExampleValue._
import code.api.util.RSAUtil.{computeXSign, getPrivateKeyFromString}
Expand All @@ -47,14 +47,14 @@ import code.model.dataAccess.internalMapping.MappedAccountIdMappingProvider
import code.util.AkkaHttpClient._
import code.util.Helper
import code.util.Helper.MdcLoggable
import com.openbankproject.commons.dto.{InBoundTrait, _}
import com.openbankproject.commons.dto._
import com.openbankproject.commons.model.enums.StrongCustomerAuthentication.SCA
import com.openbankproject.commons.model.enums.StrongCustomerAuthenticationStatus.SCAStatus
import com.openbankproject.commons.model.enums.{AccountAttributeType, CardAttributeType, ChallengeType, CustomerAttributeType, DynamicEntityOperation, ProductAttributeType, StrongCustomerAuthentication, TransactionAttributeType}
import com.openbankproject.commons.model.{ErrorMessage, TopicTrait, _}
import com.openbankproject.commons.model.{Meta, _}
import com.openbankproject.commons.util.{JsonUtils, ReflectUtils}
import com.tesobe.{CacheKeyFromArguments, CacheKeyOmit}
import net.liftweb.common.{Box, Empty, _}
import net.liftweb.common._
import net.liftweb.json
import net.liftweb.json.Extraction.decompose
import net.liftweb.json.JsonDSL._
Expand Down
Loading

0 comments on commit 5cff441

Please sign in to comment.