Skip to content

Commit

Permalink
fix queue depedency injection
Browse files Browse the repository at this point in the history
  • Loading branch information
elasticroentgen committed Apr 17, 2024
1 parent f5213c8 commit 625b5c1
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 16 deletions.
38 changes: 25 additions & 13 deletions PoolManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,19 @@

namespace GithubActionsOrchestrator;

public class RunnerQueue
{

public ConcurrentQueue<CreateRunnerTask?> CreateTasks { get; }
public ConcurrentQueue<DeleteRunnerTask?> DeleteTasks { get; }

public RunnerQueue()
{
CreateTasks = new ConcurrentQueue<CreateRunnerTask>();
DeleteTasks = new ConcurrentQueue<DeleteRunnerTask>();
}
}

public class PoolManager : BackgroundService
{
private readonly CloudController _cc;
Expand All @@ -14,15 +27,14 @@ public class PoolManager : BackgroundService
private static readonly Gauge QueueSize = Metrics
.CreateGauge("github_queue", "Number of queued runner tasks");

public ConcurrentQueue<CreateRunnerTask?> CreateTasks { get; }
public ConcurrentQueue<DeleteRunnerTask?> DeleteTasks { get; }
private readonly RunnerQueue _queues;


public PoolManager(CloudController cc, ILogger<PoolManager> logger)
public PoolManager(CloudController cc, ILogger<PoolManager> logger, RunnerQueue queues)
{
_cc = cc;
_logger = logger;
CreateTasks = new ConcurrentQueue<CreateRunnerTask?>();
DeleteTasks = new ConcurrentQueue<DeleteRunnerTask?>();
_queues = queues;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
Expand Down Expand Up @@ -110,7 +122,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
for (int i = 0; i < missingCt; i++)
{
// Queue VM creation
CreateTasks.Enqueue(new CreateRunnerTask
_queues.CreateTasks.Enqueue(new CreateRunnerTask
{
Arch = arch,
Size = pool.Size,
Expand All @@ -128,11 +140,11 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)

while (!stoppingToken.IsCancellationRequested)
{
QueueSize.Set(CreateTasks.Count + DeleteTasks.Count);
QueueSize.Set(_queues.CreateTasks.Count + _queues.DeleteTasks.Count);

if (DeleteTasks.TryDequeue(out DeleteRunnerTask? dtask))
if (_queues.DeleteTasks.TryDequeue(out DeleteRunnerTask? dtask))
{
_logger.LogInformation($"Current Queue length: C:{CreateTasks.Count} D:{DeleteTasks.Count}");
_logger.LogInformation($"Current Queue length: C:{_queues.CreateTasks.Count} D:{_queues.DeleteTasks.Count}");
if (dtask != null)
{
bool success = await DeleteRunner(dtask);
Expand All @@ -145,9 +157,9 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
}
}

if (CreateTasks.TryDequeue(out CreateRunnerTask? task))
if (_queues.CreateTasks.TryDequeue(out CreateRunnerTask? task))
{
_logger.LogInformation($"Current Queue length: C:{CreateTasks.Count} D:{DeleteTasks.Count}");
_logger.LogInformation($"Current Queue length: C:{_queues.CreateTasks.Count} D:{_queues.DeleteTasks.Count}");
if (task != null)
{
bool success = await CreateRunner(task);
Expand Down Expand Up @@ -176,7 +188,7 @@ private async Task<bool> DeleteRunner(DeleteRunnerTask rt)
_logger.LogError(
$"Unable to delete runner [{rt.ServerId} | Retry: {rt.RetryCount}]: {ex.Message}");
rt.RetryCount += 1;
DeleteTasks.Enqueue(rt);
_queues.DeleteTasks.Enqueue(rt);
return false;
}
}
Expand All @@ -194,7 +206,7 @@ private async Task<bool> CreateRunner(CreateRunnerTask rt)
{
_logger.LogError($"Unable to create runner [{rt.Size} on {rt.Arch} | Retry: {rt.RetryCount}]: {ex.Message}");
rt.RetryCount += 1;
CreateTasks.Enqueue(rt);
_queues.CreateTasks.Enqueue(rt);
return false;
}
}
Expand Down
7 changes: 4 additions & 3 deletions Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public static void Main(string[] args)

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddSerilog();
builder.Services.AddSingleton<RunnerQueue>();
builder.Services.AddHostedService<PoolManager>();

// Add services to the container.
Expand All @@ -97,7 +98,7 @@ public static void Main(string[] args)

// Prepare pools
app.MapPost("/github-webhook", async (HttpRequest request, [FromServices] CloudController cloud,
[FromServices] ILogger<Program> logger, [FromServices] PoolManager poolMgr) =>
[FromServices] ILogger<Program> logger, [FromServices] RunnerQueue poolMgr) =>
{
logger.LogInformation("Received GitHub Webhook");
// Verify webhook HMAC - TODO
Expand Down Expand Up @@ -185,7 +186,7 @@ public static void Main(string[] args)
app.Run();
}

private static void JobCompleted(ILogger<Program> logger, long jobId, CloudController cloud, PoolManager poolMgr)
private static void JobCompleted(ILogger<Program> logger, long jobId, CloudController cloud, RunnerQueue poolMgr)
{
logger.LogInformation(
$"Workflow Job {jobId} has completed. Queuing deletion VM associated with Job...");
Expand Down Expand Up @@ -219,7 +220,7 @@ private static void JobInProgress(JsonElement workflowJson, ILogger<Program> log
PickedJobCount.Labels(orgName, jobSize).Inc();
}

private static async Task JobQueued(ILogger<Program> logger, string? repoName, List<string?> labels, string orgName, PoolManager poolMgr)
private static async Task JobQueued(ILogger<Program> logger, string? repoName, List<string?> labels, string orgName, RunnerQueue poolMgr)
{
logger.LogInformation(
$"New Workflow Job was queued for {repoName}. Queuing VM creation to replenish pool...");
Expand Down

0 comments on commit 625b5c1

Please sign in to comment.