diff --git a/Makefile b/Makefile index b2d06ed39..a2e0d589a 100644 --- a/Makefile +++ b/Makefile @@ -510,7 +510,7 @@ bench: fsm_graph: @echo "generating FSM graphs" "$(GO)" clean -testcache - "$(GO)" test -tags graphviz -run 'Test.*FsmGraph' ./pkg/shim ./pkg/cache + "$(GO)" test -tags graphviz -run 'Test.*FsmGraph' ./pkg/cache scripts/generate-fsm-graph-images.sh # Remove generated build artifacts diff --git a/pkg/cmd/shim/main.go b/pkg/cmd/shim/main.go index 9f2406706..cc6785df6 100644 --- a/pkg/cmd/shim/main.go +++ b/pkg/cmd/shim/main.go @@ -51,7 +51,9 @@ func main() { if serviceContext.RMProxy != nil { ss := shim.NewShimScheduler(serviceContext.RMProxy, conf.GetSchedulerConf(), configMaps) - ss.Run() + if err := ss.Run(); err != nil { + log.Log(log.Shim).Fatal("Unable tto start scheduler", zap.Error(err)) + } signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) diff --git a/pkg/plugin/scheduler_plugin.go b/pkg/plugin/scheduler_plugin.go index 4201c6f17..507c8ef91 100644 --- a/pkg/plugin/scheduler_plugin.go +++ b/pkg/plugin/scheduler_plugin.go @@ -271,7 +271,9 @@ func NewSchedulerPlugin(_ runtime.Object, handle framework.Handle) (framework.Pl // we need our own informer factory here because the informers we get from the framework handle aren't yet initialized informerFactory := informers.NewSharedInformerFactory(handle.ClientSet(), 0) ss := shim.NewShimSchedulerForPlugin(serviceContext.RMProxy, informerFactory, conf.GetSchedulerConf(), configMaps) - ss.Run() + if err := ss.Run(); err != nil { + log.Log(log.ShimSchedulerPlugin).Fatal("Unable to start scheduler", zap.Error(err)) + } p := &YuniKornSchedulerPlugin{ context: ss.GetContext(), diff --git a/pkg/shim/scheduler.go b/pkg/shim/scheduler.go index f0620b3bf..9916eaaeb 100644 --- a/pkg/shim/scheduler.go +++ b/pkg/shim/scheduler.go @@ -19,12 +19,9 @@ package shim import ( - "context" - "os" "sync" "time" - "github.com/looplab/fsm" "go.uber.org/zap" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/wait" @@ -35,7 +32,6 @@ import ( "github.com/apache/yunikorn-k8shim/pkg/cache" "github.com/apache/yunikorn-k8shim/pkg/callback" "github.com/apache/yunikorn-k8shim/pkg/client" - "github.com/apache/yunikorn-k8shim/pkg/common/events" "github.com/apache/yunikorn-k8shim/pkg/common/utils" "github.com/apache/yunikorn-k8shim/pkg/conf" "github.com/apache/yunikorn-k8shim/pkg/dispatcher" @@ -51,7 +47,6 @@ type KubernetesShim struct { appManager *appmgmt.AppManagementService phManager *cache.PlaceholderManager callback api.ResourceManagerCallback - stateMachine *fsm.FSM stopChan chan struct{} lock *sync.RWMutex outstandingAppsFound bool @@ -96,13 +91,11 @@ func newShimSchedulerInternal(ctx *cache.Context, apiFactory client.APIProvider, stopChan: make(chan struct{}), lock: &sync.RWMutex{}, outstandingAppsFound: false, - stateMachine: newSchedulerState(), } // init dispatcher dispatcher.RegisterEventHandler(dispatcher.EventTypeApp, ctx.ApplicationEventHandler()) dispatcher.RegisterEventHandler(dispatcher.EventTypeTask, ctx.TaskEventHandler()) dispatcher.RegisterEventHandler(dispatcher.EventTypeNode, ctx.SchedulerNodeEventHandler()) - dispatcher.RegisterEventHandler(dispatcher.EventTypeScheduler, ss.SchedulerEventHandler()) return ss } @@ -111,89 +104,37 @@ func (ss *KubernetesShim) GetContext() *cache.Context { return ss.context } -func (ss *KubernetesShim) SchedulerEventHandler() func(obj interface{}) { - return func(obj interface{}) { - if event, ok := obj.(events.SchedulerEvent); ok { - if ss.canHandle(event) { - if err := ss.handle(event); err != nil { - log.Log(log.ShimScheduler).Error("failed to handle scheduler event", - zap.String("event", event.GetEvent()), - zap.Error(err)) - } - } - } +func (ss *KubernetesShim) recoverSchedulerState() error { + log.Log(log.ShimScheduler).Info("recovering scheduler states") + // step 1: recover all applications + // this step, we collect all the existing allocated pods from api-server, + // identify the scheduling identity (aka applicationInfo) from the pod, + // and then add these applications to the scheduler. + if err := ss.appManager.WaitForRecovery(); err != nil { + // failed + log.Log(log.ShimScheduler).Error("scheduler recovery failed", zap.Error(err)) + return err } -} -func (ss *KubernetesShim) register() { - if err := ss.registerShimLayer(); err != nil { - dispatcher.Dispatch(ShimSchedulerEvent{ - event: RegisterSchedulerFailed, - }) - } else { - dispatcher.Dispatch(ShimSchedulerEvent{ - event: RegisterSchedulerSucceed, - }) + // step 2: recover existing allocations + // this step, we collect all existing allocations (allocated pods) from api-server, + // rerun the scheduling for these allocations in order to restore scheduler-state, + // the rerun is like a replay, not a actual scheduling procedure. + recoverableAppManagers := make([]interfaces.Recoverable, 0) + for _, appMgr := range ss.appManager.GetAllManagers() { + if m, ok := appMgr.(interfaces.Recoverable); ok { + recoverableAppManagers = append(recoverableAppManagers, m) + } } -} - -func (ss *KubernetesShim) handleSchedulerFailure() { - ss.Stop() - // testmode will be true when mock scheduler intailize - if !conf.GetSchedulerConf().IsTestMode() { - os.Exit(1) + if err := ss.context.WaitForRecovery(recoverableAppManagers, 5*time.Minute); err != nil { + // failed + log.Log(log.ShimScheduler).Error("scheduler recovery failed", zap.Error(err)) + return err } -} - -func (ss *KubernetesShim) triggerSchedulerStateRecovery() { - dispatcher.Dispatch(ShimSchedulerEvent{ - event: RecoverScheduler, - }) -} -func (ss *KubernetesShim) recoverSchedulerState() { - // run recovery process in a go routine - // do not block main thread - go func() { - log.Log(log.ShimScheduler).Info("recovering scheduler states") - // step 1: recover all applications - // this step, we collect all the existing allocated pods from api-server, - // identify the scheduling identity (aka applicationInfo) from the pod, - // and then add these applications to the scheduler. - if err := ss.appManager.WaitForRecovery(); err != nil { - // failed - log.Log(log.ShimScheduler).Error("scheduler recovery failed", zap.Error(err)) - dispatcher.Dispatch(ShimSchedulerEvent{ - event: RecoverSchedulerFailed, - }) - return - } - - // step 2: recover existing allocations - // this step, we collect all existing allocations (allocated pods) from api-server, - // rerun the scheduling for these allocations in order to restore scheduler-state, - // the rerun is like a replay, not a actual scheduling procedure. - recoverableAppManagers := make([]interfaces.Recoverable, 0) - for _, appMgr := range ss.appManager.GetAllManagers() { - if m, ok := appMgr.(interfaces.Recoverable); ok { - recoverableAppManagers = append(recoverableAppManagers, m) - } - } - if err := ss.context.WaitForRecovery(recoverableAppManagers, 5*time.Minute); err != nil { - // failed - log.Log(log.ShimScheduler).Error("scheduler recovery failed", zap.Error(err)) - dispatcher.Dispatch(ShimSchedulerEvent{ - event: RecoverSchedulerFailed, - }) - return - } - - // success - log.Log(log.ShimScheduler).Info("scheduler recovery succeed") - dispatcher.Dispatch(ShimSchedulerEvent{ - event: RecoverSchedulerSucceed, - }) - }() + // success + log.Log(log.ShimScheduler).Info("scheduler recovery succeed") + return nil } func (ss *KubernetesShim) doScheduling() { @@ -243,27 +184,6 @@ func (ss *KubernetesShim) registerShimLayer() error { return nil } -func (ss *KubernetesShim) GetSchedulerState() string { - return ss.stateMachine.Current() -} - -// event handling -func (ss *KubernetesShim) handle(se events.SchedulerEvent) error { - ss.lock.Lock() - defer ss.lock.Unlock() - err := ss.stateMachine.Event(context.Background(), se.GetEvent(), ss) - if err != nil && err.Error() == "no transition" { - return err - } - return nil -} - -func (ss *KubernetesShim) canHandle(se events.SchedulerEvent) bool { - ss.lock.RLock() - defer ss.lock.RUnlock() - return ss.stateMachine.Can(se.GetEvent()) -} - // each schedule iteration, we scan all apps and triggers app state transition func (ss *KubernetesShim) schedule() { apps := ss.context.GetAllApplications() @@ -274,7 +194,7 @@ func (ss *KubernetesShim) schedule() { } } -func (ss *KubernetesShim) Run() { +func (ss *KubernetesShim) Run() error { // NOTE: the order of starting these services matter, // please look at the comments before modifying the orders @@ -289,19 +209,33 @@ func (ss *KubernetesShim) Run() { // run the client library code that communicates with Kubernetes ss.apiFactory.Start() - // register scheduler with scheduler core - // this triggers the scheduler state transition - // it first registers with the core, then start to do recovery, - // after the recovery is succeed, it goes to the normal scheduling routine - dispatcher.Dispatch(newRegisterSchedulerEvent()) + // register shim with core + if err := ss.registerShimLayer(); err != nil { + log.Log(log.ShimScheduler).Error("failed to register shim with core", zap.Error(err)) + ss.Stop() + return err + } // run app managers // the app manager launches the pod event handlers // it needs to be started after the shim is registered with the core if err := ss.appManager.Start(); err != nil { - log.Log(log.ShimScheduler).Fatal("failed to start app manager", zap.Error(err)) + log.Log(log.ShimScheduler).Error("failed to start app manager", zap.Error(err)) + ss.Stop() + return err + } + + // recover scheduler state + if err := ss.recoverSchedulerState(); err != nil { + log.Log(log.ShimScheduler).Error("failed to recover scheduler state", zap.Error(err)) ss.Stop() + return err } + + // start scheduling loop + ss.doScheduling() + + return nil } func (ss *KubernetesShim) Stop() { diff --git a/pkg/shim/scheduler_graphviz_test.go b/pkg/shim/scheduler_graphviz_test.go deleted file mode 100644 index 7f2da0738..000000000 --- a/pkg/shim/scheduler_graphviz_test.go +++ /dev/null @@ -1,61 +0,0 @@ -//go:build graphviz -// +build graphviz - -/* - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package shim - -import ( - "fmt" - "os" - "testing" - - "github.com/looplab/fsm" - "gotest.tools/v3/assert" - - "github.com/apache/yunikorn-k8shim/pkg/appmgmt" - "github.com/apache/yunikorn-k8shim/pkg/cache" - "github.com/apache/yunikorn-k8shim/pkg/client" - "github.com/apache/yunikorn-k8shim/pkg/common/test" - "github.com/apache/yunikorn-scheduler-interface/lib/go/api" - "github.com/apache/yunikorn-scheduler-interface/lib/go/si" -) - -func TestSchedulerFsmGraph(t *testing.T) { - var callback api.ResourceManagerCallback - - mockedAMProtocol := cache.NewMockedAMProtocol() - mockedAPIProvider := client.NewMockedAPIProvider(false) - mockedAPIProvider.GetAPIs().SchedulerAPI = test.NewSchedulerAPIMock().RegisterFunction( - func(request *si.RegisterResourceManagerRequest, - callback api.ResourceManagerCallback) (response *si.RegisterResourceManagerResponse, e error) { - return nil, fmt.Errorf("some error") - }) - - ctx := cache.NewContext(mockedAPIProvider) - shim := newShimSchedulerInternal(ctx, mockedAPIProvider, - appmgmt.NewAMService(mockedAMProtocol, mockedAPIProvider), callback) - - graph := fsm.Visualize(shim.stateMachine) - - err := os.MkdirAll("../../build/fsm", 0755) - assert.NilError(t, err, "Creating output dir failed") - os.WriteFile("../../build/fsm/k8shim-scheduler-state.dot", []byte(graph), 0644) - assert.NilError(t, err, "Writing graph failed") -} diff --git a/pkg/shim/scheduler_mock_test.go b/pkg/shim/scheduler_mock_test.go index f430bdc38..51e9cde1e 100644 --- a/pkg/shim/scheduler_mock_test.go +++ b/pkg/shim/scheduler_mock_test.go @@ -81,10 +81,14 @@ func (fc *MockScheduler) init() { fc.apiProvider = mockedAPIProvider } -func (fc *MockScheduler) start() { +func (fc *MockScheduler) start() error { fc.apiProvider.RunEventHandler() // must be called first - fc.scheduler.Run() + if err := fc.scheduler.Run(); err != nil { + fc.started.Store(false) + return err + } fc.started.Store(true) + return nil } func (fc *MockScheduler) updateConfig(queues string, extraConfig map[string]string) error { @@ -172,23 +176,6 @@ func (fc *MockScheduler) addTask(appID string, taskID string, ask *si.Resource) }) } -func (fc *MockScheduler) waitForSchedulerState(t testing.TB, expectedState string) { - deadline := time.Now().Add(10 * time.Second) - for { - if fc.scheduler.GetSchedulerState() == expectedState { - break - } - log.Log(log.Test).Info("waiting for scheduler state", - zap.String("expected", expectedState), - zap.String("actual", fc.scheduler.GetSchedulerState())) - time.Sleep(time.Second) - if time.Now().After(deadline) { - t.Errorf("wait for scheduler to reach state %s failed, current state %s", - expectedState, fc.scheduler.GetSchedulerState()) - } - } -} - func (fc *MockScheduler) waitAndAssertApplicationState(t *testing.T, appID, expectedState string) { app := fc.context.GetApplication(appID) assert.Equal(t, app != nil, true) diff --git a/pkg/shim/scheduler_perf_test.go b/pkg/shim/scheduler_perf_test.go index 53c057bb1..9c1afcae0 100644 --- a/pkg/shim/scheduler_perf_test.go +++ b/pkg/shim/scheduler_perf_test.go @@ -80,7 +80,7 @@ func BenchmarkSchedulingThroughPut(b *testing.B) { cluster := &MockScheduler{} cluster.init() - cluster.start() + assert.NilError(b, cluster.start(), "failed to initialize cluster") defer cluster.stop() if profileCpu { @@ -101,8 +101,7 @@ func BenchmarkSchedulingThroughPut(b *testing.B) { }(f) } - // init scheduler & update config - cluster.waitForSchedulerState(b, SchedulerStates().Running) + // update config err := cluster.updateConfig(queueConfig, map[string]string{ "log.level": "WARN", }) diff --git a/pkg/shim/scheduler_state.go b/pkg/shim/scheduler_state.go deleted file mode 100644 index dcce27350..000000000 --- a/pkg/shim/scheduler_state.go +++ /dev/null @@ -1,183 +0,0 @@ -/* - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package shim - -import ( - "context" - "sync" - - "github.com/looplab/fsm" - "go.uber.org/zap" - - "github.com/apache/yunikorn-k8shim/pkg/common/events" - "github.com/apache/yunikorn-k8shim/pkg/log" -) - -var schedulerStatesOnce sync.Once - -// ---------------------------------------------- -// Scheduler events -// ---------------------------------------------- -type SchedulerEventType int - -const ( - RegisterScheduler SchedulerEventType = iota - RegisterSchedulerSucceed - RegisterSchedulerFailed - RecoverScheduler - RecoverSchedulerSucceed - RecoverSchedulerFailed -) - -func (ae SchedulerEventType) String() string { - return [...]string{"RegisterScheduler", "RegisterSchedulerSucceed", "RegisterSchedulerFailed", "RecoverScheduler", "RecoverSchedulerSucceed", "RecoverSchedulerFailed"}[ae] -} - -type ShimSchedulerEvent struct { //nolint:golint - event SchedulerEventType -} - -func (rs ShimSchedulerEvent) GetEvent() string { - return rs.event.String() -} - -func (rs ShimSchedulerEvent) GetArgs() []interface{} { - return nil -} - -// ------------------------------------------------------------------- -// event to trigger scheduler registration -// -------------------------------------------------------------------- -type RegisterSchedulerEvent struct { - event SchedulerEventType -} - -func newRegisterSchedulerEvent() RegisterSchedulerEvent { - return RegisterSchedulerEvent{ - event: RegisterScheduler, - } -} - -func (rs RegisterSchedulerEvent) GetEvent() string { - return rs.event.String() -} - -func (rs RegisterSchedulerEvent) GetArgs() []interface{} { - return nil -} - -// ---------------------------------- -// Scheduler states -// ---------------------------------- -var storeScheduleStates *SStates - -type SStates struct { - New string - Registered string - Registering string - Recovering string - Running string - Draining string - Stopped string -} - -func SchedulerStates() *SStates { - schedulerStatesOnce.Do(func() { - storeScheduleStates = &SStates{ - New: "New", - Registered: "Registered", - Registering: "Registering", - Recovering: "Recovering", - Running: "Running", - Draining: "Draining", - Stopped: "Stopped", - } - }) - return storeScheduleStates -} - -func newSchedulerState() *fsm.FSM { - states := SchedulerStates() - return fsm.NewFSM( - states.New, fsm.Events{ - { - Name: RegisterScheduler.String(), - Src: []string{states.New}, - Dst: states.Registering, - }, - { - Name: RegisterSchedulerSucceed.String(), - Src: []string{states.Registering}, - Dst: states.Registered, - }, - { - Name: RegisterSchedulerFailed.String(), - Src: []string{states.Registering}, - Dst: states.Stopped, - }, - { - Name: RecoverScheduler.String(), - Src: []string{states.Registered}, - Dst: states.Recovering, - }, - { - Name: RecoverSchedulerSucceed.String(), - Src: []string{states.Recovering}, - Dst: states.Running, - }, - { - Name: RecoverSchedulerFailed.String(), - Src: []string{states.Recovering}, - Dst: states.Stopped, - }, - }, - fsm.Callbacks{ - events.EnterState: func(_ context.Context, event *fsm.Event) { - log.Log(log.ShimFSM).Debug("scheduler shim state transition", - zap.String("source", event.Src), - zap.String("destination", event.Dst), - zap.String("event", event.Event)) - }, - states.Registered: func(_ context.Context, event *fsm.Event) { - scheduler := event.Args[0].(*KubernetesShim) //nolint:errcheck - scheduler.triggerSchedulerStateRecovery() // if reaches registered, trigger recovering - }, - states.Recovering: func(_ context.Context, event *fsm.Event) { - scheduler := event.Args[0].(*KubernetesShim) //nolint:errcheck - scheduler.recoverSchedulerState() // do recovering - }, - states.Running: func(_ context.Context, event *fsm.Event) { - scheduler := event.Args[0].(*KubernetesShim) //nolint:errcheck - scheduler.doScheduling() // do scheduling - }, - RegisterScheduler.String(): func(_ context.Context, event *fsm.Event) { - scheduler := event.Args[0].(*KubernetesShim) //nolint:errcheck - scheduler.register() // trigger registration - }, - RegisterSchedulerFailed.String(): func(_ context.Context, event *fsm.Event) { - scheduler := event.Args[0].(*KubernetesShim) //nolint:errcheck - scheduler.handleSchedulerFailure() // registration failed, stop the scheduler - }, - RecoverSchedulerFailed.String(): func(_ context.Context, event *fsm.Event) { - scheduler := event.Args[0].(*KubernetesShim) //nolint:errcheck - scheduler.handleSchedulerFailure() // recovery failed - }, - }, - ) -} diff --git a/pkg/shim/scheduler_test.go b/pkg/shim/scheduler_test.go index 4f563465c..f8387c6a9 100644 --- a/pkg/shim/scheduler_test.go +++ b/pkg/shim/scheduler_test.go @@ -21,9 +21,7 @@ package shim import ( "fmt" "testing" - "time" - "go.uber.org/zap" "gotest.tools/v3/assert" v1 "k8s.io/api/core/v1" @@ -32,7 +30,6 @@ import ( "github.com/apache/yunikorn-k8shim/pkg/client" "github.com/apache/yunikorn-k8shim/pkg/common" "github.com/apache/yunikorn-k8shim/pkg/common/test" - "github.com/apache/yunikorn-k8shim/pkg/log" "github.com/apache/yunikorn-scheduler-interface/lib/go/api" siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common" "github.com/apache/yunikorn-scheduler-interface/lib/go/si" @@ -58,11 +55,9 @@ partitions: // init and register scheduler cluster := MockScheduler{} cluster.init() - cluster.start() + assert.NilError(t, cluster.start(), "failed to start cluster") defer cluster.stop() - // ensure scheduler running - cluster.waitForSchedulerState(t, SchedulerStates().Running) err := cluster.updateConfig(configData, nil) assert.NilError(t, err, "update config failed") nodeLabels := map[string]string{ @@ -112,11 +107,9 @@ partitions: // init and register scheduler cluster := MockScheduler{} cluster.init() - cluster.start() + assert.NilError(t, cluster.start(), "failed to start cluster") defer cluster.stop() - // ensure scheduler state - cluster.waitForSchedulerState(t, SchedulerStates().Running) err := cluster.updateConfig(configData, nil) assert.NilError(t, err, "update config failed") @@ -173,11 +166,8 @@ func TestSchedulerRegistrationFailed(t *testing.T) { ctx := cache.NewContext(mockedAPIProvider) shim := newShimSchedulerInternal(ctx, mockedAPIProvider, appmgmt.NewAMService(mockedAMProtocol, mockedAPIProvider), callback) - shim.Run() - defer shim.Stop() - - err := waitShimSchedulerState(shim, SchedulerStates().Stopped, 5*time.Second) - assert.NilError(t, err) + assert.Error(t, shim.Run(), "some error") + shim.Stop() } func TestTaskFailures(t *testing.T) { @@ -203,7 +193,7 @@ partitions: // init and register scheduler cluster := MockScheduler{} cluster.init() - cluster.start() + assert.NilError(t, cluster.start(), "failed to start cluster") defer cluster.stop() // mock pod bind failures @@ -214,8 +204,6 @@ partitions: return nil }) - // ensure scheduler state - cluster.waitForSchedulerState(t, SchedulerStates().Running) err := cluster.updateConfig(configData, nil) assert.NilError(t, err, "update config failed") @@ -248,20 +236,3 @@ partitions: "[mycluster]default", "app0001", 1) assert.NilError(t, err, "number of allocations is not expected, error") } - -func waitShimSchedulerState(shim *KubernetesShim, expectedState string, timeout time.Duration) error { - deadline := time.Now().Add(timeout) - for { - if shim.GetSchedulerState() == expectedState { - log.Log(log.Test).Info("waiting for state", - zap.String("expect", expectedState), - zap.String("current", shim.GetSchedulerState())) - return nil - } - time.Sleep(1 * time.Second) - if time.Now().After(deadline) { - return fmt.Errorf("scheduler has not reached expected state %s in %d seconds, current state: %s", - expectedState, deadline.Second(), shim.GetSchedulerState()) - } - } -}