Skip to content

Commit

Permalink
Merge branch 'main' into add-helm-chart
Browse files Browse the repository at this point in the history
  • Loading branch information
s-vitaliy authored Apr 9, 2024
2 parents b7c6519 + ad1fbff commit 5868850
Show file tree
Hide file tree
Showing 12 changed files with 425 additions and 26 deletions.
41 changes: 41 additions & 0 deletions src/Configurations/StreamClassOperatorServiceConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System.Diagnostics.CodeAnalysis;
using Arcane.Operator.Services.Operator;

namespace Arcane.Operator.Configurations;

/// <summary>
/// Configuration for the <see cref="StreamClassOperatorService"/>
/// </summary>
[ExcludeFromCodeCoverage(Justification = "Model")]
public class StreamClassOperatorServiceConfiguration
{
/// <summary>
/// Max buffer capacity for StreamClasses events stream
/// </summary>
public int MaxBufferCapacity { get; init; }

/// <summary>
/// Parallelism for StreamClasses events stream
/// </summary>
public int Parallelism { get; init; }

/// <summary>
/// Api group of the StreamClass CRD
/// </summary>
public string ApiGroup { get; init; }

/// <summary>
/// Version of the StreamClass CRD
/// </summary>
public string Version { get; init; }

/// <summary>
/// Plural name of the StreamClass CRD
/// </summary>
public string Plural { get; init; }

/// <summary>
/// The namespace where the StreamClass CRDs located
/// </summary>
public string NameSpace { get; set; }
}
3 changes: 2 additions & 1 deletion src/Models/StreamClass/V1Beta1StreamClass.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public StreamOperatorServiceConfiguration ToStreamOperatorServiceConfiguration()
{
return new StreamOperatorServiceConfiguration
{
MaxBufferCapacity = this.Spec.MaxBufferCapacity
MaxBufferCapacity = this.Spec.MaxBufferCapacity,
Parallelism = 1

Check failure on line 48 in src/Models/StreamClass/V1Beta1StreamClass.cs

View workflow job for this annotation

GitHub Actions / Validate commit

'StreamOperatorServiceConfiguration' does not contain a definition for 'Parallelism'

Check failure on line 48 in src/Models/StreamClass/V1Beta1StreamClass.cs

View workflow job for this annotation

GitHub Actions / Validate commit

'StreamOperatorServiceConfiguration' does not contain a definition for 'Parallelism'
};
}
}
13 changes: 13 additions & 0 deletions src/Services/Base/IStreamClassOperatorService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System.Threading;
using System.Threading.Tasks;
using Akka.Streams.Dsl;

namespace Arcane.Operator.Services.Base;

public interface IStreamClassOperatorService
{
/// <summary>
/// Return graph that watches StreamClass events
/// </summary>
public IRunnableGraph<Task> GetStreamClassEventsGraph(CancellationToken cancellationToken);
}
9 changes: 9 additions & 0 deletions src/Services/Base/IStreamClassStateRepository.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using System.Threading.Tasks;
using Arcane.Operator.Models;

namespace Arcane.Operator.Services.Base;

public interface IStreamClassStateRepository
{
Task SetStreamClassState(StreamClassOperatorResponse response);
}
14 changes: 0 additions & 14 deletions src/Services/Base/IStreamOperatorServiceFactory.cs

This file was deleted.

14 changes: 14 additions & 0 deletions src/Services/Base/IStreamOperatorServiceWorkerFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using Arcane.Operator.Models.StreamClass.Base;
using Arcane.Operator.Services.Operator;

namespace Arcane.Operator.Services.Base;

/// <summary>
/// Abstract factory for creating <see cref="StreamOperatorServiceWorker"/> instances from configuration settings,
/// extracted from the given <see cref="IStreamClass"/> data Model.
/// </summary>
public interface IStreamOperatorServiceWorkerFactory
{
/// Creates a new instance of <see cref="StreamOperatorServiceWorker"/> for the given <see cref="IStreamClass"/>.
StreamOperatorServiceWorker Create(IStreamClass streamClass);
}
144 changes: 144 additions & 0 deletions src/Services/Operator/StreamClassOperatorService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Streams.Supervision;
using Akka.Util;
using Arcane.Operator.Configurations;
using Arcane.Operator.Models;
using Arcane.Operator.Models.StreamClass;
using Arcane.Operator.Models.StreamClass.Base;
using Arcane.Operator.Services.Base;
using k8s;
using k8s.Models;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Snd.Sdk.ActorProviders;
using Snd.Sdk.Kubernetes.Base;

