diff --git a/src/main/scala/netty/ActorChannelConnector.scala b/src/main/scala/netty/ActorChannelConnector.scala index 0283f10c..d5c47469 100644 --- a/src/main/scala/netty/ActorChannelConnector.scala +++ b/src/main/scala/netty/ActorChannelConnector.scala @@ -18,13 +18,10 @@ final private class ActorChannelConnector( scheduler: Scheduler, ec: Executor ): - private val step = settings.makeSetting("netty.flush.step", config.getInt("netty.flush.step")) - private val interval = - settings.makeSetting("netty.flush.interval-millis", config.getInt("netty.flush.interval-millis")) - private val maxDelay = - settings.makeSetting("netty.flush.max-delay-millis", config.getInt("netty.flush.max-delay-millis")) - - private val flushQ = new java.util.concurrent.ConcurrentLinkedQueue[Channel]() + private val step = intSetting("netty.flush.step") + private val interval = intSetting("netty.flush.interval-millis") + private val maxDelay = intSetting("netty.flush.max-delay-millis") + private val flushQ = new java.util.concurrent.ConcurrentLinkedQueue[Channel]() scheduler.scheduleOnce(1 second, () => flush()) @@ -44,6 +41,9 @@ final private class ActorChannelConnector( clients ! Clients.Control.Stop(client) } + private def intSetting(key: String) = + settings.makeSetting(key, config.getInt(key)) + private def emitToChannel(channel: Channel, withFlush: Boolean): ClientEmit = case ipc.ClientIn.Disconnect(reason) => channel @@ -57,9 +57,8 @@ final private class ActorChannelConnector( channel.write(TextWebSocketFrame(in.write)) flushQ.add(channel) - private def maxDelayFactor: Double = interval.get.toDouble / maxDelay.get - private def flush(): Unit = + val maxDelayFactor = maxDelay.get.toDouble / interval.get var channelsToFlush = step.get.atLeast((flushQ.size * maxDelayFactor).toInt) while channelsToFlush > 0 do @@ -70,5 +69,8 @@ final private class ActorChannelConnector( case _ => channelsToFlush = 0 - val nextInterval = if interval.get <= 0 then 1.second else interval.get.millis + val nextInterval = + if interval.get > 0 then interval.get.millis + else if flushQ.isEmpty then 1.second // hibernate + else 1.millis // interval is 0 but we still need to empty the queue scheduler.scheduleOnce(nextInterval, () => flush())