Skip to content

Commit

Permalink
Merge pull request #612 from MAIF/enhancement/use-otoroshi-elasticsearch
Browse files Browse the repository at this point in the history
Enhancement/use otoroshi elasticsearch
  • Loading branch information
quentinovega authored Nov 2, 2023
2 parents d0f48ac + 82a0bc9 commit 39b330f
Show file tree
Hide file tree
Showing 14 changed files with 292 additions and 54 deletions.
64 changes: 64 additions & 0 deletions daikoku/app/audit/audit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import akka.kafka.ProducerSettings
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{OverflowStrategy, QueueOfferResult}
import akka.{Done, NotUsed}
import cats.data.EitherT
import controllers.AppError
import diffson.DiffOps
import fr.maif.otoroshi.daikoku.audit.config.{ElasticAnalyticsConfig, Webhook}
import fr.maif.otoroshi.daikoku.domain._
import fr.maif.otoroshi.daikoku.env.Env
Expand Down Expand Up @@ -663,6 +666,7 @@ class ElasticWritesAnalytics(config: ElasticAnalyticsConfig, env: Env) {
private def urlFromPath(path: String): String = s"${config.clusterUri}$path"
private val index: String = config.index.getOrElse("otoroshi-events")
private val `type`: String = config.`type`.getOrElse("event")
private val searchUri = urlFromPath(s"/$index*/_search")
private implicit val mat = env.defaultMaterializer

private def url(url: String): WSRequest = {
Expand Down Expand Up @@ -788,6 +792,66 @@ class ElasticWritesAnalytics(config: ElasticAnalyticsConfig, env: Env) {
.runWith(Sink.ignore)
.map(_ => ())
}

def query(query: JsObject)(implicit ec: ExecutionContext): EitherT[Future, AppError, JsValue] = {
if (logger.isDebugEnabled) logger.debug(s"Query to Elasticsearch: $searchUri")
if (logger.isDebugEnabled) logger.debug(s"Query to Elasticsearch: ${Json.prettyPrint(query)}")

EitherT(url(searchUri)
.addHttpHeaders(config.headers.toSeq: _*)
.post(query)
.map { resp =>
resp.status match {
case 200 => Right[AppError, JsValue](resp.json)
case _ =>
Left[AppError, JsValue](AppError.InternalServerError(s"Error during es request: \n * ${resp.body}, \nquery was \n * $query"))
}
})
}
}

class ElasticReadsAnalytics(config: ElasticAnalyticsConfig, env: Env) {

lazy val logger = Logger("audit-reads-elastic")

private def urlFromPath(path: String): String = s"${config.clusterUri}$path"
private val index: String = config.index.getOrElse("otoroshi-events")
private val `type`: String = config.`type`.getOrElse("event")
private val searchUri = urlFromPath(s"/$index*/_search")
private implicit val mat = env.defaultMaterializer

private def url(url: String): WSRequest = {
val builder = env.wsClient.url(url)
authHeader()
.fold(builder) { h =>
builder.withHttpHeaders("Authorization" -> h)
}
.addHttpHeaders(config.headers.toSeq: _*)
}

private def authHeader(): Option[String] = {
for {
user <- config.user
password <- config.password
} yield
s"Basic ${Base64.getEncoder.encodeToString(s"$user:$password".getBytes())}"
}

def query(query: JsObject)(implicit ec: ExecutionContext): EitherT[Future, AppError, JsValue] = {
if (logger.isDebugEnabled) logger.debug(s"Query to Elasticsearch: $searchUri")
if (logger.isDebugEnabled) logger.debug(s"Query to Elasticsearch: ${Json.prettyPrint(query)}")

EitherT(url(searchUri)
.addHttpHeaders(config.headers.toSeq: _*)
.post(query)
.map { resp =>
resp.status match {
case 200 => Right[AppError, JsValue](resp.json)
case _ =>
Left[AppError, JsValue](AppError.InternalServerError(s"Error during es request: \n * ${resp.body}, \nquery was \n * $query"))
}
})
}
}

class WebHookAnalytics(webhook: Webhook) {
Expand Down
31 changes: 31 additions & 0 deletions daikoku/app/controllers/ApiController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import akka.http.scaladsl.util.FastFuture
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, JsonFraming, Sink, Source}
import akka.util.ByteString
import cats.Id
import cats.data.EitherT
import cats.implicits.{catsSyntaxOptionId, toTraverseOps}
import controllers.AppError
import controllers.AppError._
import fr.maif.otoroshi.daikoku.actions.{DaikokuAction, DaikokuActionContext, DaikokuActionMaybeWithGuest}
import fr.maif.otoroshi.daikoku.audit.AuditTrailEvent
import fr.maif.otoroshi.daikoku.audit.config.ElasticAnalyticsConfig
import fr.maif.otoroshi.daikoku.ctrls.authorizations.async._
import fr.maif.otoroshi.daikoku.domain.NotificationAction.{ApiAccess, ApiSubscriptionDemand}
import fr.maif.otoroshi.daikoku.domain.UsagePlanVisibility.Private
Expand Down Expand Up @@ -4278,4 +4280,33 @@ class ApiController(
value.merge
}
}

