Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Consolidated SessionServices into common #194

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
using ArmoniK.Api.Common.Utils;
using ArmoniK.Api.gRPC.V1;
using ArmoniK.Api.gRPC.V1.Results;
using ArmoniK.Api.gRPC.V1.Sessions;
using ArmoniK.Api.gRPC.V1.Submitter;
using ArmoniK.Api.gRPC.V1.Tasks;
using ArmoniK.DevelopmentKit.Client.Common.Status;
Expand All @@ -34,6 +35,7 @@
using ArmoniK.Utils;

using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;

using Grpc.Core;

Expand All @@ -46,68 +48,156 @@
namespace ArmoniK.DevelopmentKit.Client.Common.Submitter;

/// <summary>
/// Base Object for all Client submitter
/// Need to pass the child object Class Type
/// Session service to interact with ArmoniK in a session conscious way
/// </summary>
[PublicAPI]
public class BaseClientSubmitter<T>
public class SessionService
{
/// <summary>
/// Base Object for all Client submitter
/// Pool of Grpc channels
/// </summary>
/// <param name="channelPool">Channel used to create grpc clients</param>
/// <param name="loggerFactory">the logger factory to pass for root object</param>
/// <param name="chunkSubmitSize">The size of chunk to split the list of tasks</param>
public BaseClientSubmitter(ChannelPool channelPool,
[CanBeNull] ILoggerFactory loggerFactory = null,
int chunkSubmitSize = 500)
{
channelPool_ = channelPool;
Logger = loggerFactory?.CreateLogger<T>();
chunkSubmitSize_ = chunkSubmitSize;
}
private readonly ChannelPool channelPool_;

private readonly int chunkSubmitSize_;

/// <summary>
/// Set or Get TaskOptions with inside MaxDuration, Priority, AppName, VersionName and AppNamespace
/// Logger
/// </summary>
public TaskOptions TaskOptions { get; set; }
protected readonly ILogger Logger;

/// <summary>
/// Get SessionId object stored during the call of SubmitTask, SubmitSubTask,
/// SubmitSubTaskWithDependencies or WaitForCompletion, WaitForSubTaskCompletion or GetResults
/// Creates a session service
/// </summary>
public Session SessionId { get; protected set; }
/// <param name="channelPool">Grpc channel pool</param>
/// <param name="sessionTaskOptions">Default task options for the session</param>
/// <param name="loggerFactory"></param>
/// <param name="session"></param>
/// <param name="chunkSubmitSize"></param>
public SessionService(ChannelPool channelPool,
TaskOptions sessionTaskOptions,
[CanBeNull] ILoggerFactory loggerFactory = null,
[CanBeNull] Session session = null,
int chunkSubmitSize = 500)
{
Logger = loggerFactory?.CreateLogger<SessionService>();
channelPool_ = channelPool;
chunkSubmitSize_ = chunkSubmitSize;

SessionId = session is null
? CreateSession(sessionTaskOptions)
: OpenSession(session);

#pragma warning restore CS1591
Logger?.LogDebug("Session {Status} {SessionId}",
session is not null
? "Opened"
: "Created",
SessionId);
}

/// <summary>
/// The channel pool to use for creating clients
/// Current session id
/// </summary>
protected ChannelPool channelPool_;

public Session SessionId { get; private set; }

/// <summary>
/// The number of chunk to split the payloadsWithDependencies
/// Get the default task options for a given engine type
/// </summary>
private int chunkSubmitSize_;
/// <param name="engineType">Engine type</param>
/// <returns>Default task options for the given engine type</returns>
[PublicAPI]
public static TaskOptions GetDefaultTaskOptions(EngineType engineType)
=> new()
{
MaxDuration = new Duration
{
Seconds = 300,
},
MaxRetries = 3,
Priority = 2,
EngineType = engineType.ToString(),
ApplicationName = engineType == EngineType.Symphony
? "ArmoniK.Samples.SymphonyPackage"
: "ArmoniK.DevelopmentKit.Worker.Unified",
ApplicationVersion = "1.X.X",
ApplicationNamespace = engineType == EngineType.Symphony
? "ArmoniK.Samples.Symphony.Packages"
: "ArmoniK.DevelopmentKit.Worker.Unified",
ApplicationService = engineType == EngineType.Unified
? "FallBackServerAdder"
: "",
};

/// <inheritdoc />
public override string ToString()
=> SessionId.Id;

/// <summary>
/// The logger to call the generate log in Seq
/// User method to submit task from the client
/// Need a client Service. In case of ServiceContainer
/// submitterService can be null until the OpenSession is called
/// </summary>

[CanBeNull]
protected ILogger<T> Logger { get; set; }
/// <param name="payloads">
/// The user payload list to execute. General used for subTasking.
/// </param>
/// <param name="maxRetries">The number of retry before fail to submit task. Default = 5 retries</param>
/// <param name="taskOptions">
/// TaskOptions argument to override default taskOptions in Session.
/// If non null it will override the default taskOptions in SessionService for client or given by taskHandler for worker
/// </param>
public IEnumerable<string> SubmitTasks(IEnumerable<byte[]> payloads,
int maxRetries = 5,
TaskOptions taskOptions = null)
=> SubmitTasksWithDependencies(payloads.Select(payload => new Tuple<byte[], IList<string>>(payload,
null)),
maxRetries,
taskOptions);

/// <summary>
/// Service for interacting with results
/// User method to submit task from the client
/// </summary>
protected Results.ResultsClient ResultService { get; set; }
/// <param name="payload">
/// The user payload to execute.
/// </param>
/// <param name="maxRetries">The number of retry before fail to submit task. Default = 5 retries</param>
/// <param name="taskOptions">
/// TaskOptions argument to override default taskOptions in Session.
/// If non null it will override the default taskOptions in SessionService for client or given by taskHandler for worker
/// </param>
public string SubmitTask(byte[] payload,
int maxRetries = 5,
TaskOptions taskOptions = null)
=> SubmitTasks(new[]
{
payload,
},
maxRetries,
taskOptions)
.Single();


/// <summary>
/// Service for interacting with results
/// The method to submit One task with dependencies tasks. This task will wait for
/// to start until all dependencies are completed successfully
/// </summary>
protected Tasks.TasksClient TaskService { get; set; }
/// <param name="payload">The payload to submit</param>
/// <param name="dependencies">A list of task Id in dependence of this created task</param>
/// <param name="maxRetries">The number of retry before fail to submit task. Default = 5 retries</param>
/// <param name="taskOptions">
/// TaskOptions argument to override default taskOptions in Session.
/// If non null it will override the default taskOptions in SessionService for client or given by taskHandler for worker
/// </param>
/// <returns>return the taskId of the created task </returns>
public string SubmitTaskWithDependencies(byte[] payload,
IList<string> dependencies,
int maxRetries = 5,
TaskOptions taskOptions = null)
=> SubmitTasksWithDependencies(new[]
{
Tuple.Create(payload,
dependencies),
},
maxRetries,
taskOptions)
.Single();

/// <summary>
/// Returns the status of the task
Expand Down Expand Up @@ -233,7 +323,7 @@ private IEnumerable<string> ChunkSubmitTasksWithDependencies(IEnumerable<Tuple<s

//Multiple enumeration occurs on a retry
var response = submitterService.CreateTasksAsync(SessionId.Id,
taskOptions ?? TaskOptions,
taskOptions,
payloadsWithDependencies.Select(pwd =>
{
var taskRequest = new TaskRequest
Expand Down Expand Up @@ -548,8 +638,8 @@ public byte[] GetResult(string taskId,

var res = Retry.WhileException(5,
200,
retry => TryGetResultAsync(resultRequest,
cancellationToken)
_ => TryGetResultAsync(resultRequest,
cancellationToken)
.Result,
true,
typeof(IOException),
Expand Down Expand Up @@ -832,4 +922,75 @@ public Dictionary<string, string> CreateResultsMetadata(IEnumerable<string> resu
}))
.Results.ToDictionary(r => r.Name,
r => r.ResultId);

/// <summary>
/// Creates a session
/// </summary>
/// <param name="defaultTaskOptions">Default task option for the session</param>
/// <param name="partitions">Partitions for the session</param>
/// <returns>Session id</returns>
[PublicAPI]
protected Session CreateSession(TaskOptions defaultTaskOptions,
[CanBeNull] IEnumerable<string> partitions = null)
=> channelPool_.WithChannel(c => new Session
{
Id = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(c).CreateSession(new CreateSessionRequest
{
DefaultTaskOption = defaultTaskOptions,
PartitionIds =
{
partitions ??
(string.IsNullOrEmpty(defaultTaskOptions
.PartitionId)
? Enumerable.Empty<string>()
: new List<string>
{
defaultTaskOptions.PartitionId,
}),
},
})
.SessionId,
});

/// <summary>
/// Opens an existing session
/// </summary>
/// <param name="session">Session to open</param>
/// <returns>Opened session</returns>
/// <exception cref="ClientApiException">Session cannot be opened</exception>
[PublicAPI]
public Session OpenSession(Session session)
{
Logger?.LogDebug("Opening session {Session}",
session);
try
{
var status = channelPool_.WithChannel(c => new Sessions.SessionsClient(c).GetSession(new GetSessionRequest
{
SessionId = session.Id,
}))
.Session.Status;

if (status != SessionStatus.Running)
{
throw new ClientApiException($"Cannot open session {session} because it is with status {status}");
}

SessionId = session;
return session;
}
catch (RpcException e)
{
throw new ClientApiException($"Cannot open session {session}",
e);
}
}

/// <summary>
/// Gets a channel from the session service's channel pool
/// </summary>
/// <returns>gRPC channel</returns>
[PublicAPI]
public ChannelBase GetChannel()
=> channelPool_.GetChannel();
}
21 changes: 7 additions & 14 deletions Client/src/Symphony/ArmonikSymphonyClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
using ArmoniK.DevelopmentKit.Client.Common.Submitter;
using ArmoniK.DevelopmentKit.Common;

using JetBrains.Annotations;

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;

Expand All @@ -35,7 +37,6 @@ namespace ArmoniK.DevelopmentKit.Client.Symphony;
[MarkDownDoc]
public class ArmonikSymphonyClient
{
private readonly IConfigurationSection controlPlanSection_;
private readonly ILogger<ArmonikSymphonyClient> Logger;


Expand All @@ -48,38 +49,29 @@ public ArmonikSymphonyClient(IConfiguration configuration,
ILoggerFactory loggerFactory)
{
Configuration = configuration;
controlPlanSection_ = configuration.GetSection(SectionGrpc)
.Exists()
? configuration.GetSection(SectionGrpc)
: null;
LoggerFactory = loggerFactory;
Logger = loggerFactory.CreateLogger<ArmonikSymphonyClient>();
}

private ILoggerFactory LoggerFactory { get; }

/// <summary>
/// Returns the section key Grpc from appSettings.json
/// </summary>
public string SectionGrpc { get; set; } = "Grpc";

private ChannelPool GrpcPool { get; set; }


private IConfiguration Configuration { get; }

/// <summary>
/// Create the session to submit task
/// </summary>
/// <param name="taskOptions">Optional parameter to set TaskOptions during the Session creation</param>
/// <returns>Returns the SessionService to submit, wait or get result</returns>
[PublicAPI]
public SessionService CreateSession(TaskOptions taskOptions = null)
{
ControlPlaneConnection();

return new SessionService(GrpcPool,
LoggerFactory,
taskOptions);
taskOptions ?? SessionService.GetDefaultTaskOptions(EngineType.Symphony),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Options are not impacted in other clients ?

LoggerFactory);
}

/// <summary>
Expand All @@ -88,14 +80,15 @@ public SessionService CreateSession(TaskOptions taskOptions = null)
/// <param name="sessionId">The sessionId string which will opened</param>
/// <param name="clientOptions">the customer taskOptions send to the server by the client</param>
/// <returns>Returns the SessionService to submit, wait or get result</returns>
[PublicAPI]
public SessionService OpenSession(Session sessionId,
TaskOptions clientOptions = null)
{
ControlPlaneConnection();

return new SessionService(GrpcPool,
clientOptions ?? SessionService.GetDefaultTaskOptions(EngineType.Symphony),
LoggerFactory,
clientOptions ?? SessionService.InitializeDefaultTaskOptions(),
sessionId);
}

Expand Down
Loading