diff --git a/pkg/common/errors.go b/pkg/common/errors.go
index 1e8334ea3..f73c84b46 100644
--- a/pkg/common/errors.go
+++ b/pkg/common/errors.go
@@ -27,3 +27,4 @@ const PreemptionPreconditionsFailed = "Preemption preconditions failed"
 const PreemptionDoesNotGuarantee = "Preemption queue guarantees check failed"
 const PreemptionShortfall = "Preemption helped but short of resources"
 const PreemptionDoesNotHelp = "Preemption does not help"
+const NoVictimForRequiredNode = "No fit on required node, preemption does not help"
diff --git a/pkg/scheduler/objects/allocation.go b/pkg/scheduler/objects/allocation.go
index 37d0bd618..17fd531c2 100644
--- a/pkg/scheduler/objects/allocation.go
+++ b/pkg/scheduler/objects/allocation.go
@@ -475,6 +475,11 @@ func (a *Allocation) SendPredicatesFailedEvent(predicateErrors map[string]int) {
 	a.askEvents.SendPredicatesFailed(a.allocationKey, a.applicationID, predicateErrors, a.GetAllocatedResource())
 }
 
+// SendRequiredNodePreemptionFailedEvent forwards a "required node preemption failed" event for this allocation and the given node to the ask event system.
+func (a *Allocation) SendRequiredNodePreemptionFailedEvent(node string) {
+	a.askEvents.SendRequiredNodePreemptionFailed(a.allocationKey, a.applicationID, node, a.GetAllocatedResource())
+}
+
 // GetAllocationLog returns a list of log entries corresponding to allocation preconditions not being met.
 func (a *Allocation) GetAllocationLog() []*AllocationLogEntry {
 	a.RLock()
diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go
index 98bbcdb75..0a254bf9e 100644
--- a/pkg/scheduler/objects/application.go
+++ b/pkg/scheduler/objects/application.go
@@ -1409,9 +1409,6 @@ func (sa *Application) tryPreemption(headRoom *resources.Resource, preemptionDel
 }
 
 func (sa *Application) tryRequiredNodePreemption(reserve *reservation, ask *Allocation) bool {
-	log.Log(log.SchedApplication).Info("Triggering preemption process for daemon set ask",
-		zap.String("ds allocation key", ask.GetAllocationKey()))
-
 	// try preemption and see if we can free up resource
 	preemptor := NewRequiredNodePreemptor(reserve.node, ask)
 	preemptor.filterAllocations()
@@ -1434,9 +1431,8 @@ func (sa *Application) tryRequiredNodePreemption(reserve *reservation, ask *Allo
 			"preempting allocations to free up resources to run daemon set ask: "+ask.GetAllocationKey())
 		return true
 	}
-	log.Log(log.SchedApplication).Warn("Problem in finding the victims for preempting resources to meet required ask requirements",
-		zap.String("ds allocation key", ask.GetAllocationKey()),
-		zap.String("node id", reserve.nodeID))
+	ask.LogAllocationFailure(common.NoVictimForRequiredNode, true)
+	ask.SendRequiredNodePreemptionFailedEvent(reserve.node.NodeID)
 	return false
 }
 
diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go
index 0b77db566..5b7eed879 100644
--- a/pkg/scheduler/objects/application_test.go
+++ b/pkg/scheduler/objects/application_test.go
@@ -21,6 +21,7 @@ package objects
 import (
 	"fmt"
 	"math"
+	"strings"
 	"testing"
 	"time"
 
@@ -2916,6 +2917,176 @@ func TestPredicateFailedEvents(t *testing.T) {
 	assert.Equal(t, "Unschedulable request 'alloc-0': fake predicate plugin failed (2x); ", event.Message)
 }
 
+func TestRequiredNodePreemption(t *testing.T) {
+	// tests successful RequiredNode (DaemonSet) preemption
+	app := newApplication(appID0, "default", "root.default")
+	var releaseEvents []*rmevent.RMReleaseAllocationEvent
+	app.rmEventHandler = &mockAppEventHandler{
+		callback: func(ev interface{}) {
+			if rmEvent, ok := ev.(*rmevent.RMReleaseAllocationEvent); ok {
+				releaseEvents = append(releaseEvents, rmEvent)
+				go func() {
+					rmEvent.Channel <- &rmevent.Result{
+						Succeeded: true,
+					}
+				}()
+			}
+		},
+	}
+	node := newNode(nodeID1, map[string]resources.Quantity{"first": 20})
+	node.nodeEvents = schedEvt.NewNodeEvents(mock.NewEventSystemDisabled())
+	iterator := getNodeIteratorFn(node)
+	getNode := func(nodeID string) *Node {
+		return node
+	}
+
+	// set queue
+	rootQ, err := createRootQueue(map[string]string{"first": "20"})
+	assert.NilError(t, err)
+	childQ, err := createManagedQueue(rootQ, "default", false, map[string]string{"first": "20"})
+	assert.NilError(t, err)
+	app.SetQueue(childQ)
+
+	// add an ask
+	mockEvents := mock.NewEventSystem()
+	askRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 15})
+	ask1 := newAllocationAsk("ask-1", "app-1", askRes)
+	ask1.askEvents = schedEvt.NewAskEvents(mockEvents)
+	err = app.AddAllocationAsk(ask1)
+	assert.NilError(t, err, "could not add ask-1")
+	preemptionAttemptsRemaining := 0
+
+	// allocate ask
+	headRoom := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50})
+	result := app.tryAllocate(headRoom, true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode)
+	assert.Equal(t, result.ResultType, Allocated, "could not allocate ask-1")
+	assert.Equal(t, result.Request.allocationKey, "ask-1", "unexpected allocation key")
+
+	// add ask2 with required node
+	ask2 := newAllocationAsk("ask-2", "app-1", askRes)
+	ask2.askEvents = schedEvt.NewAskEvents(mockEvents)
+	ask2.requiredNode = nodeID1
+	err = app.AddAllocationAsk(ask2)
+	assert.NilError(t, err, "could not add ask-2")
+
+	// try to allocate ask2 with node being full - expect a reservation
+	result = app.tryAllocate(headRoom, true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode)
+	assert.Equal(t, result.ResultType, Reserved, "allocation result is not reserved")
+	assert.Equal(t, result.Request.allocationKey, "ask-2", "unexpected allocation key")
+	err = app.Reserve(node, ask2)
+	assert.NilError(t, err, "reservation failed")
+
+	// preemption
+	assert.Assert(t, app.tryReservedAllocate(headRoom, iterator) == nil, "unexpected result from reserved allocation")
+	assert.Assert(t, ask1.IsPreempted(), "ask1 has not been preempted")
+	assert.Assert(t, ask2.HasTriggeredPreemption(), "ask2 has not triggered preemption")
+	assert.Equal(t, 1, len(releaseEvents), "unexpected number of release events")
+	assert.Equal(t, 1, len(releaseEvents[0].ReleasedAllocations), "unexpected number of release allocations")
+	assert.Equal(t, "ask-1", releaseEvents[0].ReleasedAllocations[0].AllocationKey, "allocation key")
+
+	// 2nd attempt - no preemption this time
+	releaseEvents = nil
+	assert.Assert(t, app.tryReservedAllocate(headRoom, iterator) == nil, "unexpected result from reserved allocation")
+	assert.Assert(t, releaseEvents == nil, "unexpected release event")
+
+	// check for preemption related events
+	for _, event := range mockEvents.Events {
+		assert.Assert(t, !strings.Contains(strings.ToLower(event.Message), "preemption"), "received a preemption related event")
+	}
+}
+
+func TestRequiredNodePreemptionFailed(t *testing.T) {
+	// tests RequiredNode (DaemonSet) preemption where the victim pod has a high priority, hence preemption is not possible
+	app := newApplication(appID0, "default", "root.default")
+	var releaseEvents []*rmevent.RMReleaseAllocationEvent
+	app.rmEventHandler = &mockAppEventHandler{
+		callback: func(ev interface{}) {
+			if rmEvent, ok := ev.(*rmevent.RMReleaseAllocationEvent); ok {
+				releaseEvents = append(releaseEvents, rmEvent)
+				go func() {
+					rmEvent.Channel <- &rmevent.Result{
+						Succeeded: true,
+					}
+				}()
+			}
+		},
+	}
+	node := newNode(nodeID1, map[string]resources.Quantity{"first": 20})
+	node.nodeEvents = schedEvt.NewNodeEvents(mock.NewEventSystemDisabled())
+	iterator := getNodeIteratorFn(node)
+	getNode := func(nodeID string) *Node {
+		return node
+	}
+
+	// set queue
+	rootQ, err := createRootQueue(map[string]string{"first": "20"})
+	assert.NilError(t, err)
+	childQ, err := createManagedQueue(rootQ, "default", false, map[string]string{"first": "20"})
+	assert.NilError(t, err)
+	app.SetQueue(childQ)
+
+	// add an ask with high priority
+	mockEvents := mock.NewEventSystem()
+	askRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 15})
+	ask1 := newAllocationAsk("ask-1", "app-1", askRes)
+	ask1.askEvents = schedEvt.NewAskEvents(mockEvents)
+	ask1.priority = 1000
+	err = app.AddAllocationAsk(ask1)
+	assert.NilError(t, err, "could not add ask-1")
+	preemptionAttemptsRemaining := 0
+
+	// allocate ask
+	headRoom := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50})
+	result := app.tryAllocate(headRoom, true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode)
+	assert.Equal(t, result.ResultType, Allocated, "could not allocate ask-1")
+	assert.Equal(t, result.Request.allocationKey, "ask-1", "unexpected allocation key")
+
+	// add ask2 with required node
+	ask2 := newAllocationAsk("ask-2", "app-1", askRes)
+	ask2.askEvents = schedEvt.NewAskEvents(mockEvents)
+	ask2.requiredNode = nodeID1
+	err = app.AddAllocationAsk(ask2)
+	assert.NilError(t, err, "could not add ask-2")
+
+	// try to allocate ask2 with node being full - expect a reservation
+	result = app.tryAllocate(headRoom, true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode)
+	assert.Equal(t, result.ResultType, Reserved, "allocation result is not reserved")
+	assert.Equal(t, result.Request.allocationKey, "ask-2", "unexpected allocation key")
+	err = app.Reserve(node, ask2)
+	assert.NilError(t, err, "reservation failed")
+
+	// try preemption - should not succeed
+	assert.Assert(t, app.tryReservedAllocate(headRoom, iterator) == nil, "unexpected result from reserved allocation")
+	assert.Assert(t, !ask1.IsPreempted(), "unexpected preemption of ask1")
+	assert.Assert(t, !ask2.HasTriggeredPreemption(), "unexpected preemption triggered from ask2")
+	assert.Equal(t, 0, len(releaseEvents), "unexpected number of release events")
+	// check for events
+	var requestEvt *si.EventRecord
+	// counts the preemption related REQUEST events collected so far; a closure so it can be re-evaluated after later scheduling cycles
+	countPreemptionEvents := func() int {
+		count := 0
+		for _, event := range mockEvents.Events {
+			if event.Type == si.EventRecord_REQUEST && strings.Contains(strings.ToLower(event.Message), "preemption") {
+				count++
+				requestEvt = event
+			}
+		}
+		return count
+	}
+	assert.Equal(t, 1, countPreemptionEvents(), "unexpected number of REQUEST events")
+	assert.Equal(t, "Unschedulable request 'ask-2' with required node 'node-1', no preemption victim found", requestEvt.Message)
+	assert.Equal(t, 1, len(ask2.allocLog), "unexpected number of entries in the allocation log")
+	assert.Equal(t, int32(1), ask2.allocLog[common.NoVictimForRequiredNode].Count, "incorrect number of entry count")
+	assert.Equal(t, common.NoVictimForRequiredNode, ask2.allocLog[common.NoVictimForRequiredNode].Message, "unexpected log message")
+
+	// check counting & event throttling: the events are recounted here, a value computed before these cycles would make the check vacuous
+	assert.Assert(t, app.tryReservedAllocate(headRoom, iterator) == nil, "unexpected result from reserved allocation")
+	assert.Assert(t, app.tryReservedAllocate(headRoom, iterator) == nil, "unexpected result from reserved allocation")
+	assert.Assert(t, app.tryReservedAllocate(headRoom, iterator) == nil, "unexpected result from reserved allocation")
+	assert.Equal(t, 1, countPreemptionEvents(), "unexpected number of REQUEST events")
+	assert.Equal(t, int32(4), ask2.allocLog[common.NoVictimForRequiredNode].Count, "incorrect number of entry count")
+}
+
 type testIterator struct{}
 
 func (testIterator) ForEachNode(fn func(*Node) bool) {
@@ -3035,3 +3206,11 @@ func TestGetUint64Tag(t *testing.T) {
 		})
 	}
 }
