Skip to content

Commit

Permalink
*: add mysql8.0.19~33 upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
acekingke committed Aug 11, 2023
1 parent 511a4a0 commit a64d2af
Show file tree
Hide file tree
Showing 11 changed files with 157 additions and 24 deletions.
26 changes: 23 additions & 3 deletions api/v1alpha1/mysqlcluster_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strconv"
"strings"

"github.com/blang/semver"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -64,7 +65,7 @@ func (r *MysqlCluster) ValidateCreate() error {
return err
}

if err := r.validateMysqlVersionAndImage(); err != nil {
if err := r.validateMysqlVersionAndImage(r); err != nil {
return err
}

Expand Down Expand Up @@ -96,7 +97,7 @@ func (r *MysqlCluster) ValidateUpdate(old runtime.Object) error {
return err
}

if err := r.validateMysqlVersionAndImage(); err != nil {
if err := r.validateMysqlVersionAndImage(oldCluster); err != nil {
return err
}

Expand Down Expand Up @@ -198,11 +199,30 @@ func (r *MysqlCluster) validateLowTableCase(oldCluster *MysqlCluster) error {
}

// Validate MysqlVersion and spec.MysqlOpts.image are conflict.
func (r *MysqlCluster) validateMysqlVersionAndImage() error {
func (r *MysqlCluster) validateMysqlVersionAndImage(oldCluster *MysqlCluster) error {
if r.Spec.MysqlOpts.Image != "" && r.Spec.MysqlVersion != "" {
if !strings.Contains(r.Spec.MysqlOpts.Image, r.Spec.MysqlVersion) {
return apierrors.NewForbidden(schema.GroupResource{}, "", fmt.Errorf("spec.MysqlOpts.Image and spec.MysqlVersion are conflict"))
}
// little version upgrade
if r.Spec.MysqlOpts.Image != oldCluster.Spec.MysqlOpts.Image {
// just to greater, cannot be less.
arr := strings.Split(r.Spec.MysqlOpts.Image, ":")
vernew := arr[len(arr)-1]
arr = strings.Split(oldCluster.Spec.MysqlOpts.Image, ":")
verold := arr[len(arr)-1]
mysqlSemVernew, _ := semver.Parse(vernew)
mysqlSemVerOld, _ := semver.Parse(verold)
if mysqlSemVernew.Major < mysqlSemVerOld.Major ||
(mysqlSemVernew.Major == mysqlSemVerOld.Major && mysqlSemVernew.Minor < mysqlSemVerOld.Minor) ||
(mysqlSemVernew.Major == mysqlSemVerOld.Major && mysqlSemVernew.Minor == mysqlSemVerOld.Minor &&
mysqlSemVernew.Patch < mysqlSemVerOld.Patch) {
return apierrors.NewForbidden(schema.GroupResource{}, "", fmt.Errorf("spec.MysqlOpts.Image cannot downgrade"))
}
if mysqlSemVerOld.Major == 5 && mysqlSemVernew.Major == 8 {
return apierrors.NewForbidden(schema.GroupResource{}, "", fmt.Errorf("spec.MysqlOpts.Image do not support upgrade from 5 to 8"))
}
}
}
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions api/v1alpha1/webhook_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func TestValidateUpdate(t *testing.T) {
},
},
}
err := mysqlcluster.validateMysqlVersionAndImage()
err := mysqlcluster.validateMysqlVersionAndImage(mysqlcluster)
assert.Error(t, err)
}
{
Expand All @@ -151,7 +151,7 @@ func TestValidateUpdate(t *testing.T) {
},
},
}
err := mysqlcluster.validateMysqlVersionAndImage()
err := mysqlcluster.validateMysqlVersionAndImage(mysqlcluster)
assert.NoError(t, err)
}
}
28 changes: 21 additions & 7 deletions cmd/xenon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
)

