From d5047bbdc182a991e7553859e609dd6045bcc6ec Mon Sep 17 00:00:00 2001 From: Craig Condit Date: Thu, 15 Aug 2024 10:17:41 -0500 Subject: [PATCH] [YUNIKORN-2779] Shim: Use UpdateAllocation for both asks and allocations --- go.mod | 4 +- go.sum | 8 +-- pkg/cache/application.go | 35 ++-------- pkg/cache/application_state.go | 64 +---------------- pkg/cache/application_state_test.go | 100 --------------------------- pkg/cache/scheduler_callback.go | 19 ----- pkg/cache/scheduler_callback_test.go | 48 ------------- pkg/cache/task.go | 57 ++++++--------- pkg/cache/task_test.go | 59 ++++++---------- pkg/common/si_helper.go | 40 ++++------- pkg/common/si_helper_test.go | 40 +++-------- 11 files changed, 80 insertions(+), 394 deletions(-) diff --git a/go.mod b/go.mod index 349820d38..5ab3457dc 100644 --- a/go.mod +++ b/go.mod @@ -21,8 +21,8 @@ module github.com/apache/yunikorn-k8shim go 1.21 require ( - github.com/apache/yunikorn-core v0.0.0-20240802210614-4aec626c6bf9 - github.com/apache/yunikorn-scheduler-interface v0.0.0-20240731203810-92032b13d586 + github.com/apache/yunikorn-core v0.0.0-20240815151636-1ee277455be8 + github.com/apache/yunikorn-scheduler-interface v0.0.0-20240815142741-38a38685cd4e github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 github.com/looplab/fsm v1.0.1 diff --git a/go.sum b/go.sum index 7408b97bd..e3bb6b5e1 100644 --- a/go.sum +++ b/go.sum @@ -9,10 +9,10 @@ github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c= github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df h1:7RFfzj4SSt6nnvCPbCqijJi1nWCd+TqAT3bYCStRC18= github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df/go.mod h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM= -github.com/apache/yunikorn-core v0.0.0-20240802210614-4aec626c6bf9 h1:s1Co/K+cR9Q/GW0e974dToW9eyLQZxYoCp0TCoEuEj0= -github.com/apache/yunikorn-core v0.0.0-20240802210614-4aec626c6bf9/go.mod h1:S9yGBGA2i2hAajtEc2t4lmiPJDZz3Ek8eVxz5KhJqGI= -github.com/apache/yunikorn-scheduler-interface v0.0.0-20240731203810-92032b13d586 h1:ZVpo9Qj2/gvwX6Rl44UxkZBm2pZWEJDYWTramc9hwF0= -github.com/apache/yunikorn-scheduler-interface v0.0.0-20240731203810-92032b13d586/go.mod h1:WuHJpVk34t8N5+1ErYGj/5Qq33/cRzL4YtuoAsbMtWc= +github.com/apache/yunikorn-core v0.0.0-20240815151636-1ee277455be8 h1:T/BRJNJOs7owiHMr6Es+J98L4B9VK82hNPNxJuD5u0k= +github.com/apache/yunikorn-core v0.0.0-20240815151636-1ee277455be8/go.mod h1:QHKfJ2RyZuQnZg28SnypmnvFxN/zfoYf+hmfxiVdq5g= +github.com/apache/yunikorn-scheduler-interface v0.0.0-20240815142741-38a38685cd4e h1:ZOLst6ROwUrgoUQbEdYaz28iKuiU5YNYGtelKsTFhqw= +github.com/apache/yunikorn-scheduler-interface v0.0.0-20240815142741-38a38685cd4e/go.mod h1:WuHJpVk34t8N5+1ErYGj/5Qq33/cRzL4YtuoAsbMtWc= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA= diff --git a/pkg/cache/application.go b/pkg/cache/application.go index a6ec1f3f8..085465ce7 100644 --- a/pkg/cache/application.go +++ b/pkg/cache/application.go @@ -627,42 +627,19 @@ func (app *Application) handleFailApplicationEvent(errMsg string) { } } -func (app *Application) handleReleaseAppAllocationEvent(allocationKey string, terminationType string) { - log.Log(log.ShimCacheApplication).Info("try to release pod from application", - zap.String("appID", app.applicationID), - zap.String("allocationKey", allocationKey), - zap.String("terminationType", terminationType)) - - for _, task := range app.taskMap { - if task.allocationKey == allocationKey { - task.setTaskTerminationType(terminationType) - err := task.DeleteTaskPod() - if err != nil { - log.Log(log.ShimCacheApplication).Error("failed to release allocation from application", zap.Error(err)) - } - app.publishPlaceholderTimeoutEvents(task) - } - } -} - -func (app *Application) handleReleaseAppAllocationAskEvent(taskID string, terminationType string) { +func (app *Application) handleReleaseAppAllocationEvent(taskID string, terminationType string) { log.Log(log.ShimCacheApplication).Info("try to release pod from application", zap.String("appID", app.applicationID), zap.String("taskID", taskID), zap.String("terminationType", terminationType)) + if task, ok := app.taskMap[taskID]; ok { task.setTaskTerminationType(terminationType) - if task.IsPlaceholder() { - err := task.DeleteTaskPod() - if err != nil { - log.Log(log.ShimCacheApplication).Error("failed to release allocation ask from application", zap.Error(err)) - } - app.publishPlaceholderTimeoutEvents(task) - } else { - log.Log(log.ShimCacheApplication).Warn("skip to release allocation ask, ask is not a placeholder", - zap.String("appID", app.applicationID), - zap.String("taskID", taskID)) + err := task.DeleteTaskPod() + if err != nil { + log.Log(log.ShimCacheApplication).Error("failed to release allocation from application", zap.Error(err)) } + app.publishPlaceholderTimeoutEvents(task) } else { log.Log(log.ShimCacheApplication).Warn("task not found", zap.String("appID", app.applicationID), diff --git a/pkg/cache/application_state.go b/pkg/cache/application_state.go index aee1c3e19..59541b246 100644 --- a/pkg/cache/application_state.go +++ b/pkg/cache/application_state.go @@ -49,13 +49,12 @@ const ( KillApplication KilledApplication ReleaseAppAllocation - ReleaseAppAllocationAsk ResumingApplication AppTaskCompleted ) func (ae ApplicationEventType) String() string { - return [...]string{"SubmitApplication", "AcceptApplication", "TryReserve", "UpdateReservation", "RunApplication", "RejectApplication", "CompleteApplication", "FailApplication", "KillApplication", "KilledApplication", "ReleaseAppAllocation", "ReleaseAppAllocationAsk", "ResumingApplication", "AppTaskCompleted"}[ae] + return [...]string{"SubmitApplication", "AcceptApplication", "TryReserve", "UpdateReservation", "RunApplication", "RejectApplication", "CompleteApplication", "FailApplication", "KillApplication", "KilledApplication", "ReleaseAppAllocation", "ResumingApplication", "AppTaskCompleted"}[ae] } // ------------------------ @@ -295,37 +294,6 @@ func (re ReleaseAppAllocationEvent) GetEvent() string { return re.event.String() } -type ReleaseAppAllocationAskEvent struct { - applicationID string - taskID string - terminationType string - event ApplicationEventType -} - -func NewReleaseAppAllocationAskEvent(appID string, allocTermination si.TerminationType, taskID string) ReleaseAppAllocationAskEvent { - return ReleaseAppAllocationAskEvent{ - applicationID: appID, - taskID: taskID, - terminationType: si.TerminationType_name[int32(allocTermination)], - event: ReleaseAppAllocationAsk, - } -} - -func (re ReleaseAppAllocationAskEvent) GetApplicationID() string { - return re.applicationID -} - -func (re ReleaseAppAllocationAskEvent) GetArgs() []interface{} { - args := make([]interface{}, 2) - args[0] = re.taskID - args[1] = re.terminationType - return args -} - -func (re ReleaseAppAllocationAskEvent) GetEvent() string { - return re.event.String() -} - // ------------------------ // Resuming application // ------------------------ @@ -434,7 +402,7 @@ func newAppState() *fsm.FSM { //nolint:funlen }, { Name: ReleaseAppAllocation.String(), - Src: []string{states.Running}, + Src: []string{states.Running, states.Accepted, states.Reserving}, Dst: states.Running, }, { @@ -447,21 +415,6 @@ func newAppState() *fsm.FSM { //nolint:funlen Src: []string{states.Resuming}, Dst: states.Resuming, }, - { - Name: ReleaseAppAllocationAsk.String(), - Src: []string{states.Running, states.Accepted, states.Reserving}, - Dst: states.Running, - }, - { - Name: ReleaseAppAllocationAsk.String(), - Src: []string{states.Failing}, - Dst: states.Failing, - }, - { - Name: ReleaseAppAllocationAsk.String(), - Src: []string{states.Resuming}, - Dst: states.Resuming, - }, { Name: CompleteApplication.String(), Src: []string{states.Running}, @@ -543,17 +496,6 @@ func newAppState() *fsm.FSM { //nolint:funlen app.onReservationStateChange() }, ReleaseAppAllocation.String(): func(_ context.Context, event *fsm.Event) { - app := event.Args[0].(*Application) //nolint:errcheck - eventArgs := make([]string, 2) - if err := events.GetEventArgsAsStrings(eventArgs, event.Args[1].([]interface{})); err != nil { - log.Log(log.ShimFSM).Error("fail to parse event arg", zap.Error(err)) - return - } - allocationKey := eventArgs[0] - terminationType := eventArgs[1] - app.handleReleaseAppAllocationEvent(allocationKey, terminationType) - }, - ReleaseAppAllocationAsk.String(): func(_ context.Context, event *fsm.Event) { app := event.Args[0].(*Application) //nolint:errcheck eventArgs := make([]string, 2) if err := events.GetEventArgsAsStrings(eventArgs, event.Args[1].([]interface{})); err != nil { @@ -562,7 +504,7 @@ func newAppState() *fsm.FSM { //nolint:funlen } taskID := eventArgs[0] terminationType := eventArgs[1] - app.handleReleaseAppAllocationAskEvent(taskID, terminationType) + app.handleReleaseAppAllocationEvent(taskID, terminationType) }, AppTaskCompleted.String(): func(_ context.Context, event *fsm.Event) { app := event.Args[0].(*Application) //nolint:errcheck diff --git a/pkg/cache/application_state_test.go b/pkg/cache/application_state_test.go index d16e3012c..75626bd95 100644 --- a/pkg/cache/application_state_test.go +++ b/pkg/cache/application_state_test.go @@ -769,105 +769,6 @@ func TestReleaseAppAllocationEventGetApplicationID(t *testing.T) { } } -func TestNewReleaseAppAllocationAskEvent(t *testing.T) { - tests := []struct { - name string - appID, taskID string - terminationType si.TerminationType - wantID, wantTaskID, wantType string - wantEvent ApplicationEventType - }{ - {TestCreateName, "testAppId001", "testTaskId001", si.TerminationType_TIMEOUT, "testAppId001", "testTaskId001", "TIMEOUT", ReleaseAppAllocationAsk}, - } - - for _, tt := range tests { - instance := NewReleaseAppAllocationAskEvent(tt.appID, tt.terminationType, tt.taskID) - t.Run(tt.name, func(t *testing.T) { - if instance.applicationID != tt.wantID || instance.taskID != tt.taskID || instance.terminationType != tt.wantType || instance.event != tt.wantEvent { - t.Errorf("want %s %s %s %s, got %s %s %s %s", - tt.wantID, tt.taskID, tt.wantType, tt.wantEvent, - instance.applicationID, instance.taskID, instance.terminationType, instance.event) - } - }) - } -} - -func TestReleaseAppAllocationAskEventGetEvent(t *testing.T) { - tests := []struct { - name string - appID, taskID string - terminationType si.TerminationType - wantEvent ApplicationEventType - }{ - {TestEventName, "testAppId001", "testTaskId001", si.TerminationType_TIMEOUT, ReleaseAppAllocationAsk}, - } - - for _, tt := range tests { - instance := NewReleaseAppAllocationAskEvent(tt.appID, tt.terminationType, tt.taskID) - event := instance.GetEvent() - t.Run(tt.name, func(t *testing.T) { - if event != tt.wantEvent.String() { - t.Errorf("want %s, got %s", tt.wantEvent, event) - } - }) - } -} - -func TestReleaseAppAllocationAskEventGetArgs(t *testing.T) { - tests := []struct { - name string - appID, taskID string - terminationType si.TerminationType - wantLen int - wantTaskID, wantType string - castOk []bool - wantArg []string - }{ - {TestArgsName, "testAppId001", "testTaskId001", si.TerminationType_TIMEOUT, 2, "testTaskId001", "TIMEOUT", []bool{true, true}, []string{"testTaskId001", "TIMEOUT"}}, - } - - for _, tt := range tests { - instance := NewReleaseAppAllocationAskEvent(tt.appID, tt.terminationType, tt.taskID) - args := instance.GetArgs() - t.Run(tt.name, func(t *testing.T) { - if len(args) != tt.wantLen { - t.Errorf("want %d, got %d", tt.wantLen, len(args)) - - for index, arg := range args { - info, ok := arg.(string) - if ok != tt.castOk[index] { - t.Errorf("want %v, got %v", tt.castOk[index], ok) - } - if info != tt.wantArg[index] { - t.Errorf("want %s, got %s", tt.wantArg[index], info) - } - } - } - }) - } -} - -func TestReleaseAppAllocationAskEventGetApplicationID(t *testing.T) { - tests := []struct { - name string - appID, taskID string - terminationType si.TerminationType - wantID string - }{ - {TestAppIDName, "testAppId001", "testTaskId001", si.TerminationType_TIMEOUT, "testAppId001"}, - } - - for _, tt := range tests { - instance := NewReleaseAppAllocationAskEvent(tt.appID, tt.terminationType, tt.taskID) - appID := instance.GetApplicationID() - t.Run(tt.name, func(t *testing.T) { - if appID != tt.wantID { - t.Errorf("want %s, got %s", tt.wantID, appID) - } - }) - } -} - func TestNewResumingApplicationEvent(t *testing.T) { tests := []struct { name string @@ -902,7 +803,6 @@ func TestApplicationEventsAsString(t *testing.T) { assert.Equal(t, KillApplication.String(), "KillApplication") assert.Equal(t, KilledApplication.String(), "KilledApplication") assert.Equal(t, ReleaseAppAllocation.String(), "ReleaseAppAllocation") - assert.Equal(t, ReleaseAppAllocationAsk.String(), "ReleaseAppAllocationAsk") assert.Equal(t, ResumingApplication.String(), "ResumingApplication") assert.Equal(t, AppTaskCompleted.String(), "AppTaskCompleted") } diff --git a/pkg/cache/scheduler_callback.go b/pkg/cache/scheduler_callback.go index 728212bfa..0c43b34d8 100644 --- a/pkg/cache/scheduler_callback.go +++ b/pkg/cache/scheduler_callback.go @@ -77,15 +77,6 @@ func (callback *AsyncRMCallback) UpdateAllocation(response *si.AllocationRespons } } - for _, reject := range response.Rejected { - // request rejected by the scheduler, put it back and try scheduling again - log.Log(log.ShimRMCallback).Debug("callback: response to rejected ask", - zap.String("allocationKey", reject.AllocationKey)) - dispatcher.Dispatch(NewRejectTaskEvent(reject.ApplicationID, reject.AllocationKey, - fmt.Sprintf("task %s ask from application %s is rejected by scheduler", - reject.AllocationKey, reject.ApplicationID))) - } - for _, reject := range response.RejectedAllocations { // request rejected by the scheduler, reject it log.Log(log.ShimRMCallback).Debug("callback: response to rejected allocation", @@ -110,16 +101,6 @@ func (callback *AsyncRMCallback) UpdateAllocation(response *si.AllocationRespons } } - for _, ask := range response.ReleasedAsks { - log.Log(log.ShimRMCallback).Debug("callback: response to released allocations", - zap.String("allocation key", ask.AllocationKey)) - - if ask.TerminationType == si.TerminationType_TIMEOUT { - ev := NewReleaseAppAllocationAskEvent(ask.ApplicationID, ask.TerminationType, ask.AllocationKey) - dispatcher.Dispatch(ev) - } - } - return nil } diff --git a/pkg/cache/scheduler_callback_test.go b/pkg/cache/scheduler_callback_test.go index 59bd875e4..d98f5f6f7 100644 --- a/pkg/cache/scheduler_callback_test.go +++ b/pkg/cache/scheduler_callback_test.go @@ -133,27 +133,6 @@ func TestUpdateAllocation_NewTask_PodAlreadyAssigned(t *testing.T) { assert.Equal(t, TaskSchedAllocated, task.schedulingState) } -func TestUpdateAllocation_AskRejected(t *testing.T) { - callback, context := initCallbackTest(t, false, false) - defer dispatcher.UnregisterAllEventHandlers() - defer dispatcher.Stop() - - err := callback.UpdateAllocation(&si.AllocationResponse{ - Rejected: []*si.RejectedAllocationAsk{ - { - ApplicationID: appID, - AllocationKey: taskUID1, - }, - }, - }) - assert.NilError(t, err, "error updating allocation") - task := context.getTask(appID, taskUID1) - err = utils.WaitForCondition(func() bool { - return task.GetTaskState() == TaskStates().Failed - }, 10*time.Millisecond, time.Second) - assert.NilError(t, err, "task has not transitioned to Failed state") -} - func TestUpdateAllocation_AllocationRejected(t *testing.T) { callback, context := initCallbackTest(t, false, false) defer dispatcher.UnregisterAllEventHandlers() @@ -239,33 +218,6 @@ func TestUpdateAllocation_AllocationReleased_StoppedByRM(t *testing.T) { assert.Error(t, err, "timeout waiting for condition") // pod is not expected to be deleted } -func TestUpdateAllocation_AskReleased(t *testing.T) { - callback, context := initCallbackTest(t, false, true) - defer dispatcher.UnregisterAllEventHandlers() - defer dispatcher.Stop() - app := context.getApplication(appID) - app.sm.SetState(ApplicationStates().Running) - var deleteCalled atomic.Bool - context.apiProvider.(*client.MockedAPIProvider).MockDeleteFn(func(pod *v1.Pod) error { //nolint:errcheck - deleteCalled.Store(true) - return nil - }) - - err := callback.UpdateAllocation(&si.AllocationResponse{ - ReleasedAsks: []*si.AllocationAskRelease{ - { - ApplicationID: appID, - AllocationKey: taskUID1, - TerminationType: si.TerminationType_TIMEOUT, - }, - }, - }) - assert.NilError(t, err, "error updating allocation") - assert.Assert(t, !context.schedulerCache.IsAssumedPod(taskUID1)) - err = utils.WaitForCondition(deleteCalled.Load, 10*time.Millisecond, time.Second) - assert.NilError(t, err, "pod has not been deleted") -} - func TestUpdateApplication_Accepted(t *testing.T) { callback, context := initCallbackTest(t, false, false) defer dispatcher.UnregisterAllEventHandlers() diff --git a/pkg/cache/task.go b/pkg/cache/task.go index 02b07d16e..c77739659 100644 --- a/pkg/cache/task.go +++ b/pkg/cache/task.go @@ -292,40 +292,25 @@ func (task *Task) handleSubmitTaskEvent() { AllowPreemptOther: task.isPreemptOtherAllowed(), } - if utils.PodAlreadyBound(task.pod) { - // submit allocation - rr := common.CreateAllocationForTask( - task.applicationID, - task.taskID, - task.pod.Spec.NodeName, - task.resource, - task.placeholder, - task.taskGroupName, - task.pod, - task.originator, - preemptionPolicy) - log.Log(log.ShimCacheTask).Debug("send update request", zap.Stringer("request", rr)) - if err := task.context.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(rr); err != nil { - log.Log(log.ShimCacheTask).Debug("failed to send allocation to scheduler", zap.Error(err)) - return - } - } else { - // submit allocation ask - rr := common.CreateAllocationRequestForTask( - task.applicationID, - task.taskID, - task.resource, - task.placeholder, - task.taskGroupName, - task.pod, - task.originator, - preemptionPolicy) - log.Log(log.ShimCacheTask).Debug("send update request", zap.Stringer("request", rr)) - if err := task.context.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(rr); err != nil { - log.Log(log.ShimCacheTask).Debug("failed to send scheduling request to scheduler", zap.Error(err)) - return - } + // submit allocation ask + rr := common.CreateAllocationForTask( + task.applicationID, + task.taskID, + task.pod.Spec.NodeName, + task.resource, + task.placeholder, + task.taskGroupName, + task.pod, + task.originator, + preemptionPolicy) + log.Log(log.ShimCacheTask).Debug("send update request", zap.Stringer("request", rr)) + if err := task.context.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(rr); err != nil { + log.Log(log.ShimCacheTask).Debug("failed to send scheduling request to scheduler", zap.Error(err)) + return + } + if !utils.PodAlreadyBound(task.pod) { + // if this is a new request, add events to pod events.GetRecorder().Eventf(task.pod.DeepCopy(), nil, v1.EventTypeNormal, "Scheduling", "Scheduling", "%s is queued and waiting for allocation", task.alias) // if this task belongs to a task group, that means the app has gang scheduling enabled @@ -516,7 +501,7 @@ func (task *Task) releaseAllocation() { s := TaskStates() switch task.GetTaskState() { case s.New, s.Pending, s.Scheduling, s.Rejected: - releaseRequest = common.CreateReleaseRequestForTask(task.applicationID, task.taskID, task.allocationKey, task.application.partition, task.terminationType) + releaseRequest = common.CreateReleaseRequestForTask(task.applicationID, task.taskID, task.application.partition, task.terminationType) default: if task.allocationKey == "" { log.Log(log.ShimCacheTask).Warn("BUG: task allocationKey is empty on release", @@ -525,13 +510,11 @@ func (task *Task) releaseAllocation() { zap.String("taskAlias", task.alias), zap.String("task", task.GetTaskState())) } - releaseRequest = common.CreateReleaseRequestForTask( - task.applicationID, task.taskID, task.allocationKey, task.application.partition, task.terminationType) + releaseRequest = common.CreateReleaseRequestForTask(task.applicationID, task.taskID, task.application.partition, task.terminationType) } if releaseRequest.Releases != nil { log.Log(log.ShimCacheTask).Info("releasing allocations", - zap.Int("numOfAsksToRelease", len(releaseRequest.Releases.AllocationAsksToRelease)), zap.Int("numOfAllocationsToRelease", len(releaseRequest.Releases.AllocationsToRelease))) } if err := task.context.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(releaseRequest); err != nil { diff --git a/pkg/cache/task_test.go b/pkg/cache/task_test.go index 2e2faf8f6..1ec5f7d8d 100644 --- a/pkg/cache/task_test.go +++ b/pkg/cache/task_test.go @@ -224,11 +224,6 @@ func TestReleaseTaskAllocation(t *testing.T) { assert.Equal(t, request.Releases.AllocationsToRelease[0].ApplicationID, app.applicationID) assert.Equal(t, request.Releases.AllocationsToRelease[0].PartitionName, "default") assert.Equal(t, request.Releases.AllocationsToRelease[0].AllocationKey, "task01") - assert.Assert(t, request.Releases.AllocationAsksToRelease != nil) - assert.Equal(t, request.Releases.AllocationAsksToRelease[0].ApplicationID, app.applicationID) - assert.Equal(t, request.Releases.AllocationAsksToRelease[0].AllocationKey, "task01") - assert.Equal(t, request.Releases.AllocationAsksToRelease[0].PartitionName, "default") - assert.Equal(t, request.Releases.AllocationAsksToRelease[0].TerminationType, si.TerminationType_UNKNOWN_TERMINATION_TYPE) return nil }) @@ -244,12 +239,11 @@ func TestReleaseTaskAllocation(t *testing.T) { task = NewTask("task01", app, mockedContext, pod) mockedApiProvider.MockSchedulerAPIUpdateAllocationFn(func(request *si.AllocationRequest) error { assert.Assert(t, request.Releases != nil) - assert.Assert(t, request.Releases.AllocationsToRelease == nil) - assert.Assert(t, request.Releases.AllocationAsksToRelease != nil) - assert.Equal(t, request.Releases.AllocationAsksToRelease[0].ApplicationID, app.applicationID) - assert.Equal(t, request.Releases.AllocationAsksToRelease[0].AllocationKey, "task01") - assert.Equal(t, request.Releases.AllocationAsksToRelease[0].PartitionName, "default") - assert.Equal(t, request.Releases.AllocationAsksToRelease[0].TerminationType, si.TerminationType_UNKNOWN_TERMINATION_TYPE) + assert.Assert(t, request.Releases.AllocationsToRelease != nil) + assert.Equal(t, request.Releases.AllocationsToRelease[0].ApplicationID, app.applicationID) + assert.Equal(t, request.Releases.AllocationsToRelease[0].AllocationKey, "task01") + assert.Equal(t, request.Releases.AllocationsToRelease[0].PartitionName, "default") + assert.Equal(t, request.Releases.AllocationsToRelease[0].TerminationType, si.TerminationType_STOPPED_BY_RM) return nil }) err = task.handle(NewFailTaskEvent(app.applicationID, "task01", "test failure")) @@ -265,11 +259,7 @@ func TestReleaseTaskAllocation(t *testing.T) { assert.Equal(t, request.Releases.AllocationsToRelease[0].ApplicationID, app.applicationID) assert.Equal(t, request.Releases.AllocationsToRelease[0].PartitionName, "default") assert.Equal(t, request.Releases.AllocationsToRelease[0].AllocationKey, "task01") - assert.Assert(t, request.Releases.AllocationAsksToRelease != nil) - assert.Equal(t, request.Releases.AllocationAsksToRelease[0].ApplicationID, app.applicationID) - assert.Equal(t, request.Releases.AllocationAsksToRelease[0].AllocationKey, "task01") - assert.Equal(t, request.Releases.AllocationAsksToRelease[0].PartitionName, "default") - assert.Equal(t, request.Releases.AllocationAsksToRelease[0].TerminationType, si.TerminationType_UNKNOWN_TERMINATION_TYPE) + assert.Equal(t, request.Releases.AllocationsToRelease[0].TerminationType, si.TerminationType_STOPPED_BY_RM) return nil }) err = task.handle(NewFailTaskEvent(app.applicationID, "task01", "test failure")) @@ -330,11 +320,10 @@ func TestReleaseTaskAsk(t *testing.T) { // this is to verify we are sending correct info to the scheduler core mockedApiProvider.MockSchedulerAPIUpdateAllocationFn(func(request *si.AllocationRequest) error { assert.Assert(t, request.Releases != nil) - assert.Assert(t, request.Releases.AllocationsToRelease == nil) - assert.Assert(t, request.Releases.AllocationAsksToRelease != nil) - assert.Equal(t, request.Releases.AllocationAsksToRelease[0].ApplicationID, app.applicationID) - assert.Equal(t, request.Releases.AllocationAsksToRelease[0].PartitionName, "default") - assert.Equal(t, request.Releases.AllocationAsksToRelease[0].AllocationKey, task.taskID) + assert.Assert(t, request.Releases.AllocationsToRelease != nil) + assert.Equal(t, request.Releases.AllocationsToRelease[0].ApplicationID, app.applicationID) + assert.Equal(t, request.Releases.AllocationsToRelease[0].PartitionName, "default") + assert.Equal(t, request.Releases.AllocationsToRelease[0].AllocationKey, task.taskID) return nil }) @@ -598,11 +587,11 @@ func TestHandleSubmitTaskEvent(t *testing.T) { assert.Equal(t, task1.GetTaskState(), TaskStates().Scheduling) assert.Equal(t, rt.time, int64(1)) assert.Assert(t, allocRequest != nil) - assert.Equal(t, len(allocRequest.Asks), 1) - assert.Equal(t, allocRequest.Asks[0].Priority, int32(1000)) - assert.Assert(t, allocRequest.Asks[0].PreemptionPolicy != nil) - assert.Assert(t, allocRequest.Asks[0].PreemptionPolicy.AllowPreemptSelf) - assert.Assert(t, !allocRequest.Asks[0].PreemptionPolicy.AllowPreemptOther) + assert.Equal(t, len(allocRequest.Allocations), 1) + assert.Equal(t, allocRequest.Allocations[0].Priority, int32(1000)) + assert.Assert(t, allocRequest.Allocations[0].PreemptionPolicy != nil) + assert.Assert(t, allocRequest.Allocations[0].PreemptionPolicy.AllowPreemptSelf) + assert.Assert(t, !allocRequest.Allocations[0].PreemptionPolicy.AllowPreemptOther) allocRequest = nil rt.time = 0 // pod with taskGroup name @@ -612,11 +601,11 @@ func TestHandleSubmitTaskEvent(t *testing.T) { assert.Equal(t, task2.GetTaskState(), TaskStates().Scheduling) assert.Equal(t, rt.time, int64(2)) assert.Assert(t, allocRequest != nil) - assert.Equal(t, len(allocRequest.Asks), 1) - assert.Equal(t, allocRequest.Asks[0].Priority, int32(1001)) - assert.Assert(t, allocRequest.Asks[0].PreemptionPolicy != nil) - assert.Assert(t, !allocRequest.Asks[0].PreemptionPolicy.AllowPreemptSelf) - assert.Assert(t, allocRequest.Asks[0].PreemptionPolicy.AllowPreemptOther) + assert.Equal(t, len(allocRequest.Allocations), 1) + assert.Equal(t, allocRequest.Allocations[0].Priority, int32(1001)) + assert.Assert(t, allocRequest.Allocations[0].PreemptionPolicy != nil) + assert.Assert(t, !allocRequest.Allocations[0].PreemptionPolicy.AllowPreemptSelf) + assert.Assert(t, allocRequest.Allocations[0].PreemptionPolicy.AllowPreemptOther) // Test over, set Recorder back fake type events.SetRecorder(k8sEvents.NewFakeRecorder(1024)) @@ -665,11 +654,9 @@ func TestSimultaneousTaskCompleteAndAllocate(t *testing.T) { // because the task is in Scheduling state, // here we expect to trigger a UpdateRequest that contains a releaseAllocationAsk request mockedAPIProvider.MockSchedulerAPIUpdateAllocationFn(func(request *si.AllocationRequest) error { - assert.Equal(t, len(request.Releases.AllocationAsksToRelease), 1, - "allocationAskToRelease is not in the expected length") - assert.Equal(t, len(request.Releases.AllocationsToRelease), 0, + assert.Equal(t, len(request.Releases.AllocationsToRelease), 1, "allocationsToRelease is not in the expected length") - askToRelease := request.Releases.AllocationAsksToRelease[0] + askToRelease := request.Releases.AllocationsToRelease[0] assert.Equal(t, askToRelease.ApplicationID, appID) assert.Equal(t, askToRelease.AllocationKey, podUID) return nil @@ -689,8 +676,6 @@ func TestSimultaneousTaskCompleteAndAllocate(t *testing.T) { PartitionName: "default", } mockedAPIProvider.MockSchedulerAPIUpdateAllocationFn(func(request *si.AllocationRequest) error { - assert.Equal(t, len(request.Releases.AllocationAsksToRelease), 1, - "allocationAskToRelease is not in the expected length") assert.Equal(t, len(request.Releases.AllocationsToRelease), 1, "allocationsToRelease is not in the expected length") allocToRelease := request.Releases.AllocationsToRelease[0] diff --git a/pkg/common/si_helper.go b/pkg/common/si_helper.go index c3c1a7796..82a22530c 100644 --- a/pkg/common/si_helper.go +++ b/pkg/common/si_helper.go @@ -73,11 +73,11 @@ func CreatePriorityForTask(pod *v1.Pod) int32 { } func CreateAllocationRequestForTask(appID, taskID string, resource *si.Resource, placeholder bool, taskGroupName string, pod *v1.Pod, originator bool, preemptionPolicy *si.PreemptionPolicy) *si.AllocationRequest { - ask := si.AllocationAsk{ + ask := si.Allocation{ AllocationKey: taskID, - ResourceAsk: resource, + ResourcePerAlloc: resource, ApplicationID: appID, - Tags: CreateTagsForTask(pod), + AllocationTags: CreateTagsForTask(pod), Placeholder: placeholder, TaskGroupName: taskGroupName, Originator: originator, @@ -86,8 +86,8 @@ func CreateAllocationRequestForTask(appID, taskID string, resource *si.Resource, } return &si.AllocationRequest{ - Asks: []*si.AllocationAsk{&ask}, - RmID: conf.GetSchedulerConf().ClusterID, + Allocations: []*si.Allocation{&ask}, + RmID: conf.GetSchedulerConf().ClusterID, } } @@ -121,30 +121,18 @@ func GetTerminationTypeFromString(terminationTypeStr string) si.TerminationType return si.TerminationType_STOPPED_BY_RM } -func CreateReleaseRequestForTask(appID, taskID, allocationKey, partition, terminationType string) *si.AllocationRequest { - var allocToRelease []*si.AllocationRelease - if allocationKey != "" { - allocToRelease = make([]*si.AllocationRelease, 1) - allocToRelease[0] = &si.AllocationRelease{ - ApplicationID: appID, - AllocationKey: allocationKey, - PartitionName: partition, - TerminationType: GetTerminationTypeFromString(terminationType), - Message: "task completed", - } - } - - askToRelease := make([]*si.AllocationAskRelease, 1) - askToRelease[0] = &si.AllocationAskRelease{ - ApplicationID: appID, - AllocationKey: taskID, - PartitionName: partition, - Message: "task request completed", +func CreateReleaseRequestForTask(appID, taskID, partition, terminationType string) *si.AllocationRequest { + allocToRelease := make([]*si.AllocationRelease, 1) + allocToRelease[0] = &si.AllocationRelease{ + ApplicationID: appID, + AllocationKey: taskID, + PartitionName: partition, + TerminationType: GetTerminationTypeFromString(terminationType), + Message: "task completed", } releaseRequest := si.AllocationReleasesRequest{ - AllocationsToRelease: allocToRelease, - AllocationAsksToRelease: askToRelease, + AllocationsToRelease: allocToRelease, } return &si.AllocationRequest{ diff --git a/pkg/common/si_helper_test.go b/pkg/common/si_helper_test.go index 92f5e2407..b9464bd8c 100644 --- a/pkg/common/si_helper_test.go +++ b/pkg/common/si_helper_test.go @@ -32,45 +32,23 @@ const nodeID = "node-01" func TestCreateReleaseRequestForTask(t *testing.T) { // with allocationKey - request := CreateReleaseRequestForTask("app01", "task01", "task01", "default", "STOPPED_BY_RM") + request := CreateReleaseRequestForTask("app01", "task01", "default", "STOPPED_BY_RM") assert.Assert(t, request.Releases != nil) assert.Assert(t, request.Releases.AllocationsToRelease != nil) - assert.Assert(t, request.Releases.AllocationAsksToRelease != nil) assert.Equal(t, len(request.Releases.AllocationsToRelease), 1) - assert.Equal(t, len(request.Releases.AllocationAsksToRelease), 1) assert.Equal(t, request.Releases.AllocationsToRelease[0].ApplicationID, "app01") assert.Equal(t, request.Releases.AllocationsToRelease[0].AllocationKey, "task01") assert.Equal(t, request.Releases.AllocationsToRelease[0].PartitionName, "default") - assert.Equal(t, request.Releases.AllocationAsksToRelease[0].ApplicationID, "app01") - assert.Equal(t, request.Releases.AllocationAsksToRelease[0].AllocationKey, "task01") - assert.Equal(t, request.Releases.AllocationAsksToRelease[0].PartitionName, "default") - assert.Equal(t, request.Releases.AllocationAsksToRelease[0].TerminationType, si.TerminationType_UNKNOWN_TERMINATION_TYPE) + assert.Equal(t, request.Releases.AllocationsToRelease[0].TerminationType, si.TerminationType_STOPPED_BY_RM) - // without allocationKey - request = CreateReleaseRequestForTask("app01", "task01", "", "default", "STOPPED_BY_RM") - assert.Assert(t, request.Releases != nil) - assert.Assert(t, request.Releases.AllocationsToRelease == nil) - assert.Assert(t, request.Releases.AllocationAsksToRelease != nil) - assert.Equal(t, len(request.Releases.AllocationsToRelease), 0) - assert.Equal(t, len(request.Releases.AllocationAsksToRelease), 1) - assert.Equal(t, request.Releases.AllocationAsksToRelease[0].ApplicationID, "app01") - assert.Equal(t, request.Releases.AllocationAsksToRelease[0].AllocationKey, "task01") - assert.Equal(t, request.Releases.AllocationAsksToRelease[0].PartitionName, "default") - assert.Equal(t, request.Releases.AllocationAsksToRelease[0].TerminationType, si.TerminationType_UNKNOWN_TERMINATION_TYPE) - - request = CreateReleaseRequestForTask("app01", "task01", "task01", "default", "UNKNOWN") + request = CreateReleaseRequestForTask("app01", "task01", "default", "UNKNOWN_TERMINATION_TYPE") assert.Assert(t, request.Releases != nil) assert.Assert(t, request.Releases.AllocationsToRelease != nil) - assert.Assert(t, request.Releases.AllocationAsksToRelease != nil) assert.Equal(t, len(request.Releases.AllocationsToRelease), 1) - assert.Equal(t, len(request.Releases.AllocationAsksToRelease), 1) assert.Equal(t, request.Releases.AllocationsToRelease[0].ApplicationID, "app01") assert.Equal(t, request.Releases.AllocationsToRelease[0].AllocationKey, "task01") assert.Equal(t, request.Releases.AllocationsToRelease[0].PartitionName, "default") - assert.Equal(t, request.Releases.AllocationAsksToRelease[0].ApplicationID, "app01") - assert.Equal(t, request.Releases.AllocationAsksToRelease[0].AllocationKey, "task01") - assert.Equal(t, request.Releases.AllocationAsksToRelease[0].PartitionName, "default") - assert.Equal(t, request.Releases.AllocationAsksToRelease[0].TerminationType, si.TerminationType_UNKNOWN_TERMINATION_TYPE) + assert.Equal(t, request.Releases.AllocationsToRelease[0].TerminationType, si.TerminationType_UNKNOWN_TERMINATION_TYPE) } func TestCreateUpdateRequestForRemoveApplication(t *testing.T) { @@ -112,14 +90,14 @@ func TestCreateUpdateRequestForTask(t *testing.T) { } updateRequest := CreateAllocationRequestForTask("appId1", "taskId1", res, false, "", pod, false, preemptionPolicy) - asks := updateRequest.Asks + asks := updateRequest.Allocations assert.Equal(t, len(asks), 1) allocAsk := asks[0] assert.Assert(t, allocAsk != nil) assert.Assert(t, allocAsk.PreemptionPolicy != nil) assert.Equal(t, allocAsk.PreemptionPolicy.AllowPreemptSelf, true) assert.Equal(t, allocAsk.PreemptionPolicy.AllowPreemptOther, true) - tags := allocAsk.Tags + tags := allocAsk.AllocationTags assert.Assert(t, tags != nil) assert.Equal(t, tags[common.DomainK8s+common.GroupMeta+"podName"], podName) assert.Equal(t, tags[common.DomainK8s+common.GroupMeta+"namespace"], namespace) @@ -287,7 +265,7 @@ func TestCreateAllocationRequestForTask(t *testing.T) { } updateRequest := CreateAllocationRequestForTask("appId1", "taskId1", res, false, "", pod, false, preemptionPolicy) - asks := updateRequest.Asks + asks := updateRequest.Allocations assert.Equal(t, len(asks), 1) allocAsk := asks[0] if allocAsk == nil { @@ -320,7 +298,7 @@ func TestCreateAllocationRequestForTask(t *testing.T) { } updateRequest1 := CreateAllocationRequestForTask("appId1", "taskId1", res, false, "", pod1, false, preemptionPolicy1) - asks1 := updateRequest1.Asks + asks1 := updateRequest1.Allocations assert.Equal(t, len(asks1), 1) allocAsk1 := asks1[0] if allocAsk1 == nil { @@ -329,7 +307,7 @@ func TestCreateAllocationRequestForTask(t *testing.T) { assert.Assert(t, allocAsk1.PreemptionPolicy != nil) assert.Equal(t, allocAsk1.PreemptionPolicy.AllowPreemptSelf, true) assert.Equal(t, allocAsk1.PreemptionPolicy.AllowPreemptOther, false) - tags := allocAsk1.Tags + tags := allocAsk1.AllocationTags assert.Equal(t, tags[common.DomainK8s+common.GroupMeta+"podName"], podName1) assert.Equal(t, allocAsk1.Priority, int32(100)) }