Skip to content

Commit

Permalink
[ISSUE #7565] Fix the issue of canceling scheduled tasks for discarde…
Browse files Browse the repository at this point in the history
…d tasks.
  • Loading branch information
lokistars committed Nov 16, 2023
1 parent 651a5ca commit 5f56964
Showing 1 changed file with 7 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -883,18 +883,20 @@ public void run() {

this.currentThread = Thread.currentThread();

ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);

if (null == processQueue || processQueue.isDropped()) {
log.info("The message queue not be able to poll, because it's dropped. group={}, messageQueue={}", defaultLitePullConsumer.getConsumerGroup(), this.messageQueue);
return;
}

if (assignedMessageQueue.isPaused(messageQueue)) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_PAUSE, TimeUnit.MILLISECONDS);
log.debug("Message Queue: {} has been paused!", messageQueue);
return;
}

ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);

if (null == processQueue || processQueue.isDropped()) {
log.info("The message queue not be able to poll, because it's dropped. group={}, messageQueue={}", defaultLitePullConsumer.getConsumerGroup(), this.messageQueue);
return;
}

processQueue.setLastPullTimestamp(System.currentTimeMillis());

Expand Down

0 comments on commit 5f56964

Please sign in to comment.