-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
10 changed files
with
333 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
using ActiveMQ.Artemis.Core.Client; | ||
|
||
namespace Throughput_ArtemisNetCoreClient; | ||
|
||
public class Consumer : IAsyncDisposable | ||
{ | ||
private readonly IConnection _connection; | ||
private readonly ISession _session; | ||
private readonly IConsumer _consumer; | ||
|
||
private Consumer(IConnection connection, ISession session, IConsumer consumer) | ||
{ | ||
_connection = connection; | ||
_session = session; | ||
_consumer = consumer; | ||
} | ||
|
||
public static async Task<Consumer> CreateAsync(Endpoint endpoint) | ||
{ | ||
var connectionFactory = new ConnectionFactory(); | ||
var connection = await connectionFactory.CreateAsync(endpoint); | ||
var session = await connection.CreateSessionAsync(); | ||
|
||
var consumer = await session.CreateConsumerAsync(new ConsumerConfiguration { QueueName = "throughput" }); | ||
return new Consumer(connection, session, consumer); | ||
} | ||
|
||
public async Task StartConsumingAsync(int messages) | ||
{ | ||
for (var i = 0; i < messages; i++) | ||
{ | ||
var message = await _consumer.ReceiveMessageAsync(); | ||
|
||
// AMQP doesn't support waiting for the confirmation from the broker for message acknowledgment. | ||
// So if we want to compare apples to apples we need to use the fire-and-forget method of acknowledgment. | ||
_consumer.Acknowledge(message.MessageDelivery); | ||
} | ||
} | ||
|
||
public async ValueTask DisposeAsync() | ||
{ | ||
await _consumer.DisposeAsync(); | ||
await _session.DisposeAsync(); | ||
await _connection.DisposeAsync(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
using System.Buffers; | ||
using ActiveMQ.Artemis.Core.Client; | ||
|
||
namespace Throughput_ArtemisNetCoreClient; | ||
|
||
public class Producer : IAsyncDisposable | ||
{ | ||
private readonly IConnection _connection; | ||
private readonly ISession _session; | ||
private readonly IProducer _producer; | ||
private readonly Random _random; | ||
|
||
private Producer(IConnection connection, ISession session, IProducer producer) | ||
{ | ||
_connection = connection; | ||
_session = session; | ||
_producer = producer; | ||
_random = new Random(); | ||
} | ||
|
||
public static async Task<Producer> CreateAsync(Endpoint endpoint) | ||
{ | ||
var connectionFactory = new ConnectionFactory(); | ||
var connection = await connectionFactory.CreateAsync(endpoint); | ||
var session = await connection.CreateSessionAsync(); | ||
|
||
var producer = await session.CreateProducerAsync(new ProducerConfiguration { Address = "throughput" }); | ||
return new Producer(connection, session, producer); | ||
} | ||
|
||
public Task SendMessagesAsync(int messages, int payloadSize) | ||
{ | ||
return Task.Run(async () => | ||
{ | ||
for (var i = 0; i < messages; i++) | ||
{ | ||
var buffer = ArrayPool<byte>.Shared.Rent(payloadSize); | ||
try | ||
{ | ||
FillRandomData(buffer.AsSpan(0, payloadSize)); | ||
var message = new Message { Body = new ReadOnlyMemory<byte>(buffer, 0, payloadSize), }; | ||
await _producer.SendMessageAsync(message); | ||
} | ||
finally | ||
{ | ||
ArrayPool<byte>.Shared.Return(buffer); | ||
} | ||
} | ||
}); | ||
} | ||
|
||
private void FillRandomData(Span<byte> buffer) | ||
{ | ||
_random.NextBytes(buffer); | ||
} | ||
|
||
public async ValueTask DisposeAsync() | ||
{ | ||
await _producer.DisposeAsync(); | ||
await _session.DisposeAsync(); | ||
await _connection.DisposeAsync(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
using System.Diagnostics; | ||
using ActiveMQ.Artemis.Core.Client; | ||
|
||
namespace Throughput_ArtemisNetCoreClient; | ||
|
||
class Program | ||
{ | ||
static async Task Main(string[] args) | ||
{ | ||
var endpoint = new Endpoint | ||
{ | ||
Host = "localhost", | ||
Port = 61616, | ||
User = "artemis", | ||
Password = "artemis" | ||
}; | ||
|
||
var messages = 100_000; | ||
|
||
for (int i = 0; i < 10; i++) | ||
{ | ||
await using var producer = await Producer.CreateAsync(endpoint); | ||
|
||
var stopwatch = Stopwatch.StartNew(); | ||
await producer.SendMessagesAsync(messages: messages, payloadSize: 1024); | ||
stopwatch.Stop(); | ||
Console.WriteLine($"Sending throughput: {messages / stopwatch.Elapsed.TotalSeconds:F2} msgs/s"); | ||
|
||
await using var consumer = await Consumer.CreateAsync(endpoint); | ||
stopwatch.Restart(); | ||
await consumer.StartConsumingAsync(messages: messages); | ||
stopwatch.Stop(); | ||
Console.WriteLine($"Consuming throughput: {messages / stopwatch.Elapsed.TotalSeconds:F2} msgs/s"); | ||
} | ||
} | ||
} |
14 changes: 14 additions & 0 deletions
14
Throughput_ArtemisNetCoreClient/Throughput_ArtemisNetCoreClient.csproj
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
<Project Sdk="Microsoft.NET.Sdk"> | ||
|
||
<PropertyGroup> | ||
<OutputType>Exe</OutputType> | ||
<TargetFramework>net8.0</TargetFramework> | ||
<ImplicitUsings>enable</ImplicitUsings> | ||
<Nullable>enable</Nullable> | ||
</PropertyGroup> | ||
|
||
<ItemGroup> | ||
<ProjectReference Include="..\src\ArtemisNetCoreClient\ArtemisNetCoreClient.csproj" /> | ||
</ItemGroup> | ||
|
||
</Project> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
using Apache.NMS; | ||
using Apache.NMS.AMQP; | ||
|
||
namespace Throughput_NMS_AMQP; | ||
|
||
public class Consumer : IDisposable | ||
{ | ||
private readonly IConnection _connection; | ||
private readonly ISession _session; | ||
private readonly IMessageConsumer _consumer; | ||
|
||
private Consumer(IConnection connection, ISession session, IMessageConsumer consumer) | ||
{ | ||
_connection = connection; | ||
_session = session; | ||
_consumer = consumer; | ||
} | ||
|
||
public static async Task<Consumer> CreateAsync(NmsConnectionFactory connectionFactory) | ||
{ | ||
var connection = await connectionFactory.CreateConnectionAsync(); | ||
var session = await connection.CreateSessionAsync(); | ||
await connection.StartAsync(); | ||
|
||
var consumer = await session.CreateConsumerAsync(await session.GetQueueAsync("throughput")); | ||
return new Consumer(connection, session, consumer); | ||
} | ||
|
||
public async Task StartConsumingAsync(int messages) | ||
{ | ||
for (var i = 0; i < messages; i++) | ||
{ | ||
var message = await _consumer.ReceiveAsync(); | ||
await message.AcknowledgeAsync(); | ||
} | ||
} | ||
|
||
public void Dispose() | ||
{ | ||
_consumer.Dispose(); | ||
_session.Dispose(); | ||
_connection.Dispose(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
using Apache.NMS; | ||
using Apache.NMS.AMQP; | ||
|
||
namespace Throughput_NMS_AMQP; | ||
|
||
public class Producer : IDisposable | ||
{ | ||
private readonly IConnection _connection; | ||
private readonly ISession _session; | ||
private readonly IMessageProducer _producer; | ||
private readonly Random _random; | ||
|
||
private Producer(IConnection connection, ISession session, IMessageProducer producer) | ||
{ | ||
_connection = connection; | ||
_session = session; | ||
_producer = producer; | ||
_random = new Random(); | ||
} | ||
|
||
public static async Task<Producer> CreateAsync(NmsConnectionFactory connectionFactory) | ||
{ | ||
var connection = await connectionFactory.CreateConnectionAsync(); | ||
var session = await connection.CreateSessionAsync(); | ||
|
||
var producer = await session.CreateProducerAsync(await session.GetQueueAsync("throughput")); | ||
producer.DeliveryMode = MsgDeliveryMode.Persistent; | ||
return new Producer(connection, session, producer); | ||
} | ||
|
||
public Task SendMessagesAsync(int messages, int payloadSize) | ||
{ | ||
return Task.Run(async () => | ||
{ | ||
for (var i = 0; i < messages; i++) | ||
{ | ||
var pingMessage = await _producer.CreateBytesMessageAsync(GenerateRandomData(payloadSize)); | ||
await _producer.SendAsync(pingMessage); | ||
} | ||
}); | ||
} | ||
|
||
private byte[] GenerateRandomData(int size) | ||
{ | ||
byte[] data = new byte[size]; | ||
_random.NextBytes(data); | ||
return data; | ||
} | ||
|
||
public void Dispose() | ||
{ | ||
_producer.Dispose(); | ||
_session.Dispose(); | ||
_connection.Dispose(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
using System.Diagnostics; | ||
using Apache.NMS.AMQP; | ||
|
||
namespace Throughput_NMS_AMQP; | ||
|
||
class Program | ||
{ | ||
static async Task Main(string[] args) | ||
{ | ||
var connectionFactory = new NmsConnectionFactory | ||
{ | ||
UserName = "artemis", | ||
Password = "artemis" | ||
}; | ||
|
||
var messages = 100_000; | ||
|
||
for (int i = 0; i < 10; i++) | ||
{ | ||
using var producer = await Producer.CreateAsync(connectionFactory); | ||
|
||
var stopwatch = Stopwatch.StartNew(); | ||
await producer.SendMessagesAsync(messages: messages, payloadSize: 1024); | ||
stopwatch.Stop(); | ||
Console.WriteLine($"Sending throughput: {messages / stopwatch.Elapsed.TotalSeconds:F2} msgs/s"); | ||
|
||
using var consumer = await Consumer.CreateAsync(connectionFactory); | ||
stopwatch.Restart(); | ||
await consumer.StartConsumingAsync(messages: messages); | ||
stopwatch.Stop(); | ||
Console.WriteLine($"Consuming throughput: {messages / stopwatch.Elapsed.TotalSeconds:F2} msgs/s"); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
<Project Sdk="Microsoft.NET.Sdk"> | ||
|
||
<PropertyGroup> | ||
<OutputType>Exe</OutputType> | ||
<TargetFramework>net8.0</TargetFramework> | ||
<RootNamespace>Throughput_NMS_AMQP</RootNamespace> | ||
<ImplicitUsings>enable</ImplicitUsings> | ||
<Nullable>enable</Nullable> | ||
</PropertyGroup> | ||
|
||
<ItemGroup> | ||
<PackageReference Include="Apache.NMS.AMQP" Version="2.2.0" /> | ||
</ItemGroup> | ||
|
||
</Project> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters