Skip to content

Commit

Permalink
refactor: Better usage of Utils in PushQueue (#441)
Browse files Browse the repository at this point in the history
  • Loading branch information
lemaitre-aneo authored Jul 11, 2023
2 parents 8ee0cdc + 49b2671 commit b0265e1
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 55 deletions.
84 changes: 30 additions & 54 deletions Adaptors/Amqp/src/PushQueueStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,35 +27,12 @@

using ArmoniK.Core.Base;
using ArmoniK.Core.Base.DataStructures;
using ArmoniK.Utils;

using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;

namespace ArmoniK.Core.Adapters.Amqp;

/// <summary>
/// Policy for creating a <see cref="Session" /> for the <see cref="ObjectPool{Session}" />
/// </summary>
internal sealed class SessionPooledObjectPolicy : IPooledObjectPolicy<Session>
{
private readonly IConnectionAmqp connectionAmqp_;

/// <summary>
/// Initializes a <see cref="SessionPooledObjectPolicy" />
/// </summary>
/// <param name="connectionAmqp">AMQP connection that will be used to create new sessions</param>
public SessionPooledObjectPolicy(IConnectionAmqp connectionAmqp)
=> connectionAmqp_ = connectionAmqp;

/// <inheritdoc />
public Session Create()
=> new(connectionAmqp_.Connection);

/// <inheritdoc />
public bool Return(Session obj)
=> !obj.IsClosed;
}

public class PushQueueStorage : QueueStorage, IPushQueueStorage
{
private const int MaxInternalQueuePriority = 10;
Expand All @@ -70,8 +47,10 @@ public PushQueueStorage(QueueCommon.Amqp options,
: base(options,
connectionAmqp)
{
logger_ = logger;
sessionPool_ = new DefaultObjectPool<Session>(new SessionPooledObjectPolicy(ConnectionAmqp));
logger_ = logger;
sessionPool_ = new ObjectPool<Session>(200,
() => new Session(connectionAmqp.Connection),
session => !session.IsClosed);
}

/// <inheritdoc />
Expand All @@ -80,11 +59,12 @@ public async Task PushMessagesAsync(IEnumerable<MessageData> messages,
CancellationToken cancellationToken = default)
{
var priorityGroups = messages.GroupBy(msgData => msgData.Options.Priority);
await Task.WhenAll(priorityGroups.Select(group => PushMessagesAsync(group,
partitionId,
group.Key,
cancellationToken)))
.ConfigureAwait(false);
await priorityGroups.ParallelForEach(new ParallelTaskOptions(cancellationToken),
group => PushMessagesAsync(group,
partitionId,
group.Key,
cancellationToken))
.ConfigureAwait(false);
}

private async Task PushMessagesAsync(IEnumerable<MessageData> messages,
Expand Down Expand Up @@ -114,29 +94,25 @@ private async Task PushMessagesAsync(IEnumerable<MessageData> messages,
whichQueue,
internalPriority);

var session = sessionPool_.Get();
try
{
var sender = new SenderLink(session,
$"{partitionId}###SenderLink{whichQueue}",
$"{partitionId}###q{whichQueue}");

await Task.WhenAll(messages.Select(msgData => sender.SendAsync(new Message(Encoding.UTF8.GetBytes(msgData.TaskId))
{
Header = new Header
{
Priority = (byte)internalPriority,
},
Properties = new Properties(),
})))
.ConfigureAwait(false);

await sender.CloseAsync()
await using var session = await sessionPool_.GetAsync(cancellationToken)
.ConfigureAwait(false);

var sender = new SenderLink(session,
$"{partitionId}###SenderLink{whichQueue}",
$"{partitionId}###q{whichQueue}");

await messages.ParallelForEach(new ParallelTaskOptions(cancellationToken),
msgData => sender.SendAsync(new Message(Encoding.UTF8.GetBytes(msgData.TaskId))
{
Header = new Header
{
Priority = (byte)internalPriority,
},
Properties = new Properties(),
}))
.ConfigureAwait(false);
}
finally
{
sessionPool_.Return(session);
}

await sender.CloseAsync()
.ConfigureAwait(false);
}
}
2 changes: 1 addition & 1 deletion Utils/src/ArmoniK.Core.Utils.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ArmoniK.Utils" Version="0.2.0" />
<PackageReference Include="ArmoniK.Utils" Version="0.3.0" />
<PackageReference Include="JetBrains.Annotations" Version="2022.3.1" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="7.0.0" />
Expand Down

0 comments on commit b0265e1

Please sign in to comment.