namespace Arcane.Operator.Services.Operator;

/// <inheritdoc cref="IStreamClassOperatorService"/>
public class StreamClassOperatorService : IStreamClassOperatorService
{
private readonly StreamClassOperatorServiceConfiguration configuration;
private readonly IKubeCluster kubeCluster;

private readonly Dictionary<string, StreamOperatorServiceWorker> streams = new();
private readonly ILogger<StreamClassOperatorService> logger;
private readonly IStreamClassStateRepository streamClassStateRepository;
private readonly IStreamOperatorServiceWorkerFactory streamOperatorServiceWorkerFactory;

public StreamClassOperatorService(IKubeCluster kubeCluster,
IOptions<StreamClassOperatorServiceConfiguration> streamOperatorServiceOptions,
IStreamClassStateRepository streamClassStateRepository,
IStreamOperatorServiceWorkerFactory streamOperatorServiceWorkerFactory,
ILogger<StreamClassOperatorService> logger)
{
this.kubeCluster = kubeCluster;
this.configuration = streamOperatorServiceOptions.Value;
this.logger = logger;
this.streamClassStateRepository = streamClassStateRepository;
this.streamOperatorServiceWorkerFactory = streamOperatorServiceWorkerFactory;
}

/// <inheritdoc cref="IStreamClassOperatorService.GetStreamClassEventsGraph"/>
public IRunnableGraph<Task> GetStreamClassEventsGraph(CancellationToken cancellationToken)
{
var synchronizationSource = this.GetStreamingJobSynchronizationGraph();
var actualStateEventSource = this.kubeCluster.StreamCustomResourceEvents<V1Beta1StreamClass>(
this.configuration.NameSpace,
this.configuration.ApiGroup,
this.configuration.Version,
this.configuration.Plural,
this.configuration.MaxBufferCapacity,
OverflowStrategy.Fail);

var sink = Sink.ForEachAsync<StreamClassOperatorResponse>(this.configuration.Parallelism, this.streamClassStateRepository.SetStreamClassState);

return synchronizationSource
.Concat(actualStateEventSource)
.Via(cancellationToken.AsFlow<(WatchEventType, V1Beta1StreamClass)>(true))
.Select(this.OnEvent)
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(this.HandleError))
.CollectOption()
.ToMaterialized(sink, Keep.Right);
}

private Option<StreamClassOperatorResponse> OnEvent((WatchEventType, V1Beta1StreamClass) arg)
{
return arg switch
{
(WatchEventType.Added, var streamClass) => this.OnAdded(streamClass),
(WatchEventType.Modified, var streamClass) => this.OnModified(streamClass),
(WatchEventType.Deleted, var streamClass) => this.OnDeleted(streamClass),
_ => Option<StreamClassOperatorResponse>.None
};
}

private Directive HandleError(Exception exception)
{
this.logger.LogError(exception, "Failed to handle stream definition event");
return exception switch
{
BufferOverflowException => Directive.Stop,
_ => Directive.Resume
};
}

private Option<StreamClassOperatorResponse> OnAdded(IStreamClass streamClass)
{
try
{
if (this.streams.ContainsKey(streamClass.ToStreamClassId()))
{
return StreamClassOperatorResponse.Ready(streamClass.Namespace(), streamClass.Kind, streamClass.Name());
}

var listener = this.streamOperatorServiceWorkerFactory.Create(streamClass);
this.streams[streamClass.ToStreamClassId()] = listener;
this.streams[streamClass.ToStreamClassId()].Start(streamClass.ToStreamClassId());
return StreamClassOperatorResponse.Ready(streamClass.Namespace(), streamClass.Kind, streamClass.Name());
}
catch (Exception ex)
{
this.logger.LogError(ex, "Failed to initialize stream listener");
return StreamClassOperatorResponse.Failed(streamClass.Namespace(), streamClass.Kind, streamClass.Name());
}
}

