Skip to content

Commit

Permalink
Removed the reference counter for stored messages
Browse files Browse the repository at this point in the history
  • Loading branch information
andsel committed Jan 9, 2017
1 parent 0816312 commit ac8a11f
Show file tree
Hide file tree
Showing 4 changed files with 3 additions and 51 deletions.
17 changes: 0 additions & 17 deletions broker/src/main/java/io/moquette/spi/IMessagesStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ class StoredMessage implements Serializable {
//Optional attribute, available only fo QoS 1 and 2
private Integer m_msgID;
private MessageGUID m_guid;
private AtomicInteger referenceCounter = new AtomicInteger(0);

public StoredMessage(byte[] message, AbstractMessage.QOSType qos, String topic) {
m_qos = qos;
Expand Down Expand Up @@ -93,18 +92,6 @@ public boolean isRetained() {
return m_retained;
}

public void incReferenceCounter() {
this.referenceCounter.incrementAndGet();
}

public void decReferenceCounter() {
this.referenceCounter.decrementAndGet();
}

public int getReferenceCounter() {
return this.referenceCounter.get();
}

@Override
public String toString() {
return "PublishEvent{" +
Expand Down Expand Up @@ -149,8 +136,4 @@ public String toString() {
StoredMessage getMessageByGuid(MessageGUID guid);

void cleanRetained(String topic);

void incUsageCounter(MessageGUID guid);

void decUsageCounter(MessageGUID guid);
}
Original file line number Diff line number Diff line change
Expand Up @@ -427,9 +427,9 @@ public void processPubAck(Channel channel, PubAckMessage msg) {

String topic = inflightMsg.getTopic();

MessageGUID guid = inflightMsg.getGuid();
// MessageGUID guid = inflightMsg.getGuid();
//Remove the message from message store
m_messagesStore.decUsageCounter(guid);
// m_messagesStore.decUsageCounter(guid);

m_interceptor.notifyMessageAcknowledged(new InterceptAcknowledgedMessage(inflightMsg, topic, username));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,7 @@ public void dropMessagesInSession(String clientID) {
void removeStoredMessage(MessageGUID guid) {
//remove only the not retained and no more referenced
StoredMessage storedMessage = m_persistentMessageStore.get(guid);
if (!storedMessage.isRetained() && storedMessage.getReferenceCounter() <= 0) {
if(storedMessage.getReferenceCounter() < 0) {
LOG.error("we should never have gotten a reference count less than zero");
}
if (!storedMessage.isRetained()) {
LOG.debug("Cleaning not retained message guid {}", guid);
m_persistentMessageStore.remove(guid);
}
Expand All @@ -119,18 +116,4 @@ public StoredMessage getMessageByGuid(MessageGUID guid) {
public void cleanRetained(String topic) {
m_retainedStore.remove(topic);
}

@Override
public void incUsageCounter(MessageGUID guid) {
IMessagesStore.StoredMessage storedMessage = m_persistentMessageStore.get(guid);
storedMessage.incReferenceCounter();
m_persistentMessageStore.put(guid, storedMessage);
}

@Override
public void decUsageCounter(MessageGUID guid) {
IMessagesStore.StoredMessage storedMessage = m_persistentMessageStore.get(guid);
storedMessage.decReferenceCounter();
m_persistentMessageStore.put(guid, storedMessage);
}
}
14 changes: 0 additions & 14 deletions broker/src/test/java/io/moquette/spi/impl/MemoryMessagesStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,18 +98,4 @@ public StoredMessage getMessageByGuid(MessageGUID guid) {
public void cleanRetained(String topic) {
m_retainedStore.remove(topic);
}

@Override
public void incUsageCounter(MessageGUID guid) {
IMessagesStore.StoredMessage storedMessage = m_persistentMessageStore.get(guid);
storedMessage.incReferenceCounter();
m_persistentMessageStore.put(guid, storedMessage);
}

@Override
public void decUsageCounter(MessageGUID guid) {
IMessagesStore.StoredMessage storedMessage = m_persistentMessageStore.get(guid);
storedMessage.decReferenceCounter();
m_persistentMessageStore.put(guid, storedMessage);
}
}

0 comments on commit ac8a11f

Please sign in to comment.