Merge pull request #3 from apache/master
update from apache
kingamarton authored Apr 14, 2020
2 parents cf3b44e + d127a56 commit ad23003
Showing 23 changed files with 217 additions and 186 deletions.
4 changes: 2 additions & 2 deletions Makefile
@@ -121,8 +121,8 @@ sched_image: scheduler
@mkdir -p ./deployments/image/configmap/admission-controller-init-scripts
@cp -r ./deployments/admission-controllers/scheduler/* deployments/image/configmap/admission-controller-init-scripts/
@sed -i'.bkp' 's/clusterVersion=.*"/clusterVersion=${VERSION}"/' deployments/image/configmap/Dockerfile
@coreSHA=$$(go list -m "github.com/apache/incubator-yunikorn-core" | cut -d "-" -f4) ; \
siSHA=$$(go list -m "github.com/apache/incubator-yunikorn-scheduler-interface" | cut -d "-" -f5) ; \
@coreSHA=$$(go list -m "github.com/apache/incubator-yunikorn-core" | cut -d "-" -f5) ; \
siSHA=$$(go list -m "github.com/apache/incubator-yunikorn-scheduler-interface" | cut -d "-" -f6) ; \
shimSHA=$$(git rev-parse --short=12 HEAD) ; \
docker build ./deployments/image/configmap -t ${REGISTRY}/yunikorn-scheduler-k8s:${VERSION} \
--label "yunikorn-core-revision=$${coreSHA}" \
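Note on the Makefile hunk above: cut -d "-" -fN counts hyphen-separated fields across the whole `go list -m` output line (module path plus pseudo-version), so the right index shifts whenever either part gains a hyphen, which is presumably what the -f4 to -f5 and -f5 to -f6 bumps track. A minimal Go sketch of the same extraction that does not depend on a fixed field index (the pseudo-version and SHA below are made up for illustration):

package main

import (
	"fmt"
	"strings"
)

// extractRevision pulls the short commit SHA out of a `go list -m` line such as
// "github.com/apache/incubator-yunikorn-core v0.0.0-20200410220622-abcdef123456".
// The SHA is the last hyphen-separated field of a pseudo-version, so taking the
// last field stays stable even when the module path itself contains hyphens.
func extractRevision(goListLine string) string {
	fields := strings.Fields(goListLine) // "<module path> <version>"
	if len(fields) < 2 {
		return ""
	}
	parts := strings.Split(fields[1], "-")
	return parts[len(parts)-1]
}

func main() {
	// Hypothetical `go list -m` output, for illustration only.
	line := "github.com/apache/incubator-yunikorn-core v0.0.0-20200410220622-abcdef123456"
	fmt.Println(extractRevision(line)) // abcdef123456
}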
@@ -15,6 +15,8 @@ spec:
labels:
app: yunikorn
spec:
tolerations:
- operator: "Exists"
affinity:
podAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
@@ -23,7 +25,7 @@ spec:
- key: component
operator: In
values:
- scheduler
- yunikorn-scheduler
topologyKey: "kubernetes.io/hostname"
containers:
- name: yunikorn-admission-controller-webhook
2 changes: 1 addition & 1 deletion deployments/scheduler/scheduler-load.yaml
@@ -13,7 +13,7 @@ spec:
metadata:
labels:
app: yunikorn
component: scheduler
component: yunikorn-scheduler
name: yunikorn-scheduler
spec:
containers:
2 changes: 1 addition & 1 deletion deployments/scheduler/scheduler.yaml
@@ -10,7 +10,7 @@ spec:
metadata:
labels:
app: yunikorn
component: scheduler
component: yunikorn-scheduler
name: yunikorn-scheduler
spec:
hostNetwork: true
2 changes: 1 addition & 1 deletion helm-charts/yunikorn/templates/deployment.yaml
@@ -18,7 +18,7 @@ spec:
name: yunikorn-scheduler
labels:
app: yunikorn
component: scheduler
component: yunikorn-scheduler
release: {{ .Release.Name }}
spec:
serviceAccountName: {{ .Values.serviceAccount }}
2 changes: 1 addition & 1 deletion pkg/appmgmt/appmgmt.go
@@ -93,7 +93,7 @@ func (svc *AppManagementService) Start() error {
return err
}

log.Logger.Info("service started",
log.Logger.Info("app management service started",
zap.String("serviceName", optService.Name()))
}

8 changes: 8 additions & 0 deletions pkg/cache/application.go
@@ -103,6 +103,7 @@ func NewApplication(appID, queueName, user string, tags map[string]string, sched
string(events.RecoverApplication): app.handleRecoverApplicationEvent,
string(events.RejectApplication): app.handleRejectApplicationEvent,
string(events.CompleteApplication): app.handleCompleteApplicationEvent,
events.EnterState: app.enterState,
},
)

@@ -349,3 +350,10 @@ func (app *Application) handleRejectApplicationEvent(event *fsm.Event) {
func (app *Application) handleCompleteApplicationEvent(event *fsm.Event) {
// TODO app lifecycle updates
}

func (app *Application) enterState(event *fsm.Event) {
log.Logger.Info("app state",
zap.String("appID", app.applicationID),
zap.String("queue", app.queue),
zap.String("state", app.GetApplicationState()))
}
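For context on the application.go hunks above: the application state machine is built on the looplab/fsm library, and events.EnterState presumably maps to fsm's generic "enter_state" callback key, which fires on every transition; that is how a single handler such as enterState can log all state changes. A small self-contained sketch of the pattern, assuming the pre-v1 looplab/fsm API (no context argument) and invented states and events:

package main

import (
	"fmt"

	"github.com/looplab/fsm"
)

func main() {
	// Hypothetical two-state machine; the real application FSM has many more
	// states and per-event handlers, much like the callbacks map above.
	appFSM := fsm.NewFSM(
		"New",
		fsm.Events{
			{Name: "submit", Src: []string{"New"}, Dst: "Submitted"},
			{Name: "accept", Src: []string{"Submitted"}, Dst: "Accepted"},
		},
		fsm.Callbacks{
			// "enter_state" runs after every successful transition.
			"enter_state": func(e *fsm.Event) {
				fmt.Printf("event=%s: %s -> %s\n", e.Event, e.Src, e.Dst)
			},
		},
	)

	_ = appFSM.Event("submit") // prints: event=submit: New -> Submitted
	_ = appFSM.Event("accept") // prints: event=accept: Submitted -> Accepted
}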
21 changes: 11 additions & 10 deletions pkg/cache/context.go
@@ -112,15 +112,15 @@ func (ctx *Context) addNode(obj interface{}) {
}

// add node to secondary scheduler cache
log.Logger.Info("adding node to cache", zap.String("NodeName", node.Name))
log.Logger.Debug("adding node to cache", zap.String("NodeName", node.Name))
ctx.schedulerCache.AddNode(node)

// add node to internal cache
ctx.nodes.addNode(node)

// post the event
events.GetRecorder().Eventf(node, v1.EventTypeNormal, "Accepted",
"node is accepted by the scheduler")
events.GetRecorder().Eventf(node, v1.EventTypeNormal, "NodeAccepted",
fmt.Sprintf("node %s is accepted by the scheduler", node.Name))
}

func (ctx *Context) updateNode(oldObj, newObj interface{}) {
@@ -140,8 +140,6 @@ func (ctx *Context) updateNode(oldObj, newObj interface{}) {
}

// update secondary cache
log.Logger.Debug("updating node in cache",
zap.String("OldNodeName", oldNode.Name))
if err := ctx.schedulerCache.UpdateNode(oldNode, newNode); err != nil {
log.Logger.Error("unable to update node in scheduler cache",
zap.Error(err))
@@ -171,8 +169,8 @@ func (ctx *Context) deleteNode(obj interface{}) {
ctx.nodes.deleteNode(node)

// post the event
events.GetRecorder().Eventf(node, v1.EventTypeNormal, "Deleted",
"node is deleted from the scheduler")
events.GetRecorder().Eventf(node, v1.EventTypeNormal, "NodeDeleted",
fmt.Sprintf("node %s is deleted from the scheduler", node.Name))
}

func (ctx *Context) addPodToCache(obj interface{}) {
@@ -182,7 +180,7 @@ func (ctx *Context) addPodToCache(obj interface{}) {
return
}

log.Logger.Info("adding pod to cache", zap.String("podName", pod.Name))
log.Logger.Debug("adding pod to cache", zap.String("podName", pod.Name))
if err := ctx.schedulerCache.AddPod(pod); err != nil {
log.Logger.Error("add pod to scheduler cache failed",
zap.String("podName", pod.Name),
@@ -207,7 +205,7 @@ func (ctx *Context) removePodFromCache(obj interface{}) {
return
}

log.Logger.Info("removing pod from cache", zap.String("podName", pod.Name))
log.Logger.Debug("removing pod from cache", zap.String("podName", pod.Name))
if err := ctx.schedulerCache.RemovePod(pod); err != nil {
log.Logger.Error("failed to remove pod from scheduler cache",
zap.String("podName", pod.Name),
@@ -486,7 +484,7 @@ func (ctx *Context) RemoveApplication(appID string) error {

// this implements ApplicationManagementProtocol
func (ctx *Context) AddTask(request *interfaces.AddTaskRequest) interfaces.ManagedTask {
log.Logger.Debug("AddTask", zap.Any("Request", request))
log.Logger.Debug("AddTask",
zap.String("appID", request.Metadata.ApplicationID),
zap.String("taskID", request.Metadata.TaskID),
zap.Bool("isRecovery", request.Recovery))
if managedApp := ctx.GetApplication(request.Metadata.ApplicationID); managedApp != nil {
if app, valid := managedApp.(*Application); valid {
existingTask, err := app.GetTask(request.Metadata.TaskID)
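The event-related changes in the context.go hunks above go through events.GetRecorder(), which presumably wraps the standard client-go EventRecorder; its Eventf takes a printf-style format string plus arguments. A minimal sketch using client-go's fake recorder, with an illustrative node name and reason:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/record"
)

func main() {
	// FakeRecorder buffers emitted events on a channel, which is handy in tests.
	recorder := record.NewFakeRecorder(10)

	node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}

	// Eventf formats the message itself from the format string and arguments.
	recorder.Eventf(node, corev1.EventTypeNormal, "NodeAccepted",
		"node %s is accepted by the scheduler", node.Name)

	fmt.Println(<-recorder.Events) // e.g. "Normal NodeAccepted node node-1 is accepted by the scheduler"
}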
103 changes: 54 additions & 49 deletions pkg/cache/context_recovery.go
@@ -80,60 +80,63 @@ func (ctx *Context) recover(mgr []interfaces.Recoverable, due time.Duration) err
// add all known nodes to cache, waiting for recover
for _, node := range allNodes {
ctx.nodes.addAndReportNode(node, false)
// current, disable getting pods for a node during test,
// because in the tests, we don't really send existing allocations
// we simply simulate to accept or reject nodes on conditions.
if !ctx.apiProvider.IsTestingMode() {
var podList *corev1.PodList
podList, err = ctx.apiProvider.GetAPIs().KubeClient.GetClientSet().
CoreV1().Pods("").
List(metav1.ListOptions{
FieldSelector: fmt.Sprintf("spec.nodeName=%s", node.Name),
})
if err != nil {
return err
}
}

occupiedResources := common.NewResourceBuilder().Build()
for _, pod := range podList.Items {
// only handle assigned pods
if !utils.IsAssignedPod(&pod) {
continue
}
// yunikorn scheduled pods add to existing allocations
if utils.GeneralPodFilter(&pod) {
if existingAlloc := getExistingAllocation(mgr, &pod); existingAlloc != nil {
log.Logger.Debug("existing allocation",
zap.String("appID", existingAlloc.ApplicationID),
zap.String("podUID", string(pod.UID)),
zap.String("podNodeName", existingAlloc.NodeID))
if err = ctx.nodes.addExistingAllocation(existingAlloc); err != nil {
log.Logger.Warn("add existing allocation failed", zap.Error(err))
}
}
} else if utils.IsPodRunning(&pod) {
// pod is running but not scheduled by us
// we should report this occupied resource to scheduler-core
occupiedResources = common.Add(occupiedResources, common.GetPodResource(&pod))
if err = ctx.nodes.cache.AddPod(&pod); err != nil {
log.Logger.Warn("failed to update scheduler-cache",
zap.Error(err))
// current, disable getting pods for a node during test,
// because in the tests, we don't really send existing allocations
// we simply simulate to accept or reject nodes on conditions.
if !ctx.apiProvider.IsTestingMode() {
var podList *corev1.PodList
podList, err = ctx.apiProvider.GetAPIs().KubeClient.GetClientSet().
CoreV1().Pods("").
List(metav1.ListOptions{})
if err != nil {
return err
}

nodeOccupiedResources := make(map[string]*si.Resource)
for _, pod := range podList.Items {
// only handle assigned pods
if !utils.IsAssignedPod(&pod) {
continue
}
// yunikorn scheduled pods add to existing allocations
if utils.GeneralPodFilter(&pod) {
if existingAlloc := getExistingAllocation(mgr, &pod); existingAlloc != nil {
log.Logger.Debug("existing allocation",
zap.String("appID", existingAlloc.ApplicationID),
zap.String("podUID", string(pod.UID)),
zap.String("podNodeName", existingAlloc.NodeID))
if err = ctx.nodes.addExistingAllocation(existingAlloc); err != nil {
log.Logger.Warn("add existing allocation failed", zap.Error(err))
}
}
} else if utils.IsPodRunning(&pod) {
// pod is running but not scheduled by us
// we should report this occupied resource to scheduler-core
occupiedResource := nodeOccupiedResources[pod.Spec.NodeName]
if occupiedResource == nil {
occupiedResource = common.NewResourceBuilder().Build()
}
occupiedResource = common.Add(occupiedResource, common.GetPodResource(&pod))
nodeOccupiedResources[pod.Spec.NodeName] = occupiedResource
if err = ctx.nodes.cache.AddPod(&pod); err != nil {
log.Logger.Warn("failed to update scheduler-cache",
zap.Error(err))
}
}
}

// why we need to calculate the occupied resources here? why not add an event-handler
// in node_coordinator#addPod?
// this is because the occupied resources must be calculated and counted before the
// scheduling started. If we do both updating existing occupied resources along with
// new pods scheduling, due to the fact that we cannot predicate the ordering of K8s
// events, it could be dangerous because we might schedule pods onto some node that
// doesn't have enough capacity (occupied resources not yet reported).
log.Logger.Info("update occupied resources that allocated by other scheduler",
zap.String("node", node.Name),
zap.String("totalOccupied", occupiedResources.String()))
if cachedNode := ctx.nodes.getNode(node.Name); cachedNode != nil {
cachedNode.setOccupiedResource(occupiedResources)
// why we need to calculate the occupied resources here? why not add an event-handler
// in node_coordinator#addPod?
// this is because the occupied resources must be calculated and counted before the
// scheduling started. If we do both updating existing occupied resources along with
// new pods scheduling, due to the fact that we cannot predicate the ordering of K8s
// events, it could be dangerous because we might schedule pods onto some node that
// doesn't have enough capacity (occupied resources not yet reported).
for nodeName, occupiedResource := range nodeOccupiedResources {
if cachedNode := ctx.nodes.getNode(nodeName); cachedNode != nil {
cachedNode.setOccupiedResource(occupiedResource)
}
}
}
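The restructuring in this hunk replaces one pod query per node (filtered by a spec.nodeName field selector) with a single cluster-wide pod list whose resources are bucketed per node in nodeOccupiedResources before scheduling starts. A simplified Go sketch of that bucketing step, counting only CPU requests and using plain Kubernetes types instead of YuniKorn's si.Resource helpers (pod and node names are made up):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// occupiedByNode walks every pod once and accumulates resource requests keyed
// by pod.Spec.NodeName, instead of issuing one List call per node.
func occupiedByNode(pods []corev1.Pod) map[string]*resource.Quantity {
	byNode := make(map[string]*resource.Quantity)
	for i := range pods {
		pod := &pods[i]
		// only assigned, running pods occupy capacity on a node
		if pod.Spec.NodeName == "" || pod.Status.Phase != corev1.PodRunning {
			continue
		}
		total := byNode[pod.Spec.NodeName]
		if total == nil {
			total = resource.NewQuantity(0, resource.DecimalSI)
			byNode[pod.Spec.NodeName] = total
		}
		for _, c := range pod.Spec.Containers {
			if cpu, ok := c.Resources.Requests[corev1.ResourceCPU]; ok {
				total.Add(cpu)
			}
		}
	}
	return byNode
}

func main() {
	// Hypothetical pod placed by another scheduler onto node-1, requesting 500m CPU.
	pod := corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "external-pod"},
		Spec: corev1.PodSpec{
			NodeName: "node-1",
			Containers: []corev1.Container{{
				Name: "main",
				Resources: corev1.ResourceRequirements{
					Requests: corev1.ResourceList{
						corev1.ResourceCPU: resource.MustParse("500m"),
					},
				},
			}},
		},
		Status: corev1.PodStatus{Phase: corev1.PodRunning},
	}

	for nodeName, cpu := range occupiedByNode([]corev1.Pod{pod}) {
		fmt.Printf("node=%s occupiedCPU=%s\n", nodeName, cpu.String())
	}
}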
@@ -156,6 +159,8 @@ func (ctx *Context) recover(mgr []interfaces.Recoverable, due time.Duration) err
nodesRecovered++
case events.States().Node.Draining:
nodesRecovered++
case events.States().Node.Rejected:
nodesRecovered++
}
}
