From 49b2671c0716483ecd8119af7c674d808ff08bf4 Mon Sep 17 00:00:00 2001 From: lemaitre-aneo Date: Tue, 11 Jul 2023 12:07:42 +0200 Subject: [PATCH] Better usage of Utils in PushQueue --- Adaptors/Amqp/src/PushQueueStorage.cs | 84 ++++++++++----------------- 1 file changed, 30 insertions(+), 54 deletions(-) diff --git a/Adaptors/Amqp/src/PushQueueStorage.cs b/Adaptors/Amqp/src/PushQueueStorage.cs index 20b0a8f32..9bd6113d9 100644 --- a/Adaptors/Amqp/src/PushQueueStorage.cs +++ b/Adaptors/Amqp/src/PushQueueStorage.cs @@ -27,35 +27,12 @@ using ArmoniK.Core.Base; using ArmoniK.Core.Base.DataStructures; +using ArmoniK.Utils; using Microsoft.Extensions.Logging; -using Microsoft.Extensions.ObjectPool; namespace ArmoniK.Core.Adapters.Amqp; -/// -/// Policy for creating a for the -/// -internal sealed class SessionPooledObjectPolicy : IPooledObjectPolicy -{ - private readonly IConnectionAmqp connectionAmqp_; - - /// - /// Initializes a - /// - /// AMQP connection that will be used to create new sessions - public SessionPooledObjectPolicy(IConnectionAmqp connectionAmqp) - => connectionAmqp_ = connectionAmqp; - - /// - public Session Create() - => new(connectionAmqp_.Connection); - - /// - public bool Return(Session obj) - => !obj.IsClosed; -} - public class PushQueueStorage : QueueStorage, IPushQueueStorage { private const int MaxInternalQueuePriority = 10; @@ -70,8 +47,10 @@ public PushQueueStorage(QueueCommon.Amqp options, : base(options, connectionAmqp) { - logger_ = logger; - sessionPool_ = new DefaultObjectPool(new SessionPooledObjectPolicy(ConnectionAmqp)); + logger_ = logger; + sessionPool_ = new ObjectPool(200, + () => new Session(connectionAmqp.Connection), + session => !session.IsClosed); } /// @@ -80,11 +59,12 @@ public async Task PushMessagesAsync(IEnumerable messages, CancellationToken cancellationToken = default) { var priorityGroups = messages.GroupBy(msgData => msgData.Options.Priority); - await Task.WhenAll(priorityGroups.Select(group => PushMessagesAsync(group, - partitionId, - group.Key, - cancellationToken))) - .ConfigureAwait(false); + await priorityGroups.ParallelForEach(new ParallelTaskOptions(cancellationToken), + group => PushMessagesAsync(group, + partitionId, + group.Key, + cancellationToken)) + .ConfigureAwait(false); } private async Task PushMessagesAsync(IEnumerable messages, @@ -114,29 +94,25 @@ private async Task PushMessagesAsync(IEnumerable messages, whichQueue, internalPriority); - var session = sessionPool_.Get(); - try - { - var sender = new SenderLink(session, - $"{partitionId}###SenderLink{whichQueue}", - $"{partitionId}###q{whichQueue}"); - - await Task.WhenAll(messages.Select(msgData => sender.SendAsync(new Message(Encoding.UTF8.GetBytes(msgData.TaskId)) - { - Header = new Header - { - Priority = (byte)internalPriority, - }, - Properties = new Properties(), - }))) - .ConfigureAwait(false); - - await sender.CloseAsync() + await using var session = await sessionPool_.GetAsync(cancellationToken) + .ConfigureAwait(false); + + var sender = new SenderLink(session, + $"{partitionId}###SenderLink{whichQueue}", + $"{partitionId}###q{whichQueue}"); + + await messages.ParallelForEach(new ParallelTaskOptions(cancellationToken), + msgData => sender.SendAsync(new Message(Encoding.UTF8.GetBytes(msgData.TaskId)) + { + Header = new Header + { + Priority = (byte)internalPriority, + }, + Properties = new Properties(), + })) .ConfigureAwait(false); - } - finally - { - sessionPool_.Return(session); - } + + await sender.CloseAsync() + .ConfigureAwait(false); } }