diff --git a/src/TaskDumper/ArmoniK.TaskDumper.csproj b/src/TaskDumper/ArmoniK.TaskDumper.csproj index c1bae73..62f5c5c 100644 --- a/src/TaskDumper/ArmoniK.TaskDumper.csproj +++ b/src/TaskDumper/ArmoniK.TaskDumper.csproj @@ -15,7 +15,7 @@ </PropertyGroup> <ItemGroup> - <PackageReference Include="ArmoniK.Api.Client" Version="3.20.0-edge.20.f49315d" /> + <PackageReference Include="ArmoniK.Api.Client" Version="3.20.0" /> <PackageReference Include="Microsoft.Extensions.Configuration" Version="8.0.0" /> <PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="8.0.0" /> <PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="8.0.0" /> diff --git a/src/TaskDumper/Program.cs b/src/TaskDumper/Program.cs index ffbb38a..42aeb19 100644 --- a/src/TaskDumper/Program.cs +++ b/src/TaskDumper/Program.cs @@ -16,8 +16,10 @@ using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.CommandLine; using System.IO; +using System.Linq; using System.Text; using System.Text.Json; using System.Threading; @@ -32,6 +34,8 @@ using ArmoniK.Api.gRPC.V1.Tasks; using ArmoniK.Api.gRPC.V1.Worker; +using Grpc.Net.Client; + using Microsoft.Extensions.Configuration; using FilterField = ArmoniK.Api.gRPC.V1.Tasks.FilterField; @@ -40,6 +44,66 @@ namespace ArmoniK.TaskDumper; +public static class TaskPagination +{ + public static async IAsyncEnumerable<TaskSummary> ListTasksAsync(this GrpcChannel channel, + Filters filters, + ListTasksRequest.Types.Sort sort, + int pageSize = 50) + { + var page = 0; + var taskClient = new Tasks.TasksClient(channel); + ListTasksResponse res; + + while ((res = await taskClient.ListTasksAsync(new ListTasksRequest + { + Filters = filters, + Sort = sort, + PageSize = pageSize, + Page = page, + }) + .ConfigureAwait(false)).Tasks.Any()) + { + foreach (var taskSummary in res.Tasks) + { + yield return taskSummary; + } + + page++; + } + } +} + +public static class ResultPagination +{ + public static async IAsyncEnumerable<ResultRaw> ListResultsAsync(this GrpcChannel channel, + Api.gRPC.V1.Results.Filters filters, + ListResultsRequest.Types.Sort sort, + int pageSize = 50) + { + var page = 0; + var taskClient = new Results.ResultsClient(channel); + ListResultsResponse res; + + while ((res = await taskClient.ListResultsAsync(new ListResultsRequest + { + Filters = filters, + Sort = sort, + PageSize = pageSize, + Page = page, + }) + .ConfigureAwait(false)).Results.Any()) + { + foreach (var taskSummary in res.Results) + { + yield return taskSummary; + } + + page++; + } + } +} + internal static class Program { /// <summary> @@ -48,6 +112,7 @@ internal static class Program /// <param name="endpoint">The endpoint URL of ArmoniK's control plane.</param> /// <param name="taskId">The TaskId of the task to retrieve.</param> /// <param name="dataFolder">The folder to store all required binaries.</param> + /// <param name="grpcClientOptions">grpc Specific Option, can be set through appsettings.json or environment variable</param> /// <returns> /// Task representing the asynchronous execution of the method /// </returns> @@ -63,8 +128,10 @@ internal static async Task Run(string endpoint, { Endpoint = endpoint, }); - + // Set folder var folder = dataFolder ?? Directory.GetCurrentDirectory() + Path.DirectorySeparatorChar + "ak_dumper_" + taskId; + folder += Path.DirectorySeparatorChar; + // Create clients for tasks and results. var taskClient = new Tasks.TasksClient(channel); var resultClient = new Results.ResultsClient(channel); @@ -75,8 +142,6 @@ internal static async Task Run(string endpoint, TaskId = taskId, }); - Console.WriteLine(taskResponse); - // Create a ProcessRequest object with information obtained from the task request. var DumpData = new ProcessRequest { @@ -97,23 +162,21 @@ internal static async Task Run(string endpoint, .DataChunkMaxSize, }, PayloadId = taskResponse.Task.PayloadId, - DataFolder = folder + Path.DirectorySeparatorChar + "Results", + DataFolder = folder + "Results", }; + // Convert the ProcessRequest object to JSON. var JSONresult = DumpData.ToString(); // Create the dataFolder directory if it doesn't exist. - if (!Directory.Exists(folder + Path.DirectorySeparatorChar + "Results")) + if (!Directory.Exists(folder + "Results")) { - Directory.CreateDirectory(folder + Path.DirectorySeparatorChar + "Results"); + Directory.CreateDirectory(folder + "Results"); } // Write the JSON to a file with the specified name. - using (var tw = new StreamWriter(folder + Path.DirectorySeparatorChar + "task.json", - false)) - { - await tw.WriteLineAsync(JSONresult); - } + await File.WriteAllTextAsync(folder + "Task.json", + JsonSerializer.Serialize(JSONresult)); // Save DataDependencies data to files in the folder named <resultId>. foreach (var data in taskResponse.Task.DataDependencies) @@ -126,7 +189,27 @@ internal static async Task Run(string endpoint, { if (dataDependency.Result.Status == ResultStatus.Completed) { - await File.WriteAllBytesAsync(Path.Combine(folder + Path.DirectorySeparatorChar + "Results", + await File.WriteAllBytesAsync(Path.Combine(folder + "Results", + data), + await resultClient.DownloadResultData(taskResponse.Task.SessionId, + data, + CancellationToken.None) ?? Encoding.ASCII.GetBytes("")); + } + } + } + + // Save ExpectedOutputs data to files in the folder named <resultId>. + foreach (var data in taskResponse.Task.ExpectedOutputIds) + { + var expectedOutputs = resultClient.GetResult(new GetResultRequest + { + ResultId = data, + }); + if (!string.IsNullOrEmpty(data)) + { + if (expectedOutputs.Result.Status == ResultStatus.Completed) + { + await File.WriteAllBytesAsync(Path.Combine(folder + "Results", data), await resultClient.DownloadResultData(taskResponse.Task.SessionId, data, @@ -140,140 +223,125 @@ await resultClient.DownloadResultData(taskResponse.Task.SessionId, { ResultId = taskResponse.Task.PayloadId, }); + // Download payload if (payload.Result.Status == ResultStatus.Completed) { - await File.WriteAllBytesAsync(Path.Combine(folder + Path.DirectorySeparatorChar + "Results", + await File.WriteAllBytesAsync(Path.Combine(folder + "Results", taskResponse.Task.PayloadId), await resultClient.DownloadResultData(taskResponse.Task.SessionId, taskResponse.Task.PayloadId, CancellationToken.None) ?? Encoding.ASCII.GetBytes("")); } - // Found all results createdBy taskId - var taskCreated = await taskClient.ListTasksAsync(new ListTasksRequest - { - Filters = new Filters - { - Or = - { - new FiltersAnd + //Search subtasks created by TaskId + var taskCreated = channel.ListTasksAsync(new Filters + { + Or = + { + new FiltersAnd + { + And = + { + new FilterField + { + FilterString = new FilterString { - And = - { - new FilterField - { - FilterString = new FilterString - { - Operator = FilterStringOperator.Equal, - Value = taskId, - }, - Field = new TaskField - { - TaskSummaryField = new TaskSummaryField - { - Field = TaskSummaryEnumField.CreatedBy, - }, - }, - }, - }, + Operator = FilterStringOperator.Equal, + Value = taskId, }, - }, - }, - Sort = new ListTasksRequest.Types.Sort + Field = new TaskField { - Direction = SortDirection.Asc, - Field = new TaskField - { - TaskSummaryField = new TaskSummaryField - { - Field = TaskSummaryEnumField.TaskId, - }, - }, + TaskSummaryField = new TaskSummaryField + { + Field = TaskSummaryEnumField.CreatedBy, + }, }, - PageSize = 1, - Page = 0, - }); - - if (!Directory.Exists(folder + Path.DirectorySeparatorChar + "Outputs" + Path.DirectorySeparatorChar + "Results")) - { - Directory.CreateDirectory(folder + Path.DirectorySeparatorChar + "Outputs" + Path.DirectorySeparatorChar + "Results"); - } - + }, + }, + }, + }, + }, + new ListTasksRequest.Types.Sort + { + Direction = SortDirection.Asc, + Field = new TaskField + { + TaskSummaryField = new TaskSummaryField + { + Field = TaskSummaryEnumField.TaskId, + }, + }, + }); + + // Create subtasks json in var folder var tasks = new ConcurrentDictionary<string, TaskSummary>(); - foreach (var task in taskCreated.Tasks) + await foreach (var task in taskCreated) { tasks[task.Id] = task; } - var resultsCreated = await resultClient.ListResultsAsync(new ListResultsRequest - { - Filters = new Api.gRPC.V1.Results.Filters - { - Or = + await File.WriteAllTextAsync(folder + "Subtasks.json", + JsonSerializer.Serialize(tasks)); + + //Search results created by TaskId + var resultsCreated = channel.ListResultsAsync(new Api.gRPC.V1.Results.Filters + { + Or = + { + new Api.gRPC.V1.Results.FiltersAnd + { + And = + { + new Api.gRPC.V1.Results.FilterField + { + FilterString = new FilterString { - new Api.gRPC.V1.Results.FiltersAnd - { - And = + Operator = FilterStringOperator.Equal, + Value = taskId, + }, + Field = new ResultField + { + ResultRawField = new ResultRawField + { + Field = ResultRawEnumField.CreatedBy, + }, + }, + }, + }, + }, + }, + }, + new ListResultsRequest.Types.Sort + { + Direction = SortDirection.Asc, + Field = new ResultField + { + ResultRawField = new ResultRawField { - new Api.gRPC.V1.Results.FilterField - { - FilterString = new FilterString - { - Operator = FilterStringOperator.Equal, - Value = taskId, - }, - Field = new ResultField - { - ResultRawField = new ResultRawField - { - Field = ResultRawEnumField.CreatedBy, - }, - }, - }, + Field = ResultRawEnumField.ResultId, }, - }, - }, - }, - Sort = new ListResultsRequest.Types.Sort - { - Direction = SortDirection.Asc, - Field = new ResultField - { - ResultRawField = new ResultRawField - { - Field = ResultRawEnumField.ResultId, - }, - }, - }, - Page = 0, - PageSize = 1, - }); + }, + }); + // Create created results json in var folder var results = new ConcurrentDictionary<string, ResultRaw>(); - foreach (var result in resultsCreated.Results) + await foreach (var result in resultsCreated) { results[result.ResultId] = result; - await File.WriteAllBytesAsync(Path.Combine(folder + Path.DirectorySeparatorChar + "Outputs" + Path.DirectorySeparatorChar + "Results", + // Put subtask results in var folder + "Results" + await File.WriteAllBytesAsync(Path.Combine(folder + "Results", result.ResultId), await resultClient.DownloadResultData(taskResponse.Task.SessionId, result.ResultId, CancellationToken.None) ?? Encoding.ASCII.GetBytes("")); } - using (var tw = new StreamWriter(folder + Path.DirectorySeparatorChar + "Outputs" + Path.DirectorySeparatorChar + "Results.json", - false)) - { - await tw.WriteLineAsync(JsonSerializer.Serialize(results)); - } - - using (var tw = new StreamWriter(folder + Path.DirectorySeparatorChar + "Outputs" + Path.DirectorySeparatorChar + "Tasks.json", - false)) - { - await tw.WriteLineAsync(JsonSerializer.Serialize(tasks)); - } + await File.WriteAllTextAsync(folder + "CreatedResults.json", + JsonSerializer.Serialize(results)); } + public static async Task<int> Main(string[] args) { // Load configuration from environment variables and appsettings.json diff --git a/src/TaskReRunner/ArmoniK.TaskReRunner.csproj b/src/TaskReRunner/ArmoniK.TaskReRunner.csproj index d030206..b2de71b 100644 --- a/src/TaskReRunner/ArmoniK.TaskReRunner.csproj +++ b/src/TaskReRunner/ArmoniK.TaskReRunner.csproj @@ -17,12 +17,13 @@ </PropertyGroup> <ItemGroup> - <PackageReference Include="ArmoniK.Api.Core" Version="3.20.0-edge.20.f49315d" /> + <PackageReference Include="ArmoniK.Api.Core" Version="3.20.0" /> <PackageReference Include="Newtonsoft.Json" Version="13.0.3" /> <PackageReference Include="Serilog.Extensions.Hosting" Version="8.0.0" /> <PackageReference Include="Serilog.Settings.Configuration" Version="8.0.1" /> <PackageReference Include="Serilog.Sinks.Console" Version="6.0.0" /> <PackageReference Include="Serilog.Formatting.Compact" Version="3.0.0" /> + <PackageReference Include="Spectre.Console" Version="0.49.1" /> <PackageReference Include="System.CommandLine" Version="2.0.0-beta4.22272.1" /> <None Include="../../README.md" Pack="true" PackagePath="/" /> </ItemGroup> diff --git a/src/TaskReRunner/Program.cs b/src/TaskReRunner/Program.cs index 3c724cc..9e1a163 100644 --- a/src/TaskReRunner/Program.cs +++ b/src/TaskReRunner/Program.cs @@ -15,12 +15,17 @@ // limitations under the License. using System; +using System.Collections.Concurrent; using System.CommandLine; using System.IO; +using System.Linq; +using System.Text.Json; using System.Threading.Tasks; using ArmoniK.Api.Common.Channel.Utils; using ArmoniK.Api.Common.Options; +using ArmoniK.Api.gRPC.V1.Results; +using ArmoniK.Api.gRPC.V1.Tasks; using ArmoniK.Api.gRPC.V1.Worker; using ArmoniK.TaskReRunner.Storage; @@ -29,16 +34,61 @@ using Serilog; +using Spectre.Console; + namespace ArmoniK.TaskReRunner; internal static class Program { + public static void CopyDirectory(string sourceDir, + string destinationDir, + bool recursive) + { + // Get information about the source directory + var dir = new DirectoryInfo(sourceDir); + + // Check if the source directory exists + if (!dir.Exists) + { + throw new DirectoryNotFoundException($"Source directory not found: {dir.FullName}"); + } + + // Cache directories before we start copying + var dirs = dir.GetDirectories(); + + // Create the destination directory + Directory.CreateDirectory(destinationDir); + + // Get the files in the source directory and copy to the destination directory + foreach (var file in dir.GetFiles()) + { + var targetFilePath = Path.Combine(destinationDir, + file.Name); + file.CopyTo(targetFilePath); + } + + // If recursive and copying subdirectories, recursively call this method + if (recursive) + { + foreach (var subDir in dirs) + { + var newDestinationDir = Path.Combine(destinationDir, + subDir.Name); + CopyDirectory(subDir.FullName, + newDestinationDir, + true); + } + } + } + /// <summary> - /// Connect to a Worker to process tasks with specific process parameter. + /// Connect to a Worker to process tasks with specific parameters. /// </summary> /// <param name="path">Path to the json file containing the data needed to rerun the Task.</param> + /// <param name="force">Allow deletion of the previous output of this program.</param> /// <exception cref="ArgumentException"></exception> - public static void Run(string path) + public static async Task Run(string path, + bool force) { // Create a logger configuration to write output to the console with contextual information. var loggerConfiguration_ = new LoggerConfiguration().WriteTo.Console() @@ -70,20 +120,37 @@ public static void Run(string path) } //Deserialize the Data in the Json - var input = ProcessRequest.Parser.ParseJson(File.ReadAllText(path)); - + var input = ProcessRequest.Parser.ParseJson(File.ReadAllText(path)); + var oldDataFolder = input.DataFolder; // Create a CommunicationToken if there isn't if (string.IsNullOrEmpty(input.CommunicationToken)) { input.CommunicationToken = token; } - // Set the dataFolder + var copyPath = Directory.GetCurrentDirectory() + Path.DirectorySeparatorChar + "ak_dumper_" + input.TaskId; + if (Directory.Exists(copyPath)) + { + if (force) + { + Directory.Delete(copyPath, + true); + } + else + { + throw new Exception("Use --force to allow deletion or delete ak_dumper_ + taskId"); + } + } + + CopyDirectory(input.DataFolder + Path.DirectorySeparatorChar + "..", + copyPath, + true); + input.DataFolder = copyPath + Path.DirectorySeparatorChar + "Results"; // Create an AgentStorage to keep the Agent Data After Process var storage = new AgentStorage(); - // Scope for the Task to run + // Scope for running the Task { // Launch an Agent server to listen the worker using var server = new Server(Path.GetTempPath() + "sockets" + Path.DirectorySeparatorChar + "agent.sock", @@ -100,19 +167,20 @@ public static void Run(string path) ret.ToString()); } - // Print everything in agent storage - logger_.LogInformation("resultsIds : {results}", - storage.NotifiedResults); + // Print all the data in agent storage + + logger_.LogInformation("Number of Notified result : {results}", + storage.NotifiedResults.Count); var i = 0; foreach (var result in storage.NotifiedResults) { - logger_.LogInformation("Notified result{i} Id: {res}", + logger_.LogInformation("Notified result {i} Id: {res}", i, result); var byteArray = File.ReadAllBytes(Path.Combine(input.DataFolder, result)); - logger_.LogInformation("Notified result{i} Data : {str}", + logger_.LogInformation("Notified result {i} Data : {str}", i, byteArray); i++; @@ -133,33 +201,223 @@ public static void Run(string path) task.Value); } - // recover results create a folder taskrerunner - // comparer nombres de results - // comparer name si name ou id identique comparer binaire de maniere intelligente genre diff - // comparer nombre de task creer - // comparer payload des task si meme id - // mettre dans bon dossier - // question est ce que le result Id et task iD peut etre controler via option ? - // creer deuxieme json with all get result , get task de la tache rejouer ? - // package nuget don't support caps ? + var jsonString = File.ReadAllText(copyPath + Path.DirectorySeparatorChar + "CreatedResults.json"); + var resultOutputs = JsonSerializer.Deserialize<ConcurrentDictionary<string, ResultRaw>>(jsonString); + + var jsonStringTasks = File.ReadAllText(copyPath + Path.DirectorySeparatorChar + "Subtasks.json"); + var tasksOutputs = JsonSerializer.Deserialize<ConcurrentDictionary<string, TaskSummary>>(jsonStringTasks); + + var test = File.ReadAllBytes(input.DataFolder + Path.DirectorySeparatorChar + resultOutputs!.Single() + .Key); + + if (resultOutputs!.Count == storage.Results.Count) + { + logger_.LogInformation("ArmoniK Output and TaskReRunner output have Equal Created Results"); + } + else + { + logger_.LogInformation("ArmoniK and TaskReRunner have Different Created Results : ArmoniK = {Count} TaskReRunner = {storage}", + resultOutputs!.Count.ToString(), + storage.Results.Count.ToString()); + } + + if (tasksOutputs!.Count == storage.Tasks.Count) + { + logger_.LogInformation("ArmoniK Output and TaskReRunner output have Equal Created Tasks"); + } + else + { + logger_.LogInformation("ArmoniK and TaskReRunner have Different Created Tasks : ArmoniK = {tasksOutputs} TaskReRunner = {storage}", + tasksOutputs!.Count.ToString(), + storage.Tasks.Count.ToString()); + } + + var table = new Table(); + table.AddColumn("Category"); + table.AddColumn("Downloaded"); + table.AddColumn("Replayed"); + + if (input.ExpectedOutputKeys.Count == 1) + { + var downloadedOutput = File.ReadAllBytes(oldDataFolder + Path.DirectorySeparatorChar + input.ExpectedOutputKeys.Single()); + var replayedOutput = File.ReadAllBytes(input.DataFolder + Path.DirectorySeparatorChar + input.ExpectedOutputKeys.Single()); + table.AddRow("Result Id", + input.ExpectedOutputKeys.Single(), + input.ExpectedOutputKeys.Single()); + table.AddRow("└─ size", + downloadedOutput.LongLength.ToString(), + replayedOutput.LongLength.ToString()); + + + if (!downloadedOutput.SequenceEqual(replayedOutput)) + { + AnsiConsole.Write(table); + table = new Table(); + table.AddColumn("Category"); + table.AddColumn("Downloaded"); + table.AddColumn("Replayed"); + logger_.LogInformation("ArmoniK Expected output is {test}", + downloadedOutput); + logger_.LogInformation("TaskReRunner Expected output is {storage}", + replayedOutput); + } + } + + if (tasksOutputs!.Count == 1 && storage.Tasks.Count == 1) + { + table.AddRow("Task Id", + tasksOutputs.Single() + .Key, + storage.Tasks.Single() + .Key); + table.AddRow("├─ nbr subtask", + tasksOutputs.Single() + .Value.CountDataDependencies.ToString(), + storage.Tasks.Single() + .Value.DataDependencies.Count.ToString()); + table.AddRow("├─ nbr result", + tasksOutputs.Single() + .Value.CountExpectedOutputIds.ToString(), + storage.Tasks.Single() + .Value.ExpectedOutputKeys.Count.ToString()); + table.AddRow("└─ TaskOption", + "-", + "-"); + table.AddRow(" ├─ Options", + JsonSerializer.Serialize(tasksOutputs.Single() + .Value.Options.Options), + JsonSerializer.Serialize(storage.Tasks.Single() + .Value.TaskOptions.Options)); + table.AddRow(" ├─ MaxDuration", + "-", + "-"); + table.AddRow(" │ ├─ Second", + tasksOutputs.Single() + .Value.Options.MaxDuration.Seconds.ToString(), + storage.Tasks.Single() + .Value.TaskOptions.MaxDuration.Seconds.ToString()); + table.AddRow(" │ └─ Nanos", + tasksOutputs.Single() + .Value.Options.MaxDuration.Nanos.ToString(), + storage.Tasks.Single() + .Value.TaskOptions.MaxDuration.Nanos.ToString()); + table.AddRow(" ├─ MaxRetries", + tasksOutputs.Single() + .Value.Options.MaxRetries.ToString(), + storage.Tasks.Single() + .Value.TaskOptions.MaxRetries.ToString()); + table.AddRow(" ├─ Priority", + tasksOutputs.Single() + .Value.Options.Priority.ToString(), + storage.Tasks.Single() + .Value.TaskOptions.Priority.ToString()); + table.AddRow(" ├─ PartitionId", + tasksOutputs.Single() + .Value.Options.PartitionId, + storage.Tasks.Single() + .Value.TaskOptions.PartitionId); + table.AddRow(" ├─ ApplicationName", + tasksOutputs.Single() + .Value.Options.ApplicationName, + storage.Tasks.Single() + .Value.TaskOptions.ApplicationName); + table.AddRow(" ├─ ApplicationNamespace", + tasksOutputs.Single() + .Value.Options.ApplicationVersion, + storage.Tasks.Single() + .Value.TaskOptions.ApplicationVersion); + table.AddRow(" ├─ ApplicationNamespace", + tasksOutputs.Single() + .Value.Options.ApplicationNamespace, + storage.Tasks.Single() + .Value.TaskOptions.ApplicationNamespace); + table.AddRow(" ├─ ApplicationService", + tasksOutputs.Single() + .Value.Options.ApplicationService, + storage.Tasks.Single() + .Value.TaskOptions.ApplicationService); + table.AddRow(" └─ EngineType", + tasksOutputs.Single() + .Value.Options.EngineType, + storage.Tasks.Single() + .Value.TaskOptions.EngineType); + if (storage.Results.Count == 1 && resultOutputs.Count == 1) + { + table.AddRow("Result Id", + resultOutputs.Single() + .Key, + storage.Results.Single() + .Value.ResultId); + table.AddRow("├─ name", + resultOutputs.Single() + .Value.Name, + storage.Results.Single() + .Value.Name); + table.AddRow("└─ size", + resultOutputs.Single() + .Value.Size.ToString(), + storage.Results.Single() + .Value.Data!.LongLength.ToString()); + + AnsiConsole.Write(table); + + if (!test.SequenceEqual(storage.Results.Single() + .Value.Data!)) + { + logger_.LogInformation("ArmoniK SubTask Payload is {test}", + test); + logger_.LogInformation("TaskReRunner SubTask Payload is {storage}", + storage.Results.Single() + .Value.Data); + } + } + else + { + if (table.Rows.Count > 4) + { + AnsiConsole.Write(table); + } + } + } + + await File.WriteAllTextAsync(copyPath + Path.DirectorySeparatorChar + "Subtasks.json", + JsonSerializer.Serialize(storage.Tasks)); + + + await File.WriteAllTextAsync(copyPath + Path.DirectorySeparatorChar + "CreatedResults.json", + JsonSerializer.Serialize(storage.Results)); + + await File.WriteAllTextAsync(copyPath + Path.DirectorySeparatorChar + "Task.json", + JsonSerializer.Serialize(input)); + + foreach (var result in storage.Results) + { + await File.WriteAllBytesAsync(copyPath + Path.DirectorySeparatorChar + "Results" + Path.DirectorySeparatorChar + result.Key, + result.Value.Data!); + } } public static async Task<int> Main(string[] args) { // Define the options for the application with their description and default value var path = new Option<string>("--path", - description: "Path to the json file containing the data needed to rerun the Task.", + description: "Path to the JSON file containing the data needed to rerun the Task.", getDefaultValue: () => "task.json"); + var force = new Option<bool>("--force", + description: "Allow this program previous output deletion", + getDefaultValue: () => false); // Describe the application and its purpose var rootCommand = - new RootCommand("This application allows you to rerun ArmoniK individual task in local. It reads the data in <path>, connect to a worker and rerun the Task."); + new RootCommand("This application allows you to rerun an individual ArmoniK task locally. It reads the data in <path>, connects to a worker, and reruns the Task."); rootCommand.AddOption(path); + rootCommand.AddOption(force); // Configure the handler to call the function that will do the work rootCommand.SetHandler(Run, - path); + path, + force); // Parse the command line parameters and call the function that represents the application return await rootCommand.InvokeAsync(args); diff --git a/src/TaskReRunner/ReRunnerAgent.cs b/src/TaskReRunner/ReRunnerAgent.cs index 9e05e18..f28071f 100644 --- a/src/TaskReRunner/ReRunnerAgent.cs +++ b/src/TaskReRunner/ReRunnerAgent.cs @@ -30,6 +30,9 @@ namespace ArmoniK.TaskReRunner; +/// <summary> +/// An heritor of agent class that storethings +/// </summary> public class ReRunnerAgent : Agent.AgentBase { private readonly ILogger<ReRunnerAgent> logger_; @@ -185,7 +188,7 @@ public override Task<SubmitTasksResponse> SubmitTasks(SubmitTasksRequest request ExpectedOutputKeys = rc.ExpectedOutputKeys, PayloadId = rc.PayloadId, TaskId = taskId, - TaskOptions = rc.TaskOptions, + TaskOptions = request.TaskOptions, }; storage_.Tasks[taskId] = current; return current; diff --git a/src/TaskReRunner/Storage/AgentStorage.cs b/src/TaskReRunner/Storage/AgentStorage.cs index 2277e00..4fa0ea5 100644 --- a/src/TaskReRunner/Storage/AgentStorage.cs +++ b/src/TaskReRunner/Storage/AgentStorage.cs @@ -15,7 +15,6 @@ // limitations under the License. using System.Collections.Concurrent; -using System.Collections.Generic; namespace ArmoniK.TaskReRunner.Storage; @@ -27,7 +26,7 @@ public class AgentStorage /// <summary> /// The data obtained through the notified result call. This set contains the IDs of the results that have been notified. /// </summary> - public readonly HashSet<string> NotifiedResults = new(); + public readonly ConcurrentBag<string> NotifiedResults = new(); /// <summary> /// The data obtained through the CreateResult or CreateMetaDataResult call. This dictionary stores results keyed by diff --git a/src/TaskReRunner/Storage/TaskData.cs b/src/TaskReRunner/Storage/TaskData.cs index 7e2246a..9c08bc9 100644 --- a/src/TaskReRunner/Storage/TaskData.cs +++ b/src/TaskReRunner/Storage/TaskData.cs @@ -46,5 +46,8 @@ public record TaskData /// </summary> public ICollection<string> ExpectedOutputKeys { get; init; } = new List<string>(); - public TaskOptions TaskOptions { get; init; } + /// <summary> + /// Get or init the task options. + /// </summary> + public required TaskOptions TaskOptions { get; init; } }