-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
12 changed files
with
370 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
using System.Diagnostics; | ||
using ActiveMQ.Artemis.Core.Client; | ||
|
||
namespace Latency_ArtemisNetCoreClient; | ||
|
||
public class Consumer : IAsyncDisposable | ||
{ | ||
private readonly IConnection _connection; | ||
private readonly ISession _session; | ||
private readonly IConsumer _consumer; | ||
|
||
private Consumer(IConnection connection, ISession session, IConsumer consumer) | ||
{ | ||
_connection = connection; | ||
_session = session; | ||
_consumer = consumer; | ||
} | ||
|
||
public static async Task<Consumer> CreateAsync(Endpoint endpoint) | ||
{ | ||
var connectionFactory = new ConnectionFactory(); | ||
var connection = await connectionFactory.CreateAsync(endpoint); | ||
var session = await connection.CreateSessionAsync(); | ||
|
||
var consumer = await session.CreateConsumerAsync(new ConsumerConfiguration { QueueName = "latency" }); | ||
return new Consumer(connection, session, consumer); | ||
} | ||
|
||
public Task<double[]> StartConsumingAsync(int messages) | ||
{ | ||
return Task.Run(async () => | ||
{ | ||
var latencies = new double[messages]; | ||
|
||
for (var i = 0; i < messages; i++) | ||
{ | ||
var message = await _consumer.ReceiveMessageAsync(); | ||
var receiveTimestamp = Stopwatch.GetTimestamp(); | ||
var sendTimestamp = (long) message.Properties["Timestamp"]!; | ||
latencies[i] = (receiveTimestamp - sendTimestamp) * 1_000_000.0 / Stopwatch.Frequency; | ||
|
||
// AMQP doesn't support waiting for the confirmation from the broker for message acknowledgment. | ||
// So if we want to compare apples to apples we need to use the fire-and-forget method of acknowledgment. | ||
_consumer.Acknowledge(message.MessageDelivery); | ||
} | ||
|
||
return latencies; | ||
}); | ||
} | ||
|
||
public async ValueTask DisposeAsync() | ||
{ | ||
await _consumer.DisposeAsync(); | ||
await _session.DisposeAsync(); | ||
await _connection.DisposeAsync(); | ||
} | ||
} |
14 changes: 14 additions & 0 deletions
14
benchmark/Latency_ArtemisNetCoreClient/Latency_ArtemisNetCoreClient.csproj
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
<Project Sdk="Microsoft.NET.Sdk"> | ||
|
||
<PropertyGroup> | ||
<OutputType>Exe</OutputType> | ||
<TargetFramework>net8.0</TargetFramework> | ||
<ImplicitUsings>enable</ImplicitUsings> | ||
<Nullable>enable</Nullable> | ||
</PropertyGroup> | ||
|
||
<ItemGroup> | ||
<ProjectReference Include="..\..\src\ArtemisNetCoreClient\ArtemisNetCoreClient.csproj" /> | ||
</ItemGroup> | ||
|
||
</Project> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
using System.Diagnostics; | ||
using ActiveMQ.Artemis.Core.Client; | ||
|
||
namespace Latency_ArtemisNetCoreClient; | ||
|
||
public class Producer : IAsyncDisposable | ||
{ | ||
private readonly IConnection _connection; | ||
private readonly ISession _session; | ||
private readonly IProducer _producer; | ||
private readonly Random _random; | ||
|
||
private Producer(IConnection connection, ISession session, IProducer producer) | ||
{ | ||
_connection = connection; | ||
_session = session; | ||
_producer = producer; | ||
_random = new Random(); | ||
} | ||
|
||
public static async Task<Producer> CreateAsync(Endpoint endpoint) | ||
{ | ||
var connectionFactory = new ConnectionFactory(); | ||
var connection = await connectionFactory.CreateAsync(endpoint); | ||
var session = await connection.CreateSessionAsync(); | ||
|
||
var producer = await session.CreateProducerAsync(new ProducerConfiguration { Address = "latency" }); | ||
return new Producer(connection, session, producer); | ||
} | ||
|
||
public Task SendMessagesAsync(int messages, int payloadSize) | ||
{ | ||
return Task.Run(async () => | ||
{ | ||
for (var i = 0; i < messages; i++) | ||
{ | ||
var timestamp = Stopwatch.GetTimestamp(); // Get high precision timestamp | ||
var message = new Message | ||
{ | ||
Body = GenerateRandomData(payloadSize), | ||
Properties = new Dictionary<string, object?> | ||
{ | ||
["Timestamp"] = timestamp | ||
}, | ||
Durable = true | ||
}; | ||
await _producer.SendMessageAsync(message); | ||
} | ||
}); | ||
} | ||
|
||
private byte[] GenerateRandomData(int size) | ||
{ | ||
var data = new byte[size]; | ||
_random.NextBytes(data); | ||
return data; | ||
} | ||
|
||
public async ValueTask DisposeAsync() | ||
{ | ||
await _producer.DisposeAsync(); | ||
await _session.DisposeAsync(); | ||
await _connection.DisposeAsync(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
using ActiveMQ.Artemis.Core.Client; | ||
|
||
namespace Latency_ArtemisNetCoreClient; | ||
|
||
public static class Program | ||
{ | ||
static async Task Main(string[] args) | ||
{ | ||
var endpoint = new Endpoint | ||
{ | ||
Host = "localhost", | ||
Port = 61616, | ||
User = "artemis", | ||
Password = "artemis" | ||
}; | ||
|
||
var messages = 10_000; | ||
|
||
for (int i = 0; i < 10; i++) | ||
{ | ||
await using var consumer = await Consumer.CreateAsync(endpoint); | ||
var startConsumingTask = consumer.StartConsumingAsync(messages: messages); | ||
|
||
await using var producer = await Producer.CreateAsync(endpoint); | ||
await producer.SendMessagesAsync(messages: messages, payloadSize: 1024); | ||
|
||
var latencies = await startConsumingTask; | ||
Console.WriteLine($"Latency: avg:{latencies.Average():F2}µs, min:{latencies.Min():F2}µs, max:{latencies.Max():F2}µs"); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
using System.Diagnostics; | ||
using Apache.NMS; | ||
using Apache.NMS.AMQP; | ||
|
||
namespace Latency_NMS.AMQP; | ||
|
||
public class Consumer : IDisposable | ||
{ | ||
private readonly IConnection _connection; | ||
private readonly ISession _session; | ||
private readonly IMessageConsumer _consumer; | ||
|
||
private Consumer(IConnection connection, ISession session, IMessageConsumer consumer) | ||
{ | ||
_connection = connection; | ||
_session = session; | ||
_consumer = consumer; | ||
} | ||
|
||
public static async Task<Consumer> CreateAsync(NmsConnectionFactory connectionFactory) | ||
{ | ||
var connection = await connectionFactory.CreateConnectionAsync(); | ||
var session = await connection.CreateSessionAsync(); | ||
await connection.StartAsync(); | ||
|
||
var consumer = await session.CreateConsumerAsync(await session.GetQueueAsync("latency")); | ||
return new Consumer(connection, session, consumer); | ||
} | ||
|
||
public Task<double[]> StartConsumingAsync(int messages) | ||
{ | ||
return Task.Run(async () => | ||
{ | ||
var latencies = new double[messages]; | ||
|
||
for (var i = 0; i < messages; i++) | ||
{ | ||
var message = await _consumer.ReceiveAsync(); | ||
var receiveTimestamp = Stopwatch.GetTimestamp(); | ||
var sendTimestamp = (long) message.Properties["Timestamp"]; | ||
latencies[i] = (receiveTimestamp - sendTimestamp) * 1_000_000.0 / Stopwatch.Frequency; | ||
// ReSharper disable once MethodHasAsyncOverload | ||
message.Acknowledge(); | ||
} | ||
|
||
return latencies; | ||
}); | ||
} | ||
|
||
public void Dispose() | ||
{ | ||
_consumer.Dispose(); | ||
_session.Dispose(); | ||
_connection.Dispose(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
<Project Sdk="Microsoft.NET.Sdk"> | ||
|
||
<PropertyGroup> | ||
<OutputType>Exe</OutputType> | ||
<TargetFramework>net8.0</TargetFramework> | ||
<ImplicitUsings>enable</ImplicitUsings> | ||
<Nullable>enable</Nullable> | ||
</PropertyGroup> | ||
|
||
<ItemGroup> | ||
<PackageReference Include="Apache.NMS.AMQP" Version="2.2.0" /> | ||
</ItemGroup> | ||
|
||
</Project> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
using System.Diagnostics; | ||
using Apache.NMS; | ||
using Apache.NMS.AMQP; | ||
|
||
namespace Latency_NMS.AMQP; | ||
|
||
public class Producer : IDisposable | ||
{ | ||
private readonly IConnection _connection; | ||
private readonly ISession _session; | ||
private readonly IMessageProducer _producer; | ||
private readonly Random _random; | ||
|
||
private Producer(IConnection connection, ISession session, IMessageProducer producer) | ||
{ | ||
_connection = connection; | ||
_session = session; | ||
_producer = producer; | ||
_random = new Random(); | ||
} | ||
|
||
public static async Task<Producer> CreateAsync(NmsConnectionFactory connectionFactory) | ||
{ | ||
var connection = await connectionFactory.CreateConnectionAsync(); | ||
var session = await connection.CreateSessionAsync(); | ||
|
||
var producer = await session.CreateProducerAsync(await session.GetQueueAsync("latency")); | ||
return new Producer(connection, session, producer); | ||
} | ||
|
||
public Task SendMessagesAsync(int messages, int payloadSize) | ||
{ | ||
return Task.Run(async () => | ||
{ | ||
for (var i = 0; i < messages; i++) | ||
{ | ||
var timestamp = Stopwatch.GetTimestamp(); // Get high precision timestamp | ||
var pingMessage = await _producer.CreateBytesMessageAsync(GenerateRandomData(payloadSize)); | ||
// pingMessage.NMSDeliveryMode = MsgDeliveryMode.Persistent; | ||
pingMessage.Properties["Timestamp"] = timestamp; | ||
|
||
// ReSharper disable once MethodHasAsyncOverload | ||
_producer.Send(pingMessage); | ||
} | ||
}); | ||
} | ||
|
||
private byte[] GenerateRandomData(int size) | ||
{ | ||
byte[] data = new byte[size]; | ||
_random.NextBytes(data); | ||
return data; | ||
} | ||
|
||
public void Dispose() | ||
{ | ||
_producer.Dispose(); | ||
_session.Dispose(); | ||
_connection.Dispose(); | ||
} | ||
} |
Oops, something went wrong.