Skip to content

Commit

Permalink
fix: Thread for Worker.Process (#282)
Browse files Browse the repository at this point in the history
  • Loading branch information
ngruelaneo authored Jul 22, 2024
2 parents 165c20d + 27a215b commit efa2ed7
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 4 deletions.
6 changes: 5 additions & 1 deletion Worker/src/DLLWorker/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,9 @@
AppContext.SetSwitch("System.Net.SocketsHttpHandler.Http2FlowControl.DisableDynamicWindowSizing",
true);

WorkerServer.Create<ComputerService>(serviceConfigurator: collection => collection.AddSingleton<ServiceRequestContext>())
WorkerServer.Create<ComputerService>(serviceConfigurator: collection =>
{
collection.AddSingleton<ComputerService>();
collection.AddSingleton<ServiceRequestContext>();
})
.Run();
38 changes: 35 additions & 3 deletions Worker/src/DLLWorker/Services/ComputerService.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// This file is part of the ArmoniK project
//
// Copyright (C) ANEO, 2021-2023. All rights reserved.
// Copyright (C) ANEO, 2021-2024. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License")
// you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,8 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

using ArmoniK.Api.Common.Channel.Utils;
Expand All @@ -26,6 +28,7 @@
using ArmoniK.Api.Worker.Worker;
using ArmoniK.DevelopmentKit.Common;
using ArmoniK.DevelopmentKit.Common.Exceptions;
using ArmoniK.Utils;

using Grpc.Core;

Expand All @@ -36,6 +39,8 @@ namespace ArmoniK.DevelopmentKit.Worker.DLLWorker.Services;

public class ComputerService : WorkerStreamWrapper
{
private readonly ChannelWriter<(ArmonikServiceWorker, ITaskHandler, TaskCompletionSource<byte[]>)> channel_;

public ComputerService(IConfiguration configuration,
GrpcChannelProvider provider,
ServiceRequestContext serviceRequestContext)
Expand All @@ -45,6 +50,30 @@ public ComputerService(IConfiguration configuration,
Configuration = configuration;
Logger = serviceRequestContext.LoggerFactory.CreateLogger<ComputerService>();
ServiceRequestContext = serviceRequestContext;

var channel = Channel.CreateBounded<(ArmonikServiceWorker, ITaskHandler, TaskCompletionSource<byte[]>)>(1);
var channelReader = channel.Reader;
channel_ = channel.Writer;
new Thread(() =>
{
var requests = channelReader.ToAsyncEnumerable(CancellationToken.None)
.ToEnumerable();
foreach (var (service, taskHandler, tcs) in requests)
{
try
{
tcs.SetResult(service.Execute(taskHandler));
}
catch (Exception e)
{
tcs.SetException(e);
}
}
})
{
IsBackground = true,
}.Start();

Logger.LogDebug("Starting worker...OK");
}

Expand Down Expand Up @@ -122,8 +151,11 @@ public override async Task<Output> Process(ITaskHandler taskHandler)
ServiceRequestContext.SessionId = sessionIdCaller;

Logger.LogInformation("Executing task");
var sw = Stopwatch.StartNew();
var result = serviceWorker.Execute(taskHandler);
var sw = Stopwatch.StartNew();
var tcs = new TaskCompletionSource<byte[]>();
await channel_.WriteAsync((serviceWorker, taskHandler, tcs))
.ConfigureAwait(false);
var result = await tcs.Task.ConfigureAwait(false);

if (result != null)
{
Expand Down

0 comments on commit efa2ed7

Please sign in to comment.