fix(cluster): add a check for the unfinished update (#539)
* fix(cluster): add a check for the unfinished update

* fix(cluster): The label switching time is too long when updating the pods

Add a timeout for getting the SQLRunner.
runkecheng authored Jun 29, 2022
1 parent 5be7c46 commit 9519fdb
Showing 3 changed files with 82 additions and 24 deletions.
9 changes: 9 additions & 0 deletions internal/sql_runner.go
@@ -139,6 +139,15 @@ func NewSQLRunner(cfg *Config, errs ...error) (SQLRunner, closeFunc, error) {
 	if err != nil {
 		return nil, close, err
 	}
+	db.SetConnMaxIdleTime(10 * time.Second)
+	db.SetConnMaxLifetime(1 * time.Minute)
+	if err := db.Ping(); err != nil {
+		internalLog.V(1).Info("failed to ping mysql", "error", err)
+		if cErr := db.Close(); cErr != nil {
+			internalLog.Error(cErr, "failed closing the database connection")
+		}
+		return nil, close, err
+	}
 
 	// Close connection function.
 	close = func() {
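For context, the guard added above is the standard database/sql liveness pattern: sql.Open only validates its arguments, so the new Ping is the first real round trip to the server, and a failed ping closes the pool rather than leaking it. A minimal standalone sketch of the same pattern, assuming the go-sql-driver/mysql driver and a placeholder DSN:

package main

import (
	"database/sql"
	"log"
	"time"

	_ "github.com/go-sql-driver/mysql" // assumed driver; registers itself via init()
)

// openChecked mirrors the pattern above: bound connection lifetimes, then
// verify liveness with Ping, closing the pool if the ping fails.
func openChecked(dsn string) (*sql.DB, error) {
	db, err := sql.Open("mysql", dsn) // validates arguments, does not connect yet
	if err != nil {
		return nil, err
	}
	db.SetConnMaxIdleTime(10 * time.Second)
	db.SetConnMaxLifetime(1 * time.Minute)
	if err := db.Ping(); err != nil { // first real round trip to the server
		if cErr := db.Close(); cErr != nil {
			log.Printf("failed closing the database connection: %v", cErr)
		}
		return nil, err
	}
	return db, nil
}

func main() {
	// Placeholder DSN; replace with real credentials and host.
	db, err := openChecked("user:password@tcp(127.0.0.1:3306)/mysql")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	log.Println("connection verified")
}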
62 changes: 45 additions & 17 deletions mysqlcluster/syncer/statefulset.go
@@ -23,7 +23,6 @@ import (
 	"time"
 
 	"github.com/go-logr/logr"
-	"github.com/go-test/deep"
 	"github.com/iancoleman/strcase"
 	"github.com/imdario/mergo"
 	"github.com/presslabs/controller-util/mergo/transformers"
@@ -34,7 +33,9 @@ import (
 	"k8s.io/apimachinery/pkg/api/equality"
 	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/selection"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -277,17 +278,18 @@ func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil.
 	}
 	// Check if statefulset changed.
 	if !s.sfsUpdated(existing) {
-		return controllerutil.OperationResultNone, nil
-	}
-
-	// If changed, update statefulset.
-	if err := s.cli.Update(ctx, s.sfs); err != nil {
-		return controllerutil.OperationResultNone, err
-	}
-	// Need roll update.
-	if !equality.Semantic.DeepEqual(existing.Spec.Template, s.sfs.Spec.Template) {
-		s.log.Info("update statefulset pods", "name", s.Name, "diff", deep.Equal(existing.Spec.Template, s.sfs.Spec.Template))
 		// Update every pods of statefulset.
+		if s.podsAllUpdated(ctx) {
+			return controllerutil.OperationResultNone, nil
+		} else {
+			if err := s.updatePod(ctx); err != nil {
+				return controllerutil.OperationResultNone, err
+			}
+		}
+	} else {
+		// If changed, update statefulset.
+		if err := s.cli.Update(ctx, s.sfs); err != nil {
+			return controllerutil.OperationResultNone, err
+		}
 		if err := s.updatePod(ctx); err != nil {
 			return controllerutil.OperationResultNone, err
 		}
-	}
@@ -302,7 +304,7 @@ func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil.
 // updatePod update the pods, update follower nodes first.
 // This can reduce the number of master-slave switching during the update process.
 func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
-	// updatedRevision will not update with the currentRevision when using `onDelete`.
+	// currentRevision will not update with the updatedRevision when using `onDelete`.
 	// https://github.com/kubernetes/kubernetes/pull/106059
 	if s.sfs.Status.UpdatedReplicas == *s.sfs.Spec.Replicas {
 		return nil
Expand Down Expand Up @@ -333,7 +335,7 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
var leaderPod corev1.Pod
for _, pod := range pods.Items {
// Check if the pod is healthy.
err := wait.PollImmediate(time.Second*2, time.Minute, func() (bool, error) {
err := wait.PollImmediate(time.Second*2, time.Second*30, func() (bool, error) {
s.cli.Get(ctx, client.ObjectKeyFromObject(&pod), &pod)
if pod.ObjectMeta.Labels["healthy"] == "yes" {
return true, nil
@@ -489,14 +491,17 @@ func (s *StatefulSetSyncer) updatePVC(ctx context.Context) error {
 }
 
 func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) error {
+	if s.sfs.Status.UpdateRevision == "" {
+		return fmt.Errorf("update revision is empty")
+	}
 	// Check version, if not latest, delete node.
 	if pod.ObjectMeta.Labels["controller-revision-hash"] == s.sfs.Status.UpdateRevision {
 		s.log.Info("pod is already updated", "pod name", pod.Name)
 	} else {
 		s.Status.State = apiv1alpha1.ClusterUpdateState
-		s.log.Info("updating pod", "pod", pod.Name, "key", s.Unwrap())
+		s.log.Info("updating pod", "pod", pod.Name)
 		// Try to delete pod and wait for pod restart.
-		err := wait.PollImmediate(time.Second*5, time.Minute*5, func() (bool, error) {
+		err := wait.PollImmediate(time.Second*5, time.Minute*2, func() (bool, error) {
 			if err := s.cli.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, pod); err != nil {
 				return false, nil
 			}
Expand Down Expand Up @@ -585,7 +590,30 @@ func (s *StatefulSetSyncer) backupIsRunning(ctx context.Context) (bool, error) {

// Updates to statefulset spec for fields other than 'replicas', 'template', and 'updateStrategy' are forbidden.
func (s *StatefulSetSyncer) sfsUpdated(existing *appsv1.StatefulSet) bool {
return existing.Spec.Replicas != s.sfs.Spec.Replicas ||
return *existing.Spec.Replicas != *s.sfs.Spec.Replicas ||
!equality.Semantic.DeepEqual(existing.Spec.Template, s.sfs.Spec.Template) ||
existing.Spec.UpdateStrategy != s.sfs.Spec.UpdateStrategy
}

func (s *StatefulSetSyncer) podsAllUpdated(ctx context.Context) bool {
podlist := corev1.PodList{}
labelSelector := s.GetLabels().AsSelector()
// Find the pods that revision is old.
r, err := labels.NewRequirement("controller-revision-hash", selection.NotEquals, []string{s.sfs.Status.UpdateRevision})
if err != nil {
s.log.V(1).Info("failed to create label requirement", "error", err)
return false
}
labelSelector = labelSelector.Add(*r)
if err := s.cli.List(ctx,
&podlist,
&client.ListOptions{
Namespace: s.sfs.Namespace,
LabelSelector: labelSelector,
},
); err != nil {
s.log.V(1).Info("failed to list pods", "error", err)
return false
}
return len(podlist.Items) == 0
}
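The new podsAllUpdated helper builds its pod query from label-selector requirements. A minimal sketch of the same selector construction, with hypothetical cluster labels and revision hash, showing how the NotEquals requirement narrows the base selector to pods still running an old revision:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/selection"
)

func main() {
	// Hypothetical base labels, standing in for s.GetLabels().
	base := labels.Set{
		"app.kubernetes.io/name":       "mysql",
		"app.kubernetes.io/managed-by": "mysql.radondb.com",
	}
	selector := base.AsSelector()

	// Keep only pods whose controller-revision-hash differs from the
	// target revision, i.e. pods still running the old template.
	// The revision hash here is a made-up example value.
	r, err := labels.NewRequirement(
		"controller-revision-hash", selection.NotEquals, []string{"sample-mysql-7c9f8d6b5"},
	)
	if err != nil {
		panic(err)
	}
	selector = selector.Add(*r)

	// Example output:
	// app.kubernetes.io/managed-by=mysql.radondb.com,app.kubernetes.io/name=mysql,controller-revision-hash!=sample-mysql-7c9f8d6b5
	fmt.Println(selector.String())
}

Listing with this selector and checking for an empty result is what lets createOrUpdate detect an unfinished rolling update even when the statefulset spec itself has not changed.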
35 changes: 28 additions & 7 deletions mysqlcluster/syncer/status.go
@@ -232,6 +232,7 @@ func (s *StatusSyncer) AutoRebuild(ctx context.Context, pod *corev1.Pod) error {
 
 // updateNodeStatus update the node status.
 func (s *StatusSyncer) updateNodeStatus(ctx context.Context, cli client.Client, pods []corev1.Pod) error {
+	closeCh := make(chan func())
 	for _, pod := range pods {
 		podName := pod.Name
 		host := fmt.Sprintf("%s.%s.%s", podName, s.GetNameForResource(utils.HeadlessSVC), s.Namespace)
@@ -245,13 +246,33 @@ func (s *StatusSyncer) updateNodeStatus(ctx context.Context, cli client.Client,
 		}
 
 		isLagged, isReplicating, isReadOnly := corev1.ConditionUnknown, corev1.ConditionUnknown, corev1.ConditionUnknown
-		sqlRunner, closeConn, err := s.SQLRunnerFactory(internal.NewConfigFromClusterKey(
-			s.cli, s.MysqlCluster.GetClusterKey(), utils.OperatorUser, host))
-		defer closeConn()
-		if err != nil {
-			s.log.V(1).Info("failed to connect the mysql", "node", node.Name, "error", err)
-			node.Message = err.Error()
-		} else {
+		var sqlRunner internal.SQLRunner
+		var closeConn func()
+		errCh := make(chan error)
+		go func(sqlRunner *internal.SQLRunner, errCh chan error, closeCh chan func()) {
+			var err error
+			*sqlRunner, closeConn, err = s.SQLRunnerFactory(internal.NewConfigFromClusterKey(
+				s.cli, s.MysqlCluster.GetClusterKey(), utils.OperatorUser, host))
+			if err != nil {
+				s.log.V(1).Info("failed to get sql runner", "node", node.Name, "error", err)
+				errCh <- err
+				return
+			}
+			if closeConn != nil {
+				closeCh <- closeConn
+				return
+			}
+			errCh <- nil
+		}(&sqlRunner, errCh, closeCh)
+
+		var err error
+		select {
+		case <-errCh:
+		case closeConn := <-closeCh:
+			defer closeConn()
+		case <-time.After(time.Second * 5):
+		}
+		if sqlRunner != nil {
 			isLagged, isReplicating, err = internal.CheckSlaveStatusWithRetry(sqlRunner, checkNodeStatusRetry)
 			if err != nil {
 				s.log.V(1).Info("failed to check slave status", "node", node.Name, "error", err)
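The status.go change bounds the previously unbounded SQLRunnerFactory call by running it in a goroutine and racing it against time.After in a select, so one unreachable MySQL node can no longer stall the whole status sync. A standalone sketch of that timeout pattern, with slowDial standing in for the factory call; unlike the patch it uses buffered channels, so the worker goroutine can always complete its send and exit even when the timeout wins the race:

package main

import (
	"fmt"
	"time"
)

// slowDial stands in for the SQLRunnerFactory call, which can block for a
// long time when the target MySQL node is unreachable.
func slowDial() (func(), error) {
	time.Sleep(10 * time.Second) // simulate a hung connection attempt
	return func() { fmt.Println("connection closed") }, nil
}

func main() {
	// Buffered channels: the worker's send succeeds even after the select
	// below has given up, so the goroutine cannot leak.
	errCh := make(chan error, 1)
	closeCh := make(chan func(), 1)

	go func() {
		closeFn, err := slowDial()
		if err != nil {
			errCh <- err
			return
		}
		closeCh <- closeFn
	}()

	// Wait at most five seconds instead of blocking the whole status sync.
	select {
	case err := <-errCh:
		fmt.Println("dial failed:", err)
	case closeFn := <-closeCh:
		defer closeFn()
		fmt.Println("connected")
	case <-time.After(5 * time.Second):
		fmt.Println("dial timed out after 5s; skipping this node")
	}
}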