Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[YUNIKORN-2816] Clean up AllocationReleaseRequest logic and adjust co… #895

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions pkg/cache/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ func (task *Task) beforeTaskCompleted() {
"Task %s is completed", task.alias)
}

// releaseAllocation sends the release request for the Allocation or the AllocationAsk to the core.
// releaseAllocation sends the release request for the Allocation to the core.
func (task *Task) releaseAllocation() {
// scheduler api might be nil in some tests
if task.context.apiProvider.GetAPIs().SchedulerAPI != nil {
Expand All @@ -494,25 +494,31 @@ func (task *Task) releaseAllocation() {
zap.String("task", task.GetTaskState()),
zap.String("terminationType", task.terminationType))

// The message depends on current task state, generate requests accordingly.
// If allocated send an AllocationReleaseRequest,
// If not allocated yet send an AllocationAskReleaseRequest
// Send an AllocationReleaseRequest,
var releaseRequest *si.AllocationRequest
s := TaskStates()
switch task.GetTaskState() {
case s.New, s.Pending, s.Scheduling, s.Rejected:
releaseRequest = common.CreateReleaseRequestForTask(task.applicationID, task.taskID, task.application.partition, task.terminationType)
default:

// Check if the task is in a state where it has not been allocated yet.
if task.GetTaskState() != s.New && task.GetTaskState() != s.Pending &&
task.GetTaskState() != s.Scheduling && task.GetTaskState() != s.Rejected {
// Task is in a state where it might have been allocated.
if task.allocationKey == "" {
log.Log(log.ShimCacheTask).Warn("BUG: task allocationKey is empty on release",
zap.String("applicationID", task.applicationID),
zap.String("taskID", task.taskID),
zap.String("taskAlias", task.alias),
zap.String("task", task.GetTaskState()))
zap.String("taskState", task.GetTaskState()))
}
releaseRequest = common.CreateReleaseRequestForTask(task.applicationID, task.taskID, task.application.partition, task.terminationType)
}

// Create the release request.
releaseRequest = common.CreateReleaseRequestForTask(
task.applicationID,
task.taskID,
task.application.partition,
task.terminationType,
)

if releaseRequest.Releases != nil {
log.Log(log.ShimCacheTask).Info("releasing allocations",
zap.Int("numOfAllocationsToRelease", len(releaseRequest.Releases.AllocationsToRelease)))
Expand Down
Loading