Skip to content

Commit

Permalink
Derive pool rent size from buffer size option
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk committed Jan 31, 2024
1 parent 08e1095 commit 3787133
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 6 deletions.
10 changes: 8 additions & 2 deletions src/NATS.Client.Core/Commands/CommandWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ internal sealed class CommandWriter : IAsyncDisposable
{
private readonly ILogger<CommandWriter> _logger;
private readonly ObjectPool _pool;
private readonly int _arrayPoolInitialSize;
private readonly object _lock = new();
private readonly CancellationTokenSource _cts;
private readonly ConnectionStatsCounter _counter;
Expand All @@ -41,6 +42,11 @@ public CommandWriter(ObjectPool pool, NatsOpts opts, ConnectionStatsCounter coun
{
_logger = opts.LoggerFactory.CreateLogger<CommandWriter>();
_pool = pool;

// Derive ArrayPool rent size from buffer size to
// avoid defining another option.
_arrayPoolInitialSize = opts.WriterBufferSize / 256;

_counter = counter;
_defaultCommandTimeout = overrideCommandTimeout ?? opts.CommandTimeout;
_enqueuePing = enqueuePing;
Expand Down Expand Up @@ -189,13 +195,13 @@ public ValueTask PublishAsync<T>(string subject, T? value, NatsHeaders? headers,
if (headers != null)
{
if (!_pool.TryRent(out headersBuffer))
headersBuffer = new NatsPooledBufferWriter<byte>();
headersBuffer = new NatsPooledBufferWriter<byte>(_arrayPoolInitialSize);
_headerWriter.Write(headersBuffer, headers);
}

NatsPooledBufferWriter<byte> payloadBuffer;
if (!_pool.TryRent(out payloadBuffer!))
payloadBuffer = new NatsPooledBufferWriter<byte>();
payloadBuffer = new NatsPooledBufferWriter<byte>(_arrayPoolInitialSize);
if (value != null)
serializer.Serialize(payloadBuffer, value);

Expand Down
21 changes: 17 additions & 4 deletions src/NATS.Client.Core/Commands/NatsPooledBufferWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,30 @@ namespace NATS.Client.Core.Commands;
// adapted from https://github.com/CommunityToolkit/dotnet/blob/v8.2.2/src/CommunityToolkit.HighPerformance/Buffers/ArrayPoolBufferWriter%7BT%7D.cs
internal sealed class NatsPooledBufferWriter<T> : IBufferWriter<T>, IObjectPoolNode<NatsPooledBufferWriter<T>>
{
private const int DefaultInitialBufferSize = 256;
private const int DefaultInitialMinBufferSize = 256;
private const int DefaultInitialMaxBufferSize = 65536;

private readonly ArrayPool<T> _pool;
private readonly int _size;
private T[]? _array;
private int _index;
private NatsPooledBufferWriter<T>? _next;

public NatsPooledBufferWriter()
public NatsPooledBufferWriter(int size)
{
if (size < DefaultInitialMinBufferSize)
{
size = DefaultInitialMinBufferSize;
}

if (size > DefaultInitialMaxBufferSize)
{
size = DefaultInitialMaxBufferSize;
}

_size = size;
_pool = ArrayPool<T>.Shared;
_array = _pool.Rent(DefaultInitialBufferSize);
_array = _pool.Rent(size);
_index = 0;
}

Expand Down Expand Up @@ -111,7 +124,7 @@ public void Reset()
{
if (_array != null)
_pool.Return(_array);
_array = _pool.Rent(DefaultInitialBufferSize);
_array = _pool.Rent(_size);
_index = 0;
}

Expand Down

0 comments on commit 3787133

Please sign in to comment.