From 1f4110110b45a3231705f442dc79c4e9e68f6adf Mon Sep 17 00:00:00 2001 From: shenjianeng Date: Sun, 26 Nov 2023 13:40:49 +0800 Subject: [PATCH] lock.lock() before try --- .../rocketmq/client/impl/MQAdminImpl.java | 2 +- .../ConsumeMessageOrderlyService.java | 2 +- .../client/impl/consumer/ProcessQueue.java | 44 ++++++++++--------- 3 files changed, 26 insertions(+), 22 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java index 83835bd3d3ec..00d4e8ed81ff 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java @@ -379,8 +379,8 @@ public void operationSucceed(RemotingCommand response) { MessageDecoder.decodes(ByteBuffer.wrap(response.getBody()), true); QueryResult qr = new QueryResult(responseHeader.getIndexLastUpdateTimestamp(), wrappers); + lock.writeLock().lock(); try { - lock.writeLock().lock(); queryResultList.add(qr); } finally { lock.writeLock().unlock(); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java index 4246768d409b..908e89447cd0 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java @@ -487,8 +487,8 @@ public void run() { long beginTimestamp = System.currentTimeMillis(); ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; boolean hasException = false; + this.processQueue.getConsumeLock().lock(); try { - this.processQueue.getConsumeLock().lock(); if (this.processQueue.isDropped()) { log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java index ab94a984677a..d9394da91c0d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java @@ -424,29 +424,33 @@ public void fillProcessQueueInfo(final ProcessQueueInfo info) { try { this.treeMapLock.readLock().lockInterruptibly(); - if (!this.msgTreeMap.isEmpty()) { - info.setCachedMsgMinOffset(this.msgTreeMap.firstKey()); - info.setCachedMsgMaxOffset(this.msgTreeMap.lastKey()); - info.setCachedMsgCount(this.msgTreeMap.size()); - info.setCachedMsgSizeInMiB((int) (this.msgSize.get() / (1024 * 1024))); - } + try { + if (!this.msgTreeMap.isEmpty()) { + info.setCachedMsgMinOffset(this.msgTreeMap.firstKey()); + info.setCachedMsgMaxOffset(this.msgTreeMap.lastKey()); + info.setCachedMsgCount(this.msgTreeMap.size()); + info.setCachedMsgSizeInMiB((int) (this.msgSize.get() / (1024 * 1024))); + } - if (!this.consumingMsgOrderlyTreeMap.isEmpty()) { - info.setTransactionMsgMinOffset(this.consumingMsgOrderlyTreeMap.firstKey()); - info.setTransactionMsgMaxOffset(this.consumingMsgOrderlyTreeMap.lastKey()); - info.setTransactionMsgCount(this.consumingMsgOrderlyTreeMap.size()); - } + if (!this.consumingMsgOrderlyTreeMap.isEmpty()) { + info.setTransactionMsgMinOffset(this.consumingMsgOrderlyTreeMap.firstKey()); + info.setTransactionMsgMaxOffset(this.consumingMsgOrderlyTreeMap.lastKey()); + info.setTransactionMsgCount(this.consumingMsgOrderlyTreeMap.size()); + } - info.setLocked(this.locked); - info.setTryUnlockTimes(this.tryUnlockTimes.get()); - info.setLastLockTimestamp(this.lastLockTimestamp); + info.setLocked(this.locked); + info.setTryUnlockTimes(this.tryUnlockTimes.get()); + info.setLastLockTimestamp(this.lastLockTimestamp); - info.setDroped(this.dropped); - info.setLastPullTimestamp(this.lastPullTimestamp); - info.setLastConsumeTimestamp(this.lastConsumeTimestamp); - } catch (Exception e) { - } finally { - this.treeMapLock.readLock().unlock(); + info.setDroped(this.dropped); + info.setLastPullTimestamp(this.lastPullTimestamp); + info.setLastConsumeTimestamp(this.lastConsumeTimestamp); + } finally { + this.treeMapLock.readLock().unlock(); + } + + } catch (InterruptedException e) { + log.error("fillProcessQueueInfo lock error, ProcessQueueInfo={}", info, e); } }