Skip to content

Commit

Permalink
[AMQ-9646] Support selecting specific messages for command line backup
Browse files Browse the repository at this point in the history
  • Loading branch information
mattrpav committed Jan 27, 2025
1 parent a4a4227 commit 4cbb760
Show file tree
Hide file tree
Showing 9 changed files with 242 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,17 @@ public interface MessageStore extends Service {

void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception;

void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener) throws Exception;
default void recoverNextMessages(long offset, int maxReturned, MessageRecoveryListener listener) throws Exception {
throw new UnsupportedOperationException("recoverNextMessages(offset,maxReturned,listener) is not supported");
}

default void recoverNextMessages(long offset, int maxReturned, MessageRecoveryListener listener, boolean useDedicatedCursor) throws Exception {
throw new UnsupportedOperationException("recoverNextMessages(offset,maxReturned,listener,useDedicatedCursor) is not supported");
}

default void recoverNextMessages(final String startMsgId, final String endMsgId, final int maxReturned, MessageRecoveryListener listener, final boolean useDedicatedCursor) throws Exception {
throw new UnsupportedOperationException("recoverNextMessages(startMsgId,endMsgId,maxReturned,listener,useDedicatedCursor) is not supported");
}

void dispose(ConnectionContext context);

Expand Down Expand Up @@ -211,4 +221,5 @@ public interface MessageStore extends Service {
void updateMessage(Message message) throws IOException;

void registerIndexListener(IndexListener indexListener);

}
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,25 @@ public long getMessageSize() throws IOException {
}

@Override
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
delegate.recoverNextMessages(maxReturned, listener);
}

