Skip to content

Commit

Permalink
Add thread to Worker.Process
Browse files Browse the repository at this point in the history
  • Loading branch information
lemaitre-aneo committed Jul 22, 2024
1 parent 165c20d commit 955bddd
Showing 1 changed file with 35 additions and 3 deletions.
38 changes: 35 additions & 3 deletions Worker/src/DLLWorker/Services/ComputerService.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// This file is part of the ArmoniK project
//
// Copyright (C) ANEO, 2021-2023. All rights reserved.
// Copyright (C) ANEO, 2021-2024. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License")
// you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,8 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

using ArmoniK.Api.Common.Channel.Utils;
Expand All @@ -26,6 +28,7 @@
using ArmoniK.Api.Worker.Worker;
using ArmoniK.DevelopmentKit.Common;
using ArmoniK.DevelopmentKit.Common.Exceptions;
using ArmoniK.Utils;

using Grpc.Core;

Expand All @@ -36,6 +39,8 @@ namespace ArmoniK.DevelopmentKit.Worker.DLLWorker.Services;

public class ComputerService : WorkerStreamWrapper
{
private readonly ChannelWriter<(ArmonikServiceWorker, ITaskHandler, TaskCompletionSource<byte[]>)> channel_;

public ComputerService(IConfiguration configuration,
GrpcChannelProvider provider,
ServiceRequestContext serviceRequestContext)
Expand All @@ -45,6 +50,30 @@ public ComputerService(IConfiguration configuration,
Configuration = configuration;
Logger = serviceRequestContext.LoggerFactory.CreateLogger<ComputerService>();
ServiceRequestContext = serviceRequestContext;

var channel = Channel.CreateBounded<(ArmonikServiceWorker, ITaskHandler, TaskCompletionSource<byte[]>)>(1);
var channelReader = channel.Reader;
channel_ = channel.Writer;
new Thread(() =>
{
var requests = channelReader.ToAsyncEnumerable(CancellationToken.None)
.ToEnumerable();
foreach (var (service, taskHandler, tcs) in requests)
{
try
{
tcs.SetResult(service.Execute(taskHandler));
}
catch (Exception e)
{
tcs.SetException(e);
}
}
})
{
IsBackground = true,
}.Start();

Logger.LogDebug("Starting worker...OK");
}

Expand Down Expand Up @@ -122,8 +151,11 @@ public override async Task<Output> Process(ITaskHandler taskHandler)
ServiceRequestContext.SessionId = sessionIdCaller;

Logger.LogInformation("Executing task");
var sw = Stopwatch.StartNew();
var result = serviceWorker.Execute(taskHandler);
var sw = Stopwatch.StartNew();
var tcs = new TaskCompletionSource<byte[]>();
await channel_.WriteAsync((serviceWorker, taskHandler, tcs))
.ConfigureAwait(false);
var result = await tcs.Task.ConfigureAwait(false);

if (result != null)
{
Expand Down

0 comments on commit 955bddd

Please sign in to comment.