def getApiSubscriptionsUsage(teamId: String) =
DaikokuAction.async(parse.json) { ctx =>
TeamAdminOnly(AuditTrailEvent(s"@{user.name} has accessed to subscription usage for his team @{team.id}"))(teamId, ctx) { team =>

val subsIds = (ctx.request.body \ "subscriptions").as[JsArray]

for {
subscriptions <- env.dataStore.apiSubscriptionRepo.forTenant(ctx.tenant).find(Json.obj("_id" -> Json.obj("$in" -> subsIds)))
planIds = subscriptions.map(_.plan.asJson).distinct
plans <- env.dataStore.usagePlanRepo.forTenant(ctx.tenant).find(Json.obj("_id" -> Json.obj("$in" -> JsArray(planIds))))
test = subscriptions.groupBy(sub => sub.plan).toSeq
r <- Future.sequence(test.map { case (planId, subs) => getOtoroshiUsage(subs, plans.find(_.id == planId))(ctx.tenant)})
} yield Ok(JsArray(r.flatMap(_.value)))

}
}

private def getOtoroshiUsage(subscriptions: Seq[ApiSubscription], plan: Option[UsagePlan])(implicit tenant: Tenant): Future[JsArray] = {

val value1: EitherT[Future, JsArray, JsArray] = plan match {
case Some(value) => for {
otoroshi <- EitherT.fromOption[Future](tenant.otoroshiSettings.find(oto => value.otoroshiTarget.exists(_.otoroshiSettings == oto.id)), Json.arr())
usages <- otoroshiClient.getSubscriptionLastUsage(subscriptions)(otoroshi, tenant)
} yield usages
case None => EitherT.pure[Future, JsArray](Json.arr())
}
value1.merge
}
}
3 changes: 2 additions & 1 deletion daikoku/app/domain/SchemaDefinition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ object SchemaDefinition {

lazy val OtoroshiSettingsType = deriveObjectType[(DataStore, DaikokuActionContext[JsValue]), OtoroshiSettings](
ObjectTypeDescription("Settings to communicate with an instance of Otoroshi"),
ReplaceField("id", Field("id", StringType, resolve = _.value.id.value))
ReplaceField("id", Field("id", StringType, resolve = _.value.id.value)),
ReplaceField("elasticConfig", Field("elasticConfig", OptionType(ElasticAnalyticsConfigType), resolve = _.value.elasticConfig))
)
lazy val MailerSettingsType: InterfaceType[(DataStore, DaikokuActionContext[JsValue]), MailerSettings] = InterfaceType(
"MailerSettings",
Expand Down
10 changes: 7 additions & 3 deletions daikoku/app/domain/json.scala
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ object json {
.getOrElse("admin-api-apikey-id"),
clientSecret = (json \ "clientSecret")
.asOpt[String]
.getOrElse("admin-api-apikey-sectet")
.getOrElse("admin-api-apikey-secret"),
elasticConfig = (json \ "elasticConfig").asOpt(ElasticAnalyticsConfig.format)
)
)
} recover {
Expand All @@ -120,7 +121,8 @@ object json {
"url" -> o.url,
"host" -> o.host,
"clientId" -> o.clientId,
"clientSecret" -> o.clientSecret
"clientSecret" -> o.clientSecret,
"elasticConfig" -> o.elasticConfig.map(ElasticAnalyticsConfig.format.writes).getOrElse(JsNull).as[JsValue]
)
}

