
[core] Improve handling of Mesos resource offers #582

Merged · 1 commit, merged on Jun 28, 2024
179 changes: 96 additions & 83 deletions core/task/manager.go
@@ -75,6 +75,7 @@ type ResourceOffersOutcome struct {
type ResourceOffersDeploymentRequest struct {
tasksToDeploy Descriptors
envId uid.ID
outcomeCh chan ResourceOffersOutcome
}

type Manager struct {
@@ -88,8 +89,7 @@ type Manager struct {
classes *taskclass.Classes
roster *roster

resourceOffersDone <-chan ResourceOffersOutcome
tasksToDeploy chan<- ResourceOffersDeploymentRequest
tasksToDeploy chan<- *ResourceOffersDeploymentRequest

reviveOffersTrg chan struct{}
cq *controlcommands.CommandQueue
@@ -132,18 +132,17 @@ func NewManager(shutdown func(), internalEventCh chan<- event.Event) (taskman *M
roster: newRoster(),
internalEventCh: internalEventCh,
}
schedulerState, err := NewScheduler(taskman, fidStore, shutdown)
schedState, err := NewScheduler(taskman, fidStore, shutdown)
if err != nil {
return nil, err
}
taskman.schedulerState = schedulerState
taskman.schedulerState = schedState
taskman.cq = taskman.schedulerState.commandqueue
taskman.resourceOffersDone = taskman.schedulerState.resourceOffersDone
taskman.tasksToDeploy = taskman.schedulerState.tasksToDeploy
taskman.reviveOffersTrg = taskman.schedulerState.reviveOffersTrg
taskman.ackKilledTasks = newAcks()

schedulerState.setupCli()
schedState.setupCli()

return
}
@@ -491,93 +490,107 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e

m.deployMu.Lock()

timeReviveOffers := time.Now()
timeDeployMu := time.Now()
m.reviveOffersTrg <- struct{}{} // signal scheduler to revive offers
<-m.reviveOffersTrg // we only continue when it's done
utils.TimeTrack(timeReviveOffers, "acquireTasks: revive offers",
log.WithField("tasksToRun", len(tasksToRun)).
WithField("partition", envId))
DEPLOYMENT_ATTEMPTS_LOOP:
for attemptCount := 0; attemptCount < MAX_ATTEMPTS_PER_DEPLOY_REQUEST; attemptCount++ {
// We loop through the deployment attempts until we either succeed or
// reach the maximum number of attempts. In the happy case, we should only
// need to try once. A retry should only be necessary if the Mesos master
// has not been able to provide the resources we need in the offers round
// immediately after reviving.
// We also keep track of the number of attempts made in the request object
// so that the scheduler can decide whether to retry or not.
// The request object is used to pass the tasks to deploy and the outcome
// channel to the deployment routine.

outcomeCh := make(chan ResourceOffersOutcome)
m.tasksToDeploy <- &ResourceOffersDeploymentRequest{
tasksToDeploy: tasksToRun,
envId: envId,
outcomeCh: outcomeCh,
} // buffered channel, does not block

m.tasksToDeploy <- ResourceOffersDeploymentRequest{
tasksToDeploy: tasksToRun,
envId: envId,
} // blocks until received

log.WithField("partition", envId).
Debugf("scheduler has received request to deploy %d tasks", len(tasksToRun))

// IDEA: a flps mesos-role assigned to all mesos agents on flp hosts, and then a static
// reservation for that mesos-role on behalf of our scheduler

roOutcome := <-m.resourceOffersDone

utils.TimeTrack(timeDeployMu, "acquireTasks: deployment critical section",
log.WithField("tasksToRun", len(tasksToRun)).
WithField("partition", envId))

m.deployMu.Unlock()

deployedTasks = roOutcome.deployed
undeployedDescriptors = roOutcome.undeployed
undeployableDescriptors = roOutcome.undeployable

log.WithField("tasks", deployedTasks).
WithField("partition", envId).
Debugf("resourceOffers is done, %d new tasks running", len(deployedTasks))

if len(deployedTasks) != len(tasksToRun) {
// ↑ Not all roles could be deployed. If some were critical,
// we cannot proceed with running this environment. Either way,
// we keep the roles running since they might be useful in the future.
log.WithField("partition", envId).
Errorf("environment deployment failure: %d tasks requested for deployment, but %d deployed", len(tasksToRun), len(deployedTasks))

for _, desc := range undeployedDescriptors {
if desc.TaskRole.GetTaskTraits().Critical == true {
deploymentSuccess = false
undeployedCriticalDescriptors = append(undeployedCriticalDescriptors, desc)
printname := fmt.Sprintf("%s->%s", desc.TaskRole.GetPath(), desc.TaskClassName)
log.WithField("partition", envId).
Errorf("critical task deployment failure: %s", printname)
} else {
undeployedNonCriticalDescriptors = append(undeployedNonCriticalDescriptors, desc)
printname := fmt.Sprintf("%s->%s", desc.TaskRole.GetPath(), desc.TaskClassName)
log.WithField("partition", envId).
Warnf("non-critical task deployment failure: %s", printname)
Debugf("scheduler has been sent request to deploy %d tasks", len(tasksToRun))

timeReviveOffers := time.Now()
timeDeployMu := time.Now()
m.reviveOffersTrg <- struct{}{} // signal scheduler to revive offers
<-m.reviveOffersTrg // we only continue when it's done
utils.TimeTrack(timeReviveOffers, "acquireTasks: revive offers",
log.WithField("tasksToRun", len(tasksToRun)).
WithField("partition", envId))

roOutcome := <-outcomeCh // blocks until a verdict from resourceOffers comes in

utils.TimeTrack(timeDeployMu, "acquireTasks: deployment critical section",
log.WithField("tasksToRun", len(tasksToRun)).
WithField("partition", envId))

deployedTasks = roOutcome.deployed
undeployedDescriptors = roOutcome.undeployed
undeployableDescriptors = roOutcome.undeployable

log.WithField("tasks", deployedTasks).
WithField("partition", envId).
Debugf("resourceOffers is done, %d new tasks running", len(deployedTasks))

if len(deployedTasks) != len(tasksToRun) {
// ↑ Not all roles could be deployed. If some were critical,
// we cannot proceed with running this environment. Either way,
// we keep the roles running since they might be useful in the future.
Comment on lines +538 to +540

Collaborator:
I don't understand. Where is the part of code which tries to re-deploy only the undeployed tasks?

Member Author:
I guess the comment is misleading. They would be useful in the future if task reuse was enabled, but it was disabled some years ago since start-stop-start wasn't there yet. In case of partial deployment we'd actually fail, but to my knowledge the current undeployable cases we have are actually early failures where no tasks are deployed.

log.WithField("partition", envId).
Errorf("environment deployment failure: %d tasks requested for deployment, but %d deployed", len(tasksToRun), len(deployedTasks))

for _, desc := range undeployedDescriptors {
if desc.TaskRole.GetTaskTraits().Critical == true {
deploymentSuccess = false
undeployedCriticalDescriptors = append(undeployedCriticalDescriptors, desc)
printname := fmt.Sprintf("%s->%s", desc.TaskRole.GetPath(), desc.TaskClassName)
log.WithField("partition", envId).
Errorf("critical task deployment failure: %s", printname)
} else {
undeployedNonCriticalDescriptors = append(undeployedNonCriticalDescriptors, desc)
printname := fmt.Sprintf("%s->%s", desc.TaskRole.GetPath(), desc.TaskClassName)
log.WithField("partition", envId).
Warnf("non-critical task deployment failure: %s", printname)
}
}
}

for _, desc := range undeployableDescriptors {
if desc.TaskRole.GetTaskTraits().Critical == true {
deploymentSuccess = false
undeployableCriticalDescriptors = append(undeployableCriticalDescriptors, desc)
printname := fmt.Sprintf("%s->%s", desc.TaskRole.GetPath(), desc.TaskClassName)
log.WithField("partition", envId).
Errorf("critical task deployment impossible: %s", printname)
go desc.TaskRole.UpdateStatus(UNDEPLOYABLE)
} else {
undeployableNonCriticalDescriptors = append(undeployableNonCriticalDescriptors, desc)
printname := fmt.Sprintf("%s->%s", desc.TaskRole.GetPath(), desc.TaskClassName)
log.WithField("partition", envId).
Warnf("non-critical task deployment impossible: %s", printname)
for _, desc := range undeployableDescriptors {
if desc.TaskRole.GetTaskTraits().Critical == true {
deploymentSuccess = false
undeployableCriticalDescriptors = append(undeployableCriticalDescriptors, desc)
printname := fmt.Sprintf("%s->%s", desc.TaskRole.GetPath(), desc.TaskClassName)
log.WithField("partition", envId).
Errorf("critical task deployment impossible: %s", printname)
go desc.TaskRole.UpdateStatus(UNDEPLOYABLE)
} else {
undeployableNonCriticalDescriptors = append(undeployableNonCriticalDescriptors, desc)
printname := fmt.Sprintf("%s->%s", desc.TaskRole.GetPath(), desc.TaskClassName)
log.WithField("partition", envId).
Warnf("non-critical task deployment impossible: %s", printname)
}
}
}
}
}

if deploymentSuccess {
// ↑ means all the required critical processes are now running,
// and we are ready to update the envId
for taskPtr, descriptor := range deployedTasks {
taskPtr.SetParent(descriptor.TaskRole)
// Ensure everything is filled out properly
if !taskPtr.IsLocked() {
log.WithField("task", taskPtr.taskId).Warning("cannot lock newly deployed task")
deploymentSuccess = false
if deploymentSuccess {
// ↑ means all the required critical processes are now running,
// and we are ready to update the envId
for taskPtr, descriptor := range deployedTasks {
taskPtr.SetParent(descriptor.TaskRole)
// Ensure everything is filled out properly
if !taskPtr.IsLocked() {
log.WithField("task", taskPtr.taskId).Warning("cannot lock newly deployed task")
deploymentSuccess = false
}
}
break DEPLOYMENT_ATTEMPTS_LOOP
}
}
}

m.deployMu.Unlock()

if !deploymentSuccess {
// While all the required roles are running, for some reason we
// can't lock some of them, so we must roll back and keep them
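
The new flow in acquireTasks is easiest to see in isolation: each deployment attempt builds a request that carries its own outcome channel, pushes it onto the (now buffered) tasksToDeploy channel, and then blocks only on that per-request channel until the scheduler reports a verdict, retrying up to MAX_ATTEMPTS_PER_DEPLOY_REQUEST times. Below is a minimal, self-contained sketch of that pattern; the type names, fields and counters are simplified stand-ins, not the actual o2control API.

```go
package main

import "fmt"

// outcome is a stand-in for ResourceOffersOutcome.
type outcome struct {
	deployed   int
	undeployed int
}

// deployRequest is a stand-in for ResourceOffersDeploymentRequest:
// it carries its own reply channel.
type deployRequest struct {
	tasksToDeploy int // simplified stand-in for Descriptors
	outcomeCh     chan outcome
}

const maxAttempts = 3 // mirrors MAX_ATTEMPTS_PER_DEPLOY_REQUEST

func acquire(tasksToDeploy chan<- *deployRequest, want int) bool {
	for attempt := 0; attempt < maxAttempts; attempt++ {
		req := &deployRequest{
			tasksToDeploy: want,
			outcomeCh:     make(chan outcome),
		}
		tasksToDeploy <- req   // buffered channel send, queues the request
		out := <-req.outcomeCh // block until the scheduler answers this specific request
		if out.deployed == want {
			return true
		}
		fmt.Printf("attempt %d: %d/%d deployed, retrying\n", attempt+1, out.deployed, want)
	}
	return false
}

// schedule stands in for the resourceOffers handler: it answers each
// request on the channel that request carries. Here it pretends every
// task was deployed.
func schedule(tasksToDeploy <-chan *deployRequest) {
	for req := range tasksToDeploy {
		req.outcomeCh <- outcome{deployed: req.tasksToDeploy}
	}
}

func main() {
	ch := make(chan *deployRequest, 100) // illustrative capacity, like MAX_CONCURRENT_DEPLOY_REQUESTS
	go schedule(ch)
	fmt.Println("success:", acquire(ch, 5))
}
```
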
32 changes: 22 additions & 10 deletions core/task/scheduler.go
@@ -463,8 +463,14 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
var descriptorsStillToDeploy Descriptors
envId := uid.NilID()

var deploymentRequestPayload *ResourceOffersDeploymentRequest

// receive deployment request from channel, if any
select {
case deploymentRequestPayload := <-state.tasksToDeploy:
case deploymentRequestPayload = <-state.tasksToDeploy:
if deploymentRequestPayload == nil {
break
}
descriptorsStillToDeploy = deploymentRequestPayload.tasksToDeploy
envId = deploymentRequestPayload.envId

@@ -1051,17 +1057,23 @@ }
}

// Notify listeners...
select {
case state.resourceOffersDone <- ResourceOffersOutcome{tasksDeployed, descriptorsStillToDeploy, descriptorsUndeployable}:
log.WithPrefix("scheduler").
WithField("tasksDeployed", len(tasksDeployed)).
WithField("partition", envId.String()).
Trace("notified listeners on resourceOffers done")
default:
if viper.GetBool("veryVerbose") {
if deploymentRequestPayload != nil {
select {
case deploymentRequestPayload.outcomeCh <- ResourceOffersOutcome{
deployed: tasksDeployed,
undeployed: descriptorsStillToDeploy,
undeployable: descriptorsUndeployable,
}:
log.WithPrefix("scheduler").
WithField("tasksDeployed", len(tasksDeployed)).
WithField("partition", envId.String()).
Trace("no listeners notified")
Trace("notified listeners on resourceOffers done")
default:
if viper.GetBool("veryVerbose") {
log.WithPrefix("scheduler").
WithField("partition", envId.String()).
Trace("no listeners notified")
}
}
}

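
On the scheduler side, the end of an offers round now replies on the channel carried by the request itself, and only if a request was actually received in that round; the select with a default branch keeps resourceOffers from ever blocking if the requester has stopped listening. A small illustrative sketch of that pattern, with simplified names rather than the real scheduler types:

```go
package main

import "fmt"

type outcome struct{ deployed int }

type deployRequest struct {
	outcomeCh chan outcome
}

// notify mirrors the end-of-round behaviour: skip if no request was
// part of this offers round, otherwise try a non-blocking send of the
// verdict to the per-request channel.
func notify(req *deployRequest, out outcome) {
	if req == nil {
		return // this offers round carried no deployment request
	}
	select {
	case req.outcomeCh <- out:
		fmt.Println("requester notified")
	default:
		fmt.Println("no listener, verdict dropped") // handler never stalls
	}
}

func main() {
	// requester waiting (buffered channel stands in for a blocked receiver)
	req := &deployRequest{outcomeCh: make(chan outcome, 1)}
	notify(req, outcome{deployed: 5})

	// no deployment request this round: nothing happens
	notify(nil, outcome{})
}
```
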
16 changes: 9 additions & 7 deletions core/task/schedulerstate.go
@@ -47,6 +47,11 @@ import (
"github.com/spf13/viper"
)

const (
MAX_CONCURRENT_DEPLOY_REQUESTS = 100
MAX_ATTEMPTS_PER_DEPLOY_REQUEST = 3
)

type schedulerState struct {
sync.RWMutex

@@ -59,10 +64,9 @@ type schedulerState struct {
err error

// not used in multiple goroutines:
executor *mesos.ExecutorInfo
reviveTokens <-chan struct{}
resourceOffersDone chan ResourceOffersOutcome
tasksToDeploy chan ResourceOffersDeploymentRequest
executor *mesos.ExecutorInfo
reviveTokens <-chan struct{}
tasksToDeploy chan *ResourceOffersDeploymentRequest

reviveOffersTrg chan struct{}
random *rand.Rand
@@ -100,8 +104,7 @@ func NewScheduler(taskman *Manager, fidStore store.Singleton, shutdown func()) (
return nil, err
}

resourceOffersDone := make(chan ResourceOffersOutcome)
tasksToDeploy := make(chan ResourceOffersDeploymentRequest)
tasksToDeploy := make(chan *ResourceOffersDeploymentRequest, MAX_CONCURRENT_DEPLOY_REQUESTS)

reviveOffersTrg := make(chan struct{})

@@ -113,7 +116,6 @@
viper.GetDuration("mesosReviveWait"),
viper.GetDuration("mesosReviveWait"),
nil),
resourceOffersDone: resourceOffersDone,
tasksToDeploy: tasksToDeploy,
reviveOffersTrg: reviveOffersTrg,
wantsTaskResources: mesos.Resources{},
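
The sizing constant matters because the request send in acquireTasks is annotated "does not block": an unbuffered channel send only succeeds once a receiver is ready, while a buffered one queues up to its capacity. A tiny sketch of that difference, with an illustrative capacity matching MAX_CONCURRENT_DEPLOY_REQUESTS:

```go
package main

import "fmt"

func main() {
	unbuffered := make(chan int)
	select {
	case unbuffered <- 1:
		fmt.Println("sent on unbuffered channel") // never happens without a ready receiver
	default:
		fmt.Println("unbuffered send would block")
	}

	buffered := make(chan int, 100) // like make(chan *ResourceOffersDeploymentRequest, MAX_CONCURRENT_DEPLOY_REQUESTS)
	buffered <- 1                   // queued without any receiver
	fmt.Println("buffered send queued, len =", len(buffered))
}
```
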