Skip to content

Commit

Permalink
feat: add result comparison
Browse files Browse the repository at this point in the history
  • Loading branch information
ereali-aneo committed Sep 11, 2024
1 parent 48afc4b commit 68d885d
Show file tree
Hide file tree
Showing 6 changed files with 462 additions and 126 deletions.
2 changes: 1 addition & 1 deletion src/TaskDumper/ArmoniK.TaskDumper.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ArmoniK.Api.Client" Version="3.20.0-edge.20.f49315d" />
<PackageReference Include="ArmoniK.Api.Client" Version="3.20.0" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="8.0.0" />
Expand Down
286 changes: 181 additions & 105 deletions src/TaskDumper/Program.cs

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion src/TaskReRunner/ArmoniK.TaskReRunner.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ArmoniK.Api.Core" Version="3.20.0-edge.20.f49315d" />
<PackageReference Include="ArmoniK.Api.Core" Version="3.20.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="Serilog.Extensions.Hosting" Version="8.0.0" />
<PackageReference Include="Serilog.Settings.Configuration" Version="8.0.1" />
<PackageReference Include="Serilog.Sinks.Console" Version="6.0.0" />
<PackageReference Include="Serilog.Formatting.Compact" Version="3.0.0" />
<PackageReference Include="Spectre.Console" Version="0.49.1" />
<PackageReference Include="System.CommandLine" Version="2.0.0-beta4.22272.1" />
<None Include="../../README.md" Pack="true" PackagePath="/" />
</ItemGroup>
Expand Down
290 changes: 273 additions & 17 deletions src/TaskReRunner/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,17 @@
// limitations under the License.

using System;
using System.Collections.Concurrent;
using System.CommandLine;
using System.IO;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;

using ArmoniK.Api.Common.Channel.Utils;
using ArmoniK.Api.Common.Options;
using ArmoniK.Api.gRPC.V1.Results;
using ArmoniK.Api.gRPC.V1.Tasks;
using ArmoniK.Api.gRPC.V1.Worker;
using ArmoniK.TaskReRunner.Storage;

Expand All @@ -29,16 +34,59 @@

using Serilog;

using Spectre.Console;

namespace ArmoniK.TaskReRunner;

internal static class Program
{
/// <summary>
///   Copy the files of a directory into a destination directory, optionally
///   repeating the operation for every subdirectory.
/// </summary>
/// <param name="sourceDir">Path of the directory to copy from.</param>
/// <param name="destinationDir">Path of the directory to copy into; it is created before any file is copied.</param>
/// <param name="recursive">When true, each subdirectory of the source is copied under the destination as well.</param>
/// <exception cref="DirectoryNotFoundException">The source directory does not exist.</exception>
public static void CopyDirectory(string sourceDir,
                                 string destinationDir,
                                 bool   recursive)
{
  var source = new DirectoryInfo(sourceDir);
  if (!source.Exists)
  {
    throw new DirectoryNotFoundException($"Source directory not found: {source.FullName}");
  }

  // List the subdirectories before the destination is created, so that a
  // destination created by this call is never part of the list.
  var children = source.GetDirectories();

  Directory.CreateDirectory(destinationDir);

  // Copy the files located directly in the source directory.
  // The single-argument CopyTo does not overwrite an existing file.
  foreach (var entry in source.GetFiles())
  {
    entry.CopyTo(Path.Combine(destinationDir,
                              entry.Name));
  }

  if (!recursive)
  {
    return;
  }

  // Mirror every subdirectory under the destination, keeping its name.
  foreach (var child in children)
  {
    CopyDirectory(child.FullName,
                  Path.Combine(destinationDir,
                               child.Name),
                  true);
  }
}

/// <summary>
/// Connect to a Worker to process tasks with specific process parameter.
/// </summary>
/// <param name="path">Path to the json file containing the data needed to rerun the Task.</param>
/// <exception cref="ArgumentException"></exception>
public static void Run(string path)
public static async Task Run(string path)
{
// Create a logger configuration to write output to the console with contextual information.
var loggerConfiguration_ = new LoggerConfiguration().WriteTo.Console()
Expand Down Expand Up @@ -70,15 +118,26 @@ public static void Run(string path)
}

//Deserialize the Data in the Json
var input = ProcessRequest.Parser.ParseJson(File.ReadAllText(path));

var input = ProcessRequest.Parser.ParseJson(File.ReadAllText(path));
var oldDataFolder = input.DataFolder;
// Create a CommunicationToken if there isn't one
if (string.IsNullOrEmpty(input.CommunicationToken))
{
input.CommunicationToken = token;
}

// Set the dataFolder
var copyPath = Directory.GetCurrentDirectory() + Path.DirectorySeparatorChar + "ak_dumper_" + input.TaskId;

if (!Directory.Exists(copyPath))
{
CopyDirectory(input.DataFolder + Path.DirectorySeparatorChar + "..",
copyPath,
true);
}
else // Set the dataFolder
{
input.DataFolder = copyPath + Path.DirectorySeparatorChar + "Results";
}

// Create an AgentStorage to keep the Agent Data After Process
var storage = new AgentStorage();
Expand All @@ -101,18 +160,19 @@ public static void Run(string path)
}