+
+type mockAppEventHandler struct {
+	callback func(ev interface{})
+}
+
+func (m mockAppEventHandler) HandleEvent(ev interface{}) {
+	m.callback(ev)
+}
diff --git a/pkg/scheduler/objects/events/ask_events.go b/pkg/scheduler/objects/events/ask_events.go
index 64265c6dc..0e10a2530 100644
--- a/pkg/scheduler/objects/events/ask_events.go
+++ b/pkg/scheduler/objects/events/ask_events.go
@@ -32,8 +32,9 @@ import (
 
 // AskEvents Request-specific events. These events are of REQUEST type, so they are eventually sent to the respective pods in K8s.
 type AskEvents struct {
-	eventSystem events.EventSystem
-	limiter     *rate.Limiter
+	eventSystem      events.EventSystem
+	predicateLimiter *rate.Limiter
+	reqNodeLimiter   *rate.Limiter
 }
 
 func (ae *AskEvents) SendRequestExceedsQueueHeadroom(allocKey, appID string, headroom, allocatedResource *resources.Resource, queuePath string) {
@@ -73,7 +74,7 @@ func (ae *AskEvents) SendRequestFitsInUserQuota(allocKey, appID string, allocate
 }
 
 func (ae *AskEvents) SendPredicatesFailed(allocKey, appID string, predicateErrors map[string]int, allocatedResource *resources.Resource) {
-	if !ae.eventSystem.IsEventTrackingEnabled() || !ae.limiter.Allow() {
+	if !ae.eventSystem.IsEventTrackingEnabled() || !ae.predicateLimiter.Allow() {
 		return
 	}
 
@@ -94,13 +95,26 @@ func (ae *AskEvents) SendPredicatesFailed(allocKey, appID string, predicateError
 	ae.eventSystem.AddEvent(event)
 }
 
+// SendRequiredNodePreemptionFailed adds a request event stating that no preemption victim was found for the ask on its required node.
+// Nothing is added when event tracking is disabled or when the required node rate limiter does not allow another event.
+func (ae *AskEvents) SendRequiredNodePreemptionFailed(allocKey, appID, node string, allocatedResource *resources.Resource) {
+	if !ae.eventSystem.IsEventTrackingEnabled() || !ae.reqNodeLimiter.Allow() {
+		return
+	}
+
+	message := fmt.Sprintf("Unschedulable request '%s' with required node '%s', no preemption victim found", allocKey, node)
+	event := events.CreateRequestEventRecord(allocKey, appID, message, allocatedResource)
+	ae.eventSystem.AddEvent(event)
+}
+
 func NewAskEvents(evt events.EventSystem) *AskEvents {
 	return newAskEventsWithRate(evt, 15*time.Second, 1)
 }
 
 func newAskEventsWithRate(evt events.EventSystem, interval time.Duration, burst int) *AskEvents {
 	return &AskEvents{
-		eventSystem: evt,
-		limiter:     rate.NewLimiter(rate.Every(interval), burst),
+		eventSystem:      evt,
+		predicateLimiter: rate.NewLimiter(rate.Every(interval), burst),
+		reqNodeLimiter:   rate.NewLimiter(rate.Every(interval), burst),
 	}
 }
diff --git a/pkg/scheduler/objects/events/ask_events_test.go b/pkg/scheduler/objects/events/ask_events_test.go
index add2fc539..8f6f8558e 100644
--- a/pkg/scheduler/objects/events/ask_events_test.go
+++ b/pkg/scheduler/objects/events/ask_events_test.go
@@ -133,12 +133,48 @@ func TestPredicateFailedEvents(t *testing.T) {
 	assert.Equal(t, 1, len(eventSystem.Events))
 	event := eventSystem.Events[0]
 	assert.Equal(t, "Unschedulable request 'alloc-0': error#0 (2x); error#1 (123x); error#2 (44x); ", event.Message)
+	assert.Equal(t, si.EventRecord_REQUEST, event.Type)
+	assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
+	assert.Equal(t, "alloc-0", event.ObjectID)
+	assert.Equal(t, "app-0", event.ReferenceID)
 
 	eventSystem.Reset()
 	// wait a bit, a new event is expected
 	time.Sleep(100 * time.Millisecond)
-	events.SendPredicatesFailed("alloc-0", "app-0", errors, resource)
+	events.SendPredicatesFailed("alloc-1", "app-0", errors, resource)
 	assert.Equal(t, 1, len(eventSystem.Events))
 	event = eventSystem.Events[0]
-	assert.Equal(t, "Unschedulable request 'alloc-0': error#0 (2x); error#1 (123x); error#2 (44x); ", event.Message)
+	assert.Equal(t, "Unschedulable request 'alloc-1': error#0 (2x); error#1 (123x); error#2 (44x); ", event.Message)
+}
+
+func TestRequiredNodePreemptionFailedEvents(t *testing.T) {
+	resource := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
+	eventSystem := mock.NewEventSystemDisabled()
+	events := NewAskEvents(eventSystem)
+	events.SendRequiredNodePreemptionFailed("alloc-0", "app-0", nodeID1, resource)
+	assert.Equal(t, 0, len(eventSystem.Events))
+
+	eventSystem = mock.NewEventSystem()
+	events = newAskEventsWithRate(eventSystem, 50*time.Millisecond, 1)
+	// only the first event is expected to be emitted due to rate limiting
+	for i := 0; i < 200; i++ {
+		events.SendRequiredNodePreemptionFailed("alloc-0", "app-0", nodeID1, resource)
+	}
+	assert.Equal(t, 1, len(eventSystem.Events))
+	event := eventSystem.Events[0]
+	assert.Equal(t, "Unschedulable request 'alloc-0' with required node 'node-1', no preemption victim found", event.Message)
+	assert.Equal(t, si.EventRecord_REQUEST, event.Type)
+	assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
+	assert.Equal(t, "alloc-0", event.ObjectID)
+	assert.Equal(t, "app-0", event.ReferenceID)
+	protoRes := resources.NewResourceFromProto(event.Resource)
+	assert.DeepEqual(t, resource, protoRes)
+
+	eventSystem.Reset()
+	// wait a bit, a new event is expected
+	time.Sleep(100 * time.Millisecond)
+	events.SendRequiredNodePreemptionFailed("alloc-1", "app-0", nodeID1, resource)
+	assert.Equal(t, 1, len(eventSystem.Events))
+	event = eventSystem.Events[0]
+	assert.Equal(t, "Unschedulable request 'alloc-1' with required node 'node-1', no preemption victim found", event.Message)
 }