diff --git a/src/Arcane.Operator.csproj b/src/Arcane.Operator.csproj index 3e73bb2..4433bff 100644 --- a/src/Arcane.Operator.csproj +++ b/src/Arcane.Operator.csproj @@ -6,10 +6,12 @@ 10 Arcane.Operator - + - + + + diff --git a/src/ArcaneEnvironment.cs b/src/ArcaneEnvironment.cs new file mode 100644 index 0000000..8659536 --- /dev/null +++ b/src/ArcaneEnvironment.cs @@ -0,0 +1,11 @@ +namespace Arcane.Operator; + +public static class ArcaneEnvironment +{ + public static string DefaultVarPrefix => $"{nameof(Arcane).ToUpper()}__"; + + public static string GetEnvironmentVariableName(this string name) + { + return $"{DefaultVarPrefix}{name}".ToUpperInvariant(); + } +} diff --git a/src/Configurations/CustomResourceConfiguration.cs b/src/Configurations/CustomResourceConfiguration.cs new file mode 100644 index 0000000..348e07f --- /dev/null +++ b/src/Configurations/CustomResourceConfiguration.cs @@ -0,0 +1,37 @@ +using System.Diagnostics.CodeAnalysis; +using Snd.Sdk.Kubernetes; + +namespace Arcane.Operator.Configurations; + +[ExcludeFromCodeCoverage(Justification = "Model")] +public class CustomResourceConfiguration +{ + /// + /// Api group of the StreamDefinition CRD + /// + public string ApiGroup { get; init; } + + /// + /// Version of the CRD + /// + public string Version { get; init; } + + /// + /// Plural of the CRD + /// + public string Plural { get; init; } + + /// + /// Convert configuration to NamespacedCrd object for consuming in the Proteus library + /// + /// object + public NamespacedCrd ToNamespacedCrd() + { + return new NamespacedCrd + { + Group = this.ApiGroup, + Plural = this.Plural, + Version = this.Version + }; + } +} diff --git a/src/Configurations/StreamOperatorServiceConfiguration.cs b/src/Configurations/StreamOperatorServiceConfiguration.cs new file mode 100644 index 0000000..b2bf5dd --- /dev/null +++ b/src/Configurations/StreamOperatorServiceConfiguration.cs @@ -0,0 +1,21 @@ +using System.Diagnostics.CodeAnalysis; +using Arcane.Operator.Services.Operator; + +namespace Arcane.Operator.Configurations; + +/// +/// Configuration for the +/// +[ExcludeFromCodeCoverage(Justification = "Model")] +public class StreamOperatorServiceConfiguration +{ + /// + /// Max buffer capacity for StreamDefinitions events stream + /// + public int MaxBufferCapacity { get; init; } + + /// + /// Parallelism for StreamDefinitions events stream + /// + public int Parallelism { get; init; } +} diff --git a/src/Configurations/StreamingJobMaintenanceServiceConfiguration.cs b/src/Configurations/StreamingJobMaintenanceServiceConfiguration.cs new file mode 100644 index 0000000..70f418a --- /dev/null +++ b/src/Configurations/StreamingJobMaintenanceServiceConfiguration.cs @@ -0,0 +1,22 @@ +using System.Diagnostics.CodeAnalysis; +using Arcane.Operator.Services.Maintenance; + +namespace Arcane.Operator.Configurations; + +/// +/// Configuration for the +/// +[ExcludeFromCodeCoverage(Justification = "Model")] +public class StreamingJobMaintenanceServiceConfiguration +{ + /// + /// Max buffer capacity for job events stream + /// + public int MaxBufferCapacity { get; init; } + + + /// + /// Parallelism for job events stream + /// + public int Parallelism { get; init; } +} diff --git a/src/Configurations/StreamingJobOperatorServiceConfiguration.cs b/src/Configurations/StreamingJobOperatorServiceConfiguration.cs new file mode 100644 index 0000000..1f47735 --- /dev/null +++ b/src/Configurations/StreamingJobOperatorServiceConfiguration.cs @@ -0,0 +1,22 @@ +using System.Diagnostics.CodeAnalysis; +using Arcane.Operator.Services.Streams; +using k8s.Models; + +namespace Arcane.Operator.Configurations; + +/// +/// Configuration for the +/// +[ExcludeFromCodeCoverage(Justification = "Model")] +public class StreamingJobOperatorServiceConfiguration +{ + /// + /// Template for the job to be created. + /// + public V1Job JobTemplate { get; set; } + + /// + /// Namespace where the job will be created + /// + public string Namespace { get; set; } +} diff --git a/src/Extensions/V1JobExtensions.cs b/src/Extensions/V1JobExtensions.cs new file mode 100644 index 0000000..664f210 --- /dev/null +++ b/src/Extensions/V1JobExtensions.cs @@ -0,0 +1,101 @@ +using System.Collections.Generic; +using Arcane.Models.StreamingJobLifecycle; +using k8s.Models; +using Snd.Sdk.Kubernetes; + +namespace Arcane.Operator.Extensions; + +public static class V1JobExtensions +{ + public const string STREAM_KIND_LABEL = "arcane/stream-kind"; + public const string STREAM_ID_LABEL = "arcane/stream-id"; + public const string FULL_LOAD_LABEL = "arcane/full-load"; + + public static V1Job WithStreamingJobLabels(this V1Job job, string streamId, + bool fullLoadOnStart, string streamKind) + { + return job.WithLabels(new Dictionary + { + { STREAM_ID_LABEL, streamId }, + { STREAM_KIND_LABEL, streamKind }, + { FULL_LOAD_LABEL, fullLoadOnStart.ToString().ToLowerInvariant() } + }); + } + + public static V1Job WithStreamingJobAnnotations(this V1Job job, string configurationChecksum) + { + return job.WithAnnotations(new Dictionary + { + { Annotations.CONFIGURATION_CHECKSUM_ANNOTATION_KEY, configurationChecksum } + }); + } + + public static string GetStreamId(this V1Job job) + { + return job.Name(); + } + + public static string GetStreamKind(this V1Job job) + { + if (job.Labels() != null && job.Labels().TryGetValue(STREAM_KIND_LABEL, out var value)) + { + return value; + } + + return string.Empty; + } + + public static string GetConfigurationChecksum(this V1Job job) + { + if (job.Annotations() != null && job.Annotations().TryGetValue( + Annotations.CONFIGURATION_CHECKSUM_ANNOTATION_KEY, + out var value)) + { + return value; + } + + return string.Empty; + } + + public static bool IsStopRequested(this V1Job job) + { + return job.Annotations() != null + && job.Annotations().TryGetValue(Annotations.STATE_ANNOTATION_KEY, out var value) + && value == Annotations.TERMINATE_REQUESTED_STATE_ANNOTATION_VALUE; + } + + public static bool IsRestartRequested(this V1Job job) + { + return job.Annotations() != null + && job.Annotations().TryGetValue(Annotations.STATE_ANNOTATION_KEY, out var value) + && value == Annotations.RESTARTING_STATE_ANNOTATION_VALUE; + } + + public static bool IsReloadRequested(this V1Job job) + { + return job.Annotations() != null + && job.Annotations().TryGetValue(Annotations.STATE_ANNOTATION_KEY, out var value) + && value == Annotations.RELOADING_STATE_ANNOTATION_VALUE; + } + + public static bool IsReloading(this V1Job job) + { + return job.Labels() != null + && job.Labels().TryGetValue(FULL_LOAD_LABEL, out var value) + && value == "true"; + } + + public static bool IsSchemaMismatch(this V1Job job) + { + return job.Annotations() != null + && job.Annotations().TryGetValue(Annotations.STATE_ANNOTATION_KEY, out var value) + && value == Annotations.SCHEMA_MISMATCH_STATE_ANNOTATION_VALUE; + } + + public static bool IsStopping(this V1Job job) + { + return job.Annotations() != null + && job.Annotations().TryGetValue(Annotations.STATE_ANNOTATION_KEY, out var value) + && value == Annotations.TERMINATING_STATE_ANNOTATION_VALUE; + } +} diff --git a/src/JobTemplates/V1Beta1/V1Beta1StreamingJobTemplate.cs b/src/JobTemplates/V1Beta1/V1Beta1StreamingJobTemplate.cs new file mode 100644 index 0000000..b79d9af --- /dev/null +++ b/src/JobTemplates/V1Beta1/V1Beta1StreamingJobTemplate.cs @@ -0,0 +1,45 @@ +using System.Diagnostics.CodeAnalysis; +using System.Text.Json.Serialization; +using k8s; +using k8s.Models; + +namespace Arcane.Operator.JobTemplates.V1Beta1; + +[ExcludeFromCodeCoverage(Justification = "Model")] +public class V1Beta1StreamingJobTemplate : IKubernetesObject +{ + /// + /// Streaming job configuration + /// + [JsonPropertyName("spec")] + public V1Beta1StreamingJobTemplateSpec Spec { get; set; } + + /// + /// Api version + /// + [JsonPropertyName("apiVersion")] + public string ApiVersion { get; set; } + + /// + /// Object kind (should always be "StreamingJobTemplate") + /// + [JsonPropertyName("kind")] + public string Kind { get; set; } + + /// + /// Object metadata see + /// + [JsonPropertyName("metadata")] + public V1ObjectMeta Metadata { get; set; } + + public V1Job GetJob() + { + return new V1Job + { + ApiVersion = "batch/v1", + Kind = "Job", + Metadata = this.Spec.Metadata ?? new V1ObjectMeta(), + Spec = this.Spec.Template.Spec + }; + } +} diff --git a/src/JobTemplates/V1Beta1/V1StreamingJobTemplateSpec.cs b/src/JobTemplates/V1Beta1/V1StreamingJobTemplateSpec.cs new file mode 100644 index 0000000..8fcf5ec --- /dev/null +++ b/src/JobTemplates/V1Beta1/V1StreamingJobTemplateSpec.cs @@ -0,0 +1,24 @@ +using System.Diagnostics.CodeAnalysis; +using System.Text.Json.Serialization; +using k8s.Models; + +namespace Arcane.Operator.JobTemplates.V1Beta1; + +/// +/// Configuration for streaming job template. +/// +[ExcludeFromCodeCoverage(Justification = "Model")] +public class V1Beta1StreamingJobTemplateSpec +{ + /// + /// Job template reference + /// + [JsonPropertyName("template")] + public V1Job Template { get; init; } + + /// + /// Job template reference + /// + [JsonPropertyName("metadata")] + public V1ObjectMeta Metadata { get; init; } +} diff --git a/src/Models/StreamOperatorResponse.cs b/src/Models/StreamOperatorResponse.cs new file mode 100644 index 0000000..63b5d25 --- /dev/null +++ b/src/Models/StreamOperatorResponse.cs @@ -0,0 +1,283 @@ +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Linq; +using Arcane.Operator.StreamStatuses.StreamStatus.V1Beta1; + +namespace Arcane.Operator.Models; + +/// +/// Represents stream status badge for Lens app. +/// +public enum StreamStatusType +{ + /// + /// The stream is in a ready state. + /// + READY, + + /// + /// The stream is in an error state. + /// + ERROR, + + /// + /// The stream is in a warning state. + /// + WARNING +} + +/// +/// Possible stream states. +/// +public enum StreamPhase +{ + /// + /// A running stream. + /// + RUNNING, + + /// + /// A stopped stream. + /// + STOPPED, + + /// + /// A stream that is shutting down. + /// + TERMINATING, + + /// + /// A restarting stream. + /// + RESTARTING, + + /// + /// A stream that is in a data backfill process. + /// + RELOADING, + + /// + /// A stream that had been suspended. + /// + SUSPENDED, + + /// + /// A stream that has failed and cannot be automatically recovered. + /// + FAILED +} + +/// +/// Contains response from stream operator that can be used by other services inside the application +/// +[ExcludeFromCodeCoverage(Justification = "Model")] +public class StreamOperatorResponse +{ + /// + /// Affected stream identifier + /// + public string Id { get; private init; } + + /// + /// Affected stream kind + /// + public string Kind { get; set; } + + /// + /// Affected stream namespace + /// + public string Namespace { get; set; } + + /// + /// Latest observed state of the stream + /// + public IEnumerable Conditions { get; private init; } + + /// + /// Stream livecycle phase + /// + public StreamPhase Phase { get; private set; } + + + /// + /// Creates a StreamOperatorResponse object for stream with specified identifier, setting it state to RESTARTING + /// + /// Affected stream identifier + /// Affected stream namespace + /// Affected stream kind + public static StreamOperatorResponse Restarting(string nameSpace, string kind, string streamId) + { + return new StreamOperatorResponse + { + Id = streamId, + Namespace = nameSpace, + Kind = kind, + Conditions = new[] + { + new V1Beta1StreamCondition { Type = StreamStatusType.WARNING.ToString(), Status = "True" } + }, + Phase = StreamPhase.RESTARTING + }; + } + + /// + /// Creates a StreamOperatorResponse object for stream with specified identifier, setting it state to RUNNING + /// + /// Affected stream identifier + /// Affected stream namespace + /// Affected stream kind + public static StreamOperatorResponse Running(string nameSpace, string kind, string streamId) + { + return new StreamOperatorResponse + { + Id = streamId, + Kind = kind, + Namespace = nameSpace, + Conditions = new[] + { + new V1Beta1StreamCondition { Type = StreamStatusType.READY.ToString(), Status = "True" } + }, + Phase = StreamPhase.RUNNING + }; + } + + /// + /// Creates a StreamOperatorResponse object for stream with specified identifier, setting it state to RELOADING + /// + /// Affected stream identifier + /// Affected stream namespace + /// Affected stream kind + public static StreamOperatorResponse Reloading(string nameSpace, string kind, string streamId) + { + return new StreamOperatorResponse + { + Id = streamId, + Kind = kind, + Namespace = nameSpace, + Conditions = new[] + { + new V1Beta1StreamCondition { Type = StreamStatusType.READY.ToString(), Status = "True" } + }, + Phase = StreamPhase.RELOADING + }; + } + + /// + /// Creates a StreamOperatorResponse object for stream with specified identifier, setting it state to TERMINATING + /// + /// Affected stream identifier + /// Affected stream namespace + /// Affected stream kind + public static StreamOperatorResponse Terminating(string nameSpace, string kind, string streamId) + { + return new StreamOperatorResponse + { + Id = streamId, + Namespace = nameSpace, + Kind = kind, + Conditions = new[] + { + new V1Beta1StreamCondition { Type = StreamStatusType.WARNING.ToString(), Status = "True" } + }, + Phase = StreamPhase.TERMINATING + }; + } + + /// + /// Creates a StreamOperatorResponse object for stream with specified identifier, setting it state to TERMINATING + /// + /// Affected stream identifier + /// Affected stream namespace + /// Affected stream kind + public static StreamOperatorResponse Stopped(string nameSpace, string kind, string streamId) + { + return new StreamOperatorResponse + { + Id = streamId, + Kind = kind, + Namespace = nameSpace, + Conditions = new[] + { + new V1Beta1StreamCondition { Type = StreamStatusType.WARNING.ToString(), Status = "True" } + }, + Phase = StreamPhase.STOPPED + }; + } + + /// + /// Creates a StreamOperatorResponse object for stream with specified identifier, setting it state to FAILED + /// with specified message + /// + /// Affected stream namespace + /// Affected stream kind + /// Affected stream identifier + /// Error message + public static StreamOperatorResponse OperationFailed(string nameSpace, string kind, string streamId, string message) + { + return new StreamOperatorResponse + { + Id = streamId, + Kind = kind, + Namespace = nameSpace, + Conditions = new[] + { + new V1Beta1StreamCondition + { Type = StreamStatusType.ERROR.ToString(), Status = "True", Message = message } + }, + Phase = StreamPhase.FAILED + }; + } + + /// + /// Creates a StreamOperatorResponse object for stream with specified identifier, setting it state to STOPPED + /// with specified message + /// + /// Affected stream namespace + /// Affected stream kind + /// Affected stream identifier + public static StreamOperatorResponse Suspended(string nameSpace, string kind, string streamId) + { + return new StreamOperatorResponse + { + Id = streamId, + Kind = kind, + Namespace = nameSpace, + Conditions = new[] + { + new V1Beta1StreamCondition { Type = StreamStatusType.WARNING.ToString(), Status = "True" } + }, + Phase = StreamPhase.SUSPENDED + }; + } + + /// + /// Creates a StreamOperatorResponse object for stream with specified identifier, setting it state to FAILED + /// with message "Crash loop detected" + /// + /// Affected stream namespace + /// Affected stream kind + /// Affected stream identifier + public static StreamOperatorResponse CrashLoopDetected(string nameSpace, string kind, string streamId) + { + return new StreamOperatorResponse + { + Id = streamId, + Kind = kind, + Namespace = nameSpace, + Conditions = new[] + { + new V1Beta1StreamCondition { Type = StreamStatusType.ERROR.ToString(), Status = "True" } + }, + Phase = StreamPhase.FAILED + }; + } + + public V1Beta1StreamStatus ToStatus() + { + return new V1Beta1StreamStatus + { + Conditions = this.Conditions.ToArray(), + Phase = this.Phase.ToString() + }; + } +} diff --git a/src/Program.cs b/src/Program.cs index 71e7a3a..acd6884 100644 --- a/src/Program.cs +++ b/src/Program.cs @@ -1,5 +1,49 @@ -// See https://aka.ms/new-console-template for more information - using System; +using System.Diagnostics.CodeAnalysis; +using Arcane.Operator.Services.Maintenance; +using Microsoft.AspNetCore.Hosting; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Serilog; +using Snd.Sdk.Logs.Providers; +using Snd.Sdk.Logs.Providers.Configurations; + +namespace Arcane.Operator; + +[ExcludeFromCodeCoverage(Justification = "Service entrypoint")] +public class Program +{ + public static int Main(string[] args) + { + Log.Logger = DefaultLoggingProvider.CreateBootstrappLogger(nameof(Arcane)); + try + { + Log.Information("Starting web host"); + CreateHostBuilder(args).Build().Run(); + return 0; + } + catch (Exception ex) + { + Log.Fatal(ex, "Host terminated unexpectedly"); + return 1; + } + finally + { + Log.CloseAndFlush(); + } + } -Console.WriteLine("Hello, World!"); \ No newline at end of file + public static IHostBuilder CreateHostBuilder(string[] args) + { + return Host.CreateDefaultBuilder(args) + .AddSerilogLogger(nameof(Arcane), loggerConfiguration => loggerConfiguration.Default().AddDatadog()) + .ConfigureWebHostDefaults(webBuilder => { webBuilder.UseStartup(); }) + .ConfigureServices(services => + { + if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable("MAINTAINER"))) + { + services.AddHostedService(); + } + }); + } +} diff --git a/src/Services/Base/IStreamClassRepository.cs b/src/Services/Base/IStreamClassRepository.cs new file mode 100644 index 0000000..3249223 --- /dev/null +++ b/src/Services/Base/IStreamClassRepository.cs @@ -0,0 +1,8 @@ +using Arcane.Operator.Configurations; + +namespace Arcane.Operator.Services.Base; + +public interface IStreamClassRepository +{ + CustomResourceConfiguration Get(string nameSpace, string kind); +} diff --git a/src/Services/Base/IStreamDefinitionRepository.cs b/src/Services/Base/IStreamDefinitionRepository.cs new file mode 100644 index 0000000..8fb8c69 --- /dev/null +++ b/src/Services/Base/IStreamDefinitionRepository.cs @@ -0,0 +1,47 @@ +using System.Threading.Tasks; +using Akka.Util; +using Arcane.Operator.StreamDefinitions.Base; +using Arcane.Operator.StreamStatuses.StreamStatus.V1Beta1; + +namespace Arcane.Operator.Services.Base; + +public interface IStreamDefinitionRepository +{ + /// + /// Return the definition object fot the given stream id + /// + /// Stream definition namespace + /// Stream definition kind to update + /// Stream identifier + /// IStreamDefinition or None, it it does not exit + public Task> GetStreamDefinition(string nameSpace, string kind, string streamId); + + /// + /// Update stream condition for the given stream id + /// + /// Stream definition namespace + /// Stream definition namespace + /// Stream identifier + /// Stream condition + /// Updated IStreamDefinition version or None, it it does not exit + public Task> SetStreamStatus(string nameSpace, string kind, string streamId, + V1Beta1StreamStatus streamStatus); + + /// + /// Remove reloading annotation for the given stream id + /// + /// Stream definition namespace + /// Stream definition kind to update + /// Stream identifier + /// IStreamDefinition or None, it it does not exit + public Task> RemoveReloadingAnnotation(string nameSpace, string kind, string streamId); + + /// + /// Set suspended annotation on stream definition for the given stream id + /// + /// Stream definition namespace + /// Stream definition kind to update + /// Stream identifier + /// IStreamDefinition or None, it it does not exit + public Task> SetCrashLoopAnnotation(string nameSpace, string kind, string streamId); +} diff --git a/src/Services/Base/IStreamInteractionService.cs b/src/Services/Base/IStreamInteractionService.cs new file mode 100644 index 0000000..67a3d68 --- /dev/null +++ b/src/Services/Base/IStreamInteractionService.cs @@ -0,0 +1,26 @@ +using System; +using System.Threading.Tasks; + +namespace Arcane.Operator.Services.Base; + +public interface IStreamInteractionService +{ + /// + /// Set stream termination callback + /// + /// The action to be invoked on stream termination + /// Task that completes when termination event occured + Task SetupTermination(Action onStreamTermination); + + /// + /// Send termination request to a stream pod + /// + /// Ip address of a pod that running stream + Task SendStopRequest(string streamerIp); + + /// + /// Report that schema mismatch occured to the maintainer service + /// + /// Id of the current stream + Task ReportSchemaMismatch(string streamId); +} diff --git a/src/Services/Base/IStreamOperatorService.cs b/src/Services/Base/IStreamOperatorService.cs new file mode 100644 index 0000000..1af87be --- /dev/null +++ b/src/Services/Base/IStreamOperatorService.cs @@ -0,0 +1,13 @@ +using System.Threading; +using System.Threading.Tasks; +using Akka.Streams.Dsl; + +namespace Arcane.Operator.Services.Base; + +public interface IStreamOperatorService +{ + /// + /// Return graph that watches for job events and updates stream state accordingly + /// + public IRunnableGraph GetStreamDefinitionEventsGraph(CancellationToken cancellationToken); +} diff --git a/src/Services/Base/IStreamingJobMaintenanceService.cs b/src/Services/Base/IStreamingJobMaintenanceService.cs new file mode 100644 index 0000000..aea6ce0 --- /dev/null +++ b/src/Services/Base/IStreamingJobMaintenanceService.cs @@ -0,0 +1,13 @@ +using System.Threading; +using System.Threading.Tasks; +using Akka.Streams.Dsl; + +namespace Arcane.Operator.Services.Base; + +public interface IStreamingJobMaintenanceService +{ + /// + /// Return graph that watches for job events and updates stream state accordingly + /// + public IRunnableGraph GetJobEventsGraph(CancellationToken cancellationToken); +} diff --git a/src/Services/Base/IStreamingJobOperatorService.cs b/src/Services/Base/IStreamingJobOperatorService.cs new file mode 100644 index 0000000..a7ea67d --- /dev/null +++ b/src/Services/Base/IStreamingJobOperatorService.cs @@ -0,0 +1,67 @@ +using System.Threading.Tasks; +using Akka.Util; +using Arcane.Operator.Models; +using Arcane.Operator.StreamDefinitions.Base; +using k8s.Models; + +namespace Arcane.Operator.Services.Base; + +public interface IStreamingJobOperatorService +{ + /// + /// Namespace for kubernetes jobs + /// + public string StreamJobNamespace { get; } + + /// + /// Starts a new stream using an existing stream definition in Kubernetes database. + /// + /// Stream definition + /// Whether to perform a full reload for this stream. + /// StreamInfo if stream was created or None if an error occured + Task> StartRegisteredStream(IStreamDefinition streamDefinition, bool fullLoad); + + /// + /// Retrieves a streaming job with name equal to streamId from the cluster. If not found, returns None. + /// + /// Stream identifier that should be started. + /// + Task> GetStreamingJob(string streamId); + + /// + /// Marks streaming job for restart + /// + /// Stream identifier that should be terminated. + /// + Task> RequestStreamingJobRestart(string streamId); + + /// + /// Marks streaming job for stop + /// + /// Stream identifier that should be terminated. + /// + Task> RequestStreamingJobTermination(string streamId); + + /// + /// Marks streaming job for stop + /// + /// Stream identifier that should be terminated. + /// + Task> RequestStreamingJobReload(string streamId); + + /// + /// Find pod for streaming job and stop it + /// + /// Stream definition kind + /// Stream identifier that should be terminated. + /// + Task> FindAndStopStreamingJob(string kind, string streamId); + + /// + /// Delete the streaming job + /// + /// Stream definition kind + /// Stream identifier that should be terminated. + /// + Task> DeleteJob(string kind, string streamId); +} diff --git a/src/Services/Base/IStreamingJobTemplateRepository.cs b/src/Services/Base/IStreamingJobTemplateRepository.cs new file mode 100644 index 0000000..fc9dc00 --- /dev/null +++ b/src/Services/Base/IStreamingJobTemplateRepository.cs @@ -0,0 +1,18 @@ +using System.Threading.Tasks; +using Akka.Util; +using Arcane.Operator.JobTemplates.V1Beta1; + +namespace Arcane.Operator.Services.Base; + +public interface IStreamingJobTemplateRepository +{ + /// + /// Read the job template for the given kind, namespace and name + /// + /// Kind of the job template + /// Namespace to read from + /// Job template name + /// + Task> GetStreamingJobTemplate(string kind, string jobNamespace, + string templateName); +} diff --git a/src/Services/Maintenance/HostedStreamingJobMaintenanceService.cs b/src/Services/Maintenance/HostedStreamingJobMaintenanceService.cs new file mode 100644 index 0000000..f619c87 --- /dev/null +++ b/src/Services/Maintenance/HostedStreamingJobMaintenanceService.cs @@ -0,0 +1,43 @@ +using System.Threading; +using System.Threading.Tasks; +using Akka.Streams; +using Arcane.Operator.Services.Base; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace Arcane.Operator.Services.Maintenance; + +public class HostedStreamingJobMaintenanceService : BackgroundService +{ + private readonly ILogger logger; + private readonly IMaterializer materializer; + private readonly IStreamingJobMaintenanceService streamingJobMaintenanceService; + + public HostedStreamingJobMaintenanceService( + ILogger logger, + IStreamingJobMaintenanceService streamingJobMaintenanceService, + IMaterializer materializer) + { + this.logger = logger; + this.streamingJobMaintenanceService = streamingJobMaintenanceService; + this.materializer = materializer; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + this.logger.LogInformation("Activated {service}", nameof(HostedStreamingJobMaintenanceService)); + while (!stoppingToken.IsCancellationRequested) + { + this.logger.LogInformation("Activated JobEventGraph"); + await this.streamingJobMaintenanceService + .GetJobEventsGraph(stoppingToken) + .Run(this.materializer); + } + } + + public override Task StopAsync(CancellationToken cancellationToken) + { + this.logger.LogInformation("Stopping {service}", nameof(HostedStreamingJobMaintenanceService)); + return base.StopAsync(cancellationToken); + } +} diff --git a/src/Services/Maintenance/StreamingJobMaintenanceService.cs b/src/Services/Maintenance/StreamingJobMaintenanceService.cs new file mode 100644 index 0000000..7984cc1 --- /dev/null +++ b/src/Services/Maintenance/StreamingJobMaintenanceService.cs @@ -0,0 +1,146 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Akka.Streams; +using Akka.Streams.Dsl; +using Akka.Util; +using Akka.Util.Extensions; +using Arcane.Operator.Configurations; +using Arcane.Operator.Extensions; +using Arcane.Operator.Models; +using Arcane.Operator.Services.Base; +using Arcane.Operator.StreamDefinitions.Base; +using k8s; +using k8s.Models; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Snd.Sdk.ActorProviders; +using Snd.Sdk.Kubernetes; +using Snd.Sdk.Kubernetes.Base; +using Snd.Sdk.Tasks; + +namespace Arcane.Operator.Services.Maintenance; + +public class StreamingJobMaintenanceService : IStreamingJobMaintenanceService +{ + private readonly StreamingJobMaintenanceServiceConfiguration configuration; + private readonly IKubeCluster kubeCluster; + private readonly ILogger logger; + private readonly IStreamingJobOperatorService operatorService; + private readonly IStreamDefinitionRepository streamDefinitionRepository; + + public StreamingJobMaintenanceService( + ILogger logger, + IOptions options, + IKubeCluster kubeCluster, + IStreamDefinitionRepository streamDefinitionRepository, + IStreamingJobOperatorService operatorService) + { + this.configuration = options.Value; + this.kubeCluster = kubeCluster; + this.streamDefinitionRepository = streamDefinitionRepository; + this.operatorService = operatorService; + this.logger = logger; + } + + + public IRunnableGraph GetJobEventsGraph(CancellationToken cancellationToken) + { + return this.kubeCluster + .StreamJobEvents(this.operatorService.StreamJobNamespace, this.configuration.MaxBufferCapacity, + OverflowStrategy.Fail) + .Via(cancellationToken.AsFlow<(WatchEventType, V1Job)>(true)) + .SelectAsync(this.configuration.Parallelism, this.OnJobEvent) + .CollectOption() + .SelectAsync(this.configuration.Parallelism, this.HandleStreamOperatorResponse) + .ToMaterialized(Sink.Ignore>(), Keep.Right); + } + + private Task> OnJobEvent((WatchEventType, V1Job) valueTuple) + { + return valueTuple switch + { + (WatchEventType.Deleted, var job) => this.OnJobDelete(job), + (WatchEventType.Added, var job) => this.OnJobAdded(job), + (WatchEventType.Modified, var job) => this.OnJobModified(job), + _ => Task.FromResult(Option.None) + }; + } + + private Task> OnJobModified(V1Job job) + { + var streamId = job.GetStreamId(); + if (job.IsStopping()) + { + this.logger.LogInformation("Streaming job for stream with id {streamId} is already stopping", + streamId); + return Task.FromResult(Option.None); + } + + if (job.IsReloadRequested() || job.IsRestartRequested()) + { + return this.operatorService.DeleteJob(job.GetStreamKind(), streamId); + } + + return Task.FromResult(Option.None); + } + + private Task> OnJobAdded(V1Job job) + { + var streamId = job.GetStreamId(); + if (job.IsReloading()) + { + return Task.FromResult(StreamOperatorResponse.Reloading(job.Namespace(), job.GetStreamKind(), streamId) + .AsOption()); + } + + if (job.IsRunning()) + { + return Task.FromResult(StreamOperatorResponse.Running(job.Namespace(), job.GetStreamKind(), streamId) + .AsOption()); + } + + this.logger.LogError( + "{handler} handler triggered for the streaming job {streamId}, but the job is not in a running state", + nameof(this.OnJobAdded), streamId); + return Task.FromResult(Option.None); + } + + private Task> OnJobDelete(V1Job job) + { + var fullLoad = job.IsReloadRequested() || job.IsSchemaMismatch(); + return this.streamDefinitionRepository + .GetStreamDefinition(job.Namespace(), job.GetStreamKind(), job.GetStreamId()) + .Map(maybeSd => maybeSd switch + { + { HasValue: false } + => Task.FromResult(Option.None), + { HasValue: true, Value: var sd } when job.IsFailed() + => this.streamDefinitionRepository + .SetCrashLoopAnnotation(sd.Namespace(), sd.Kind, sd.StreamId) + .Map(maybeUpdatedSd => maybeUpdatedSd.HasValue + ? StreamOperatorResponse.CrashLoopDetected(maybeUpdatedSd.Value.Namespace(), + maybeUpdatedSd.Value.Kind, + maybeUpdatedSd.Value.StreamId) + .AsOption() + : Option.None), + { HasValue: true, Value: var sd } when sd.Suspended + => Task.FromResult( + StreamOperatorResponse.Suspended(sd.Namespace(), sd.Kind, sd.StreamId).AsOption()), + { HasValue: true, Value: var sd } when sd.CrashLoopDetected + => Task.FromResult(StreamOperatorResponse.CrashLoopDetected(sd.Namespace(), sd.Kind, sd.StreamId) + .AsOption()), + { HasValue: true, Value: var sd } when !sd.Suspended + => this.operatorService.StartRegisteredStream(maybeSd.Value, fullLoad), + _ => throw new ArgumentOutOfRangeException(nameof(maybeSd), maybeSd, null) + }).Flatten(); + } + + private Task> HandleStreamOperatorResponse(StreamOperatorResponse response) + { + return this.streamDefinitionRepository.SetStreamStatus(response.Namespace, + response.Kind, + response.Id, + response.ToStatus()); + } +} diff --git a/src/Services/Operator/HostedStreamOperatorService.cs b/src/Services/Operator/HostedStreamOperatorService.cs new file mode 100644 index 0000000..6e0c9db --- /dev/null +++ b/src/Services/Operator/HostedStreamOperatorService.cs @@ -0,0 +1,43 @@ +using System.Threading; +using System.Threading.Tasks; +using Akka.Streams; +using Arcane.Operator.Services.Base; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace Arcane.Operator.Services.Operator; + +public class HostedStreamOperatorService : BackgroundService +{ + private readonly ILogger> logger; + private readonly IMaterializer materializer; + private readonly IStreamOperatorService streamOperatorService; + + public HostedStreamOperatorService( + ILogger> logger, + IStreamOperatorService streamOperatorService, + IMaterializer materializer) + { + this.logger = logger; + this.streamOperatorService = streamOperatorService; + this.materializer = materializer; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + this.logger.LogInformation("Activated {service}", nameof(HostedStreamOperatorService)); + while (!stoppingToken.IsCancellationRequested) + { + this.logger.LogInformation("Start listening for {crdName} events", typeof(TStreamType).Name); + await this.streamOperatorService + .GetStreamDefinitionEventsGraph(stoppingToken) + .Run(this.materializer); + } + } + + public override Task StopAsync(CancellationToken cancellationToken) + { + this.logger.LogInformation("Stopping {service}", nameof(HostedStreamOperatorService)); + return base.StopAsync(cancellationToken); + } +} diff --git a/src/Services/Operator/StreamOperatorService.cs b/src/Services/Operator/StreamOperatorService.cs new file mode 100644 index 0000000..de9fdbe --- /dev/null +++ b/src/Services/Operator/StreamOperatorService.cs @@ -0,0 +1,188 @@ +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Akka; +using Akka.Streams; +using Akka.Streams.Dsl; +using Akka.Streams.Supervision; +using Akka.Util; +using Akka.Util.Extensions; +using Arcane.Operator.Configurations; +using Arcane.Operator.Extensions; +using Arcane.Operator.Models; +using Arcane.Operator.Services.Base; +using Arcane.Operator.StreamDefinitions.Base; +using k8s; +using k8s.Models; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Snd.Sdk.ActorProviders; +using Snd.Sdk.Kubernetes.Base; +using Snd.Sdk.Tasks; + +namespace Arcane.Operator.Services.Operator; + +public class StreamOperatorService : IStreamOperatorService + where TStreamType : IStreamDefinition +{ + private readonly StreamOperatorServiceConfiguration configuration; + private readonly IKubeCluster kubeCluster; + private readonly ILogger> logger; + private readonly IStreamingJobOperatorService operatorService; + private readonly CustomResourceConfiguration resourceConfiguration; + private readonly IStreamDefinitionRepository streamDefinitionRepository; + + public StreamOperatorService(IKubeCluster kubeCluster, + IOptions streamOperatorServiceOptions, + IOptionsSnapshot customResourceConfigurationsOptionsSnapshot, + IStreamingJobOperatorService operatorService, + IStreamDefinitionRepository streamDefinitionRepository, + ILogger> logger) + { + this.kubeCluster = kubeCluster; + this.configuration = streamOperatorServiceOptions.Value; + this.resourceConfiguration = customResourceConfigurationsOptionsSnapshot.Get(typeof(TStreamType).Name); + this.streamDefinitionRepository = streamDefinitionRepository; + this.operatorService = operatorService; + this.logger = logger; + } + + public IRunnableGraph GetStreamDefinitionEventsGraph(CancellationToken cancellationToken) + { + var synchronizationSource = this.GetStreamingJobSynchronizationGraph(); + var actualStateEventSource = this.kubeCluster.StreamCustomResourceEvents( + this.operatorService.StreamJobNamespace, + this.resourceConfiguration.ApiGroup, + this.resourceConfiguration.Version, + this.resourceConfiguration.Plural, + this.configuration.MaxBufferCapacity, + OverflowStrategy.Fail); + + return synchronizationSource + .Concat(actualStateEventSource) + .Via(cancellationToken.AsFlow<(WatchEventType, TStreamType)>(true)) + .SelectAsync(this.configuration.Parallelism, this.OnEvent) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(this.HandleError)) + .CollectOption() + .SelectAsync(this.configuration.Parallelism, + response => this.streamDefinitionRepository.SetStreamStatus(response.Namespace, + response.Kind, + response.Id, + response.ToStatus())) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(this.HandleError)) + .ToMaterialized(Sink.Ignore>(), Keep.Right); + } + + private Task> OnEvent((WatchEventType, TStreamType) arg) + { + return arg switch + { + (WatchEventType.Added, var streamDefinition) => this.OnAdded(streamDefinition), + (WatchEventType.Modified, var streamDefinition) => this.OnModified(streamDefinition), + _ => Task.FromResult(Option.None) + }; + } + + private Directive HandleError(Exception exception) + { + this.logger.LogError(exception, "Failed to handle stream definition event"); + return exception switch + { + BufferOverflowException => Directive.Stop, + _ => Directive.Resume + }; + } + + private Task> OnModified(IStreamDefinition streamDefinition) + { + this.logger.LogInformation("Modified a stream definition with id {streamId}", streamDefinition.StreamId); + return this.operatorService.GetStreamingJob(streamDefinition.StreamId) + .Map(maybeJob => + { + return maybeJob switch + { + { HasValue: false } when streamDefinition.CrashLoopDetected + => Task.FromResult(StreamOperatorResponse.CrashLoopDetected(streamDefinition.Namespace(), + streamDefinition.Kind, + streamDefinition.StreamId) + .AsOption()), + { HasValue: true } when streamDefinition.CrashLoopDetected + => this.operatorService.DeleteJob(streamDefinition.Kind, streamDefinition.StreamId), + { HasValue: false } when streamDefinition.ReloadRequested + => this.streamDefinitionRepository + .RemoveReloadingAnnotation(streamDefinition.Namespace(), streamDefinition.Kind, + streamDefinition.StreamId) + .Map(sd => sd.HasValue + ? this.operatorService.StartRegisteredStream(sd.Value, true) + : Task.FromResult(Option.None)) + .Flatten(), + { HasValue: true } when streamDefinition.ReloadRequested + => this.streamDefinitionRepository + .RemoveReloadingAnnotation(streamDefinition.Namespace(), streamDefinition.Kind, + streamDefinition.StreamId) + .Map(sd => sd.HasValue + ? this.operatorService.RequestStreamingJobReload(streamDefinition.StreamId) + : Task.FromResult(Option.None)) + .Flatten(), + { HasValue: true } when streamDefinition.Suspended + => this.operatorService.DeleteJob(streamDefinition.Kind, streamDefinition.StreamId), + { HasValue: false } when streamDefinition.Suspended + => Task.FromResult(StreamOperatorResponse.Suspended(streamDefinition.Namespace(), + streamDefinition.Kind, + streamDefinition.StreamId) + .AsOption()), + { Value: var job } when job.GetConfigurationChecksum() == + streamDefinition.GetConfigurationChecksum() + => Task.FromResult(Option.None), + { Value: var job } when !string.IsNullOrEmpty(job.GetConfigurationChecksum()) && + job.GetConfigurationChecksum() != + streamDefinition.GetConfigurationChecksum() + => this.operatorService.RequestStreamingJobRestart(streamDefinition.StreamId), + { HasValue: false } + => this.operatorService.StartRegisteredStream(streamDefinition, false), + _ => Task.FromResult(Option.None) + }; + }).Flatten(); + } + + private Task> OnAdded(IStreamDefinition streamDefinition) + { + this.logger.LogInformation("Added a stream definition with id {streamId}", streamDefinition.StreamId); + return streamDefinition.Suspended + ? Task.FromResult(StreamOperatorResponse.Suspended( + streamDefinition.Namespace(), + streamDefinition.Kind, + streamDefinition.StreamId).AsOption()) + : this.operatorService.GetStreamingJob(streamDefinition.StreamId) + .Map(maybeJob => maybeJob switch + { + { HasValue: true, Value: var job } when job.IsReloading() + => Task.FromResult(StreamOperatorResponse.Reloading( + streamDefinition.Metadata.Namespace(), + streamDefinition.Kind, + streamDefinition.StreamId) + .AsOption()), + { HasValue: true, Value: var job } when !job.IsReloading() + => Task.FromResult(StreamOperatorResponse.Running( + streamDefinition.Metadata.Namespace(), + streamDefinition.Kind, + streamDefinition.StreamId) + .AsOption()), + { HasValue: false } => this.operatorService.StartRegisteredStream(streamDefinition, true) + }).Flatten(); + } + + private Source<(WatchEventType, TStreamType), NotUsed> GetStreamingJobSynchronizationGraph() + { + var listTask = this.kubeCluster.ListCustomResources( + this.resourceConfiguration.ApiGroup, + this.resourceConfiguration.Version, + this.resourceConfiguration.Plural, + this.operatorService.StreamJobNamespace); + + return Source + .FromTask(listTask) + .SelectMany(l => l.Select(d => (WatchEventType.Modified, d))); + } +} diff --git a/src/Services/Repositories/StreamDefinitionRepository.cs b/src/Services/Repositories/StreamDefinitionRepository.cs new file mode 100644 index 0000000..8d56e18 --- /dev/null +++ b/src/Services/Repositories/StreamDefinitionRepository.cs @@ -0,0 +1,114 @@ +using System.Linq; +using System.Text.Json; +using System.Threading.Tasks; +using Akka.Util; +using Akka.Util.Extensions; +using Arcane.Models.StreamingJobLifecycle; +using Arcane.Operator.Services.Base; +using Arcane.Operator.StreamDefinitions; +using Arcane.Operator.StreamDefinitions.Base; +using Arcane.Operator.StreamStatuses.StreamStatus.V1Beta1; +using Microsoft.Extensions.Logging; +using Snd.Sdk.Kubernetes.Base; +using Snd.Sdk.Tasks; + +namespace Arcane.Operator.Services.Repositories; + +public class StreamDefinitionRepository : IStreamDefinitionRepository +{ + private readonly IKubeCluster kubeCluster; + private readonly ILogger logger; + private readonly IStreamClassRepository streamClassRepository; + + public StreamDefinitionRepository(ILogger logger, + IStreamClassRepository streamClassRepository, + IKubeCluster kubeCluster) + { + this.kubeCluster = kubeCluster; + this.logger = logger; + this.streamClassRepository = streamClassRepository; + } + + public Task> GetStreamDefinition(string nameSpace, string kind, string streamId) + { + var crdConf = this.streamClassRepository.Get(nameSpace, kind); + if (crdConf is { ApiGroup: null, Version: null, Plural: null }) + { + this.logger.LogError("Failed to get configuration for kind {kind}", kind); + return Task.FromResult(Option.None); + } + + return this.kubeCluster + .GetCustomResource( + crdConf.ApiGroup, + crdConf.Version, + crdConf.Plural, + nameSpace, + streamId, + element => (IStreamDefinition)element.Deserialize()) + .Map(resource => resource.AsOption()); + } + + public Task> SetStreamStatus(string nameSpace, string kind, string streamId, + V1Beta1StreamStatus streamStatus) + { + this.logger.LogInformation( + "Status and phase of stream with kind {kind} and id {streamId} changed to {statuses}, {phase}", + kind, + streamId, + string.Join(", ", streamStatus.Conditions.Select(sc => sc.Type)), + streamStatus.Phase); + var crdConf = this.streamClassRepository.Get(nameSpace, kind); + if (crdConf is { ApiGroup: null, Version: null, Plural: null }) + { + this.logger.LogError("Failed to get configuration for kind {kind}", kind); + return Task.FromResult(Option.None); + } + + return this.kubeCluster.UpdateCustomResourceStatus( + crdConf.ApiGroup, + crdConf.Version, + crdConf.Plural, + nameSpace, + streamId, + streamStatus, + element => (IStreamDefinition)element.Deserialize()) + .Map(resource => resource.AsOption()); + } + + public Task> RemoveReloadingAnnotation(string nameSpace, string kind, string streamId) + { + var crdConf = this.streamClassRepository.Get(nameSpace, kind); + if (crdConf is { ApiGroup: null, Version: null, Plural: null }) + { + this.logger.LogError("Failed to get configuration for kind {kind}", kind); + return Task.FromResult(Option.None); + } + + return this.kubeCluster + .RemoveObjectAnnotation(crdConf.ToNamespacedCrd(), + Annotations.STATE_ANNOTATION_KEY, + streamId, + nameSpace) + .Map(result => (IStreamDefinition)((JsonElement)result).Deserialize()) + .Map(result => result.AsOption()); + } + + public Task> SetCrashLoopAnnotation(string nameSpace, string kind, string streamId) + { + var crdConf = this.streamClassRepository.Get(nameSpace, kind); + if (crdConf is { ApiGroup: null, Version: null, Plural: null }) + { + this.logger.LogError("Failed to get configuration for kind {kind}", kind); + return Task.FromResult(Option.None); + } + + return this.kubeCluster + .AnnotateObject(crdConf.ToNamespacedCrd(), + Annotations.STATE_ANNOTATION_KEY, + Annotations.CRASH_LOOP_STATE_ANNOTATION_VALUE, + streamId, + nameSpace) + .Map(result => ((IStreamDefinition)((JsonElement)result).Deserialize()).AsOption()); + } +} diff --git a/src/Services/Repositories/StreamingJobTemplateRepository.cs b/src/Services/Repositories/StreamingJobTemplateRepository.cs new file mode 100644 index 0000000..42a2c39 --- /dev/null +++ b/src/Services/Repositories/StreamingJobTemplateRepository.cs @@ -0,0 +1,48 @@ +using System.Threading.Tasks; +using Akka.Util; +using Akka.Util.Extensions; +using Arcane.Operator.JobTemplates.V1Beta1; +using Arcane.Operator.Services.Base; +using Microsoft.Extensions.Logging; +using Snd.Sdk.Kubernetes.Base; +using Snd.Sdk.Tasks; + +namespace Arcane.Operator.Services.Repositories; + +public class StreamingJobTemplateRepository : IStreamingJobTemplateRepository +{ + private readonly IKubeCluster kubeCluster; + private readonly ILogger logger; + private readonly IStreamClassRepository streamClassRepository; + + public StreamingJobTemplateRepository(IKubeCluster kubeCluster, + IStreamClassRepository streamClassRepository, + ILogger logger) + { + this.kubeCluster = kubeCluster; + this.logger = logger; + this.streamClassRepository = streamClassRepository; + } + + public Task> GetStreamingJobTemplate(string kind, string jobNamespace, + string templateName) + { + var crdConf = this.streamClassRepository.Get(jobNamespace, kind); + if (crdConf is { ApiGroup: null, Version: null, Plural: null }) + { + this.logger.LogError("Failed to get configuration for kind {kind} and mapped type {typeName}", + kind, + crdConf.GetType().Name); + return Task.FromResult(Option.None); + } + + return this.kubeCluster + .GetCustomResource( + crdConf.ApiGroup, + crdConf.Version, + crdConf.Plural, + jobNamespace, + templateName) + .Map(resource => resource.AsOption()); + } +} diff --git a/src/Services/StreamInteractionService.cs b/src/Services/StreamInteractionService.cs new file mode 100644 index 0000000..588fc73 --- /dev/null +++ b/src/Services/StreamInteractionService.cs @@ -0,0 +1,46 @@ +using System; +using System.Net; +using System.Net.Sockets; +using System.Threading.Tasks; +using Arcane.Models.StreamingJobLifecycle; +using Arcane.Operator.Services.Base; +using Snd.Sdk.Kubernetes.Base; + +namespace Arcane.Operator.Services; + +public class StreamInteractionService : IStreamInteractionService +{ + private const int STREAM_STOP_PORT = 13000; + private readonly IKubeCluster kubernetesService; + + public StreamInteractionService(IKubeCluster kubernetesService) + { + this.kubernetesService = kubernetesService; + } + + /// + public Task SetupTermination(Action onStreamTermination) + { + var server = new TcpListener(IPAddress.Parse("0.0.0.0"), STREAM_STOP_PORT); + server.Start(); + return server.AcceptSocketAsync().ContinueWith(onStreamTermination); + } + + /// + public async Task SendStopRequest(string streamerIp) + { + var endpoint = new IPEndPoint(IPAddress.Parse(streamerIp), STREAM_STOP_PORT); + using var tcpClient = new TcpClient(); + await tcpClient.ConnectAsync(endpoint); + } + + /// + public Task ReportSchemaMismatch(string streamId) + { + var nameSpace = this.kubernetesService.GetCurrentNamespace(); + return this.kubernetesService.AnnotateJob(streamId, + nameSpace, + Annotations.STATE_ANNOTATION_KEY, + Annotations.SCHEMA_MISMATCH_STATE_ANNOTATION_VALUE); + } +} diff --git a/src/Services/Streams/StreamingJobOperatorService.cs b/src/Services/Streams/StreamingJobOperatorService.cs new file mode 100644 index 0000000..1933391 --- /dev/null +++ b/src/Services/Streams/StreamingJobOperatorService.cs @@ -0,0 +1,201 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Akka.Util; +using Akka.Util.Extensions; +using Arcane.Models.StreamingJobLifecycle; +using Arcane.Operator.Configurations; +using Arcane.Operator.Extensions; +using Arcane.Operator.Models; +using Arcane.Operator.Services.Base; +using Arcane.Operator.StreamDefinitions.Base; +using k8s.Models; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Snd.Sdk.Kubernetes; +using Snd.Sdk.Kubernetes.Base; +using Snd.Sdk.Tasks; + +namespace Arcane.Operator.Services.Streams; + +public class StreamingJobOperatorService : IStreamingJobOperatorService +{ + private readonly StreamingJobOperatorServiceConfiguration configuration; + private readonly IKubeCluster kubernetesService; + private readonly ILogger logger; + private readonly IStreamingJobTemplateRepository streamingJobTemplateRepository; + private readonly IStreamInteractionService streamInteractionService; + + public StreamingJobOperatorService( + ILogger logger, + IOptions configuration, + IKubeCluster kubernetesService, + IStreamInteractionService streamInteractionService, + IStreamingJobTemplateRepository streamingJobTemplateRepository) + { + this.logger = logger; + this.configuration = configuration.Value; + this.kubernetesService = kubernetesService; + this.streamInteractionService = streamInteractionService; + this.streamingJobTemplateRepository = streamingJobTemplateRepository; + } + + + public string StreamJobNamespace => this.configuration.Namespace; + + public Task> GetStreamingJob(string streamId) + { + return this.kubernetesService.GetJob(streamId, this.StreamJobNamespace) + .TryMap(job => job.AsOption(), exception => + { + this.logger.LogWarning(exception, "Streaming job {streamId} not found", streamId); + return Option.None; + }); + } + + public Task> StartRegisteredStream(IStreamDefinition streamDefinition, bool fullLoad) + { + var templateRefKind = fullLoad + ? streamDefinition.ReloadingJobTemplateRef.Kind + : streamDefinition.JobTemplateRef.Kind; + var templateRefName = fullLoad + ? streamDefinition.ReloadingJobTemplateRef.Name + : streamDefinition.JobTemplateRef.Name; + return this.streamingJobTemplateRepository + .GetStreamingJobTemplate(templateRefKind, streamDefinition.Namespace(), templateRefName) + .Map(jobTemplate => + { + if (!jobTemplate.HasValue) + { + return Task.FromResult(StreamOperatorResponse.OperationFailed(streamDefinition.Metadata.Namespace(), + streamDefinition.Kind, + streamDefinition.StreamId, + $"Failed to find job template with kind {templateRefKind} and name {templateRefName}") + .AsOption()); + } + + var job = jobTemplate + .Value + .GetJob() + .WithStreamingJobLabels(streamDefinition.StreamId, fullLoad, streamDefinition.Kind) + .WithStreamingJobAnnotations(streamDefinition.GetConfigurationChecksum()) + .WithCustomEnvironment(streamDefinition.ToV1EnvFromSources()) + .WithCustomEnvironment(streamDefinition.ToEnvironment(fullLoad)) + .WithOwnerReference(streamDefinition) + .WithName(streamDefinition.StreamId); + this.logger.LogInformation("Starting a new stream job with an id {streamId}", + streamDefinition.StreamId); + return this.kubernetesService + .SendJob(job, streamDefinition.Metadata.Namespace(), CancellationToken.None) + .TryMap( + _ => fullLoad + ? StreamOperatorResponse.Reloading(streamDefinition.Metadata.Namespace(), + streamDefinition.Kind, + streamDefinition.StreamId) + : StreamOperatorResponse.Running(streamDefinition.Metadata.Namespace(), + streamDefinition.Kind, + streamDefinition.StreamId), + exception => + { + this.logger.LogError(exception, "Failed to send job"); + return Option.None; + }); + }) + .Flatten(); + } + + public Task> RequestStreamingJobRestart(string streamId) + { + return this.SetStreamingJobAnnotation(streamId, Annotations.RESTARTING_STATE_ANNOTATION_VALUE) + .Map(maybeSi + => maybeSi.Select(job + => StreamOperatorResponse.Restarting(this.StreamJobNamespace, job.GetStreamKind(), streamId))); + } + + public Task> RequestStreamingJobTermination(string streamId) + { + return this.SetStreamingJobAnnotation(streamId, Annotations.TERMINATE_REQUESTED_STATE_ANNOTATION_VALUE) + .Map(maybeSi + => maybeSi.Select(job + => StreamOperatorResponse.Terminating(this.StreamJobNamespace, job.GetStreamKind(), streamId))); + } + + public Task> RequestStreamingJobReload(string streamId) + { + return this.SetStreamingJobAnnotation(streamId, Annotations.RELOADING_STATE_ANNOTATION_VALUE) + .Map(maybeSi + => maybeSi.Select(job + => StreamOperatorResponse.Terminating(this.StreamJobNamespace, job.GetStreamKind(), streamId))); + } + + public Task> DeleteJob(string kind, string streamId) + { + return this.kubernetesService.DeleteJob(streamId, this.StreamJobNamespace) + .Map(_ => StreamOperatorResponse.Suspended(this.StreamJobNamespace, kind, streamId).AsOption()); + } + + public Task> FindAndStopStreamingJob(string kind, string streamId) + { + var labels = new Dictionary { { V1JobExtensions.STREAM_ID_LABEL, streamId } }; + return this.kubernetesService + .GetPods(labels, this.configuration.Namespace) + .Map(pods => this.StopActivePod(pods, kind, streamId)) + .Flatten(); + } + + private async Task> StopActivePod(IEnumerable pods, string kind, + string streamId) + { + var activePods = pods.Where(p => + p.Status != null + && p.Status.Phase.Equals("running", StringComparison.OrdinalIgnoreCase) + && !string.IsNullOrEmpty(p.Status.PodIP)).ToList(); + if (activePods is { Count: 1 }) + { + try + { + await this.streamInteractionService + .SendStopRequest(activePods.Single().Status.PodIP); + return StreamOperatorResponse.Stopped(this.StreamJobNamespace, kind, streamId); + } + catch (Exception e) + { + this.logger.LogError(e, "Failed to send stop request to pod with stream id: {streamId}", streamId); + return StreamOperatorResponse.OperationFailed(this.StreamJobNamespace, + kind, + streamId, + "Stopping the stream failed"); + } + } + + if (activePods is { Count: < 1 }) + { + this.logger.LogWarning("Cannot find active pod for stream id {streamId}", streamId); + return StreamOperatorResponse.OperationFailed(this.StreamJobNamespace, + kind, + streamId, + "Stopping the stream failed: no running pods found"); + } + + this.logger.LogError("Found multiple active pods for stream id {streamId}: {count}", streamId, + activePods.Count); + return StreamOperatorResponse.OperationFailed(this.StreamJobNamespace, + kind, + streamId, + "Stopping the stream failed: multiple running pods found"); + } + + private Task> SetStreamingJobAnnotation(string streamId, string annotationValue) + { + return this.kubernetesService.AnnotateJob(streamId, this.configuration.Namespace, + Annotations.STATE_ANNOTATION_KEY, annotationValue) + .TryMap(job => job.AsOption(), + exception => + { + this.logger.LogError(exception, "Failed request {streamId} termination", streamId); + return Option.None; + }); + } +} diff --git a/src/Startup.cs b/src/Startup.cs new file mode 100644 index 0000000..7a76c47 --- /dev/null +++ b/src/Startup.cs @@ -0,0 +1,77 @@ +using System.Diagnostics.CodeAnalysis; +using System.Text.Json.Serialization; +using Arcane.Operator.Services; +using Arcane.Operator.Services.Base; +using Arcane.Operator.Services.Maintenance; +using Arcane.Operator.Services.Repositories; +using Arcane.Operator.Services.Streams; +using Azure.Data.Tables; +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.OpenApi.Models; +using Snd.Sdk.ActorProviders; +using Snd.Sdk.Kubernetes.Providers; +using Snd.Sdk.Metrics.Configurations; +using Snd.Sdk.Metrics.Providers; +using Snd.Sdk.Storage.Providers; +using Snd.Sdk.Storage.Providers.Configurations; + +namespace Arcane.Operator; + +[ExcludeFromCodeCoverage] +public class Startup +{ + public Startup(IConfiguration configuration) + { + this.Configuration = configuration; + } + + public IConfiguration Configuration { get; } + + public void ConfigureServices(IServiceCollection services) + { + // service config injections + services.Configure(this.Configuration.GetSection(nameof(AzureMonitorConfiguration))); + + services.AddLocalActorSystem(); + + services.AddAzureBlob(AzureStorageConfiguration.CreateDefault()); + services.AddAzureTable(AzureStorageConfiguration.CreateDefault()); + services.AddDatadogMetrics(DatadogConfiguration.Default(nameof(Arcane))); + + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddKubernetes(); + + services.AddHealthChecks(); + + services.AddControllers().AddJsonOptions(options => + options.JsonSerializerOptions.Converters.Add(new JsonStringEnumConverter())); + services.AddSwaggerGen(c => { c.SwaggerDoc("v1", new OpenApiInfo { Title = "Arcane", Version = "v1" }); }); + } + + public void Configure(IApplicationBuilder app, IWebHostEnvironment env, + IHostApplicationLifetime hostApplicationLifetime) + { + if (env.IsDevelopment()) + { + app.UseDeveloperExceptionPage(); + app.UseSwagger(); + app.UseSwaggerUI(c => c.SwaggerEndpoint("/swagger/v1/swagger.json", "Arcane v1")); + } + + app.UseRouting(); + + app.UseEndpoints(endpoints => + { + endpoints.MapControllers(); + endpoints.MapHealthChecks("/health"); + }); + } +} diff --git a/src/StreamDefinitions/Base/IStreamDefinition.cs b/src/StreamDefinitions/Base/IStreamDefinition.cs new file mode 100644 index 0000000..f43c6e6 --- /dev/null +++ b/src/StreamDefinitions/Base/IStreamDefinition.cs @@ -0,0 +1,62 @@ +using System.Collections.Generic; +using Arcane.Models.StreamingJobLifecycle; +using k8s; +using k8s.Models; +using Newtonsoft.Json; + +namespace Arcane.Operator.StreamDefinitions.Base; + +public interface IStreamDefinition : IKubernetesObject +{ + /// + /// Stream identifier + /// + [JsonIgnore] + public string StreamId { get; } + + /// + /// True if the stream is suspended + /// + public bool Suspended { get; } + + /// + /// True if the stream is a crash loop detected + /// + [JsonIgnore] + public bool CrashLoopDetected + => + this.Metadata?.Annotations != null + && this.Metadata.Annotations.TryGetValue(Annotations.STATE_ANNOTATION_KEY, out var value) + && value == Annotations.CRASH_LOOP_STATE_ANNOTATION_VALUE; + + /// + /// True if a data backfill (full reload) is requested + /// + public bool ReloadRequested { get; } + + /// + /// Streaming job template reference + /// + public V1TypedLocalObjectReference JobTemplateRef { get; } + + /// + /// Streaming job template reference for full load job mode + /// + public V1TypedLocalObjectReference ReloadingJobTemplateRef { get; } + + /// + /// Convert stream configuration to Kubernetes environment references + /// + public IEnumerable ToV1EnvFromSources(); + + /// + /// Convert stream configuration to environment variables + /// + /// + public Dictionary ToEnvironment(bool fullLoad); + + /// + /// Returns checksum of the stream configuration + /// + public string GetConfigurationChecksum(); +} diff --git a/src/StreamDefinitions/StreamDefinition.cs b/src/StreamDefinitions/StreamDefinition.cs new file mode 100644 index 0000000..42f26c3 --- /dev/null +++ b/src/StreamDefinitions/StreamDefinition.cs @@ -0,0 +1,110 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Linq; +using System.Security.Cryptography; +using System.Text; +using System.Text.Json; +using System.Text.Json.Serialization; +using Arcane.Models.StreamingJobLifecycle; +using Arcane.Operator.StreamDefinitions.Base; +using k8s.Models; + +namespace Arcane.Operator.StreamDefinitions; + +[ExcludeFromCodeCoverage(Justification = "Model")] +public class StreamDefinition : IStreamDefinition +{ + /// + /// Stream configuration + /// + [JsonPropertyName("spec")] + public StreamDefinitionSpec Spec { get; set; } + + /// + /// Api version + /// + [JsonPropertyName("apiVersion")] + public string ApiVersion { get; set; } + + /// + /// Object kind (should always be "SqlServerStream") + /// + [JsonPropertyName("kind")] + public string Kind { get; set; } + + /// + /// Object metadata see + /// + [JsonPropertyName("metadata")] + public V1ObjectMeta Metadata { get; set; } + + /// + [JsonIgnore] + public bool Suspended + => + this.Metadata?.Annotations != null + && this.Metadata.Annotations.TryGetValue(Annotations.STATE_ANNOTATION_KEY, out var value) + && value == Annotations.SUSPENDED_STATE_ANNOTATION_VALUE; + + /// + [JsonIgnore] + public bool ReloadRequested + => + this.Metadata?.Annotations != null + && this.Metadata.Annotations.TryGetValue(Annotations.STATE_ANNOTATION_KEY, out var value) + && value == Annotations.RELOADING_STATE_ANNOTATION_VALUE; + + /// + [JsonIgnore] + public V1TypedLocalObjectReference JobTemplateRef => this.Spec.JobTemplateRef; + + /// + [JsonIgnore] + public V1TypedLocalObjectReference ReloadingJobTemplateRef => this.Spec.ReloadingJobTemplateRef; + + /// + public IEnumerable ToV1EnvFromSources() + { + return this.Spec.ToV1EnvFromSources(); + } + + /// + /// Encode Stream runner configuration to dictionary that can be passed as environment variables + /// + /// + /// Dictionary of strings + public Dictionary ToEnvironment(bool fullLoad) + { + return this.SelfToEnvironment(fullLoad) + .Concat(this.Spec.ToEnvironment()) + .ToDictionary(x => x.Key, x => x.Value); + } + + public string GetConfigurationChecksum() + { + var base64Hash = Convert.ToBase64String(this.GetSpecHash()); + return base64Hash[..7].ToLowerInvariant(); + } + + /// + /// Stream identifier + /// + [JsonIgnore] + public string StreamId => this.Metadata.Name; + + private byte[] GetSpecHash() + { + return SHA256.HashData(Encoding.UTF8.GetBytes(JsonSerializer.Serialize(this.Spec))); + } + + private Dictionary SelfToEnvironment(bool fullLoad) + { + return new Dictionary + { + { "STREAM_ID".GetEnvironmentVariableName(), this.StreamId }, + { "STREAM_KIND".GetEnvironmentVariableName(), this.Kind }, + { "FULL_LOAD".GetEnvironmentVariableName(), fullLoad.ToString().ToLowerInvariant() } + }; + } +} diff --git a/src/StreamDefinitions/StreamDefinitionSpec.cs b/src/StreamDefinitions/StreamDefinitionSpec.cs new file mode 100644 index 0000000..9d684dd --- /dev/null +++ b/src/StreamDefinitions/StreamDefinitionSpec.cs @@ -0,0 +1,57 @@ +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Text.Json.Serialization; +using k8s.Models; + +namespace Arcane.Operator.StreamDefinitions; + +/// +/// Configuration for Sql Server Single Table Stream. +/// +[ExcludeFromCodeCoverage(Justification = "Model")] +public class StreamDefinitionSpec +{ + /// + /// Sql Server connection string. + /// + [JsonPropertyName("secretRef")] + public V1SecretEnvSource SecretRef { get; init; } + + /// + /// Job template reference + /// + [JsonPropertyName("jobTemplateRef")] + public V1TypedLocalObjectReference JobTemplateRef { get; init; } + + /// + /// Job template reference + /// + [JsonPropertyName("reloadingJobTemplateRef")] + public V1TypedLocalObjectReference ReloadingJobTemplateRef { get; init; } + + /// + /// Table schema. + /// + [JsonPropertyName("streamSettings")] + public string StreamSettings { get; init; } + + /// + /// Convert secret references from this spec to V1EnvFromSource objects. + /// + public IEnumerable ToV1EnvFromSources() + { + return new[] { new V1EnvFromSource(secretRef: new V1SecretEnvSource(this.SecretRef.Name)) }; + } + + /// + /// Convert configuration to environment variables. + /// + /// + public Dictionary ToEnvironment() + { + return new Dictionary + { + { nameof(this.StreamSettings).GetEnvironmentVariableName(), this.StreamSettings } + }; + } +} diff --git a/src/StreamStatuses/StreamStatus/V1Beta1/V1Beta1StreamCondition.cs b/src/StreamStatuses/StreamStatus/V1Beta1/V1Beta1StreamCondition.cs new file mode 100644 index 0000000..23c45c3 --- /dev/null +++ b/src/StreamStatuses/StreamStatus/V1Beta1/V1Beta1StreamCondition.cs @@ -0,0 +1,29 @@ +using System.Diagnostics.CodeAnalysis; +using System.Text.Json.Serialization; + +namespace Arcane.Operator.StreamStatuses.StreamStatus.V1Beta1; + +/// +/// Represents the status of a stream for Kubernetes CRD +/// +[ExcludeFromCodeCoverage(Justification = "Model")] +public class V1Beta1StreamCondition +{ + /// + /// Latest observed state of the stream + /// + [JsonPropertyName("status")] + public string Status { get; init; } + + /// + /// Latest observed state of the stream + /// + [JsonPropertyName("type")] + public string Type { get; init; } + + /// + /// Latest observed state of the stream + /// + [JsonPropertyName("message")] + public string Message { get; set; } +} diff --git a/src/StreamStatuses/StreamStatus/V1Beta1/V1Beta1StreamStatus.cs b/src/StreamStatuses/StreamStatus/V1Beta1/V1Beta1StreamStatus.cs new file mode 100644 index 0000000..4750960 --- /dev/null +++ b/src/StreamStatuses/StreamStatus/V1Beta1/V1Beta1StreamStatus.cs @@ -0,0 +1,18 @@ +using System.Text.Json.Serialization; + +namespace Arcane.Operator.StreamStatuses.StreamStatus.V1Beta1; + +public class V1Beta1StreamStatus +{ + /// + /// List of conditions of the stream + /// + [JsonPropertyName("conditions")] + public V1Beta1StreamCondition[] Conditions { get; init; } + + /// + /// List of conditions of the stream + /// + [JsonPropertyName("phase")] + public string Phase { get; init; } +} diff --git a/src/StreamingJobLifecycle/Annotations.cs b/src/StreamingJobLifecycle/Annotations.cs new file mode 100644 index 0000000..836acf3 --- /dev/null +++ b/src/StreamingJobLifecycle/Annotations.cs @@ -0,0 +1,15 @@ +namespace Arcane.Models.StreamingJobLifecycle; + +public static class Annotations +{ + public const string STATE_ANNOTATION_KEY = "arcane/state"; + public const string TERMINATING_STATE_ANNOTATION_VALUE = "terminating"; + public const string TERMINATE_REQUESTED_STATE_ANNOTATION_VALUE = "terminate-requested"; + public const string SUSPENDED_STATE_ANNOTATION_VALUE = "suspended"; + public const string CRASH_LOOP_STATE_ANNOTATION_VALUE = "crash-loop"; + public const string RESTARTING_STATE_ANNOTATION_VALUE = "restart-requested"; + public const string RELOADING_STATE_ANNOTATION_VALUE = "reload-requested"; + public const string SCHEMA_MISMATCH_STATE_ANNOTATION_VALUE = "schema-mismatch"; + + public const string CONFIGURATION_CHECKSUM_ANNOTATION_KEY = "arcane/configuration-checksum"; +} \ No newline at end of file diff --git a/src/appsettings.Development.json b/src/appsettings.Development.json new file mode 100644 index 0000000..8983e0f --- /dev/null +++ b/src/appsettings.Development.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft": "Warning", + "Microsoft.Hosting.Lifetime": "Information" + } + } +} diff --git a/src/appsettings.json b/src/appsettings.json new file mode 100644 index 0000000..cd1a082 --- /dev/null +++ b/src/appsettings.json @@ -0,0 +1,52 @@ +{ + "AllowedHosts": "*", + "CheckpointServiceConfiguration": { + "CheckpointTable": "ArcaneCheckpoints", + "CheckpointsPerBatch": 1000 + }, + "JsonIngestionConfiguration": { + "BufferSize": 16, + "ThrottleDocumentLimit": 256, + "ThrottleDocumentBurst": 1024, + "ThrottleTimespan": "0.00:00:01", + "MaxDocumentsPerFile": 1024, + "GroupingInterval": "0.00:00:30", + "IngestionSinkPath": "ingestion@" + }, + "StreamingJobOperatorServiceConfiguration": { + "Namespace": "arcane" + }, + "StreamingJobMaintenanceServiceConfiguration": { + "MaxBufferCapacity": 100, + "Parallelism": 1 + }, + "StreamOperatorServiceConfiguration": { + "MaxBufferCapacity": 100, + "Parallelism": 1 + }, + "V1Beta1SqlServerChangeTrackingStream": { + "ApiGroup": "streaming.sneaksanddata.com", + "Plural": "sqlserver-ct-streams", + "Version": "v1beta1" + }, + "V1Beta1SqlServerStream": { + "ApiGroup": "streaming.sneaksanddata.com", + "Plural": "sqlserver-streams", + "Version": "v1beta1" + }, + "V1Beta1CdmChangeFeedStream": { + "ApiGroup": "streaming.sneaksanddata.com", + "Plural": "cdm-streams", + "Version": "v1beta1" + }, + "V1Beta1RestApiFixedAuthStream": { + "ApiGroup": "streaming.sneaksanddata.com", + "Plural": "api-dfs", + "Version": "v1beta1" + }, + "V1Beta1StreamingJobTemplate": { + "ApiGroup": "streaming.sneaksanddata.com", + "Plural": "streaming-job-templates", + "Version": "v1beta1" + } +} diff --git a/test/Arcane.Operator.Tests.csproj b/test/Arcane.Operator.Tests.csproj index f2e33f5..2b31628 100644 --- a/test/Arcane.Operator.Tests.csproj +++ b/test/Arcane.Operator.Tests.csproj @@ -5,16 +5,16 @@ false - DotnetProject.Tests + Arcane.Operator.Tests - + all runtime; build; native; contentfiles; analyzers; buildtransitive - + runtime; build; native; contentfiles; analyzers; buildtransitive all @@ -23,10 +23,13 @@ runtime; build; native; contentfiles; analyzers; buildtransitive all + + + - + - + diff --git a/test/Fixtures/AkkaFixture.cs b/test/Fixtures/AkkaFixture.cs new file mode 100644 index 0000000..9281311 --- /dev/null +++ b/test/Fixtures/AkkaFixture.cs @@ -0,0 +1,16 @@ +using Akka.Actor; +using Akka.Streams; + +namespace Arcane.Operator.Tests.Fixtures; + +public class AkkaFixture +{ + public AkkaFixture() + { + this.ActorSystem = ActorSystem.Create(nameof(AkkaFixture)); + this.Materializer = this.ActorSystem.Materializer(); + } + + public ActorSystem ActorSystem { get; } + public IMaterializer Materializer { get; } +} diff --git a/test/Fixtures/LoggerFixture.cs b/test/Fixtures/LoggerFixture.cs new file mode 100644 index 0000000..2269430 --- /dev/null +++ b/test/Fixtures/LoggerFixture.cs @@ -0,0 +1,13 @@ +using Microsoft.Extensions.Logging; + +namespace Arcane.Operator.Tests.Fixtures; + +public class LoggerFixture +{ + public LoggerFixture() + { + this.Factory = LoggerFactory.Create(conf => conf.AddConsole()); + } + + public ILoggerFactory Factory { get; } +} diff --git a/test/Fixtures/ServiceFixture.cs b/test/Fixtures/ServiceFixture.cs new file mode 100644 index 0000000..a64230a --- /dev/null +++ b/test/Fixtures/ServiceFixture.cs @@ -0,0 +1,16 @@ +using Arcane.Operator.Services.Base; +using Moq; +using Snd.Sdk.Kubernetes.Base; + +namespace Arcane.Operator.Tests.Fixtures; + +public class ServiceFixture +{ + public Mock MockKubeCluster { get; } = new(); + + public Mock MockStreamInteractionService { get; } = new(); + + public Mock MockStreamingJobOperatorService { get; } = new(); + + public Mock MockStreamDefinitionRepository { get; } = new(); +} diff --git a/test/Services/HostedStreamingJobMaintenanceServiceTests.cs b/test/Services/HostedStreamingJobMaintenanceServiceTests.cs new file mode 100644 index 0000000..c50e3f3 --- /dev/null +++ b/test/Services/HostedStreamingJobMaintenanceServiceTests.cs @@ -0,0 +1,121 @@ +using System; +using System.Collections.Generic; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Akka.Streams; +using Akka.Streams.Dsl; +using Arcane.Operator.Configurations; +using Arcane.Operator.Services.Base; +using Arcane.Operator.Services.Maintenance; +using Arcane.Operator.Services.Operator; +using Arcane.Operator.Services.Streams; +using Arcane.Operator.StreamDefinitions; +using Arcane.Operator.Tests.Fixtures; +using k8s; +using k8s.Models; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Moq; +using Xunit; + +namespace Arcane.Operator.Tests.Services; + +public class HostedStreamingJobMaintenanceServiceTests : IClassFixture, IClassFixture, + IClassFixture +{ + private readonly AkkaFixture akkaFixture; + private readonly LoggerFixture loggerFixture; + private readonly ServiceFixture serviceFixture; + + public HostedStreamingJobMaintenanceServiceTests(ServiceFixture serviceFixture, LoggerFixture loggerFixture, + AkkaFixture akkaFixture) + { + this.serviceFixture = serviceFixture; + this.loggerFixture = loggerFixture; + this.akkaFixture = akkaFixture; + } + + [Fact] + public async Task TestHostedServiceRestart() + { + // Arrange + var mockSource = Source.From(new List<(WatchEventType, StreamDefinition)> + { + (WatchEventType.Added, + new StreamDefinition() + { Metadata = new V1ObjectMeta { Name = nameof(this.TestHostedServiceRestart) } }) + } + ); + this.serviceFixture + .MockKubeCluster + .Setup(cluster + => cluster.StreamCustomResourceEvents( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(mockSource); + + this.serviceFixture.MockKubeCluster.Setup(s + => s.ListCustomResources( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny>>(), + It.IsAny())) + .ReturnsAsync(new List()); + + using var service = this.CreateService(); + + // Act + await service.StartAsync(CancellationToken.None); + + await Task.Delay(2000); + + // Assert + this.serviceFixture + .MockKubeCluster + .Verify(cluster + => cluster.StreamCustomResourceEvents( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), It.IsAny()), + Times.AtLeast(2)); + } + + private HostedStreamOperatorService CreateService() + { + var optionsMock = new Mock>(); + optionsMock + .Setup(m => m.Get(It.IsAny())) + .Returns(new CustomResourceConfiguration()); + return new ServiceCollection() + .AddSingleton(Options.Create(new StreamOperatorServiceConfiguration + { + Parallelism = 1, MaxBufferCapacity = 100 + })) + .AddSingleton(optionsMock.Object) + .AddSingleton(this.serviceFixture.MockKubeCluster.Object) + .AddSingleton(this.serviceFixture.MockStreamInteractionService.Object) + .AddSingleton(this.serviceFixture.MockStreamDefinitionRepository.Object) + .AddSingleton(this.serviceFixture.MockStreamingJobOperatorService.Object) + .AddSingleton(this.loggerFixture.Factory.CreateLogger()) + .AddSingleton(this.loggerFixture.Factory.CreateLogger>()) + .AddSingleton(this.loggerFixture.Factory.CreateLogger>()) + .AddSingleton() + .AddSingleton(Options.Create(new StreamingJobOperatorServiceConfiguration())) + .AddSingleton, StreamOperatorService>() + .AddSingleton>() + .AddSingleton(this.akkaFixture.Materializer) + .BuildServiceProvider() + .GetRequiredService>(); + } +} diff --git a/test/Services/StreamOperatorServiceTests.cs b/test/Services/StreamOperatorServiceTests.cs new file mode 100644 index 0000000..150623a --- /dev/null +++ b/test/Services/StreamOperatorServiceTests.cs @@ -0,0 +1,386 @@ +using System; +using System.Collections.Generic; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Akka.Streams; +using Akka.Streams.Dsl; +using Akka.Util; +using Akka.Util.Extensions; +using Arcane.Operator.Configurations; +using Arcane.Operator.Services.Base; +using Arcane.Operator.Services.Operator; +using Arcane.Operator.StreamDefinitions; +using Arcane.Operator.StreamDefinitions.Base; +using Arcane.Operator.StreamStatuses.StreamStatus.V1Beta1; +using Arcane.Operator.Tests.Fixtures; +using Arcane.Operator.Tests.Services.TestCases; +using k8s; +using k8s.Models; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Moq; +using Xunit; +using static Arcane.Operator.Tests.Services.TestCases.JobTestCases; +using static Arcane.Operator.Tests.Services.TestCases.StreamDefinitionTestCases; +using FailedStreamDefinition = Arcane.Operator.Tests.Services.TestCases.FailedStreamDefinition; + +namespace Arcane.Operator.Tests.Services; + +public class StreamOperatorServiceTests : IClassFixture, IClassFixture, + IClassFixture +{ + private readonly AkkaFixture akkaFixture; + private readonly LoggerFixture loggerFixture; + private readonly ServiceFixture serviceFixture; + + public StreamOperatorServiceTests(ServiceFixture serviceFixture, LoggerFixture loggerFixture, + AkkaFixture akkaFixture) + { + this.serviceFixture = serviceFixture; + this.loggerFixture = loggerFixture; + this.akkaFixture = akkaFixture; + } + + public static IEnumerable GenerateSynchronizationTestCases() + { + yield return new object[] + { WatchEventType.Added, StreamDefinitionTestCases.StreamDefinition, true, false, false, false }; + yield return new object[] + { WatchEventType.Modified, StreamDefinitionTestCases.StreamDefinition, false, true, true, false }; + yield return new object[] + { WatchEventType.Modified, StreamDefinitionTestCases.StreamDefinition, false, true, false, false }; + + yield return new object[] { WatchEventType.Added, SuspendedStreamDefinition, false, false, false, false }; + yield return new object[] { WatchEventType.Deleted, SuspendedStreamDefinition, false, false, false, false }; + yield return new object[] { WatchEventType.Modified, SuspendedStreamDefinition, false, false, true, true }; + yield return new object[] { WatchEventType.Modified, SuspendedStreamDefinition, false, false, false, false }; + } + + [Theory] + [MemberData(nameof(GenerateSynchronizationTestCases))] + public async Task TestHandleAddedStreamEvent(WatchEventType eventType, + StreamDefinition streamDefinition, + bool expectFullLoad, + bool expectRestart, + bool streamingJobExists, + bool expectTermination) + { + // Arrange + this.serviceFixture.MockStreamingJobOperatorService.Invocations.Clear(); + this.SetupEventMock(eventType, streamDefinition); + this.serviceFixture.MockStreamingJobOperatorService + .Setup(service => service.GetStreamingJob(It.IsAny())) + .ReturnsAsync(streamingJobExists ? JobWithChecksum("checksum").AsOption() : Option.None); + + // Act + var sp = this.CreateServiceProvider(); + await sp.GetRequiredService>() + .GetStreamDefinitionEventsGraph(CancellationToken.None) + .Run(this.akkaFixture.Materializer); + + // Assert + + this.serviceFixture.MockStreamingJobOperatorService.Verify(service + => service.StartRegisteredStream( + It.IsAny(), true), + Times.Exactly(expectFullLoad ? 1 : 0)); + + this.serviceFixture.MockStreamingJobOperatorService.Verify(service + => service.RequestStreamingJobRestart(streamDefinition.StreamId), + Times.Exactly(expectRestart && streamingJobExists ? 1 : 0)); + + this.serviceFixture.MockStreamingJobOperatorService.Verify(service + => service.StartRegisteredStream( + It.IsAny(), false), + Times.Exactly(expectRestart && !streamingJobExists ? 1 : 0)); + + this.serviceFixture.MockStreamingJobOperatorService.Verify(service + => service.DeleteJob(It.IsAny(), It.IsAny()), + Times.Exactly(expectTermination ? 1 : 0)); + } + + public static IEnumerable GenerateModifiedTestCases() + { + yield return new object[] { StreamDefinitionTestCases.StreamDefinition, true, true }; + yield return new object[] { StreamDefinitionTestCases.StreamDefinition, false, false }; + } + + [Theory] + [MemberData(nameof(GenerateModifiedTestCases))] + public async Task TestJobModificationEvent(StreamDefinition streamDefinition, + bool jobChecksumChanged, bool expectRestart) + { + // Arrange + this.serviceFixture.MockStreamingJobOperatorService.Invocations.Clear(); + this.SetupEventMock(WatchEventType.Modified, streamDefinition); + var mockJob = JobWithChecksum(jobChecksumChanged ? "checksum" : streamDefinition.GetConfigurationChecksum()); + this.serviceFixture.MockStreamingJobOperatorService + .Setup(service => service.GetStreamingJob(It.IsAny())) + .ReturnsAsync(mockJob.AsOption()); + + // Act + var sp = this.CreateServiceProvider(); + await sp.GetRequiredService>() + .GetStreamDefinitionEventsGraph(CancellationToken.None) + .Run(this.akkaFixture.Materializer); + + // Assert + + this.serviceFixture.MockStreamingJobOperatorService.Verify(service + => service.RequestStreamingJobRestart(It.IsAny()), Times.Exactly(expectRestart ? 1 : 0)); + } + + + public static IEnumerable GenerateReloadTestCases() + { + yield return new object[] { ReloadRequestedStreamDefinition, true, true }; + yield return new object[] { ReloadRequestedStreamDefinition, false, false }; + } + + [Theory] + [MemberData(nameof(GenerateReloadTestCases))] + public async Task TestStreamReload(StreamDefinition streamDefinition, bool jobExists, bool expectReload) + { + // Arrange + this.serviceFixture.MockStreamingJobOperatorService.Invocations.Clear(); + this.serviceFixture.MockStreamDefinitionRepository.Invocations.Clear(); + this.serviceFixture.MockStreamDefinitionRepository + .Setup(sdr + => sdr.RemoveReloadingAnnotation(streamDefinition.Namespace(), streamDefinition.Kind, + streamDefinition.StreamId)) + .ReturnsAsync(((IStreamDefinition)streamDefinition).AsOption()); + this.SetupEventMock(WatchEventType.Modified, streamDefinition); + var mockJob = JobWithChecksum(streamDefinition.GetConfigurationChecksum()); + this.serviceFixture.MockStreamingJobOperatorService + .Setup(service => service.GetStreamingJob(It.IsAny())) + .ReturnsAsync(jobExists ? mockJob.AsOption() : Option.None); + + // Act + var sp = this.CreateServiceProvider(); + await sp.GetRequiredService>() + .GetStreamDefinitionEventsGraph(CancellationToken.None) + .Run(this.akkaFixture.Materializer); + + // Assert + this.serviceFixture.MockStreamingJobOperatorService.Verify(service + => service.RequestStreamingJobReload(It.IsAny()), Times.Exactly(expectReload ? 1 : 0)); + + this.serviceFixture.MockStreamingJobOperatorService.Verify(service + => service.StartRegisteredStream(It.IsAny(), true), Times.Exactly(expectReload ? 0 : 1)); + + this.serviceFixture.MockStreamDefinitionRepository.Verify(service + => service.RemoveReloadingAnnotation(streamDefinition.Namespace(), streamDefinition.Kind, + streamDefinition.StreamId)); + } + + public static IEnumerable GenerateAddTestCases() + { + yield return new object[] { ReloadRequestedStreamDefinition, true, true, false }; + yield return new object[] { ReloadRequestedStreamDefinition, false, false, true }; + yield return new object[] { ReloadRequestedStreamDefinition, true, false, false }; + + yield return new object[] { StreamDefinitionTestCases.StreamDefinition, true, true, false }; + yield return new object[] { StreamDefinitionTestCases.StreamDefinition, false, false, true }; + yield return new object[] { StreamDefinitionTestCases.StreamDefinition, true, false, false }; + + yield return new object[] { SuspendedStreamDefinition, true, true, false }; + yield return new object[] { SuspendedStreamDefinition, false, false, false }; + yield return new object[] { SuspendedStreamDefinition, true, false, false }; + } + + [Theory] + [MemberData(nameof(GenerateAddTestCases))] + public async Task TestStreamAdded(StreamDefinition streamDefinition, bool jobExists, bool jobIsReloading, + bool expectStart) + { + // Arrange + this.serviceFixture.MockStreamingJobOperatorService.Invocations.Clear(); + this.serviceFixture.MockStreamDefinitionRepository.Invocations.Clear(); + this.serviceFixture.MockStreamDefinitionRepository + .Setup(sdr + => sdr.RemoveReloadingAnnotation(streamDefinition.Namespace(), streamDefinition.Kind, + streamDefinition.StreamId)) + .ReturnsAsync(((IStreamDefinition)streamDefinition).AsOption()); + this.SetupEventMock(WatchEventType.Added, streamDefinition); + this.serviceFixture.MockStreamingJobOperatorService + .Setup(service => service.GetStreamingJob(It.IsAny())) + .ReturnsAsync(jobExists ? jobIsReloading ? ReloadingJob : RunningJob : Option.None); + + // Act + var sp = this.CreateServiceProvider(); + await sp.GetRequiredService>() + .GetStreamDefinitionEventsGraph(CancellationToken.None) + .Run(this.akkaFixture.Materializer); + + // Assert + this.serviceFixture.MockStreamingJobOperatorService.Verify(service + => service.RequestStreamingJobReload(It.IsAny()), Times.Exactly(0)); + + this.serviceFixture.MockStreamingJobOperatorService.Verify(service + => service.StartRegisteredStream(It.IsAny(), true), Times.Exactly(expectStart ? 1 : 0)); + } + + public static IEnumerable GenerateRecoverableTestCases() + { + yield return new object[] { FailedStreamDefinition(new JsonException()) }; + } + + [Theory] + [MemberData(nameof(GenerateRecoverableTestCases))] + public async Task HandleBrokenStreamDefinition(FailedStreamDefinition streamDefinition) + { + // Arrange + this.serviceFixture.MockStreamingJobOperatorService.Invocations.Clear(); + this.serviceFixture + .MockKubeCluster.Setup(cluster => + cluster.StreamCustomResourceEvents( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns(Source.Single((WatchEventType.Added, streamDefinition))); + + this.serviceFixture.MockStreamingJobOperatorService + .Setup(service => service.GetStreamingJob(It.IsAny())) + .ReturnsAsync(FailedJob.AsOption()); + + // Act + var sp = this.CreateServiceProvider(); + await sp.GetRequiredService>() + .GetStreamDefinitionEventsGraph(CancellationToken.None) + .Run(this.akkaFixture.Materializer); + + // Assert + + this.serviceFixture.MockStreamingJobOperatorService.Verify(service + => service.GetStreamingJob(It.IsAny()), Times.Never); + } + + public static IEnumerable GenerateFatalTestCases() + { + yield return new object[] { FailedStreamDefinition(new BufferOverflowException("test")) }; + } + + [Theory] + [MemberData(nameof(GenerateFatalTestCases))] + public async Task HandleFatalException(FailedStreamDefinition streamDefinition) + { + // Arrange + this.serviceFixture.MockStreamingJobOperatorService.Invocations.Clear(); + this.serviceFixture + .MockKubeCluster.Setup(cluster => + cluster.StreamCustomResourceEvents( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns(Source.Single((WatchEventType.Added, streamDefinition))); + + this.serviceFixture.MockStreamingJobOperatorService + .Setup(service => service.GetStreamingJob(It.IsAny())) + .ReturnsAsync(FailedJob.AsOption()); + + // Act + var exception = await Assert.ThrowsAnyAsync(async () => + { + var sp = this.CreateServiceProvider(); + await sp.GetRequiredService>() + .GetStreamDefinitionEventsGraph(CancellationToken.None) + .Run(this.akkaFixture.Materializer); + }); + // Assert + + Assert.Equal("test", exception.Message); + } + + [Fact] + public async Task HandleBrokenStreamRepository() + { + // Arrange + this.serviceFixture.MockStreamingJobOperatorService.Invocations.Clear(); + this.serviceFixture.MockStreamDefinitionRepository.Invocations.Clear(); + this.serviceFixture + .MockKubeCluster.Setup(cluster => + cluster.StreamCustomResourceEvents( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns( + Source.Single((WatchEventType.Added, (StreamDefinition)StreamDefinitionTestCases.StreamDefinition))); + + this.serviceFixture.MockStreamingJobOperatorService + .Setup(service => service.GetStreamingJob(It.IsAny())) + .ReturnsAsync(FailedJob.AsOption()); + + this.serviceFixture.MockStreamDefinitionRepository + .Setup(service + => service.SetStreamStatus( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .ThrowsAsync(new Exception()); + + // Act + var sp = this.CreateServiceProvider(); + await sp.GetRequiredService>() + .GetStreamDefinitionEventsGraph(CancellationToken.None) + .Run(this.akkaFixture.Materializer); + + // Assert that code above didn't throw + Assert.True(true); + } + + + private void SetupEventMock(WatchEventType eventType, StreamDefinition streamDefinition) + { + this.serviceFixture + .MockKubeCluster.Setup(cluster => + cluster.StreamCustomResourceEvents( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns(Source.Single<(WatchEventType, StreamDefinition)>((eventType, streamDefinition))); + } + + + private ServiceProvider CreateServiceProvider() + { + var optionsMock = new Mock>(); + optionsMock + .Setup(m => m.Get(It.IsAny())) + .Returns(new CustomResourceConfiguration()); + return new ServiceCollection() + .AddSingleton(this.akkaFixture.Materializer) + .AddSingleton(this.serviceFixture.MockKubeCluster.Object) + .AddSingleton(this.serviceFixture.MockStreamingJobOperatorService.Object) + .AddSingleton(this.serviceFixture.MockStreamDefinitionRepository.Object) + .AddSingleton(this.loggerFixture.Factory.CreateLogger>()) + .AddSingleton(this.loggerFixture.Factory.CreateLogger>()) + .AddSingleton(this.serviceFixture.MockStreamingJobOperatorService.Object) + .AddSingleton(optionsMock.Object) + .AddSingleton(Options.Create(new StreamOperatorServiceConfiguration + { + Parallelism = 1, MaxBufferCapacity = 100 + })) + .AddSingleton, StreamOperatorService>() + .AddSingleton, StreamOperatorService>() + .BuildServiceProvider(); + } +} diff --git a/test/Services/StreamingJobMaintenanceServiceTests.cs b/test/Services/StreamingJobMaintenanceServiceTests.cs new file mode 100644 index 0000000..ae207fc --- /dev/null +++ b/test/Services/StreamingJobMaintenanceServiceTests.cs @@ -0,0 +1,238 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Akka.Streams; +using Akka.Streams.Dsl; +using Akka.Util; +using Akka.Util.Extensions; +using Arcane.Operator.Configurations; +using Arcane.Operator.Extensions; +using Arcane.Operator.Models; +using Arcane.Operator.Services.Maintenance; +using Arcane.Operator.Services.Streams; +using Arcane.Operator.StreamDefinitions.Base; +using Arcane.Operator.StreamStatuses.StreamStatus.V1Beta1; +using Arcane.Operator.Tests.Fixtures; +using k8s; +using k8s.Models; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Moq; +using Xunit; +using static Arcane.Operator.Tests.Services.TestCases.JobTestCases; +using static Arcane.Operator.Tests.Services.TestCases.StreamDefinitionTestCases; + +namespace Arcane.Operator.Tests.Services; + +public class StreamingJobMaintenanceServiceTests : IClassFixture, IClassFixture, + IClassFixture +{ + private readonly AkkaFixture akkaFixture; + private readonly LoggerFixture loggerFixture; + private readonly ServiceFixture serviceFixture; + + public StreamingJobMaintenanceServiceTests(ServiceFixture serviceFixture, LoggerFixture loggerFixture, + AkkaFixture akkaFixture) + { + this.serviceFixture = serviceFixture; + this.loggerFixture = loggerFixture; + this.akkaFixture = akkaFixture; + } + + [Theory] + [MemberData(nameof(GenerateCompletedJobTestCases))] + public async Task HandleCompletedJob(V1Job job, bool definitionExists, bool fullLoad, bool expectRestart) + { + // Arrange + this.serviceFixture.MockStreamingJobOperatorService.Invocations.Clear(); + this.serviceFixture.MockStreamDefinitionRepository.Invocations.Clear(); + var mockSource = Source.From(new List<(WatchEventType, V1Job)> + { + (WatchEventType.Deleted, job) + }); + this.serviceFixture + .MockKubeCluster.Setup(cluster => + cluster.StreamJobEvents(It.IsAny(), It.IsAny(), It.IsAny(), + It.IsAny())) + .Returns(mockSource); + + this.serviceFixture + .MockStreamDefinitionRepository + .Setup(s => s.GetStreamDefinition(It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync(() => + definitionExists ? Mock.Of().AsOption() : Option.None); + var service = this.CreateService(); + + // Act + await service.GetJobEventsGraph(CancellationToken.None).Run(this.akkaFixture.Materializer); + + // Assert + this.serviceFixture.MockStreamingJobOperatorService + .Verify(s => s.StartRegisteredStream(It.IsAny(), fullLoad), + Times.Exactly(definitionExists && expectRestart ? 1 : 0)); + } + + [Theory] + [MemberData(nameof(GenerateModifiedJobCases))] + public async Task HandleIntermediateJobModifiedEvent(V1Job job, bool expectToStopJob) + { + // Arrange + this.serviceFixture.MockStreamingJobOperatorService.Invocations.Clear(); + var mockSource = Source.From(new List<(WatchEventType, V1Job)> + { + (WatchEventType.Modified, job) + }); + this.serviceFixture + .MockKubeCluster.Setup(cluster => + cluster.StreamJobEvents(It.IsAny(), It.IsAny(), It.IsAny(), + It.IsAny())) + .Returns(mockSource); + + var service = this.CreateService(); + + // Act + await service.GetJobEventsGraph(CancellationToken.None).Run(this.akkaFixture.Materializer); + + this.serviceFixture + .MockStreamingJobOperatorService + .Verify(s => s.DeleteJob(It.IsAny(), It.IsAny()), + Times.Exactly(expectToStopJob ? 1 : 0) + ); + } + + [Theory] + [MemberData(nameof(GenerateAddedJobTestCases))] + public async Task HandleAddedJob(string expectedPhase, V1Job job, bool expectToChangeState) + { + // Arrange + this.serviceFixture.MockStreamDefinitionRepository.Invocations.Clear(); + var mockSource = Source.From(new List<(WatchEventType, V1Job)> + { + (WatchEventType.Added, job) + }); + this.serviceFixture + .MockKubeCluster.Setup(cluster => + cluster.StreamJobEvents(It.IsAny(), It.IsAny(), It.IsAny(), + It.IsAny())) + .Returns(mockSource); + + var service = this.CreateService(); + + // Act + await service.GetJobEventsGraph(CancellationToken.None).Run(this.akkaFixture.Materializer); + + this.serviceFixture + .MockStreamDefinitionRepository + .Verify(s => s.SetStreamStatus( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.Is(cs => cs.Phase == expectedPhase)), + Times.Exactly(expectToChangeState ? 1 : 0)); + } + + [Theory] + [MemberData(nameof(GenerateDeletedJobTestCases))] + public async Task HandleDeletedJob(V1Job job, IStreamDefinition streamDefinition, bool expectToRestart, + bool expectFullLoad) + { + // Arrange + this.serviceFixture.MockStreamDefinitionRepository.Invocations.Clear(); + var mockSource = Source.From(new List<(WatchEventType, V1Job)> + { + (WatchEventType.Deleted, job) + }); + this.serviceFixture + .MockKubeCluster + .Setup(cluster => + cluster.StreamJobEvents(It.IsAny(), It.IsAny(), It.IsAny(), + It.IsAny())) + .Returns(mockSource); + + this.serviceFixture + .MockStreamDefinitionRepository + .Setup(service => + service.GetStreamDefinition(job.Namespace(), job.GetStreamKind(), job.GetStreamId())) + .ReturnsAsync(streamDefinition.AsOption()); + + + var service = this.CreateService(); + + // Act + await service.GetJobEventsGraph(CancellationToken.None).Run(this.akkaFixture.Materializer); + + this.serviceFixture.MockStreamingJobOperatorService.Verify(s => + s.StartRegisteredStream(streamDefinition, expectFullLoad), + Times.Exactly(expectToRestart ? 1 : 0) + ); + } + + public static IEnumerable GenerateDeletedJobTestCases() + { + yield return new object[] { CompletedJob, StreamDefinition, true, false }; + yield return new object[] { ReloadRequestedJob, StreamDefinition, true, true }; + yield return new object[] { SchemaMismatchJob, StreamDefinition, true, true }; + + yield return new object[] { CompletedJob, SuspendedStreamDefinition, false, false }; + yield return new object[] { ReloadRequestedJob, SuspendedStreamDefinition, false, true }; + yield return new object[] { SchemaMismatchJob, SuspendedStreamDefinition, false, true }; + } + + public static IEnumerable GenerateAddedJobTestCases() + { + yield return new object[] { StreamPhase.RUNNING.ToString(), RunningJob, true }; + yield return new object[] { StreamPhase.RELOADING.ToString(), ReloadingJob, true }; + } + + public static IEnumerable GenerateCompletedJobTestCases() + { + yield return new object[] { FailedJob, true, false, false }; + yield return new object[] { FailedJob, false, false, false }; + + yield return new object[] { CompletedJob, true, false, true }; + yield return new object[] { CompletedJob, false, false, true }; + + yield return new object[] { RunningJob, true, false, true }; + yield return new object[] { RunningJob, false, false, true }; + + yield return new object[] { SchemaMismatchJob, true, true, true }; + yield return new object[] { SchemaMismatchJob, false, true, true }; + + yield return new object[] { ReloadRequestedJob, true, true, true }; + yield return new object[] { ReloadRequestedJob, false, true, true }; + } + + public static IEnumerable GenerateModifiedJobCases() + { + yield return new object[] { TerminatingJob, false }; + yield return new object[] { ReloadRequestedJob, true }; + yield return new object[] { CompletedJob, false }; + yield return new object[] { FailedJob, false }; + yield return new object[] { RunningJob, false }; + yield return new object[] { SchemaMismatchJob, false }; + } + + + private StreamingJobMaintenanceService CreateService() + { + return new ServiceCollection() + .AddSingleton(this.serviceFixture.MockKubeCluster.Object) + .AddSingleton(this.serviceFixture.MockStreamDefinitionRepository.Object) + .AddSingleton(this.serviceFixture.MockStreamInteractionService.Object) + .AddSingleton(this.serviceFixture.MockStreamingJobOperatorService.Object) + .AddSingleton(this.loggerFixture.Factory.CreateLogger()) + .AddSingleton(this.loggerFixture.Factory.CreateLogger()) + .AddSingleton() + .AddSingleton(Options.Create(new StreamingJobMaintenanceServiceConfiguration + { + MaxBufferCapacity = 1000, Parallelism = 1 + })) + .AddSingleton(Options.Create(new StreamingJobOperatorServiceConfiguration())) + .AddSingleton() + .AddSingleton(this.akkaFixture.Materializer) + .BuildServiceProvider() + .GetRequiredService(); + } +} diff --git a/test/Services/TestCases/FailedStreamDefinition.cs b/test/Services/TestCases/FailedStreamDefinition.cs new file mode 100644 index 0000000..ef340a1 --- /dev/null +++ b/test/Services/TestCases/FailedStreamDefinition.cs @@ -0,0 +1,58 @@ +using System; +using System.Collections.Generic; +using Arcane.Operator.StreamDefinitions.Base; +using k8s.Models; + +namespace Arcane.Operator.Tests.Services.TestCases; + +/// +/// A stream definition that throws an exception (for tests) +/// +public class FailedStreamDefinition : IStreamDefinition +{ + private readonly Exception exception; + + public FailedStreamDefinition(Exception exception) + { + this.exception = exception; + } + + public string ApiVersion + { + get => throw this.exception; + set => throw this.exception; + } + + public string Kind + { + get => throw this.exception; + set => throw this.exception; + } + + public V1ObjectMeta Metadata + { + get => throw this.exception; + set => throw this.exception; + } + + public string StreamId => throw this.exception; + public bool Suspended => throw this.exception; + public bool ReloadRequested => throw this.exception; + public V1TypedLocalObjectReference JobTemplateRef => throw this.exception; + public V1TypedLocalObjectReference ReloadingJobTemplateRef => throw this.exception; + + public IEnumerable ToV1EnvFromSources() + { + throw this.exception; + } + + public Dictionary ToEnvironment(bool fullLoad) + { + throw this.exception; + } + + public string GetConfigurationChecksum() + { + throw this.exception; + } +} diff --git a/test/Services/TestCases/JobTestCases.cs b/test/Services/TestCases/JobTestCases.cs new file mode 100644 index 0000000..a98d1f9 --- /dev/null +++ b/test/Services/TestCases/JobTestCases.cs @@ -0,0 +1,72 @@ +using System; +using System.Collections.Generic; +using Arcane.Models.StreamingJobLifecycle; +using Arcane.Operator.Extensions; +using k8s.Models; +using Snd.Sdk.Kubernetes; + +namespace Arcane.Operator.Tests.Services.TestCases; + +public static class JobTestCases +{ + public static V1Job FailedJob => CreateJob(new List + { new() { Type = "Failed", Status = "True" } }) + .WithStreamingJobLabels("1", false, string.Empty); + + public static V1Job CompletedJob => CreateJob(new List + { new() { Type = "Complete", Status = "True" } }) + .WithStreamingJobLabels("1", false, string.Empty); + + public static V1Job ReloadRequestedJob => CompletedJob + .Clone() + .WithAnnotations(new Dictionary + { + { Annotations.STATE_ANNOTATION_KEY, Annotations.RELOADING_STATE_ANNOTATION_VALUE } + }); + + public static V1Job TerminatingJob => CompletedJob + .Clone() + .WithAnnotations(new Dictionary + { + { Annotations.STATE_ANNOTATION_KEY, Annotations.TERMINATING_STATE_ANNOTATION_VALUE } + }); + + public static V1Job ReloadingJob => CreateJob(new List + { new() { Type = "Complete", Status = "True" } }) + .Clone() + .WithStreamingJobLabels(Guid.NewGuid().ToString(), true, string.Empty); + + public static V1Job RunningJob => CreateJob(null) + .WithStreamingJobLabels("1", false, string.Empty); + + public static V1Job SchemaMismatchJob => RunningJob + .Clone() + .WithAnnotations(new Dictionary + { + { Annotations.STATE_ANNOTATION_KEY, Annotations.SCHEMA_MISMATCH_STATE_ANNOTATION_VALUE } + }); + + public static V1Job JobWithChecksum(string checksum) + { + return RunningJob + .Clone() + .WithStreamingJobAnnotations(checksum); + } + + private static V1Job CreateJob(List conditions) + { + var job = new V1Job + { + Metadata = new V1ObjectMeta(), + Spec = new V1JobSpec + { + Template = new V1PodTemplateSpec + { + Metadata = new V1ObjectMeta() + } + }, + Status = new V1JobStatus { Conditions = conditions } + }; + return job; + } +} diff --git a/test/Services/TestCases/StreamDefinitionTestCases.cs b/test/Services/TestCases/StreamDefinitionTestCases.cs new file mode 100644 index 0000000..30c6374 --- /dev/null +++ b/test/Services/TestCases/StreamDefinitionTestCases.cs @@ -0,0 +1,51 @@ +using System; +using System.Collections.Generic; +using Arcane.Models.StreamingJobLifecycle; +using Arcane.Operator.StreamDefinitions; +using Arcane.Operator.StreamDefinitions.Base; +using k8s.Models; + +namespace Arcane.Operator.Tests.Services.TestCases; + +public static class StreamDefinitionTestCases +{ + public static IStreamDefinition StreamDefinition => new StreamDefinition + { + Spec = new StreamDefinitionSpec(), + Metadata = new V1ObjectMeta + { + Name = "stream" + } + }; + + public static IStreamDefinition SuspendedStreamDefinition => new StreamDefinition + { + Spec = new StreamDefinitionSpec(), + Metadata = new V1ObjectMeta + { + Name = "stream", + Annotations = new Dictionary + { + { Annotations.STATE_ANNOTATION_KEY, Annotations.SUSPENDED_STATE_ANNOTATION_VALUE } + } + } + }; + + public static IStreamDefinition ReloadRequestedStreamDefinition => new StreamDefinition + { + Spec = new StreamDefinitionSpec(), + Metadata = new V1ObjectMeta + { + Name = "stream", + Annotations = new Dictionary + { + { Annotations.STATE_ANNOTATION_KEY, Annotations.RELOADING_STATE_ANNOTATION_VALUE } + } + } + }; + + public static FailedStreamDefinition FailedStreamDefinition(Exception exception) + { + return new FailedStreamDefinition(exception); + } +} diff --git a/test/UnitTest1.cs b/test/UnitTest1.cs deleted file mode 100644 index 269095c..0000000 --- a/test/UnitTest1.cs +++ /dev/null @@ -1,11 +0,0 @@ -using Xunit; - -namespace DotnetProject.Tests; - -public class UnitTest1 -{ - [Fact] - public void Test1() - { - } -} \ No newline at end of file