Skip to content

Commit

Permalink
[AMQ-9646] Support selecting specific messages for command line backup
Browse files Browse the repository at this point in the history
  • Loading branch information
mattrpav committed Feb 3, 2025
1 parent a4a4227 commit e09ced3
Show file tree
Hide file tree
Showing 11 changed files with 631 additions and 167 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;

public class MessageRecoveryContext implements MessageRecoveryListener {

public static final int DEFAULT_MAX_MESSAGE_COUNT_RETURNED = 100;
public static final boolean DEFAULT_USE_DEDICATED_CURSOR = true;

// Config
private boolean useDedicatedCursor;
private int maxMessageCountReturned;
private Long offset = null;
private String startMessageId = null;
private String endMessageId = null;
private final MessageRecoveryListener messageRecoveryListener;

// State
private Long endSequenceId = Long.MAX_VALUE;

// Stats
private AtomicInteger recoveredCount = new AtomicInteger(0);

MessageRecoveryContext(final MessageRecoveryListener messageRecoveryListener, final String startMessageId, final String endMessageId, final Long offset, final Integer maxMessageCountReturned, final Boolean useDedicatedCursor) {
if(maxMessageCountReturned != null && maxMessageCountReturned < 0) {
throw new IllegalArgumentException("maxMessageCountReturned must be a positive integer value");
}
if(messageRecoveryListener == null) {
throw new IllegalArgumentException("MessageRecoveryListener must be specified");
}
if(offset != null) {
if(offset < 0L) {
throw new IllegalArgumentException("offset must be a positive integer value");
}
if(startMessageId != null) {
throw new IllegalArgumentException("Only one of offset and startMessageId may be specified");
}
}
this.endMessageId = endMessageId;
this.maxMessageCountReturned = (maxMessageCountReturned != null ? maxMessageCountReturned : DEFAULT_MAX_MESSAGE_COUNT_RETURNED);
this.messageRecoveryListener = messageRecoveryListener;
this.offset = offset;
this.startMessageId = startMessageId;
this.useDedicatedCursor = (useDedicatedCursor != null ? useDedicatedCursor : DEFAULT_USE_DEDICATED_CURSOR);
}

public boolean isUseDedicatedCursor() {
return this.useDedicatedCursor;
}

public int getMaxMessageCountReturned() {
return this.maxMessageCountReturned;
}

public Long getOffset() {
return this.offset;
}

public String getEndMessageId() {
return this.endMessageId;
}

public String getStartMessageId() {
return this.startMessageId;
}

public MessageRecoveryListener getMessageRecoveryListener() {
return this.messageRecoveryListener;
}

// MessageRecoveryContext functions
public void setEndSequenceId(long endSequenceId) {
this.endSequenceId = endSequenceId;
}

public boolean canRecoveryNextMessage(Long sequenceId) {
if (getRecoveredCount() >= getMaxMessageCountReturned() ||
!this.messageRecoveryListener.canRecoveryNextMessage() ||
sequenceId >= endSequenceId) {
return false;
}
return true;
}

// MessageRecoveryListener functions
public boolean recoverMessage(Message message) throws Exception {
boolean tmpReturned = this.messageRecoveryListener.recoverMessage(message);
this.recoveredCount.incrementAndGet();
return tmpReturned;
}

@Override
public boolean recoverMessageReference(MessageId ref) throws Exception {
return this.messageRecoveryListener.recoverMessageReference(ref);
}

@Override
public boolean hasSpace() {
return this.messageRecoveryListener.hasSpace();
}

@Override
public boolean isDuplicate(MessageId ref) {
return this.messageRecoveryListener.isDuplicate(ref);
}

// Metrics
public int getRecoveredCount() {
return this.recoveredCount.get();
}

public static class Builder {

private Boolean useDedicatedCursor;
private Integer maxMessageCountReturned;
private Long offset;
private String startMessageId;
private String endMessageId;
private MessageRecoveryListener messageRecoveryListener;

public Builder useDedicatedCursor(final boolean useDedicatedCursor) {
this.useDedicatedCursor = useDedicatedCursor;
return this;
}

public Builder maxMessageCountReturned(final int maxMessageCountReturned) {
this.maxMessageCountReturned = maxMessageCountReturned;
return this;
}

public Builder offset(final long offset) {
this.offset = offset;
return this;
}

public Builder endMessageId(final String endMessageId) {
this.endMessageId = endMessageId;
return this;
}

public Builder startMessageId(final String startMessageId) {
this.startMessageId = startMessageId;
return this;
}

public Builder messageRecoveryListener(final MessageRecoveryListener messageRecoveryListener) {
this.messageRecoveryListener = messageRecoveryListener;
return this;
}

public MessageRecoveryContext build() {
return new MessageRecoveryContext(messageRecoveryListener, startMessageId, endMessageId, offset, maxMessageCountReturned, useDedicatedCursor);
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,12 @@ public interface MessageStore extends Service {
*/
void resetBatching();

void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception;

void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener) throws Exception;
default void recoverMessages(final MessageRecoveryContext messageRecoveryContext) throws Exception {
throw new UnsupportedOperationException("recoverMessages(messageRecoveryContext) is not supported");
}

void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception;

void dispose(ConnectionContext context);

Expand Down Expand Up @@ -211,4 +214,5 @@ public interface MessageStore extends Service {
void updateMessage(Message message) throws IOException;

void registerIndexListener(IndexListener indexListener);

}
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,13 @@ public long getMessageSize() throws IOException {
}

@Override
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
delegate.recoverNextMessages(maxReturned, listener);
}

@Override
public void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener) throws Exception {
delegate.recoverNextMessages(offset, maxReturned, listener);
public void recoverMessages(final MessageRecoveryContext messageRecoveryContext) throws Exception {
delegate.recoverMessages(messageRecoveryContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void delete() {
}

@Override
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
synchronized (messageTable) {
boolean pastLackBatch = lastBatchId == null;
for (Map.Entry<MessageId, Message> entry : messageTable.entrySet()) {
Expand All @@ -130,32 +130,6 @@ public void recoverNextMessages(int maxReturned, MessageRecoveryListener listene
}
}

@Override
public void recoverNextMessages(int offset, int maxReturned, MessageRecoveryListener listener) throws Exception {
synchronized (messageTable) {
boolean pastLackBatch = lastBatchId == null;
int position = 0;
for (Map.Entry<MessageId, Message> entry : messageTable.entrySet()) {
if(offset > 0 && offset > position) {
position++;
continue;
}
if (pastLackBatch) {
Object msg = entry.getValue();
lastBatchId = entry.getKey();
if (msg.getClass() == MessageId.class) {
listener.recoverMessageReference((MessageId) msg);
} else {
listener.recoverMessage((Message) msg);
}
} else {
pastLackBatch = entry.getKey().equals(lastBatchId);
}
position++;
}
}
}

@Override
public void resetBatching() {
lastBatchId = null;
Expand Down
Loading

0 comments on commit e09ced3

Please sign in to comment.