diff --git a/pkg/jobs/launcher.go b/pkg/jobs/launcher.go index 5eaddeb..f18fc2c 100644 --- a/pkg/jobs/launcher.go +++ b/pkg/jobs/launcher.go @@ -188,7 +188,7 @@ func (m *LauncherWorker) AddWorkers( ) (*jobset.ReplicatedJob, error) { numWorkers := spec.Spec.Pods - 1 - workers, err := metrics.GetReplicatedJob(spec, false, numWorkers, numWorkers, m.WorkerLetter) + workers, err := metrics.GetReplicatedJob(spec, false, numWorkers, numWorkers, m.WorkerLetter, m.SoleTenancy) if err != nil { return workers, err } @@ -221,7 +221,7 @@ func (m *LauncherWorker) ReplicatedJobs(spec *api.MetricSet) ([]jobset.Replicate m.ensureDefaultNames() // Generate a replicated job for the launcher (LauncherWorker) and workers - launcher, err := metrics.GetReplicatedJob(spec, false, 1, 1, m.LauncherLetter) + launcher, err := metrics.GetReplicatedJob(spec, false, 1, 1, m.LauncherLetter, m.SoleTenancy) if err != nil { return js, err } diff --git a/pkg/jobs/storage.go b/pkg/jobs/storage.go index 289ba90..674a505 100644 --- a/pkg/jobs/storage.go +++ b/pkg/jobs/storage.go @@ -47,6 +47,7 @@ func (m StorageGeneric) Description() string { return m.Summary } +// By default assume storage does not have sole tenancy func (m StorageGeneric) HasSoleTenancy() bool { return false } diff --git a/pkg/metrics/application.go b/pkg/metrics/application.go index 6400b2a..d0a4853 100644 --- a/pkg/metrics/application.go +++ b/pkg/metrics/application.go @@ -56,7 +56,7 @@ func GetApplicationReplicatedJobs( m := (*metric) // This defaults to one replicated job, named "m", no custom replicated job name, and sole tenancy false - job, err := GetReplicatedJob(spec, shareProcessNamespace, spec.Spec.Pods, spec.Spec.Completions, "") + job, err := GetReplicatedJob(spec, shareProcessNamespace, spec.Spec.Pods, spec.Spec.Completions, "", m.HasSoleTenancy()) if err != nil { return rjs, err } diff --git a/pkg/metrics/jobset.go b/pkg/metrics/jobset.go index 56e63db..f7a0768 100644 --- a/pkg/metrics/jobset.go +++ b/pkg/metrics/jobset.go @@ -28,6 +28,8 @@ var ( soleTenancyValue = "sole-tenancy" ) +const podLabelAppName = "app.kubernetes.io/name" + // GetJobSet is called by the controller to return some JobSet based // on the type: application, storage, or standalone func GetJobSet( @@ -45,7 +47,7 @@ func GetJobSet( successJobs := getSuccessJobs(set.Metrics()) // A base JobSet can hold one or more replicated jobs - js := getBaseJobSet(spec, successJobs, set.HasSoleTenancy()) + js := getBaseJobSet(spec, successJobs) // Get one or more replicated jobs, depending on the type rjs, err := set.ReplicatedJobs(spec) @@ -80,7 +82,7 @@ func getSuccessJobs(metrics []*Metric) []string { } // getBaseJobSet shared for either an application or isolated jobset -func getBaseJobSet(set *api.MetricSet, successSet []string, soleTenancy bool) *jobset.JobSet { +func getBaseJobSet(set *api.MetricSet, successSet []string) *jobset.JobSet { // When suspend is true we have a hard time debugging jobs, so keep false suspend := false @@ -112,9 +114,6 @@ func getBaseJobSet(set *api.MetricSet, successSet []string, soleTenancy bool) *j } // Do we want to assign 1 node: 1 pod? We can use Pod Anti-affinity for that - if soleTenancy { - js.ObjectMeta.Annotations = map[string]string{jobset.ExclusiveKey: "kubernetes.io/hostname"} - } return &js } @@ -125,6 +124,7 @@ func GetReplicatedJob( pods int32, completions int32, jobname string, + soleTenancy bool, ) (*jobset.ReplicatedJob, error) { // Default replicated job name, if not set @@ -183,6 +183,11 @@ func GetReplicatedJob( }, } + // Do we want sole tenancy? + if soleTenancy { + jobspec.Template.Spec.Affinity = getAffinity(set) + } + // Do we have a pull secret for the application image? if set.Spec.Application.PullSecret != "" { jobspec.Template.Spec.ImagePullSecrets = []corev1.LocalObjectReference{ @@ -193,3 +198,51 @@ func GetReplicatedJob( job.Template.Spec = jobspec return &job, nil } + +// getAffinity returns to pod affinity to ensure 1 address / node +func getAffinity(set *api.MetricSet) *corev1.Affinity { + return &corev1.Affinity{ + // Prefer to schedule pods on the same zone + PodAffinity: &corev1.PodAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{ + { + Weight: 100, + PodAffinityTerm: corev1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + // added in getPodLabels + Key: podLabelAppName, + Operator: metav1.LabelSelectorOpIn, + Values: []string{set.Name}, + }, + }, + }, + TopologyKey: "topology.kubernetes.io/zone", + }, + }, + }, + }, + // Prefer to schedule pods on different nodes + PodAntiAffinity: &corev1.PodAntiAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{ + { + Weight: 100, + PodAffinityTerm: corev1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + // added in getPodLabels + Key: podLabelAppName, + Operator: metav1.LabelSelectorOpIn, + Values: []string{set.Name}, + }, + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + }, + } +} diff --git a/pkg/metrics/metricset.go b/pkg/metrics/metricset.go index d9f4a17..7912fd1 100644 --- a/pkg/metrics/metricset.go +++ b/pkg/metrics/metricset.go @@ -48,7 +48,6 @@ type MetricSet interface { Metrics() []*Metric EntrypointScripts(*api.MetricSet) []EntrypointScript ReplicatedJobs(*api.MetricSet) ([]jobset.ReplicatedJob, error) - HasSoleTenancy() bool } // get an application default entrypoint, if not determined by metric diff --git a/pkg/metrics/storage.go b/pkg/metrics/storage.go index 86200f9..dd1c9e3 100644 --- a/pkg/metrics/storage.go +++ b/pkg/metrics/storage.go @@ -22,7 +22,7 @@ func (m *StorageMetricSet) ReplicatedJobs(spec *api.MetricSet) ([]jobset.Replica // Storage metrics do not need to share the process namespace // The jobname empty string will use the default, no custom replicated job name, and sole tenancy false - job, err := GetReplicatedJob(spec, false, spec.Spec.Pods, spec.Spec.Completions, "") + job, err := GetReplicatedJob(spec, false, spec.Spec.Pods, spec.Spec.Completions, "", m.HasSoleTenancy()) if err != nil { return rjs, err }