Skip to content

Commit

Permalink
Fix a race condition in seed election algorithm
Browse files Browse the repository at this point in the history
  • Loading branch information
rzetelskik committed Sep 15, 2023
1 parent 3975685 commit 704b6fa
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 24 deletions.
5 changes: 4 additions & 1 deletion pkg/cmd/operator/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,10 @@ func (o *SidecarOptions) Run(streams genericclioptions.IOStreams, cmd *cobra.Com
return fmt.Errorf("can't wait for pod's ContainerID: %w", err)
}

member := identity.NewMemberFromObjects(service, pod)
member, err := identity.NewMemberFromObjects(service, pod)
if err != nil {
return fmt.Errorf("can't create new member from objects: %w", err)
}

labelSelector := labels.Set{
naming.OwnerUIDLabel: string(pod.UID),
Expand Down
7 changes: 5 additions & 2 deletions pkg/controller/scyllacluster/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,11 @@ func servicePorts(cluster *scyllav1.ScyllaCluster) []corev1.ServicePort {
// StatefulSetForRack make a StatefulSet for the rack.
// existingSts may be nil if it doesn't exist yet.
func StatefulSetForRack(r scyllav1.RackSpec, c *scyllav1.ScyllaCluster, existingSts *appsv1.StatefulSet, sidecarImage string) (*appsv1.StatefulSet, error) {
matchLabels := naming.RackLabels(r, c)
rackLabels := naming.RackLabels(r, c)
rackLabels, err := naming.RackLabels(r, c)
if err != nil {
return nil, fmt.Errorf("can't get rack labels: %w", err)
}
matchLabels := helpers.ShallowCopyMap(rackLabels)
rackLabels[naming.ScyllaVersionLabel] = c.Spec.Version

placement := r.Placement
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/scyllacluster/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ func TestStatefulSetForRack(t *testing.T) {
"scylla/cluster": "basic",
"scylla/datacenter": "dc",
"scylla/rack": "rack",
"scylla/rack-ordinal": "0",
}
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/helpers/collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,11 @@ func GetMapValues[M ~map[K]V, K comparable, V any](m M) []V {
}
return res
}

func ShallowCopyMap[M ~map[K]V, K comparable, V any](m M) M {
res := make(M, len(m))
for k, v := range m {
res[k] = v
}
return res
}
10 changes: 7 additions & 3 deletions pkg/helpers/slices/slices.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,18 @@ func ContainsItem[T comparable](slice []T, item T) bool {
return Contains(slice, IdentityFunc(item))
}

func Find[T comparable](slice []T, filterFunc func(T) bool) (T, bool) {
func Find[T any](slice []T, filterFunc func(T) bool) (T, int, bool) {
for i := range slice {
if filterFunc(slice[i]) {
return slice[i], true
return slice[i], i, true
}
}

return *new(T), false
return *new(T), 0, false
}

func FindItem[T comparable](slice []T, item T) (T, int, bool) {
return Find(slice, IdentityFunc(item))
}

func Flatten[T any](xs [][]T) []T {
Expand Down
1 change: 1 addition & 0 deletions pkg/naming/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ const (
ClusterNameLabel = "scylla/cluster"
DatacenterNameLabel = "scylla/datacenter"
RackNameLabel = "scylla/rack"
RackOrdinalLabel = "scylla/rack-ordinal"
ScyllaVersionLabel = "scylla/scylla-version"
ScyllaServiceTypeLabel = "scylla-operator.scylladb.com/scylla-service-type"
ScyllaIngressTypeLabel = "scylla-operator.scylladb.com/scylla-ingress-type"
Expand Down
25 changes: 20 additions & 5 deletions pkg/naming/labels.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package naming

import (
"fmt"
"strconv"

scyllav1 "github.com/scylladb/scylla-operator/pkg/api/scylla/v1"
"github.com/scylladb/scylla-operator/pkg/helpers/slices"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/labels"
)
Expand All @@ -26,12 +30,19 @@ func DatacenterLabels(c *scyllav1.ScyllaCluster) map[string]string {

// RackLabels returns a map of label keys and values
// for the given Rack.
func RackLabels(r scyllav1.RackSpec, c *scyllav1.ScyllaCluster) map[string]string {
func RackLabels(r scyllav1.RackSpec, c *scyllav1.ScyllaCluster) (map[string]string, error) {
recLabels := ScyllaLabels()
rackLabels := DatacenterLabels(c)
rackLabels[RackNameLabel] = r.Name
_, rackOrdinal, ok := slices.Find(c.Spec.Datacenter.Racks, func(rack scyllav1.RackSpec) bool {
return rack.Name == r.Name
})
if !ok {
return nil, fmt.Errorf("can't find ordinal of rack %q in ScyllaCluster %q", r.Name, ObjRef(c))
}
rackLabels[RackOrdinalLabel] = strconv.Itoa(rackOrdinal)

return mergeLabels(rackLabels, recLabels)
return mergeLabels(rackLabels, recLabels), nil
}

// StatefulSetPodLabel returns a map of labels to uniquely
Expand All @@ -43,12 +54,16 @@ func StatefulSetPodLabel(name string) map[string]string {
}

// RackSelector returns a LabelSelector for the given rack.
func RackSelector(r scyllav1.RackSpec, c *scyllav1.ScyllaCluster) labels.Selector {
func RackSelector(r scyllav1.RackSpec, c *scyllav1.ScyllaCluster) (labels.Selector, error) {
rackLabels, err := RackLabels(r, c)
if err != nil {
return nil, fmt.Errorf("can't get rack labels: %w", err)
}

rackLabelsSet := labels.Set(RackLabels(r, c))
rackLabelsSet := labels.Set(rackLabels)
sel := labels.SelectorFromSet(rackLabelsSet)

return sel
return sel, nil
}

func ScyllaLabels() map[string]string {
Expand Down
26 changes: 24 additions & 2 deletions pkg/sidecar/identity/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sort"
"strconv"

"github.com/scylladb/scylla-operator/pkg/controllerhelpers"
"github.com/scylladb/scylla-operator/pkg/naming"
Expand All @@ -25,6 +26,7 @@ type Member struct {
// ClusterIP of the member's Service
StaticIP string
Rack string
RackOrdinal int
Datacenter string
Cluster string
ServiceLabels map[string]string
Expand All @@ -33,19 +35,29 @@ type Member struct {
Overprovisioned bool
}

func NewMemberFromObjects(service *corev1.Service, pod *corev1.Pod) *Member {
func NewMemberFromObjects(service *corev1.Service, pod *corev1.Pod) (*Member, error) {
rackOrdinalString, ok := pod.Labels[naming.RackOrdinalLabel]
if !ok {
return nil, fmt.Errorf("pod %q is missing %q label", naming.ObjRef(pod), naming.RackOrdinalLabel)
}
rackOrdinal, err := strconv.Atoi(rackOrdinalString)
if err != nil {
return nil, fmt.Errorf("can't get rack ordinal from label %q: %w", rackOrdinalString, err)
}

return &Member{
Namespace: service.Namespace,
Name: service.Name,
IP: pod.Status.PodIP,
StaticIP: service.Spec.ClusterIP,
Rack: pod.Labels[naming.RackNameLabel],
RackOrdinal: rackOrdinal,
Datacenter: pod.Labels[naming.DatacenterNameLabel],
Cluster: pod.Labels[naming.ClusterNameLabel],
ServiceLabels: service.Labels,
PodID: string(pod.UID),
Overprovisioned: pod.Status.QOSClass != corev1.PodQOSGuaranteed,
}
}, nil
}

func (m *Member) GetSeeds(ctx context.Context, coreClient v1.CoreV1Interface, externalSeeds []string) ([]string, error) {
Expand Down Expand Up @@ -73,6 +85,16 @@ func (m *Member) GetSeeds(ctx context.Context, coreClient v1.CoreV1Interface, ex

if len(otherPods) == 0 {
// We are the only one, assuming first bootstrap.

podOrdinal, err := naming.IndexFromName(m.Name)
if err != nil {
return nil, fmt.Errorf("can't get pod index from name: %w", err)
}

if m.RackOrdinal != 0 || podOrdinal != 0 {
return nil, fmt.Errorf("pod is not first in the cluster, but there are no other pods")
}

if len(externalSeeds) > 0 {
return externalSeeds, nil
}
Expand Down
50 changes: 39 additions & 11 deletions pkg/sidecar/identity/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,14 @@ func TestMember_GetSeeds(t *testing.T) {
thirdPod, thirdService := createPodAndSvc("pod-2", "3.3.3.3", now.Add(2*time.Second))

ts := []struct {
name string
memberName string
memberIP string
objects []runtime.Object
externalSeeds []string
expectSeeds []string
expectError error
name string
memberName string
memberIP string
memberRackOrdinal int
objects []runtime.Object
externalSeeds []string
expectSeeds []string
expectError error
}{
{
name: "error when no pods are found",
Expand Down Expand Up @@ -118,6 +119,32 @@ func TestMember_GetSeeds(t *testing.T) {
externalSeeds: []string{"10.0.1.1", "10.0.1.2"},
expectSeeds: []string{"10.0.1.1", "10.0.1.2", secondService.Spec.ClusterIP},
},
{
name: "error when node is not first in rack, but there are no other pods",
memberName: secondPod.Name,
memberIP: secondService.Spec.ClusterIP,
memberRackOrdinal: 0,
objects: []runtime.Object{secondPod, secondService},
expectSeeds: nil,
expectError: fmt.Errorf("pod is not first in the cluster, but there are no other pods"),
},
{
name: "error when node is first in rack of ordinal > 0, but there are no other pods",
memberName: firstPod.Name,
memberIP: firstService.Spec.ClusterIP,
memberRackOrdinal: 1,
objects: []runtime.Object{firstPod, firstService},
expectSeeds: nil,
expectError: fmt.Errorf("pod is not first in the cluster, but there are no other pods"),
},
{
name: "bootstrap with other pod when node is first in rack of ordinal > 0 and there are other pods",
memberName: secondPod.Name,
memberIP: secondService.Spec.ClusterIP,
memberRackOrdinal: 1,
objects: []runtime.Object{firstPod, firstService, secondPod, secondService},
expectSeeds: []string{firstService.Spec.ClusterIP},
},
}

for i := range ts {
Expand All @@ -129,10 +156,11 @@ func TestMember_GetSeeds(t *testing.T) {
defer cancel()

member := Member{
Cluster: "my-cluster",
Namespace: "namespace",
Name: test.memberName,
StaticIP: test.memberIP,
Cluster: "my-cluster",
Namespace: "namespace",
Name: test.memberName,
RackOrdinal: test.memberRackOrdinal,
StaticIP: test.memberIP,
}

fakeClient := fake.NewSimpleClientset(test.objects...)
Expand Down

0 comments on commit 704b6fa

Please sign in to comment.