Skip to content

Commit

Permalink
Merge pull request volcano-sh#2809 from CharlesQQ/leader-elect
Browse files Browse the repository at this point in the history
fix(app): Introduced flag to specify leader election behaviour
  • Loading branch information
volcano-sh-bot authored Apr 1, 2024
2 parents 3dec7ad + 0640aa6 commit 1c8d95f
Show file tree
Hide file tree
Showing 11 changed files with 161 additions and 98 deletions.
36 changes: 18 additions & 18 deletions cmd/controller-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"os"

"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/component-base/config"
componentbaseconfigvalidation "k8s.io/component-base/config/validation"

"volcano.sh/volcano/pkg/kube"
)
Expand All @@ -38,16 +41,18 @@ const (

// ServerOption is the main context object for the controllers.
type ServerOption struct {
KubeClientOptions kube.ClientOptions
CertFile string
KeyFile string
CaCertFile string
CertData []byte
KeyData []byte
CaCertData []byte
EnableLeaderElection bool
LockObjectNamespace string
PrintVersion bool
KubeClientOptions kube.ClientOptions
CertFile string
KeyFile string
CaCertFile string
CertData []byte
KeyData []byte
CaCertData []byte
// LeaderElection defines the configuration of leader election.
LeaderElection config.LeaderElectionConfiguration
// Deprecated: use LeaderElection.ResourceNamespace instead.
LockObjectNamespace string
PrintVersion bool
// WorkerThreads is the number of threads syncing job operations
// concurrently. Larger number = faster job updating, but more CPU load.
WorkerThreads uint32
Expand Down Expand Up @@ -84,9 +89,7 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
"File containing the default x509 Certificate for HTTPS. (CA cert, if any, concatenated "+
"after server cert).")
fs.StringVar(&s.KeyFile, "tls-private-key-file", s.KeyFile, "File containing the default x509 private key matching --tls-cert-file.")
fs.BoolVar(&s.EnableLeaderElection, "leader-elect", true, "Start a leader election client and gain leadership before "+
"executing the main loop. Enable this when running replicated vc-controller-manager for high availability; it is enabled by default")
fs.StringVar(&s.LockObjectNamespace, "lock-object-namespace", defaultLockObjectNamespace, "Define the namespace of the lock object; it is volcano-system by default")
fs.StringVar(&s.LockObjectNamespace, "lock-object-namespace", defaultLockObjectNamespace, "Define the namespace of the lock object; it is volcano-system by default, will deprecated, please use --leader-elect-resource-namespace instead.")
fs.Float32Var(&s.KubeClientOptions.QPS, "kube-api-qps", defaultQPS, "QPS to use while talking with kubernetes apiserver")
fs.IntVar(&s.KubeClientOptions.Burst, "kube-api-burst", defaultBurst, "Burst to use while talking with kubernetes apiserver")
fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit")
Expand All @@ -100,12 +103,9 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
fs.Uint32Var(&s.WorkerThreadsForPG, "worker-threads-for-podgroup", defaultPodGroupWorkers, "The number of threads syncing podgroup operations. The larger the number, the faster the podgroup processing, but requires more CPU load.")
}

// CheckOptionOrDie checks the LockObjectNamespace.
// CheckOptionOrDie validates the leader election configuration and returns the aggregated validation errors, if any.
func (s *ServerOption) CheckOptionOrDie() error {
if s.EnableLeaderElection && s.LockObjectNamespace == "" {
return fmt.Errorf("lock-object-namespace must not be nil when LeaderElection is enabled")
}
return nil
return componentbaseconfigvalidation.ValidateLeaderElectionConfiguration(&s.LeaderElection, field.NewPath("leaderElection")).ToAggregate()
}

// readCAFiles read data from ca file path
Expand Down
27 changes: 24 additions & 3 deletions cmd/controller-manager/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,35 @@ package options
import (
"reflect"
"testing"
"time"

"github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/component-base/config"
componentbaseoptions "k8s.io/component-base/config/options"

"volcano.sh/volcano/pkg/kube"
commonutil "volcano.sh/volcano/pkg/util"
)

func TestAddFlags(t *testing.T) {
fs := pflag.NewFlagSet("addflagstest", pflag.ContinueOnError)
s := NewServerOption()

commonutil.LeaderElectionDefault(&s.LeaderElection)
s.LeaderElection.ResourceName = "vc-controller-manager"
componentbaseoptions.BindLeaderElectionFlags(&s.LeaderElection, fs)
s.AddFlags(fs)

args := []string{
"--master=127.0.0.1",
"--kube-api-burst=200",
"--scheduler-name=volcano",
"--scheduler-name=volcano2",
"--leader-elect-lease-duration=60s",
"--leader-elect-renew-deadline=20s",
"--leader-elect-retry-period=10s",
}
fs.Parse(args)

Expand All @@ -52,9 +65,17 @@ func TestAddFlags(t *testing.T) {
MaxRequeueNum: defaultMaxRequeueNum,
HealthzBindAddress: ":11251",
InheritOwnerAnnotations: true,
EnableLeaderElection: true,
LockObjectNamespace: defaultLockObjectNamespace,
WorkerThreadsForPG: 5,
LeaderElection: config.LeaderElectionConfiguration{
LeaderElect: true,
LeaseDuration: metav1.Duration{60 * time.Second},
RenewDeadline: metav1.Duration{20 * time.Second},
RetryPeriod: metav1.Duration{10 * time.Second},
ResourceLock: resourcelock.LeasesResourceLock,
ResourceNamespace: defaultLockObjectNamespace,
ResourceName: "vc-controller-manager",
},
LockObjectNamespace: defaultLockObjectNamespace,
WorkerThreadsForPG: 5,
}

if !reflect.DeepEqual(expected, s) {
Expand Down
28 changes: 12 additions & 16 deletions cmd/controller-manager/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"fmt"
"os"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/uuid"
Expand All @@ -42,12 +41,6 @@ import (
"volcano.sh/volcano/pkg/signals"
)

const (
leaseDuration = 15 * time.Second
renewDeadline = 10 * time.Second
retryPeriod = 5 * time.Second
)

// Run the controller.
func Run(opt *options.ServerOption) error {
config, err := kube.BuildConfig(opt.KubeClientOptions)
Expand All @@ -65,7 +58,7 @@ func Run(opt *options.ServerOption) error {

ctx := signals.SetupSignalContext()

if !opt.EnableLeaderElection {
if !opt.LeaderElection.LeaderElect {
run(ctx)
return fmt.Errorf("finished without leader elect")
}
Expand All @@ -77,7 +70,7 @@ func Run(opt *options.ServerOption) error {

// Prepare event clients.
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: leaderElectionClient.CoreV1().Events(opt.LockObjectNamespace)})
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: leaderElectionClient.CoreV1().Events(opt.LeaderElection.ResourceNamespace)})
eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "vc-controller-manager"})

hostname, err := os.Hostname()
Expand All @@ -86,10 +79,13 @@ func Run(opt *options.ServerOption) error {
}
// add a uniquifier so that two processes on the same host don't accidentally both become active
id := hostname + "_" + string(uuid.NewUUID())

rl, err := resourcelock.New(resourcelock.LeasesResourceLock,
opt.LockObjectNamespace,
"vc-controller-manager",
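// For compatibility with the old --lock-object-namespace flag, a non-empty
// LockObjectNamespace overrides LeaderElection.ResourceNamespace.
// NOTE(review): --lock-object-namespace appears to be registered with a non-empty
// default, so this override presumably always applies and
// --leader-elect-resource-namespace would never take effect; the event sink above
// also reads ResourceNamespace before this override runs — confirm.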
if len(opt.LockObjectNamespace) > 0 {
opt.LeaderElection.ResourceNamespace = opt.LockObjectNamespace
}
rl, err := resourcelock.New(opt.LeaderElection.ResourceLock,
opt.LeaderElection.ResourceNamespace,
opt.LeaderElection.ResourceName,
leaderElectionClient.CoreV1(),
leaderElectionClient.CoordinationV1(),
resourcelock.ResourceLockConfig{
Expand All @@ -102,9 +98,9 @@ func Run(opt *options.ServerOption) error {

leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: leaseDuration,
RenewDeadline: renewDeadline,
RetryPeriod: retryPeriod,
LeaseDuration: opt.LeaderElection.LeaseDuration.Duration,
RenewDeadline: opt.LeaderElection.RenewDeadline.Duration,
RetryPeriod: opt.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
Expand Down
15 changes: 10 additions & 5 deletions cmd/controller-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,20 @@ import (

"github.com/spf13/pflag"
_ "go.uber.org/automaxprocs"

"k8s.io/apimachinery/pkg/util/wait"
cliflag "k8s.io/component-base/cli/flag"
componentbaseoptions "k8s.io/component-base/config/options"
"k8s.io/klog/v2"

"volcano.sh/volcano/cmd/controller-manager/app"
"volcano.sh/volcano/cmd/controller-manager/app/options"
_ "volcano.sh/volcano/pkg/controllers/garbagecollector"
_ "volcano.sh/volcano/pkg/controllers/job"
_ "volcano.sh/volcano/pkg/controllers/jobflow"
_ "volcano.sh/volcano/pkg/controllers/jobtemplate"
_ "volcano.sh/volcano/pkg/controllers/podgroup"
_ "volcano.sh/volcano/pkg/controllers/queue"

"volcano.sh/volcano/cmd/controller-manager/app"
"volcano.sh/volcano/cmd/controller-manager/app/options"
commonutil "volcano.sh/volcano/pkg/util"
"volcano.sh/volcano/pkg/version"
)

Expand All @@ -46,8 +46,13 @@ func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
klog.InitFlags(nil)

fs := pflag.CommandLine
s := options.NewServerOption()
s.AddFlags(pflag.CommandLine)

s.AddFlags(fs)
commonutil.LeaderElectionDefault(&s.LeaderElection)
s.LeaderElection.ResourceName = "vc-controller-manager"
componentbaseoptions.BindLeaderElectionFlags(&s.LeaderElection, fs)

cliflag.InitFlags()

Expand Down
54 changes: 26 additions & 28 deletions cmd/scheduler/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (
"time"

"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/component-base/config"
componentbaseconfigvalidation "k8s.io/component-base/config/validation"

"volcano.sh/volcano/pkg/kube"
)
Expand All @@ -47,24 +50,26 @@ const (

// ServerOption is the main context object for the controller manager.
type ServerOption struct {
KubeClientOptions kube.ClientOptions
CertFile string
KeyFile string
CaCertFile string
CertData []byte
KeyData []byte
CaCertData []byte
SchedulerNames []string
SchedulerConf string
SchedulePeriod time.Duration
EnableLeaderElection bool
LockObjectNamespace string
DefaultQueue string
PrintVersion bool
EnableMetrics bool
ListenAddress string
EnablePriorityClass bool
EnableCSIStorage bool
KubeClientOptions kube.ClientOptions
CertFile string
KeyFile string
CaCertFile string
CertData []byte
KeyData []byte
CaCertData []byte
SchedulerNames []string
SchedulerConf string
SchedulePeriod time.Duration
// LeaderElection defines the configuration of leader election.
LeaderElection config.LeaderElectionConfiguration
// Deprecated: use LeaderElection.ResourceNamespace instead.
LockObjectNamespace string
DefaultQueue string
PrintVersion bool
EnableMetrics bool
ListenAddress string
EnablePriorityClass bool
EnableCSIStorage bool
// vc-scheduler will load (not activate) custom plugins which are in this directory
PluginsDir string
EnableHealthz bool
Expand Down Expand Up @@ -107,16 +112,13 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
"File containing the default x509 Certificate for HTTPS. (CA cert, if any, concatenated "+
"after server cert).")
fs.StringVar(&s.KeyFile, "tls-private-key-file", s.KeyFile, "File containing the default x509 private key matching --tls-cert-file.")
fs.StringVar(&s.LockObjectNamespace, "lock-object-namespace", defaultLockObjectNamespace, "Define the namespace of the lock object; it is volcano-system by default, will deprecated, please use --leader-elect-resource-namespace instead.")
// volcano scheduler will ignore pods with scheduler names other than specified with the option
fs.StringArrayVar(&s.SchedulerNames, "scheduler-name", []string{defaultSchedulerName}, "vc-scheduler will handle pods whose .spec.SchedulerName is same as scheduler-name")
fs.StringVar(&s.SchedulerConf, "scheduler-conf", "", "The absolute path of scheduler configuration file")
fs.DurationVar(&s.SchedulePeriod, "schedule-period", defaultSchedulerPeriod, "The period between each scheduling cycle")
fs.StringVar(&s.DefaultQueue, "default-queue", defaultQueue, "The default queue name of the job")
fs.BoolVar(&s.EnableLeaderElection, "leader-elect", true,
"Start a leader election client and gain leadership before "+
"executing the main loop. Enable this when running replicated vc-scheduler for high availability; it is enabled by default")
fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit")
fs.StringVar(&s.LockObjectNamespace, "lock-object-namespace", defaultLockObjectNamespace, "Define the namespace of the lock object that is used for leader election; it is volcano-system by default")
fs.StringVar(&s.ListenAddress, "listen-address", defaultListenAddress, "The address to listen on for HTTP requests.")
fs.StringVar(&s.HealthzBindAddress, "healthz-address", defaultHealthzAddress, "The address to listen on for the health check server.")
fs.BoolVar(&s.EnablePriorityClass, "priority-class", true,
Expand Down Expand Up @@ -145,13 +147,9 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
fs.StringSliceVar(&s.IgnoredCSIProvisioners, "ignored-provisioners", nil, "The provisioners that will be ignored during pod pvc request computation and preemption.")
}

// CheckOptionOrDie check lock-object-namespace when LeaderElection is enabled.
// CheckOptionOrDie validates the leader election configuration and returns the aggregated validation errors, if any.
func (s *ServerOption) CheckOptionOrDie() error {
if s.EnableLeaderElection && s.LockObjectNamespace == "" {
return fmt.Errorf("lock-object-namespace must not be nil when LeaderElection is enabled")
}

return nil
return componentbaseconfigvalidation.ValidateLeaderElectionConfiguration(&s.LeaderElection, field.NewPath("leaderElection")).ToAggregate()
}

// RegisterOptions registers options.
Expand Down
25 changes: 21 additions & 4 deletions cmd/scheduler/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,47 @@ import (
"time"

"github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/component-base/config"
componentbaseoptions "k8s.io/component-base/config/options"

"volcano.sh/volcano/pkg/kube"
commonutil "volcano.sh/volcano/pkg/util"
)

func TestAddFlags(t *testing.T) {
fs := pflag.NewFlagSet("addflagstest", pflag.ContinueOnError)
s := NewServerOption()
commonutil.LeaderElectionDefault(&s.LeaderElection)
componentbaseoptions.BindLeaderElectionFlags(&s.LeaderElection, fs)
s.AddFlags(fs)

args := []string{
"--schedule-period=5m",
"--priority-class=false",
"--cache-dumper=false",
"--leader-elect-lease-duration=60s",
"--leader-elect-renew-deadline=20s",
"--leader-elect-retry-period=10s",
}
fs.Parse(args)

// This is a snapshot of expected options parsed by args.
expected := &ServerOption{
SchedulerNames: []string{defaultSchedulerName},
SchedulePeriod: 5 * time.Minute,
DefaultQueue: defaultQueue,
ListenAddress: defaultListenAddress,
LeaderElection: config.LeaderElectionConfiguration{
LeaderElect: true,
LeaseDuration: metav1.Duration{60 * time.Second},
RenewDeadline: metav1.Duration{20 * time.Second},
RetryPeriod: metav1.Duration{10 * time.Second},
ResourceLock: resourcelock.LeasesResourceLock,
ResourceNamespace: defaultLockObjectNamespace,
},
LockObjectNamespace: defaultLockObjectNamespace,
DefaultQueue: defaultQueue,
ListenAddress: defaultListenAddress,
KubeClientOptions: kube.ClientOptions{
Master: "",
KubeConfig: "",
Expand All @@ -55,8 +74,6 @@ func TestAddFlags(t *testing.T) {
MinNodesToFind: defaultMinNodesToFind,
MinPercentageOfNodesToFind: defaultMinPercentageOfNodesToFind,
PercentageOfNodesToFind: defaultPercentageOfNodesToFind,
EnableLeaderElection: true,
LockObjectNamespace: defaultLockObjectNamespace,
NodeWorkerThreads: defaultNodeWorkers,
CacheDumpFileDir: "/tmp",
}
Expand Down
Loading

0 comments on commit 1c8d95f

Please sign in to comment.