Skip to content

Commit

Permalink
Simplify the Affinity struct exposed to service operators
Browse files Browse the repository at this point in the history
This patch introduces some more wrapping around corev1.PodAffinitySpec
to simplify what is exposed in the service operators. In particular,
it is possible to consume the AffinityOverrides struct that can
implement either affinity or antiaffinity rules. They uses the same
structs behind the scenes (PodAffinityTerm and WeightedPodAffinityTerm),
for both RequiredDuringSchedulingIgnoredDuringExecution and
PreferredDuringSchedulingIgnoredDuringExecution, but they have a
different semantic when included in the corev1.Affinity k8s object.
The old behavior of distributing Pods is currently preserved for
operators where we do not want to provide this interface.

Signed-off-by: Francesco Pantano <[email protected]>
  • Loading branch information
fmount committed Dec 4, 2024
1 parent 1e202d7 commit 3c061cf
Show file tree
Hide file tree
Showing 4 changed files with 235 additions and 59 deletions.
180 changes: 139 additions & 41 deletions modules/common/affinity/affinity.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"encoding/json"
"fmt"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/strategicpatch"
)
Expand All @@ -31,49 +30,51 @@ func DistributePods(
selectorKey string,
selectorValues []string,
topologyKey string,
overrides *AffinityOverrideSpec,
) *corev1.Affinity {
defaultAffinity := &corev1.Affinity{
PodAntiAffinity: &corev1.PodAntiAffinity{
// This rule ensures that two replicas of the same selector
// should not run if possible on the same worker node
PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{
{
PodAffinityTerm: corev1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: selectorKey,
Operator: metav1.LabelSelectorOpIn,
Values: selectorValues,
},
},
},
// usually corev1.LabelHostname "kubernetes.io/hostname"
// https://github.com/kubernetes/api/blob/master/core/v1/well_known_labels.go#L20
TopologyKey: topologyKey,
},
Weight: 100,
},
},
overrides *Overrides,
) (*corev1.Affinity, error) {
// By default apply an anti-affinity policy using corev1.LabelHostname as
// preferred scheduling policy: this maintains backward compatibility with
// an already deployed environment
defaultAffinity := DefaultAffinity(
Rules{
SelectorKey: selectorKey,
SelectorValues: selectorValues,
TopologyKey: topologyKey,
Weight: DefaultPreferredWeight,
},
)
if overrides == nil || (overrides.Affinity == nil && overrides.AntiAffinity == nil) {
return defaultAffinity, nil
}
// patch the default affinity Object with the data passed as input
if overrides != nil {
patchedAffinity, _ := toCoreAffinity(defaultAffinity, overrides)
return patchedAffinity

affinityPatch := corev1.Affinity{}
if overrides.Affinity != nil {
affinityPatch = NewAffinity(overrides.Affinity)
}
return defaultAffinity

antiAffinityPatch := corev1.Affinity{}
if overrides.AntiAffinity != nil {
antiAffinityPatch = NewAntiAffinity(overrides.AntiAffinity)
}

overridesSpec := &OverrideSpec{
PodAffinity: affinityPatch.PodAffinity,
PodAntiAffinity: antiAffinityPatch.PodAntiAffinity,
}

// patch the default affinity Object with the data passed as input
patchedAffinity, err := toCoreAffinity(defaultAffinity, overridesSpec)
return patchedAffinity, err
}

// toCoreAffinity -
func toCoreAffinity(
affinity *v1.Affinity,
override *AffinityOverrideSpec,
) (*v1.Affinity, error) {

aff := &v1.Affinity{
affinity *corev1.Affinity,
override *OverrideSpec,
) (*corev1.Affinity, error) {
aff := &corev1.Affinity{
PodAntiAffinity: affinity.PodAntiAffinity,
PodAffinity: affinity.PodAffinity,
PodAffinity: affinity.PodAffinity,
}
if override != nil {
if override != nil {
Expand All @@ -85,13 +86,11 @@ func toCoreAffinity(
if err != nil {
return aff, fmt.Errorf("error marshalling Affinity Spec: %w", err)
}

patchedJSON, err := strategicpatch.StrategicMergePatch(origAffinit, patch, v1.Affinity{})
patchedJSON, err := strategicpatch.StrategicMergePatch(origAffinit, patch, corev1.Affinity{})
if err != nil {
return aff, fmt.Errorf("error patching Affinity Spec: %w", err)
}

patchedSpec := v1.Affinity{}
patchedSpec := corev1.Affinity{}
err = json.Unmarshal(patchedJSON, &patchedSpec)
if err != nil {
return aff, fmt.Errorf("error unmarshalling patched Service Spec: %w", err)
Expand All @@ -101,3 +100,102 @@ func toCoreAffinity(
}
return aff, nil
}

// WeightedPodAffinityTerm - returns a WeightedPodAffinityTerm that is assigned
// to the Affinity or AntiAffinity rule
func (affinity *Rules) WeightedPodAffinityTerm() []corev1.WeightedPodAffinityTerm {
if affinity == nil {
return []corev1.WeightedPodAffinityTerm{}
}
affinityTerm := []corev1.WeightedPodAffinityTerm{
{
Weight: affinity.Weight,
PodAffinityTerm: corev1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: affinity.SelectorKey,
Operator: metav1.LabelSelectorOpIn,
Values: affinity.SelectorValues,
},
},
},
TopologyKey: affinity.TopologyKey,
},
},
}
return affinityTerm
}

// PodAffinityTerm -
func (affinity *Rules) PodAffinityTerm() []corev1.PodAffinityTerm {
if affinity == nil {
return []corev1.PodAffinityTerm{}
}
affinityTerm := []corev1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: affinity.SelectorKey,
Operator: metav1.LabelSelectorOpIn,
Values: affinity.SelectorValues,
},
},
},
TopologyKey: affinity.TopologyKey,
},
}
return affinityTerm
}

