Skip to content

Commit

Permalink
ARTEMIS-5032 Ensure AMQP message priority is honored after restart
Browse files Browse the repository at this point in the history
Ensure that on server restart the original priority value assigned to an
AMQP message is used when dispatching durable messages from the store.
The AMQP Header section is scanned if present and the priority value
is recovered in an efficient manner.
  • Loading branch information
tabish121 authored and clebertsuconic committed Sep 5, 2024
1 parent 50c2055 commit ec8026e
Show file tree
Hide file tree
Showing 6 changed files with 259 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
}
standardMessage.setMessageAnnotations(messageAnnotations);
standardMessage.setMessageID(messageID);
standardMessage.setPriority(getPriority());

return standardMessage.toCore();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
Expand Down Expand Up @@ -199,7 +201,6 @@ public ByteBuf getSavedEncodeBuffer() {
}

private void saveEncoding(ByteBuf buf) {

WritableBuffer oldBuffer = TLSEncode.getEncoder().getBuffer();

TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buf));
Expand Down Expand Up @@ -241,6 +242,11 @@ protected void readSavedEncoding(ByteBuf buf) {
encodedHeaderSize = buf.readInt();
header = (Header)TLSEncode.getDecoder().readObject();

// Recover message priority from saved encoding as we store that separately
if (header != null && header.getPriority() != null) {
priority = (byte) Math.min(header.getPriority().byteValue(), MAX_MESSAGE_PRIORITY);
}

deliveryAnnotationsPosition = buf.readInt();
encodedDeliveryAnnotationsSize = buf.readInt();

Expand All @@ -264,8 +270,6 @@ protected void readSavedEncoding(ByteBuf buf) {
expiration = System.currentTimeMillis() + header.getTtl().intValue();
}
}


} finally {
TLSEncode.getDecoder().setBuffer(oldBuffer);
}
Expand Down Expand Up @@ -322,7 +326,6 @@ public ReadableBuffer getData() {
}

