Skip to content

Commit

Permalink
Bump SnD.Sdk version
Browse files Browse the repository at this point in the history
  • Loading branch information
s-vitaliy committed Aug 15, 2024
1 parent 7b55d37 commit e071c3f
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 120 deletions.
4 changes: 1 addition & 3 deletions src/Arcane.Operator.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="SnD.Sdk" Version="1.1.0"/>
<PackageReference Include="SnD.Sdk" Version="1.2.4"/>
</ItemGroup>

<ItemGroup>
Expand All @@ -17,6 +17,4 @@
</None>
</ItemGroup>



</Project>
2 changes: 1 addition & 1 deletion src/Services/CommandHandlers/StreamingJobCommandHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,6 @@ private V1Job BuildJob(Option<IStreamingJobTemplate> jobTemplate, IStreamDefinit
.WithMetadataAnnotations(streamClass)
.WithCustomEnvironment(streamDefinition.ToV1EnvFromSources(streamClass))
.WithCustomEnvironment(streamDefinition.ToEnvironment(isBackfilling, streamClass))
.WithOwnerReference(streamDefinition)
.WithOwnerReference(streamDefinition.ApiVersion, streamDefinition.Kind, streamDefinition.Metadata)
.WithName(streamDefinition.StreamId);
}
106 changes: 0 additions & 106 deletions src/Services/Metrics/Actors/MetricsPublisherActor.cs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using System;
using System.Collections.Generic;
using SnD.Sdk.Metrics.Actors;
using Snd.Sdk.Metrics.Base;

namespace Arcane.Operator.Services.Metrics.Actors;

/// <summary>
/// The actor that emits a single value metric for each stream class that is online.
/// </summary>
public class StreamClassOnlineMetricsPublisherActor : MetricsPublisherActor
{
public StreamClassOnlineMetricsPublisherActor(TimeSpan initialDelay, TimeSpan emitInterval, MetricsService metricsService)
: base(initialDelay, emitInterval, metricsService)
{
}

/// <inheritdoc cref="MetricsPublisherActor.EmitMetric"/>
protected override void EmitMetric(MetricsService metricsService, string name, int value, SortedDictionary<string, string> tags)
{
metricsService.Count(name, value, tags);
}
}
14 changes: 8 additions & 6 deletions src/Services/Metrics/MetricsReporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using k8s;
using k8s.Models;
using Microsoft.Extensions.Options;
using SnD.Sdk.Metrics.Actors;
using Snd.Sdk.Metrics.Base;

namespace Arcane.Operator.Services.Metrics;
Expand All @@ -24,22 +25,23 @@ public MetricsReporter(MetricsService metricsService, ActorSystem actorSystem,
IOptions<MetricsReporterConfiguration> metricsReporterConfiguration)
{
this.metricsService = metricsService;
this.statusActor = actorSystem.ActorOf(Props.Create(() => new MetricsPublisherActor(
metricsReporterConfiguration.Value.MetricsPublisherActorConfiguration,
metricsService)),
nameof(MetricsPublisherActor));
this.statusActor = actorSystem.StartMetricsPublisher(() =>
new StreamClassOnlineMetricsPublisherActor(
metricsReporterConfiguration.Value.MetricsPublisherActorConfiguration.InitialDelay,
metricsReporterConfiguration.Value.MetricsPublisherActorConfiguration.UpdateInterval,
this.metricsService));
}

/// <inheritdoc cref="IMetricsReporter.ReportStatusMetrics"/>
public SetStreamClassStatusCommand ReportStatusMetrics(SetStreamClassStatusCommand command)
{
if (command.phase.IsFinal())
{
this.statusActor.Tell(new RemoveStreamClassMetricsMessage(command.streamClass.KindRef));
this.statusActor.Tell(new RemoveMetricMessage(command.streamClass.KindRef));
}
else
{
var msg = new AddStreamClassMetricsMessage(command.streamClass.KindRef, "stream_class", command.GetMetricsTags());
var msg = new AddMetricMessage(command.streamClass.KindRef, "stream_class", command.GetMetricsTags());
this.statusActor.Tell(msg);
}
return command;
Expand Down
8 changes: 4 additions & 4 deletions src/Services/Operators/StreamClassOperatorService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,22 @@ public class StreamClassOperatorService : IStreamClassOperatorService

private readonly ILogger<StreamClassOperatorService> logger;
private readonly IStreamClassRepository streamClassRepository;
private readonly IMetricsReporter metricsService;
private readonly IMetricsReporter metricsReporter;
private readonly IStreamOperatorService streamOperatorService;
private readonly CustomResourceApiRequest request;
private readonly ICommandHandler<SetStreamClassStatusCommand> streamClassStatusCommandHandler;

public StreamClassOperatorService(IOptions<StreamClassOperatorServiceConfiguration> streamOperatorServiceOptions,
IStreamClassRepository streamClassRepository,
IMetricsReporter metricsService,
IMetricsReporter metricsReporter,
ILogger<StreamClassOperatorService> logger,
ICommandHandler<SetStreamClassStatusCommand> streamClassStatusCommandHandler,
IStreamOperatorService streamOperatorService)
{
this.configuration = streamOperatorServiceOptions.Value;
this.logger = logger;
this.streamClassRepository = streamClassRepository;
this.metricsService = metricsService;
this.metricsReporter = metricsReporter;
this.streamOperatorService = streamOperatorService;
this.streamClassStatusCommandHandler = streamClassStatusCommandHandler;
this.request = new CustomResourceApiRequest(
Expand All @@ -70,7 +70,7 @@ public IRunnableGraph<Task> GetStreamClassEventsGraph(CancellationToken cancella
.Via(cancellationToken.AsFlow<ResourceEvent<IStreamClass>>(true))
.Select(this.OnEvent)
.CollectOption()
.Select(streamClass => this.metricsService.ReportStatusMetrics(streamClass))
.Select(streamClass => this.metricsReporter.ReportStatusMetrics(streamClass))
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(this.HandleError))
.ToMaterialized(sink, Keep.Right);
}
Expand Down

0 comments on commit e071c3f

Please sign in to comment.