diff --git a/src/Extensions/JsonElementExtensions.cs b/src/Extensions/JsonElementExtensions.cs new file mode 100644 index 0000000..692248d --- /dev/null +++ b/src/Extensions/JsonElementExtensions.cs @@ -0,0 +1,21 @@ +using System.Text.Json; +using Akka.Util; +using Akka.Util.Extensions; +using Arcane.Operator.Models.StreamDefinitions; +using Arcane.Operator.Models.StreamDefinitions.Base; + +namespace Arcane.Operator.Extensions; + +/// +/// Extension methods for the JsonElement class +/// +public static class JsonElementExtensions +{ + /// + /// Deserialize the JsonElement to IStreamDefinition object and wrap it in an Option{IStreamDefinition} object + /// + /// Element to deserialize + /// + public static Option AsOptionalStreamDefinition(this JsonElement jsonElement) => + jsonElement.Deserialize().AsOption(); +} diff --git a/src/Services/Base/IStreamClassRepository.cs b/src/Services/Base/IStreamClassRepository.cs index 3249223..6f10f43 100644 --- a/src/Services/Base/IStreamClassRepository.cs +++ b/src/Services/Base/IStreamClassRepository.cs @@ -1,8 +1,10 @@ -using Arcane.Operator.Configurations; +using System.Threading.Tasks; +using Akka.Util; +using Arcane.Operator.Models.StreamClass.Base; namespace Arcane.Operator.Services.Base; public interface IStreamClassRepository { - CustomResourceConfiguration Get(string nameSpace, string kind); + Task> Get(string nameSpace, string streamDefinitionKind); } diff --git a/src/Services/Repositories/StreamDefinitionRepository.cs b/src/Services/Repositories/StreamDefinitionRepository.cs index dff85b5..1180cda 100644 --- a/src/Services/Repositories/StreamDefinitionRepository.cs +++ b/src/Services/Repositories/StreamDefinitionRepository.cs @@ -2,9 +2,8 @@ using System.Text.Json; using System.Threading.Tasks; using Akka.Util; -using Akka.Util.Extensions; using Arcane.Models.StreamingJobLifecycle; -using Arcane.Operator.Models.StreamDefinitions; +using Arcane.Operator.Extensions; using Arcane.Operator.Models.StreamDefinitions.Base; using Arcane.Operator.Models.StreamStatuses.StreamStatus.V1Beta1; using Arcane.Operator.Services.Base; @@ -29,86 +28,85 @@ public StreamDefinitionRepository(ILogger logger, this.streamClassRepository = streamClassRepository; } - public Task> GetStreamDefinition(string nameSpace, string kind, string streamId) - { - var crdConf = this.streamClassRepository.Get(nameSpace, kind); - if (crdConf is { ApiGroup: null, Version: null, Plural: null }) - { - this.logger.LogError("Failed to get configuration for kind {kind}", kind); - return Task.FromResult(Option.None); - } + public Task> GetStreamDefinition(string nameSpace, string kind, string streamId) => + this.streamClassRepository.Get(nameSpace, kind).FlatMap(crdConf => + { + if (crdConf is { HasValue: false }) + { + this.logger.LogError("Failed to get configuration for kind {kind}", kind); + return Task.FromResult(Option.None); + } - return this.kubeCluster - .GetCustomResource( - crdConf.ApiGroup, - crdConf.Version, - crdConf.Plural, - nameSpace, - streamId, - element => (IStreamDefinition)element.Deserialize()) - .Map(resource => resource.AsOption()); - } + return this.kubeCluster + .GetCustomResource( + crdConf.Value.ApiGroupRef, + crdConf.Value.VersionRef, + crdConf.Value.PluralNameRef, + nameSpace, + streamId, + element => element.AsOptionalStreamDefinition()); + } + ); public Task> SetStreamStatus(string nameSpace, string kind, string streamId, - V1Beta1StreamStatus streamStatus) - { - this.logger.LogInformation( - "Status and phase of stream with kind {kind} and id {streamId} changed to {statuses}, {phase}", - kind, - streamId, - string.Join(", ", streamStatus.Conditions.Select(sc => sc.Type)), - streamStatus.Phase); - var crdConf = this.streamClassRepository.Get(nameSpace, kind); - if (crdConf is { ApiGroup: null, Version: null, Plural: null }) + V1Beta1StreamStatus streamStatus) => + this.streamClassRepository.Get(nameSpace, kind).FlatMap(crdConf => { - this.logger.LogError("Failed to get configuration for kind {kind}", kind); - return Task.FromResult(Option.None); - } + if (crdConf is { HasValue: false }) + { + this.logger.LogError("Failed to get configuration for kind {kind}", kind); + return Task.FromResult(Option.None); + } + + this.logger.LogInformation( + "Status and phase of stream with kind {kind} and id {streamId} changed to {statuses}, {phase}", + kind, + streamId, + string.Join(", ", streamStatus.Conditions.Select(sc => sc.Type)), + streamStatus.Phase); - return this.kubeCluster.UpdateCustomResourceStatus( - crdConf.ApiGroup, - crdConf.Version, - crdConf.Plural, + return this.kubeCluster.UpdateCustomResourceStatus( + crdConf.Value.ApiGroupRef, + crdConf.Value.VersionRef, + crdConf.Value.PluralNameRef, nameSpace, streamId, streamStatus, - element => (IStreamDefinition)element.Deserialize()) - .Map(resource => resource.AsOption()); - } + element => element.AsOptionalStreamDefinition()); + }); - public Task> RemoveReloadingAnnotation(string nameSpace, string kind, string streamId) - { - var crdConf = this.streamClassRepository.Get(nameSpace, kind); - if (crdConf is { ApiGroup: null, Version: null, Plural: null }) + public Task> RemoveReloadingAnnotation(string nameSpace, string kind, string streamId) => + this.streamClassRepository.Get(nameSpace, kind).FlatMap(crdConf => { - this.logger.LogError("Failed to get configuration for kind {kind}", kind); - return Task.FromResult(Option.None); - } + if (crdConf is { HasValue: false }) + { + this.logger.LogError("Failed to get configuration for kind {kind}", kind); + return Task.FromResult(Option.None); + } - return this.kubeCluster - .RemoveObjectAnnotation(crdConf.ToNamespacedCrd(), - Annotations.STATE_ANNOTATION_KEY, - streamId, - nameSpace) - .Map(result => (IStreamDefinition)((JsonElement)result).Deserialize()) - .Map(result => result.AsOption()); - } + return this.kubeCluster + .RemoveObjectAnnotation(crdConf.Value.ToNamespacedCrd(), + Annotations.STATE_ANNOTATION_KEY, + streamId, + nameSpace) + .Map(result => ((JsonElement)result).AsOptionalStreamDefinition()); + }); - public Task> SetCrashLoopAnnotation(string nameSpace, string kind, string streamId) - { - var crdConf = this.streamClassRepository.Get(nameSpace, kind); - if (crdConf is { ApiGroup: null, Version: null, Plural: null }) + public Task> SetCrashLoopAnnotation(string nameSpace, string kind, string streamId) => + this.streamClassRepository.Get(nameSpace, kind).FlatMap(crdConf => { - this.logger.LogError("Failed to get configuration for kind {kind}", kind); - return Task.FromResult(Option.None); - } + if (crdConf is { HasValue: false }) + { + this.logger.LogError("Failed to get configuration for kind {kind}", kind); + return Task.FromResult(Option.None); + } - return this.kubeCluster - .AnnotateObject(crdConf.ToNamespacedCrd(), - Annotations.STATE_ANNOTATION_KEY, - Annotations.CRASH_LOOP_STATE_ANNOTATION_VALUE, - streamId, - nameSpace) - .Map(result => ((IStreamDefinition)((JsonElement)result).Deserialize()).AsOption()); - } + return this.kubeCluster + .AnnotateObject(crdConf.Value.ToNamespacedCrd(), + Annotations.STATE_ANNOTATION_KEY, + Annotations.CRASH_LOOP_STATE_ANNOTATION_VALUE, + streamId, + nameSpace) + .Map(result => ((JsonElement)result).AsOptionalStreamDefinition()); + }); }