Skip to content

Commit

Permalink
Merge branch 'master' into YUNIKORN-2629_ContextLock
Browse files Browse the repository at this point in the history
  • Loading branch information
pbacsko authored Aug 1, 2024
2 parents 2dddcd7 + 138d53a commit 70c749f
Show file tree
Hide file tree
Showing 12 changed files with 103 additions and 57 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ linters-settings:
goimports:
local-prefixes: github.com/apache/yunikorn
govet:
check-shadowing: true
shadow: true
goconst:
min-occurrences: 5
funlen:
Expand Down
76 changes: 50 additions & 26 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ endif

# shellcheck
SHELLCHECK_VERSION=v0.9.0
SHELLCHECK_BIN=${TOOLS_DIR}/shellcheck
SHELLCHECK_PATH=${TOOLS_DIR}/shellcheck-$(SHELLCHECK_VERSION)
SHELLCHECK_BIN=${SHELLCHECK_PATH}/shellcheck
SHELLCHECK_ARCHIVE := shellcheck-$(SHELLCHECK_VERSION).$(OS).$(HOST_ARCH).tar.xz
ifeq (darwin, $(OS))
ifeq (arm64, $(HOST_ARCH))
Expand All @@ -143,41 +144,56 @@ ifeq (armv7l, $(HOST_ARCH))
SHELLCHECK_ARCHIVE := shellcheck-$(SHELLCHECK_VERSION).$(OS).armv6hf.tar.xz
endif
endif
export PATH := $(BASE_DIR)/$(SHELLCHECK_PATH):$(PATH)

# golangci-lint
GOLANGCI_LINT_VERSION=1.57.2
GOLANGCI_LINT_BIN=$(TOOLS_DIR)/golangci-lint
GOLANGCI_LINT_PATH=$(TOOLS_DIR)/golangci-lint-v$(GOLANGCI_LINT_VERSION)
GOLANGCI_LINT_BIN=$(GOLANGCI_LINT_PATH)/golangci-lint
GOLANGCI_LINT_ARCHIVE=golangci-lint-$(GOLANGCI_LINT_VERSION)-$(OS)-$(EXEC_ARCH).tar.gz
GOLANGCI_LINT_ARCHIVEBASE=golangci-lint-$(GOLANGCI_LINT_VERSION)-$(OS)-$(EXEC_ARCH)
export PATH := $(BASE_DIR)/$(GOLANGCI_LINT_PATH):$(PATH)

# kubectl
KUBECTL_VERSION=v1.27.7
KUBECTL_BIN=$(TOOLS_DIR)/kubectl
KUBECTL_PATH=$(TOOLS_DIR)/kubectl-$(KUBECTL_VERSION)
KUBECTL_BIN=$(KUBECTL_PATH)/kubectl
export PATH := $(BASE_DIR)/$(KUBECTL_PATH):$(PATH)

# kind
KIND_VERSION=v0.23.0
KIND_BIN=$(TOOLS_DIR)/kind
KIND_PATH=$(TOOLS_DIR)/kind-$(KIND_VERSION)
KIND_BIN=$(KIND_PATH)/kind
export PATH := $(BASE_DIR)/$(KIND_PATH):$(PATH)

# helm
HELM_VERSION=v3.12.1
HELM_BIN=$(TOOLS_DIR)/helm
HELM_PATH=$(TOOLS_DIR)/helm-$(HELM_VERSION)
HELM_BIN=$(HELM_PATH)/helm
HELM_ARCHIVE=helm-$(HELM_VERSION)-$(OS)-$(EXEC_ARCH).tar.gz
HELM_ARCHIVE_BASE=$(OS)-$(EXEC_ARCH)
export PATH := $(BASE_DIR)/$(HELM_PATH):$(PATH)

# spark
export SPARK_VERSION=3.3.3
# sometimes the image is not available with $SPARK_VERSION, the minor version must match
export SPARK_PYTHON_VERSION=3.3.1
export SPARK_HOME=$(BASE_DIR)$(TOOLS_DIR)/spark
export SPARK_HOME=$(BASE_DIR)$(TOOLS_DIR)/spark-v$(SPARK_VERSION)
export SPARK_SUBMIT_CMD=$(SPARK_HOME)/bin/spark-submit
export SPARK_PYTHON_IMAGE=docker.io/apache/spark-py:v$(SPARK_PYTHON_VERSION)
export PATH := $(SPARK_HOME):$(PATH)

