Skip to content

Commit

Permalink
separate create and delete queues
Browse files Browse the repository at this point in the history
  • Loading branch information
elasticroentgen committed Apr 17, 2024
1 parent 0b332c0 commit f5213c8
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 36 deletions.
10 changes: 7 additions & 3 deletions RunnerTask.cs → CreateRunnerTask.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
namespace GithubActionsOrchestrator;

public record RunnerTask
public record CreateRunnerTask
{
public string Arch { get; set; }
public string Size { get; set; }
public string RunnerToken { get; set; }
public string OrgName { get; set; }
public int RetryCount { get; set; }
public RunnerAction Action { get; set; }
public long ServerId { get; set; }
}
}
/// <summary>
/// Queue item describing a runner server that should be deleted.
/// Instances are placed on <c>PoolManager.DeleteTasks</c> and consumed by
/// <c>PoolManager.DeleteRunner</c>.
/// </summary>
public record DeleteRunnerTask
{
/// <summary>
/// Number of failed deletion attempts so far. <c>DeleteRunner</c> increments
/// this by one before re-enqueueing the task after an exception.
/// </summary>
public int RetryCount { get; set; }
/// <summary>
/// Identifier of the server to delete; callers populate it from the VM's
/// <c>Id</c> when a job completes.
/// </summary>
public long ServerId { get; set; }
}
50 changes: 29 additions & 21 deletions PoolManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ public class PoolManager : BackgroundService
private static readonly Gauge QueueSize = Metrics
.CreateGauge("github_queue", "Number of queued runner tasks");

public ConcurrentQueue<RunnerTask?> Tasks { get; }
public ConcurrentQueue<CreateRunnerTask?> CreateTasks { get; }
public ConcurrentQueue<DeleteRunnerTask?> DeleteTasks { get; }

public PoolManager(CloudController cc, ILogger<PoolManager> logger)
{
_cc = cc;
_logger = logger;
Tasks = new ConcurrentQueue<RunnerTask?>();
CreateTasks = new ConcurrentQueue<CreateRunnerTask?>();
DeleteTasks = new ConcurrentQueue<DeleteRunnerTask?>();
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
Expand Down Expand Up @@ -108,9 +110,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
for (int i = 0; i < missingCt; i++)
{
// Queue VM creation
Tasks.Enqueue(new RunnerTask
CreateTasks.Enqueue(new CreateRunnerTask
{
Action = RunnerAction.Create,
Arch = arch,
Size = pool.Size,
RunnerToken = runnerToken,
Expand All @@ -127,26 +128,33 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)

while (!stoppingToken.IsCancellationRequested)
{
QueueSize.Set(Tasks.Count);
if (Tasks.TryDequeue(out RunnerTask? task))
QueueSize.Set(CreateTasks.Count + DeleteTasks.Count);

if (DeleteTasks.TryDequeue(out DeleteRunnerTask? dtask))
{
_logger.LogInformation($"Current Queue length: {Tasks.Count}");
if (task != null)
_logger.LogInformation($"Current Queue length: C:{CreateTasks.Count} D:{DeleteTasks.Count}");
if (dtask != null)
{
bool success = false;
switch (task.Action)
bool success = await DeleteRunner(dtask);
if (!success)
{
case RunnerAction.Create:
success = await CreateRunner(task);
break;
case RunnerAction.Delete:
success = await DeleteRunner(task);
break;
// Deletion didn't succeed. Let's hold processing runners for a minute
_logger.LogWarning("Encountered a problem deleting runners. Will hold queue processing for 1 minute.");
await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
}
}
}

if (CreateTasks.TryDequeue(out CreateRunnerTask? task))
{
_logger.LogInformation($"Current Queue length: C:{CreateTasks.Count} D:{DeleteTasks.Count}");
if (task != null)
{
bool success = await CreateRunner(task);
if (!success)
{
// Creation didn't succeed. Let's hold off creating new runners for a minute
_logger.LogWarning("Encountered a problem creating runners. Will hold creation for 1 minute.");
_logger.LogWarning("Encountered a problem creating runners. Will hold queue processing for 1 minute.");
await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
}
}
Expand All @@ -156,7 +164,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
}
}

private async Task<bool> DeleteRunner(RunnerTask rt)
private async Task<bool> DeleteRunner(DeleteRunnerTask rt)
{
try
{
Expand All @@ -168,12 +176,12 @@ private async Task<bool> DeleteRunner(RunnerTask rt)
_logger.LogError(
$"Unable to delete runner [{rt.ServerId} | Retry: {rt.RetryCount}]: {ex.Message}");
rt.RetryCount += 1;
Tasks.Enqueue(rt);
DeleteTasks.Enqueue(rt);
return false;
}
}

private async Task<bool> CreateRunner(RunnerTask rt)
private async Task<bool> CreateRunner(CreateRunnerTask rt)
{
try
{
Expand All @@ -186,7 +194,7 @@ private async Task<bool> CreateRunner(RunnerTask rt)
{
_logger.LogError($"Unable to create runner [{rt.Size} on {rt.Arch} | Retry: {rt.RetryCount}]: {ex.Message}");
rt.RetryCount += 1;
Tasks.Enqueue(rt);
CreateTasks.Enqueue(rt);
return false;
}
}
Expand Down
8 changes: 3 additions & 5 deletions Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,9 @@ private static void JobCompleted(ILogger<Program> logger, long jobId, CloudContr
}
else
{
poolMgr.Tasks.Enqueue(new RunnerTask
poolMgr.DeleteTasks.Enqueue(new DeleteRunnerTask
{
Action = RunnerAction.Delete,
ServerId = vm.Id
ServerId = vm.Id,
});
ProcessedJobCount.Labels(vm.OrgName, vm.Size).Inc();

Expand Down Expand Up @@ -265,9 +264,8 @@ private static async Task JobQueued(ILogger<Program> logger, string? repoName, L
return;
}

poolMgr.Tasks.Enqueue(new RunnerTask
poolMgr.CreateTasks.Enqueue(new CreateRunnerTask
{
Action = RunnerAction.Create,
Arch = arch,
Size = size,
RunnerToken = runnerToken,
Expand Down
7 changes: 0 additions & 7 deletions RunnerAction.cs

This file was deleted.

0 comments on commit f5213c8

Please sign in to comment.