Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[YUNIKORN-2368] Shim: Send updated resource requests to core #912

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ go 1.22.0
toolchain go1.22.5

require (
github.com/apache/yunikorn-core v0.0.0-20240827175300-6939b13d1d0e
github.com/apache/yunikorn-core v0.0.0-20240908061623-6f06490bcfa3
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240827015655-68e8c6cca28a
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI=
github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
github.com/apache/yunikorn-core v0.0.0-20240827175300-6939b13d1d0e h1:VaihjHjtmsDK7HEOjlX8KCz7QDxmZSf71CSCuOgjqcc=
github.com/apache/yunikorn-core v0.0.0-20240827175300-6939b13d1d0e/go.mod h1:HYeyzHhZt43oG54pasKHrwHM+Jeji8nFoAE2bcLWLYg=
github.com/apache/yunikorn-core v0.0.0-20240908061623-6f06490bcfa3 h1:ySu0cpFSYFGNtf+PZw4ulzO+cWOyJMYJs+AjmwGWM80=
github.com/apache/yunikorn-core v0.0.0-20240908061623-6f06490bcfa3/go.mod h1:HYeyzHhZt43oG54pasKHrwHM+Jeji8nFoAE2bcLWLYg=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240827015655-68e8c6cca28a h1:3WRXGTvhunGBZj8AVZDxx7Bs/AXiH9mvf2jYcuDyklA=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20240827015655-68e8c6cca28a/go.mod h1:co3uU98sj1CUTPNTM13lTyi+CY0DOgDndDW2KiUjktU=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
Expand Down
12 changes: 7 additions & 5 deletions pkg/cache/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,13 +546,14 @@ func (app *Application) onReservationStateChange() {

for _, t := range app.getTasks(TaskStates().Bound) {
if t.placeholder {
if _, ok := desireCounts[t.taskGroupName]; ok {
desireCounts[t.taskGroupName]--
taskGroupName := t.GetTaskGroupName()
if _, ok := desireCounts[taskGroupName]; ok {
desireCounts[taskGroupName]--
} else {
log.Log(log.ShimCacheApplication).Debug("placeholder taskGroupName set on pod is unknown for application",
zap.String("application", app.applicationID),
zap.String("podName", t.GetTaskPod().Name),
zap.String("taskGroupName", t.taskGroupName))
zap.String("taskGroupName", taskGroupName))
}
}
}
Expand Down Expand Up @@ -659,12 +660,13 @@ func (app *Application) handleAppTaskCompletedEvent() {
}

func (app *Application) publishPlaceholderTimeoutEvents(task *Task) {
if app.originatingTask != nil && task.IsPlaceholder() && task.terminationType == si.TerminationType_name[int32(si.TerminationType_TIMEOUT)] {
taskTerminationType := task.GetTaskTerminationType()
if app.originatingTask != nil && task.IsPlaceholder() && taskTerminationType == si.TerminationType_name[int32(si.TerminationType_TIMEOUT)] {
log.Log(log.ShimCacheApplication).Debug("trying to send placeholder timeout events to the original pod from application",
zap.String("appID", app.applicationID),
zap.Stringer("app request originating pod", app.originatingTask.GetTaskPod()),
zap.String("taskID", task.taskID),
zap.String("terminationType", task.terminationType))
zap.String("terminationType", taskTerminationType))
events.GetRecorder().Eventf(app.originatingTask.GetTaskPod().DeepCopy(), nil, v1.EventTypeWarning, "GangScheduling",
"PlaceholderTimeOut", "Application %s placeholder has been timed out", app.applicationID)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/cache/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@
app := ctx.getApplication(appID)
if app != nil {
if task := app.GetTask(taskID); task != nil {
task.setTaskPod(pod)
task.SetTaskPod(pod)
}
}

Expand Down Expand Up @@ -1194,7 +1194,7 @@
Reason: "SchedulingSkipped",
Message: request.Reason,
}) {
events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
events.GetRecorder().Eventf(task.GetTaskPod().DeepCopy(), nil,

Check warning on line 1197 in pkg/cache/context.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/context.go#L1197

Added line #L1197 was not covered by tests
v1.EventTypeNormal, "PodUnschedulable", "PodUnschedulable",
"Task %s is skipped from scheduling because the queue quota has been exceed", task.alias)
}
Expand All @@ -1209,7 +1209,7 @@
Reason: v1.PodReasonUnschedulable,
Message: request.Reason,
}) {
events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
events.GetRecorder().Eventf(task.GetTaskPod().DeepCopy(), nil,
v1.EventTypeNormal, "PodUnschedulable", "PodUnschedulable",
"Task %s is pending for the requested resources become available", task.alias)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/placeholder_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (mgr *PlaceholderManager) createAppPlaceholders(app *Application) error {
// map task group to count of already created placeholders
tgCounts := make(map[string]int32)
for _, ph := range app.getPlaceHolderTasks() {
tgCounts[ph.getTaskGroupName()]++
tgCounts[ph.GetTaskGroupName()]++
}

// iterate all task groups, create placeholders for all the min members
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/scheduler_callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (callback *AsyncRMCallback) UpdateAllocation(response *si.AllocationRespons
task.setAllocationKey(alloc.AllocationKey)

if err := callback.context.AssumePod(alloc.AllocationKey, alloc.NodeID); err != nil {
task.failWithEvent(err.Error(), "AssumePodError")
task.FailWithEvent(err.Error(), "AssumePodError")
return err
}

Expand Down
110 changes: 71 additions & 39 deletions pkg/cache/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,27 @@
)

type Task struct {
taskID string
alias string
applicationID string
application *Application
taskID string
alias string
applicationID string
application *Application
podStatus v1.PodStatus // pod status, maintained separately for efficiency reasons
context *Context
createTime time.Time
placeholder bool
originator bool
sm *fsm.FSM

// mutable resources, require locking
allocationKey string
resource *si.Resource
pod *v1.Pod
podStatus v1.PodStatus // pod status, maintained separately for efficiency reasons
context *Context
nodeName string
createTime time.Time
taskGroupName string
placeholder bool
terminationType string
originator bool
schedulingState TaskSchedulingState
sm *fsm.FSM
lock *locking.RWMutex
resource *si.Resource
pod *v1.Pod

lock *locking.RWMutex
}

func NewTask(tid string, app *Application, ctx *Context, pod *v1.Pod) *Task {
Expand Down Expand Up @@ -135,14 +138,10 @@
}

func (task *Task) GetTaskID() string {
task.lock.RLock()
defer task.lock.RUnlock()
return task.taskID
}

func (task *Task) IsPlaceholder() bool {
task.lock.RLock()
defer task.lock.RUnlock()
return task.placeholder
}

Expand All @@ -157,19 +156,25 @@
task.taskGroupName = groupName
}

func (task *Task) setTaskTerminationType(terminationTyp string) {
func (task *Task) setTaskTerminationType(terminationType string) {
task.lock.Lock()
defer task.lock.Unlock()
task.terminationType = terminationTyp
task.terminationType = terminationType
}

func (task *Task) getTaskGroupName() string {
func (task *Task) GetTaskTerminationType() string {
task.lock.RLock()
defer task.lock.RUnlock()
return task.terminationType
}

func (task *Task) GetTaskGroupName() string {
task.lock.RLock()
defer task.lock.RUnlock()
return task.taskGroupName
}

func (task *Task) getNodeName() string {
func (task *Task) GetNodeName() string {
task.lock.RLock()
defer task.lock.RUnlock()
return task.nodeName
Expand Down Expand Up @@ -222,8 +227,6 @@
}

func (task *Task) IsOriginator() bool {
task.lock.RLock()
defer task.lock.RUnlock()
return task.originator
}

Expand Down Expand Up @@ -286,13 +289,33 @@
log.Log(log.ShimCacheTask).Debug("scheduling pod",
zap.String("podName", task.pod.Name))

// send update allocation event to core
task.updateAllocation()

if !utils.PodAlreadyBound(task.pod) {
// if this is a new request, add events to pod
events.GetRecorder().Eventf(task.pod.DeepCopy(), nil, v1.EventTypeNormal, "Scheduling", "Scheduling",
"%s is queued and waiting for allocation", task.alias)
// if this task belongs to a task group, that means the app has gang scheduling enabled
// in this case, post an event to indicate the task is being gang scheduled
if !task.placeholder && task.taskGroupName != "" {
events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
v1.EventTypeNormal, "GangScheduling", "TaskGroupMatch",
"Pod belongs to the taskGroup %s, it will be scheduled as a gang member", task.taskGroupName)
}
}
}

// updateAllocation updates the core scheduler when task information changes.
// This function must be called with the task lock held.
func (task *Task) updateAllocation() {
// build preemption policy
preemptionPolicy := &si.PreemptionPolicy{
AllowPreemptSelf: task.isPreemptSelfAllowed(),
AllowPreemptOther: task.isPreemptOtherAllowed(),
}

// submit allocation ask
// submit allocation
rr := common.CreateAllocationForTask(
task.applicationID,
task.taskID,
Expand All @@ -305,22 +328,9 @@
preemptionPolicy)
log.Log(log.ShimCacheTask).Debug("send update request", zap.Stringer("request", rr))
if err := task.context.apiProvider.GetAPIs().SchedulerAPI.UpdateAllocation(rr); err != nil {
log.Log(log.ShimCacheTask).Debug("failed to send scheduling request to scheduler", zap.Error(err))
log.Log(log.ShimCacheTask).Debug("failed to send allocation to scheduler", zap.Error(err))

Check warning on line 331 in pkg/cache/task.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/task.go#L331

Added line #L331 was not covered by tests
return
}

if !utils.PodAlreadyBound(task.pod) {
// if this is a new request, add events to pod
events.GetRecorder().Eventf(task.pod.DeepCopy(), nil, v1.EventTypeNormal, "Scheduling", "Scheduling",
"%s is queued and waiting for allocation", task.alias)
// if this task belongs to a task group, that means the app has gang scheduling enabled
// in this case, post an event to indicate the task is being gang scheduled
if !task.placeholder && task.taskGroupName != "" {
events.GetRecorder().Eventf(task.pod.DeepCopy(), nil,
v1.EventTypeNormal, "GangScheduling", "TaskGroupMatch",
"Pod belongs to the taskGroup %s, it will be scheduled as a gang member", task.taskGroupName)
}
}
}

// this is called after task reaches PENDING state,
Expand Down Expand Up @@ -604,20 +614,42 @@
return false, pod
}

func (task *Task) GetAllocationKey() string {
task.lock.RLock()
defer task.lock.RUnlock()
return task.allocationKey

Check warning on line 620 in pkg/cache/task.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/task.go#L617-L620

Added lines #L617 - L620 were not covered by tests
}

func (task *Task) setAllocationKey(allocationKey string) {
task.lock.Lock()
defer task.lock.Unlock()
task.allocationKey = allocationKey
}

func (task *Task) FailWithEvent(errorMessage, actionReason string) {
task.lock.RLock()
defer task.lock.RUnlock()
task.failWithEvent(errorMessage, actionReason)
}

func (task *Task) failWithEvent(errorMessage, actionReason string) {
dispatcher.Dispatch(NewFailTaskEvent(task.applicationID, task.taskID, errorMessage))
events.GetRecorder().Eventf(task.pod.DeepCopy(),
nil, v1.EventTypeWarning, actionReason, actionReason, errorMessage)
}

func (task *Task) setTaskPod(pod *v1.Pod) {
func (task *Task) SetTaskPod(pod *v1.Pod) {
task.lock.Lock()
defer task.lock.Unlock()

task.pod = pod
oldResource := task.resource
newResource := common.GetPodResource(pod)
if !common.Equals(oldResource, newResource) {
// pod resources have changed
task.resource = newResource

Check warning on line 650 in pkg/cache/task.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/task.go#L650

Added line #L650 was not covered by tests

// update allocation in core
task.updateAllocation()

Check warning on line 653 in pkg/cache/task.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/task.go#L653

Added line #L653 was not covered by tests
}
}
4 changes: 2 additions & 2 deletions pkg/cache/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func TestReleaseTaskAllocation(t *testing.T) {
// bind a task is a async process, wait for it to happen
err = utils.WaitForCondition(
func() bool {
return task.getNodeName() == "node-1"
return task.GetNodeName() == "node-1"
},
100*time.Millisecond,
3*time.Second,
Expand Down Expand Up @@ -481,7 +481,7 @@ func TestSetTaskGroup(t *testing.T) {
}
task := NewTask("task01", app, mockedContext, pod)
task.setTaskGroupName("test-group")
assert.Equal(t, task.getTaskGroupName(), "test-group")
assert.Equal(t, task.GetTaskGroupName(), "test-group")
}

//nolint:funlen
Expand Down