diff --git a/cmd/bb_scheduler/main.go b/cmd/bb_scheduler/main.go index 5901eadf..a992ddbf 100644 --- a/cmd/bb_scheduler/main.go +++ b/cmd/bb_scheduler/main.go @@ -141,7 +141,29 @@ func main() { actionRouter, executeAuthorizer, modifyDrainsAuthorizer, - killOperationsAuthorizer) + killOperationsAuthorizer, + ) + + // Force periodic cleanups of stale workers. This also + // happens automatically when RPCs occur, but that's not + // sufficient to ensure Prometheus metrics are updated + // if the final workers disappear. + // + // TODO: Maybe it's better to let InMemoryBuildQueue + // implement prometheus.Collector? Then cleanups can run + // whenever the scheduler is scraped. + dependenciesGroup.Go(func(ctx context.Context, siblingsGroup, dependenciesGroup program.Group) error { + t := time.NewTicker(time.Minute) + for { + select { + case <-t.C: + buildQueue.ForceCleanup() + case <-ctx.Done(): + t.Stop() + return nil + } + } + }) // Create predeclared platform queues. for _, platformQueue := range configuration.PredeclaredPlatformQueues { diff --git a/pkg/scheduler/in_memory_build_queue.go b/pkg/scheduler/in_memory_build_queue.go index 8a11b57c..9ba87196 100644 --- a/pkg/scheduler/in_memory_build_queue.go +++ b/pkg/scheduler/in_memory_build_queue.go @@ -1258,6 +1258,15 @@ func (bq *InMemoryBuildQueue) leave() { bq.lock.Unlock() } +// ForceCleanup forcefully runs any pending cleanup tasks. This method +// can be invoked periodically to ensure that workers are removed, even +// if no other RPC traffic occurs. This ensures that Prometheus metrics +// report the correct values. +func (bq *InMemoryBuildQueue) ForceCleanup() { + bq.enter(bq.clock.Now()) + bq.leave() +} + // getIdleSynchronizeResponse returns a synchronization response that // explicitly instructs a worker to return to the idle state. func (bq *InMemoryBuildQueue) getIdleSynchronizeResponse() *remoteworker.SynchronizeResponse {