Skip to content

Commit

Permalink
debounce eval upgrades based on a live setting
Browse files Browse the repository at this point in the history
  • Loading branch information
ornicar committed Dec 10, 2024
1 parent 3a9c539 commit 8a475b6
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 40 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ lazy val `lila-ws` = project
.classifier(s"linux-$arch_"),
("io.netty" % s"netty-transport-native-kqueue" % nettyVersion)
.classifier(s"osx-$arch_"),
"org.lichess" %% "scalalib-lila" % "11.3.2",
"org.lichess" %% "scalalib-lila" % "11.5.0",
"org.lichess" %% "scalachess" % chessVersion,
"org.lichess" %% "scalachess-play-json" % chessVersion,
"org.apache.pekko" %% "pekko-actor-typed" % pekkoVersion,
Expand Down
24 changes: 13 additions & 11 deletions src/main/scala/evalCache/EvalCacheApi.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package lila.ws
package evalCache

import com.softwaremill.macwire.*
import cats.syntax.all.*
import chess.ErrorStr
import chess.format.Fen
Expand All @@ -14,13 +15,13 @@ import java.time.LocalDateTime
import lila.ws.ipc.ClientIn
import lila.ws.ipc.ClientOut.{ EvalGet, EvalGetMulti, EvalPut }

final class EvalCacheApi(mongo: Mongo)(using
final class EvalCacheApi(mongo: Mongo, settings: util.SettingStore)(using
Executor,
org.apache.pekko.actor.typed.Scheduler
):

private val truster = EvalCacheTruster(mongo)
private val upgrade = EvalCacheUpgrade()
private val truster = wire[EvalCacheTruster]
private val upgrade = wire[EvalCacheUpgrade]
private val multi = EvalCacheMulti()

import EvalCacheEntry.*
Expand Down Expand Up @@ -72,8 +73,9 @@ final class EvalCacheApi(mongo: Mongo)(using
depth = e.depth,
by = user,
trust = trust
)
).foreach(putTrusted(sri, user, _))
),
sri
).foreach(putTrusted(user, _))

