Skip to content

Commit

Permalink
Merge pull request #1388 from rzetelskik/seeds-race-fix
Browse files Browse the repository at this point in the history
Fix a race condition in seed election algorithm
  • Loading branch information
scylla-operator-bot[bot] authored Sep 15, 2023
2 parents 135a7b8 + 704b6fa commit c392920
Show file tree
Hide file tree
Showing 17 changed files with 143 additions and 51 deletions.
5 changes: 4 additions & 1 deletion pkg/cmd/operator/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,10 @@ func (o *SidecarOptions) Run(streams genericclioptions.IOStreams, cmd *cobra.Com
return fmt.Errorf("can't wait for pod's ContainerID: %w", err)
}

member := identity.NewMemberFromObjects(service, pod)
member, err := identity.NewMemberFromObjects(service, pod)
if err != nil {
return fmt.Errorf("can't create new member from objects: %w", err)
}

labelSelector := labels.Set{
naming.OwnerUIDLabel: string(pod.UID),
Expand Down
4 changes: 2 additions & 2 deletions pkg/cmd/tests/tests_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
gomegaformat "github.com/onsi/gomega/format"
"github.com/scylladb/scylla-operator/pkg/cmdutil"
"github.com/scylladb/scylla-operator/pkg/genericclioptions"
"github.com/scylladb/scylla-operator/pkg/helpers"
"github.com/scylladb/scylla-operator/pkg/helpers/slices"
"github.com/scylladb/scylla-operator/pkg/signals"
ginkgotest "github.com/scylladb/scylla-operator/pkg/test/ginkgo"
"github.com/scylladb/scylla-operator/pkg/thirdparty/github.com/onsi/ginkgo/v2/exposedinternal/parallel_support"
Expand Down Expand Up @@ -358,7 +358,7 @@ func (o *RunOptions) run(ctx context.Context, streams genericclioptions.IOStream
commonArgs = append(commonArgs, fmt.Sprintf("--%s=%v", cmdutil.FlagLogLevelKey, o.ParallelLogLevel))

// Propagate random seed to child processes.
if !helpers.Contains(commonArgs, func(arg string) bool {
if !slices.Contains(commonArgs, func(arg string) bool {
return strings.HasPrefix(arg, fmt.Sprintf("--%s", randomSeedFlagKey))
}) {
commonArgs = append(commonArgs, fmt.Sprintf("--%s=%v", randomSeedFlagKey, suiteConfig.RandomSeed))
Expand Down
7 changes: 5 additions & 2 deletions pkg/controller/scyllacluster/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,11 @@ func servicePorts(cluster *scyllav1.ScyllaCluster) []corev1.ServicePort {
// StatefulSetForRack make a StatefulSet for the rack.
// existingSts may be nil if it doesn't exist yet.
func StatefulSetForRack(r scyllav1.RackSpec, c *scyllav1.ScyllaCluster, existingSts *appsv1.StatefulSet, sidecarImage string) (*appsv1.StatefulSet, error) {
matchLabels := naming.RackLabels(r, c)
rackLabels := naming.RackLabels(r, c)
rackLabels, err := naming.RackLabels(r, c)
if err != nil {
return nil, fmt.Errorf("can't get rack labels: %w", err)
}
matchLabels := helpers.ShallowCopyMap(rackLabels)
rackLabels[naming.ScyllaVersionLabel] = c.Spec.Version

placement := r.Placement
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/scyllacluster/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ func TestStatefulSetForRack(t *testing.T) {
"scylla/cluster": "basic",
"scylla/datacenter": "dc",
"scylla/rack": "rack",
"scylla/rack-ordinal": "0",
}
}

Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/scylladbmonitoring/sync_grafana.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/scylladb/scylla-operator/pkg/controllerhelpers"
ocrypto "github.com/scylladb/scylla-operator/pkg/crypto"
"github.com/scylladb/scylla-operator/pkg/helpers"
"github.com/scylladb/scylla-operator/pkg/helpers/slices"
okubecrypto "github.com/scylladb/scylla-operator/pkg/kubecrypto"
"github.com/scylladb/scylla-operator/pkg/naming"
"github.com/scylladb/scylla-operator/pkg/pointer"
Expand Down Expand Up @@ -303,7 +304,7 @@ func (smc *Controller) syncGrafana(

err = controllerhelpers.Prune(
ctx,
helpers.ToArray(requiredGrafanaSA),
slices.ToSlice(requiredGrafanaSA),
serviceAccounts,
&controllerhelpers.PruneControlFuncs{
DeleteFunc: smc.kubeClient.CoreV1().ServiceAccounts(sm.Namespace).Delete,
Expand Down Expand Up @@ -343,7 +344,7 @@ func (smc *Controller) syncGrafana(

err = controllerhelpers.Prune(
ctx,
helpers.ToArray(requiredService),
slices.ToSlice(requiredService),
services,
&controllerhelpers.PruneControlFuncs{
DeleteFunc: smc.kubeClient.CoreV1().Services(sm.Namespace).Delete,
Expand All @@ -354,7 +355,7 @@ func (smc *Controller) syncGrafana(

err = controllerhelpers.Prune(
ctx,
helpers.ToArray(requiredDeployment),
slices.ToSlice(requiredDeployment),
deployments,
&controllerhelpers.PruneControlFuncs{
DeleteFunc: smc.kubeClient.AppsV1().Deployments(sm.Namespace).Delete,
Expand All @@ -365,7 +366,7 @@ func (smc *Controller) syncGrafana(

err = controllerhelpers.Prune(
ctx,
helpers.FilterOutNil(helpers.ToArray(requiredIngress)),
slices.FilterOutNil(slices.ToSlice(requiredIngress)),
ingresses,
&controllerhelpers.PruneControlFuncs{
DeleteFunc: smc.kubeClient.NetworkingV1().Ingresses(sm.Namespace).Delete,
Expand Down
15 changes: 8 additions & 7 deletions pkg/controller/scylladbmonitoring/sync_prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
ocrypto "github.com/scylladb/scylla-operator/pkg/crypto"
monitoringv1 "github.com/scylladb/scylla-operator/pkg/externalapi/monitoring/v1"
"github.com/scylladb/scylla-operator/pkg/helpers"
"github.com/scylladb/scylla-operator/pkg/helpers/slices"
okubecrypto "github.com/scylladb/scylla-operator/pkg/kubecrypto"
"github.com/scylladb/scylla-operator/pkg/naming"
"github.com/scylladb/scylla-operator/pkg/pointer"
Expand Down Expand Up @@ -300,7 +301,7 @@ func (smc *Controller) syncPrometheus(

err = controllerhelpers.Prune(
ctx,
helpers.ToArray(requiredPrometheusSA),
slices.ToSlice(requiredPrometheusSA),
serviceAccounts,
&controllerhelpers.PruneControlFuncs{
DeleteFunc: smc.kubeClient.CoreV1().ServiceAccounts(sm.Namespace).Delete,
Expand All @@ -311,7 +312,7 @@ func (smc *Controller) syncPrometheus(

err = controllerhelpers.Prune(
ctx,
helpers.ToArray(requiredPrometheusService),
slices.ToSlice(requiredPrometheusService),
services,
&controllerhelpers.PruneControlFuncs{
DeleteFunc: smc.kubeClient.CoreV1().Services(sm.Namespace).Delete,
Expand All @@ -322,7 +323,7 @@ func (smc *Controller) syncPrometheus(

err = controllerhelpers.Prune(
ctx,
helpers.ToArray(requiredPrometheusRoleBinding),
slices.ToSlice(requiredPrometheusRoleBinding),
roleBindings,
&controllerhelpers.PruneControlFuncs{
DeleteFunc: smc.kubeClient.RbacV1().RoleBindings(sm.Namespace).Delete,
Expand All @@ -333,7 +334,7 @@ func (smc *Controller) syncPrometheus(

err = controllerhelpers.Prune(
ctx,
helpers.ToArray(requiredPrometheus),
slices.ToSlice(requiredPrometheus),
prometheuses,
&controllerhelpers.PruneControlFuncs{
DeleteFunc: smc.monitoringClient.Prometheuses(sm.Namespace).Delete,
Expand All @@ -344,7 +345,7 @@ func (smc *Controller) syncPrometheus(

err = controllerhelpers.Prune(
ctx,
helpers.FilterOutNil(helpers.ToArray(requiredIngress)),
slices.FilterOutNil(slices.ToSlice(requiredIngress)),
ingresses,
&controllerhelpers.PruneControlFuncs{
DeleteFunc: smc.kubeClient.NetworkingV1().Ingresses(sm.Namespace).Delete,
Expand All @@ -355,7 +356,7 @@ func (smc *Controller) syncPrometheus(

err = controllerhelpers.Prune(
ctx,
helpers.ToArray(requiredRecodingPrometheusRule, requiredAlertsPrometheusRule),
slices.ToSlice(requiredRecodingPrometheusRule, requiredAlertsPrometheusRule),
prometheusRules,
&controllerhelpers.PruneControlFuncs{
DeleteFunc: smc.monitoringClient.PrometheusRules(sm.Namespace).Delete,
Expand All @@ -366,7 +367,7 @@ func (smc *Controller) syncPrometheus(

err = controllerhelpers.Prune(
ctx,
helpers.ToArray(requiredScyllaDBServiceMonitor),
slices.ToSlice(requiredScyllaDBServiceMonitor),
serviceMonitors,
&controllerhelpers.PruneControlFuncs{
DeleteFunc: smc.monitoringClient.ServiceMonitors(sm.Namespace).Delete,
Expand Down
8 changes: 8 additions & 0 deletions pkg/helpers/collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,11 @@ func GetMapValues[M ~map[K]V, K comparable, V any](m M) []V {
}
return res
}

func ShallowCopyMap[M ~map[K]V, K comparable, V any](m M) M {
res := make(M, len(m))
for k, v := range m {
res[k] = v
}
return res
}
20 changes: 13 additions & 7 deletions pkg/helpers/array.go → pkg/helpers/slices/slices.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package helpers
// Copyright (c) 2023 ScyllaDB

func ToArray[T any](objs ...T) []T {
package slices

func ToSlice[T any](objs ...T) []T {
res := make([]T, 0, len(objs))
return append(res, objs...)
}

func ConvertToArray[To, From any](convert func(From) To, objs ...From) []To {
func ConvertToSlice[To, From any](convert func(From) To, objs ...From) []To {
res := make([]To, 0, len(objs))

for i := range objs {
Expand All @@ -16,7 +18,7 @@ func ConvertToArray[To, From any](convert func(From) To, objs ...From) []To {
}

func ConvertSlice[To, From any](slice []From, convert func(From) To) []To {
return ConvertToArray(convert, slice...)
return ConvertToSlice(convert, slice...)
}

func Filter[T any](array []T, filterFunc func(T) bool) []T {
Expand Down Expand Up @@ -63,14 +65,18 @@ func ContainsItem[T comparable](slice []T, item T) bool {
return Contains(slice, IdentityFunc(item))
}

func Find[T comparable](slice []T, filterFunc func(T) bool) (T, bool) {
func Find[T any](slice []T, filterFunc func(T) bool) (T, int, bool) {
for i := range slice {
if filterFunc(slice[i]) {
return slice[i], true
return slice[i], i, true
}
}

return *new(T), false
return *new(T), 0, false
}

func FindItem[T comparable](slice []T, item T) (T, int, bool) {
return Find(slice, IdentityFunc(item))
}

func Flatten[T any](xs [][]T) []T {
Expand Down
1 change: 1 addition & 0 deletions pkg/naming/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ const (
ClusterNameLabel = "scylla/cluster"
DatacenterNameLabel = "scylla/datacenter"
RackNameLabel = "scylla/rack"
RackOrdinalLabel = "scylla/rack-ordinal"
ScyllaVersionLabel = "scylla/scylla-version"
ScyllaServiceTypeLabel = "scylla-operator.scylladb.com/scylla-service-type"
ScyllaIngressTypeLabel = "scylla-operator.scylladb.com/scylla-ingress-type"
Expand Down
25 changes: 20 additions & 5 deletions pkg/naming/labels.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package naming

import (
"fmt"
"strconv"

scyllav1 "github.com/scylladb/scylla-operator/pkg/api/scylla/v1"
"github.com/scylladb/scylla-operator/pkg/helpers/slices"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/labels"
)
Expand All @@ -26,12 +30,19 @@ func DatacenterLabels(c *scyllav1.ScyllaCluster) map[string]string {

// RackLabels returns a map of label keys and values
// for the given Rack.
func RackLabels(r scyllav1.RackSpec, c *scyllav1.ScyllaCluster) map[string]string {
func RackLabels(r scyllav1.RackSpec, c *scyllav1.ScyllaCluster) (map[string]string, error) {
recLabels := ScyllaLabels()
rackLabels := DatacenterLabels(c)
rackLabels[RackNameLabel] = r.Name
_, rackOrdinal, ok := slices.Find(c.Spec.Datacenter.Racks, func(rack scyllav1.RackSpec) bool {
return rack.Name == r.Name
})
if !ok {
return nil, fmt.Errorf("can't find ordinal of rack %q in ScyllaCluster %q", r.Name, ObjRef(c))
}
rackLabels[RackOrdinalLabel] = strconv.Itoa(rackOrdinal)

return mergeLabels(rackLabels, recLabels)
return mergeLabels(rackLabels, recLabels), nil
}

// StatefulSetPodLabel returns a map of labels to uniquely
Expand All @@ -43,12 +54,16 @@ func StatefulSetPodLabel(name string) map[string]string {
}

// RackSelector returns a LabelSelector for the given rack.
func RackSelector(r scyllav1.RackSpec, c *scyllav1.ScyllaCluster) labels.Selector {
func RackSelector(r scyllav1.RackSpec, c *scyllav1.ScyllaCluster) (labels.Selector, error) {
rackLabels, err := RackLabels(r, c)
if err != nil {
return nil, fmt.Errorf("can't get rack labels: %w", err)
}

rackLabelsSet := labels.Set(RackLabels(r, c))
rackLabelsSet := labels.Set(rackLabels)
sel := labels.SelectorFromSet(rackLabelsSet)

return sel
return sel, nil
}

func ScyllaLabels() map[string]string {
Expand Down
26 changes: 24 additions & 2 deletions pkg/sidecar/identity/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sort"
"strconv"

"github.com/scylladb/scylla-operator/pkg/controllerhelpers"
"github.com/scylladb/scylla-operator/pkg/naming"
Expand All @@ -25,6 +26,7 @@ type Member struct {
// ClusterIP of the member's Service
StaticIP string
Rack string
RackOrdinal int
Datacenter string
Cluster string
ServiceLabels map[string]string
Expand All @@ -33,19 +35,29 @@ type Member struct {
Overprovisioned bool
}

func NewMemberFromObjects(service *corev1.Service, pod *corev1.Pod) *Member {
func NewMemberFromObjects(service *corev1.Service, pod *corev1.Pod) (*Member, error) {
rackOrdinalString, ok := pod.Labels[naming.RackOrdinalLabel]
if !ok {
return nil, fmt.Errorf("pod %q is missing %q label", naming.ObjRef(pod), naming.RackOrdinalLabel)
}
rackOrdinal, err := strconv.Atoi(rackOrdinalString)
if err != nil {
return nil, fmt.Errorf("can't get rack ordinal from label %q: %w", rackOrdinalString, err)
}

return &Member{
Namespace: service.Namespace,
Name: service.Name,
IP: pod.Status.PodIP,
StaticIP: service.Spec.ClusterIP,
Rack: pod.Labels[naming.RackNameLabel],
RackOrdinal: rackOrdinal,
Datacenter: pod.Labels[naming.DatacenterNameLabel],
Cluster: pod.Labels[naming.ClusterNameLabel],
ServiceLabels: service.Labels,
PodID: string(pod.UID),
Overprovisioned: pod.Status.QOSClass != corev1.PodQOSGuaranteed,
}
}, nil
}

func (m *Member) GetSeeds(ctx context.Context, coreClient v1.CoreV1Interface, externalSeeds []string) ([]string, error) {
Expand Down Expand Up @@ -73,6 +85,16 @@ func (m *Member) GetSeeds(ctx context.Context, coreClient v1.CoreV1Interface, ex

if len(otherPods) == 0 {
// We are the only one, assuming first bootstrap.

podOrdinal, err := naming.IndexFromName(m.Name)
if err != nil {
return nil, fmt.Errorf("can't get pod index from name: %w", err)
}

if m.RackOrdinal != 0 || podOrdinal != 0 {
return nil, fmt.Errorf("pod is not first in the cluster, but there are no other pods")
}

if len(externalSeeds) > 0 {
return externalSeeds, nil
}
Expand Down
Loading

0 comments on commit c392920

Please sign in to comment.