Skip to content

Commit

Permalink
Dedicated thread to process worker requests
Browse files Browse the repository at this point in the history
  • Loading branch information
lemaitre-aneo committed Jul 22, 2024
1 parent 5dd06d9 commit 7e8f20d
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 6 deletions.
8 changes: 6 additions & 2 deletions Worker/src/DLLWorker/Program.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// This file is part of the ArmoniK project
//
// Copyright (C) ANEO, 2021-2023. All rights reserved.
// Copyright (C) ANEO, 2021-2024. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License")
// you may not use this file except in compliance with the License.
Expand All @@ -20,5 +20,9 @@

using Microsoft.Extensions.DependencyInjection;

WorkerServer.Create<ComputerService>(serviceConfigurator: collection => collection.AddSingleton<ServiceRequestContext>())
WorkerServer.Create<ComputerService>(serviceConfigurator: collection =>
{
collection.AddSingleton<ServiceRequestContext>();
collection.AddSingleton<ComputerService>();
})
.Run();
39 changes: 35 additions & 4 deletions Worker/src/DLLWorker/Services/ComputerService.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// This file is part of the ArmoniK project
//
// Copyright (C) ANEO, 2021-2023. All rights reserved.
// Copyright (C) ANEO, 2021-2024. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License")
// you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,8 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

using ArmoniK.Api.Common.Channel.Utils;
Expand All @@ -27,6 +29,7 @@
using ArmoniK.DevelopmentKit.Common;
using ArmoniK.DevelopmentKit.Common.Exceptions;
using ArmoniK.DevelopmentKit.Worker.Common;
using ArmoniK.Utils;

using Grpc.Core;

Expand All @@ -37,7 +40,8 @@ namespace ArmoniK.DevelopmentKit.Worker.DLLWorker.Services;

public class ComputerService : WorkerStreamWrapper
{
private readonly ApplicationPackageManager appPackageManager_;
private readonly ApplicationPackageManager appPackageManager_;
private readonly ChannelWriter<(ArmonikServiceWorker, ITaskHandler, TaskCompletionSource<byte[]>)> channel_;

public ComputerService(IConfiguration configuration,
GrpcChannelProvider provider,
Expand All @@ -50,6 +54,30 @@ public ComputerService(IConfiguration configuration,
ServiceRequestContext = serviceRequestContext;
appPackageManager_ = new ApplicationPackageManager(configuration,
serviceRequestContext.LoggerFactory);

var channel = Channel.CreateBounded<(ArmonikServiceWorker, ITaskHandler, TaskCompletionSource<byte[]>)>(1);
var channelReader = channel.Reader;
channel_ = channel.Writer;
new Thread(() =>
{
var requests = channelReader.ToAsyncEnumerable(CancellationToken.None)
.ToEnumerable();
foreach (var (service, taskHandler, tcs) in requests)
{
try
{
tcs.SetResult(service.Execute(taskHandler));
}
catch (Exception e)
{
tcs.SetException(e);
}
}
})
{
IsBackground = true,
}.Start();

Logger.LogDebug("Starting worker...OK");
}

Expand Down Expand Up @@ -124,8 +152,11 @@ public override async Task<Output> Process(ITaskHandler taskHandler)
ServiceRequestContext.SessionId = sessionIdCaller;

Logger.LogInformation("Executing task");
var sw = Stopwatch.StartNew();
var result = serviceWorker.Execute(taskHandler);
var sw = Stopwatch.StartNew();
var tcs = new TaskCompletionSource<byte[]>();
await channel_.WriteAsync((serviceWorker, taskHandler, tcs))
.ConfigureAwait(false);
var result = await tcs.Task.ConfigureAwait(false);

if (result != null)
{
Expand Down

0 comments on commit 7e8f20d

Please sign in to comment.