// reduce the number of evals stored and,
// perhaps more importantly, distributed to subscribers
Expand Down Expand Up @@ -104,7 +106,7 @@ final class EvalCacheApi(mongo: Mongo)(using
if res.isDefined then mongo.evalCacheUsedNow(id)
res

private def putTrusted(sri: Sri, user: User.Id, input: Input): Future[Unit] =
private def putTrusted(user: User.Id, input: Input): Future[Unit] =
mongo.evalCacheColl.flatMap: c =>
validate(input).match
case Left(error) =>
Expand All @@ -124,21 +126,21 @@ final class EvalCacheApi(mongo: Mongo)(using
.one(entry)
.recover(mongo.ignoreDuplicateKey)
.map: _ =>
afterPut(input, sri, entry)
afterPut(input, entry)
case Some(oldEntry) =>
val entry = oldEntry.add(input.eval)
if entry.similarTo(oldEntry) then Future.successful(())
else
c.update
.one(BSONDocument("_id" -> entry.id), entry, upsert = true)
.map: _ =>
afterPut(input, sri, entry)
afterPut(input, entry)

private def afterPut(input: Input, sri: Sri, entry: EvalCacheEntry): Unit =
private def afterPut(input: Input, entry: EvalCacheEntry): Unit =
cache.put(input.id, Future.successful(entry.some))
// todo: debounce upgrades in hot rooms
upgrade.onEval(input, sri)
multi.onEval(input, sri)
upgrade.onEval(input)
multi.onEval(input)

private def validate(in: EvalCacheEntry.Input): Either[ErrorStr, Unit] =
in.eval.pvs.traverse_ { pv =>
Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/evalCache/EvalCacheEntry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ case class EvalCacheEntry(

object EvalCacheEntry:

case class Input(id: Id, fen: Fen.Full, situation: Situation, eval: Eval)
case class Input(id: Id, fen: Fen.Full, situation: Situation, eval: Eval, sri: Sri)

case class Eval(pvs: NonEmptyList[Pv], knodes: Knodes, depth: Depth, by: User.Id, trust: Trust):

Expand Down Expand Up @@ -83,10 +83,10 @@ object EvalCacheEntry:

def truncate = copy(moves = Moves.truncate(moves))

def makeInput(variant: Variant, fen: Fen.Full, eval: Eval) =
def makeInput(variant: Variant, fen: Fen.Full, eval: Eval, sri: Sri) =
Fen
.read(variant, fen)
.filter(_.playable(false))
.ifTrue(eval.looksValid)
.map: situation =>
Input(Id(situation), fen, situation, eval.truncatePvs)
Input(Id(situation), fen, situation, eval.truncatePvs, sri)
4 changes: 2 additions & 2 deletions src/main/scala/evalCache/EvalCacheMulti.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ final private class EvalCacheMulti(using
evals.compute(id, (_, prev) => Option(prev).fold(EvalState(Set(sri), Depth(0)))(_.addSri(sri)))
expirableSris.put(sri)

def onEval(input: EvalCacheEntry.Input, fromSri: Sri): Unit =
def onEval(input: EvalCacheEntry.Input): Unit =
Option(
evals.computeIfPresent(
input.id,
Expand All @@ -50,7 +50,7 @@ final private class EvalCacheMulti(using
)
).filter(_.depth == input.eval.depth)
.foreach: eval =>
val sris = eval.sris.filter(_ != fromSri)
val sris = eval.sris.filter(_ != input.sri)
if sris.nonEmpty then
val hit = EvalHitMulti:
EvalCacheJsonHandlers.writeMultiHit(input.fen, input.eval)
Expand Down
57 changes: 35 additions & 22 deletions src/main/scala/evalCache/EvalCacheUpgrade.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package lila.ws
package evalCache

import java.util.concurrent.ConcurrentHashMap
import chess.format.UciPath
import play.api.libs.json.JsString

import java.util.concurrent.ConcurrentHashMap
import scalalib.DebouncerFunction

import lila.ws.ipc.ClientIn.EvalHit
import lila.ws.ipc.ClientOut.EvalGet
Expand All @@ -14,7 +14,9 @@ import lila.ws.util.ExpireCallbackMemo
* by remembering the last evalGet of each socket member,
* and listening to new evals stored.
*/
final private class EvalCacheUpgrade(using
final private class EvalCacheUpgrade(
settings: util.SettingStore
)(using
ec: Executor,
scheduler: org.apache.pekko.actor.typed.Scheduler
):
Expand All @@ -24,6 +26,11 @@ final private class EvalCacheUpgrade(using
private val evals = ConcurrentHashMap[SetupId, EvalState](1024)
private val expirableSris = ExpireCallbackMemo[Sri](scheduler, 3 minutes, expire)

private def debouncerSetting =
settings.makeSetting[Boolean]("lila-ws.EvalCacheUpgrade.debouncerEnable", false)

private val debouncer = DebouncerFunction[SetupId](scheduler.scheduleOnce(5.seconds, _), 64)

private val upgradeMon = Monitor.evalCache.single.upgrade

def register(sri: Sri, e: EvalGet): Unit =
Expand All @@ -42,27 +49,33 @@ final private class EvalCacheUpgrade(using
)
expirableSris.put(sri)

def onEval(input: EvalCacheEntry.Input, fromSri: Sri): Unit =
def onEval(input: EvalCacheEntry.Input): Unit =
(1 to input.eval.multiPv.value).foreach: multiPv =>
val setupId = SetupId(input.id, MultiPv(multiPv))
Option(
evals.computeIfPresent(
setupId,
(_, ev) =>
if ev.depth >= input.eval.depth then ev
else ev.copy(depth = input.eval.depth)
)
).filter(_.depth == input.eval.depth)
.foreach: eval =>
val wms = eval.sris.withFilter(_ != fromSri).flatMap(sri => Option(members.get(sri.value)))
if wms.nonEmpty then
val evalJson = EvalCacheJsonHandlers.writeEval(input.eval, input.fen)
wms
.groupBy(_.path)
.map: (path, members) =>
val hit = EvalHit(evalJson + ("path" -> JsString(path.value)))
members.foreach(m => Bus.publish(_.sri(m.sri), hit))
upgradeMon.count.increment(wms.size)
if debouncerSetting.get()
then debouncer.push(setupId)(() => publishEval(setupId, input))
else publishEval(setupId, input)

private def publishEval(setupId: SetupId, input: EvalCacheEntry.Input) =
val newEvalState = Option:
evals.computeIfPresent(
setupId,
(_, stored) =>
if stored.depth >= input.eval.depth then stored
else stored.copy(depth = input.eval.depth)
)
newEvalState
.filter(_.depth == input.eval.depth) // ensure the new one from input
.foreach: eval =>
val wms = eval.sris.withFilter(_ != input.sri).flatMap(sri => Option(members.get(sri.value)))
if wms.nonEmpty then
val evalJson = EvalCacheJsonHandlers.writeEval(input.eval, input.fen)
wms
.groupBy(_.path)
.map: (path, members) =>
val hit = EvalHit(evalJson + ("path" -> JsString(path.value)))
members.foreach(m => Bus.publish(_.sri(m.sri), hit))
upgradeMon.count.increment(wms.size)

private def expire(sri: Sri): Unit =
Option(members.remove(sri.value)).foreach: m =>
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/util/SettingStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import reactivemongo.api.bson.*
* db.setting.updateOne({_id:'dogs'},{$set:{value:50}})
*/

case class Setting[A](default: A, ttl: FiniteDuration)(fetch: () => Future[Option[A]])(using
final class Setting[A](default: A, ttl: FiniteDuration)(fetch: () => Future[Option[A]])(using
ec: Executor,
scheduler: Scheduler
):
Expand Down

0 comments on commit 8a475b6

Please sign in to comment.