From ac8a11fefaa834016bda24deaf116f468ba4787c Mon Sep 17 00:00:00 2001 From: andsel Date: Mon, 9 Jan 2017 21:59:09 +0100 Subject: [PATCH] Removed the reference counter for stored messages --- .../java/io/moquette/spi/IMessagesStore.java | 17 ----------------- .../moquette/spi/impl/ProtocolProcessor.java | 4 ++-- .../spi/persistence/MapDBMessagesStore.java | 19 +------------------ .../spi/impl/MemoryMessagesStore.java | 14 -------------- 4 files changed, 3 insertions(+), 51 deletions(-) diff --git a/broker/src/main/java/io/moquette/spi/IMessagesStore.java b/broker/src/main/java/io/moquette/spi/IMessagesStore.java index d4288e076..8be4af803 100644 --- a/broker/src/main/java/io/moquette/spi/IMessagesStore.java +++ b/broker/src/main/java/io/moquette/spi/IMessagesStore.java @@ -37,7 +37,6 @@ class StoredMessage implements Serializable { //Optional attribute, available only fo QoS 1 and 2 private Integer m_msgID; private MessageGUID m_guid; - private AtomicInteger referenceCounter = new AtomicInteger(0); public StoredMessage(byte[] message, AbstractMessage.QOSType qos, String topic) { m_qos = qos; @@ -93,18 +92,6 @@ public boolean isRetained() { return m_retained; } - public void incReferenceCounter() { - this.referenceCounter.incrementAndGet(); - } - - public void decReferenceCounter() { - this.referenceCounter.decrementAndGet(); - } - - public int getReferenceCounter() { - return this.referenceCounter.get(); - } - @Override public String toString() { return "PublishEvent{" + @@ -149,8 +136,4 @@ public String toString() { StoredMessage getMessageByGuid(MessageGUID guid); void cleanRetained(String topic); - - void incUsageCounter(MessageGUID guid); - - void decUsageCounter(MessageGUID guid); } diff --git a/broker/src/main/java/io/moquette/spi/impl/ProtocolProcessor.java b/broker/src/main/java/io/moquette/spi/impl/ProtocolProcessor.java index e09eb0ca6..b8cf89ae8 100644 --- a/broker/src/main/java/io/moquette/spi/impl/ProtocolProcessor.java +++ b/broker/src/main/java/io/moquette/spi/impl/ProtocolProcessor.java @@ -427,9 +427,9 @@ public void processPubAck(Channel channel, PubAckMessage msg) { String topic = inflightMsg.getTopic(); - MessageGUID guid = inflightMsg.getGuid(); +// MessageGUID guid = inflightMsg.getGuid(); //Remove the message from message store - m_messagesStore.decUsageCounter(guid); +// m_messagesStore.decUsageCounter(guid); m_interceptor.notifyMessageAcknowledged(new InterceptAcknowledgedMessage(inflightMsg, topic, username)); } diff --git a/broker/src/main/java/io/moquette/spi/persistence/MapDBMessagesStore.java b/broker/src/main/java/io/moquette/spi/persistence/MapDBMessagesStore.java index 3991685a3..62f292753 100644 --- a/broker/src/main/java/io/moquette/spi/persistence/MapDBMessagesStore.java +++ b/broker/src/main/java/io/moquette/spi/persistence/MapDBMessagesStore.java @@ -101,10 +101,7 @@ public void dropMessagesInSession(String clientID) { void removeStoredMessage(MessageGUID guid) { //remove only the not retained and no more referenced StoredMessage storedMessage = m_persistentMessageStore.get(guid); - if (!storedMessage.isRetained() && storedMessage.getReferenceCounter() <= 0) { - if(storedMessage.getReferenceCounter() < 0) { - LOG.error("we should never have gotten a reference count less than zero"); - } + if (!storedMessage.isRetained()) { LOG.debug("Cleaning not retained message guid {}", guid); m_persistentMessageStore.remove(guid); } @@ -119,18 +116,4 @@ public StoredMessage getMessageByGuid(MessageGUID guid) { public void cleanRetained(String topic) { m_retainedStore.remove(topic); } - - @Override - public void incUsageCounter(MessageGUID guid) { - IMessagesStore.StoredMessage storedMessage = m_persistentMessageStore.get(guid); - storedMessage.incReferenceCounter(); - m_persistentMessageStore.put(guid, storedMessage); - } - - @Override - public void decUsageCounter(MessageGUID guid) { - IMessagesStore.StoredMessage storedMessage = m_persistentMessageStore.get(guid); - storedMessage.decReferenceCounter(); - m_persistentMessageStore.put(guid, storedMessage); - } } diff --git a/broker/src/test/java/io/moquette/spi/impl/MemoryMessagesStore.java b/broker/src/test/java/io/moquette/spi/impl/MemoryMessagesStore.java index 79814fedb..cad1d2308 100644 --- a/broker/src/test/java/io/moquette/spi/impl/MemoryMessagesStore.java +++ b/broker/src/test/java/io/moquette/spi/impl/MemoryMessagesStore.java @@ -98,18 +98,4 @@ public StoredMessage getMessageByGuid(MessageGUID guid) { public void cleanRetained(String topic) { m_retainedStore.remove(topic); } - - @Override - public void incUsageCounter(MessageGUID guid) { - IMessagesStore.StoredMessage storedMessage = m_persistentMessageStore.get(guid); - storedMessage.incReferenceCounter(); - m_persistentMessageStore.put(guid, storedMessage); - } - - @Override - public void decUsageCounter(MessageGUID guid) { - IMessagesStore.StoredMessage storedMessage = m_persistentMessageStore.get(guid); - storedMessage.decReferenceCounter(); - m_persistentMessageStore.put(guid, storedMessage); - } }