Skip to content

Commit

Permalink
feat: A minimalist program to extract TaskData
Browse files Browse the repository at this point in the history
  • Loading branch information
ereali-aneo committed Jul 17, 2024
1 parent 78b74a1 commit d33c0e2
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 44 deletions.
16 changes: 16 additions & 0 deletions src/TaskDump/ArmoniK.TaskDump.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="ArmoniK.Api.Client" Version="3.17.1" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.17.2" />
<PackageReference Include="System.CommandLine" Version="2.0.0-beta4.22272.1" />
<ProjectReference Include="..\Common\ArmoniK.TaskReRunner.Common.csproj" />

</ItemGroup>

</Project>
147 changes: 147 additions & 0 deletions src/TaskDump/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// This file is part of the ArmoniK project
//
// Copyright (C) ANEO, 2021-2024. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License")
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Collections.Generic;
using System.CommandLine;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using ArmoniK.Api.Client;
using ArmoniK.Api.Client.Options;
using ArmoniK.Api.Client.Submitter;
using ArmoniK.Api.gRPC.V1;
using ArmoniK.Api.gRPC.V1.Results;
using ArmoniK.Api.gRPC.V1.Tasks;

using Newtonsoft.Json;

using System.Collections.Concurrent;

using Empty = ArmoniK.Api.gRPC.V1.Empty;

using ArmoniK.TaskReRunner.Common;

internal static class Program
{
/// <summary>
/// Method for sending task and retrieving their results from ArmoniK
/// </summary>
/// <param name="endpoint">The endpoint url of ArmoniK's control plane</param>
/// <returns>
/// Task representing the asynchronous execution of the method
/// </returns>
/// <exception cref="Exception">Issues with results from tasks</exception>
/// <exception cref="ArgumentOutOfRangeException">Unknown response type from control plane</exception>
internal static async Task Run(string endpoint,
string task)
{
var channel = GrpcChannelFactory.CreateChannel(new GrpcClient
{
Endpoint = endpoint,
});

// Create client for events listening
var taskClient = new Tasks.TasksClient(channel);
var resultClient = new Results.ResultsClient(channel);


var taskResponse = taskClient.GetTask(new GetTaskRequest
{
TaskId = task,
});

var rawData = new ConcurrentDictionary<string, byte[]?>();

//rawData[plop.Task.PayloadId] = resultClient.DownloadResultData(plop.Task.SessionId, plop.Task.PayloadId, CancellationToken.None);

foreach (var data in taskResponse.Task.DataDependencies)
{
if (data != null)
{
rawData[data] = await resultClient.DownloadResultData(taskResponse.Task.SessionId,
data,
CancellationToken.None);
}
}

foreach (var data in taskResponse.Task.ExpectedOutputIds)
{
if (data != null)
{
rawData[data] = await resultClient.DownloadResultData(taskResponse.Task.SessionId,
data,
CancellationToken.None);
}
}

var DumpData = new TaskDump
{
SessionId = taskResponse.Task.SessionId,
TaskId = task,
TaskOptions = taskResponse.Task.Options,
DataDependencies = taskResponse.Task.DataDependencies,
ExpectedOutputKeys = taskResponse.Task.ExpectedOutputIds,
Configuration = new Configuration
{
DataChunkMaxSize = resultClient.GetServiceConfiguration(new Empty())
.DataChunkMaxSize,
},
RawData = rawData,
PayloadId = Guid.NewGuid()
.ToString(), // change with plop.Task.PayloadId when in core
};
var taskdata = taskResponse.Task;

var JSONresult = JsonConvert.SerializeObject(DumpData);

using (var tw = new StreamWriter("toProcess.json",
false))
{
tw.WriteLine(JSONresult);
}
}

public static async Task<int> Main(string[] args)
{
// Define the options for the application with their description and default value
var endpoint = new Option<string>("--endpoint",
description: "Endpoint for the connection to ArmoniK control plane.",
getDefaultValue: () => "http://localhost:5001");

var task = new Option<string>("--task",
description: "TaskId of the task to dump.",
getDefaultValue: () => "default");

// Describe the application and its purpose
var rootCommand = new RootCommand($"A program to extract data for a specific task. Connect to ArmoniK through <{endpoint.Name}>");

// Add the options to the parser
rootCommand.AddOption(endpoint);
//rootCommand.AddOption(partition);
rootCommand.AddOption(task);

// Configure the handler to call the function that will do the work
rootCommand.SetHandler(Run,
endpoint,
task);

// Parse the command line parameters and call the function that represents the application
return await rootCommand.InvokeAsync(args);
}
}
2 changes: 2 additions & 0 deletions src/TaskReRunner/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

