Skip to content

Commit

Permalink
[YUNIKORN-2779] Shim: Use UpdateAllocation for both asks and allocations
Browse files Browse the repository at this point in the history
  • Loading branch information
craigcondit committed Aug 15, 2024
1 parent eed4ea1 commit f73ba6d
Show file tree
Hide file tree
Showing 11 changed files with 80 additions and 394 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ module github.com/apache/yunikorn-k8shim
go 1.21

require (
github.com/apache/yunikorn-core v0.0.0-20240802210614-4aec626c6bf9
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240731203810-92032b13d586
github.com/apache/yunikorn-core v0.0.0-20240815191722-7c5b32590f8a
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240815142741-38a38685cd4e
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/looplab/fsm v1.0.1
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df h1:7RFfzj4SSt6nnvCPbCqijJi1nWCd+TqAT3bYCStRC18=
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df/go.mod h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM=
github.com/apache/yunikorn-core v0.0.0-20240802210614-4aec626c6bf9 h1:s1Co/K+cR9Q/GW0e974dToW9eyLQZxYoCp0TCoEuEj0=
github.com/apache/yunikorn-core v0.0.0-20240802210614-4aec626c6bf9/go.mod h1:S9yGBGA2i2hAajtEc2t4lmiPJDZz3Ek8eVxz5KhJqGI=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240731203810-92032b13d586 h1:ZVpo9Qj2/gvwX6Rl44UxkZBm2pZWEJDYWTramc9hwF0=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240731203810-92032b13d586/go.mod h1:WuHJpVk34t8N5+1ErYGj/5Qq33/cRzL4YtuoAsbMtWc=
github.com/apache/yunikorn-core v0.0.0-20240815191722-7c5b32590f8a h1:aQ5l+ziopny1kzo8b61JCbc59WNaJ5qRZxWYdSVfga0=
github.com/apache/yunikorn-core v0.0.0-20240815191722-7c5b32590f8a/go.mod h1:QHKfJ2RyZuQnZg28SnypmnvFxN/zfoYf+hmfxiVdq5g=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240815142741-38a38685cd4e h1:ZOLst6ROwUrgoUQbEdYaz28iKuiU5YNYGtelKsTFhqw=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240815142741-38a38685cd4e/go.mod h1:WuHJpVk34t8N5+1ErYGj/5Qq33/cRzL4YtuoAsbMtWc=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA=
Expand Down
35 changes: 6 additions & 29 deletions pkg/cache/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,42 +627,19 @@ func (app *Application) handleFailApplicationEvent(errMsg string) {
}
}

func (app *Application) handleReleaseAppAllocationEvent(allocationKey string, terminationType string) {
log.Log(log.ShimCacheApplication).Info("try to release pod from application",
zap.String("appID", app.applicationID),
zap.String("allocationKey", allocationKey),
zap.String("terminationType", terminationType))

for _, task := range app.taskMap {
if task.allocationKey == allocationKey {
task.setTaskTerminationType(terminationType)
err := task.DeleteTaskPod()
if err != nil {
log.Log(log.ShimCacheApplication).Error("failed to release allocation from application", zap.Error(err))
}
app.publishPlaceholderTimeoutEvents(task)
}
}
}