Expand Down Expand Up @@ -480,7 +482,9 @@ object json {
Try {
JsSuccess(OtoroshiSettingsId(json.as[String]))
} recover {
case e => JsError(e.getMessage)
case e =>
AppLogger.error(e.getMessage, e)
JsError(e.getMessage)
} get

override def writes(o: OtoroshiSettingsId): JsValue = JsString(o.value)
Expand Down
3 changes: 2 additions & 1 deletion daikoku/app/domain/tenantEntities.scala
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,8 @@ case class OtoroshiSettings(id: OtoroshiSettingsId,
url: String,
host: String,
clientId: String = "admin-api-apikey-id",
clientSecret: String = "admin-api-apikey-secret")
clientSecret: String = "admin-api-apikey-secret",
elasticConfig: Option[ElasticAnalyticsConfig] = None)
extends CanJson[OtoroshiSettings] {
def asJson: JsValue = json.OtoroshiSettingsFormat.writes(this)
def toUiPayload(): JsValue = {
Expand Down
1 change: 1 addition & 0 deletions daikoku/app/utils/admin.scala
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ abstract class AdminApiController[Of, Id <: ValueType](
}

def findById(id: String) = DaikokuApiAction.async { ctx =>
println("hi")
val notDeleted: Boolean =
ctx.request.queryString.get("notDeleted").exists(_ == "true")
notDeleted match {
Expand Down
81 changes: 80 additions & 1 deletion daikoku/app/utils/otoroshi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ package fr.maif.otoroshi.daikoku.utils

import akka.http.scaladsl.util.FastFuture
import akka.stream.Materializer
import cats.data.EitherT
import cats.implicits.catsSyntaxOptionId
import controllers.AppError
import controllers.AppError.OtoroshiError
import fr.maif.otoroshi.daikoku.audit.{ElasticReadsAnalytics, ElasticWritesAnalytics}
import fr.maif.otoroshi.daikoku.audit.config.ElasticAnalyticsConfig
import fr.maif.otoroshi.daikoku.domain.json.ActualOtoroshiApiKeyFormat
import fr.maif.otoroshi.daikoku.domain.{ActualOtoroshiApiKey, OtoroshiSettings}
import fr.maif.otoroshi.daikoku.domain.{ActualOtoroshiApiKey, ApiSubscription, OtoroshiSettings, Tenant, json}
import fr.maif.otoroshi.daikoku.env.Env
import fr.maif.otoroshi.daikoku.logger.AppLogger
import play.api.libs.json._
import play.api.libs.ws.{WSAuthScheme, WSRequest}
import play.api.mvc._
Expand Down Expand Up @@ -285,4 +290,78 @@ class OtoroshiClient(env: Env) {
}
}
}

def getSubscriptionLastUsage(subscriptions: Seq[ApiSubscription])(implicit otoroshiSettings: OtoroshiSettings, tenant: Tenant): EitherT[Future, JsArray, JsArray] = {
otoroshiSettings.elasticConfig match {
case Some(config) =>
new ElasticReadsAnalytics(config, env)
.query(Json.obj(
"query" -> Json.obj(
"bool" -> Json.obj(
"filter" -> Json.arr(
Json.obj("terms" -> Json.obj(
"identity.identity" -> JsArray(subscriptions.map(_.apiKey.clientId).map(JsString))
))
)
)
),
"aggs" -> Json.obj(
"lastUsages" -> Json.obj(
"terms" -> Json.obj(
"field" -> "identity.identity"
),
"aggs" -> Json.obj(
"latest" -> Json.obj(
"top_hits" -> Json.obj(
"size" -> 1,
"sort" -> Json.arr(Json.obj(
"@timestamp" -> Json.obj(
"order" -> "desc"
)
))
)
)
)
)),
"size" -> 0
))
.map(resp => {
val buckets = (resp \ "aggregations" \ "lastUsages" \ "buckets").as[JsArray]
JsArray(buckets.value.map(agg => {
val key = (agg \ "key").as[String]
val lastUsage = (agg \ "latest" \ "hits" \ "hits").as[JsArray].value.head
val date = (lastUsage \ "_source" \ "@timestamp").as[JsValue]

Json.obj(
"clientName" -> key,
"date" -> date,
"subscription" -> subscriptions.find(_.apiKey.clientId == key).map(_.id.asJson).getOrElse(JsNull).as[JsValue]
)
}))
})
.leftMap(e => {
AppLogger.error(e.getErrorMessage())
Json.arr()
})
case None => for {
elasticConfig <- EitherT.fromOptionF(getElasticConfig(), Json.arr())
updatedSettings = otoroshiSettings.copy(elasticConfig = elasticConfig.some)
updatedTenant = tenant.copy(otoroshiSettings = tenant.otoroshiSettings.filter(_.id != otoroshiSettings.id) + updatedSettings)
_ <- EitherT.liftF(env.dataStore.tenantRepo.save(updatedTenant))
r <- getSubscriptionLastUsage(subscriptions)(updatedSettings, updatedTenant)
} yield r
}
}

private def getElasticConfig()(implicit otoroshiSettings: OtoroshiSettings): Future[Option[ElasticAnalyticsConfig]] = {
client(s"/api/globalconfig").get().map(resp => {
if (resp.status == 200) {
val config = resp.json.as[JsObject]
val elasticReadConfig = (config \ "elasticReadsConfig").asOpt(ElasticAnalyticsConfig.format)
elasticReadConfig
} else {
None
}
})
}
}
1 change: 1 addition & 0 deletions daikoku/conf/routes
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ DELETE /api/teams/:teamId/subscriptions/_clean fr.maif.otoroshi.daiko
POST /api/teams/:teamId/subscriptions/:id/_rotation fr.maif.otoroshi.daikoku.ctrls.ApiController.toggleApiKeyRotation(teamId, id)
POST /api/teams/:teamId/subscriptions/:id/_refresh fr.maif.otoroshi.daikoku.ctrls.ApiController.regenerateApiKeySecret(teamId, id)
POST /api/subscriptions/_init fr.maif.otoroshi.daikoku.ctrls.ApiController.initSubscriptions()
POST /api/teams/:teamId/subscriptions/_lastUsage fr.maif.otoroshi.daikoku.ctrls.ApiController.getApiSubscriptionsUsage(teamId)
POST /api/apis/_init fr.maif.otoroshi.daikoku.ctrls.ApiController.initApis()
GET /api/teams/:teamId/subscription/:id/informations fr.maif.otoroshi.daikoku.ctrls.ApiController.getSubscriptionInformations(teamId, id)
GET /api/teams/:teamId/apis/:apiId/:version/subscriptions fr.maif.otoroshi.daikoku.ctrls.ApiController.getApiSubscriptions(teamId, apiId, version)
Expand Down
Loading

0 comments on commit 39b330f

Please sign in to comment.