diff --git a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java index 347d92304dc..6c0bce5929a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java +++ b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java @@ -18,7 +18,16 @@ import com.google.common.collect.Maps; import io.netty.buffer.PooledByteBufAllocator; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.utils.ThreadUtils; @@ -43,16 +52,6 @@ import org.rocksdb.WriteBatch; import org.rocksdb.WriteOptions; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.Semaphore; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - public abstract class AbstractRocksDBStorage { protected static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.ROCKSDB_LOGGER_NAME); @@ -381,6 +380,7 @@ public synchronized boolean start() { public synchronized boolean shutdown() { try { if (!this.loaded) { + LOGGER.info("RocksDBStorage is not loaded, shutdown OK. dbPath={}, readOnly={}", this.dbPath, this.readOnly); return true; } diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 9d3c46a438a..187a0729e83 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -517,11 +517,9 @@ public void shutdown() { if (this.compactionService != null) { this.compactionService.shutdown(); } - - if (messageStoreConfig.isRocksdbCQDoubleWriteEnable()) { + if (messageStoreConfig.isRocksdbCQDoubleWriteEnable() && this.rocksDBMessageStore != null) { this.rocksDBMessageStore.consumeQueueStore.shutdown(); } - this.flushConsumeQueueService.shutdown(); this.allocateMappedFileService.shutdown(); this.storeCheckpoint.flush(); diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java index a91ae5e244e..cb989852fb9 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java @@ -144,7 +144,7 @@ private void loadMaxConsumeQueueOffsets() { Function predicate = entry -> entry.type == OffsetEntryType.MAXIMUM; Consumer fn = entry -> { topicQueueMaxCqOffset.putIfAbsent(entry.topic + "-" + entry.queueId, entry.offset); - ROCKSDB_LOG.info("Max {}:{} --> {}|{}", entry.topic, entry.queueId, entry.offset, entry.commitLogOffset); + log.info("LoadMaxConsumeQueueOffsets Max {}:{} --> {}|{}", entry.topic, entry.queueId, entry.offset, entry.commitLogOffset); }; try { forEach(predicate, fn);