Skip to content

Commit 89a65c1

Browse files
committed
Migrated and fixed DiscoveryNetworkReconnectTest (it was not working before the migration)
1 parent 49e9976 commit 89a65c1

File tree

1 file changed

+86
-128
lines changed

1 file changed

+86
-128
lines changed

activemq-unit-tests/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java

Lines changed: 86 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,13 @@
1717
package org.apache.activemq.transport.discovery;
1818

1919
import static org.junit.Assert.assertTrue;
20+
import static org.junit.Assert.fail;
21+
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.Mockito.doAnswer;
23+
import static org.mockito.Mockito.mock;
24+
import static org.mockito.Mockito.timeout;
25+
import static org.mockito.Mockito.verify;
26+
import static org.mockito.Mockito.when;
2027

2128
import java.net.URI;
2229
import java.util.concurrent.Semaphore;
@@ -30,24 +37,13 @@
3037
import org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgentFactory;
3138
import org.apache.activemq.util.SocketProxy;
3239
import org.apache.activemq.util.Wait;
33-
import org.hamcrest.BaseMatcher;
34-
import org.hamcrest.Description;
35-
import org.jmock.Expectations;
36-
import org.jmock.Mockery;
37-
import org.jmock.api.Invocation;
38-
import org.jmock.integration.junit4.JMock;
39-
import org.jmock.integration.junit4.JUnit4Mockery;
40-
import org.jmock.lib.action.CustomAction;
41-
import org.jmock.lib.legacy.ClassImposteriser;
4240
import org.junit.After;
4341
import org.junit.Before;
4442
import org.junit.Test;
45-
import org.junit.runner.RunWith;
4643
import org.slf4j.Logger;
4744
import org.slf4j.LoggerFactory;
4845

4946

