Skip to content

Commit

Permalink
[YUNIKORN-2211] Replace Allocation uuid with allocationID (#740)
Browse files Browse the repository at this point in the history
Closes: #740

Signed-off-by: Manikandan R <[email protected]>
  • Loading branch information
manirajv06 committed Dec 5, 2023
1 parent ff939a5 commit 2738571
Show file tree
Hide file tree
Showing 21 changed files with 121 additions and 121 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ module github.com/apache/yunikorn-k8shim
go 1.20

require (
github.com/apache/yunikorn-core v0.0.0-20231127054725-3b9c96615796
github.com/apache/yunikorn-scheduler-interface v0.0.0-20231020041412-6f80d179257c
github.com/apache/yunikorn-core v0.0.0-20231203141034-3ae625bcfc07
github.com/apache/yunikorn-scheduler-interface v0.0.0-20231201001639-c81397b31653
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.3.1
github.com/looplab/fsm v1.0.1
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 h1:yL7+Jz0jTC6yykIK/Wh74gnTJnrGr5AyrNMXuA0gves=
github.com/antlr/antlr4/runtime/Go/antlr v1.4.10/go.mod h1:F7bn7fEU90QkQ3tnmaTx3LTKLEDqnwWODIYppRQ5hnY=
github.com/apache/yunikorn-core v0.0.0-20231127054725-3b9c96615796 h1:3OiAqfOtLrldegMNOq6kcq/vDMo4mqhqQkOS96uI0Ik=
github.com/apache/yunikorn-core v0.0.0-20231127054725-3b9c96615796/go.mod h1:nZRI1fm9wa3bhdD4tpDtrEh7ll/Ft/z+NG/gi8l8M14=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20231020041412-6f80d179257c h1:KTIC3f+3aQdAo42YRxs27VpDWY6y73bxXpWcAii2IlQ=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20231020041412-6f80d179257c/go.mod h1:3NQfrhroMqU++kDTroBrTyCRKAczwwX//Fkj/ag/rsY=
github.com/apache/yunikorn-core v0.0.0-20231203141034-3ae625bcfc07 h1:DNhQrQJYmPpujUBzLtSxFyV4Y1L69hVXuNiE0+EitYA=
github.com/apache/yunikorn-core v0.0.0-20231203141034-3ae625bcfc07/go.mod h1:JG66N3TskSNVAMoAUbAVagS14ZrOgcjGpRXbcpAMMvI=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20231201001639-c81397b31653 h1:pUbVmmR+LWuy0L8dGCZNue9UNpWKsY7yFYcCtPtWAic=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20231201001639-c81397b31653/go.mod h1:zDWV5y9Zh9DM1C65RCVXT1nhNNO8kykVW7bzPFamNYw=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
Expand Down
6 changes: 3 additions & 3 deletions pkg/cache/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,14 +619,14 @@ func (app *Application) handleFailApplicationEvent(errMsg string) {
}
}

func (app *Application) handleReleaseAppAllocationEvent(allocUUID string, terminationType string) {
func (app *Application) handleReleaseAppAllocationEvent(allocationID string, terminationType string) {
log.Log(log.ShimCacheApplication).Info("try to release pod from application",
zap.String("appID", app.applicationID),
zap.String("allocationUUID", allocUUID),
zap.String("allocationID", allocationID),
zap.String("terminationType", terminationType))

for _, task := range app.taskMap {
if task.allocationUUID == allocUUID {
if task.allocationID == allocationID {
task.setTaskTerminationType(terminationType)
err := task.DeleteTaskPod(task.pod)
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions pkg/cache/application_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,15 +268,15 @@ func (ue UpdateApplicationReservationEvent) GetApplicationID() string {
// ------------------------
// ReleaseAppAllocationEvent is the application event used to release a single
// allocation of an application. It records the application ID, the identifier
// of the allocation to release, the termination type as a string and the
// application event type.
//
// NOTE(review): this is a rendered diff hunk. allocationUUID is the field
// removed by the commit and allocationID is its replacement; presumably only
// allocationID exists in the resulting file.
type ReleaseAppAllocationEvent struct {
applicationID string
allocationUUID string
allocationID string
terminationType string
event ApplicationEventType
}

func NewReleaseAppAllocationEvent(appID string, allocTermination si.TerminationType, uuid string) ReleaseAppAllocationEvent {
func NewReleaseAppAllocationEvent(appID string, allocTermination si.TerminationType, allocationID string) ReleaseAppAllocationEvent {
return ReleaseAppAllocationEvent{
applicationID: appID,
allocationUUID: uuid,
allocationID: allocationID,
terminationType: si.TerminationType_name[int32(allocTermination)],
event: ReleaseAppAllocation,
}
Expand All @@ -288,7 +288,7 @@ func (re ReleaseAppAllocationEvent) GetApplicationID() string {

// GetArgs returns the event arguments as a two-element slice: the allocation
// identifier at index 0 and the termination type string at index 1. The
// ReleaseAppAllocation state-machine callback reads the arguments back in the
// same order.
func (re ReleaseAppAllocationEvent) GetArgs() []interface{} {
args := make([]interface{}, 2)
// Rendered diff: the allocationUUID assignment is the removed line and the
// allocationID assignment directly below it is its replacement.
args[0] = re.allocationUUID
args[0] = re.allocationID
args[1] = re.terminationType
return args
}
Expand Down Expand Up @@ -558,9 +558,9 @@ func newAppState() *fsm.FSM { //nolint:funlen
log.Log(log.ShimFSM).Error("fail to parse event arg", zap.Error(err))
return
}
allocUUID := eventArgs[0]
allocationID := eventArgs[0]
terminationType := eventArgs[1]
app.handleReleaseAppAllocationEvent(allocUUID, terminationType)
app.handleReleaseAppAllocationEvent(allocationID, terminationType)
},
ReleaseAppAllocationAsk.String(): func(_ context.Context, event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
Expand Down
60 changes: 30 additions & 30 deletions pkg/cache/application_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,39 +671,39 @@ func TestUpdateApplicationReservationEventGetApplicationID(t *testing.T) {

// TestNewReleaseAppAllocationEvent verifies that NewReleaseAppAllocationEvent
// stores the application ID and the allocation identifier unchanged, converts
// the termination type to its string name (TIMEOUT in the table below) and
// sets the event type to ReleaseAppAllocation.
//
// NOTE(review): this is a rendered diff hunk, so removed (…UUID) and added
// (…AllocationID) lines appear back to back; only the added lines belong to
// the resulting file.
func TestNewReleaseAppAllocationEvent(t *testing.T) {
tests := []struct {
name string
appID, allocationUUID string
terminationType si.TerminationType
wantID, wantUUID, wantType string
wantEvent ApplicationEventType
name string
appID, allocationID string
terminationType si.TerminationType
wantID, wantAllocationID, wantType string
wantEvent ApplicationEventType
}{
{TestCreateName, "testAppId001", "testUUID001", si.TerminationType_TIMEOUT, "testAppId001", "testUUID001", "TIMEOUT", ReleaseAppAllocation},
{TestCreateName, "testAppId001", "testAllocationID001", si.TerminationType_TIMEOUT, "testAppId001", "testAllocationID001", "TIMEOUT", ReleaseAppAllocation},
}

for _, tt := range tests {
// The event is built before the subtest starts; the subtest only inspects it.
instance := NewReleaseAppAllocationEvent(tt.appID, tt.terminationType, tt.allocationUUID)
instance := NewReleaseAppAllocationEvent(tt.appID, tt.terminationType, tt.allocationID)
t.Run(tt.name, func(t *testing.T) {
// Any single mismatching field fails the case; the message prints all
// expected values followed by all actual values.
if instance.applicationID != tt.wantID || instance.allocationUUID != tt.wantUUID || instance.terminationType != tt.wantType || instance.event != tt.wantEvent {
if instance.applicationID != tt.wantID || instance.allocationID != tt.wantAllocationID || instance.terminationType != tt.wantType || instance.event != tt.wantEvent {
t.Errorf("want %s %s %s %s, got %s %s %s %s",
tt.wantID, tt.wantUUID, tt.wantType, tt.wantEvent,
instance.applicationID, instance.allocationUUID, instance.terminationType, instance.event)
tt.wantID, tt.wantAllocationID, tt.wantType, tt.wantEvent,
instance.applicationID, instance.allocationID, instance.terminationType, instance.event)
}
})
}
}

func TestReleaseAppAllocationEventGetEvent(t *testing.T) {
tests := []struct {
name string
appID, allocationUUID string
terminationType si.TerminationType
wantEvent ApplicationEventType
name string
appID, allocationID string
terminationType si.TerminationType
wantEvent ApplicationEventType
}{
{TestEventName, "testAppId001", "testUUID001", si.TerminationType_TIMEOUT, ReleaseAppAllocation},
{TestEventName, "testAppId001", "testAllocationID001", si.TerminationType_TIMEOUT, ReleaseAppAllocation},
}

for _, tt := range tests {
instance := NewReleaseAppAllocationEvent(tt.appID, tt.terminationType, tt.allocationUUID)
instance := NewReleaseAppAllocationEvent(tt.appID, tt.terminationType, tt.allocationID)
event := instance.GetEvent()
t.Run(tt.name, func(t *testing.T) {
if event != tt.wantEvent.String() {
Expand All @@ -715,18 +715,18 @@ func TestReleaseAppAllocationEventGetEvent(t *testing.T) {

func TestReleaseAppAllocationEventGetArgs(t *testing.T) {
tests := []struct {
name string
appID, allocationUUID string
terminationType si.TerminationType
wantLen int
castOk []bool
wantArg []string
name string
appID, allocationID string
terminationType si.TerminationType
wantLen int
castOk []bool
wantArg []string
}{
{TestArgsName, "testAppId001", "testUUID001", si.TerminationType_TIMEOUT, 2, []bool{true, true}, []string{"testUUID001", "TIMEOUT"}},
{TestArgsName, "testAppId001", "testAllocationID001", si.TerminationType_TIMEOUT, 2, []bool{true, true}, []string{"testAllocationID001", "TIMEOUT"}},
}

for _, tt := range tests {
instance := NewReleaseAppAllocationEvent(tt.appID, tt.terminationType, tt.allocationUUID)
instance := NewReleaseAppAllocationEvent(tt.appID, tt.terminationType, tt.allocationID)
args := instance.GetArgs()
t.Run(tt.name, func(t *testing.T) {
if len(args) != tt.wantLen {
Expand All @@ -748,16 +748,16 @@ func TestReleaseAppAllocationEventGetArgs(t *testing.T) {

func TestReleaseAppAllocationEventGetApplicationID(t *testing.T) {
tests := []struct {
name string
appID, allocationUUID string
terminationType si.TerminationType
wantID string
name string
appID, allocationID string
terminationType si.TerminationType
wantID string
}{
{TestAppIDName, "testAppId001", "testUUID001", si.TerminationType_TIMEOUT, "testAppId001"},
{TestAppIDName, "testAppId001", "testAllocationID001", si.TerminationType_TIMEOUT, "testAppId001"},
}

for _, tt := range tests {
instance := NewReleaseAppAllocationEvent(tt.appID, tt.terminationType, tt.allocationUUID)
instance := NewReleaseAppAllocationEvent(tt.appID, tt.terminationType, tt.allocationID)
appID := instance.GetApplicationID()
t.Run(tt.name, func(t *testing.T) {
if appID != tt.wantID {
Expand Down
26 changes: 13 additions & 13 deletions pkg/cache/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,17 +442,17 @@ func TestReleaseAppAllocation(t *testing.T) {
app := NewApplication(appID, "root.abc", "testuser", testGroups, map[string]string{}, ms)
task := NewTask("task01", app, context, pod)
app.addTask(task)
task.allocationUUID = taskUUID
task.allocationID = taskAllocationID
// the app must be in the Running state to accept this event
err := app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, taskUUID))
err := app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, taskAllocationID))
if err == nil {
// this should give an error
t.Error("expecting error got 'nil'")
}
// set the app state to Running so that the event can be triggered
app.SetState(ApplicationStates().Running)
assertAppState(t, app, ApplicationStates().Running, 3*time.Second)
err = app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, taskUUID))
err = app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, taskAllocationID))
assert.NilError(t, err)
// after handling the release event the app must still be in the Running state
assertAppState(t, app, ApplicationStates().Running, 3*time.Second)
Expand Down Expand Up @@ -816,7 +816,7 @@ func TestTryReservePostRestart(t *testing.T) {
Containers: containers,
},
})
task0.allocationUUID = string(task0.pod.UID)
task0.allocationID = string(task0.pod.UID)
task0.nodeName = "fake-host"
task0.sm.SetState(TaskStates().Allocated)

Expand Down Expand Up @@ -997,22 +997,22 @@ func TestReleaseAppAllocationInFailingState(t *testing.T) {
app := NewApplication(appID, "root.abc", "testuser", testGroups, map[string]string{}, ms)
task := NewTask("task01", app, context, pod)
app.addTask(task)
task.allocationUUID = taskUUID
task.allocationID = taskAllocationID
// the app must be in the Running state to accept this event
err := app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, taskUUID))
err := app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, taskAllocationID))
if err == nil {
// this should give an error
t.Error("expecting error got 'nil'")
}
// set the app state to Running so that the event can be triggered
app.SetState(ApplicationStates().Running)
assertAppState(t, app, ApplicationStates().Running, 3*time.Second)
err = app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, taskUUID))
err = app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, taskAllocationID))
assert.NilError(t, err)
// after handling the release event the app must still be in the Running state
assertAppState(t, app, ApplicationStates().Running, 3*time.Second)
app.SetState(ApplicationStates().Failing)
err = app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, taskUUID))
err = app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, taskAllocationID))
assert.NilError(t, err)
// after handling the release event the app must still be in the Failing state
assertAppState(t, app, ApplicationStates().Failing, 3*time.Second)
Expand Down Expand Up @@ -1052,7 +1052,7 @@ func TestResumingStateTransitions(t *testing.T) {
// Add tasks
app.addTask(task1)
app.addTask(task2)
task1.allocationUUID = taskUUID
task1.allocationID = taskAllocationID
context.addApplication(app)

// Set app state to "reserving"
Expand Down Expand Up @@ -1167,7 +1167,7 @@ func TestPlaceholderTimeoutEvents(t *testing.T) {
assert.Equal(t, len(app.GetNewTasks()), 1)

appID := "app00001"
UUID := "UID-POD-00002"
allocationID := "UID-POD-00002"

context.addApplication(app)
task1 := context.AddTask(&AddTaskRequest{
Expand All @@ -1184,16 +1184,16 @@ func TestPlaceholderTimeoutEvents(t *testing.T) {
_, taskErr := app.GetTask("task02")
assert.NilError(t, taskErr, "Task should exist")

task1.allocationUUID = UUID
task1.allocationID = allocationID

// the app must be in the Running state to accept this event
err := app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, UUID))
err := app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, allocationID))
assert.Error(t, err, "event ReleaseAppAllocation inappropriate in current state New")

// set the app state to Running so that the event can be triggered
app.SetState(ApplicationStates().Running)
assertAppState(t, app, ApplicationStates().Running, 3*time.Second)
err = app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, UUID))
err = app.handle(NewReleaseAppAllocationEvent(appID, si.TerminationType_TIMEOUT, allocationID))
assert.NilError(t, err)
// after handling the release event the app must still be in the Running state
assertAppState(t, app, ApplicationStates().Running, 3*time.Second)
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/appmgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (svc *AppManagementService) GetExistingAllocation(pod *v1.Pod) *si.Allocati
return &si.Allocation{
AllocationKey: string(pod.UID),
AllocationTags: meta.Tags,
UUID: string(pod.UID),
AllocationID: string(pod.UID),
ResourcePerAlloc: common.GetPodResource(pod),
NodeID: pod.Spec.NodeName,
ApplicationID: meta.ApplicationID,
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/appmgmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func TestAMSvcGetExistingAllocation(t *testing.T) {
alloc := am.GetExistingAllocation(pod)
assert.Equal(t, alloc.ApplicationID, "app00001")
assert.Equal(t, alloc.AllocationKey, string(pod.UID))
assert.Equal(t, alloc.UUID, string(pod.UID))
assert.Equal(t, alloc.AllocationID, string(pod.UID))
assert.Equal(t, alloc.NodeID, "allocated-node")
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/cache/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,11 +846,11 @@ func TestRecoverTask(t *testing.T) {
assert.Equal(t, len(app.getTasks(TaskStates().New)), 1)

taskInfoVerifiers := []struct {
taskID string
expectedState string
expectedAllocationUUID string
expectedPodName string
expectedNodeName string
taskID string
expectedState string
expectedAllocationID string
expectedPodName string
expectedNodeName string
}{
{taskUID1, TaskStates().Bound, taskUID1, "pod1", fakeNodeName},
{taskUID2, TaskStates().Completed, taskUID2, "pod2", fakeNodeName},
Expand All @@ -864,7 +864,7 @@ func TestRecoverTask(t *testing.T) {
rt, err := app.GetTask(tt.taskID)
assert.NilError(t, err)
assert.Equal(t, rt.GetTaskState(), tt.expectedState)
assert.Equal(t, rt.allocationUUID, tt.expectedAllocationUUID)
assert.Equal(t, rt.allocationID, tt.expectedAllocationID)
assert.Equal(t, rt.pod.Name, tt.expectedPodName)
assert.Equal(t, rt.alias, fmt.Sprintf("%s/%s", podNamespace, tt.expectedPodName))
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestAddExistingAllocation(t *testing.T) {
alloc01 := si.Allocation{
AllocationKey: "pod001",
AllocationTags: nil,
UUID: "podUID001",
AllocationID: "podUID001",
ResourcePerAlloc: nil,
Priority: 0,
NodeID: "host001",
Expand All @@ -47,7 +47,7 @@ func TestAddExistingAllocation(t *testing.T) {
assert.Equal(t, len(node.existingAllocations), 1)
alloc02 := node.existingAllocations[0]
assert.Equal(t, alloc02.AllocationKey, alloc01.AllocationKey)
assert.Equal(t, alloc02.UUID, alloc01.UUID)
assert.Equal(t, alloc02.AllocationID, alloc01.AllocationID)
assert.Equal(t, alloc02.NodeID, alloc01.NodeID)
assert.Equal(t, alloc02.PartitionName, alloc01.PartitionName)
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/cache/scheduler_callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (callback *AsyncRMCallback) UpdateAllocation(response *si.AllocationRespons
// got allocation for pod, bind pod to the scheduled node
log.Log(log.ShimRMCallback).Debug("callback: response to new allocation",
zap.String("allocationKey", alloc.AllocationKey),
zap.String("UUID", alloc.UUID),
zap.String("allocationID", alloc.AllocationID),
zap.String("applicationID", alloc.ApplicationID),
zap.String("nodeID", alloc.NodeID))

Expand All @@ -60,7 +60,7 @@ func (callback *AsyncRMCallback) UpdateAllocation(response *si.AllocationRespons
return err
}
if app := callback.context.GetApplication(alloc.ApplicationID); app != nil {
ev := NewAllocateTaskEvent(app.GetApplicationID(), alloc.AllocationKey, alloc.UUID, alloc.NodeID)
ev := NewAllocateTaskEvent(app.GetApplicationID(), alloc.AllocationKey, alloc.AllocationID, alloc.NodeID)
dispatcher.Dispatch(ev)
}
}
Expand All @@ -78,15 +78,15 @@ func (callback *AsyncRMCallback) UpdateAllocation(response *si.AllocationRespons

for _, release := range response.Released {
log.Log(log.ShimRMCallback).Debug("callback: response to released allocations",
zap.String("UUID", release.UUID))
zap.String("AllocationID", release.AllocationID))

// update cache
callback.context.ForgetPod(release.GetAllocationKey())

// TerminationType 0 mean STOPPED_BY_RM
if release.TerminationType != si.TerminationType_STOPPED_BY_RM {
// send release app allocation to application states machine
ev := NewReleaseAppAllocationEvent(release.ApplicationID, release.TerminationType, release.UUID)
ev := NewReleaseAppAllocationEvent(release.ApplicationID, release.TerminationType, release.AllocationID)
dispatcher.Dispatch(ev)
}
}
Expand Down
Loading

0 comments on commit 2738571

Please sign in to comment.