From 4cbb760c2d5e583db2d101408f299369118c1c84 Mon Sep 17 00:00:00 2001 From: Matt Pavlovich Date: Fri, 17 Jan 2025 11:40:19 -0600 Subject: [PATCH] [AMQ-9646] Support selecting specific messages for command line backup --- .../apache/activemq/store/MessageStore.java | 13 +- .../activemq/store/ProxyMessageStore.java | 14 ++- .../store/memory/MemoryMessageStore.java | 28 +---- .../console/command/store/StoreBackup.java | 60 ++++++++- .../activemq/store/jdbc/JDBCMessageStore.java | 13 -- .../activemq/store/kahadb/KahaDBStore.java | 78 ++++++++++-- .../store/kahadb/TempKahaDBStore.java | 32 ----- .../cursors/StoreQueueCursorOrderTest.java | 5 - .../KahaDBOffsetRecoveryListenerTest.java | 116 ++++++++++++++---- 9 files changed, 242 insertions(+), 117 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java index e35327f8481..47795523814 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java @@ -178,7 +178,17 @@ public interface MessageStore extends Service { void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception; - void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener) throws Exception; + default void recoverNextMessages(long offset, int maxReturned, MessageRecoveryListener listener) throws Exception { + throw new UnsupportedOperationException("recoverNextMessages(offset,maxReturned,listener) is not supported"); + } + + default void recoverNextMessages(long offset, int maxReturned, MessageRecoveryListener listener, boolean useDedicatedCursor) throws Exception { + throw new UnsupportedOperationException("recoverNextMessages(offset,maxReturned,listener,useDedicatedCursor) is not supported"); + } + + default void recoverNextMessages(final String startMsgId, final String endMsgId, final int maxReturned, MessageRecoveryListener listener, final boolean useDedicatedCursor) throws Exception { + throw new UnsupportedOperationException("recoverNextMessages(startMsgId,endMsgId,maxReturned,listener,useDedicatedCursor) is not supported"); + } void dispose(ConnectionContext context); @@ -211,4 +221,5 @@ public interface MessageStore extends Service { void updateMessage(Message message) throws IOException; void registerIndexListener(IndexListener indexListener); + } diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java index a4fb4be5b8a..b860bb2713c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java @@ -106,15 +106,25 @@ public long getMessageSize() throws IOException { } @Override - public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception { + public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { delegate.recoverNextMessages(maxReturned, listener); } @Override - public void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener) throws Exception { + public void recoverNextMessages(final long offset, final int maxReturned, final MessageRecoveryListener listener) throws Exception { delegate.recoverNextMessages(offset, maxReturned, listener); } + @Override + public void recoverNextMessages(final long offset, final int maxReturned, final MessageRecoveryListener listener, final boolean useDedicatedCursor) throws Exception { + delegate.recoverNextMessages(offset, maxReturned, listener, useDedicatedCursor); + } + + @Override + public void recoverNextMessages(final String startMsgId, final String endMsgId, final int maxReturned, final MessageRecoveryListener listener, final boolean useDedicatedCursor) throws Exception { + delegate.recoverNextMessages(startMsgId, endMsgId, maxReturned, listener, useDedicatedCursor); + } + @Override public void resetBatching() { delegate.resetBatching(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java index f0857fb960d..7a0f69b04ea 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java @@ -111,7 +111,7 @@ public void delete() { } @Override - public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception { + public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { synchronized (messageTable) { boolean pastLackBatch = lastBatchId == null; for (Map.Entry entry : messageTable.entrySet()) { @@ -130,32 +130,6 @@ public void recoverNextMessages(int maxReturned, MessageRecoveryListener listene } } - @Override - public void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener) throws Exception { - synchronized (messageTable) { - boolean pastLackBatch = lastBatchId == null; - int position = 0; - for (Map.Entry entry : messageTable.entrySet()) { - if(offset > 0 && offset > position) { - position++; - continue; - } - if (pastLackBatch) { - Object msg = entry.getValue(); - lastBatchId = entry.getKey(); - if (msg.getClass() == MessageId.class) { - listener.recoverMessageReference((MessageId) msg); - } else { - listener.recoverMessage((Message) msg); - } - } else { - pastLackBatch = entry.getKey().equals(lastBatchId); - } - position++; - } - } - } - @Override public void resetBatching() { lastBatchId = null; diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreBackup.java b/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreBackup.java index 026accf0eb6..0fa74ed7fe5 100644 --- a/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreBackup.java +++ b/activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreBackup.java @@ -22,7 +22,10 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.Collection; import java.util.HashMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; @@ -43,7 +46,7 @@ import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TransactionRecoveryListener; -import org.fusesource.hawtbuf.DataByteArrayOutputStream; +import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream; import org.apache.activemq.protobuf.AsciiBuffer; import org.apache.activemq.protobuf.Buffer; import org.apache.activemq.protobuf.UTF8Buffer; @@ -62,6 +65,11 @@ public class StoreBackup { String queue; Integer offset; Integer count; + String indexes; + Collection indexesList; + + String startMsgId; + String endMsgId; private final ObjectMapper mapper = new ObjectMapper(); private final AsciiBuffer ds_kind = new AsciiBuffer("ds"); @@ -88,6 +96,14 @@ public void execute() throws Exception { throw new Exception("optional --offset and --count must be specified together"); } + if ((startMsgId != null || endMsgId != null) && queue == null) { + throw new Exception("optional --queue must be specified when using startMsgId or endMsgId"); + } + + if (indexes != null && !indexes.isBlank()) { + indexesList = Stream.of(indexes.split(",")).map(index -> Integer.parseInt(index.trim())).collect(Collectors.toList()); + } + setFile(new File(filename)); System.out.println("Loading: " + config); BrokerFactory.setStartDefault(false); // to avoid the broker auto-starting.. @@ -178,7 +194,19 @@ public boolean recoverMessage(Message message) throws IOException { return true; } }; - if(offset != null) { + if(startMsgId != null || endMsgId != null) { + System.out.println("Backing up from startMsgId: " + startMsgId + " to endMsgId: " + endMsgId); + queue.recoverNextMessages(startMsgId, endMsgId, (count != null ? count : Integer.MAX_VALUE), queueRecoveryListener, true); + } else if(indexesList != null) { + System.out.println("Backing up using indexes count: " + indexesList.size()); + for(int idx : indexesList) { + if(idx < 0) { + continue; + } + queue.recoverNextMessages(idx, 1, queueRecoveryListener, true); + } + } else if(offset != null) { + System.out.println("Backing up from offset: " + offset + " count: " + count); queue.recoverNextMessages(offset, count, queueRecoveryListener); } else { queue.recover(queueRecoveryListener); @@ -265,14 +293,14 @@ private QueueEntryPB createQueueEntryPB(Message message, long queueKey, long que private MessagePB createMessagePB(Message message, long messageKey) throws IOException { DataByteArrayOutputStream mos = new DataByteArrayOutputStream(); mos.writeBoolean(TIGHT_ENCODING); - mos.writeVarInt(OPENWIRE_VERSION); + mos.writeInt(OPENWIRE_VERSION); wireformat.marshal(message, mos); MessagePB messageRecord = new MessagePB(); messageRecord.setCodec(codec_id); messageRecord.setMessageKey(messageKey); messageRecord.setSize(message.getSize()); - messageRecord.setValue(new Buffer(mos.toBuffer().getData())); + messageRecord.setValue(new Buffer(mos.getData())); return messageRecord; } @@ -323,4 +351,28 @@ public void setCount(int count) { public Integer getCount() { return count; } + + public void setIndexes(String indexes) { + this.indexes = indexes; + } + + public String getIndexes() { + return indexes; + } + + public String getStartMsgId() { + return startMsgId; + } + + public void setStartMsgId(String startMsgId) { + this.startMsgId = startMsgId; + } + + public String getEndMsgId() { + return endMsgId; + } + + public void setEndMsgId(String endMsgId) { + this.endMsgId = endMsgId; + } } diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java index 70ddb7ab1ee..8adc2f78ee9 100644 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java @@ -398,19 +398,6 @@ public boolean recoverMessageReference(String reference) throws Exception { } - /** - * @param offset - * @param maxReturned - * @param listener - * @throws Exception - * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int, - * org.apache.activemq.store.MessageRecoveryListener) - */ - @Override - public void recoverNextMessages(int offset, int maxReturned, final MessageRecoveryListener listener) throws Exception { - throw new UnsupportedOperationException("recoverNextMesage(offset,maxReturned,listener) is not supported."); - } - public void trackRollbackAck(Message message) { synchronized (rolledBackAcks) { rolledBackAcks.put((Long)message.getMessageId().getEntryLocator(), message); diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 18b6eca9cb7..e4e5e6d111c 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -683,7 +683,7 @@ public void execute(Transaction tx) throws Exception { StoredDestination sd = getStoredDestination(dest, tx); recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, Integer.MAX_VALUE, listener); sd.orderIndex.resetCursorPosition(); - for (Iterator> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator + for (Iterator> iterator = sd.orderIndex.iterator(tx, new MessageOrderCursor()); listener.hasSpace() && iterator .hasNext(); ) { Entry entry = iterator.next(); Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName()); @@ -733,7 +733,12 @@ public void execute(Transaction tx) throws Exception { } @Override - public void recoverNextMessages(final int offset, final int maxReturned, final MessageRecoveryListener listener) throws Exception { + public void recoverNextMessages(final long offset, final int maxReturned, final MessageRecoveryListener listener) throws Exception { + recoverNextMessages(offset, maxReturned, listener, false); + } + + @Override + public void recoverNextMessages(final long offset, final int maxReturned, final MessageRecoveryListener listener, final boolean useDedicatedCursor) throws Exception { indexLock.writeLock().lock(); try { pageFile.tx().execute(new Transaction.Closure() { @@ -741,18 +746,74 @@ public void recoverNextMessages(final int offset, final int maxReturned, final M public void execute(Transaction tx) throws Exception { StoredDestination sd = getStoredDestination(dest, tx); Entry entry = null; - int position = 0; + long tmpOffset = (offset >= 0 ? offset : 0); int counter = recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, maxReturned, listener); - Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName()); - for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) { + Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName()); + Iterator> iterator = (useDedicatedCursor ? sd.orderIndex.iterator(tx, new MessageOrderCursor(tmpOffset)) : sd.orderIndex.iterator(tx)); + while ( iterator.hasNext()) { entry = iterator.next(); if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) { continue; } - if(offset > 0 && offset > position) { - position++; + Message msg = loadMessage(entry.getValue().location); + msg.getMessageId().setFutureOrSequenceLong(entry.getKey()); + listener.recoverMessage(msg); + counter++; + if (counter >= maxReturned || !listener.canRecoveryNextMessage()) { + break; + } + } + sd.orderIndex.stoppedIterating(); + } + }); + } finally { + indexLock.writeLock().unlock(); + } + } + + @Override + public void recoverNextMessages(final String startMsgId, final String endMsgId, final int maxReturned, final MessageRecoveryListener listener, final boolean useDedicatedCursor) throws Exception { + indexLock.writeLock().lock(); + try { + pageFile.tx().execute(new Transaction.Closure() { + @Override + public void execute(Transaction tx) throws Exception { + StoredDestination sd = getStoredDestination(dest, tx); + Long startOffset = null; + Long endOffset = null; + + if(startMsgId != null && !startMsgId.isBlank()) { + startOffset = sd.messageIdIndex.get(tx, startMsgId); + } + + if(startOffset == null) { + startOffset = Long.valueOf(0l); + } + + if(endMsgId != null && !endMsgId.isBlank()) { + endOffset = sd.messageIdIndex.get(tx, endMsgId); + } + + if(endOffset == null) { + endOffset = startOffset + Long.valueOf(maxReturned); + } + + if(endOffset < startOffset) { + // Fast fail on invalid arguments + return; + } + + Entry entry = null; + long tmpOffset = (startOffset >= 0 ? startOffset : 0); + int counter = recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, maxReturned, listener); + Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName()); + Iterator> iterator = (useDedicatedCursor ? sd.orderIndex.iterator(tx, new MessageOrderCursor(tmpOffset)) : sd.orderIndex.iterator(tx)); + while (iterator.hasNext()) { + entry = iterator.next(); + + if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) { continue; } @@ -760,8 +821,7 @@ public void execute(Transaction tx) throws Exception { msg.getMessageId().setFutureOrSequenceLong(entry.getKey()); listener.recoverMessage(msg); counter++; - position++; - if (counter >= maxReturned || !listener.canRecoveryNextMessage()) { + if (entry.getKey() >= endOffset || counter >= maxReturned || !listener.canRecoveryNextMessage()) { break; } } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java index 7835a1b7b64..f7a49b8c186 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java @@ -241,43 +241,11 @@ public void execute(Transaction tx) throws Exception { } } - @Override - public void recoverNextMessages(final int offset, final int maxReturned, final MessageRecoveryListener listener) throws Exception { - synchronized(indexMutex) { - pageFile.tx().execute(new Transaction.Closure(){ - @Override - public void execute(Transaction tx) throws Exception { - StoredDestination sd = getStoredDestination(dest, tx); - Entry entry=null; - int counter = 0; - int position = 0; - for (Iterator> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) { - entry = iterator.next(); - if(offset > 0 && offset > position) { - position++; - continue; - } - listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) ); - counter++; - position++; - if( counter >= maxReturned ) { - break; - } - } - if( entry!=null ) { - cursorPos = entry.getKey()+1; - } - } - }); - } - } - @Override public void resetBatching() { cursorPos=0; } - @Override public void setBatch(MessageId identity) throws IOException { final String key = identity.toProducerKey(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java index cbb1579b579..5a1ab90d589 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java @@ -510,11 +510,6 @@ public void recoverNextMessages(int maxReturned, MessageRecoveryListener listene } } - @Override - public void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener) throws Exception { - - } - @Override public void setBatch(MessageId message) { batch.set((Long)message.getFutureOrSequenceLong()); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBOffsetRecoveryListenerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBOffsetRecoveryListenerTest.java index 9ea4b68a84f..a6e7a5ae69a 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBOffsetRecoveryListenerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBOffsetRecoveryListenerTest.java @@ -32,6 +32,8 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import jakarta.jms.Connection; import jakarta.jms.JMSException; @@ -44,8 +46,10 @@ public class KahaDBOffsetRecoveryListenerTest { + private static final Logger logger = LoggerFactory.getLogger(KahaDBOffsetRecoveryListenerTest.class); + protected BrokerService brokerService = null; - protected KahaDBStore kaha = null; + protected BrokerService restartBrokerService = null; @Before public void beforeEach() throws Exception { @@ -55,7 +59,7 @@ public void beforeEach() throws Exception { @After public void afterEach() { brokerService = null; - kaha = null; + restartBrokerService = null; } protected BrokerService createBroker(KahaDBStore kaha) throws Exception { @@ -69,39 +73,63 @@ protected BrokerService createBroker(KahaDBStore kaha) throws Exception { private KahaDBStore createStore(boolean delete) throws IOException { KahaDBStore kaha = new KahaDBStore(); - kaha.setDirectory(new File("target/activemq-data/kahadb-recovery-tests")); + kaha.setJournalMaxFileLength(1024*100); + kaha.setDirectory(new File("target" + File.separator + "activemq-data" + File.separator + "kahadb-recovery-tests")); if( delete ) { kaha.deleteAllMessages(); } return kaha; } - protected void runOffsetTest(int sendCount, int expectedMessageCount, int recoverOffset, int recoverCount, int expectedRecoverCount, int expectedRecoverIndex, String queueName) throws Exception { - kaha = createStore(true); - kaha.setJournalMaxFileLength(1024*100); - brokerService = createBroker(kaha); + protected void runOffsetTest(final int sendCount, final int expectedMessageCount, final int recoverOffset, final int recoverCount, final int expectedRecoverCount, final int expectedRecoverIndex, final String queueName) throws Exception { + runOffsetLoopTest(sendCount, expectedMessageCount, recoverOffset, recoverCount, expectedRecoverCount, expectedRecoverIndex, queueName, 1, false); + } + + protected void runOffsetLoopTest(final int sendCount, final int expectedMessageCount, final int recoverOffset, final int recoverCount, final int expectedRecoverCount, final int expectedRecoverIndex, final String queueName, final int loopCount, final boolean repeatExpected) throws Exception { + KahaDBStore kahaDBStore = createStore(true); + brokerService = createBroker(kahaDBStore); sendMessages(sendCount, queueName); - brokerService.stop(); - brokerService.waitUntilStopped(); - TestMessageRecoveryListener testMessageRecoveryListener = new TestMessageRecoveryListener(); - kaha = createStore(false); - kaha.start(); - MessageStore messageStore = kaha.createQueueMessageStore(new ActiveMQQueue(queueName)); - messageStore.start(); - assertEquals(Integer.valueOf(expectedMessageCount), Integer.valueOf(messageStore.getMessageCount())); - messageStore.recoverNextMessages(recoverOffset, recoverCount, testMessageRecoveryListener); - messageStore.stop(); - kaha.stop(); + MessageStore messageStore = kahaDBStore.createQueueMessageStore(new ActiveMQQueue(queueName)); + + int tmpExpectedRecoverCount = expectedRecoverCount; + int tmpExpectedRecoverIndex = expectedRecoverIndex; + int tmpRecoverOffset = recoverOffset; + + for(int i=0; i= 0) { - assertEquals(Integer.valueOf(expectedRecoverIndex), (Integer)testMessageRecoveryListener.getRecoveredMessages().get(0).getProperty("index")); + if(tmpExpectedRecoverIndex >= 0) { + assertEquals(Integer.valueOf(tmpExpectedRecoverIndex), (Integer)testMessageRecoveryListener.getRecoveredMessages().get(0).getProperty("index")); + } + + if(!repeatExpected) { + int nextExpectedRecoverCount = calculateExpectedRecoverCount(tmpRecoverOffset, tmpExpectedRecoverCount, expectedMessageCount); + int nextExpectedRecoverIndex = calculateExpectedRecoverIndex(tmpRecoverOffset, tmpExpectedRecoverCount, tmpExpectedRecoverIndex, expectedMessageCount); + int nextRecoverOffset = calculateRecoverOffset(tmpRecoverOffset, recoverCount, expectedMessageCount); + + tmpExpectedRecoverCount = nextExpectedRecoverCount; + tmpExpectedRecoverIndex = nextExpectedRecoverIndex; + tmpRecoverOffset = nextRecoverOffset; + } } - brokerService = createBroker(kaha); + brokerService.stop(); + brokerService.waitUntilStopped(); + + restartBrokerService = createBroker(createStore(false)); + restartBrokerService.start(); + restartBrokerService.waitUntilStarted(30_000l); assertEquals(sendCount, receiveMessages(queueName)); + + restartBrokerService.stop(); + restartBrokerService.waitUntilStopped(); } @Test @@ -134,10 +162,18 @@ public void testOffsetEmptyQueue() throws Exception { runOffsetTest(0, 0, 10_000, 1, 0, -1, "TEST.OFFSET.EMPTY"); } + @Test + public void testOffsetWalk() throws Exception { + runOffsetLoopTest(10_000, 10_000, 9_000, 200, 200, 9_000, "TEST.OFFSET.WALK", 8, false); + } + + @Test + public void testOffsetRepeat() throws Exception { + runOffsetLoopTest(10_000, 10_000, 7_000, 133, 133, 7_000, "TEST.OFFSET.REPEAT", 10, true); + } + private void sendMessages(int count, String queueName) throws JMSException { ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); - cf.setUseAsyncSend(true); - cf.setProducerWindowSize(1024); cf.setWatchTopicAdvisories(false); Connection connection = cf.createConnection(); @@ -179,6 +215,38 @@ private String createContent(int i) { return sb.toString(); } + private static int calculateExpectedRecoverCount(final int recoverOffset, final int expectedRecoverCount, final int expectedMessageCount) { + int nextOffset = calculateRecoverOffset(recoverOffset, expectedRecoverCount, expectedMessageCount); + if(nextOffset >= expectedMessageCount) { + return 0; + } + + int nextRange = nextOffset + expectedRecoverCount; + int remaining = expectedMessageCount - nextRange; + + if(remaining <= 0) { + return expectedRecoverCount - remaining; + } + + return expectedRecoverCount; + } + + private static int calculateExpectedRecoverIndex(final int recoverOffset, final int expectedRecoverCount, final int expectedRecoverIndex, final int expectedMessageCount) { + int nextOffset = calculateRecoverOffset(recoverOffset, expectedRecoverCount, expectedMessageCount); + + if(nextOffset >= (expectedMessageCount - 1)) { + return -1; + } + + return nextOffset; + } + + private static int calculateRecoverOffset(final int recoverOffset, final int expectedRecoverCount, final int expectedMessageCount) { + return (recoverOffset + expectedRecoverCount); + } + + // int tmpRecoverOffset = recoverOffset; + static class TestMessageRecoveryListener implements MessageRecoveryListener { List recoveredMessageIds = new LinkedList<>();