const (
leaderStopCommand = "kill -9 $(pidof mysqld)"
leaderStopCommand = "kill -9 $(cat %s)"
mysqlUser = "root"
mysqlHost = "127.0.0.1"
mysqlPwd = ""
Expand Down Expand Up @@ -126,7 +126,7 @@ func leaderStop() error {
return fmt.Errorf("failed to get the connection of local MySQL: %s", err.Error())
}
defer conn.Close()

pidfile := "/var/run/mysqld/mysqld.pid"
if isReadonly(conn) {
log.Info("I am readonly, skip the leader stop")
os.Exit(0)
Expand Down Expand Up @@ -193,13 +193,18 @@ func leaderStop() error {
ch <- err
}
log.Info("flushed binary logs:", stmt)
pidfile, err = getPidFile(conn)
if err != nil {
ch <- err
}
}()
select {
case err := <-ch:
return err
case <-time.After(5 * time.Second):
log.Info("timeout")
if err := killMysqld(); err != nil {

if err := killMysqld(pidfile); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -427,6 +432,15 @@ func isReadonly(db *sql.DB) bool {
return readOnly == 1
}

func getPidFile(db *sql.DB) (string, error) {
pidfile := ""
err := db.QueryRow("SELECT @@pid_file").Scan(&pidfile)
if err != nil {
return "", err
}
return pidfile, err
}

// Returns true if all GTIDs in mySet are also in leaderSet. Returns false otherwise.
// https://dev.mysql.com/doc/refman/5.7/en/gtid-functions.html#function_gtid-subset
func HaveErrantTransactions(db *sql.DB, leaderSet, mySet string) (bool, error) {
Expand Down Expand Up @@ -566,7 +580,7 @@ func patchPodLabel(n MySQLNode, patch string) error {
return nil
}

func killMysqld() error {
func killMysqld(pidfile string) error {
config, err := NewConfig()
if err != nil {
panic(err)
Expand All @@ -580,9 +594,9 @@ func killMysqld() error {
Namespace: ns,
Container: "mysql",
}

killMySQLCommand := []string{leaderStopCommand}
log.Infof("killing mysql command: %s", leaderStopCommand)
comstr := fmt.Sprintf(leaderStopCommand, pidfile)
killMySQLCommand := []string{comstr}
log.Infof("killing mysql command: %s", comstr)
var output, stderr string
output, stderr, err = RunRemoteCommand(k, cfg, killMySQLCommand)
log.Info("output=[" + output + "]")
Expand Down
1 change: 1 addition & 0 deletions mysqlcluster/container/init_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func (c *initMysql) getImage() string {
func (c *initMysql) getCommand() []string {
// Because initialize mysql contain error, so do it in commands.
return []string{"bash", "-c", "/docker-entrypoint.sh mysqld;" +
"if test -f /docker-entrypoint-initdb.d/upgrade.sh; then /docker-entrypoint-initdb.d/upgrade.sh;fi;" +
"if test -f /docker-entrypoint-initdb.d/clone.sh; then /docker-entrypoint-initdb.d/clone.sh;fi;" +
"if test -f /docker-entrypoint-initdb.d/plugin.sh; then /docker-entrypoint-initdb.d/plugin.sh; fi "}
}
Expand Down
2 changes: 1 addition & 1 deletion mysqlcluster/container/init_mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestGetInitMysqlImage(t *testing.T) {
}

func TestGetInitMysqlCommand(t *testing.T) {
assert.Equal(t, initMysqlCase.Command, []string{"bash", "-c", "/docker-entrypoint.sh mysqld;if test -f /docker-entrypoint-initdb.d/clone.sh; then /docker-entrypoint-initdb.d/clone.sh;fi;if test -f /docker-entrypoint-initdb.d/plugin.sh; then /docker-entrypoint-initdb.d/plugin.sh; fi "})
assert.Equal(t, initMysqlCase.Command, []string{"bash", "-c", "/docker-entrypoint.sh mysqld;if test -f /docker-entrypoint-initdb.d/upgrade.sh; then /docker-entrypoint-initdb.d/upgrade.sh;fi;if test -f /docker-entrypoint-initdb.d/clone.sh; then /docker-entrypoint-initdb.d/clone.sh;fi;if test -f /docker-entrypoint-initdb.d/plugin.sh; then /docker-entrypoint-initdb.d/plugin.sh; fi "})
}

func TestGetInitMysqlEnvVar(t *testing.T) {
Expand Down
7 changes: 6 additions & 1 deletion mysqlcluster/container/init_sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,12 @@ func (c *initSidecar) getEnvVars() []corev1.EnvVar {
Value: "1",
})
}

if c.NeedUpgrade {
envs = append(envs, corev1.EnvVar{
Name: "NEED_UPGRADE",
Value: "1",
})
}
return envs
}

Expand Down
2 changes: 2 additions & 0 deletions mysqlcluster/mysqlcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ const (
type MysqlCluster struct {
*apiv1alpha1.MysqlCluster
log logr.Logger
// add flag to indict need upgrade
NeedUpgrade bool
}

// New returns a pointer to MysqlCluster.
Expand Down
11 changes: 10 additions & 1 deletion mysqlcluster/mysqlcluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@ var (
}
testCluster = MysqlCluster{
&mysqlCluster, logf.Log.WithName("mysqlcluster"),
false,
}
)

func TestNew(t *testing.T) {
want := &MysqlCluster{
&mysqlCluster, logf.Log.WithName("mysqlcluster"),
false,
}
assert.Equal(t, want, New(&mysqlCluster))
}
Expand Down Expand Up @@ -403,7 +405,7 @@ func TestEnsureVolumeClaimTemplates(t *testing.T) {
},
}
testCase := MysqlCluster{
&testMysql, logf.Log.WithName("mysqlcluster"),
&testMysql, logf.Log.WithName("mysqlcluster"), false,
}
want := []corev1.PersistentVolumeClaim{
{
Expand Down Expand Up @@ -449,6 +451,7 @@ func TestEnsureVolumeClaimTemplates(t *testing.T) {
testMysql.Spec.Persistence.StorageClass = &storageClass
testCase := MysqlCluster{
&testMysql, logf.Log.WithName("mysqlcluster"),
testCluster.NeedUpgrade,
}
guard := gomonkey.ApplyFunc(controllerutil.SetControllerReference, func(_ metav1.Object, _ metav1.Object, _ *runtime.Scheme) error {
return nil
Expand All @@ -467,6 +470,7 @@ func TestEnsureVolumeClaimTemplates(t *testing.T) {
testMysql.Spec.Persistence.Size = "10Gi"
testCase := MysqlCluster{
&testMysql, logf.Log.WithName("mysqlcluster"),
false,
}
guard := gomonkey.ApplyFunc(controllerutil.SetControllerReference, func(_ metav1.Object, _ metav1.Object, _ *runtime.Scheme) error {
return fmt.Errorf("test")
Expand Down Expand Up @@ -537,6 +541,7 @@ func TestEnsureMysqlConf(t *testing.T) {
testMysqlCase := testMysql
testCase := MysqlCluster{
&testMysqlCase, logf.Log.WithName("mysqlcluster"),
false,
}
testCase.EnsureMysqlConf()
wantSize = strconv.FormatUint(uint64(0.45*float64(gb)), 10)
Expand All @@ -555,6 +560,7 @@ func TestEnsureMysqlConf(t *testing.T) {
testMysqlCase.Spec.MysqlOpts.MysqlConf["innodb_buffer_pool_size"] = strconv.FormatUint(uint64(600*mb), 10)
testCase := MysqlCluster{
&testMysqlCase, logf.Log.WithName("mysqlcluster"),
false,
}
testCase.EnsureMysqlConf()
wantSize := strconv.FormatUint(uint64(600*float64(mb)), 10)
Expand All @@ -575,6 +581,7 @@ func TestEnsureMysqlConf(t *testing.T) {
testMysqlCase.Spec.MysqlOpts.MysqlConf["innodb_buffer_pool_size"] = strconv.FormatUint(uint64(1.7*float64(gb)), 10)
testCase := MysqlCluster{
&testMysqlCase, logf.Log.WithName("mysqlcluster"),
false,
}
testCase.EnsureMysqlConf()
wantSize := strconv.FormatUint(uint64(1.6*float64(gb)), 10)
Expand All @@ -594,6 +601,7 @@ func TestEnsureMysqlConf(t *testing.T) {
testMysqlCase.Spec.MysqlOpts.MysqlConf["innodb_buffer_pool_size"] = strconv.FormatUint(uint64(1.7*float64(gb)), 10)
testCase := MysqlCluster{
&testMysqlCase, logf.Log.WithName("mysqlcluster"),
false,
}
testCase.EnsureMysqlConf()
wantSize := strconv.FormatUint(uint64(1.2*float64(gb)), 10)
Expand All @@ -615,6 +623,7 @@ func TestEnsureMysqlConf(t *testing.T) {
testMysqlCase.Spec.MysqlOpts.Resources.Requests["memory"] = *memoryCase
testCase := MysqlCluster{
&testMysqlCase, logf.Log.WithName("mysqlcluster"),
false,
}
testCase.EnsureMysqlConf()
wantSize := strconv.FormatUint(uint64(2*float64(gb)), 10)
Expand Down
26 changes: 24 additions & 2 deletions mysqlcluster/syncer/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,10 @@ func (s *StatefulSetSyncer) doExpandPVCs(ctx context.Context) error {
// for example:
// Pod0 has created successful,but Pod1 is creating. then change PVC from 20Gi to 30Gi .
// Pod0's PVC need to expand, but Pod1's PVC has created as 30Gi, so need to skip it.
if equality.Semantic.DeepEqual(currentPVC.Status.Capacity, item.Spec.Resources.Requests) {
// Notice: bug: greater is also good
// see `newStorage.Cmp(*oldRequest.Storage())`
if currentPVC.Status.Capacity.Storage().Cmp(*item.Spec.Resources.Requests.Storage()) == 1 ||
equality.Semantic.DeepEqual(currentPVC.Status.Capacity, item.Spec.Resources.Requests) {
return true, nil
}
return false, nil
Expand Down Expand Up @@ -276,13 +279,23 @@ func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil.
}
// Deep copy the old statefulset from StatefulSetSyncer.
existing := s.sfs.DeepCopy()

// Sync data from mysqlcluster.spec to statefulset.
if err = s.mutate(); err != nil {
return controllerutil.OperationResultNone, err
}

// Check if statefulset changed.
if !s.sfsUpdated(existing) {
if s.podsAllUpdated(ctx) {
if s.sfs.Labels != nil && s.sfs.Status.ReadyReplicas == s.sfs.Status.Replicas {
s.sfs.Labels = nil
// If changed, update statefulset.
if err := s.cli.Update(ctx, s.sfs); err != nil {
return controllerutil.OperationResultNone, err
}
}

return controllerutil.OperationResultNone, nil
} else {
if err := s.updatePod(ctx); err != nil {
Expand Down Expand Up @@ -371,6 +384,11 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {

// mutate set the statefulset.
func (s *StatefulSetSyncer) mutate() error {
if s.sfs.Spec.Template.Spec.Containers != nil && s.sfs.Spec.Template.Spec.Containers[0].Image != s.Spec.MysqlOpts.Image {
s.sfs.Labels = map[string]string{
"needUpdate": "true",
}
}
s.sfs.Spec.ServiceName = s.GetNameForResource(utils.StatefulSet)
s.sfs.Spec.Replicas = s.Spec.Replicas
s.sfs.Spec.Selector = metav1.SetAsLabelSelector(s.GetSelectorLabels())
Expand All @@ -395,7 +413,11 @@ func (s *StatefulSetSyncer) mutate() error {
}
s.sfs.Spec.Template.ObjectMeta.Annotations["config_rev"] = s.cmRev
s.sfs.Spec.Template.ObjectMeta.Annotations["secret_rev"] = s.sctRev

if s.sfs.Labels["needUpdate"] == "true" {
s.NeedUpgrade = true
} else {
s.NeedUpgrade = false
}
err := mergo.Merge(&s.sfs.Spec.Template.Spec, s.ensurePodSpec(), mergo.WithTransformers(transformers.PodSpec))
if err != nil {
return err
Expand Down
9 changes: 9 additions & 0 deletions sidecar/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ type Config struct {
XCloudS3AccessKey string
XCloudS3SecretKey string
XCloudS3Bucket string

// Need Upgrade
NeedUpgrade bool
}

// NewInitConfig returns a pointer to Config.
Expand All @@ -150,6 +153,10 @@ func NewInitConfig() *Config {
if len(getEnvValue("INIT_TOKUDB")) > 0 {
initTokuDB = true
}
needUpgrade := false
if len(getEnvValue("NEED_UPGRADE")) > 0 {
needUpgrade = true
}

admitDefeatHearbeatCount, err := strconv.ParseInt(getEnvValue("ADMIT_DEFEAT_HEARBEAT_COUNT"), 10, 32)
if err != nil {
Expand Down Expand Up @@ -205,6 +212,8 @@ func NewInitConfig() *Config {
ClusterName: getEnvValue("CLUSTER_NAME"),
CloneFlag: false,
GtidPurged: "",
// need upgrade
NeedUpgrade: needUpgrade,
}
}

Expand Down
Loading

0 comments on commit a64d2af

Please sign in to comment.