Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARTEMIS-5037: option to limit mirror propagation #5220

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,12 @@ public int sendMessage(MessageReference ref, ServerConsumer consumer, int delive

}

@Override
public boolean filterRef(MessageReference ref, ServerConsumer consumer) {
ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext();
return plugSender.filterRef(ref);
}

@Override
public int sendLargeMessage(MessageReference ref,
ServerConsumer consumer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.activemq.artemis.protocol.amqp.connect;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -306,10 +307,10 @@ public void validateMatching(Queue queue, AMQPBrokerConnectionElement connection
public void createLink(Queue queue, AMQPBrokerConnectionElement connectionElement) {
if (connectionElement.getType() == AMQPBrokerConnectionAddressType.PEER) {
Symbol[] dispatchCapability = new Symbol[]{AMQPMirrorControllerSource.QPID_DISPATCH_WAYPOINT_CAPABILITY};
connectSender(queue, queue.getAddress().toString(), null, null, null, null, dispatchCapability, null);
connectSender(queue, queue.getAddress().toString(), null, null, null, null, null, dispatchCapability, null);
connectReceiver(protonRemotingConnection, session, sessionContext, queue, dispatchCapability);
} else if (connectionElement.getType() == AMQPBrokerConnectionAddressType.SENDER) {
connectSender(queue, queue.getAddress().toString(), null, null, null, null, null, null);
connectSender(queue, queue.getAddress().toString(), null, null, null, null, null, null, null);
} else if (connectionElement.getType() == AMQPBrokerConnectionAddressType.RECEIVER) {
connectReceiver(protonRemotingConnection, session, sessionContext, queue);
}
Expand Down Expand Up @@ -450,21 +451,25 @@ private void doConnect() {
final Queue queue = server.locateQueue(getMirrorSNF(replica));

final boolean coreTunnelingEnabled = isCoreMessageTunnelingEnabled(replica);
final Symbol[] desiredCapabilities;

ArrayList<Symbol> desiredCapabilitiesList = new ArrayList<>();
desiredCapabilitiesList.add(AMQPMirrorControllerSource.MIRROR_CAPABILITY);
if (coreTunnelingEnabled) {
desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY,
AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT};
} else {
desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY};
desiredCapabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT);
}
if (replica.isNoForward()) {
desiredCapabilitiesList.add(AMQPMirrorControllerSource.NO_FORWARD);
}

final Symbol[] requiredOfferedCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY};
final Symbol[] desiredCapabilities = (Symbol[]) desiredCapabilitiesList.toArray(new Symbol[]{});

final Symbol[] requiredOfferedCapabilities = replica.isNoForward() ? new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY, AMQPMirrorControllerSource.NO_FORWARD} : new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY};

