diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index c329bbd70b6..5006f548828 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -1789,4 +1789,9 @@ public void run() { } }); } + + @Override + public SessionContext getSessionContext() { + return sessionContext; + } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java index b25e1a869ef..01dd9e43f8e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java @@ -25,6 +25,7 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; +import org.apache.activemq.artemis.spi.core.remoting.SessionContext; public interface ClientSessionInternal extends ClientSession { @@ -130,4 +131,6 @@ void handleReceiveContinuation(ConsumerContext consumerID, String getNodeId(); boolean isWritable(ReadyListener callback); + + SessionContext getSessionContext(); } diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java index f3bd337c3d1..0a64c540002 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java @@ -16,28 +16,6 @@ */ package org.apache.activemq.artemis.jms.bridge.impl; -import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; -import org.apache.activemq.artemis.api.core.client.FailoverEventListener; -import org.apache.activemq.artemis.api.core.client.FailoverEventType; -import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants; -import org.apache.activemq.artemis.jms.bridge.ActiveMQJMSBridgeLogger; -import org.apache.activemq.artemis.jms.bridge.ConnectionFactoryFactory; -import org.apache.activemq.artemis.jms.bridge.DestinationFactory; -import org.apache.activemq.artemis.jms.bridge.JMSBridge; -import org.apache.activemq.artemis.jms.bridge.JMSBridgeControl; -import org.apache.activemq.artemis.jms.bridge.QualityOfServiceMode; -import org.apache.activemq.artemis.jms.client.ActiveMQConnection; -import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; -import org.apache.activemq.artemis.jms.client.ActiveMQMessage; -import org.apache.activemq.artemis.jms.server.ActiveMQJMSServerBundle; -import org.apache.activemq.artemis.service.extensions.ServiceUtils; -import org.apache.activemq.artemis.service.extensions.xa.recovery.ActiveMQRegistry; -import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig; -import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec; -import org.apache.activemq.artemis.utils.PasswordMaskingUtil; -import org.apache.activemq.artemis.utils.SensitiveDataCodec; - import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; @@ -72,6 +50,29 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; +import org.apache.activemq.artemis.api.core.client.FailoverEventListener; +import org.apache.activemq.artemis.api.core.client.FailoverEventType; +import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants; +import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; +import org.apache.activemq.artemis.jms.bridge.ActiveMQJMSBridgeLogger; +import org.apache.activemq.artemis.jms.bridge.ConnectionFactoryFactory; +import org.apache.activemq.artemis.jms.bridge.DestinationFactory; +import org.apache.activemq.artemis.jms.bridge.JMSBridge; +import org.apache.activemq.artemis.jms.bridge.JMSBridgeControl; +import org.apache.activemq.artemis.jms.bridge.QualityOfServiceMode; +import org.apache.activemq.artemis.jms.client.ActiveMQConnection; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.jms.client.ActiveMQMessage; +import org.apache.activemq.artemis.jms.server.ActiveMQJMSServerBundle; +import org.apache.activemq.artemis.service.extensions.ServiceUtils; +import org.apache.activemq.artemis.service.extensions.xa.recovery.ActiveMQRegistry; +import org.apache.activemq.artemis.service.extensions.xa.recovery.XARecoveryConfig; +import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec; +import org.apache.activemq.artemis.utils.PasswordMaskingUtil; +import org.apache.activemq.artemis.utils.SensitiveDataCodec; + public final class JMSBridgeImpl implements JMSBridge { private static final String[] RESOURCE_RECOVERY_CLASS_NAMES = new String[]{"org.jboss.as.messaging.jms.AS7RecoveryRegistry"}; @@ -483,6 +484,8 @@ public void stop() throws Exception { ActiveMQJMSBridgeLogger.LOGGER.trace("Rolling back remaining tx"); } + stopSessionFailover(); + try { tx.rollback(); } @@ -523,6 +526,15 @@ public void stop() throws Exception { } } + private void stopSessionFailover() { + XASession xaSource = (XASession) sourceSession; + XASession xaTarget = (XASession) targetSession; + + ((ClientSessionInternal) xaSource.getXAResource()).getSessionContext().releaseCommunications(); + ((ClientSessionInternal) xaTarget.getXAResource()).getSessionContext().releaseCommunications(); + } + + @Override public synchronized boolean isStarted() { return started; } diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java index f7f439d1b1e..fe7f8b0d098 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java @@ -34,22 +34,31 @@ import java.util.List; import java.util.Map; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants; +import org.apache.activemq.artemis.api.jms.JMSFactoryType; import org.apache.activemq.artemis.jms.bridge.ConnectionFactoryFactory; import org.apache.activemq.artemis.jms.bridge.QualityOfServiceMode; import org.apache.activemq.artemis.jms.bridge.impl.JMSBridgeImpl; import org.apache.activemq.artemis.jms.client.ActiveMQMessage; +import org.apache.activemq.artemis.jms.client.ActiveMQXAConnectionFactory; import org.apache.activemq.artemis.service.extensions.ServiceUtils; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.ra.DummyTransactionManager; import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.Timeout; public class JMSBridgeTest extends BridgeTestBase { private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER; + @Rule + public Timeout timeout = new Timeout(120000); + // MaxBatchSize but no MaxBatchTime @Test @@ -1319,6 +1328,48 @@ public void testNoMessageIDInHeader() throws Exception { } } + @Test + public void testCrashDestStopBridge() throws Exception { + cff1xa = new ConnectionFactoryFactory() { + @Override + public Object createConnectionFactory() throws Exception { + ActiveMQXAConnectionFactory cf = (ActiveMQXAConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.XA_CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY, params1)); + + cf.setReconnectAttempts(-1); + cf.setCallFailoverTimeout(-1); + cf.setCallTimeout(10000); + cf.setBlockOnNonDurableSend(true); + cf.setBlockOnDurableSend(true); + cf.setCacheLargeMessagesClient(true); + + return cf; + } + + }; + + JMSBridgeImpl bridge = new JMSBridgeImpl(cff0xa, cff1xa, sourceQueueFactory, targetQueueFactory, null, null, null, null, null, 1000, -1, QualityOfServiceMode.ONCE_AND_ONLY_ONCE, 10, 5000, null, null, false).setBridgeName("test-bridge"); + addActiveMQComponent(bridge); + bridge.setTransactionManager(newTransactionManager()); + + bridge.start(); + + // Now crash the dest server + + JMSBridgeTest.log.info("About to crash server"); + + jmsServer1.stop(); + + // Now stop the bridge while the failover is happening + + JMSBridgeTest.log.info("About to stop the bridge"); + + bridge.stop(); + + // Shutdown the source server + + jmsServer0.stop(); + } + // Private ------------------------------------------------------------------------------- private void testStress(final QualityOfServiceMode qosMode,