Skip to content

Commit

Permalink
Do not mutate message object on send
Browse files Browse the repository at this point in the history
  • Loading branch information
Havret committed Jun 19, 2024
1 parent 74b0196 commit ff46739
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 52 deletions.
23 changes: 8 additions & 15 deletions src/ArtemisNetCoreClient/AnonymousProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,16 @@ public ValueTask DisposeAsync()

public void SendMessage(string address, RoutingType? routingType, Message message)
{
message.Address = address;
if (routingType != null)
{
message.RoutingType = routingType;
}

session.SendMessage(message: message, producerId: ProducerId);
session.SendMessage(message: message, address: address, routingType: routingType, producerId: ProducerId);
}

public async ValueTask SendMessageAsync(string address, RoutingType? routingType, Message message, CancellationToken cancellationToken = default)
public Task SendMessageAsync(string address, RoutingType? routingType, Message message, CancellationToken cancellationToken = default)
{
message.Address = address;
if (routingType != null)
{
message.RoutingType = routingType;
}

await session.SendMessageAsync(message: message, producerId: ProducerId, cancellationToken: cancellationToken);
return session.SendMessageAsync(message: message,
address: address,
routingType: routingType,
producerId: ProducerId,
cancellationToken: cancellationToken
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public static void SendMessage(this IAnonymousProducer anonymousProducer, string
/// <param name="address">The address to which the message should be sent.</param>
/// <param name="message">The message to send.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static ValueTask SendMessageAsync(this IAnonymousProducer anonymousProducer, string address, Message message, CancellationToken cancellationToken = default)
public static Task SendMessageAsync(this IAnonymousProducer anonymousProducer, string address, Message message, CancellationToken cancellationToken = default)
{
return anonymousProducer.SendMessageAsync(address, routingType: null, message, cancellationToken);
}
Expand Down
20 changes: 11 additions & 9 deletions src/ArtemisNetCoreClient/Framing/Outgoing/SessionSendMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ namespace ActiveMQ.Artemis.Core.Client.Framing.Outgoing;
public required bool RequiresResponse { get; init; }
public required long CorrelationId { get; init; }
public required int ProducerId { get; init; }

public required string Address { get; init; }
public required RoutingType? RoutingType { get; init; }

public int GetRequiredBufferSize()
{
int byteCount = 0;
Expand All @@ -20,7 +22,7 @@ public int GetRequiredBufferSize()
byteCount += sizeof(int); // Message body length
byteCount += Message.Body.Length; // Actual message body length
byteCount += sizeof(long); // MessageId
byteCount += ArtemisBinaryConverter.GetNullableSimpleStringByteCount(Message.Address);
byteCount += ArtemisBinaryConverter.GetNullableSimpleStringByteCount(Address);
byteCount += ArtemisBinaryConverter.GetNullableGuidByteCount(Message.UserId);
byteCount += sizeof(byte); // Type
byteCount += sizeof(bool); // Durable
Expand All @@ -29,7 +31,7 @@ public int GetRequiredBufferSize()
byteCount += sizeof(byte); // Priority

byteCount += sizeof(byte); // Properties nullability
if (Message.Properties?.Count > 0 || Message.RoutingType.HasValue)
if (Message.Properties?.Count > 0 || RoutingType.HasValue)
{
byteCount += sizeof(int); // Properties count
if (Message.Properties != null)
Expand All @@ -41,10 +43,10 @@ public int GetRequiredBufferSize()
}
}

if (Message.RoutingType.HasValue)
if (RoutingType.HasValue)
{
byteCount += ArtemisBinaryConverter.GetSimpleStringByteCount(MessageHeaders.RoutingType);
byteCount += ArtemisBinaryConverter.GetNullableObjectByteCount((byte) Message.RoutingType);
byteCount += ArtemisBinaryConverter.GetNullableObjectByteCount((byte) RoutingType);
}
}

Expand Down Expand Up @@ -93,7 +95,7 @@ private int EncodeHeaders(Span<byte> buffer)
var offset = 0;

offset += ArtemisBinaryConverter.WriteInt64(ref buffer.GetReference(), Message.MessageId);
offset += ArtemisBinaryConverter.WriteNullableSimpleString(ref buffer.GetOffset(offset), Message.Address);
offset += ArtemisBinaryConverter.WriteNullableSimpleString(ref buffer.GetOffset(offset), Address);
offset += ArtemisBinaryConverter.WriteNullableGuid(ref buffer.GetOffset(offset), Message.UserId);
offset += ArtemisBinaryConverter.WriteByte(ref buffer.GetOffset(offset), Message.Type);
offset += ArtemisBinaryConverter.WriteBool(ref buffer.GetOffset(offset), Message.Durable);
Expand All @@ -110,7 +112,7 @@ private int EncodeProperties(Span<byte> buffer)
var offset = 0;

var propertiesCount = Message.Properties?.Count ?? 0;
if (Message.RoutingType.HasValue)
if (RoutingType.HasValue)
{
propertiesCount++;
}
Expand All @@ -129,11 +131,11 @@ private int EncodeProperties(Span<byte> buffer)
}
}

