Skip to content

Commit

Permalink
refactor affinity to use same strategy as flux operator (#59)
Browse files Browse the repository at this point in the history
* refactor affinity to use same strategy as flux operator

Signed-off-by: vsoch <[email protected]>
  • Loading branch information
vsoch authored Sep 12, 2023
1 parent 1a4fb50 commit e78597d
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 10 deletions.
4 changes: 2 additions & 2 deletions pkg/jobs/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (m *LauncherWorker) AddWorkers(
) (*jobset.ReplicatedJob, error) {

numWorkers := spec.Spec.Pods - 1
workers, err := metrics.GetReplicatedJob(spec, false, numWorkers, numWorkers, m.WorkerLetter)
workers, err := metrics.GetReplicatedJob(spec, false, numWorkers, numWorkers, m.WorkerLetter, m.SoleTenancy)
if err != nil {
return workers, err
}
Expand Down Expand Up @@ -221,7 +221,7 @@ func (m *LauncherWorker) ReplicatedJobs(spec *api.MetricSet) ([]jobset.Replicate
m.ensureDefaultNames()

// Generate a replicated job for the launcher (LauncherWorker) and workers
launcher, err := metrics.GetReplicatedJob(spec, false, 1, 1, m.LauncherLetter)
launcher, err := metrics.GetReplicatedJob(spec, false, 1, 1, m.LauncherLetter, m.SoleTenancy)
if err != nil {
return js, err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/jobs/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (m StorageGeneric) Description() string {
return m.Summary
}

// HasSoleTenancy always returns false: by default we assume that storage
// does not have sole tenancy.
func (m StorageGeneric) HasSoleTenancy() bool {
return false
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/metrics/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func GetApplicationReplicatedJobs(
m := (*metric)

// This defaults to one replicated job, named "m", no custom replicated job name, and sole tenancy false
job, err := GetReplicatedJob(spec, shareProcessNamespace, spec.Spec.Pods, spec.Spec.Completions, "")
job, err := GetReplicatedJob(spec, shareProcessNamespace, spec.Spec.Pods, spec.Spec.Completions, "", m.HasSoleTenancy())
if err != nil {
return rjs, err
}
Expand Down
63 changes: 58 additions & 5 deletions pkg/metrics/jobset.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ var (
soleTenancyValue = "sole-tenancy"
)

const podLabelAppName = "app.kubernetes.io/name"

// GetJobSet is called by the controller to return some JobSet based
// on the type: application, storage, or standalone
func GetJobSet(
Expand All @@ -45,7 +47,7 @@ func GetJobSet(
successJobs := getSuccessJobs(set.Metrics())

// A base JobSet can hold one or more replicated jobs
js := getBaseJobSet(spec, successJobs, set.HasSoleTenancy())
js := getBaseJobSet(spec, successJobs)

// Get one or more replicated jobs, depending on the type
rjs, err := set.ReplicatedJobs(spec)
Expand Down Expand Up @@ -80,7 +82,7 @@ func getSuccessJobs(metrics []*Metric) []string {
}

// getBaseJobSet shared for either an application or isolated jobset
func getBaseJobSet(set *api.MetricSet, successSet []string, soleTenancy bool) *jobset.JobSet {
func getBaseJobSet(set *api.MetricSet, successSet []string) *jobset.JobSet {

// When suspend is true we have a hard time debugging jobs, so keep false
suspend := false
Expand Down Expand Up @@ -112,9 +114,6 @@ func getBaseJobSet(set *api.MetricSet, successSet []string, soleTenancy bool) *j
}

// Do we want to assign 1 node: 1 pod? We can use Pod Anti-affinity for that
if soleTenancy {
js.ObjectMeta.Annotations = map[string]string{jobset.ExclusiveKey: "kubernetes.io/hostname"}
}
return &js
}

Expand All @@ -125,6 +124,7 @@ func GetReplicatedJob(
pods int32,
completions int32,
jobname string,
soleTenancy bool,
) (*jobset.ReplicatedJob, error) {

// Default replicated job name, if not set
Expand Down Expand Up @@ -183,6 +183,11 @@ func GetReplicatedJob(
},
}

// Do we want sole tenancy?
if soleTenancy {
jobspec.Template.Spec.Affinity = getAffinity(set)
}

// Do we have a pull secret for the application image?
if set.Spec.Application.PullSecret != "" {
jobspec.Template.Spec.ImagePullSecrets = []corev1.LocalObjectReference{
Expand All @@ -193,3 +198,51 @@ func GetReplicatedJob(
job.Template.Spec = jobspec
return &job, nil
}

// getAffinity builds the pod affinity rules for the given metric set.
//
// Both rules are soft scheduling preferences (weight 100) that select pods
// whose app name label (podLabelAppName) equals the metric set name:
//   - pod affinity prefers placing the pods in the same zone
//   - pod anti-affinity prefers placing the pods on different nodes
func getAffinity(set *api.MetricSet) *corev1.Affinity {

	// preferredTerms returns a newly allocated, single-entry preference list
	// for one topology key, so the two rules never share a selector.
	preferredTerms := func(topologyKey string) []corev1.WeightedPodAffinityTerm {
		selector := &metav1.LabelSelector{
			MatchExpressions: []metav1.LabelSelectorRequirement{
				{
					// Per the original note, this label is added in getPodLabels
					Key:      podLabelAppName,
					Operator: metav1.LabelSelectorOpIn,
					Values:   []string{set.Name},
				},
			},
		}
		return []corev1.WeightedPodAffinityTerm{
			{
				Weight: 100,
				PodAffinityTerm: corev1.PodAffinityTerm{
					LabelSelector: selector,
					TopologyKey:   topologyKey,
				},
			},
		}
	}

	affinity := &corev1.Affinity{}

	// Prefer to schedule pods on the same zone
	affinity.PodAffinity = &corev1.PodAffinity{
		PreferredDuringSchedulingIgnoredDuringExecution: preferredTerms("topology.kubernetes.io/zone"),
	}

	// Prefer to schedule pods on different nodes
	affinity.PodAntiAffinity = &corev1.PodAntiAffinity{
		PreferredDuringSchedulingIgnoredDuringExecution: preferredTerms("kubernetes.io/hostname"),
	}
	return affinity
}
1 change: 0 additions & 1 deletion pkg/metrics/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type MetricSet interface {
Metrics() []*Metric
EntrypointScripts(*api.MetricSet) []EntrypointScript
ReplicatedJobs(*api.MetricSet) ([]jobset.ReplicatedJob, error)
HasSoleTenancy() bool
}

// get an application default entrypoint, if not determined by metric
Expand Down
2 changes: 1 addition & 1 deletion pkg/metrics/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (m *StorageMetricSet) ReplicatedJobs(spec *api.MetricSet) ([]jobset.Replica

// Storage metrics do not need to share the process namespace
// The jobname empty string will use the default, no custom replicated job name, and sole tenancy false
job, err := GetReplicatedJob(spec, false, spec.Spec.Pods, spec.Spec.Completions, "")
job, err := GetReplicatedJob(spec, false, spec.Spec.Pods, spec.Spec.Completions, "", m.HasSoleTenancy())
if err != nil {
return rjs, err
}
Expand Down

0 comments on commit e78597d

Please sign in to comment.