50-
@RunWith(JMock.class)
5147
public class DiscoveryNetworkReconnectTest {
5248

5349
private static final Logger LOG = LoggerFactory.getLogger(DiscoveryNetworkReconnectTest.class);
@@ -57,42 +53,12 @@ public class DiscoveryNetworkReconnectTest {
5753
final Semaphore mbeanRegistered = new Semaphore(0);
5854
final Semaphore mbeanUnregistered = new Semaphore(0);
5955
BrokerService brokerA, brokerB;
60-
Mockery context;
6156
ManagementContext managementContext;
6257
DiscoveryAgent agent;
6358
SocketProxy proxy;
6459

65-
// ignore the hostname resolution component as this is machine dependent
66-
class NetworkBridgeObjectNameMatcher<T> extends BaseMatcher<T> {
67-
T name;
68-
NetworkBridgeObjectNameMatcher(T o) {
69-
name = o;
70-
}
71-
72-
@Override
73-
public boolean matches(Object arg0) {
74-
ObjectName other = (ObjectName) arg0;
75-
ObjectName mine = (ObjectName) name;
76-
LOG.info("Match: " + mine + " vs: " + other);
77-
78-
if (!"networkConnectors".equals(other.getKeyProperty("connector"))) {
79-
return false;
80-
}
81-
return other.getKeyProperty("connector").equals(mine.getKeyProperty("connector")) &&
82-
other.getKeyProperty("networkBridge") != null && mine.getKeyProperty("networkBridge") != null;
83-
}
84-
85-
@Override
86-
public void describeTo(Description arg0) {
87-
arg0.appendText(this.getClass().getName());
88-
}
89-
}
90-
9160
@Before
9261
public void setUp() throws Exception {
93-
context = new JUnit4Mockery() {{
94-
setImposteriser(ClassImposteriser.INSTANCE);
95-
}};
9662
brokerA = new BrokerService();
9763
brokerA.setBrokerName("BrokerA");
9864
configure(brokerA);
@@ -101,73 +67,31 @@ public void setUp() throws Exception {
10167
brokerA.waitUntilStarted();
10268

10369
proxy = new SocketProxy(brokerA.getTransportConnectors().get(0).getConnectUri());
104-
managementContext = context.mock(ManagementContext.class);
105-
106-
context.checking(new Expectations(){{
107-
allowing(managementContext).getJmxDomainName(); will (returnValue("Test"));
108-
allowing(managementContext).setBrokerName("BrokerNC");
109-
allowing(managementContext).start();
110-
allowing(managementContext).isCreateConnector();
111-
allowing(managementContext).stop();
112-
allowing(managementContext).isConnectorStarted();
113-
114-
// expected MBeans
115-
allowing(managementContext).registerMBean(with(any(Object.class)), with(equal(
116-
new ObjectName("Test:type=Broker,brokerName=BrokerNC"))));
117-
allowing(managementContext).registerMBean(with(any(Object.class)), with(equal(
118-
new ObjectName("Test:type=Broker,brokerName=BrokerNC,service=Health"))));
119-
allowing(managementContext).registerMBean(with(any(Object.class)), with(equal(
120-
new ObjectName("Test:type=Broker,brokerName=BrokerNC,connector=networkConnectors,networkConnectorName=NC"))));
121-
allowing(managementContext).registerMBean(with(any(Object.class)), with(equal(
122-
new ObjectName("Test:type=Broker,brokerName=BrokerNC,service=Log4JConfiguration"))));
123-
allowing(managementContext).registerMBean(with(any(Object.class)), with(equal(
124-
new ObjectName("Test:type=Broker,brokerName=BrokerNC,destinationType=Topic,destinationName=ActiveMQ.Advisory.Connection"))));
125-
allowing(managementContext).registerMBean(with(any(Object.class)), with(equal(
126-
new ObjectName("Test:type=Broker,brokerName=BrokerNC,destinationType=Topic,destinationName=ActiveMQ.Advisory.NetworkBridge"))));
127-
allowing(managementContext).registerMBean(with(any(Object.class)), with(equal(
128-
new ObjectName("Test:type=Broker,brokerName=BrokerNC,destinationType=Topic,destinationName=ActiveMQ.Advisory.MasterBroker"))));
129-
allowing(managementContext).registerMBean(with(any(Object.class)), with(equal(
130-
new ObjectName("Test:type=Broker,brokerName=BrokerNC,service=jobScheduler,jobSchedulerName=JMS"))));
131-
allowing(managementContext).getObjectInstance(with(equal(
132-
new ObjectName("Test:type=Broker,brokerName=BrokerNC,connector=networkConnectors,networkConnectorName=NC"))));
133-
134-
135-
atLeast(maxReconnects - 1).of (managementContext).registerMBean(with(any(Object.class)), with(new NetworkBridgeObjectNameMatcher<ObjectName>(
136-
new ObjectName("Test:type=Broker,brokerName=BrokerNC,connector=networkConnectors,networkConnectorName=NC,networkBridge=localhost/127.0.0.1_"
137-
+ proxy.getUrl().getPort())))); will(new CustomAction("signal register network mbean") {
138-
@Override
139-
public Object invoke(Invocation invocation) throws Throwable {
140-
LOG.info("Mbean Registered: " + invocation.getParameter(0));
141-
mbeanRegistered.release();
142-
return new ObjectInstance((ObjectName)invocation.getParameter(1), "discription");
143-
}
144-
});
145-
atLeast(maxReconnects - 1).of (managementContext).unregisterMBean(with(new NetworkBridgeObjectNameMatcher<ObjectName>(
146-
new ObjectName("Test:type=Broker,brokerName=BrokerNC,connector=networkConnectors,networkConnectorName=NC,networkBridge=localhost/127.0.0.1_"
147-
+ proxy.getUrl().getPort())))); will(new CustomAction("signal unregister network mbean") {
148-
@Override
149-
public Object invoke(Invocation invocation) throws Throwable {
150-
LOG.info("Mbean Unregistered: " + invocation.getParameter(0));
151-
mbeanUnregistered.release();
152-
return null;
153-
}
154-
});
155-
156-
allowing(managementContext).unregisterMBean(with(equal(
157-
new ObjectName("Test:type=Broker,brokerName=BrokerNC"))));
158-
allowing(managementContext).unregisterMBean(with(equal(
159-
new ObjectName("Test:type=Broker,brokerName=BrokerNC,service=Health"))));
160-
allowing(managementContext).unregisterMBean(with(equal(
161-
new ObjectName("Test:type=Broker,brokerName=BrokerNC,connector=networkConnectors,networkConnectorName=NC"))));
162-
allowing(managementContext).unregisterMBean(with(equal(
163-
new ObjectName("Test:type=Broker,brokerName=BrokerNC,service=Log4JConfiguration"))));
164-
allowing(managementContext).unregisterMBean(with(equal(
165-
new ObjectName("Test:type=Broker,brokerName=BrokerNC,destinationType=Topic,destinationName=ActiveMQ.Advisory.Connection"))));
166-
allowing(managementContext).unregisterMBean(with(equal(
167-
new ObjectName("Test:type=Broker,brokerName=BrokerNC,destinationType=Topic,destinationName=ActiveMQ.Advisory.NetworkBridge"))));
168-
allowing(managementContext).unregisterMBean(with(equal(
169-
new ObjectName("Test:type=Broker,brokerName=BrokerNC,destinationType=Topic,destinationName=ActiveMQ.Advisory.MasterBroker"))));
170-
}});
70+
managementContext = mock(ManagementContext.class);
71+
72+
when(managementContext.getJmxDomainName()).thenReturn("Test");
73+
when(managementContext.isCreateConnector()).thenReturn(false);
74+
when(managementContext.isConnectorStarted()).thenReturn(false);
75+
76+
// Mock MBean registration to release semaphore - only for network connector
77+
doAnswer(invocation -> {
78+
ObjectName name = invocation.getArgument(1);
79+
LOG.info("MBean registered: " + name);
80+
if (name.toString().contains("networkConnectors")) {
81+
mbeanRegistered.release();
82+
}
83+
return mock(ObjectInstance.class);
84+
}).when(managementContext).registerMBean(any(), any(ObjectName.class));
85+
86+
// Mock MBean unregistration to release semaphore - only for network connector
87+
doAnswer(invocation -> {
88+
ObjectName name = invocation.getArgument(0);
89+
LOG.info("MBean unregistered: " + name);
90+
if (name.toString().contains("networkConnectors")) {
91+
mbeanUnregistered.release();
92+
}
93+
return null;
94+
}).when(managementContext).unregisterMBean(any(ObjectName.class));
17195

17296
brokerB = new BrokerService();
17397
brokerB.setManagementContext(managementContext);
@@ -177,11 +101,20 @@ public Object invoke(Invocation invocation) throws Throwable {
177101

178102
@After
179103
public void tearDown() throws Exception {
180-
brokerA.stop();
181-
brokerA.waitUntilStopped();
182-
brokerB.stop();
183-
brokerB.waitUntilStopped();
184-
proxy.close();
104+
if (agent != null) {
105+
agent.stop();
106+
}
107+
if (brokerB != null) {
108+
brokerB.stop();
109+
brokerB.waitUntilStopped();
110+
}
111+
if (brokerA != null) {
112+
brokerA.stop();
113+
brokerA.waitUntilStopped();
114+
}
115+
if (proxy != null) {
116+
proxy.close();
117+
}
185118
}
186119

187120
private void configure(BrokerService broker) {
@@ -191,16 +124,18 @@ private void configure(BrokerService broker) {
191124

192125
@Test
193126
public void testMulicastReconnect() throws Exception {
127+
// Start the discovery agent BEFORE starting brokerB to ensure it discovers via proxy
128+
agent = MulticastDiscoveryAgentFactory.createDiscoveryAgent(new URI(discoveryAddress));
129+
agent.registerService(proxy.getUrl().toString());
130+
agent.start();
131+
132+
// Give the agent time to start advertising
133+
Thread.sleep(500);
194134

195135
brokerB.addNetworkConnector(discoveryAddress + "&discovered.trace=true&discovered.wireFormat.maxInactivityDuration=1000&discovered.wireFormat.maxInactivityDurationInitalDelay=1000");
196136
brokerB.start();
197137
brokerB.waitUntilStarted();
198138

199-
// control multicast advertise agent to inject proxy
200-
agent = MulticastDiscoveryAgentFactory.createDiscoveryAgent(new URI(discoveryAddress));
201-
agent.registerService(proxy.getUrl().toString());
202-
agent.start();
203-
204139
doReconnect();
205140
}
206141

@@ -214,8 +149,13 @@ public void testSimpleReconnect() throws Exception {
214149
}
215150

216151
private void doReconnect() throws Exception {
152+
// Wait for initial network connector mbean registration
153+
assertTrue("network connector mbean registered within 1 minute",
154+
mbeanRegistered.tryAcquire(60, TimeUnit.SECONDS));
217155

218156
for (int i=0; i<maxReconnects; i++) {
157+
LOG.info("Reconnect iteration: " + (i + 1));
158+
219159
// Wait for connection
220160
assertTrue("we got a network connection in a timely manner", Wait.waitFor(new Wait.Condition() {
221161
@Override
@@ -224,20 +164,38 @@ public boolean isSatisified() throws Exception {
224164
}
225165
}));
226166

227-
// wait for network connector
228-
assertTrue("network connector mbean registered within 1 minute", mbeanRegistered.tryAcquire(60, TimeUnit.SECONDS));
229-
230167
// force an inactivity timeout via the proxy
231168
proxy.pause();
232-
233-
// wait for the inactivity timeout and network shutdown
234-
assertTrue("network connector mbean unregistered within 1 minute", mbeanUnregistered.tryAcquire(60, TimeUnit.SECONDS));
235-
236-
// whack all connections
169+
170+
// Wait a bit longer than the inactivity duration to ensure timeout triggers
171+
// maxInactivityDuration=1000ms, so wait at least 2-3x that
172+
Thread.sleep(3000);
173+
174+
// Now actually close the connections to force disconnect
237175
proxy.close();
176+
177+
// Wait for connections to be cleared
178+
assertTrue("proxy connections cleared", Wait.waitFor(new Wait.Condition() {
179+
@Override
180+
public boolean isSatisified() throws Exception {
181+
return proxy.connections.size() == 0;
182+
}
183+
}));
238184

239-
// let a reconnect succeed
240-
proxy.reopen();
185+
// let a reconnect succeed for next iteration
186+
if (i < maxReconnects - 1) {
187+
proxy.reopen();
188+
// Give some time for reconnection to be attempted
189+
Thread.sleep(1500);
190+
}
241191
}
192+
193+
// Now stop the broker which should unregister the network connector mbean
194+
brokerB.stop();
195+
brokerB.waitUntilStopped();
196+
197+
// wait for the network connector mbean to be unregistered
198+
assertTrue("network connector mbean unregistered within 1 minute",
199+
mbeanUnregistered.tryAcquire(60, TimeUnit.SECONDS));
242200
}
243201
}

0 commit comments

Comments
 (0)