@Override
public void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener) throws Exception {
public void recoverNextMessages(final long offset, final int maxReturned, final MessageRecoveryListener listener) throws Exception {
delegate.recoverNextMessages(offset, maxReturned, listener);
}

@Override
public void recoverNextMessages(final long offset, final int maxReturned, final MessageRecoveryListener listener, final boolean useDedicatedCursor) throws Exception {
delegate.recoverNextMessages(offset, maxReturned, listener, useDedicatedCursor);
}

@Override
public void recoverNextMessages(final String startMsgId, final String endMsgId, final int maxReturned, final MessageRecoveryListener listener, final boolean useDedicatedCursor) throws Exception {
delegate.recoverNextMessages(startMsgId, endMsgId, maxReturned, listener, useDedicatedCursor);
}

@Override
public void resetBatching() {
delegate.resetBatching();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void delete() {
}

@Override
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
synchronized (messageTable) {
boolean pastLackBatch = lastBatchId == null;
for (Map.Entry<MessageId, Message> entry : messageTable.entrySet()) {
Expand All @@ -130,32 +130,6 @@ public void recoverNextMessages(int maxReturned, MessageRecoveryListener listene
}
}

@Override
public void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener) throws Exception {
synchronized (messageTable) {
boolean pastLackBatch = lastBatchId == null;
int position = 0;
for (Map.Entry<MessageId, Message> entry : messageTable.entrySet()) {
if(offset > 0 && offset > position) {
position++;
continue;
}
if (pastLackBatch) {
Object msg = entry.getValue();
lastBatchId = entry.getKey();
if (msg.getClass() == MessageId.class) {
listener.recoverMessageReference((MessageId) msg);
} else {
listener.recoverMessage((Message) msg);
}
} else {
pastLackBatch = entry.getKey().equals(lastBatchId);
}
position++;
}
}
}

@Override
public void resetBatching() {
lastBatchId = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.HashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
Expand All @@ -43,7 +46,7 @@
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.fusesource.hawtbuf.DataByteArrayOutputStream;
import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.protobuf.UTF8Buffer;
Expand All @@ -62,6 +65,11 @@ public class StoreBackup {
String queue;
Integer offset;
Integer count;
String indexes;
Collection<Integer> indexesList;

String startMsgId;
String endMsgId;

private final ObjectMapper mapper = new ObjectMapper();
private final AsciiBuffer ds_kind = new AsciiBuffer("ds");
Expand All @@ -88,6 +96,14 @@ public void execute() throws Exception {
throw new Exception("optional --offset and --count must be specified together");
}

if ((startMsgId != null || endMsgId != null) && queue == null) {
throw new Exception("optional --queue must be specified when using startMsgId or endMsgId");
}

if (indexes != null && !indexes.isBlank()) {
indexesList = Stream.of(indexes.split(",")).map(index -> Integer.parseInt(index.trim())).collect(Collectors.toList());
}

setFile(new File(filename));
System.out.println("Loading: " + config);
BrokerFactory.setStartDefault(false); // to avoid the broker auto-starting..
Expand Down Expand Up @@ -178,7 +194,19 @@ public boolean recoverMessage(Message message) throws IOException {
return true;
}
};
if(offset != null) {
if(startMsgId != null || endMsgId != null) {
System.out.println("Backing up from startMsgId: " + startMsgId + " to endMsgId: " + endMsgId);
queue.recoverNextMessages(startMsgId, endMsgId, (count != null ? count : Integer.MAX_VALUE), queueRecoveryListener, true);
} else if(indexesList != null) {
System.out.println("Backing up using indexes count: " + indexesList.size());
for(int idx : indexesList) {
if(idx < 0) {
continue;
}
queue.recoverNextMessages(idx, 1, queueRecoveryListener, true);
}
} else if(offset != null) {
System.out.println("Backing up from offset: " + offset + " count: " + count);
queue.recoverNextMessages(offset, count, queueRecoveryListener);
} else {
queue.recover(queueRecoveryListener);
Expand Down Expand Up @@ -265,14 +293,14 @@ private QueueEntryPB createQueueEntryPB(Message message, long queueKey, long que
private MessagePB createMessagePB(Message message, long messageKey) throws IOException {
DataByteArrayOutputStream mos = new DataByteArrayOutputStream();
mos.writeBoolean(TIGHT_ENCODING);
mos.writeVarInt(OPENWIRE_VERSION);
mos.writeInt(OPENWIRE_VERSION);
wireformat.marshal(message, mos);

MessagePB messageRecord = new MessagePB();
messageRecord.setCodec(codec_id);
messageRecord.setMessageKey(messageKey);
messageRecord.setSize(message.getSize());
messageRecord.setValue(new Buffer(mos.toBuffer().getData()));
messageRecord.setValue(new Buffer(mos.getData()));
return messageRecord;
}

Expand Down Expand Up @@ -323,4 +351,28 @@ public void setCount(int count) {
public Integer getCount() {
return count;
}

public void setIndexes(String indexes) {
this.indexes = indexes;
}

public String getIndexes() {
return indexes;
}

public String getStartMsgId() {
return startMsgId;
}

public void setStartMsgId(String startMsgId) {
this.startMsgId = startMsgId;
}

public String getEndMsgId() {
return endMsgId;
}

public void setEndMsgId(String endMsgId) {
this.endMsgId = endMsgId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -398,19 +398,6 @@ public boolean recoverMessageReference(String reference) throws Exception {

}

/**
* @param offset
* @param maxReturned
* @param listener
* @throws Exception
* @see org.apache.activemq.store.MessageStore#recoverNextMessages(int,
* org.apache.activemq.store.MessageRecoveryListener)
*/
@Override
public void recoverNextMessages(int offset, int maxReturned, final MessageRecoveryListener listener) throws Exception {
throw new UnsupportedOperationException("recoverNextMesage(offset,maxReturned,listener) is not supported.");
}

public void trackRollbackAck(Message message) {
synchronized (rolledBackAcks) {
rolledBackAcks.put((Long)message.getMessageId().getEntryLocator(), message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,7 @@ public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, Integer.MAX_VALUE, listener);
sd.orderIndex.resetCursorPosition();
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, new MessageOrderCursor()); listener.hasSpace() && iterator
.hasNext(); ) {
Entry<Long, MessageKeys> entry = iterator.next();
Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName());
Expand Down Expand Up @@ -733,35 +733,95 @@ public void execute(Transaction tx) throws Exception {
}

@Override
public void recoverNextMessages(final int offset, final int maxReturned, final MessageRecoveryListener listener) throws Exception {
public void recoverNextMessages(final long offset, final int maxReturned, final MessageRecoveryListener listener) throws Exception {
recoverNextMessages(offset, maxReturned, listener, false);
}

@Override
public void recoverNextMessages(final long offset, final int maxReturned, final MessageRecoveryListener listener, final boolean useDedicatedCursor) throws Exception {
indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<Exception>() {
@Override
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
Entry<Long, MessageKeys> entry = null;
int position = 0;
long tmpOffset = (offset >= 0 ? offset : 0);
int counter = recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, maxReturned, listener);
Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName());
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) {
Set<String> ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName());
Iterator<Entry<Long, MessageKeys>> iterator = (useDedicatedCursor ? sd.orderIndex.iterator(tx, new MessageOrderCursor(tmpOffset)) : sd.orderIndex.iterator(tx));
while ( iterator.hasNext()) {
entry = iterator.next();

if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) {
continue;
}

if(offset > 0 && offset > position) {
position++;
Message msg = loadMessage(entry.getValue().location);
msg.getMessageId().setFutureOrSequenceLong(entry.getKey());
listener.recoverMessage(msg);
counter++;
if (counter >= maxReturned || !listener.canRecoveryNextMessage()) {
break;
}
}
sd.orderIndex.stoppedIterating();
}
});
} finally {
indexLock.writeLock().unlock();
}
}

@Override
public void recoverNextMessages(final String startMsgId, final String endMsgId, final int maxReturned, final MessageRecoveryListener listener, final boolean useDedicatedCursor) throws Exception {
indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<Exception>() {
@Override
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
Long startOffset = null;
Long endOffset = null;

if(startMsgId != null && !startMsgId.isBlank()) {
startOffset = sd.messageIdIndex.get(tx, startMsgId);
}

if(startOffset == null) {
startOffset = Long.valueOf(0l);
}

if(endMsgId != null && !endMsgId.isBlank()) {
endOffset = sd.messageIdIndex.get(tx, endMsgId);
}

if(endOffset == null) {
endOffset = startOffset + Long.valueOf(maxReturned);
}

if(endOffset < startOffset) {
// Fast fail on invalid arguments
return;
}

Entry<Long, MessageKeys> entry = null;
long tmpOffset = (startOffset >= 0 ? startOffset : 0);
int counter = recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, maxReturned, listener);
Set<String> ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName());
Iterator<Entry<Long, MessageKeys>> iterator = (useDedicatedCursor ? sd.orderIndex.iterator(tx, new MessageOrderCursor(tmpOffset)) : sd.orderIndex.iterator(tx));
while (iterator.hasNext()) {
entry = iterator.next();

if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) {
continue;
}

Message msg = loadMessage(entry.getValue().location);
msg.getMessageId().setFutureOrSequenceLong(entry.getKey());
listener.recoverMessage(msg);
counter++;
position++;
if (counter >= maxReturned || !listener.canRecoveryNextMessage()) {
if (entry.getKey() >= endOffset || counter >= maxReturned || !listener.canRecoveryNextMessage()) {
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,43 +241,11 @@ public void execute(Transaction tx) throws Exception {
}
}

@Override
public void recoverNextMessages(final int offset, final int maxReturned, final MessageRecoveryListener listener) throws Exception {
synchronized(indexMutex) {
pageFile.tx().execute(new Transaction.Closure<Exception>(){
@Override
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
Entry<Long, MessageRecord> entry=null;
int counter = 0;
int position = 0;
for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
entry = iterator.next();
if(offset > 0 && offset > position) {
position++;
continue;
}
listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
counter++;
position++;
if( counter >= maxReturned ) {
break;
}
}
if( entry!=null ) {
cursorPos = entry.getKey()+1;
}
}
});
}
}

@Override
public void resetBatching() {
cursorPos=0;
}


@Override
public void setBatch(MessageId identity) throws IOException {
final String key = identity.toProducerKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,11 +510,6 @@ public void recoverNextMessages(int maxReturned, MessageRecoveryListener listene
}
}

@Override
public void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener) throws Exception {

}

@Override
public void setBatch(MessageId message) {
batch.set((Long)message.getFutureOrSequenceLong());
Expand Down
Loading

0 comments on commit 4cbb760

Please sign in to comment.