public void parseHeader(ReadableBuffer buffer) {

DecoderImpl decoder = TLSEncode.getDecoder();
decoder.setBuffer(buffer);

Expand All @@ -335,6 +338,9 @@ public void parseHeader(ReadableBuffer buffer) {
expiration = System.currentTimeMillis() + header.getTtl().intValue();
}
}
if (header.getPriority() != null) {
priority = (byte) Math.min(header.getPriority().intValue(), MAX_MESSAGE_PRIORITY);
}
}
} finally {
decoder.setBuffer(null);
Expand Down Expand Up @@ -489,6 +495,9 @@ public Message copy() {
newMessage.setParentRef(this);
newMessage.setFileDurable(this.isDurable());
newMessage.reloadExpiration(this.expiration);
if (priority != AMQPMessage.DEFAULT_MESSAGE_PRIORITY) {
newMessage.setPriority(priority);
}
return newMessage;
}

Expand All @@ -502,6 +511,9 @@ public Message copy(final long newID, boolean isDLQOrExpiry) {
try {
AMQPLargeMessage copy = new AMQPLargeMessage(newID, messageFormat, null, coreMessageObjectPools, storageManager);
copy.setDurable(this.isDurable());
if (priority != AMQPMessage.DEFAULT_MESSAGE_PRIORITY) {
copy.setPriority(priority);
}

final AtomicInteger place = new AtomicInteger(0);
ByteBuf bufferNewHeader = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ private static void checkCode(int code) {
protected long expiration;
protected boolean expirationReload = false;
protected long scheduledTime = -1;
protected byte priority = DEFAULT_MESSAGE_PRIORITY;

protected boolean isPaged;

Expand Down Expand Up @@ -678,6 +679,7 @@ protected synchronized void resetMessageData() {
if (!expirationReload) {
expiration = 0;
}
priority = DEFAULT_MESSAGE_PRIORITY;
encodedHeaderSize = 0;
memoryEstimate = -1;
originalEstimate = -1;
Expand Down Expand Up @@ -713,6 +715,9 @@ protected synchronized void scanMessageData(ReadableBuffer data) {
expiration = System.currentTimeMillis() + header.getTtl().longValue();
}
}
if (header.getPriority() != null) {
priority = (byte) Math.min(header.getPriority().intValue(), MAX_MESSAGE_PRIORITY);
}
} else if (DeliveryAnnotations.class.equals(constructor.getTypeClass())) {
deliveryAnnotationsPosition = constructorPos;
this.deliveryAnnotations = (DeliveryAnnotations) constructor.readValue();
Expand Down Expand Up @@ -1297,19 +1302,21 @@ public final org.apache.activemq.artemis.api.core.Message setTimestamp(long time

@Override
public final byte getPriority() {
if (header != null && header.getPriority() != null) {
return (byte) Math.min(header.getPriority().intValue(), MAX_MESSAGE_PRIORITY);
} else {
return DEFAULT_MESSAGE_PRIORITY;
}
return priority;
}

@Override
public final org.apache.activemq.artemis.api.core.Message setPriority(byte priority) {
// Internally we can only deal with a limited range, but the AMQP value is allowed
// to span the full range of the unsigned byte so we store what was actually set in
// the AMQP Header section.
this.priority = (byte) Math.min(priority & 0xff, MAX_MESSAGE_PRIORITY);

if (header == null) {
header = new Header();
}
header.setPriority(UnsignedByte.valueOf(priority));

return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,12 @@
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.codec.DecodeException;
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.EncodingCodes;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.codec.TypeConstructor;
import org.apache.qpid.proton.codec.WritableBuffer;

// see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
Expand Down Expand Up @@ -235,10 +239,87 @@ public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pool

// Message state is now that the underlying buffer is loaded, but the contents not yet scanned
resetMessageData();
recoverHeaderDataFromEncoding();

modified = false;
messageDataScanned = MessageDataScanningStatus.RELOAD_PERSISTENCE.code;
}

private void recoverHeaderDataFromEncoding() {
final DecoderImpl decoder = TLSEncode.getDecoder();
decoder.setBuffer(data);

try {
// At one point the broker could write the header and delivery annotations out of order
// which means a full scan is required for maximum compatibility with that older data
// where delivery annotations could be found ahead of the Header in the encoding.
//
// We manually extract the priority from the Header encoding if present to ensure we do
// not create any unneeded GC overhead during load from storage. We don't directly store
// other values from the header except for a value that is computed based on TTL and or
// absolute expiration time in the Properties section, but that value is stored in the
// data of the persisted message.
for (int section = 0; section < 2 && data.hasRemaining(); section++) {
final TypeConstructor<?> constructor = decoder.readConstructor();

if (Header.class.equals(constructor.getTypeClass())) {
final byte typeCode = data.get();

@SuppressWarnings("unused")
int size = 0;
int count = 0;

switch (typeCode) {
case EncodingCodes.LIST0:
break;
case EncodingCodes.LIST8:
size = data.get() & 0xff;
count = data.get() & 0xff;
break;
case EncodingCodes.LIST32:
size = data.getInt();
count = data.getInt();
break;
default:
throw new DecodeException("Incorrect type found in Header encoding: " + typeCode);
}

// Priority is stored in the second slot of the Header list encoding if present
if (count >= 2) {
decoder.readBoolean(false); // Discard durable for now, it is computed elsewhere.

final byte encodingCode = data.get();
final int priority;

switch (encodingCode) {
case EncodingCodes.UBYTE:
priority = data.get() & 0xff;
break;
case EncodingCodes.NULL:
priority = DEFAULT_MESSAGE_PRIORITY;
break;
default:
throw new DecodeException("Expected UnsignedByte type but found encoding: " + EncodingCodes.toString(encodingCode));
}

// Scaled here so do not call setPriority as that will store the set value in the AMQP header
// and we don't want to create that Header instance at this stage.
this.priority = (byte) Math.min(priority, MAX_MESSAGE_PRIORITY);
}

return;
} else if (DeliveryAnnotations.class.equals(constructor.getTypeClass())) {
constructor.skipValue();
} else {
return;
}
}
} finally {
decoder.setBuffer(null);
data.rewind(); // Ensure next scan start at the beginning.
}
}

@Override
public long getPersistentSize() throws ActiveMQException {
return getEncodeSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@
public class AMQPPersisterTest {

protected Message createMessage(SimpleString address, int msgId, byte[] content) {
final MessageImpl protonMessage = createProtonMessage(address.toString(), content);
return createMessage(address, (byte) AMQPMessage.MAX_MESSAGE_PRIORITY, msgId, content);
}

protected Message createMessage(SimpleString address, byte priority, int msgId, byte[] content) {
final MessageImpl protonMessage = createProtonMessage(address.toString(), priority, content);
final AMQPStandardMessage msg = encodeAndDecodeMessage(protonMessage, content.length);
msg.setAddress(address);
msg.setMessageID(msgId);
Expand All @@ -62,12 +66,12 @@ private AMQPStandardMessage encodeAndDecodeMessage(MessageImpl message, int expe
return new AMQPStandardMessage(0, bytes, null);
}

private MessageImpl createProtonMessage(String address, byte[] content) {
private MessageImpl createProtonMessage(String address, byte priority, byte[] content) {
MessageImpl message = (MessageImpl) Proton.message();

Header header = new Header();
header.setDurable(true);
header.setPriority(UnsignedByte.valueOf((byte) 9));
header.setPriority(UnsignedByte.valueOf(priority));

Properties properties = new Properties();
properties.setCreationTime(new Date(System.currentTimeMillis()));
Expand All @@ -88,10 +92,8 @@ private MessageImpl createProtonMessage(String address, byte[] content) {
return message;
}


@Test
public void testEncodeSize() throws Exception {

Message message = createMessage(SimpleString.of("Test"), 1, new byte[10]);

MessagePersister persister = AMQPMessagePersisterV3.getInstance();
Expand All @@ -100,7 +102,36 @@ public void testEncodeSize() throws Exception {
persister.encode(buffer, message);

assertEquals(persister.getEncodeSize(message), buffer.writerIndex());
}

@Test
public void testV1PersisterRecoversPriority() {
doTestPersisterRecoversPriority(AMQPMessagePersister.getInstance());
}

@Test
public void testV2PersisterRecoversPriority() {
doTestPersisterRecoversPriority(AMQPMessagePersisterV2.getInstance());
}

@Test
public void testV3PersisterRecoversPriority() {
doTestPersisterRecoversPriority(AMQPMessagePersisterV3.getInstance());
}

private void doTestPersisterRecoversPriority(MessagePersister persister) {
for (byte priority = 0; priority <= AMQPMessage.MAX_MESSAGE_PRIORITY; ++priority) {
final Message message = createMessage(SimpleString.of("Test"), priority, 1, new byte[10]);

final ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(1024);

persister.encode(buffer, message);

assertEquals(persister.getID(), buffer.readByte());

final Message decoded = persister.decode(buffer, message, null);

assertEquals(priority, decoded.getPriority());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ private void encodeAndSend(Message message, Delivery delivery) throws IOExceptio

@Override
public void processFlowUpdates(AmqpConnection connection) throws IOException {
logger.trace("Sender {} flow update, credit = {}", getEndpoint().getCredit());
logger.trace("Sender {} flow update, credit = {}", senderId, getEndpoint().getCredit());

doCreditInspection();
}
Expand Down
Loading

0 comments on commit ec8026e

Please sign in to comment.