# go-licenses
GO_LICENSES_VERSION=v1.6.0
GO_LICENSES_BIN=$(TOOLS_DIR)/go-licenses
GO_LICENSES_PATH=$(TOOLS_DIR)/go-licenses-$(GO_LICENSES_VERSION)
GO_LICENSES_BIN=$(GO_LICENSES_PATH)/go-licenses
export PATH := $(BASE_DIR)/$(GO_LICENSES_PATH):$(PATH)

# ginkgo
GINKGO_BIN=$(TOOLS_DIR)/ginkgo
GINKGO_VERSION=v2.19.0
GINKGO_PATH=$(TOOLS_DIR)/ginkgo-$(GINKGO_VERSION)
GINKGO_BIN=$(GINKGO_PATH)/ginkgo
export PATH := $(BASE_DIR)/$(GINKGO_PATH):$(PATH)

FLAG_PREFIX=github.com/apache/yunikorn-k8shim/pkg/conf

Expand Down Expand Up @@ -222,50 +238,61 @@ init: conf/scheduler-config-local.yaml
conf/scheduler-config-local.yaml: conf/scheduler-config.yaml
./scripts/plugin-conf-gen.sh $(KUBECONFIG) "conf/scheduler-config.yaml" "conf/scheduler-config-local.yaml"

# Print tool versions
.PHONY: print_kubectl_version
print_kubectl_version:
@echo $(KUBECTL_VERSION)
.PHONY: print_kind_version
print_kind_version:
@echo $(KIND_VERSION)
.PHONY: print_helm_version
print_helm_version:
@echo $(HELM_VERSION)

# Install tools
.PHONY: tools
tools: $(SHELLCHECK_BIN) $(GOLANGCI_LINT_BIN) $(KUBECTL_BIN) $(KIND_BIN) $(HELM_BIN) $(SPARK_SUBMIT_CMD) $(GO_LICENSES_BIN) $(GINKGO_BIN)

# Install shellcheck
$(SHELLCHECK_BIN):
@echo "installing shellcheck $(SHELLCHECK_VERSION)"
@mkdir -p "$(TOOLS_DIR)"
@mkdir -p "$(SHELLCHECK_PATH)"
@curl -sSfL "https://github.com/koalaman/shellcheck/releases/download/$(SHELLCHECK_VERSION)/$(SHELLCHECK_ARCHIVE)" \
| tar -x -J --strip-components=1 -C "$(TOOLS_DIR)" "shellcheck-$(SHELLCHECK_VERSION)/shellcheck"
| tar -x -J --strip-components=1 -C "$(SHELLCHECK_PATH)" "shellcheck-$(SHELLCHECK_VERSION)/shellcheck"

# Install golangci-lint
$(GOLANGCI_LINT_BIN):
@echo "installing golangci-lint v$(GOLANGCI_LINT_VERSION)"
@mkdir -p "$(TOOLS_DIR)"
@mkdir -p "$(GOLANGCI_LINT_PATH)"
@curl -sSfL "https://github.com/golangci/golangci-lint/releases/download/v$(GOLANGCI_LINT_VERSION)/$(GOLANGCI_LINT_ARCHIVE)" \
| tar -x -z --strip-components=1 -C "$(TOOLS_DIR)" "$(GOLANGCI_LINT_ARCHIVEBASE)/golangci-lint"
| tar -x -z --strip-components=1 -C "$(GOLANGCI_LINT_PATH)" "$(GOLANGCI_LINT_ARCHIVEBASE)/golangci-lint"

# Install kubectl
$(KUBECTL_BIN):
@echo "installing kubectl $(KUBECTL_VERSION)"
@mkdir -p "$(TOOLS_DIR)"
@mkdir -p "$(KUBECTL_PATH)"
@curl -sSfL -o "$(KUBECTL_BIN)" \
"https://storage.googleapis.com/kubernetes-release/release/$(KUBECTL_VERSION)/bin/$(OS)/$(EXEC_ARCH)/kubectl" && \
chmod +x "$(KUBECTL_BIN)"