connectSender(queue,
queue.getName().toString(),
mirrorControllerSource::setLink,
(r) -> AMQPMirrorControllerSource.validateProtocolData(protonProtocolManager.getReferenceIDSupplier(), r, getMirrorSNF(replica)),
(r) -> mirrorControllerSource.shouldFilterRef(r),
server.getNodeID().toString(),
desiredCapabilities,
null,
Expand Down Expand Up @@ -771,6 +776,7 @@ private void connectSender(Queue queue,
String targetName,
java.util.function.Consumer<Sender> senderConsumer,
java.util.function.Consumer<? super MessageReference> beforeDeliver,
java.util.function.Predicate<? super MessageReference> shouldFilterRef,
String brokerID,
Symbol[] desiredCapabilities,
Symbol[] targetCapabilities,
Expand Down Expand Up @@ -831,7 +837,7 @@ private void connectSender(Queue queue,

// Using attachments to set up a Runnable that will be executed inside AMQPBrokerConnection::remoteLinkOpened
sender.attachments().set(AMQP_LINK_INITIALIZER_KEY, Runnable.class, () -> {
ProtonServerSenderContext senderContext = new ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, sessionContext, sessionContext.getSessionSPI(), outgoingInitializer).setBeforeDelivery(beforeDeliver);
ProtonServerSenderContext senderContext = new ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, sessionContext, sessionContext.getSessionSPI(), outgoingInitializer).setBeforeDelivery(beforeDeliver).setShouldFilterRef(shouldFilterRef);
try {
if (!cancelled.get()) {
if (futureTimeout != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
public static final Symbol QUEUE = Symbol.getSymbol("x-opt-amq-mr-qu");
public static final Symbol BROKER_ID = Symbol.getSymbol("x-opt-amq-bkr-id");
public static final SimpleString BROKER_ID_SIMPLE_STRING = SimpleString.of(BROKER_ID.toString());
public static final SimpleString NO_FORWARD_SOURCE = SimpleString.of("x-opt-amq-mr-no-fwd-src");
public static final SimpleString RECEIVER_ID_FILTER = SimpleString.of("x-opt-amq-mr-rcv-id-filter");

// Events:
public static final Symbol ADD_ADDRESS = Symbol.getSymbol("addAddress");
Expand All @@ -89,9 +91,11 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
// Capabilities
public static final Symbol MIRROR_CAPABILITY = Symbol.getSymbol("amq.mirror");
public static final Symbol QPID_DISPATCH_WAYPOINT_CAPABILITY = Symbol.valueOf("qd.waypoint");
public static final Symbol NO_FORWARD = Symbol.getSymbol("amq.no.forward");

public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = SimpleString.of(INTERNAL_ID.toString());
public static final SimpleString INTERNAL_BROKER_ID_EXTRA_PROPERTY = SimpleString.of(BROKER_ID.toString());
public static final SimpleString INTERNAL_NO_FORWARD = SimpleString.of(NO_FORWARD.toString());

private static final ThreadLocal<RoutingContext> mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null));

Expand Down Expand Up @@ -230,12 +234,17 @@ public void addAddress(AddressInfo addressInfo) throws Exception {
public void deleteAddress(AddressInfo addressInfo) throws Exception {
logger.trace("{} deleteAddress {}", server, addressInfo);

if (isBlockedByNoForward()) {
return;
}

if (invalidTarget(getControllerInUse()) || addressInfo.isInternal()) {
return;
}
if (ignoreAddress(addressInfo.getName())) {
return;
}

if (deleteQueues) {
Message message = createMessage(addressInfo.getName(), null, DELETE_ADDRESS, null, addressInfo.toJSON());
routeMirrorCommand(server, message);
Expand All @@ -246,6 +255,10 @@ public void deleteAddress(AddressInfo addressInfo) throws Exception {
public void createQueue(QueueConfiguration queueConfiguration) throws Exception {
logger.trace("{} createQueue {}", server, queueConfiguration);

if (isBlockedByNoForward()) {
return;
}

if (invalidTarget(getControllerInUse()) || queueConfiguration.isInternal()) {
if (logger.isTraceEnabled()) {
logger.trace("Rejecting ping pong on create {} as isInternal={} and mirror target = {}", queueConfiguration, queueConfiguration.isInternal(), getControllerInUse());
Expand All @@ -264,6 +277,7 @@ public void createQueue(QueueConfiguration queueConfiguration) throws Exception
}
return;
}

if (addQueues) {
Message message = createMessage(queueConfiguration.getAddress(), queueConfiguration.getName(), CREATE_QUEUE, null, queueConfiguration.toJSON());
routeMirrorCommand(server, message);
Expand All @@ -276,6 +290,10 @@ public void deleteQueue(SimpleString address, SimpleString queue) throws Excepti
logger.trace("{} deleteQueue {}/{}", server, address, queue);
}

if (isBlockedByNoForward()) {
return;
}

if (invalidTarget(getControllerInUse())) {
return;
}
Expand Down Expand Up @@ -310,6 +328,14 @@ private boolean invalidTarget(MirrorController controller) {
return controller != null && sameNode(getRemoteMirrorId(), controller.getRemoteMirrorId());
}

private boolean isBlockedByNoForward() {
return getControllerInUse() != null && getControllerInUse().isNoForward();
}

private boolean isBlockedByNoForward(Message message) {
return isBlockedByNoForward() || Boolean.TRUE.equals(message.getBrokerProperty(INTERNAL_NO_FORWARD));
}

private boolean ignoreAddress(SimpleString address) {
if (address.startsWith(server.getConfiguration().getManagementAddress())) {
return true;
Expand Down Expand Up @@ -338,6 +364,11 @@ Message copyMessageForPaging(Message message) {
public void sendMessage(Transaction tx, Message message, RoutingContext context) {
SimpleString address = context.getAddress(message);

if (isBlockedByNoForward(message)) {
logger.trace("sendMessage::server {} is discarding the send because its source is setting a noForward policy", server);
return;
}

if (context.isInternal()) {
logger.trace("sendMessage::server {} is discarding send to avoid sending to internal queue", server);
return;
Expand All @@ -353,6 +384,8 @@ public void sendMessage(Transaction tx, Message message, RoutingContext context)
return;
}

logger.trace("sendMessage::{} send message {}", server, message);

try {
context.setReusable(false);

Expand Down Expand Up @@ -467,6 +500,29 @@ public static void validateProtocolData(ReferenceIDSupplier referenceIDSupplier,
}
}

/**
* Checks if the message ref should be filtered or not.
* @param ref the message to check
* @return true if the INTERNAL_RECEIVER_ID_FILTER annotation of the message is set to a different value
* than the remoteMirrorID, false otherwise.
*/
public boolean shouldFilterRef(MessageReference ref) {
Object filterID = ref.getMessage().getAnnotation(RECEIVER_ID_FILTER);
if (filterID != null) {
String remoteMirrorId = getRemoteMirrorId();
if (remoteMirrorId != null) {
if (remoteMirrorId.equals(filterID)) {
return false;
} else {
logger.trace("filtering message {} as remote mirror ID {} diverges from the wanted receiver {}", ref, remoteMirrorId, filterID);
return true;
}
}
return false;
}
return false;
}

/** This method will return the brokerID used by the message */
private static String setProtocolData(ReferenceIDSupplier referenceIDSupplier, MessageReference ref) {
String brokerID = referenceIDSupplier.getServerID(ref);
Expand Down Expand Up @@ -543,6 +599,21 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin
logger.trace("preAcknowledge::tx={}, ref={}, reason={}", tx, ref, reason);
}

String noForwardSource = null;
String remoteMirrorId = getRemoteMirrorId();
// The remote mirror ID might not be known at this point because the remote mirror hasn't been connected yet or connection was lost.
// However, if the remote mirror ID is known there's a possibility to early check if the acknowledgment is supposed to reach the destination
// based on the noForward policy of the message about to be acked.
if (Boolean.TRUE.equals(ref.getMessage().getBooleanProperty(INTERNAL_NO_FORWARD))) {
noForwardSource = String.valueOf(ref.getMessage().getBrokerProperty(NO_FORWARD_SOURCE));
if (remoteMirrorId != null && !remoteMirrorId.equals(noForwardSource)) {
if (logger.isInfoEnabled()) {
logger.trace("Due to the noForward policy in place, no Ack for the ref={} should reach the remote mirror ID", ref, remoteMirrorId);
}
return;
}
}

MirrorController controllerInUse = getControllerInUse();

// Retried ACKs are not forwarded.
Expand Down Expand Up @@ -578,6 +649,13 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin
String nodeID = idSupplier.getServerID(ref); // notice the brokerID will be null for any message generated on this broker.
long internalID = idSupplier.getID(ref);
Message messageCommand = createMessage(ref.getQueue().getAddress(), ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason);

// When the remote mirror ID couldn't be known in advance, the ack is annotated with the ID it is supposed to reach. This value
// will be used to filter out acks that do violate the configured noForward policy.
if (remoteMirrorId == null && noForwardSource != null) {
messageCommand.setBrokerProperty(RECEIVER_ID_FILTER, noForwardSource);
}

if (sync) {
OperationContext operationContext;
operationContext = OperationContextImpl.getContext(server.getExecutorFactory());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.server.mirror.TargetMirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
Expand All @@ -53,6 +53,7 @@
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreLargeMessageReader;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreMessageReader;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.MessageReader;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
import org.apache.activemq.artemis.utils.ByteUtil;
Expand All @@ -77,29 +78,39 @@
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_QUEUE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.NO_FORWARD;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_NO_FORWARD;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.NO_FORWARD_SOURCE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.TARGET_QUEUES;
import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_LARGE_MESSAGE_FORMAT;
import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_MESSAGE_FORMAT;

public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implements MirrorController {
public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implements TargetMirrorController {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private static final ThreadLocal<MirrorController> CONTROLLER_THREAD_LOCAL = new ThreadLocal<>();
private static final ThreadLocal<TargetMirrorController> CONTROLLER_THREAD_LOCAL = new ThreadLocal<>();

public static void setControllerInUse(MirrorController controller) {
public static void setControllerInUse(TargetMirrorController controller) {
CONTROLLER_THREAD_LOCAL.set(controller);
}

public static MirrorController getControllerInUse() {
public static TargetMirrorController getControllerInUse() {
return CONTROLLER_THREAD_LOCAL.get();
}

private boolean noForwarding = false;

@Override
public boolean isNoForward() {
return noForwarding;
}

/**
* Objects of this class can be used by either transaction or by OperationContext.
* It is important that when you're using the transactions you clear any references to
Expand Down Expand Up @@ -248,6 +259,7 @@ public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI,
this.configuration = server.getConfiguration();
this.referenceNodeStore = sessionSPI.getProtocolManager().getReferenceIDSupplier();
mirrorContext = protonSession.getSessionSPI().getSessionContext();
this.noForwarding = AmqpSupport.verifyDesiredCapability(receiver, NO_FORWARD);
}

@Override
Expand Down Expand Up @@ -534,6 +546,10 @@ private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotat

message.setBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY, internalID);
message.setBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY, internalMirrorID);
if (noForwarding) {
message.setBrokerProperty(INTERNAL_NO_FORWARD, true);
lavocatt marked this conversation as resolved.
Show resolved Hide resolved
message.setBrokerProperty(NO_FORWARD_SOURCE, getRemoteMirrorId());
}

if (internalAddress != null) {
message.setAddress(internalAddress);
Expand Down
Loading