From 90136a719ad38294770bf4bd6bcf642498749a0f Mon Sep 17 00:00:00 2001 From: dingshuangxi888 Date: Tue, 21 Jan 2025 11:48:37 +0800 Subject: [PATCH] Assign offset in offsetTable even if the subscription key not exist. --- .../broker/offset/ConsumerOffsetManager.java | 20 ++++--------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index ea46f1d8a1f..34f23b754bc 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.broker.offset; +import com.google.common.collect.Maps; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -417,27 +418,14 @@ public void assignResetOffset(String topic, String group, int queueId, long offs } String key = topic + TOPIC_GROUP_SEPARATOR + group; - ConcurrentMap map = resetOffsetTable.get(key); - if (null == map) { - map = new ConcurrentHashMap(); - ConcurrentMap previous = resetOffsetTable.putIfAbsent(key, map); - if (null != previous) { - map = previous; - } - } - - map.put(queueId, offset); - LOG.debug("Reset offset OK. Topic={}, group={}, queueId={}, resetOffset={}", - topic, group, queueId, offset); + resetOffsetTable.computeIfAbsent(key, k-> Maps.newConcurrentMap()).put(queueId, offset); + LOG.debug("Reset offset OK. Topic={}, group={}, queueId={}, resetOffset={}", topic, group, queueId, offset); // Two things are important here: // 1, currentOffsetMap might be null if there is no previous records; // 2, Our overriding here may get overridden by the client instantly in concurrent cases; But it still makes // sense in cases like clients are offline. - ConcurrentMap currentOffsetMap = offsetTable.get(key); - if (null != currentOffsetMap) { - currentOffsetMap.put(queueId, offset); - } + offsetTable.computeIfAbsent(key, k-> Maps.newConcurrentMap()).put(queueId, offset); } public boolean hasOffsetReset(String topic, String group, int queueId) {