# Install kind
$(KIND_BIN):
@echo "installing kind $(KIND_VERSION)"
@mkdir -p "$(TOOLS_DIR)"
@mkdir -p "$(KIND_PATH)"
@curl -sSfL -o "$(KIND_BIN)" \
"https://kind.sigs.k8s.io/dl/$(KIND_VERSION)/kind-$(OS)-$(EXEC_ARCH)" && \
chmod +x "$(KIND_BIN)"

# Install helm
$(HELM_BIN):
@echo "installing helm $(HELM_VERSION)"
@mkdir -p "$(TOOLS_DIR)"
@mkdir -p "$(HELM_PATH)"
@curl -sSfL "https://get.helm.sh/$(HELM_ARCHIVE)" \
| tar -x -z --strip-components=1 -C "$(TOOLS_DIR)" "$(HELM_ARCHIVE_BASE)/helm"
| tar -x -z --strip-components=1 -C "$(HELM_PATH)" "$(HELM_ARCHIVE_BASE)/helm"

# Install spark
$(SPARK_SUBMIT_CMD):
@echo "installing spark $(SPARK_VERSION)"
@echo "installing spark v$(SPARK_VERSION)"
@rm -rf "$(SPARK_HOME)" "$(SPARK_HOME).tmp"
@mkdir -p "$(SPARK_HOME).tmp"
@curl -sSfL "https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz" \
Expand All @@ -275,23 +302,20 @@ $(SPARK_SUBMIT_CMD):
# Install go-licenses
$(GO_LICENSES_BIN):
@echo "installing go-licenses $(GO_LICENSES_VERSION)"
@mkdir -p "$(TOOLS_DIR)"
@GOBIN="$(BASE_DIR)/$(TOOLS_DIR)" "$(GO)" install "github.com/google/go-licenses@$(GO_LICENSES_VERSION)"
@mkdir -p "$(GO_LICENSES_PATH)"
@GOBIN="$(BASE_DIR)/$(GO_LICENSES_PATH)" "$(GO)" install "github.com/google/go-licenses@$(GO_LICENSES_VERSION)"

$(GINKGO_BIN):
@echo "installing ginkgo"
@mkdir -p "$(TOOLS_DIR)"
@GOBIN="$(BASE_DIR)/$(TOOLS_DIR)" "$(GO)" install "github.com/onsi/ginkgo/v2/ginkgo"
@echo "installing ginkgo $(GINKGO_VERSION)"
@mkdir -p "$(GINKGO_PATH)"
@GOBIN="$(BASE_DIR)/$(GINKGO_PATH)" "$(GO)" install "github.com/onsi/ginkgo/v2/ginkgo@$(GINKGO_VERSION)"

# Run lint against the previous commit for PR and branch build
# In dev setup look at all changes on top of master
.PHONY: lint
lint: $(GOLANGCI_LINT_BIN)
@echo "running golangci-lint"
@git symbolic-ref -q HEAD && REV="origin/HEAD" || REV="HEAD^" ; \
headSHA=$$(git rev-parse --short=12 $${REV}) ; \
echo "checking against commit sha $${headSHA}" ; \
"${GOLANGCI_LINT_BIN}" run
@"${GOLANGCI_LINT_BIN}" run

# Check scripts
.PHONY: check_scripts
Expand Down
3 changes: 2 additions & 1 deletion pkg/admission/conf/am_conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package conf

