Skip to content

Commit

Permalink
always flush remaining messages when traffic shaping is disabled
Browse files Browse the repository at this point in the history
  • Loading branch information
schlawg committed Sep 24, 2024
1 parent 56b90f9 commit 614315e
Showing 1 changed file with 12 additions and 10 deletions.
22 changes: 12 additions & 10 deletions src/main/scala/netty/ActorChannelConnector.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@ final private class ActorChannelConnector(
scheduler: Scheduler,
ec: Executor
):
private val step = settings.makeSetting("netty.flush.step", config.getInt("netty.flush.step"))
private val interval =
settings.makeSetting("netty.flush.interval-millis", config.getInt("netty.flush.interval-millis"))
private val maxDelay =
settings.makeSetting("netty.flush.max-delay-millis", config.getInt("netty.flush.max-delay-millis"))

private val flushQ = new java.util.concurrent.ConcurrentLinkedQueue[Channel]()
private val step = intSetting("netty.flush.step")
private val interval = intSetting("netty.flush.interval-millis")
private val maxDelay = intSetting("netty.flush.max-delay-millis")
private val flushQ = new java.util.concurrent.ConcurrentLinkedQueue[Channel]()

scheduler.scheduleOnce(1 second, () => flush())

Expand All @@ -44,6 +41,9 @@ final private class ActorChannelConnector(
clients ! Clients.Control.Stop(client)
}

private def intSetting(key: String) =
settings.makeSetting(key, config.getInt(key))

private def emitToChannel(channel: Channel, withFlush: Boolean): ClientEmit =
case ipc.ClientIn.Disconnect(reason) =>
channel
Expand All @@ -57,9 +57,8 @@ final private class ActorChannelConnector(
channel.write(TextWebSocketFrame(in.write))
flushQ.add(channel)

private def maxDelayFactor: Double = interval.get.toDouble / maxDelay.get

private def flush(): Unit =
val maxDelayFactor = maxDelay.get.toDouble / interval.get
var channelsToFlush = step.get.atLeast((flushQ.size * maxDelayFactor).toInt)

while channelsToFlush > 0 do
Expand All @@ -70,5 +69,8 @@ final private class ActorChannelConnector(
case _ =>
channelsToFlush = 0

val nextInterval = if interval.get <= 0 then 1.second else interval.get.millis
val nextInterval =
if interval.get > 0 then interval.get.millis
else if flushQ.isEmpty then 1.second // hibernate
else 1.millis // interval is 0 but we still need to empty the queue
scheduler.scheduleOnce(nextInterval, () => flush())

0 comments on commit 614315e

Please sign in to comment.