Skip to content

Commit

Permalink
Move the actual code
Browse files Browse the repository at this point in the history
  • Loading branch information
s-vitaliy committed Jul 18, 2024
1 parent 16e7653 commit 88eb70b
Show file tree
Hide file tree
Showing 8 changed files with 279 additions and 4 deletions.
3 changes: 2 additions & 1 deletion src/Arcane.Ingestion.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="SnD.Sdk" Version="1.0.4"/>
<PackageReference Include="Arcane.Framework" Version="0.0.24"/>
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.4.0" />
</ItemGroup>

</Project>
46 changes: 46 additions & 0 deletions src/Configurations/JsonIngestionConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using System;
using System.Diagnostics.CodeAnalysis;

namespace Arcane.Ingestion.Configurations;

/// <summary>
/// Configuration for a json ingestion endpoint.
/// </summary>
[ExcludeFromCodeCoverage(Justification = "Model")]
public class JsonIngestionConfiguration
{
/// <summary>
/// Size of an Akka MergeHub Buffer for this endpoint.
/// </summary>
public int BufferSize { get; set; }

/// <summary>
/// Document processing rate per <see cref="ThrottleTimespan"/>.
/// </summary>
public int ThrottleDocumentLimit { get; set; }

/// <summary>
/// Number of documents to receive before throttling kicks in.
/// </summary>
public int ThrottleDocumentBurst { get; set; }

/// <summary>
/// Document processing rate (time).
/// </summary>
public TimeSpan ThrottleTimespan { get; set; }

/// <summary>
/// Max number of JSON documents in a single output file.
/// </summary>
public int MaxDocumentsPerFile { get; set; }

/// <summary>
/// Grouping interval for received records.
/// </summary>
public TimeSpan GroupingInterval { get; set; }

/// <summary>
/// Base location to save data in. Must follow format required by underlying storage service (az, s3 etc.).
/// </summary>
public string IngestionSinkPath { get; set; }
}
35 changes: 35 additions & 0 deletions src/Controllers/IngestionController.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using System.Diagnostics.CodeAnalysis;
using System.Net;
using System.Text.Json;
using Arcane.Ingestion.Services.Base;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;

namespace Arcane.Ingestion.Controllers
{
[ExcludeFromCodeCoverage]
[ApiController]
[Route("[controller]")]
public class IngestionController : ControllerBase
{
private readonly ILogger<IngestionController> logger;
private readonly IIngestionService<JsonDocument> jsonService;

public IngestionController(ILogger<IngestionController> logger, IIngestionService<JsonDocument> jsonService)
{
this.logger = logger;
this.jsonService = jsonService;
}

[HttpPost("json/{source}")]
[ProducesResponseType((int)HttpStatusCode.InternalServerError)]
[ProducesResponseType((int)HttpStatusCode.BadRequest)]
public ObjectResult Ingest([FromBody] JsonDocument record, string source)
{
this.logger.LogDebug("Received record for {source}", source);

this.jsonService.Ingest(source, record);
return this.Accepted();
}
}
}
12 changes: 12 additions & 0 deletions src/Metrics/DeclaredMetrics.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace Arcane.Ingestion.Metrics
{
/// <summary>
/// Metrics published by Arcane.
/// </summary>
public static class DeclaredMetrics
{
public const string ROWS_INCOMING = "rows.incoming";
public const string ROWS_INGESTED = "rows.ingested";
public const string DOCUMENTS_INGESTED = "documents.ingested";
}
}
40 changes: 37 additions & 3 deletions src/Program.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,39 @@
// See https://aka.ms/new-console-template for more information
using System;
using System.Diagnostics.CodeAnalysis;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Hosting;
using Serilog;
using Snd.Sdk.Logs.Providers;
using Snd.Sdk.Logs.Providers.Configurations;

using System;
namespace Arcane.Ingestion
{
[ExcludeFromCodeCoverage]
public class Program
{
public static int Main(string[] args)
{
Log.Logger = DefaultLoggingProvider.CreateBootstrapLogger(nameof(Arcane));
try
{
Log.Information("Starting web host");
CreateHostBuilder(args).Build().Run();
return 0;
}
catch (Exception ex)
{
Log.Fatal(ex, "Host terminated unexpectedly");
return 1;
}
finally
{
Log.CloseAndFlush();
}
}

Console.WriteLine("Hello, World!");
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.AddSerilogLogger(nameof(Arcane), loggerConfiguration => loggerConfiguration.Default().AddDatadog())
.ConfigureWebHostDefaults(webBuilder => { webBuilder.UseStartup<Startup>(); });
}
}
7 changes: 7 additions & 0 deletions src/Services/Base/IIngestionService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Arcane.Ingestion.Services.Base
{
public interface IIngestionService<T>
{
public void Ingest(string destinationName, T row);
}
}
64 changes: 64 additions & 0 deletions src/Services/Streams/JsonIngestionService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using Akka;
using Akka.Streams;
using Akka.Streams.Dsl;
using Arcane.Framework.Sinks.Json;
using Arcane.Ingestion.Configurations;
using Arcane.Ingestion.Metrics;
using Arcane.Ingestion.Services.Base;
using Microsoft.Extensions.Options;
using Snd.Sdk.Metrics.Base;
using Snd.Sdk.Storage.Base;

