From 16de30d0b8ac78fa65b53fd73afaa98e71ced143 Mon Sep 17 00:00:00 2001 From: Thibault Duplessis Date: Wed, 13 Mar 2024 20:08:56 +0100 Subject: [PATCH] use ConcurrentHashMap in EvalCacheUpgrade too --- src/main/scala/evalCache/EvalCacheMulti.scala | 2 +- .../scala/evalCache/EvalCacheUpgrade.scala | 78 ++++++++++--------- 2 files changed, 42 insertions(+), 38 deletions(-) diff --git a/src/main/scala/evalCache/EvalCacheMulti.scala b/src/main/scala/evalCache/EvalCacheMulti.scala index d08fb96e..a3ff92b1 100644 --- a/src/main/scala/evalCache/EvalCacheMulti.scala +++ b/src/main/scala/evalCache/EvalCacheMulti.scala @@ -30,7 +30,7 @@ final private class EvalCacheMulti(using members .compute( sri.value, - (k, prev) => + (_, prev) => Option(prev).foreach: _.setups.foreach(unregisterEval(_, sri)) WatchingMember(sri, e.variant, e.fens) diff --git a/src/main/scala/evalCache/EvalCacheUpgrade.scala b/src/main/scala/evalCache/EvalCacheUpgrade.scala index 863b1dcb..3b65ffd8 100644 --- a/src/main/scala/evalCache/EvalCacheUpgrade.scala +++ b/src/main/scala/evalCache/EvalCacheUpgrade.scala @@ -5,7 +5,7 @@ import chess.format.{ Fen, UciPath } import chess.variant.Variant import play.api.libs.json.JsString -import scala.collection.mutable +import java.util.concurrent.ConcurrentHashMap import lila.ws.ipc.ClientIn.EvalHit import lila.ws.ipc.ClientOut.EvalGet @@ -21,54 +21,58 @@ final private class EvalCacheUpgrade(using ): import EvalCacheUpgrade.* - private val members = mutable.AnyRefMap.empty[SriString, WatchingMember] - private val evals = mutable.AnyRefMap.empty[SetupId, EvalState] - private val expirableSris = ExpireCallbackMemo[Sri](scheduler, 10 minutes, expire) + private val members = ConcurrentHashMap[SriString, WatchingMember](4096) + private val evals = ConcurrentHashMap[SetupId, EvalState](1024) + private val expirableSris = ExpireCallbackMemo[Sri](scheduler, 3 minutes, expire) private val upgradeMon = Monitor.evalCache.single.upgrade def register(sri: Sri, e: EvalGet): Unit = - members - .get(sri.value) - .foreach: wm => - unregisterEval(wm.setupId, sri) - val setupId = makeSetupId(e.variant, e.fen, e.multiPv) - members += (sri.value -> WatchingMember(sri, setupId, e.path)) - evals += (setupId -> evals.get(setupId).fold(EvalState(Set(sri), Depth(0)))(_.addSri(sri))) + members.compute( + sri.value, + (_, prev) => + Option(prev).foreach: member => + unregisterEval(member.setupId, sri) + val setupId = makeSetupId(e.variant, e.fen, e.multiPv) + evals.compute(setupId, (_, eval) => Option(eval).fold(EvalState(Set(sri), Depth(0)))(_.addSri(sri))) + WatchingMember(sri, setupId, e.path) + ) expirableSris.put(sri) def onEval(input: EvalCacheEntry.Input, fromSri: Sri): Unit = - (1 to input.eval.multiPv.value) - .flatMap: multiPv => - val setupId = makeSetupId(input.id.variant, input.fen, MultiPv(multiPv)) - evals.get(setupId).map(setupId -> _) - .filter: - _._2.depth < input.eval.depth - .foreach: (setupId, eval) => - evals += (setupId -> eval.copy(depth = input.eval.depth)) - val wms = eval.sris.withFilter(_ != fromSri).flatMap(sri => members.get(sri.value)) - if wms.nonEmpty then - val evalJson = EvalCacheJsonHandlers.writeEval(input.eval, input.fen) - wms - .groupBy(_.path) - .map: (path, wms) => - val hit = EvalHit(evalJson + ("path" -> JsString(path.value))) - wms.foreach(wm => Bus.publish(_.sri(wm.sri), hit)) - upgradeMon.count.increment(wms.size) + (1 to input.eval.multiPv.value).foreach: multiPv => + val setupId = makeSetupId(input.id.variant, input.fen, MultiPv(multiPv)) + Option( + evals.computeIfPresent( + setupId, + (_, ev) => + if ev.depth >= input.eval.depth then ev + else ev.copy(depth = input.eval.depth) + ) + ).filter(_.depth == input.eval.depth) + .foreach: eval => + val wms = eval.sris.withFilter(_ != fromSri).flatMap(sri => Option(members.get(sri.value))) + if wms.nonEmpty then + val evalJson = EvalCacheJsonHandlers.writeEval(input.eval, input.fen) + wms + .groupBy(_.path) + .map: (path, members) => + val hit = EvalHit(evalJson + ("path" -> JsString(path.value))) + members.foreach(m => Bus.publish(_.sri(m.sri), hit)) + upgradeMon.count.increment(wms.size) private def expire(sri: Sri): Unit = - members.get(sri.value).foreach { wm => - unregisterEval(wm.setupId, sri) - members -= sri.value - } + Option(members.remove(sri.value)).foreach: m => + unregisterEval(m.setupId, sri) private def unregisterEval(setupId: SetupId, sri: Sri): Unit = - evals - .get(setupId) - .foreach: eval => + evals.computeIfPresent( + setupId, + (_, eval) => val newSris = eval.sris - sri - if newSris.isEmpty then evals -= setupId - else evals += (setupId -> eval.copy(sris = newSris)) + if newSris.isEmpty then null + else eval.copy(sris = newSris) + ) scheduler.scheduleWithFixedDelay(1 minute, 1 minute): () => upgradeMon.members.update(members.size)