Skip to content

Commit

Permalink
remove traffic shaping experiment
Browse files Browse the repository at this point in the history
  • Loading branch information
ornicar committed Oct 6, 2024
1 parent 59869be commit 60f62ef
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 60 deletions.
6 changes: 2 additions & 4 deletions src/main/scala/Controller.scala
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,7 @@ object Controller:
val behavior: ClientEmit => ClientBehavior,
val rateLimit: RateLimit,
val header: RequestHeader,
val emitCounter: kamon.metric.Counter,
val alwaysFlush: Boolean
val emitCounter: kamon.metric.Counter
)
def endpoint(
name: String,
Expand All @@ -323,8 +322,7 @@ object Controller:
name = name
),
header,
Monitor.clientInCounter(name),
alwaysFlush
Monitor.clientInCounter(name)
)

type ResponseSync = Either[HttpResponseStatus, Endpoint]
Expand Down
59 changes: 6 additions & 53 deletions src/main/scala/netty/ActorChannelConnector.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,20 @@ import io.netty.buffer.Unpooled
import io.netty.channel.*
import io.netty.handler.codec.http.websocketx.*
import io.netty.util.concurrent.{ Future as NettyFuture, GenericFutureListener }
import org.apache.pekko.actor.typed.{ ActorRef, Scheduler }

import java.util.concurrent.TimeUnit
import org.apache.pekko.actor.typed.ActorRef

import lila.ws.Controller.Endpoint
import lila.ws.netty.ProtocolHandler.key

final private class ActorChannelConnector(
clients: ActorRef[Clients.Control],
staticConfig: com.typesafe.config.Config,
settings: util.SettingStore,
workers: EventLoopGroup
)(using scheduler: Scheduler, ec: Executor):

private val flushQ = java.util.concurrent.ConcurrentLinkedQueue[Channel]()
private val monitor = Monitor.connector.flush

private object config:
private def int(key: String) = settings.makeSetting(key, staticConfig.getInt(key))
val step = int("netty.flush.step")
val interval = int("netty.flush.interval-millis")
val maxDelay = int("netty.flush.max-delay-millis")
inline def isFlushQEnabled() = interval.get() > 0
scheduler.scheduleWithFixedDelay(1 minute, 1 minute): () =>
monitor.config.step.update(step.get())
monitor.config.interval.update(interval.get())
monitor.config.maxDelay.update(maxDelay.get())

workers.schedule[Unit](() => flush(), 1, TimeUnit.SECONDS)
clients: ActorRef[Clients.Control]
)(using Executor):

def apply(endpoint: Endpoint, channel: Channel): Unit =
val clientPromise = Promise[Client]()
channel.attr(key.client).set(clientPromise.future)
val channelEmit: ClientEmit =
val emitter = emitToChannel(channel, withFlush = endpoint.alwaysFlush)
val emitter = emitToChannel(channel)
(msg: ipc.ClientIn) =>
endpoint.emitCounter.increment()
emitter(msg)
Expand All @@ -51,37 +30,11 @@ final private class ActorChannelConnector(
clients ! Clients.Control.Stop(client)
}

private def emitToChannel(channel: Channel, withFlush: Boolean): ClientEmit =
private def emitToChannel(channel: Channel): ClientEmit =
case ipc.ClientIn.Disconnect(reason) =>
channel
.writeAndFlush(CloseWebSocketFrame(WebSocketCloseStatus(4010, reason)))
.addListener(ChannelFutureListener.CLOSE)
case ipc.ClientIn.RoundPingFrameNoFlush =>
channel.write { PingWebSocketFrame(Unpooled.copyLong(System.currentTimeMillis())) }
case in if withFlush || !config.isFlushQEnabled() =>
channel.writeAndFlush(TextWebSocketFrame(in.write))
case in =>
channel.write(TextWebSocketFrame(in.write))
flushQ.add(channel)

private def flush(): Unit =
val qSize = flushQ.size
val maxDelayFactor = config.interval.get().toDouble / config.maxDelay.get().atLeast(1)
var channelsToFlush = config.step.get().atLeast((qSize * maxDelayFactor).toInt)
val nextIntervalMillis =
if config.isFlushQEnabled() then config.interval.get()
else if qSize == 0 then 1000 // hibernate
else 1 // interval is 0 but we still need to empty the queue

workers.schedule[Unit](() => flush(), nextIntervalMillis, TimeUnit.MILLISECONDS)

monitor.qSize.record(qSize)
monitor.channelsToFlush.record(channelsToFlush)

while channelsToFlush > 0 do
Option(flushQ.poll()) match
case Some(channel) =>
if channel.isOpen then channel.flush()
channelsToFlush -= 1
case _ =>
channelsToFlush = 0
case in => channel.writeAndFlush(TextWebSocketFrame(in.write))
5 changes: 2 additions & 3 deletions src/main/scala/netty/NettyServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,15 @@ import org.apache.pekko.actor.typed.Scheduler
final class NettyServer(
clients: ClientSystem,
router: Router,
config: Config,
settings: util.SettingStore
config: Config
)(using Executor, Scheduler):
private val logger = Logger(getClass)
private val threads = config.getInt("netty.threads")
private val (parent, workers, channelClass) =
if System.getProperty("os.name").toLowerCase.startsWith("mac") then
(new KQueueEventLoopGroup(1), new KQueueEventLoopGroup(threads), classOf[KQueueServerSocketChannel])
else (new EpollEventLoopGroup(1), new EpollEventLoopGroup(threads), classOf[EpollServerSocketChannel])
private val connector = ActorChannelConnector(clients, config, settings, workers)
private val connector = ActorChannelConnector(clients)

def start(): Unit =

Expand Down

0 comments on commit 60f62ef

Please sign in to comment.