Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[YUNIKORN-2074] Remove scheduler state machine #700

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ bench:
fsm_graph:
@echo "generating FSM graphs"
"$(GO)" clean -testcache
"$(GO)" test -tags graphviz -run 'Test.*FsmGraph' ./pkg/shim ./pkg/cache
"$(GO)" test -tags graphviz -run 'Test.*FsmGraph' ./pkg/cache
scripts/generate-fsm-graph-images.sh

# Remove generated build artifacts
Expand Down
4 changes: 3 additions & 1 deletion pkg/cmd/shim/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ func main() {

if serviceContext.RMProxy != nil {
ss := shim.NewShimScheduler(serviceContext.RMProxy, conf.GetSchedulerConf(), configMaps)
ss.Run()
if err := ss.Run(); err != nil {
log.Log(log.Shim).Fatal("Unable tto start scheduler", zap.Error(err))
}

signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
Expand Down
4 changes: 3 additions & 1 deletion pkg/plugin/scheduler_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,9 @@ func NewSchedulerPlugin(_ runtime.Object, handle framework.Handle) (framework.Pl
// we need our own informer factory here because the informers we get from the framework handle aren't yet initialized
informerFactory := informers.NewSharedInformerFactory(handle.ClientSet(), 0)
ss := shim.NewShimSchedulerForPlugin(serviceContext.RMProxy, informerFactory, conf.GetSchedulerConf(), configMaps)
ss.Run()
if err := ss.Run(); err != nil {
log.Log(log.ShimSchedulerPlugin).Fatal("Unable to start scheduler", zap.Error(err))
}

p := &YuniKornSchedulerPlugin{
context: ss.GetContext(),
Expand Down
160 changes: 47 additions & 113 deletions pkg/shim/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,9 @@
package shim

import (
"context"
"os"
"sync"
"time"

"github.com/looplab/fsm"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -35,7 +32,6 @@
"github.com/apache/yunikorn-k8shim/pkg/cache"
"github.com/apache/yunikorn-k8shim/pkg/callback"
"github.com/apache/yunikorn-k8shim/pkg/client"
"github.com/apache/yunikorn-k8shim/pkg/common/events"
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
"github.com/apache/yunikorn-k8shim/pkg/conf"
"github.com/apache/yunikorn-k8shim/pkg/dispatcher"
Expand All @@ -51,7 +47,6 @@
appManager *appmgmt.AppManagementService
phManager *cache.PlaceholderManager
callback api.ResourceManagerCallback
stateMachine *fsm.FSM
stopChan chan struct{}
lock *sync.RWMutex
outstandingAppsFound bool
Expand Down Expand Up @@ -96,13 +91,11 @@
stopChan: make(chan struct{}),
lock: &sync.RWMutex{},
outstandingAppsFound: false,
stateMachine: newSchedulerState(),
}
// init dispatcher
dispatcher.RegisterEventHandler(dispatcher.EventTypeApp, ctx.ApplicationEventHandler())
dispatcher.RegisterEventHandler(dispatcher.EventTypeTask, ctx.TaskEventHandler())
dispatcher.RegisterEventHandler(dispatcher.EventTypeNode, ctx.SchedulerNodeEventHandler())
dispatcher.RegisterEventHandler(dispatcher.EventTypeScheduler, ss.SchedulerEventHandler())

return ss
}
Expand All @@ -111,89 +104,37 @@
return ss.context
}

func (ss *KubernetesShim) SchedulerEventHandler() func(obj interface{}) {
return func(obj interface{}) {
if event, ok := obj.(events.SchedulerEvent); ok {
if ss.canHandle(event) {
if err := ss.handle(event); err != nil {
log.Log(log.ShimScheduler).Error("failed to handle scheduler event",
zap.String("event", event.GetEvent()),
zap.Error(err))
}
}
}
func (ss *KubernetesShim) recoverSchedulerState() error {
log.Log(log.ShimScheduler).Info("recovering scheduler states")
// step 1: recover all applications
// this step, we collect all the existing allocated pods from api-server,
// identify the scheduling identity (aka applicationInfo) from the pod,
// and then add these applications to the scheduler.
if err := ss.appManager.WaitForRecovery(); err != nil {
// failed
log.Log(log.ShimScheduler).Error("scheduler recovery failed", zap.Error(err))
return err

Check warning on line 116 in pkg/shim/scheduler.go

View check run for this annotation

Codecov / codecov/patch

pkg/shim/scheduler.go#L114-L116

Added lines #L114 - L116 were not covered by tests
}
}

func (ss *KubernetesShim) register() {
if err := ss.registerShimLayer(); err != nil {
dispatcher.Dispatch(ShimSchedulerEvent{
event: RegisterSchedulerFailed,
})
} else {
dispatcher.Dispatch(ShimSchedulerEvent{
event: RegisterSchedulerSucceed,
})
// step 2: recover existing allocations
// this step, we collect all existing allocations (allocated pods) from api-server,
// rerun the scheduling for these allocations in order to restore scheduler-state,
// the rerun is like a replay, not a actual scheduling procedure.
recoverableAppManagers := make([]interfaces.Recoverable, 0)
for _, appMgr := range ss.appManager.GetAllManagers() {
if m, ok := appMgr.(interfaces.Recoverable); ok {
recoverableAppManagers = append(recoverableAppManagers, m)
}
}
}

func (ss *KubernetesShim) handleSchedulerFailure() {
ss.Stop()
// testmode will be true when mock scheduler intailize
if !conf.GetSchedulerConf().IsTestMode() {
os.Exit(1)
if err := ss.context.WaitForRecovery(recoverableAppManagers, 5*time.Minute); err != nil {
// failed
log.Log(log.ShimScheduler).Error("scheduler recovery failed", zap.Error(err))
return err

Check warning on line 132 in pkg/shim/scheduler.go

View check run for this annotation

Codecov / codecov/patch

pkg/shim/scheduler.go#L130-L132

Added lines #L130 - L132 were not covered by tests
}
}

func (ss *KubernetesShim) triggerSchedulerStateRecovery() {
dispatcher.Dispatch(ShimSchedulerEvent{
event: RecoverScheduler,
})
}

func (ss *KubernetesShim) recoverSchedulerState() {
// run recovery process in a go routine
// do not block main thread
go func() {
log.Log(log.ShimScheduler).Info("recovering scheduler states")
// step 1: recover all applications
// this step, we collect all the existing allocated pods from api-server,
// identify the scheduling identity (aka applicationInfo) from the pod,
// and then add these applications to the scheduler.
if err := ss.appManager.WaitForRecovery(); err != nil {
// failed
log.Log(log.ShimScheduler).Error("scheduler recovery failed", zap.Error(err))
dispatcher.Dispatch(ShimSchedulerEvent{
event: RecoverSchedulerFailed,
})
return
}

// step 2: recover existing allocations
// this step, we collect all existing allocations (allocated pods) from api-server,
// rerun the scheduling for these allocations in order to restore scheduler-state,
// the rerun is like a replay, not a actual scheduling procedure.
recoverableAppManagers := make([]interfaces.Recoverable, 0)
for _, appMgr := range ss.appManager.GetAllManagers() {
if m, ok := appMgr.(interfaces.Recoverable); ok {
recoverableAppManagers = append(recoverableAppManagers, m)
}
}
if err := ss.context.WaitForRecovery(recoverableAppManagers, 5*time.Minute); err != nil {
// failed
log.Log(log.ShimScheduler).Error("scheduler recovery failed", zap.Error(err))
dispatcher.Dispatch(ShimSchedulerEvent{
event: RecoverSchedulerFailed,
})
return
}

// success
log.Log(log.ShimScheduler).Info("scheduler recovery succeed")
dispatcher.Dispatch(ShimSchedulerEvent{
event: RecoverSchedulerSucceed,
})
}()
// success
log.Log(log.ShimScheduler).Info("scheduler recovery succeed")
return nil
}

func (ss *KubernetesShim) doScheduling() {
Expand Down Expand Up @@ -243,27 +184,6 @@
return nil
}

func (ss *KubernetesShim) GetSchedulerState() string {
return ss.stateMachine.Current()
}

// event handling
func (ss *KubernetesShim) handle(se events.SchedulerEvent) error {
ss.lock.Lock()
defer ss.lock.Unlock()
err := ss.stateMachine.Event(context.Background(), se.GetEvent(), ss)
if err != nil && err.Error() == "no transition" {
return err
}
return nil
}

func (ss *KubernetesShim) canHandle(se events.SchedulerEvent) bool {
ss.lock.RLock()
defer ss.lock.RUnlock()
return ss.stateMachine.Can(se.GetEvent())
}

// each schedule iteration, we scan all apps and triggers app state transition
func (ss *KubernetesShim) schedule() {
apps := ss.context.GetAllApplications()
Expand All @@ -274,7 +194,7 @@
}
}

func (ss *KubernetesShim) Run() {
func (ss *KubernetesShim) Run() error {
// NOTE: the order of starting these services matter,
// please look at the comments before modifying the orders

Expand All @@ -289,19 +209,33 @@
// run the client library code that communicates with Kubernetes
ss.apiFactory.Start()

// register scheduler with scheduler core
// this triggers the scheduler state transition
// it first registers with the core, then start to do recovery,
// after the recovery is succeed, it goes to the normal scheduling routine
dispatcher.Dispatch(newRegisterSchedulerEvent())
// register shim with core
if err := ss.registerShimLayer(); err != nil {
log.Log(log.ShimScheduler).Error("failed to register shim with core", zap.Error(err))
ss.Stop()
return err
}

// run app managers
// the app manager launches the pod event handlers
// it needs to be started after the shim is registered with the core
if err := ss.appManager.Start(); err != nil {
log.Log(log.ShimScheduler).Fatal("failed to start app manager", zap.Error(err))
log.Log(log.ShimScheduler).Error("failed to start app manager", zap.Error(err))
ss.Stop()
return err
}

Check warning on line 226 in pkg/shim/scheduler.go

View check run for this annotation

Codecov / codecov/patch

pkg/shim/scheduler.go#L223-L226

Added lines #L223 - L226 were not covered by tests

// recover scheduler state
if err := ss.recoverSchedulerState(); err != nil {
log.Log(log.ShimScheduler).Error("failed to recover scheduler state", zap.Error(err))

Check warning on line 230 in pkg/shim/scheduler.go

View check run for this annotation

Codecov / codecov/patch

pkg/shim/scheduler.go#L230

Added line #L230 was not covered by tests
ss.Stop()
return err

Check warning on line 232 in pkg/shim/scheduler.go

View check run for this annotation

Codecov / codecov/patch

pkg/shim/scheduler.go#L232

Added line #L232 was not covered by tests
}

// start scheduling loop
ss.doScheduling()

return nil
}

func (ss *KubernetesShim) Stop() {
Expand Down
61 changes: 0 additions & 61 deletions pkg/shim/scheduler_graphviz_test.go

This file was deleted.

25 changes: 6 additions & 19 deletions pkg/shim/scheduler_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,14 @@ func (fc *MockScheduler) init() {
fc.apiProvider = mockedAPIProvider
}

func (fc *MockScheduler) start() {
func (fc *MockScheduler) start() error {
fc.apiProvider.RunEventHandler() // must be called first
fc.scheduler.Run()
if err := fc.scheduler.Run(); err != nil {
fc.started.Store(false)
return err
}
fc.started.Store(true)
return nil
}

func (fc *MockScheduler) updateConfig(queues string, extraConfig map[string]string) error {
Expand Down Expand Up @@ -172,23 +176,6 @@ func (fc *MockScheduler) addTask(appID string, taskID string, ask *si.Resource)
})
}

func (fc *MockScheduler) waitForSchedulerState(t testing.TB, expectedState string) {
deadline := time.Now().Add(10 * time.Second)
for {
if fc.scheduler.GetSchedulerState() == expectedState {
break
}
log.Log(log.Test).Info("waiting for scheduler state",
zap.String("expected", expectedState),
zap.String("actual", fc.scheduler.GetSchedulerState()))
time.Sleep(time.Second)
if time.Now().After(deadline) {
t.Errorf("wait for scheduler to reach state %s failed, current state %s",
expectedState, fc.scheduler.GetSchedulerState())
}
}
}

func (fc *MockScheduler) waitAndAssertApplicationState(t *testing.T, appID, expectedState string) {
app := fc.context.GetApplication(appID)
assert.Equal(t, app != nil, true)
Expand Down
5 changes: 2 additions & 3 deletions pkg/shim/scheduler_perf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func BenchmarkSchedulingThroughPut(b *testing.B) {

cluster := &MockScheduler{}
cluster.init()
cluster.start()
assert.NilError(b, cluster.start(), "failed to initialize cluster")
defer cluster.stop()

if profileCpu {
Expand All @@ -101,8 +101,7 @@ func BenchmarkSchedulingThroughPut(b *testing.B) {
}(f)
}

// init scheduler & update config
cluster.waitForSchedulerState(b, SchedulerStates().Running)
// update config
err := cluster.updateConfig(queueConfig, map[string]string{
"log.level": "WARN",
})
Expand Down
Loading