Skip to content

Commit

Permalink
This is PR rh-messaging#277
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Aug 29, 2019
2 parents 35d5d76 + d4b3bf2 commit ff41fa6
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,10 @@ public AMQPMessage(long messageFormat, ReadableBuffer data, TypedProperties extr
* @return a MessageImpl that wraps the AMQP message data in this {@link AMQPMessage}
*/
public MessageImpl getProtonMessage() {
ensureMessageDataScanned();
ensureDataIsValid();
if (data == null) {
throw new NullPointerException("Data is not initialized");
}
ensureScanning();

MessageImpl protonMessage = null;
if (data != null) {
Expand All @@ -228,20 +230,23 @@ public MessageImpl getProtonMessage() {
* @return a copy of the Message Header if one exists or null if none present.
*/
public Header getHeader() {
ensureMessageDataScanned();
ensureDataIsValid();
ensureScanning();
return scanForMessageSection(headerPosition, Header.class);
}

private void ensureScanning() {
ensureDataIsValid();
ensureMessageDataScanned();
}

/**
* Returns a copy of the MessageAnnotations in the message if present or null. Changes to the
* returned DeliveryAnnotations instance do not affect the original Message.
*
* @return a copy of the {@link DeliveryAnnotations} present in the message or null if non present.
*/
public DeliveryAnnotations getDeliveryAnnotations() {
ensureMessageDataScanned();
ensureDataIsValid();
ensureScanning();
return scanForMessageSection(deliveryAnnotationsPosition, DeliveryAnnotations.class);
}

Expand All @@ -267,8 +272,7 @@ public void setDeliveryAnnotationsForSendBuffer(DeliveryAnnotations deliveryAnno
* @return a copy of the {@link MessageAnnotations} present in the message or null if non present.
*/
public MessageAnnotations getMessageAnnotations() {
ensureMessageDataScanned();
ensureDataIsValid();
ensureScanning();
return scanForMessageSection(messageAnnotationsPosition, MessageAnnotations.class);
}

Expand All @@ -279,8 +283,7 @@ public MessageAnnotations getMessageAnnotations() {
* @return a copy of the Message Properties if one exists or null if none present.
*/
public Properties getProperties() {
ensureMessageDataScanned();
ensureDataIsValid();
ensureScanning();
return scanForMessageSection(propertiesPosition, Properties.class);
}

Expand All @@ -291,8 +294,7 @@ public Properties getProperties() {
* @return a copy of the {@link ApplicationProperties} present in the message or null if non present.
*/
public ApplicationProperties getApplicationProperties() {
ensureMessageDataScanned();
ensureDataIsValid();
ensureScanning();
return scanForMessageSection(applicationPropertiesPosition, ApplicationProperties.class);
}

Expand All @@ -310,8 +312,7 @@ public String toDebugString() {
* @return the Section that makes up the body of this message.
*/
public Section getBody() {
ensureMessageDataScanned();
ensureDataIsValid();
ensureScanning();

// We only handle Sections of AmqpSequence, AmqpValue and Data types so we filter on those.
// There could also be a Footer and no body so this will prevent a faulty return type in case
Expand All @@ -327,8 +328,7 @@ public Section getBody() {
* @return the Footer that was encoded into this AMQP Message.
*/
public Footer getFooter() {
ensureMessageDataScanned();
ensureDataIsValid();
ensureScanning();
return scanForMessageSection(Math.max(0, remainingBodyPosition), Footer.class);
}

Expand Down Expand Up @@ -445,11 +445,11 @@ private void setMessageAnnotation(Symbol annotation, Object value) {
private synchronized void ensureMessageDataScanned() {
if (!messageDataScanned) {
scanMessageData();
messageDataScanned = true;
}
}

private synchronized void scanMessageData() {
this.messageDataScanned = true;
DecoderImpl decoder = TLSEncode.getDecoder();
decoder.setBuffer(data.rewind());

Expand Down Expand Up @@ -781,21 +781,17 @@ public void reencode() {

encodeMessage();
scanMessageData();

messageDataScanned = true;
modified = false;
}

private synchronized void ensureDataIsValid() {
assert data != null;

if (modified) {
encodeMessage();
modified = false;
}
}

private synchronized void encodeMessage() {
this.modified = false;
this.messageDataScanned = false;
int estimated = Math.max(1500, data != null ? data.capacity() + 1000 : 0);
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(estimated);
EncoderImpl encoder = TLSEncode.getEncoder();
Expand Down Expand Up @@ -1542,14 +1538,15 @@ public org.apache.activemq.artemis.api.core.Message setLastValueProperty(SimpleS

@Override
public String toString() {
return "AMQPMessage [durable=" + isDurable() +
/* return "AMQPMessage [durable=" + isDurable() +
", messageID=" + getMessageID() +
", address=" + getAddress() +
", size=" + getEncodeSize() +
", applicationProperties=" + applicationProperties +
", properties=" + properties +
", extraProperties = " + getExtraProperties() +
"]";
"]"; */
return super.toString();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,11 @@ public void testExpandPropertiesAndConvert() throws Exception {
encodedMessage.setAddress(SimpleString.toSimpleString("xxxx.v1.queue"));

for (int i = 0; i < 100; i++) {
encodedMessage.getApplicationProperties().getValue().put("another" + i, "value" + i);
encodedMessage.putStringProperty("another" + i, "value" + i);
encodedMessage.messageChanged();
encodedMessage.reencode();
AmqpValue value = (AmqpValue)encodedMessage.getProtonMessage().getBody();
Assert.assertEquals(text, (String)value.getValue());
AmqpValue value = (AmqpValue) encodedMessage.getProtonMessage().getBody();
Assert.assertEquals(text, (String) value.getValue());
ICoreMessage coreMessage = encodedMessage.toCore();
if (logger.isDebugEnabled()) {
logger.debug("Converted message: " + coreMessage);
Expand All @@ -311,7 +311,43 @@ public void testExpandPropertiesAndConvert() throws Exception {
encodedMessage = encodeAndCreateAMQPMessage(message);

}
}
}

@Test
public void testExpandNoReencode() throws Exception {

Map<String, Object> mapprop = createPropertiesMap();
ApplicationProperties properties = new ApplicationProperties(mapprop);
properties.getValue().put("hello", "hello");
MessageImpl message = (MessageImpl) Message.Factory.create();
MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
message.setMessageAnnotations(annotations);
message.setApplicationProperties(properties);

String text = "someText";
message.setBody(new AmqpValue(text));

AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
TypedProperties extraProperties = new TypedProperties();
encodedMessage.setAddress(SimpleString.toSimpleString("xxxx.v1.queue"));

for (int i = 0; i < 100; i++) {
encodedMessage.setMessageID(333L);
if (i % 3 == 0) {
encodedMessage.referenceOriginalMessage(encodedMessage, "SOME-OTHER-QUEUE-DOES-NOT-MATTER-WHAT");
} else {
encodedMessage.referenceOriginalMessage(encodedMessage, "XXX");
}
encodedMessage.putStringProperty("another " + i, "value " + i);
encodedMessage.messageChanged();
if (i % 2 == 0) {
encodedMessage.setAddress("THIS-IS-A-BIG-THIS-IS-A-BIG-ADDRESS-THIS-IS-A-BIG-ADDRESS-RIGHT");
} else {
encodedMessage.setAddress("A"); // small address
}
encodedMessage.messageChanged();
ICoreMessage coreMessage = encodedMessage.toCore();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,6 @@ public void route(final Message message, final RoutingContext context) throws Ex

copy.setExpiration(message.getExpiration());

copy.reencode();

switch (routingType) {
case ANYCAST:
copy.setRoutingType(RoutingType.ANYCAST);
Expand All @@ -126,7 +124,8 @@ public void route(final Message message, final RoutingContext context) throws Ex
copy = transformer.transform(copy);
}

copy.messageChanged();
// We call reencode at the end only, in a single call.
copy.reencode();
} else {
copy = message;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@
*/
package org.apache.activemq.artemis.tests.integration.divert;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.Collection;
import java.util.concurrent.TimeUnit;

Expand All @@ -34,6 +41,7 @@
import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.api.core.RoutingType;

Expand All @@ -42,6 +50,7 @@
import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -128,6 +137,85 @@ public void testSingleNonExclusiveDivert() throws Exception {
Assert.assertNull(consumer2.receiveImmediate());
}

@Test
public void testCrossProtocol() throws Exception {
final String testForConvert = "testConvert";

final String testAddress = "testAddress";

final String forwardAddress = "forwardAddress";

DivertConfiguration divertConf = new DivertConfiguration().setName("divert1").setRoutingName("divert1").setAddress(testAddress).setForwardingAddress(forwardAddress).
setRoutingType(ComponentConfigurationRoutingType.ANYCAST);

Configuration config = createDefaultNettyConfig().addDivertConfiguration(divertConf);

ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, false));

server.start();

final SimpleString queueName1 = SimpleString.toSimpleString(testAddress);

final SimpleString queueName2 = SimpleString.toSimpleString(forwardAddress);

{ // this is setting up the queues
ServerLocator locator = createInVMNonHALocator();

ClientSessionFactory sf = createSessionFactory(locator);

ClientSession session = sf.createSession(false, true, true);

session.createQueue(new SimpleString(testAddress), RoutingType.ANYCAST, queueName1, null, true);

session.createQueue(new SimpleString(testForConvert), RoutingType.ANYCAST, SimpleString.toSimpleString(testForConvert), null, true);

session.createQueue(new SimpleString(forwardAddress), RoutingType.MULTICAST, queueName2, null, true);
}

ConnectionFactory coreCF = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616");
Connection coreConnection = coreCF.createConnection();
Session coreSession = coreConnection.createSession(Session.AUTO_ACKNOWLEDGE);
MessageProducer producerCore = coreSession.createProducer(coreSession.createQueue(testForConvert));

for (int i = 0; i < 10; i++) {
TextMessage textMessage = coreSession.createTextMessage("text" + i);
//if (i % 2 == 0) textMessage.setIntProperty("key", i);
producerCore.send(textMessage);
}

producerCore.close();

ConnectionFactory amqpCF = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:61616");

Connection amqpConnection = amqpCF.createConnection();
Session amqpSession = amqpConnection.createSession(Session.AUTO_ACKNOWLEDGE);
Queue amqpQueue = amqpSession.createQueue(testAddress);
MessageProducer producer = amqpSession.createProducer(amqpQueue);
MessageConsumer consumerFromConvert = amqpSession.createConsumer(amqpSession.createQueue(testForConvert));
amqpConnection.start();

for (int i = 0; i < 10; i++) {
javax.jms.Message received = consumerFromConvert.receive(5000);
Assert.assertNotNull(received);
producer.send(received);
}


Queue outQueue = coreSession.createQueue(queueName2.toString());
MessageConsumer consumer = coreSession.createConsumer(outQueue);
coreConnection.start();

for (int i = 0; i < 10; i++) {
TextMessage textMessage = (TextMessage)consumer.receive(5000);
Assert.assertNotNull(textMessage);
Assert.assertEquals("text" + i, textMessage.getText());
//if (i % 2 == 0) Assert.assertEquals(i, textMessage.getIntProperty("key"));
}

Assert.assertNull(consumer.receiveNoWait());

}

@Test
public void testSingleNonExclusiveDivertWithRoutingType() throws Exception {
final String testAddress = "testAddress";
Expand Down

0 comments on commit ff41fa6

Please sign in to comment.