// NewAffinity -
func NewAffinity(p *PodScheduling) corev1.Affinity {
aff := &corev1.Affinity{
PodAffinity: &corev1.PodAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: p.RequiredScheduling.PodAffinityTerm(),
PreferredDuringSchedulingIgnoredDuringExecution: p.PreferredScheduling.WeightedPodAffinityTerm(),
},
}
return *aff
}

// NewAntiAffinity -
func NewAntiAffinity(p *PodScheduling) corev1.Affinity {
aff := &corev1.Affinity{
PodAntiAffinity: &corev1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: p.RequiredScheduling.PodAffinityTerm(),
PreferredDuringSchedulingIgnoredDuringExecution: p.PreferredScheduling.WeightedPodAffinityTerm(),
},
}
return *aff
}

// DefaultAffinity -
func DefaultAffinity(aff Rules) *corev1.Affinity {
return &corev1.Affinity{
PodAntiAffinity: &corev1.PodAntiAffinity{
// This rule ensures that two replicas of the same selector
// should not run if possible on the same worker node
PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{
{
PodAffinityTerm: corev1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: aff.SelectorKey,
Operator: metav1.LabelSelectorOpIn,
Values: aff.SelectorValues,
},
},
},
// usually corev1.LabelHostname "kubernetes.io/hostname"
// https://github.com/kubernetes/api/blob/master/core/v1/well_known_labels.go#L20
TopologyKey: aff.TopologyKey,
},
Weight: aff.Weight,
},
},
},
}
}
2 changes: 1 addition & 1 deletion modules/common/affinity/affinity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestDistributePods(t *testing.T) {
t.Run("Default pod distribution", func(t *testing.T) {
g := NewWithT(t)

d := DistributePods("ThisSelector", []string{"selectorValue1", "selectorValue2"}, "ThisTopologyKey")
d, _ := DistributePods("ThisSelector", []string{"selectorValue1", "selectorValue2"}, "ThisTopologyKey", nil)

g.Expect(d).To(BeEquivalentTo(affinityObj))
})
Expand Down
40 changes: 34 additions & 6 deletions modules/common/affinity/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,45 @@ import (
corev1 "k8s.io/api/core/v1"
)

// OverrideSpec - service override configuration for the Affinity propagated to the Pods
// Allows for the manifest of the created StatefulSet to be overwritten with custom Pod affinity configuration.
type OverrideSpec struct {
Spec *AffinityOverrideSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`
}
const (
// DefaultPreferredWeight -
DefaultPreferredWeight = 100
)

type AffinityOverrideSpec struct {
// OverrideSpec -
type OverrideSpec struct {
// Describes pod affinity scheduling rules (e.g. co-locate this pod in the same node, zone, etc. as some other pod(s)).
// +optional
PodAffinity *corev1.PodAffinity `json:"podAffinity,omitempty" protobuf:"bytes,2,opt,name=podAffinity"`
// Describes pod anti-affinity scheduling rules (e.g. avoid putting this pod in the same node, zone, etc. as some other pod(s)).
// +optional
PodAntiAffinity *corev1.PodAntiAffinity `json:"podAntiAffinity,omitempty" protobuf:"bytes,3,opt,name=podAntiAffinity"`
}

// Rules -
// +kubebuilder:object:generate:=true
type Rules struct {
// +kubebuilder:validation:Optional
SelectorKey string `json:"selectorKey,omitempty" protobuf:"bytes,2,opt,name=selectorKey"`
// +kubebuilder:validation:Optional
SelectorValues []string `json:"selectorValues,omitempty" protobuf:"bytes,2,opt,name=selectorValues"`
// https://github.com/kubernetes/api/blob/master/core/v1/well_known_labels.go#L20
// +kubebuilder:validation:Optional
TopologyKey string `json:"topologyKey,omitempty" protobuf:"bytes,2,opt,name=topologyKey"`
// +kubebuilder:validation:Optional
Weight int32 `json:"weight,omitempty" protobuf:"bytes,2,opt,name=weight"`
}

// PodScheduling -
// +kubebuilder:object:generate:=true
type PodScheduling struct {
RequiredScheduling *Rules `json:"required,omitempty" protobuf:"bytes,2,opt,name=required"`
PreferredScheduling *Rules `json:"preferred,omitempty" protobuf:"bytes,2,opt,name=referred"`
}

// Overrides -
// +kubebuilder:object:generate:=true
type Overrides struct {
Affinity *PodScheduling `json:"affinity,omitempty"`
AntiAffinity *PodScheduling `json:"antiAffinity,omitempty"`
}
72 changes: 61 additions & 11 deletions modules/common/affinity/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 3c061cf

Please sign in to comment.