Skip to content

Commit

Permalink
Refactor the StreamClassRepository class (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
s-vitaliy authored Apr 9, 2024
1 parent f7719c0 commit f47c762
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 72 deletions.
21 changes: 21 additions & 0 deletions src/Extensions/JsonElementExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using System.Text.Json;
using Akka.Util;
using Akka.Util.Extensions;
using Arcane.Operator.Models.StreamDefinitions;
using Arcane.Operator.Models.StreamDefinitions.Base;

namespace Arcane.Operator.Extensions;

/// <summary>
/// Extension methods for the JsonElement class
/// </summary>
public static class JsonElementExtensions
{
/// <summary>
/// Deserialize the JsonElement to IStreamDefinition object and wrap it in an Option{IStreamDefinition} object
/// </summary>
/// <param name="jsonElement">Element to deserialize</param>
/// <returns></returns>
public static Option<IStreamDefinition> AsOptionalStreamDefinition(this JsonElement jsonElement) =>
jsonElement.Deserialize<StreamDefinition>().AsOption<IStreamDefinition>();
}
6 changes: 4 additions & 2 deletions src/Services/Base/IStreamClassRepository.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using Arcane.Operator.Configurations;
using System.Threading.Tasks;
using Akka.Util;
using Arcane.Operator.Models.StreamClass.Base;

namespace Arcane.Operator.Services.Base;

public interface IStreamClassRepository
{
CustomResourceConfiguration Get(string nameSpace, string kind);
Task<Option<IStreamClass>> Get(string nameSpace, string streamDefinitionKind);
}
138 changes: 68 additions & 70 deletions src/Services/Repositories/StreamDefinitionRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
using System.Text.Json;
using System.Threading.Tasks;
using Akka.Util;
using Akka.Util.Extensions;
using Arcane.Models.StreamingJobLifecycle;
using Arcane.Operator.Models.StreamDefinitions;
using Arcane.Operator.Extensions;
using Arcane.Operator.Models.StreamDefinitions.Base;
using Arcane.Operator.Models.StreamStatuses.StreamStatus.V1Beta1;
using Arcane.Operator.Services.Base;
Expand All @@ -29,86 +28,85 @@ public StreamDefinitionRepository(ILogger<StreamDefinitionRepository> logger,
this.streamClassRepository = streamClassRepository;
}

public Task<Option<IStreamDefinition>> GetStreamDefinition(string nameSpace, string kind, string streamId)
{
var crdConf = this.streamClassRepository.Get(nameSpace, kind);
if (crdConf is { ApiGroup: null, Version: null, Plural: null })
{
this.logger.LogError("Failed to get configuration for kind {kind}", kind);
return Task.FromResult(Option<IStreamDefinition>.None);
}
public Task<Option<IStreamDefinition>> GetStreamDefinition(string nameSpace, string kind, string streamId) =>
this.streamClassRepository.Get(nameSpace, kind).FlatMap(crdConf =>
{
if (crdConf is { HasValue: false })
{
this.logger.LogError("Failed to get configuration for kind {kind}", kind);
return Task.FromResult(Option<IStreamDefinition>.None);
}

return this.kubeCluster
.GetCustomResource(
crdConf.ApiGroup,
crdConf.Version,
crdConf.Plural,
nameSpace,
streamId,
element => (IStreamDefinition)element.Deserialize<StreamDefinition>())
.Map(resource => resource.AsOption());
}
return this.kubeCluster
.GetCustomResource(
crdConf.Value.ApiGroupRef,
crdConf.Value.VersionRef,
crdConf.Value.PluralNameRef,
nameSpace,
streamId,
element => element.AsOptionalStreamDefinition());
}
);

public Task<Option<IStreamDefinition>> SetStreamStatus(string nameSpace, string kind, string streamId,
V1Beta1StreamStatus streamStatus)
{
this.logger.LogInformation(
"Status and phase of stream with kind {kind} and id {streamId} changed to {statuses}, {phase}",
kind,
streamId,
string.Join(", ", streamStatus.Conditions.Select(sc => sc.Type)),
streamStatus.Phase);
var crdConf = this.streamClassRepository.Get(nameSpace, kind);
if (crdConf is { ApiGroup: null, Version: null, Plural: null })
V1Beta1StreamStatus streamStatus) =>
this.streamClassRepository.Get(nameSpace, kind).FlatMap(crdConf =>
{
this.logger.LogError("Failed to get configuration for kind {kind}", kind);
return Task.FromResult(Option<IStreamDefinition>.None);
}
if (crdConf is { HasValue: false })
{
this.logger.LogError("Failed to get configuration for kind {kind}", kind);
return Task.FromResult(Option<IStreamDefinition>.None);
}

this.logger.LogInformation(
"Status and phase of stream with kind {kind} and id {streamId} changed to {statuses}, {phase}",
kind,
streamId,
string.Join(", ", streamStatus.Conditions.Select(sc => sc.Type)),
streamStatus.Phase);

return this.kubeCluster.UpdateCustomResourceStatus(
crdConf.ApiGroup,
crdConf.Version,
crdConf.Plural,
return this.kubeCluster.UpdateCustomResourceStatus(
crdConf.Value.ApiGroupRef,
crdConf.Value.VersionRef,
crdConf.Value.PluralNameRef,
nameSpace,
streamId,
streamStatus,
element => (IStreamDefinition)element.Deserialize<StreamDefinition>())
.Map(resource => resource.AsOption());
}
element => element.AsOptionalStreamDefinition());
});

