Skip to content

Commit

Permalink
use ConcurrentHashMap in EvalCacheUpgrade too
Browse files Browse the repository at this point in the history
  • Loading branch information
ornicar committed Mar 13, 2024
1 parent f7f8bc9 commit 16de30d
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 38 deletions.
2 changes: 1 addition & 1 deletion src/main/scala/evalCache/EvalCacheMulti.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ final private class EvalCacheMulti(using
members
.compute(
sri.value,
(k, prev) =>
(_, prev) =>
Option(prev).foreach:
_.setups.foreach(unregisterEval(_, sri))
WatchingMember(sri, e.variant, e.fens)
Expand Down
78 changes: 41 additions & 37 deletions src/main/scala/evalCache/EvalCacheUpgrade.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import chess.format.{ Fen, UciPath }
import chess.variant.Variant
import play.api.libs.json.JsString

import scala.collection.mutable
import java.util.concurrent.ConcurrentHashMap

import lila.ws.ipc.ClientIn.EvalHit
import lila.ws.ipc.ClientOut.EvalGet
Expand All @@ -21,54 +21,58 @@ final private class EvalCacheUpgrade(using
):
import EvalCacheUpgrade.*

private val members = mutable.AnyRefMap.empty[SriString, WatchingMember]
private val evals = mutable.AnyRefMap.empty[SetupId, EvalState]
private val expirableSris = ExpireCallbackMemo[Sri](scheduler, 10 minutes, expire)
private val members = ConcurrentHashMap[SriString, WatchingMember](4096)
private val evals = ConcurrentHashMap[SetupId, EvalState](1024)
private val expirableSris = ExpireCallbackMemo[Sri](scheduler, 3 minutes, expire)

private val upgradeMon = Monitor.evalCache.single.upgrade

def register(sri: Sri, e: EvalGet): Unit =
members
.get(sri.value)
.foreach: wm =>
unregisterEval(wm.setupId, sri)
val setupId = makeSetupId(e.variant, e.fen, e.multiPv)
members += (sri.value -> WatchingMember(sri, setupId, e.path))
evals += (setupId -> evals.get(setupId).fold(EvalState(Set(sri), Depth(0)))(_.addSri(sri)))
members.compute(
sri.value,
(_, prev) =>
Option(prev).foreach: member =>
unregisterEval(member.setupId, sri)
val setupId = makeSetupId(e.variant, e.fen, e.multiPv)
evals.compute(setupId, (_, eval) => Option(eval).fold(EvalState(Set(sri), Depth(0)))(_.addSri(sri)))
WatchingMember(sri, setupId, e.path)
)
expirableSris.put(sri)

def onEval(input: EvalCacheEntry.Input, fromSri: Sri): Unit =
(1 to input.eval.multiPv.value)
.flatMap: multiPv =>
val setupId = makeSetupId(input.id.variant, input.fen, MultiPv(multiPv))
evals.get(setupId).map(setupId -> _)
.filter:
_._2.depth < input.eval.depth
.foreach: (setupId, eval) =>
evals += (setupId -> eval.copy(depth = input.eval.depth))
val wms = eval.sris.withFilter(_ != fromSri).flatMap(sri => members.get(sri.value))
if wms.nonEmpty then
val evalJson = EvalCacheJsonHandlers.writeEval(input.eval, input.fen)
wms
.groupBy(_.path)
.map: (path, wms) =>
val hit = EvalHit(evalJson + ("path" -> JsString(path.value)))
wms.foreach(wm => Bus.publish(_.sri(wm.sri), hit))
upgradeMon.count.increment(wms.size)
(1 to input.eval.multiPv.value).foreach: multiPv =>
val setupId = makeSetupId(input.id.variant, input.fen, MultiPv(multiPv))
Option(
evals.computeIfPresent(
setupId,
(_, ev) =>
if ev.depth >= input.eval.depth then ev
else ev.copy(depth = input.eval.depth)
)
).filter(_.depth == input.eval.depth)
.foreach: eval =>
val wms = eval.sris.withFilter(_ != fromSri).flatMap(sri => Option(members.get(sri.value)))
if wms.nonEmpty then
val evalJson = EvalCacheJsonHandlers.writeEval(input.eval, input.fen)
wms
.groupBy(_.path)
.map: (path, members) =>
val hit = EvalHit(evalJson + ("path" -> JsString(path.value)))
members.foreach(m => Bus.publish(_.sri(m.sri), hit))
upgradeMon.count.increment(wms.size)

private def expire(sri: Sri): Unit =
members.get(sri.value).foreach { wm =>
unregisterEval(wm.setupId, sri)
members -= sri.value
}
Option(members.remove(sri.value)).foreach: m =>
unregisterEval(m.setupId, sri)

private def unregisterEval(setupId: SetupId, sri: Sri): Unit =
evals
.get(setupId)
.foreach: eval =>
evals.computeIfPresent(
setupId,
(_, eval) =>
val newSris = eval.sris - sri
if newSris.isEmpty then evals -= setupId
else evals += (setupId -> eval.copy(sris = newSris))
if newSris.isEmpty then null
else eval.copy(sris = newSris)
)

scheduler.scheduleWithFixedDelay(1 minute, 1 minute): () =>
upgradeMon.members.update(members.size)
Expand Down

0 comments on commit 16de30d

Please sign in to comment.