// Print everything in agent storage
logger_.LogInformation("resultsIds : {results}",
storage.NotifiedResults);

logger_.LogInformation("Number of Notified result : {results}",
storage.NotifiedResults.Count);

var i = 0;
foreach (var result in storage.NotifiedResults)
{
logger_.LogInformation("Notified result{i} Id: {res}",
logger_.LogInformation("Notified result {i} Id: {res}",
i,
result);
var byteArray = File.ReadAllBytes(Path.Combine(input.DataFolder,
result));
logger_.LogInformation("Notified result{i} Data : {str}",
logger_.LogInformation("Notified result {i} Data : {str}",
i,
byteArray);
i++;
Expand All @@ -133,15 +193,211 @@ public static void Run(string path)
task.Value);
}

// recover the results and create a taskrerunner folder
// compare the number of results
// compare names; if the name or id is identical, compare the binaries in a smart way, e.g. with a diff
// compare the number of created tasks
// compare the task payloads when the ids are the same
// put everything in the right folder
// question: can the result id and task id be controlled through an option?
// create a second json with all the get result / get task output of the replayed task?
// nuget packages don't support caps?
var jsonString = File.ReadAllText(copyPath + Path.DirectorySeparatorChar + "CreatedResults.json");
var resultOutputs = JsonSerializer.Deserialize<ConcurrentDictionary<string, ResultRaw>>(jsonString);

var jsonStringTasks = File.ReadAllText(copyPath + Path.DirectorySeparatorChar + "Subtasks.json");
var tasksOutputs = JsonSerializer.Deserialize<ConcurrentDictionary<string, TaskSummary>>(jsonStringTasks);

var test = File.ReadAllBytes(input.DataFolder + Path.DirectorySeparatorChar + resultOutputs!.Single()
.Key);

if (resultOutputs!.Count == storage.Results.Count)
{
logger_.LogInformation("ArmoniK Output and TaskReRunner output have Equal Created Results");
}
else
{
logger_.LogInformation("ArmoniK and TaskReRunner have Different Created Results : ArmoniK = {Count} TaskReRunner = {storage}",
resultOutputs!.Count.ToString(),
storage.Results.Count.ToString());
}

if (tasksOutputs!.Count == storage.Tasks.Count)
{
logger_.LogInformation("ArmoniK Output and TaskReRunner output have Equal Created Tasks");
}
else
{
logger_.LogInformation("ArmoniK and TaskReRunner have Different Created Tasks : ArmoniK = {tasksOutputs} TaskReRunner = {storage}",
tasksOutputs!.Count.ToString(),
storage.Tasks.Count.ToString());
}

var table = new Table();
table.AddColumn("Category");
table.AddColumn("Downloaded");
table.AddColumn("Replayed");

if (input.ExpectedOutputKeys.Count == 1)
{
var downloadedOutput = File.ReadAllBytes(oldDataFolder + Path.DirectorySeparatorChar + input.ExpectedOutputKeys.Single());
var replayedOutput = File.ReadAllBytes(input.DataFolder + Path.DirectorySeparatorChar + input.ExpectedOutputKeys.Single());
table.AddRow("Result Id",
input.ExpectedOutputKeys.Single(),
input.ExpectedOutputKeys.Single());
table.AddRow("└─ size",
downloadedOutput.LongLength.ToString(),
replayedOutput.LongLength.ToString());


if (!downloadedOutput.SequenceEqual(replayedOutput))
{
AnsiConsole.Write(table);
table = new Table();
table.AddColumn("Category");
table.AddColumn("Downloaded");
table.AddColumn("Replayed");
logger_.LogInformation("ArmoniK Expected output were {test}",
downloadedOutput);
logger_.LogInformation("TaskReRunner Expected output were {storage}",
replayedOutput);
}
}

if (tasksOutputs!.Count == 1 && storage.Tasks.Count == 1)
{
table.AddRow("Task Id",
tasksOutputs.Single()
.Key,
storage.Tasks.Single()
.Key);
table.AddRow("├─ nbr subtask",
tasksOutputs.Single()
.Value.CountDataDependencies.ToString(),
storage.Tasks.Single()
.Value.DataDependencies.Count.ToString());
table.AddRow("├─ nbr result",
tasksOutputs.Single()
.Value.CountExpectedOutputIds.ToString(),
storage.Tasks.Single()
.Value.ExpectedOutputKeys.Count.ToString());
table.AddRow("└─ TaskOption",
"-",
"-");
table.AddRow(" ├─ Options",
JsonSerializer.Serialize(tasksOutputs.Single()
.Value.Options.Options),
JsonSerializer.Serialize(storage.Tasks.Single()
.Value.TaskOptions.Options));
table.AddRow(" ├─ MaxDuration",
"-",
"-");
table.AddRow(" │ ├─ Second",
tasksOutputs.Single()
.Value.Options.MaxDuration.Seconds.ToString(),
storage.Tasks.Single()
.Value.TaskOptions.MaxDuration.Seconds.ToString());
table.AddRow(" │ └─ Nanos",
tasksOutputs.Single()
.Value.Options.MaxDuration.Nanos.ToString(),
storage.Tasks.Single()
.Value.TaskOptions.MaxDuration.Nanos.ToString());
table.AddRow(" ├─ MaxRetries",
tasksOutputs.Single()
.Value.Options.MaxRetries.ToString(),
storage.Tasks.Single()
.Value.TaskOptions.MaxRetries.ToString());
table.AddRow(" ├─ Priority",
tasksOutputs.Single()
.Value.Options.Priority.ToString(),
storage.Tasks.Single()
.Value.TaskOptions.Priority.ToString());
table.AddRow(" ├─ PartitionId",
tasksOutputs.Single()
.Value.Options.PartitionId,
storage.Tasks.Single()
.Value.TaskOptions.PartitionId);
table.AddRow(" ├─ ApplicationName",
tasksOutputs.Single()
.Value.Options.ApplicationName,
storage.Tasks.Single()
.Value.TaskOptions.ApplicationName);
table.AddRow(" ├─ ApplicationNamespace",
tasksOutputs.Single()
.Value.Options.ApplicationVersion,
storage.Tasks.Single()
.Value.TaskOptions.ApplicationVersion);
table.AddRow(" ├─ ApplicationNamespace",
tasksOutputs.Single()
.Value.Options.ApplicationNamespace,
storage.Tasks.Single()
.Value.TaskOptions.ApplicationNamespace);
table.AddRow(" ├─ ApplicationService",
tasksOutputs.Single()
.Value.Options.ApplicationService,
storage.Tasks.Single()
.Value.TaskOptions.ApplicationService);
table.AddRow(" └─ EngineType",
tasksOutputs.Single()
.Value.Options.EngineType,
storage.Tasks.Single()
.Value.TaskOptions.EngineType);
if (storage.Results.Count == 1 && resultOutputs.Count == 1)
{
table.AddRow("Result Id",
resultOutputs.Single()
.Key,
storage.Results.Single()
.Value.ResultId);
table.AddRow("├─ name",
resultOutputs.Single()
.Value.Name,
storage.Results.Single()
.Value.Name);
table.AddRow("└─ size",
resultOutputs.Single()
.Value.Size.ToString(),
storage.Results.Single()
.Value.Data!.LongLength.ToString());

AnsiConsole.Write(table);

if (!test.SequenceEqual(storage.Results.Single()
.Value.Data!))
{
logger_.LogInformation("ArmoniK SubTask Payload were {test}",
test);
logger_.LogInformation("TaskReRunner SubTask Payload were {storage}",
storage.Results.Single()
.Value.Data);
}
}
else
{
if (table.Rows.Count > 4)
{
AnsiConsole.Write(table);
}
}
}

using (var tw = new StreamWriter(copyPath + Path.DirectorySeparatorChar + "Subtasks.json",
false))
{
await tw.WriteLineAsync(JsonSerializer.Serialize(storage.Tasks));
}

using (var tw = new StreamWriter(copyPath + Path.DirectorySeparatorChar + "CreatedResults.json",
false))
{
await tw.WriteLineAsync(JsonSerializer.Serialize(storage.Results));
}

using (var tw = new StreamWriter(copyPath + Path.DirectorySeparatorChar + "Task.json",
false))
{
await tw.WriteLineAsync(JsonSerializer.Serialize(input));
}

foreach (var result in storage.Results)
{
using (var tw = new StreamWriter(copyPath + Path.DirectorySeparatorChar + result.Key,
false))
{
await tw.WriteLineAsync(JsonSerializer.Serialize(result.Value.Data));
}
}
}

public static async Task<int> Main(string[] args)
Expand Down
5 changes: 4 additions & 1 deletion src/TaskReRunner/ReRunnerAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@

namespace ArmoniK.TaskReRunner;

/// <summary>
/// An inheritor of the agent class that stores things
/// </summary>
public class ReRunnerAgent : Agent.AgentBase
{
private readonly ILogger<ReRunnerAgent> logger_;
Expand Down Expand Up @@ -185,7 +188,7 @@ public override Task<SubmitTasksResponse> SubmitTasks(SubmitTasksRequest request
ExpectedOutputKeys = rc.ExpectedOutputKeys,
PayloadId = rc.PayloadId,
TaskId = taskId,
TaskOptions = rc.TaskOptions,
TaskOptions = request.TaskOptions,
};
storage_.Tasks[taskId] = current;
return current;
Expand Down
Loading

0 comments on commit 68d885d

Please sign in to comment.