if (Message.RoutingType.HasValue)
if (RoutingType.HasValue)
{
// TODO: Maybe we can cache this string?
offset += ArtemisBinaryConverter.WriteSimpleString(ref buffer.GetOffset(offset), MessageHeaders.RoutingType);
offset += ArtemisBinaryConverter.WriteNullableObject(ref buffer.GetOffset(offset), (byte) Message.RoutingType.Value);
offset += ArtemisBinaryConverter.WriteNullableObject(ref buffer.GetOffset(offset), (byte) RoutingType.Value);
}
}
else
Expand Down
2 changes: 1 addition & 1 deletion src/ArtemisNetCoreClient/IAnonymousProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,5 @@ public interface IAnonymousProducer : IAsyncDisposable
/// </param>
/// <param name="message">The message to send.</param>
/// <param name="cancellationToken">The cancellation token.</param>
ValueTask SendMessageAsync(string address, RoutingType? routingType, Message message, CancellationToken cancellationToken = default);
Task SendMessageAsync(string address, RoutingType? routingType, Message message, CancellationToken cancellationToken = default);
}
2 changes: 1 addition & 1 deletion src/ArtemisNetCoreClient/IProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ public interface IProducer : IAsyncDisposable
/// confirms the persistence of the message. For non-durable messages, the completion of the task
/// indicates that the broker has received the message.
/// </remarks>
ValueTask SendMessageAsync(Message message, CancellationToken cancellationToken = default);
Task SendMessageAsync(Message message, CancellationToken cancellationToken = default);
}
7 changes: 0 additions & 7 deletions src/ArtemisNetCoreClient/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ namespace ActiveMQ.Artemis.Core.Client;
public class Message
{
public long MessageId { get; set; }

internal string? Address { get; set; }

public Guid? UserId { get; set; }

Expand Down Expand Up @@ -36,11 +34,6 @@ public class Message

// TODO: Enum?
public byte Priority { get; set; }

/// <summary>
/// Gets or sets the Routing Type for this message. Ensures that this message is only routed to queues with matching routing type.
/// </summary>
internal RoutingType? RoutingType { get; set; }

/// <summary>
/// The message properties
Expand Down
22 changes: 8 additions & 14 deletions src/ArtemisNetCoreClient/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,16 @@ public ValueTask DisposeAsync()

public void SendMessage(Message message)
{
message.Address = Address;
if (RoutingType.HasValue)
{
message.RoutingType = RoutingType.Value;
}
session.SendMessage(message: message, producerId: ProducerId);
session.SendMessage(message: message, address: Address, routingType: RoutingType, producerId: ProducerId);
}

public async ValueTask SendMessageAsync(Message message, CancellationToken cancellationToken)
public Task SendMessageAsync(Message message, CancellationToken cancellationToken)
{
message.Address = Address;
if (RoutingType.HasValue)
{
message.RoutingType = RoutingType.Value;
}

await session.SendMessageAsync(message: message, producerId: ProducerId, cancellationToken: cancellationToken);
return session.SendMessageAsync(message: message,
address: Address,
routingType: RoutingType,
producerId: ProducerId,
cancellationToken: cancellationToken
);
}
}
8 changes: 6 additions & 2 deletions src/ArtemisNetCoreClient/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -373,26 +373,30 @@ internal ValueTask RemoveProducerAsync(int producerId)
return ValueTask.CompletedTask;
}

internal void SendMessage(Message message, int producerId)
internal void SendMessage(Message message, string address, RoutingType? routingType, int producerId)
{
var request = new SessionSendMessage
{
Message = message,
ProducerId = producerId,
RequiresResponse = false,
CorrelationId = -1,
Address = address,
RoutingType = routingType
};
connection.Send(request, ChannelId);
}

internal async ValueTask SendMessageAsync(Message message, int producerId, CancellationToken cancellationToken)
internal async Task SendMessageAsync(Message message, string address, RoutingType? routingType, int producerId, CancellationToken cancellationToken)
{
var request = new SessionSendMessage
{
Message = message,
ProducerId = producerId,
RequiresResponse = true,
CorrelationId = _correlationIdGenerator.GenerateId(),
Address = address,
RoutingType = routingType
};
try
{
Expand Down
4 changes: 2 additions & 2 deletions test/ArtemisNetCoreClient.Tests/MessageFlowSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public async Task Should_receive_messages_in_the_same_order_as_they_were_sent()
Address = addressName
}, testFixture.CancellationToken);
var tasks = new List<ValueTask>(numberOfMessages);
var tasks = new List<Task>(numberOfMessages);
for (int i = 0; i < numberOfMessages; i++)
{
tasks.Add(producer.SendMessageAsync(new Message
Expand All @@ -58,7 +58,7 @@ public async Task Should_receive_messages_in_the_same_order_as_they_were_sent()
}, testFixture.CancellationToken));
}
await Task.WhenAll(tasks.Select(t => t.AsTask()));
await Task.WhenAll(tasks);
});

var messages = await consumedMessagesTask.WaitAsync(testFixture.CancellationToken);
Expand Down

0 comments on commit ff46739

Please sign in to comment.