Skip to content

Commit

Permalink
[ISSUE #7480] Fix the offset in the timerCheckPoint will not be corre…
Browse files Browse the repository at this point in the history
…cted when the commitlog and consumeQueue are truncated (#7488)
  • Loading branch information
RongtongJin authored Dec 4, 2024
1 parent e104c02 commit f4c4984
Showing 1 changed file with 14 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,19 @@ public void recover() {
}
currQueueOffset = Math.min(currQueueOffset, timerCheckpoint.getMasterTimerQueueOffset());

ConsumeQueueInterface cq = this.messageStore.getConsumeQueue(TIMER_TOPIC, 0);

// Correction based consume queue
if (cq != null && currQueueOffset < cq.getMinOffsetInQueue()) {
LOGGER.warn("Timer currQueueOffset:{} is smaller than minOffsetInQueue:{}",
currQueueOffset, cq.getMinOffsetInQueue());
currQueueOffset = cq.getMinOffsetInQueue();
} else if (cq != null && currQueueOffset > cq.getMaxOffsetInQueue()) {
LOGGER.warn("Timer currQueueOffset:{} is larger than maxOffsetInQueue:{}",
currQueueOffset, cq.getMaxOffsetInQueue());
currQueueOffset = cq.getMaxOffsetInQueue();
}

//check timer wheel
currReadTimeMs = timerCheckpoint.getLastReadTimeMs();
long nextReadTimeMs = formatTimeMs(
Expand Down Expand Up @@ -614,7 +627,7 @@ public void addMetric(MessageExt msg, int value) {
return;
}
if (msg.getProperty(TIMER_ENQUEUE_MS) != null
&& NumberUtils.toLong(msg.getProperty(TIMER_ENQUEUE_MS)) == Long.MAX_VALUE) {
&& NumberUtils.toLong(msg.getProperty(TIMER_ENQUEUE_MS)) == Long.MAX_VALUE) {
return;
}
// pass msg into addAndGet, for further more judgement extension.
Expand Down

0 comments on commit f4c4984

Please sign in to comment.