diff --git a/Worker/src/DLLWorker/Program.cs b/Worker/src/DLLWorker/Program.cs index 262da44b3..e748f729f 100644 --- a/Worker/src/DLLWorker/Program.cs +++ b/Worker/src/DLLWorker/Program.cs @@ -1,6 +1,6 @@ // This file is part of the ArmoniK project // -// Copyright (C) ANEO, 2021-2023. All rights reserved. +// Copyright (C) ANEO, 2021-2024. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License") // you may not use this file except in compliance with the License. @@ -20,5 +20,9 @@ using Microsoft.Extensions.DependencyInjection; -WorkerServer.Create(serviceConfigurator: collection => collection.AddSingleton()) +WorkerServer.Create(serviceConfigurator: collection => + { + collection.AddSingleton(); + collection.AddSingleton(); + }) .Run(); diff --git a/Worker/src/DLLWorker/Services/ComputerService.cs b/Worker/src/DLLWorker/Services/ComputerService.cs index e89a769db..b07ae2732 100644 --- a/Worker/src/DLLWorker/Services/ComputerService.cs +++ b/Worker/src/DLLWorker/Services/ComputerService.cs @@ -1,6 +1,6 @@ // This file is part of the ArmoniK project // -// Copyright (C) ANEO, 2021-2023. All rights reserved. +// Copyright (C) ANEO, 2021-2024. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License") // you may not use this file except in compliance with the License. @@ -18,6 +18,8 @@ using System.Collections.Generic; using System.Diagnostics; using System.Linq; +using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using ArmoniK.Api.Common.Channel.Utils; @@ -27,6 +29,7 @@ using ArmoniK.DevelopmentKit.Common; using ArmoniK.DevelopmentKit.Common.Exceptions; using ArmoniK.DevelopmentKit.Worker.Common; +using ArmoniK.Utils; using Grpc.Core; @@ -37,7 +40,8 @@ namespace ArmoniK.DevelopmentKit.Worker.DLLWorker.Services; public class ComputerService : WorkerStreamWrapper { - private readonly ApplicationPackageManager appPackageManager_; + private readonly ApplicationPackageManager appPackageManager_; + private readonly ChannelWriter<(ArmonikServiceWorker, ITaskHandler, TaskCompletionSource)> channel_; public ComputerService(IConfiguration configuration, GrpcChannelProvider provider, @@ -50,6 +54,30 @@ public ComputerService(IConfiguration configuration, ServiceRequestContext = serviceRequestContext; appPackageManager_ = new ApplicationPackageManager(configuration, serviceRequestContext.LoggerFactory); + + var channel = Channel.CreateBounded<(ArmonikServiceWorker, ITaskHandler, TaskCompletionSource)>(1); + var channelReader = channel.Reader; + channel_ = channel.Writer; + new Thread(() => + { + var requests = channelReader.ToAsyncEnumerable(CancellationToken.None) + .ToEnumerable(); + foreach (var (service, taskHandler, tcs) in requests) + { + try + { + tcs.SetResult(service.Execute(taskHandler)); + } + catch (Exception e) + { + tcs.SetException(e); + } + } + }) + { + IsBackground = true, + }.Start(); + Logger.LogDebug("Starting worker...OK"); } @@ -124,8 +152,11 @@ public override async Task Process(ITaskHandler taskHandler) ServiceRequestContext.SessionId = sessionIdCaller; Logger.LogInformation("Executing task"); - var sw = Stopwatch.StartNew(); - var result = serviceWorker.Execute(taskHandler); + var sw = Stopwatch.StartNew(); + var tcs = new TaskCompletionSource(); + await channel_.WriteAsync((serviceWorker, taskHandler, tcs)) + .ConfigureAwait(false); + var result = await tcs.Task.ConfigureAwait(false); if (result != null) {