import (
"errors"
"fmt"
"regexp"
"strconv"
Expand Down Expand Up @@ -117,7 +118,7 @@ func NewAdmissionControllerConf(configMaps []*v1.ConfigMap) *AdmissionController
func (acc *AdmissionControllerConf) RegisterHandlers(configMaps informersv1.ConfigMapInformer) error {
_, err := configMaps.Informer().AddEventHandler(&configMapUpdateHandler{conf: acc})
if err != nil {
return fmt.Errorf("failed to create register handlers: %w", err)
return errors.Join(errors.New("failed to create register handlers: "), err)
}

return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/admission/namespace_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package admission

import (
"fmt"
"errors"

v1 "k8s.io/api/core/v1"
informersv1 "k8s.io/client-go/informers/core/v1"
Expand Down Expand Up @@ -62,7 +62,7 @@ func NewNamespaceCache(namespaces informersv1.NamespaceInformer) (*NamespaceCach
if namespaces != nil {
_, err := namespaces.Informer().AddEventHandler(&namespaceUpdateHandler{cache: nsc})
if err != nil {
return nil, fmt.Errorf("failed to create namespace cache: %w", err)
return nil, errors.Join(errors.New("failed to create namespace cache: "), err)
}
}
return nsc, nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/admission/priority_class_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package admission

import (
"fmt"
"errors"

schedulingv1 "k8s.io/api/scheduling/v1"
informersv1 "k8s.io/client-go/informers/scheduling/v1"
Expand All @@ -45,7 +45,7 @@ func NewPriorityClassCache(priorityClasses informersv1.PriorityClassInformer) (*
if priorityClasses != nil {
_, err := priorityClasses.Informer().AddEventHandler(&priorityClassUpdateHandler{cache: pcc})
if err != nil {
return nil, fmt.Errorf("failed to create a new cache and register the handler: %w", err)
return nil, errors.Join(errors.New("failed to create a new cache and register the handler: "), err)
}
}
return pcc, nil
Expand Down
24 changes: 14 additions & 10 deletions pkg/cache/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (ctx *Context) updateNodeInternal(node *v1.Node, register bool) {
if applicationID == "" {
ctx.updateForeignPod(pod)
} else {
ctx.updateYuniKornPod(pod)
ctx.updateYuniKornPod(applicationID, pod)
}
}

Expand Down Expand Up @@ -286,22 +286,26 @@ func (ctx *Context) UpdatePod(_, newObj interface{}) {
log.Log(log.ShimContext).Error("failed to update pod", zap.Error(err))
return
}
if utils.GetApplicationIDFromPod(pod) == "" {
applicationID := utils.GetApplicationIDFromPod(pod)
if applicationID == "" {
ctx.updateForeignPod(pod)
} else {
ctx.updateYuniKornPod(pod)
ctx.updateYuniKornPod(applicationID, pod)
}
}

func (ctx *Context) updateYuniKornPod(pod *v1.Pod) {
// treat terminated pods like a remove
if utils.IsPodTerminated(pod) {
if taskMeta, ok := getTaskMetadata(pod); ok {
if app := ctx.GetApplication(taskMeta.ApplicationID); app != nil {
ctx.notifyTaskComplete(taskMeta.ApplicationID, taskMeta.TaskID)
}
func (ctx *Context) updateYuniKornPod(appID string, pod *v1.Pod) {
var app *Application
taskID := string(pod.UID)
if app = ctx.getApplication(appID); app != nil {
if task := app.GetTask(taskID); task != nil {
task.setTaskPod(pod)
}
}

// treat terminated pods like a remove
if utils.IsPodTerminated(pod) {
ctx.notifyTaskComplete(appID, taskID)
log.Log(log.ShimContext).Debug("Request to update terminated pod, removing from cache", zap.String("podName", pod.Name))
ctx.schedulerCache.RemovePod(pod)
return
Expand Down
4 changes: 4 additions & 0 deletions pkg/cache/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,10 @@ func TestUpdatePod(t *testing.T) {
context.UpdatePod(pod1, pod3)
pod = context.schedulerCache.GetPod(uid1)
assert.Check(t, pod == nil, "pod still found after termination")
app := context.getApplication(appID1)
// ensure that an updated pod is updated inside the Task
task := app.GetTask(uid1)
assert.Assert(t, task.GetTaskPod() == pod3, "task pod has not been updated")

// ensure a non-terminated pod is updated
context.UpdatePod(pod1, pod2)
Expand Down
16 changes: 9 additions & 7 deletions pkg/cache/placeholder.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func newPlaceholder(placeholderName string, app *Application, taskGroup TaskGrou

// prepare the resource lists
requests := GetPlaceholderResourceRequests(taskGroup.MinResource)
var zeroSeconds int64 = 0
placeholderPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: placeholderName,
Expand Down Expand Up @@ -113,13 +114,14 @@ func newPlaceholder(placeholderName string, app *Application, taskGroup TaskGrou
},
},
},
RestartPolicy: constants.PlaceholderPodRestartPolicy,
SchedulerName: constants.SchedulerName,
NodeSelector: taskGroup.NodeSelector,
Tolerations: taskGroup.Tolerations,
Affinity: taskGroup.Affinity,
TopologySpreadConstraints: taskGroup.TopologySpreadConstraints,
PriorityClassName: priorityClassName,
RestartPolicy: constants.PlaceholderPodRestartPolicy,
SchedulerName: constants.SchedulerName,
NodeSelector: taskGroup.NodeSelector,
Tolerations: taskGroup.Tolerations,
Affinity: taskGroup.Affinity,
TopologySpreadConstraints: taskGroup.TopologySpreadConstraints,
PriorityClassName: priorityClassName,
TerminationGracePeriodSeconds: &zeroSeconds,
},
}

Expand Down
10 changes: 9 additions & 1 deletion pkg/cache/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (task *Task) getNodeName() string {
}

func (task *Task) DeleteTaskPod() error {
return task.context.apiProvider.GetAPIs().KubeClient.Delete(task.pod)
return task.context.apiProvider.GetAPIs().KubeClient.Delete(task.GetTaskPod())
}

func (task *Task) UpdateTaskPodStatus(pod *v1.Pod) (*v1.Pod, error) {
Expand Down Expand Up @@ -544,9 +544,11 @@ func (task *Task) releaseAllocation() {
// this reduces the scheduling overhead by blocking such
// request away from the core scheduler.
func (task *Task) sanityCheckBeforeScheduling() error {
task.lock.RLock()
// Check PVCs used by the pod
namespace := task.pod.Namespace
manifest := &(task.pod.Spec)
task.lock.RUnlock()
for i := range manifest.Volumes {
volume := &manifest.Volumes[i]
if volume.PersistentVolumeClaim == nil {
Expand Down Expand Up @@ -599,3 +601,9 @@ func (task *Task) failWithEvent(errorMessage, actionReason string) {
events.GetRecorder().Eventf(task.pod.DeepCopy(),
nil, v1.EventTypeWarning, actionReason, actionReason, errorMessage)
}

func (task *Task) setTaskPod(pod *v1.Pod) {
task.lock.Lock()
defer task.lock.Unlock()
task.pod = pod
}
6 changes: 3 additions & 3 deletions pkg/client/apifactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package client

import (
"fmt"
"errors"
"time"

"go.uber.org/zap"
Expand Down Expand Up @@ -167,7 +167,7 @@ func (s *APIFactory) AddEventHandler(handlers *ResourceEventHandlers) error {

log.Log(log.ShimClient).Info("registering event handler", zap.Stringer("type", handlers.Type))
if err := s.addEventHandlers(handlers.Type, h, 0); err != nil {
return fmt.Errorf("failed to initialize event handlers: %w", err)
return errors.Join(errors.New("failed to initialize event handlers: "), err)
}
return nil
}
Expand Down Expand Up @@ -200,7 +200,7 @@ func (s *APIFactory) addEventHandlers(
}

if err != nil {
return fmt.Errorf("failed to add event handlers: %w", err)
return errors.Join(errors.New("failed to add event handlers: "), err)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/plugin/predicates/predicate_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (p *predicateManagerImpl) runPreFilterPlugins(ctx context.Context, state *f
zap.String("pluginName", plugin),
zap.String("pod", fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)),
zap.Error(err))
return framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", plugin, err)), plugin, skip
return framework.AsStatus(errors.Join(fmt.Errorf("running PreFilter plugin %q: ", plugin), err)), plugin, skip
}
// Merge is nil safe and returns a new PreFilterResult result if mergedNodes was nil
mergedNodes = mergedNodes.Merge(nodes)
Expand Down
Loading

0 comments on commit 70c749f

Please sign in to comment.