Extended status check for reconciliation (#207)
lllamnyp authored Jun 25, 2024
2 parents 468c477 + f81648b commit c1bd6db
Showing 6 changed files with 286 additions and 32 deletions.
3 changes: 3 additions & 0 deletions api/v1alpha1/etcdcluster_types.go
@@ -55,6 +55,7 @@ type EtcdClusterSpec struct {
const (
EtcdConditionInitialized = "Initialized"
EtcdConditionReady = "Ready"
EtcdConditionError = "Error"
)

type EtcdCondType string
@@ -66,6 +67,7 @@ const (
EtcdCondTypeWaitingForFirstQuorum EtcdCondType = "WaitingForFirstQuorum"
EtcdCondTypeStatefulSetReady EtcdCondType = "StatefulSetReady"
EtcdCondTypeStatefulSetNotReady EtcdCondType = "StatefulSetNotReady"
EtcdCondTypeSplitbrain EtcdCondType = "Splitbrain"
)

const (
@@ -74,6 +76,7 @@ const (
EtcdReadyCondNegMessage EtcdCondMessage = "Cluster StatefulSet is not Ready"
EtcdReadyCondPosMessage EtcdCondMessage = "Cluster StatefulSet is Ready"
EtcdReadyCondNegWaitingForQuorum EtcdCondMessage = "Waiting for first quorum to be established"
EtcdErrorCondSplitbrainMessage EtcdCondMessage = "Etcd endpoints reporting more than one unique cluster ID"
)

// EtcdClusterStatus defines the observed state of EtcdCluster
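
The new Error condition makes splitbrain visible to API consumers. A consumer-side sketch (not part of this commit), assuming the cluster's Status.Conditions is a standard []metav1.Condition slice; isInSplitbrain is a hypothetical helper:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// isInSplitbrain reports whether a cluster's conditions carry the new
// Error condition with the Splitbrain reason introduced by this commit.
func isInSplitbrain(conds []metav1.Condition) bool {
	c := meta.FindStatusCondition(conds, "Error")
	return c != nil && c.Status == metav1.ConditionTrue && c.Reason == "Splitbrain"
}

func main() {
	conds := []metav1.Condition{{Type: "Error", Status: metav1.ConditionTrue, Reason: "Splitbrain"}}
	fmt.Println(isInSplitbrain(conds)) // true
}
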
8 changes: 8 additions & 0 deletions config/rbac/role.yaml
@@ -16,6 +16,14 @@ rules:
- patch
- update
- watch
- apiGroups:
- ""
resources:
- endpoints
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
139 changes: 118 additions & 21 deletions internal/controller/etcdcluster_controller.go
@@ -25,6 +25,7 @@ import (
"slices"
"strconv"
"strings"
"sync"
"time"

"github.com/aenix-io/etcd-operator/internal/log"
@@ -47,6 +48,10 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
)

const (
etcdDefaultTimeout = 5 * time.Second
)

// EtcdClusterReconciler reconciles a EtcdCluster object
type EtcdClusterReconciler struct {
client.Client
@@ -56,6 +61,7 @@ type EtcdClusterReconciler struct {
// +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters/finalizers,verbs=update
// +kubebuilder:rbac:groups="",resources=endpoints,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;create;update;watch;delete;patch
// +kubebuilder:rbac:groups="",resources=services,verbs=get;create;delete;update;patch;list;watch
// +kubebuilder:rbac:groups="",resources=secrets,verbs=view;list;watch
@@ -80,13 +86,68 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return reconcile.Result{}, nil
}

state := observables{}

// create two services and the pdb
err = r.ensureUnconditionalObjects(ctx, instance)
if err != nil {
return ctrl.Result{}, err
}

// fetch STS if exists
err = r.Get(ctx, req.NamespacedName, &state.statefulSet)
if client.IgnoreNotFound(err) != nil {
return ctrl.Result{}, fmt.Errorf("couldn't get statefulset: %w", err)
}
state.stsExists = state.statefulSet.UID != ""

// fetch endpoints
clusterClient, singleClients, err := factory.NewEtcdClientSet(ctx, instance, r.Client)
if err != nil {
return ctrl.Result{}, err
}
state.endpointsFound = clusterClient != nil && singleClients != nil

if !state.endpointsFound && !state.stsExists {
// TODO: happy path for new cluster creation
log.Debug(ctx, "happy path for new cluster creation (not yet implemented)")
}

// get status of every endpoint and member list from every endpoint
state.etcdStatuses = make([]etcdStatus, len(singleClients))
{
var wg sync.WaitGroup
ctx, cancel := context.WithTimeout(ctx, etcdDefaultTimeout)
for i := range singleClients {
wg.Add(1)
go func(i int) {
defer wg.Done()
state.etcdStatuses[i].fill(ctx, singleClients[i])
}(i)
}
wg.Wait()
cancel()
}
state.setClusterID()
if state.inSplitbrain() {
log.Error(ctx, fmt.Errorf("etcd cluster in splitbrain"), "etcd cluster in splitbrain, dropping from reconciliation queue")
factory.SetCondition(instance, factory.NewCondition(etcdaenixiov1alpha1.EtcdConditionError).
WithStatus(true).
WithReason(string(etcdaenixiov1alpha1.EtcdCondTypeSplitbrain)).
WithMessage(string(etcdaenixiov1alpha1.EtcdErrorCondSplitbrainMessage)).
Complete(),
)
return r.updateStatus(ctx, instance)
}
// fill conditions
if len(instance.Status.Conditions) == 0 {
factory.FillConditions(instance)
}

// ensure managed resources
if err = r.ensureClusterObjects(ctx, instance); err != nil {
if err = r.ensureConditionalClusterObjects(ctx, instance); err != nil {
return r.updateStatusOnErr(ctx, instance, fmt.Errorf("cannot create Cluster auxiliary objects: %w", err))
}

@@ -138,8 +199,8 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return r.updateStatus(ctx, instance)
}

// ensureClusterObjects creates or updates all objects owned by cluster CR
func (r *EtcdClusterReconciler) ensureClusterObjects(
// ensureConditionalClusterObjects creates or updates all objects owned by cluster CR
func (r *EtcdClusterReconciler) ensureConditionalClusterObjects(
ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) error {

if err := factory.CreateOrUpdateClusterStateConfigMap(ctx, cluster, r.Client); err != nil {
@@ -148,30 +209,12 @@ func (r *EtcdClusterReconciler) ensureClusterObjects(
}
log.Debug(ctx, "cluster state configmap reconciled")

if err := factory.CreateOrUpdateHeadlessService(ctx, cluster, r.Client); err != nil {
log.Error(ctx, err, "reconcile headless service failed")
return err
}
log.Debug(ctx, "headless service reconciled")

if err := factory.CreateOrUpdateStatefulSet(ctx, cluster, r.Client); err != nil {
log.Error(ctx, err, "reconcile statefulset failed")
return err
}
log.Debug(ctx, "statefulset reconciled")

if err := factory.CreateOrUpdateClientService(ctx, cluster, r.Client); err != nil {
log.Error(ctx, err, "reconcile client service failed")
return err
}
log.Debug(ctx, "client service reconciled")

if err := factory.CreateOrUpdatePdb(ctx, cluster, r.Client); err != nil {
log.Error(ctx, err, "reconcile pdb failed")
return err
}
log.Debug(ctx, "pdb reconciled")

return nil
}

@@ -498,3 +541,57 @@ func (r *EtcdClusterReconciler) disableAuth(ctx context.Context, authClient clie

return nil
}

// ensureUnconditionalObjects creates the two services and the PDB
// which can be created at the start of the reconciliation loop
// without any risk of disrupting the etcd cluster
func (r *EtcdClusterReconciler) ensureUnconditionalObjects(ctx context.Context, instance *etcdaenixiov1alpha1.EtcdCluster) error {
const concurrentOperations = 3
c := make(chan error)
defer close(c)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var wg sync.WaitGroup
wg.Add(concurrentOperations)
wrapWithMsg := func(err error, msg string) error {
if err != nil {
return fmt.Errorf("%s: %w", msg, err)
}
return nil
}
go func(c chan<- error) {
defer wg.Done()
select {
case <-ctx.Done():
case c <- wrapWithMsg(factory.CreateOrUpdateClientService(ctx, instance, r.Client),
"couldn't ensure client service"):
}
}(c)
go func(c chan<- error) {
defer wg.Done()
select {
case <-ctx.Done():
case c <- wrapWithMsg(factory.CreateOrUpdateHeadlessService(ctx, instance, r.Client),
"couldn't ensure headless service"):
}
}(c)
go func(c chan<- error) {
defer wg.Done()
select {
case <-ctx.Done():
case c <- wrapWithMsg(factory.CreateOrUpdatePdb(ctx, instance, r.Client),
"couldn't ensure pod disruption budget"):
}
}(c)

for i := 0; i < concurrentOperations; i++ {
if err := <-c; err != nil {
cancel()

// let all goroutines select the ctx.Done() case to avoid races on closed channels
wg.Wait()
return err
}
}
return nil
}
63 changes: 63 additions & 0 deletions internal/controller/factory/etcd_client.go
@@ -0,0 +1,63 @@
package factory

import (
"context"
"fmt"

"github.com/aenix-io/etcd-operator/api/v1alpha1"
clientv3 "go.etcd.io/etcd/client/v3"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func NewEtcdClientSet(ctx context.Context, cluster *v1alpha1.EtcdCluster, cli client.Client) (*clientv3.Client, []*clientv3.Client, error) {
cfg, err := configFromCluster(ctx, cluster, cli)
if err != nil {
return nil, nil, err
}
if len(cfg.Endpoints) == 0 {
return nil, nil, nil
}
eps := cfg.Endpoints
clusterClient, err := clientv3.New(cfg)
if err != nil {
return nil, nil, fmt.Errorf("error building etcd cluster client: %w", err)
}
singleClients := make([]*clientv3.Client, len(eps))
for i, ep := range eps {
cfg.Endpoints = []string{ep}
singleClients[i], err = clientv3.New(cfg)
if err != nil {
return nil, nil, fmt.Errorf("error building etcd single-endpoint client for endpoint %s: %w", ep, err)
}
}
return clusterClient, singleClients, nil
}

func configFromCluster(ctx context.Context, cluster *v1alpha1.EtcdCluster, cli client.Client) (clientv3.Config, error) {
ep := v1.Endpoints{}
err := cli.Get(ctx, types.NamespacedName{Name: GetHeadlessServiceName(cluster), Namespace: cluster.Namespace}, &ep)
if client.IgnoreNotFound(err) != nil {
return clientv3.Config{}, err
}
if err != nil {
return clientv3.Config{Endpoints: []string{}}, nil
}

names := map[string]struct{}{}
urls := make([]string, 0, 8)
for _, v := range ep.Subsets {
for _, addr := range v.Addresses {
names[addr.Hostname] = struct{}{}
}
for _, addr := range v.NotReadyAddresses {
names[addr.Hostname] = struct{}{}
}
}
for name := range names {
urls = append(urls, fmt.Sprintf("%s:2379", name))
}

return clientv3.Config{Endpoints: urls}, nil
}
80 changes: 80 additions & 0 deletions internal/controller/observables.go
@@ -0,0 +1,80 @@
package controller

import (
"context"
"sync"

clientv3 "go.etcd.io/etcd/client/v3"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
)

// etcdStatus holds the details of the status that an etcd endpoint
// can return about itself, i.e. its own status and its perceived
// member list
type etcdStatus struct {
endpointStatus *clientv3.StatusResponse
endpointStatusError error
memberList *clientv3.MemberListResponse
memberListError error
}

// observables stores observations that the operator can make about
// states of objects in kubernetes
type observables struct {
statefulSet appsv1.StatefulSet
stsExists bool
endpointsFound bool
etcdStatuses []etcdStatus
clusterID uint64
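// the two blank fields below look like placeholders, presumably reserved
// for a member count and observed PVCs (see the TODO at the bottom of this file)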
_ int
_ []corev1.PersistentVolumeClaim
}

// setClusterID populates the clusterID field based on etcdStatuses
func (o *observables) setClusterID() {
for i := range o.etcdStatuses {
if o.etcdStatuses[i].endpointStatus != nil {
o.clusterID = o.etcdStatuses[i].endpointStatus.Header.ClusterId
return
}
}
}

// inSplitbrain compares clusterID field with clusterIDs in etcdStatuses.
// If more than one unique ID is reported, cluster is in splitbrain.
func (o *observables) inSplitbrain() bool {
for i := range o.etcdStatuses {
if o.etcdStatuses[i].endpointStatus != nil {
if o.clusterID != o.etcdStatuses[i].endpointStatus.Header.ClusterId {
return true
}
}
}
return false
}

// fill takes a single-endpoint client and populates the fields of etcdStatus
// with the endpoint's status and its perceived member list.
func (s *etcdStatus) fill(ctx context.Context, c *clientv3.Client) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
s.endpointStatus, s.endpointStatusError = c.Status(ctx, c.Endpoints()[0])
}()
s.memberList, s.memberListError = c.MemberList(ctx)
wg.Wait()
}

// TODO: make a real function
func (o *observables) _() int {
if o.etcdStatuses != nil {
for i := range o.etcdStatuses {
if o.etcdStatuses[i].memberList != nil {
return len(o.etcdStatuses[i].memberList.Members)
}
}
}
return 0
}
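
Since clientv3.StatusResponse aliases etcdserverpb.StatusResponse, response headers can be constructed directly, which makes the splitbrain predicate easy to exercise. A test sketch (not part of this commit), assuming the etcd v3.5 client packages:

package controller

import (
	"testing"

	"go.etcd.io/etcd/api/v3/etcdserverpb"
	clientv3 "go.etcd.io/etcd/client/v3"
)

func TestInSplitbrain(t *testing.T) {
	// withClusterID builds an etcdStatus whose endpoint reports the given ID.
	withClusterID := func(id uint64) etcdStatus {
		return etcdStatus{endpointStatus: &clientv3.StatusResponse{
			Header: &etcdserverpb.ResponseHeader{ClusterId: id},
		}}
	}

	// Two endpoints reporting different cluster IDs: splitbrain.
	o := observables{etcdStatuses: []etcdStatus{withClusterID(1), withClusterID(2)}}
	o.setClusterID()
	if !o.inSplitbrain() {
		t.Fatal("expected splitbrain when endpoints report different cluster IDs")
	}

	// All endpoints agree: no splitbrain.
	o = observables{etcdStatuses: []etcdStatus{withClusterID(1), withClusterID(1)}}
	o.setClusterID()
	if o.inSplitbrain() {
		t.Fatal("expected no splitbrain when all endpoints agree")
	}
}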