namespace Arcane.Ingestion.Services.Streams
{
public class JsonIngestionService : IIngestionService<JsonDocument>
{
private readonly IBlobStorageService blobStorageService;
private readonly IMaterializer materializer;
private readonly JsonIngestionConfiguration serviceConfig;
private readonly MetricsService metricsService;
private readonly IRunnableGraph<Sink<(string, DateTimeOffset, JsonDocument), NotUsed>> graph;
private readonly Sink<(string, DateTimeOffset, JsonDocument), NotUsed> graphSink;

public JsonIngestionService(IOptions<JsonIngestionConfiguration> options, IBlobStorageService blobStorageService, MetricsService metricsService, IMaterializer materializer)
{
this.blobStorageService = blobStorageService;
this.materializer = materializer;
this.serviceConfig = options.Value;
this.metricsService = metricsService;

this.graph = this.GetGraph();
this.graphSink = this.graph.Run(this.materializer);
}

public void Ingest(string destinationName, JsonDocument json)
{
this.metricsService.Increment(DeclaredMetrics.DOCUMENTS_INGESTED, new SortedDictionary<string, string> { { "ingestion_source", destinationName } });
Source.Single((destinationName, DateTimeOffset.UtcNow, json)).RunWith(this.graphSink, this.materializer);
}

private IRunnableGraph<Sink<(string, DateTimeOffset, JsonDocument), NotUsed>> GetGraph()
{
return MergeHub
.Source<(string, DateTimeOffset, JsonDocument)>(perProducerBufferSize: this.serviceConfig.BufferSize)
.Throttle(elements: this.serviceConfig.ThrottleDocumentLimit,
per: this.serviceConfig.ThrottleTimespan,
maximumBurst: this.serviceConfig.ThrottleDocumentBurst,
mode: ThrottleMode.Shaping)
.GroupedWithin(this.serviceConfig.MaxDocumentsPerFile, this.serviceConfig.GroupingInterval)
.SelectMany(batch => batch.GroupBy(v => v.Item1))
.Select(v =>
{
var groupName = v.Key;
var groupRecords = v.Select(grp => (grp.Item2, grp.Item3)).ToList();
this.metricsService.Gauge(DeclaredMetrics.ROWS_INGESTED, groupRecords.Count, new SortedDictionary<string, string> { { "ingestion_source", groupName } });
return (groupName, groupRecords);
})
.To(JsonSink.Create(this.blobStorageService, this.serviceConfig.IngestionSinkPath));
}
}
}
76 changes: 76 additions & 0 deletions src/Startup.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
using System.Diagnostics.CodeAnalysis;
using System.Text.Json;
using System.Text.Json.Serialization;
using Arcane.Ingestion.Configurations;
using Arcane.Ingestion.Services.Base;
using Arcane.Ingestion.Services.Streams;
using Azure.Data.Tables;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.OpenApi.Models;
using Snd.Sdk.ActorProviders;
using Snd.Sdk.Kubernetes.Providers;
using Snd.Sdk.Metrics.Configurations;
using Snd.Sdk.Metrics.Providers;
using Snd.Sdk.Storage.Providers;
using Snd.Sdk.Storage.Providers.Configurations;

namespace Arcane.Ingestion
{
[ExcludeFromCodeCoverage]
public class Startup
{
public Startup(IConfiguration configuration)
{
this.Configuration = configuration;
}

public IConfiguration Configuration { get; }

public void ConfigureServices(IServiceCollection services)
{
// service config injections
services.Configure<JsonIngestionConfiguration>(this.Configuration.GetSection(nameof(JsonIngestionConfiguration)));


services.AddLocalActorSystem();

services.AddAzureBlob(AzureStorageConfiguration.CreateDefault());
services.AddAzureTable<TableEntity>(AzureStorageConfiguration.CreateDefault());
services.AddDatadogMetrics(DatadogConfiguration.Default(nameof(Arcane)));

services.AddSingleton<IIngestionService<JsonDocument>, JsonIngestionService>();
services.AddKubernetes();

services.AddHealthChecks();

services.AddControllers().AddJsonOptions(options => options.JsonSerializerOptions.Converters.Add(new JsonStringEnumConverter()));
services.AddSwaggerGen(c =>
{
c.SwaggerDoc("v1", new OpenApiInfo { Title = "Arcane", Version = "v1" });
});
}

// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IHostApplicationLifetime hostApplicationLifetime)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
app.UseSwagger();
app.UseSwaggerUI(c => c.SwaggerEndpoint("/swagger/v1/swagger.json", "Arcane v1"));
}

app.UseRouting();

app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
endpoints.MapHealthChecks("/health");
});
}
}
}

0 comments on commit 88eb70b

Please sign in to comment.