private Option<StreamClassOperatorResponse> OnDeleted(IStreamClass streamClass)
{
if (!this.streams.ContainsKey(streamClass.ToStreamClassId()))
{
return Option<StreamClassOperatorResponse>.None;
}

this.streams[streamClass.ToStreamClassId()].Stop();
return StreamClassOperatorResponse.Stopped(streamClass.Namespace(), streamClass.Kind, streamClass.Name());

}

private Option<StreamClassOperatorResponse> OnModified(IStreamClass streamClass)
{
this.OnDeleted(streamClass);
return this.OnAdded(streamClass);
}

private Source<(WatchEventType, V1Beta1StreamClass), NotUsed> GetStreamingJobSynchronizationGraph()
{
var listTask = this.kubeCluster.ListCustomResources<V1Beta1StreamClass>(
this.configuration.ApiGroup,
this.configuration.Version,
this.configuration.Plural,
this.configuration.NameSpace);

return Source
.FromTask(listTask)
.SelectMany(l => l.Select(d => (WatchEventType.Modified, d)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@ namespace Arcane.Operator.Services.Operator;
/// <summary>
/// Background service that listens for stream definition events and executes the stream operator service.
/// </summary>
public class BackgroundStreamOperatorService
public class StreamOperatorServiceWorker
{
private readonly ILogger<BackgroundStreamOperatorService> logger;
private readonly ILogger<StreamOperatorServiceWorker> logger;
private readonly IMaterializer materializer;
private readonly IStreamOperatorService<StreamDefinition> streamOperatorService;
private readonly CancellationTokenSource cts;
private Task streamExecutionTask;
private string streamClassId;

public BackgroundStreamOperatorService(
ILogger<BackgroundStreamOperatorService> logger,
public StreamOperatorServiceWorker(
ILogger<StreamOperatorServiceWorker> logger,
IStreamOperatorService<StreamDefinition> streamOperatorService,
IMaterializer materializer)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

namespace Arcane.Operator.Services.Operator;

/// <inheritdoc cref="IStreamOperatorServiceFactory"/>
public class StreamOperatorServiceFactory: IStreamOperatorServiceFactory
/// <inheritdoc cref="IStreamOperatorServiceWorkerFactory"/>
public class StreamOperatorServiceWorkerFactory: IStreamOperatorServiceWorkerFactory
{
private readonly ILoggerFactory loggerFactory;
private readonly IMaterializer materializer;
Expand All @@ -19,7 +19,7 @@ public class StreamOperatorServiceFactory: IStreamOperatorServiceFactory
private readonly IStreamDefinitionRepository streamDefinitionRepository;
private readonly IOptionsSnapshot<CustomResourceConfiguration> customResourceConfigurationsOptionsSnapshot;

public StreamOperatorServiceFactory(ILoggerFactory loggerFactory,IMaterializer materializer,
public StreamOperatorServiceWorkerFactory(ILoggerFactory loggerFactory,IMaterializer materializer,
IKubeCluster kubeCluster, IStreamingJobOperatorService jobOperatorService, IStreamDefinitionRepository streamDefinitionRepository,
IOptionsSnapshot<CustomResourceConfiguration> customResourceConfigurationsOptionsSnapshot)
{
Expand All @@ -31,8 +31,8 @@ public StreamOperatorServiceFactory(ILoggerFactory loggerFactory,IMaterializer m
this.customResourceConfigurationsOptionsSnapshot = customResourceConfigurationsOptionsSnapshot;
}

/// <inheritdoc cref="IStreamOperatorServiceFactory.Create"/>
public BackgroundStreamOperatorService Create(IStreamClass streamClass)
/// <inheritdoc cref="IStreamOperatorServiceWorkerFactory.Create"/>
public StreamOperatorServiceWorker Create(IStreamClass streamClass)
{
var streamOperatorService = new StreamOperatorService<StreamDefinition>(
this.kubeCluster,
Expand All @@ -42,8 +42,8 @@ public BackgroundStreamOperatorService Create(IStreamClass streamClass)
this.streamDefinitionRepository,
this.loggerFactory.CreateLogger<StreamOperatorService<StreamDefinition>>()
);
return new BackgroundStreamOperatorService(
this.loggerFactory.CreateLogger<BackgroundStreamOperatorService>(),
return new StreamOperatorServiceWorker(
this.loggerFactory.CreateLogger<StreamOperatorServiceWorker>(),
streamOperatorService,
this.materializer
);
Expand Down
Loading

0 comments on commit 5868850

Please sign in to comment.