Skip to content

Commit

Permalink
[ISSUE #8804] clean offset when remove group offset
Browse files Browse the repository at this point in the history
leizhiyuan authored Oct 10, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent c826373 commit e75554d
Showing 1 changed file with 22 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@
package org.apache.rocketmq.broker.offset;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@@ -110,4 +111,25 @@ public ConcurrentHashMap<String, Long> getLmqOffsetTable() {
public void setLmqOffsetTable(ConcurrentHashMap<String, Long> lmqOffsetTable) {
this.lmqOffsetTable = lmqOffsetTable;
}

@Override
public void removeOffset(String group) {
if (!MixAll.isLmq(group)) {
super.removeOffset(group);
return;
}
Iterator<Map.Entry<String, Long>> it = this.lmqOffsetTable.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, Long> next = it.next();
String topicAtGroup = next.getKey();
if (topicAtGroup.contains(group)) {
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
if (arrays.length == 2 && group.equals(arrays[1])) {
it.remove();
removeConsumerOffset(topicAtGroup);
LOG.warn("clean lmq group offset {}", topicAtGroup);
}
}
}
}
}

0 comments on commit e75554d

Please sign in to comment.