From bfadf0afd160827fdc2f7b1f34ada938fd541178 Mon Sep 17 00:00:00 2001 From: Teo Mrnjavac Date: Thu, 27 Jun 2024 16:42:28 +0200 Subject: [PATCH] [core] Improve handling of Mesos resource offers This commit addresses an issue with timing between a Mesos REVIVE call and its corresponding OFFERS event. Specifically: * The channel that pipes incoming deployment requests into the OFFERS handler is now buffered. * We retry an unsatisfiable task deployment 3 times before giving up. * The deployment request is now passed as a pointer. * The response from the OFFERS handler to task.Manager.acquireTasks now goes through its own channel, one per request. * The deployment request is now enqueued immediately *before* a REVIVE, as opposed to after, in order to prevent a race with Mesos. --- core/task/manager.go | 179 +++++++++++++++++++----------------- core/task/scheduler.go | 32 +++++-- core/task/schedulerstate.go | 16 ++-- 3 files changed, 127 insertions(+), 100 deletions(-) diff --git a/core/task/manager.go b/core/task/manager.go index 3cd99c93..4e01ef7f 100644 --- a/core/task/manager.go +++ b/core/task/manager.go @@ -75,6 +75,7 @@ type ResourceOffersOutcome struct { type ResourceOffersDeploymentRequest struct { tasksToDeploy Descriptors envId uid.ID + outcomeCh chan ResourceOffersOutcome } type Manager struct { @@ -88,8 +89,7 @@ type Manager struct { classes *taskclass.Classes roster *roster - resourceOffersDone <-chan ResourceOffersOutcome - tasksToDeploy chan<- ResourceOffersDeploymentRequest + tasksToDeploy chan<- *ResourceOffersDeploymentRequest reviveOffersTrg chan struct{} cq *controlcommands.CommandQueue @@ -132,18 +132,17 @@ func NewManager(shutdown func(), internalEventCh chan<- event.Event) (taskman *M roster: newRoster(), internalEventCh: internalEventCh, } - schedulerState, err := NewScheduler(taskman, fidStore, shutdown) + schedState, err := NewScheduler(taskman, fidStore, shutdown) if err != nil { return nil, err } - taskman.schedulerState = schedulerState + taskman.schedulerState = schedState taskman.cq = taskman.schedulerState.commandqueue - taskman.resourceOffersDone = taskman.schedulerState.resourceOffersDone taskman.tasksToDeploy = taskman.schedulerState.tasksToDeploy taskman.reviveOffersTrg = taskman.schedulerState.reviveOffersTrg taskman.ackKilledTasks = newAcks() - schedulerState.setupCli() + schedState.setupCli() return } @@ -491,93 +490,107 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e m.deployMu.Lock() - timeReviveOffers := time.Now() - timeDeployMu := time.Now() - m.reviveOffersTrg <- struct{}{} // signal scheduler to revive offers - <-m.reviveOffersTrg // we only continue when it's done - utils.TimeTrack(timeReviveOffers, "acquireTasks: revive offers", - log.WithField("tasksToRun", len(tasksToRun)). - WithField("partition", envId)) + DEPLOYMENT_ATTEMPTS_LOOP: + for attemptCount := 0; attemptCount < MAX_ATTEMPTS_PER_DEPLOY_REQUEST; attemptCount++ { + // We loop through the deployment attempts until we either succeed or + // reach the maximum number of attempts. In the happy case, we should only + // need to try once. A retry should only be necessary if the Mesos master + // has not been able to provide the resources we need in the offers round + // immediately after reviving. + // We also keep track of the number of attempts made in the request object + // so that the scheduler can decide whether to retry or not. + // The request object is used to pass the tasks to deploy and the outcome + // channel to the deployment routine. 
+ + outcomeCh := make(chan ResourceOffersOutcome) + m.tasksToDeploy <- &ResourceOffersDeploymentRequest{ + tasksToDeploy: tasksToRun, + envId: envId, + outcomeCh: outcomeCh, + } // buffered channel, does not block - m.tasksToDeploy <- ResourceOffersDeploymentRequest{ - tasksToDeploy: tasksToRun, - envId: envId, - } // blocks until received - - log.WithField("partition", envId). - Debugf("scheduler has received request to deploy %d tasks", len(tasksToRun)) - - // IDEA: a flps mesos-role assigned to all mesos agents on flp hosts, and then a static - // reservation for that mesos-role on behalf of our scheduler - - roOutcome := <-m.resourceOffersDone - - utils.TimeTrack(timeDeployMu, "acquireTasks: deployment critical section", - log.WithField("tasksToRun", len(tasksToRun)). - WithField("partition", envId)) - - m.deployMu.Unlock() - - deployedTasks = roOutcome.deployed - undeployedDescriptors = roOutcome.undeployed - undeployableDescriptors = roOutcome.undeployable - - log.WithField("tasks", deployedTasks). - WithField("partition", envId). - Debugf("resourceOffers is done, %d new tasks running", len(deployedTasks)) - - if len(deployedTasks) != len(tasksToRun) { - // ↑ Not all roles could be deployed. If some were critical, - // we cannot proceed with running this environment. Either way, - // we keep the roles running since they might be useful in the future. log.WithField("partition", envId). - Errorf("environment deployment failure: %d tasks requested for deployment, but %d deployed", len(tasksToRun), len(deployedTasks)) - - for _, desc := range undeployedDescriptors { - if desc.TaskRole.GetTaskTraits().Critical == true { - deploymentSuccess = false - undeployedCriticalDescriptors = append(undeployedCriticalDescriptors, desc) - printname := fmt.Sprintf("%s->%s", desc.TaskRole.GetPath(), desc.TaskClassName) - log.WithField("partition", envId). - Errorf("critical task deployment failure: %s", printname) - } else { - undeployedNonCriticalDescriptors = append(undeployedNonCriticalDescriptors, desc) - printname := fmt.Sprintf("%s->%s", desc.TaskRole.GetPath(), desc.TaskClassName) - log.WithField("partition", envId). - Warnf("non-critical task deployment failure: %s", printname) + Debugf("scheduler has been sent request to deploy %d tasks", len(tasksToRun)) + + timeReviveOffers := time.Now() + timeDeployMu := time.Now() + m.reviveOffersTrg <- struct{}{} // signal scheduler to revive offers + <-m.reviveOffersTrg // we only continue when it's done + utils.TimeTrack(timeReviveOffers, "acquireTasks: revive offers", + log.WithField("tasksToRun", len(tasksToRun)). + WithField("partition", envId)) + + roOutcome := <-outcomeCh // blocks until a verdict from resourceOffers comes in + + utils.TimeTrack(timeDeployMu, "acquireTasks: deployment critical section", + log.WithField("tasksToRun", len(tasksToRun)). + WithField("partition", envId)) + + deployedTasks = roOutcome.deployed + undeployedDescriptors = roOutcome.undeployed + undeployableDescriptors = roOutcome.undeployable + + log.WithField("tasks", deployedTasks). + WithField("partition", envId). + Debugf("resourceOffers is done, %d new tasks running", len(deployedTasks)) + + if len(deployedTasks) != len(tasksToRun) { + // ↑ Not all roles could be deployed. If some were critical, + // we cannot proceed with running this environment. Either way, + // we keep the roles running since they might be useful in the future. + log.WithField("partition", envId). 
+ Errorf("environment deployment failure: %d tasks requested for deployment, but %d deployed", len(tasksToRun), len(deployedTasks)) + + for _, desc := range undeployedDescriptors { + if desc.TaskRole.GetTaskTraits().Critical == true { + deploymentSuccess = false + undeployedCriticalDescriptors = append(undeployedCriticalDescriptors, desc) + printname := fmt.Sprintf("%s->%s", desc.TaskRole.GetPath(), desc.TaskClassName) + log.WithField("partition", envId). + Errorf("critical task deployment failure: %s", printname) + } else { + undeployedNonCriticalDescriptors = append(undeployedNonCriticalDescriptors, desc) + printname := fmt.Sprintf("%s->%s", desc.TaskRole.GetPath(), desc.TaskClassName) + log.WithField("partition", envId). + Warnf("non-critical task deployment failure: %s", printname) + } } - } - for _, desc := range undeployableDescriptors { - if desc.TaskRole.GetTaskTraits().Critical == true { - deploymentSuccess = false - undeployableCriticalDescriptors = append(undeployableCriticalDescriptors, desc) - printname := fmt.Sprintf("%s->%s", desc.TaskRole.GetPath(), desc.TaskClassName) - log.WithField("partition", envId). - Errorf("critical task deployment impossible: %s", printname) - go desc.TaskRole.UpdateStatus(UNDEPLOYABLE) - } else { - undeployableNonCriticalDescriptors = append(undeployableNonCriticalDescriptors, desc) - printname := fmt.Sprintf("%s->%s", desc.TaskRole.GetPath(), desc.TaskClassName) - log.WithField("partition", envId). - Warnf("non-critical task deployment impossible: %s", printname) + for _, desc := range undeployableDescriptors { + if desc.TaskRole.GetTaskTraits().Critical == true { + deploymentSuccess = false + undeployableCriticalDescriptors = append(undeployableCriticalDescriptors, desc) + printname := fmt.Sprintf("%s->%s", desc.TaskRole.GetPath(), desc.TaskClassName) + log.WithField("partition", envId). + Errorf("critical task deployment impossible: %s", printname) + go desc.TaskRole.UpdateStatus(UNDEPLOYABLE) + } else { + undeployableNonCriticalDescriptors = append(undeployableNonCriticalDescriptors, desc) + printname := fmt.Sprintf("%s->%s", desc.TaskRole.GetPath(), desc.TaskClassName) + log.WithField("partition", envId). 
+ Warnf("non-critical task deployment impossible: %s", printname) + } } } - } - } - if deploymentSuccess { - // ↑ means all the required critical processes are now running, - // and we are ready to update the envId - for taskPtr, descriptor := range deployedTasks { - taskPtr.SetParent(descriptor.TaskRole) - // Ensure everything is filled out properly - if !taskPtr.IsLocked() { - log.WithField("task", taskPtr.taskId).Warning("cannot lock newly deployed task") - deploymentSuccess = false + if deploymentSuccess { + // ↑ means all the required critical processes are now running, + // and we are ready to update the envId + for taskPtr, descriptor := range deployedTasks { + taskPtr.SetParent(descriptor.TaskRole) + // Ensure everything is filled out properly + if !taskPtr.IsLocked() { + log.WithField("task", taskPtr.taskId).Warning("cannot lock newly deployed task") + deploymentSuccess = false + } + } + break DEPLOYMENT_ATTEMPTS_LOOP } } } + + m.deployMu.Unlock() + if !deploymentSuccess { // While all the required roles are running, for some reason we // can't lock some of them, so we must roll back and keep them diff --git a/core/task/scheduler.go b/core/task/scheduler.go index 832187ed..a126eb68 100644 --- a/core/task/scheduler.go +++ b/core/task/scheduler.go @@ -463,8 +463,14 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han var descriptorsStillToDeploy Descriptors envId := uid.NilID() + var deploymentRequestPayload *ResourceOffersDeploymentRequest + + // receive deployment request from channel, if any select { - case deploymentRequestPayload := <-state.tasksToDeploy: + case deploymentRequestPayload = <-state.tasksToDeploy: + if deploymentRequestPayload == nil { + break + } descriptorsStillToDeploy = deploymentRequestPayload.tasksToDeploy envId = deploymentRequestPayload.envId @@ -1051,17 +1057,23 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han } // Notify listeners... - select { - case state.resourceOffersDone <- ResourceOffersOutcome{tasksDeployed, descriptorsStillToDeploy, descriptorsUndeployable}: - log.WithPrefix("scheduler"). - WithField("tasksDeployed", len(tasksDeployed)). - WithField("partition", envId.String()). - Trace("notified listeners on resourceOffers done") - default: - if viper.GetBool("veryVerbose") { + if deploymentRequestPayload != nil { + select { + case deploymentRequestPayload.outcomeCh <- ResourceOffersOutcome{ + deployed: tasksDeployed, + undeployed: descriptorsStillToDeploy, + undeployable: descriptorsUndeployable, + }: log.WithPrefix("scheduler"). + WithField("tasksDeployed", len(tasksDeployed)). WithField("partition", envId.String()). - Trace("no listeners notified") + Trace("notified listeners on resourceOffers done") + default: + if viper.GetBool("veryVerbose") { + log.WithPrefix("scheduler"). + WithField("partition", envId.String()). 
+ Trace("no listeners notified") + } } } diff --git a/core/task/schedulerstate.go b/core/task/schedulerstate.go index dd3d8bf4..e35d5d1e 100644 --- a/core/task/schedulerstate.go +++ b/core/task/schedulerstate.go @@ -47,6 +47,11 @@ import ( "github.com/spf13/viper" ) +const ( + MAX_CONCURRENT_DEPLOY_REQUESTS = 100 + MAX_ATTEMPTS_PER_DEPLOY_REQUEST = 3 +) + type schedulerState struct { sync.RWMutex @@ -59,10 +64,9 @@ type schedulerState struct { err error // not used in multiple goroutines: - executor *mesos.ExecutorInfo - reviveTokens <-chan struct{} - resourceOffersDone chan ResourceOffersOutcome - tasksToDeploy chan ResourceOffersDeploymentRequest + executor *mesos.ExecutorInfo + reviveTokens <-chan struct{} + tasksToDeploy chan *ResourceOffersDeploymentRequest reviveOffersTrg chan struct{} random *rand.Rand @@ -100,8 +104,7 @@ func NewScheduler(taskman *Manager, fidStore store.Singleton, shutdown func()) ( return nil, err } - resourceOffersDone := make(chan ResourceOffersOutcome) - tasksToDeploy := make(chan ResourceOffersDeploymentRequest) + tasksToDeploy := make(chan *ResourceOffersDeploymentRequest, MAX_CONCURRENT_DEPLOY_REQUESTS) reviveOffersTrg := make(chan struct{}) @@ -113,7 +116,6 @@ func NewScheduler(taskman *Manager, fidStore store.Singleton, shutdown func()) ( viper.GetDuration("mesosReviveWait"), viper.GetDuration("mesosReviveWait"), nil), - resourceOffersDone: resourceOffersDone, tasksToDeploy: tasksToDeploy, reviveOffersTrg: reviveOffersTrg, wantsTaskResources: mesos.Resources{},
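
Note (not part of the patch): below is a minimal, self-contained Go sketch of the channel pattern this patch introduces, under simplified assumptions. All names in it (deployRequest, outcome, offersHandler, acquire, capacityPerRound) are stand-ins rather than the real Descriptors, uid.ID and ResourceOffersOutcome types, a fake offers round replaces Mesos, and the per-request outcome channel is buffered with capacity 1 so the guarded notify cannot be lost in the toy model, whereas the real channel is unbuffered. What it mirrors from the patch: a buffered tasksToDeploy channel of request pointers, the request enqueued before the revive handshake, one outcome channel per request, and up to MAX_ATTEMPTS_PER_DEPLOY_REQUEST attempts before giving up.

    package main

    import "fmt"

    const (
        maxConcurrentDeployRequests = 100 // stands in for MAX_CONCURRENT_DEPLOY_REQUESTS
        maxAttemptsPerDeployRequest = 3   // stands in for MAX_ATTEMPTS_PER_DEPLOY_REQUEST
    )

    // outcome stands in for ResourceOffersOutcome.
    type outcome struct {
        deployed  int
        requested int
    }

    // deployRequest stands in for ResourceOffersDeploymentRequest: the work to do
    // plus a dedicated reply channel, one per request.
    type deployRequest struct {
        tasks     int
        outcomeCh chan outcome
    }

    // offersHandler models the scheduler side: for every revive it acks the
    // handshake, then runs one fake offers round that drains at most one pending
    // request and answers on that request's own channel.
    func offersHandler(tasksToDeploy <-chan *deployRequest, revive chan struct{}, capacityPerRound int) {
        for range revive {
            revive <- struct{}{} // ack the revive, like reviveOffersTrg

            var req *deployRequest
            select {
            case req = <-tasksToDeploy: // buffered: the request was enqueued before the revive
            default: // offers round with nothing to deploy
            }
            if req == nil {
                continue
            }

            deployed := req.tasks
            if deployed > capacityPerRound {
                deployed = capacityPerRound // this round does not have enough resources
            }
            select { // guarded notify, mirrors the non-blocking send in resourceOffers
            case req.outcomeCh <- outcome{deployed: deployed, requested: req.tasks}:
            default:
            }
        }
    }

    // acquire models task.Manager.acquireTasks: enqueue the request *before*
    // reviving offers, then wait for the per-request verdict, retrying a few
    // times before giving up.
    func acquire(tasksToDeploy chan<- *deployRequest, revive chan struct{}, tasks int) bool {
        for attempt := 1; attempt <= maxAttemptsPerDeployRequest; attempt++ {
            // capacity 1 so the handler's guarded notify can never be lost in
            // this toy model; the real outcomeCh is unbuffered
            outcomeCh := make(chan outcome, 1)
            tasksToDeploy <- &deployRequest{tasks: tasks, outcomeCh: outcomeCh} // buffered, does not block

            revive <- struct{}{} // signal the scheduler to revive offers
            <-revive             // continue only once the revive is done

            o := <-outcomeCh // blocks until the offers round reports back
            fmt.Printf("attempt %d: %d/%d tasks deployed\n", attempt, o.deployed, o.requested)
            if o.deployed == o.requested {
                return true
            }
        }
        return false
    }

    func main() {
        tasksToDeploy := make(chan *deployRequest, maxConcurrentDeployRequests)
        revive := make(chan struct{})
        go offersHandler(tasksToDeploy, revive, 4) // each fake offers round fits at most 4 tasks

        fmt.Println("deployed:", acquire(tasksToDeploy, revive, 3)) // fits in one round
        fmt.Println("deployed:", acquire(tasksToDeploy, revive, 6)) // never fits, gives up after 3 attempts
    }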
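
In the sketch, the 3-task request succeeds on the first offers round while the 6-task request is retried and then abandoned after three attempts. The ordering is the point: because the request is already sitting in the buffered channel when the revive goes out, the offers round that follows cannot race past an empty channel, and because the notify back to the caller is non-blocking and scoped to that request's own channel, a slow or departed caller cannot stall the OFFERS handler.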