diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java index 9a0824829e1..e32c16a82a8 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java @@ -406,6 +406,7 @@ public boolean resetOffset(long offset) { } ListIterator iterator = this.mappedFiles.listIterator(mappedFiles.size()); + List toRemoves = new ArrayList<>(); while (iterator.hasPrevious()) { mappedFileLast = iterator.previous(); @@ -416,9 +417,14 @@ public boolean resetOffset(long offset) { mappedFileLast.setCommittedPosition(where); break; } else { - iterator.remove(); + toRemoves.add(mappedFileLast); } } + + if (!toRemoves.isEmpty()) { + this.mappedFiles.removeAll(toRemoves); + } + return true; } diff --git a/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java index d92b3cbc0d9..3cc17c659b9 100644 --- a/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java @@ -477,6 +477,21 @@ public void testMappedFile_Rename() throws IOException, InterruptedException { TimeUnit.SECONDS.sleep(3); } + @Test + public void testReset() { + final String fixedMsg = "0123456789abcdef"; + MappedFileQueue mappedFileQueue = + new MappedFileQueue(storePath + File.separator + "a/", 64, null); + for (int i = 0; i < 8; i++) { + MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0); + assertThat(mappedFile).isNotNull(); + assertThat(mappedFile.appendMessage(fixedMsg.getBytes())).isTrue(); + } + assertThat(mappedFileQueue.getMappedFiles().size()).isEqualTo(2); + assertThat(mappedFileQueue.resetOffset(0)).isTrue(); + assertThat(mappedFileQueue.getMappedFiles().size()).isEqualTo(1); + } + @After public void destroy() { File file = new File(storePath);