public Task<Option<IStreamDefinition>> RemoveReloadingAnnotation(string nameSpace, string kind, string streamId)
{
var crdConf = this.streamClassRepository.Get(nameSpace, kind);
if (crdConf is { ApiGroup: null, Version: null, Plural: null })
public Task<Option<IStreamDefinition>> RemoveReloadingAnnotation(string nameSpace, string kind, string streamId) =>
this.streamClassRepository.Get(nameSpace, kind).FlatMap(crdConf =>
{
this.logger.LogError("Failed to get configuration for kind {kind}", kind);
return Task.FromResult(Option<IStreamDefinition>.None);
}
if (crdConf is { HasValue: false })
{
this.logger.LogError("Failed to get configuration for kind {kind}", kind);
return Task.FromResult(Option<IStreamDefinition>.None);
}

return this.kubeCluster
.RemoveObjectAnnotation(crdConf.ToNamespacedCrd(),
Annotations.STATE_ANNOTATION_KEY,
streamId,
nameSpace)
.Map(result => (IStreamDefinition)((JsonElement)result).Deserialize<StreamDefinition>())
.Map(result => result.AsOption());
}
return this.kubeCluster
.RemoveObjectAnnotation(crdConf.Value.ToNamespacedCrd(),
Annotations.STATE_ANNOTATION_KEY,
streamId,
nameSpace)
.Map(result => ((JsonElement)result).AsOptionalStreamDefinition());
});

public Task<Option<IStreamDefinition>> SetCrashLoopAnnotation(string nameSpace, string kind, string streamId)
{
var crdConf = this.streamClassRepository.Get(nameSpace, kind);
if (crdConf is { ApiGroup: null, Version: null, Plural: null })
public Task<Option<IStreamDefinition>> SetCrashLoopAnnotation(string nameSpace, string kind, string streamId) =>
this.streamClassRepository.Get(nameSpace, kind).FlatMap(crdConf =>
{
this.logger.LogError("Failed to get configuration for kind {kind}", kind);
return Task.FromResult(Option<IStreamDefinition>.None);
}
if (crdConf is { HasValue: false })
{
this.logger.LogError("Failed to get configuration for kind {kind}", kind);
return Task.FromResult(Option<IStreamDefinition>.None);
}

return this.kubeCluster
.AnnotateObject(crdConf.ToNamespacedCrd(),
Annotations.STATE_ANNOTATION_KEY,
Annotations.CRASH_LOOP_STATE_ANNOTATION_VALUE,
streamId,
nameSpace)
.Map(result => ((IStreamDefinition)((JsonElement)result).Deserialize<StreamDefinition>()).AsOption());
}
return this.kubeCluster
.AnnotateObject(crdConf.Value.ToNamespacedCrd(),
Annotations.STATE_ANNOTATION_KEY,
Annotations.CRASH_LOOP_STATE_ANNOTATION_VALUE,
streamId,
nameSpace)
.Map(result => ((JsonElement)result).AsOptionalStreamDefinition());
});
}

0 comments on commit f47c762

Please sign in to comment.