func (app *Application) handleReleaseAppAllocationAskEvent(taskID string, terminationType string) {
func (app *Application) handleReleaseAppAllocationEvent(taskID string, terminationType string) {
log.Log(log.ShimCacheApplication).Info("try to release pod from application",
zap.String("appID", app.applicationID),
zap.String("taskID", taskID),
zap.String("terminationType", terminationType))

if task, ok := app.taskMap[taskID]; ok {
task.setTaskTerminationType(terminationType)
if task.IsPlaceholder() {
err := task.DeleteTaskPod()
if err != nil {
log.Log(log.ShimCacheApplication).Error("failed to release allocation ask from application", zap.Error(err))
}
app.publishPlaceholderTimeoutEvents(task)
} else {
log.Log(log.ShimCacheApplication).Warn("skip to release allocation ask, ask is not a placeholder",
zap.String("appID", app.applicationID),
zap.String("taskID", taskID))
err := task.DeleteTaskPod()
if err != nil {
log.Log(log.ShimCacheApplication).Error("failed to release allocation from application", zap.Error(err))

Check warning on line 640 in pkg/cache/application.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/application.go#L640

Added line #L640 was not covered by tests
}
app.publishPlaceholderTimeoutEvents(task)
} else {
log.Log(log.ShimCacheApplication).Warn("task not found",
zap.String("appID", app.applicationID),
Expand Down
64 changes: 3 additions & 61 deletions pkg/cache/application_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,12 @@ const (
KillApplication
KilledApplication
ReleaseAppAllocation
ReleaseAppAllocationAsk
ResumingApplication
AppTaskCompleted
)

func (ae ApplicationEventType) String() string {
return [...]string{"SubmitApplication", "AcceptApplication", "TryReserve", "UpdateReservation", "RunApplication", "RejectApplication", "CompleteApplication", "FailApplication", "KillApplication", "KilledApplication", "ReleaseAppAllocation", "ReleaseAppAllocationAsk", "ResumingApplication", "AppTaskCompleted"}[ae]
return [...]string{"SubmitApplication", "AcceptApplication", "TryReserve", "UpdateReservation", "RunApplication", "RejectApplication", "CompleteApplication", "FailApplication", "KillApplication", "KilledApplication", "ReleaseAppAllocation", "ResumingApplication", "AppTaskCompleted"}[ae]
}

// ------------------------
Expand Down Expand Up @@ -295,37 +294,6 @@ func (re ReleaseAppAllocationEvent) GetEvent() string {
return re.event.String()
}

type ReleaseAppAllocationAskEvent struct {
applicationID string
taskID string
terminationType string
event ApplicationEventType
}

func NewReleaseAppAllocationAskEvent(appID string, allocTermination si.TerminationType, taskID string) ReleaseAppAllocationAskEvent {
return ReleaseAppAllocationAskEvent{
applicationID: appID,
taskID: taskID,
terminationType: si.TerminationType_name[int32(allocTermination)],
event: ReleaseAppAllocationAsk,
}
}

func (re ReleaseAppAllocationAskEvent) GetApplicationID() string {
return re.applicationID
}

func (re ReleaseAppAllocationAskEvent) GetArgs() []interface{} {
args := make([]interface{}, 2)
args[0] = re.taskID
args[1] = re.terminationType
return args
}

func (re ReleaseAppAllocationAskEvent) GetEvent() string {
return re.event.String()
}

// ------------------------
// Resuming application
// ------------------------
Expand Down Expand Up @@ -434,7 +402,7 @@ func newAppState() *fsm.FSM { //nolint:funlen
},
{
Name: ReleaseAppAllocation.String(),
Src: []string{states.Running},
Src: []string{states.Running, states.Accepted, states.Reserving},
Dst: states.Running,
},
{
Expand All @@ -447,21 +415,6 @@ func newAppState() *fsm.FSM { //nolint:funlen
Src: []string{states.Resuming},
Dst: states.Resuming,
},
{
Name: ReleaseAppAllocationAsk.String(),
Src: []string{states.Running, states.Accepted, states.Reserving},
Dst: states.Running,
},
{
Name: ReleaseAppAllocationAsk.String(),
Src: []string{states.Failing},
Dst: states.Failing,
},
{
Name: ReleaseAppAllocationAsk.String(),
Src: []string{states.Resuming},
Dst: states.Resuming,
},
{
Name: CompleteApplication.String(),
Src: []string{states.Running},
Expand Down Expand Up @@ -543,17 +496,6 @@ func newAppState() *fsm.FSM { //nolint:funlen
app.onReservationStateChange()
},
ReleaseAppAllocation.String(): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
eventArgs := make([]string, 2)
if err := events.GetEventArgsAsStrings(eventArgs, event.Args[1].([]interface{})); err != nil {
log.Log(log.ShimFSM).Error("fail to parse event arg", zap.Error(err))
return
}
allocationKey := eventArgs[0]
terminationType := eventArgs[1]
app.handleReleaseAppAllocationEvent(allocationKey, terminationType)
},
ReleaseAppAllocationAsk.String(): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
eventArgs := make([]string, 2)
if err := events.GetEventArgsAsStrings(eventArgs, event.Args[1].([]interface{})); err != nil {
Expand All @@ -562,7 +504,7 @@ func newAppState() *fsm.FSM { //nolint:funlen
}
taskID := eventArgs[0]
terminationType := eventArgs[1]
app.handleReleaseAppAllocationAskEvent(taskID, terminationType)
app.handleReleaseAppAllocationEvent(taskID, terminationType)
},
AppTaskCompleted.String(): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
Expand Down
100 changes: 0 additions & 100 deletions pkg/cache/application_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -769,105 +769,6 @@ func TestReleaseAppAllocationEventGetApplicationID(t *testing.T) {
}
}

func TestNewReleaseAppAllocationAskEvent(t *testing.T) {
tests := []struct {
name string
appID, taskID string
terminationType si.TerminationType
wantID, wantTaskID, wantType string
wantEvent ApplicationEventType
}{
{TestCreateName, "testAppId001", "testTaskId001", si.TerminationType_TIMEOUT, "testAppId001", "testTaskId001", "TIMEOUT", ReleaseAppAllocationAsk},
}

for _, tt := range tests {
instance := NewReleaseAppAllocationAskEvent(tt.appID, tt.terminationType, tt.taskID)
t.Run(tt.name, func(t *testing.T) {
if instance.applicationID != tt.wantID || instance.taskID != tt.taskID || instance.terminationType != tt.wantType || instance.event != tt.wantEvent {
t.Errorf("want %s %s %s %s, got %s %s %s %s",
tt.wantID, tt.taskID, tt.wantType, tt.wantEvent,
instance.applicationID, instance.taskID, instance.terminationType, instance.event)
}
})
}
}

func TestReleaseAppAllocationAskEventGetEvent(t *testing.T) {
tests := []struct {
name string
appID, taskID string
terminationType si.TerminationType
wantEvent ApplicationEventType
}{
{TestEventName, "testAppId001", "testTaskId001", si.TerminationType_TIMEOUT, ReleaseAppAllocationAsk},
}

for _, tt := range tests {
instance := NewReleaseAppAllocationAskEvent(tt.appID, tt.terminationType, tt.taskID)
event := instance.GetEvent()
t.Run(tt.name, func(t *testing.T) {
if event != tt.wantEvent.String() {
t.Errorf("want %s, got %s", tt.wantEvent, event)
}
})
}
}

func TestReleaseAppAllocationAskEventGetArgs(t *testing.T) {
tests := []struct {
name string
appID, taskID string
terminationType si.TerminationType
wantLen int
wantTaskID, wantType string
castOk []bool
wantArg []string
}{
{TestArgsName, "testAppId001", "testTaskId001", si.TerminationType_TIMEOUT, 2, "testTaskId001", "TIMEOUT", []bool{true, true}, []string{"testTaskId001", "TIMEOUT"}},
}

for _, tt := range tests {
instance := NewReleaseAppAllocationAskEvent(tt.appID, tt.terminationType, tt.taskID)
args := instance.GetArgs()
t.Run(tt.name, func(t *testing.T) {
if len(args) != tt.wantLen {
t.Errorf("want %d, got %d", tt.wantLen, len(args))

for index, arg := range args {
info, ok := arg.(string)
if ok != tt.castOk[index] {
t.Errorf("want %v, got %v", tt.castOk[index], ok)
}
if info != tt.wantArg[index] {
t.Errorf("want %s, got %s", tt.wantArg[index], info)
}
}
}
})
}
}

func TestReleaseAppAllocationAskEventGetApplicationID(t *testing.T) {
tests := []struct {
name string
appID, taskID string
terminationType si.TerminationType
wantID string
}{
{TestAppIDName, "testAppId001", "testTaskId001", si.TerminationType_TIMEOUT, "testAppId001"},
}

for _, tt := range tests {
instance := NewReleaseAppAllocationAskEvent(tt.appID, tt.terminationType, tt.taskID)
appID := instance.GetApplicationID()
t.Run(tt.name, func(t *testing.T) {
if appID != tt.wantID {
t.Errorf("want %s, got %s", tt.wantID, appID)
}
})
}
}

func TestNewResumingApplicationEvent(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -902,7 +803,6 @@ func TestApplicationEventsAsString(t *testing.T) {
assert.Equal(t, KillApplication.String(), "KillApplication")
assert.Equal(t, KilledApplication.String(), "KilledApplication")
assert.Equal(t, ReleaseAppAllocation.String(), "ReleaseAppAllocation")
assert.Equal(t, ReleaseAppAllocationAsk.String(), "ReleaseAppAllocationAsk")
assert.Equal(t, ResumingApplication.String(), "ResumingApplication")
assert.Equal(t, AppTaskCompleted.String(), "AppTaskCompleted")
}
19 changes: 0 additions & 19 deletions pkg/cache/scheduler_callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,6 @@ func (callback *AsyncRMCallback) UpdateAllocation(response *si.AllocationRespons
}
}

for _, reject := range response.Rejected {
// request rejected by the scheduler, put it back and try scheduling again
log.Log(log.ShimRMCallback).Debug("callback: response to rejected ask",
zap.String("allocationKey", reject.AllocationKey))
dispatcher.Dispatch(NewRejectTaskEvent(reject.ApplicationID, reject.AllocationKey,
fmt.Sprintf("task %s ask from application %s is rejected by scheduler",
reject.AllocationKey, reject.ApplicationID)))
}

for _, reject := range response.RejectedAllocations {
// request rejected by the scheduler, reject it
log.Log(log.ShimRMCallback).Debug("callback: response to rejected allocation",
Expand All @@ -110,16 +101,6 @@ func (callback *AsyncRMCallback) UpdateAllocation(response *si.AllocationRespons
}
}

for _, ask := range response.ReleasedAsks {
log.Log(log.ShimRMCallback).Debug("callback: response to released allocations",
zap.String("allocation key", ask.AllocationKey))

if ask.TerminationType == si.TerminationType_TIMEOUT {
ev := NewReleaseAppAllocationAskEvent(ask.ApplicationID, ask.TerminationType, ask.AllocationKey)
dispatcher.Dispatch(ev)
}
}

return nil
}

Expand Down
48 changes: 0 additions & 48 deletions pkg/cache/scheduler_callback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,27 +133,6 @@ func TestUpdateAllocation_NewTask_PodAlreadyAssigned(t *testing.T) {
assert.Equal(t, TaskSchedAllocated, task.schedulingState)
}

func TestUpdateAllocation_AskRejected(t *testing.T) {
callback, context := initCallbackTest(t, false, false)
defer dispatcher.UnregisterAllEventHandlers()
defer dispatcher.Stop()

err := callback.UpdateAllocation(&si.AllocationResponse{
Rejected: []*si.RejectedAllocationAsk{
{
ApplicationID: appID,
AllocationKey: taskUID1,
},
},
})
assert.NilError(t, err, "error updating allocation")
task := context.getTask(appID, taskUID1)
err = utils.WaitForCondition(func() bool {
return task.GetTaskState() == TaskStates().Failed
}, 10*time.Millisecond, time.Second)
assert.NilError(t, err, "task has not transitioned to Failed state")
}

func TestUpdateAllocation_AllocationRejected(t *testing.T) {
callback, context := initCallbackTest(t, false, false)
defer dispatcher.UnregisterAllEventHandlers()
Expand Down Expand Up @@ -239,33 +218,6 @@ func TestUpdateAllocation_AllocationReleased_StoppedByRM(t *testing.T) {
assert.Error(t, err, "timeout waiting for condition") // pod is not expected to be deleted
}

func TestUpdateAllocation_AskReleased(t *testing.T) {
callback, context := initCallbackTest(t, false, true)
defer dispatcher.UnregisterAllEventHandlers()
defer dispatcher.Stop()
app := context.getApplication(appID)
app.sm.SetState(ApplicationStates().Running)
var deleteCalled atomic.Bool
context.apiProvider.(*client.MockedAPIProvider).MockDeleteFn(func(pod *v1.Pod) error { //nolint:errcheck
deleteCalled.Store(true)
return nil
})

err := callback.UpdateAllocation(&si.AllocationResponse{
ReleasedAsks: []*si.AllocationAskRelease{
{
ApplicationID: appID,
AllocationKey: taskUID1,
TerminationType: si.TerminationType_TIMEOUT,
},
},
})
assert.NilError(t, err, "error updating allocation")
assert.Assert(t, !context.schedulerCache.IsAssumedPod(taskUID1))
err = utils.WaitForCondition(deleteCalled.Load, 10*time.Millisecond, time.Second)
assert.NilError(t, err, "pod has not been deleted")
}

func TestUpdateApplication_Accepted(t *testing.T) {
callback, context := initCallbackTest(t, false, false)
defer dispatcher.UnregisterAllEventHandlers()
Expand Down
Loading

0 comments on commit f73ba6d

Please sign in to comment.