using Serilog;

using ArmoniK.TaskReRunner.Common;

namespace ArmoniK.TaskReRunner;

internal static class Program
Expand Down
90 changes: 46 additions & 44 deletions src/TaskReRunner/Storage/TaskDump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,57 +14,59 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Collections.Concurrent;
using System.Collections.Generic;
//using System.Collections.Concurrent;
//using System.Collections.Generic;

using ArmoniK.Api.gRPC.V1;
//using ArmoniK.Api.gRPC.V1;

namespace ArmoniK.TaskReRunner.Storage;
//namespace ArmoniK.TaskReRunner.Storage;

/// <summary>
/// Represents all the parameters extracted from ArmoniK required to rerun a task.
/// Properties: PayloadId, SessionId, Configuration, TaskId, TaskOptions,
/// DataDependencies, ExpectedOutputKeys, RawData.
/// </summary>
public record TaskDump
{
/// <summary>
/// Gets or sets the session identifier.
/// </summary>
public required string SessionId { get; init; }
///// <summary>
///// Represents all the parameters extracted from ArmoniK required to rerun a task.
///// Properties: PayloadId, SessionId, Configuration, TaskId, TaskOptions,
///// DataDependencies, ExpectedOutputKeys, RawData.
///// </summary>
//public record TaskDump
//{
// /// <summary>
// /// Gets or sets the session identifier.
// /// </summary>
// public required string SessionId { get; init; }

/// <summary>
/// Gets or init the payload identifier.
/// </summary>
public required string PayloadId { get; init; }
// /// <summary>
// /// Gets or init the payload identifier.
// /// </summary>
// public required string PayloadId { get; init; }

/// <summary>
/// Gets or sets the task identifier.
/// </summary>
public required string TaskId { get; init; }
// /// <summary>
// /// Gets or sets the task identifier.
// /// </summary>
// public required string TaskId { get; init; }

/// <summary>
/// Gets or sets the task options for the process.
/// </summary>
public required TaskOptions TaskOptions { get; init; }
// /// <summary>
// /// Gets or sets the task options for the process.
// /// </summary>
// public required TaskOptions TaskOptions { get; init; }

/// <summary>
/// Gets the list of data dependencies required for the process.
/// </summary>
public ICollection<string> DataDependencies { get; } = new List<string>();
// /// <summary>
// /// Gets the list of data dependencies required for the process.
// /// </summary>
// public ICollection<string> DataDependencies { get; } = new List<string>();

/// <summary>
/// Gets the list of expected output keys.
/// </summary>
public ICollection<string> ExpectedOutputKeys { get; } = new List<string>();
// /// <summary>
// /// Gets the list of expected output keys.
// /// </summary>
// public ICollection<string> ExpectedOutputKeys { get; } = new List<string>();

// /// <summary>
// /// Gets or sets the configuration settings for the process.
// /// </summary>
// public required Configuration Configuration { get; init; }

// /// <summary>
// /// Get or init a dictionary containing the payload, data dependencies, and expected outputs corresponding byte array.
// /// </summary>
// public ConcurrentDictionary<string, byte[]?> RawData { get; init; } = new();
//}

/// <summary>
/// Gets or sets the configuration settings for the process.
/// </summary>
public required Configuration Configuration { get; init; }

/// <summary>
/// Get or init a dictionary containing the payload, data dependencies, and expected outputs corresponding byte array.
/// </summary>
public ConcurrentDictionary<string, byte[]?> RawData { get; init; } = new();
}

0 comments on commit d33c0e2

Please sign in to comment.