Skip to content

Commit

Permalink
Reduced memory use on busy Sessions by removing handled items from in…
Browse files Browse the repository at this point in the history
…flightTimeouts

Items in the inflightTimeouts DelayQueue were only ever removed when they
timed out. But in normal operation the related messages would have been
handled long before that. There should only ever be a number equal to the
number of inflightSlots in the queue, but the queue would grow to the
maximum number of messages ever handled in a 5 second interval. This made
each session take much more memory than needed.
  • Loading branch information
hylkevds committed Apr 30, 2024
1 parent 073cd63 commit 862812b
Showing 1 changed file with 8 additions and 0 deletions.
8 changes: 8 additions & 0 deletions broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ boolean isClean() {

public void processPubRec(int pubRecPacketId) {
// Message discarded, make sure any buffers in it are released
cleanFromInflight(pubRecPacketId);
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(pubRecPacketId);
if (removed == null) {
LOG.warn("Received a PUBREC with not matching packetId");
Expand All @@ -218,6 +219,7 @@ public void processPubRec(int pubRecPacketId) {

public void processPubComp(int messageID) {
// Message discarded, make sure any buffers in it are released
cleanFromInflight(messageID);
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(messageID);
if (removed == null) {
LOG.warn("Received a PUBCOMP with not matching packetId");
Expand Down Expand Up @@ -343,6 +345,7 @@ private boolean inflightHasSlotsAndConnectionIsUp() {

void pubAckReceived(int ackPacketId) {
// TODO remain to invoke in somehow m_interceptor.notifyMessageAcknowledged
cleanFromInflight(ackPacketId);
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(ackPacketId);
if (removed == null) {
LOG.warn("Received a PUBACK with not matching packetId");
Expand All @@ -355,6 +358,10 @@ void pubAckReceived(int ackPacketId) {
drainQueueToConnection();
}

private void cleanFromInflight(int ackPacketId) {
inflightTimeouts.removeIf(d -> d.packetId == ackPacketId);
}

public void flushAllQueuedMessages() {
drainQueueToConnection();
}
Expand Down Expand Up @@ -495,6 +502,7 @@ public void cleanUp() {
// in case of in memory session queues all contained messages
// has to be released.
sessionQueue.closeAndPurge();
inflightTimeouts.clear();
for (EnqueuedMessage msg : inflightWindow.values()) {
msg.release();
}
Expand Down

0 comments on commit 862812b

Please sign in to comment.