[WIP] [YUNIKORN-1795] Remove the scheduling cycle in the shim #765

Draft · wants to merge 1 commit into master
148 changes: 73 additions & 75 deletions pkg/cache/application.go
@@ -59,6 +59,7 @@ type Application struct {
placeholderTimeoutInSec int64
schedulingStyle string
originatingTask *Task // the original Pod that created the requests
accepted bool
}

const transitionErr = "no transition"
@@ -332,80 +333,6 @@ func (app *Application) TriggerAppSubmission() error {
return app.handle(NewSubmitApplicationEvent(app.applicationID))
}

// Schedule is called in every scheduling interval;
// we are not using the dispatcher here because we want to
// make state transitions in sync mode in order to prevent
// generating too many duplicate events. However, none of
// these calls may be expensive; usually they do nothing
// more than trigger a state transition.
// Returns true if the app needs scheduling, false otherwise.
func (app *Application) Schedule() bool {
switch app.GetApplicationState() {
case ApplicationStates().New:
ev := NewSubmitApplicationEvent(app.GetApplicationID())
if err := app.handle(ev); err != nil {
log.Log(log.ShimCacheApplication).Warn("failed to handle SUBMIT app event",
zap.Error(err))
}
case ApplicationStates().Accepted:
// once the app is accepted by the scheduler core,
// the next step is to send requests for scheduling
// the app state could be transited to Reserving or Running
// depends on if the app has gang members
app.postAppAccepted()
case ApplicationStates().Reserving:
// during the Reserving state, only the placeholders
// can be scheduled
app.scheduleTasks(func(t *Task) bool {
return t.placeholder
})
if len(app.GetNewTasks()) == 0 {
return false
}
case ApplicationStates().Running:
// during the Running state, only the regular pods
// can be scheduled
app.scheduleTasks(func(t *Task) bool {
return !t.placeholder
})
if len(app.GetNewTasks()) == 0 {
return false
}
default:
log.Log(log.ShimCacheApplication).Debug("skipping scheduling application",
zap.String("appState", app.GetApplicationState()),
zap.String("appID", app.GetApplicationID()))
return false
}
return true
}

func (app *Application) scheduleTasks(taskScheduleCondition func(t *Task) bool) {
for _, task := range app.GetNewTasks() {
if taskScheduleCondition(task) {
// for each new task, we do a sanity check before moving the state to Pending_Schedule
if err := task.sanityCheckBeforeScheduling(); err == nil {
// note: if we directly trigger the submit task event, it may spawn too many duplicate
// events, because a task might be submitted multiple times before its state transitions to PENDING.
if handleErr := task.handle(
NewSimpleTaskEvent(task.applicationID, task.taskID, InitTask)); handleErr != nil {
// something went wrong while transitioning the task to the PENDING state;
// this should not happen because we already checked the state
// before calling the transition. Nowhere to go, just log the error.
log.Log(log.ShimCacheApplication).Warn("init task failed", zap.Error(handleErr))
}
} else {
events.GetRecorder().Eventf(task.GetTaskPod().DeepCopy(), nil, v1.EventTypeWarning, "FailedScheduling", "FailedScheduling", err.Error())
log.Log(log.ShimCacheApplication).Debug("task is not ready for scheduling",
zap.String("appID", task.applicationID),
zap.String("taskID", task.taskID),
zap.Error(err))
}
}
}
}

func (app *Application) handleSubmitApplicationEvent() error {
log.Log(log.ShimCacheApplication).Info("handle app submission",
zap.Stringer("app", app),
@@ -469,10 +396,11 @@ func (app *Application) postAppAccepted() {
// app could have allocated tasks upon a recovery, and in that case,
// the reserving phase has already passed, no need to trigger that again.
var ev events.SchedulingEvent
numAllocatedTasks := len(app.getTasks(TaskStates().Allocated))
log.Log(log.ShimCacheApplication).Debug("postAppAccepted on cached app",
zap.String("appID", app.applicationID),
zap.Int("numTaskGroups", len(app.taskGroups)),
zap.Int("numAllocatedTasks", len(app.GetAllocatedTasks())))
zap.Int("numAllocatedTasks", numAllocatedTasks))
if app.skipReservationStage() {
ev = NewRunApplicationEvent(app.applicationID)
log.Log(log.ShimCacheApplication).Info("Skip the reservation stage",
@@ -485,6 +413,24 @@
dispatcher.Dispatch(ev)
}

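// postAppRunning is invoked when the application enters the Running state:
// it collects all non-placeholder tasks, sorts them by pod creation time,
// and triggers scheduling for each of them in that order.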
func (app *Application) postAppRunning() {
tasks := make([]*Task, 0, len(app.taskMap))
for _, task := range app.taskMap {
if !task.IsPlaceholder() {
tasks = append(tasks, task)
}
}
// sort tasks based on submission time & schedule them
sort.Slice(tasks, func(i, j int) bool {
l := tasks[i]
r := tasks[j]
return l.createTime.Before(r.createTime)
})
for _, task := range tasks {
task.Schedule()
}
}

func (app *Application) onReserving() {
// happens after recovery - if placeholders already exist, we need to send
// an event to trigger Application state change in the core
@@ -659,3 +605,55 @@ func (app *Application) SetPlaceholderTimeout(timeout int64) {
defer app.lock.Unlock()
app.placeholderTimeoutInSec = timeout
}

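// addTaskAndSchedule adds a task to the application (duplicates are ignored)
// and immediately triggers scheduling when the application state allows it;
// otherwise the task stays in the New state until a later state transition
// (e.g. postAppRunning) picks it up.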
func (app *Application) addTaskAndSchedule(task *Task) {
app.lock.Lock()
defer app.lock.Unlock()
if _, ok := app.taskMap[task.taskID]; ok {
// skip adding duplicate task
return
}
app.taskMap[task.taskID] = task

if app.canScheduleTask(task) {
task.Schedule()
}
}

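// canScheduleTask reports whether a newly added task can be scheduled right
// away: the application must have been accepted by the core, and when gang
// scheduling is in use, regular (non-placeholder) tasks must wait until the
// Reserving stage has completed.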
func (app *Application) canScheduleTask(task *Task) bool {
// skip - not yet accepted by the core
if !app.accepted {
return false
}

// can submit if gang scheduling is not used
if len(app.taskGroups) == 0 {
return true
}

// placeholder, or regular task and we're past reservation
ph := task.IsPlaceholder()
currentState := app.sm.Current()
return ph || (!ph && currentState != ApplicationStates().Reserving)
}

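// GetNewTasksWithFailedAttempt returns all tasks that are still in the New
// state but have a failed scheduling attempt recorded, sorted by creation
// time so that the oldest task comes first.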
func (app *Application) GetNewTasksWithFailedAttempt() []*Task {
app.lock.RLock()
defer app.lock.RUnlock()

taskList := make([]*Task, 0, len(app.taskMap))
for _, task := range app.taskMap {
if task.GetTaskState() == TaskStates().New && task.IsFailedAttempt() {
taskList = append(taskList, task)
}
}

// sort the tasks based on creation time
sort.Slice(taskList, func(i, j int) bool {
l := taskList[i]
r := taskList[j]
return l.createTime.Before(r.createTime)
})

return taskList
}
9 changes: 9 additions & 0 deletions pkg/cache/application_state.go
@@ -503,10 +503,19 @@ func newAppState() *fsm.FSM { //nolint:funlen
zap.String("destination", event.Dst),
zap.String("event", event.Event))
},
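// mark the application as accepted before running the post-acceptance
// hook, so that tasks added from this point on can be scheduled immediately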
states.Accepted: func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
app.accepted = true
app.postAppAccepted()
},
states.Reserving: func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
app.onReserving()
},
states.Running: func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
app.postAppRunning()
},
SubmitApplication.String(): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
event.Err = app.handleSubmitApplicationEvent()
106 changes: 91 additions & 15 deletions pkg/cache/application_test.go
@@ -731,9 +731,6 @@ func TestTryReserve(t *testing.T) {
err = app.handle(NewSimpleApplicationEvent(app.GetApplicationID(), AcceptApplication))
assert.NilError(t, err)

// run app schedule
app.Schedule()

// since this app has taskGroups defined,
// once the app is accepted, it is expected to see this app goes to Reserving state
assertAppState(t, app, ApplicationStates().Reserving, 3*time.Second)
@@ -787,15 +784,6 @@ func TestTryReservePostRestart(t *testing.T) {
},
})

// submit the app
err := app.handle(NewSubmitApplicationEvent(app.applicationID))
assert.NilError(t, err)
assertAppState(t, app, ApplicationStates().Submitted, 3*time.Second)

// accepted the app
err = app.handle(NewSimpleApplicationEvent(app.GetApplicationID(), AcceptApplication))
assert.NilError(t, err)

// simulate some tasks are recovered during the restart
// create 3 pods, 1 of them is Allocated and the other 2 are New
resources := make(map[v1.ResourceName]resource.Quantity)
@@ -860,8 +848,11 @@
assert.Equal(t, len(app.getTasks(TaskStates().Allocated)), 1)
assert.Equal(t, len(app.getTasks(TaskStates().New)), 2)

// run app schedule
app.Schedule()
// submit app & trigger state transition to Accepted
err := app.TriggerAppSubmission()
assert.NilError(t, err)
err = app.handle(NewSimpleApplicationEvent(app.GetApplicationID(), AcceptApplication))
assert.NilError(t, err)

// since this app has Allocated tasks, the Reserving state will be skipped
assertAppState(t, app, ApplicationStates().Running, 3*time.Second)
@@ -1160,7 +1151,7 @@ func TestPlaceholderTimeoutEvents(t *testing.T) {
app := context.GetApplication("app00001")
assert.Assert(t, app != nil)
assert.Equal(t, app.GetApplicationID(), "app00001")
assert.Equal(t, app.GetApplicationState(), ApplicationStates().New)
assert.Equal(t, app.GetApplicationState(), ApplicationStates().Submitted)
assert.Equal(t, app.GetQueue(), "root.a")
assert.Equal(t, len(app.GetNewTasks()), 1)

@@ -1284,6 +1275,91 @@ func TestApplication_onReservationStateChange(t *testing.T) {
assertAppState(t, app, ApplicationStates().Running, 1*time.Second)
}

func TestAddTaskAndSchedule(t *testing.T) {
context := initContextForTest()
dispatcher.RegisterEventHandler("TestAppHandler", dispatcher.EventTypeApp, context.ApplicationEventHandler())
dispatcher.Start()
defer dispatcher.Stop()

pod := &v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: apis.ObjectMeta{
Name: "pod-test-00001",
UID: "UID-00001",
},
}

// can't schedule - app is not accepted
app := NewApplication(appID, "root.abc", "testuser", testGroups, map[string]string{}, newMockSchedulerAPI())
task := NewTask("task01", app, context, pod)
app.addTaskAndSchedule(task)
assert.Equal(t, task.sm.Current(), TaskStates().New)
assert.Assert(t, !task.failedAttempt)

// can schedule task - no gang scheduling
app = NewApplication(appID, "root.abc", "testuser", testGroups, map[string]string{}, newMockSchedulerAPI())
app.accepted = true
task = NewTask("task01", app, context, pod)
app.addTaskAndSchedule(task)
assert.Assert(t, !task.IsFailedAttempt())

// can schedule task - placeholder
app = NewApplication(appID, "root.abc", "testuser", testGroups, map[string]string{}, newMockSchedulerAPI())
app.accepted = true
app.taskGroups = []TaskGroup{
{
Name: "group1",
MinMember: 3,
},
}
task = NewTaskPlaceholder("task01", app, context, pod)
app.addTaskAndSchedule(task)
assert.Assert(t, !task.IsFailedAttempt())

// can schedule task - state is no longer Reserving
app = NewApplication(appID, "root.abc", "testuser", testGroups, map[string]string{}, newMockSchedulerAPI())
app.accepted = true
app.taskGroups = []TaskGroup{
{
Name: "group1",
MinMember: 3,
},
}
task = NewTask("task01", app, context, pod)
app.sm.SetState(ApplicationStates().Running)
app.addTaskAndSchedule(task)
assert.Assert(t, !task.IsFailedAttempt())
}

func TestGetNewTasksWithFailedAttempt(t *testing.T) {
context := initContextForTest()
app := NewApplication(appID, "root.abc", "testuser", testGroups, map[string]string{}, newMockSchedulerAPI())

task1 := NewTask("task01", app, context, &v1.Pod{})
task1.setFailedAttempt(true)
task1.createTime = time.UnixMilli(100)
task2 := NewTask("task02", app, context, &v1.Pod{})
task3 := NewTask("task03", app, context, &v1.Pod{})
task3.setFailedAttempt(true)
task3.createTime = time.UnixMilli(50)
task4 := NewTask("task04", app, context, &v1.Pod{})
task4.setFailedAttempt(true)
task4.createTime = time.UnixMilli(10)

app.addTask(task1)
app.addTask(task2)
app.addTask(task3)
app.addTask(task4)

tasks := app.GetNewTasksWithFailedAttempt()
assert.Equal(t, 3, len(tasks))
assert.Equal(t, "task04", tasks[0].taskID)
assert.Equal(t, "task03", tasks[1].taskID)
assert.Equal(t, "task01", tasks[2].taskID)
}

func (ctx *Context) addApplicationToContext(app *Application) {
ctx.lock.Lock()
defer ctx.lock.Unlock()
17 changes: 16 additions & 1 deletion pkg/cache/context.go
@@ -65,6 +65,7 @@ type Context struct {
configMaps []*v1.ConfigMap // cached yunikorn configmaps
lock *sync.RWMutex // lock
txnID atomic.Uint64 // transaction ID counter
newApp bool // whether an application has been added since the last check
}

// NewContext create a new context for the scheduler using a default (empty) configuration
Expand Down Expand Up @@ -322,6 +323,10 @@ func (ctx *Context) ensureAppAndTaskCreated(pod *v1.Pod) {
app = ctx.addApplication(&AddApplicationRequest{
Metadata: appMeta,
})
err := app.TriggerAppSubmission()
if err != nil {
log.Log(log.ShimContext).Error("BUG: application submission failed", zap.Error(err))
}
}

// get task metadata
@@ -1017,6 +1022,7 @@ func (ctx *Context) addApplication(request *AddApplicationRequest) *Application

// add into cache
ctx.applications[app.applicationID] = app
ctx.newApp = true
log.Log(log.ShimContext).Info("app added",
zap.String("appID", app.applicationID))

@@ -1111,7 +1117,7 @@
}
}
task := NewFromTaskMeta(request.Metadata.TaskID, app, ctx, request.Metadata, originator)
app.addTask(task)
app.addTaskAndSchedule(task)
log.Log(log.ShimContext).Info("task added",
zap.String("appID", app.applicationID),
zap.String("taskID", task.taskID),
@@ -1709,6 +1715,15 @@ func (ctx *Context) finalizePods(existingPods []*v1.Pod) error {
return nil
}

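// HasNewApplication reports whether an application has been added since the
// previous call; the flag is cleared on every call (test-and-clear), so each
// new application is reported exactly once.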
func (ctx *Context) HasNewApplication() bool {
ctx.lock.Lock()
defer ctx.lock.Unlock()
v := ctx.newApp
ctx.newApp = false

return v
}

// for a given pod, return an allocation if found
func getExistingAllocation(pod *v1.Pod) *si.Allocation {
// skip terminated pods