Skip to content

Commit 1175a2c

Browse files
committed
repo use case
1 parent 723b647 commit 1175a2c

File tree

2 files changed

+152
-29
lines changed

2 files changed

+152
-29
lines changed

activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -549,7 +549,6 @@ private void poisonAck(MessageDispatch md, String cause) throws JMSException {
549549
private boolean redeliveryExceeded(MessageDispatch md) {
550550
try {
551551
return session.getTransacted()
552-
&& !this.info.isBrowser()
553552
&& redeliveryPolicy != null
554553
&& redeliveryPolicy.isPreDispatchCheck()
555554
&& redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES

activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java

Lines changed: 152 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818

1919
import static org.junit.Assert.assertEquals;
2020
import static org.junit.Assert.assertTrue;
21+
import static org.junit.Assert.fail;
2122

2223
import java.io.IOException;
2324
import java.net.URI;
2425
import java.util.Enumeration;
26+
import java.util.Set;
27+
import java.util.concurrent.atomic.AtomicInteger;
2528

2629
import jakarta.jms.Connection;
2730
import jakarta.jms.JMSException;
@@ -35,10 +38,15 @@
3538
import org.apache.activemq.broker.BrokerService;
3639
import org.apache.activemq.broker.TransportConnector;
3740
import org.apache.activemq.broker.jmx.DestinationView;
41+
import org.apache.activemq.broker.region.Destination;
42+
import org.apache.activemq.broker.region.Queue;
43+
import org.apache.activemq.broker.region.QueueMessageReference;
3844
import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
3945
import org.apache.activemq.broker.region.policy.PolicyEntry;
4046
import org.apache.activemq.broker.region.policy.PolicyMap;
47+
import org.apache.activemq.command.ActiveMQMessage;
4148
import org.apache.activemq.command.ActiveMQQueue;
49+
import org.apache.qpid.proton.InterruptException;
4250
import org.junit.After;
4351
import org.junit.Before;
4452
import org.junit.Test;
@@ -69,9 +77,10 @@ public void startBroker() throws Exception {
6977

7078
connectUri = connector.getConnectUri();
7179
factory = new ActiveMQConnectionFactory(connectUri);
72-
factory.getRedeliveryPolicy().setInitialRedeliveryDelay(1l);
73-
factory.getRedeliveryPolicy().setRedeliveryDelay(1l);
74-
factory.getRedeliveryPolicy().setMaximumRedeliveryDelay(1l);
80+
factory.setWatchTopicAdvisories(false);
81+
factory.getRedeliveryPolicy().setInitialRedeliveryDelay(0l);
82+
factory.getRedeliveryPolicy().setRedeliveryDelay(0l);
83+
factory.getRedeliveryPolicy().setMaximumRedeliveryDelay(0l);
7584
}
7685

7786
public BrokerService createBroker() throws IOException {
@@ -230,53 +239,168 @@ public void testBrowseRedeliveryMaxTransacted() throws Exception {
230239
individualDeadLetterStrategy.setQueueSuffix(".dlq");
231240
individualDeadLetterStrategy.setUseQueueForQueueMessages(true);
232241
broker.getDestinationPolicy().getDefaultEntry().setDeadLetterStrategy(individualDeadLetterStrategy);
242+
broker.getDestinationPolicy().getDefaultEntry().setPersistJMSRedelivered(true);
233243

234-
int messageToSend = 1;
244+
String messageId = null;
235245

236246
String queueName = "browse.redeliverd.tx";
237247
String dlqQueueName = "browse.redeliverd.tx.dlq";
238-
ActiveMQQueue queue = new ActiveMQQueue(queueName);
239-
ActiveMQQueue queueDLQ = new ActiveMQQueue(dlqQueueName);
248+
String dlqDlqQueueName = "browse.redeliverd.tx.dlq.dlq";
249+
250+
ActiveMQQueue queue = new ActiveMQQueue(queueName + "?consumer.prefetchSize=0");
251+
ActiveMQQueue queueDLQ = new ActiveMQQueue(dlqQueueName + "?consumer.prefetchSize=0");
252+
ActiveMQQueue queueDLQDLQ = new ActiveMQQueue(dlqDlqQueueName);
253+
254+
broker.getAdminView().addQueue(queueName);
255+
broker.getAdminView().addQueue(dlqQueueName);
256+
257+
DestinationView dlqQueueView = broker.getAdminView().getBroker().getQueueView(dlqQueueName);
258+
DestinationView queueView = broker.getAdminView().getBroker().getQueueView(queueName);
259+
260+
verifyQueueStats(0l, 0l, 0l, dlqQueueView);
261+
verifyQueueStats(0l, 0l, 0l, queueView);
240262

241263
Connection connection = factory.createConnection();
242264
connection.start();
243265
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
244266
MessageProducer producer = session.createProducer(queue);
245267

246-
for( int i=0; i < messageToSend; i++ ) {
247-
producer.send(session.createTextMessage("Hello world!"));
248-
}
268+
Message sendMessage = session.createTextMessage("Hello world!");
269+
producer.send(sendMessage);
270+
messageId = sendMessage.getJMSMessageID();
249271
session.commit();
272+
producer.close();
250273

251-
//Consume one message to free memory and allow the cursor to pageIn messages
274+
verifyQueueStats(0l, 0l, 0l, dlqQueueView);
275+
verifyQueueStats(1l, 0l, 1l, queueView);
276+
277+
// Redeliver message to DLQ
252278
Message message = null;
253279
MessageConsumer consumer = session.createConsumer(queue);
280+
int rollbackCount = 0;
254281
do {
255-
message = consumer.receive(5000l);
282+
message = consumer.receive(2000l);
256283
if(message != null) {
257284
session.rollback();
285+
rollbackCount++;
258286
}
259287
} while (message != null);
260288

261-
DestinationView queueView = broker.getAdminView().getBroker().getQueueView(queueName);
262-
DestinationView dlqQueueView = broker.getAdminView().getBroker().getQueueView(dlqQueueName);
263-
assertEquals(Long.valueOf(1), Long.valueOf(queueView.getEnqueueCount()));
264-
assertEquals(Long.valueOf(1), Long.valueOf(queueView.getDequeueCount()));
265-
assertEquals(Long.valueOf(0), Long.valueOf(queueView.getQueueSize()));
289+
assertEquals(Integer.valueOf(7), Integer.valueOf(rollbackCount));
290+
verifyQueueStats(1l, 0l, 1l, dlqQueueView);
291+
verifyQueueStats(1l, 1l, 0l, queueView);
266292

267-
assertEquals(Long.valueOf(1), Long.valueOf(dlqQueueView.getEnqueueCount()));
268-
assertEquals(Long.valueOf(0), Long.valueOf(dlqQueueView.getDequeueCount()));
269-
assertEquals(Long.valueOf(1), Long.valueOf(dlqQueueView.getQueueSize()));
293+
session.commit();
294+
consumer.close();
295+
296+
// Increment redelivery counter on the message in the DLQ
297+
// Close the consumer to force broker to dispatch
298+
Message messageDLQ = null;
299+
MessageConsumer consumerDLQ = session.createConsumer(queueDLQ);
300+
int dlqRollbackCount = 0;
301+
int dlqRollbackCountLimit = 5;
302+
do {
303+
messageDLQ = consumerDLQ.receive(2000l);
304+
if(messageDLQ != null) {
305+
session.rollback();
306+
session.close();
307+
consumerDLQ.close();
308+
session = connection.createSession(true, Session.SESSION_TRANSACTED);
309+
consumerDLQ = session.createConsumer(queueDLQ);
310+
dlqRollbackCount++;
311+
}
312+
} while (messageDLQ != null && dlqRollbackCount < dlqRollbackCountLimit);
313+
session.commit();
314+
consumerDLQ.close();
315+
316+
// Browse in tx mode works when we are at the edge of maxRedeliveries
317+
// aka browse does not increment redeliverCounter as expected
318+
Queue brokerQueueDLQ = resolveQueue(broker, queueDLQ);
319+
320+
for(int i=0; i<16; i++) {
321+
QueueBrowser browser = session.createBrowser(queueDLQ);
322+
Enumeration<?> enumeration = browser.getEnumeration();
323+
ActiveMQMessage activemqMessage = null;
324+
int received = 0;
325+
while (enumeration.hasMoreElements()) {
326+
activemqMessage = (ActiveMQMessage)enumeration.nextElement();
327+
received++;
328+
}
329+
browser.close();
330+
assertEquals(Integer.valueOf(1), Integer.valueOf(received));
331+
assertEquals(Integer.valueOf(6), Integer.valueOf(activemqMessage.getRedeliveryCounter()));
270332

271-
QueueBrowser browser = session.createBrowser(queueDLQ);
272-
Enumeration<?> enumeration = browser.getEnumeration();
273-
int received = 0;
274-
while (enumeration.hasMoreElements()) {
275-
Message m = (Message) enumeration.nextElement();
276-
received++;
277-
LOG.info("Browsed session tx dlq message " + received + ": " + m.getJMSMessageID());
333+
// Confirm broker-side redeliveryCounter
334+
QueueMessageReference queueMessageReference = brokerQueueDLQ.getMessage(messageId);
335+
assertEquals(Integer.valueOf(6), Integer.valueOf(queueMessageReference.getRedeliveryCounter()));
278336
}
279-
browser.close();
280-
assertEquals(Integer.valueOf(1), Integer.valueOf(received));
337+
338+
session.close();
339+
connection.close();
340+
341+
// Change redelivery max and the browser will fail
342+
factory.getRedeliveryPolicy().setMaximumRedeliveries(3);
343+
final Connection browseConnection = factory.createConnection();
344+
browseConnection.start();
345+
346+
final AtomicInteger browseCounter = new AtomicInteger(0);
347+
final AtomicInteger jmsExceptionCounter = new AtomicInteger(0);
348+
349+
final Session browseSession = browseConnection.createSession(true, Session.SESSION_TRANSACTED);
350+
351+
Thread browseThread = new Thread() {
352+
public void run() {
353+
354+
QueueBrowser browser = null;
355+
try {
356+
browser = browseSession.createBrowser(queueDLQ);
357+
Enumeration<?> enumeration = browser.getEnumeration();
358+
while (enumeration.hasMoreElements()) {
359+
Message message = (Message)enumeration.nextElement();
360+
if(message != null) {
361+
browseCounter.incrementAndGet();
362+
}
363+
}
364+
} catch (JMSException e) {
365+
jmsExceptionCounter.incrementAndGet();
366+
} catch (InterruptException ie) {
367+
if(browser != null) { try { browser.close(); } catch (JMSException e) { jmsExceptionCounter.incrementAndGet(); } }
368+
if(browseSession != null) { try { browseSession.close(); } catch (JMSException e) { jmsExceptionCounter.incrementAndGet(); } }
369+
if(browseConnection != null) { try { browseConnection.close(); } catch (JMSException e) { jmsExceptionCounter.incrementAndGet(); } }
370+
Thread.currentThread().interrupt();
371+
}
372+
}
373+
};
374+
browseThread.start();
375+
Thread.sleep(2000l);
376+
browseThread.interrupt();
377+
378+
assertEquals(Integer.valueOf(0), Integer.valueOf(browseCounter.get()));
379+
assertEquals(Integer.valueOf(0), Integer.valueOf(jmsExceptionCounter.get()));
380+
381+
// ActiveMQConsumer sends a poison ack, messages gets moved to .dlq.dlq AND remains on the .dlq
382+
DestinationView dlqDlqQueueView = broker.getAdminView().getBroker().getQueueView(dlqDlqQueueName);
383+
verifyQueueStats(1l, 1l, 0l, queueView);
384+
verifyQueueStats(1l, 0l, 1l, dlqQueueView);
385+
verifyQueueStats(1l, 0l, 1l, dlqDlqQueueView);
386+
}
387+
388+
protected static void verifyQueueStats(long enqueueCount, long dequeueCount, long queueSize, DestinationView queueView) {
389+
assertEquals(Long.valueOf(enqueueCount), Long.valueOf(queueView.getEnqueueCount()));
390+
assertEquals(Long.valueOf(dequeueCount), Long.valueOf(queueView.getDequeueCount()));
391+
assertEquals(Long.valueOf(queueSize), Long.valueOf(queueView.getQueueSize()));
392+
}
393+
394+
protected static Queue resolveQueue(BrokerService brokerService, ActiveMQQueue activemqQueue) throws Exception {
395+
Set<Destination> destinations = brokerService.getBroker().getDestinations(activemqQueue);
396+
if(destinations == null || destinations.isEmpty()) {
397+
return null;
398+
}
399+
400+
if(destinations.size() > 1) {
401+
fail("Expected one-and-only one queue for: " + activemqQueue);
402+
}
403+
404+
return (Queue)destinations.iterator().next();
281405
}
282406
}

0 commit comments

Comments
 (0)