Skip to content

Commit

Permalink
mysql: fix rebuild node with small amounts of data failed
Browse files Browse the repository at this point in the history
If the time between kill mysql and start mysql is less than
ping-timeout times admit-ping-count, rebuildme will failed.
  • Loading branch information
dbkernel committed Dec 24, 2020
1 parent 9611545 commit 61f83eb
Show file tree
Hide file tree
Showing 10 changed files with 122 additions and 25 deletions.
19 changes: 19 additions & 0 deletions src/cli/callx/callx.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,25 @@ func KillMysqldRPC(node string) error {
return nil
}

func SetMysqlStateRPC(node string, state model.MysqlState) error {
cli, cleanup, err := GetClient(node)
if err != nil {
return err
}
defer cleanup()

method := model.RPCMysqlSetState
req := model.NewMysqlSetStateRPCRequest()
req.State = state
rsp := model.NewMysqlSetStateRPCResponse(model.OK)
err = cli.Call(method, req, rsp)
if err != nil {
return err
}

return nil
}

func WaitMysqldShutdownRPC(node string) error {
cli, cleanup, err := GetClient(node)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions src/cli/cmd/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,10 @@ func mysqlRebuildMeCommandFn(cmd *cobra.Command, args []string) {
// wait
err = callx.WaitMysqldShutdownRPC(self)
ErrorOK(err)

// set the mysql state to dead, avoid failure to rebuild node with small amounts of data
err = callx.SetMysqlStateRPC(self, model.MysqlDead)
ErrorOK(err)
}

// 7. check bestone is not in BACKUPING again
Expand Down
35 changes: 35 additions & 0 deletions src/model/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package model
const (
RPCMysqlStatus = "MysqlRPC.Status"
RPCMysqlGTIDSubtract = "MysqlRPC.GTIDSubtract"
RPCMysqlSetState = "MysqlRPC.SetState"
RPCMysqlSetGlobalSysVar = "MysqlRPC.SetGlobalSysVar"
RPCMysqlCreateUserWithPrivileges = "UserRPC.CreateUserWithPrivileges"
RPCMysqlCreateNormalUser = "UserRPC.CreateNormalUser"
Expand All @@ -25,6 +26,18 @@ const (
RPCMysqlIsWorking = "MysqlRPC.IsWorking"
)

type (
// State enum.
MysqlState string
)

const (
// MysqlAlive enum.
MysqlAlive MysqlState = "ALIVE"
// MysqlDead enum.
MysqlDead MysqlState = "DEAD"
)

// GTID info
type GTID struct {
// Mysql master log file which the slave is reading
Expand Down Expand Up @@ -182,6 +195,28 @@ func NewMysqlGTIDSubtractRPCResponse(code string) *MysqlGTIDSubtractRPCResponse
return &MysqlGTIDSubtractRPCResponse{RetCode: code}
}

type MysqlSetStateRPCRequest struct {
// The IP of this request
From string

// The new state
State MysqlState
}

type MysqlSetStateRPCResponse struct {
// Return code to rpc client:
// OK or other errors
RetCode string
}

func NewMysqlSetStateRPCRequest() *MysqlSetStateRPCRequest {
return &MysqlSetStateRPCRequest{}
}

func NewMysqlSetStateRPCResponse(code string) *MysqlSetStateRPCResponse {
return &MysqlSetStateRPCResponse{RetCode: code}
}

// user
type MysqlUserRPCRequest struct {
// The IP of this request
Expand Down
6 changes: 3 additions & 3 deletions src/mysql/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (m *Mysql) PingStop() {
// the slaves Slave_IO_Running is false, because it's in connecting state
func (m *Mysql) Promotable() bool {
log := m.log
promotable := (m.GetState() == MysqlAlive)
promotable := (m.GetState() == model.MysqlAlive)
if promotable {
gtid, err := m.GetGTID()
if err != nil {
Expand Down Expand Up @@ -257,7 +257,7 @@ func (m *Mysql) WaitUntilAfterGTID(targetGTID string) error {
}

// GetState returns the mysql state.
func (m *Mysql) GetState() State {
func (m *Mysql) GetState() model.MysqlState {
return m.getState()
}

Expand Down Expand Up @@ -315,7 +315,7 @@ func (m *Mysql) WaitMysqlWorks(timeout int) error {
go func() {
for {
m.Ping()
if m.GetState() == MysqlAlive {
if m.GetState() == model.MysqlAlive {
errChannel <- nil
break
}
Expand Down
5 changes: 3 additions & 2 deletions src/mysql/attr.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ package mysql

import (
"fmt"
"model"
)

func (m *Mysql) setState(state State) {
func (m *Mysql) setState(state model.MysqlState) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.state = state
}

func (m *Mysql) getState() State {
func (m *Mysql) getState() model.MysqlState {
m.mutex.RLock()
defer m.mutex.RUnlock()
return m.state
Expand Down
18 changes: 5 additions & 13 deletions src/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,11 @@ import (
)

type (
// State enum.
State string

// Option enum.
Option string
)

const (
// MysqlAlive enum.
MysqlAlive State = "ALIVE"
// MysqlDead enum.
MysqlDead State = "DEAD"

// MysqlReadonly enum.
MysqlReadonly Option = "READONLY"
// MysqlReadwrite enum.
Expand All @@ -49,7 +41,7 @@ type Mysql struct {
db *sql.DB
conf *config.MysqlConfig
log *xlog.Log
state State
state model.MysqlState
option Option
mutex sync.RWMutex
dbmutex sync.RWMutex
Expand All @@ -66,7 +58,7 @@ func NewMysql(conf *config.MysqlConfig, queryTimeout int, log *xlog.Log) *Mysql
db: nil,
log: log,
conf: conf,
state: MysqlDead,
state: model.MysqlDead,
mysqlHandler: getHandler(conf.Version),
pingTicker: common.NormalTicker(conf.PingTimeout),
}
Expand All @@ -92,7 +84,7 @@ func (m *Mysql) Ping() {
log.Error("mysql[%v].ping.getdb.error[%v].downs:%v,downslimits:%v", m.getConnStr(), err, m.downs, downsLimits)
if m.downs > downsLimits {
log.Error("mysql.dead.downs:%v,downslimits:%v", m.downs, downsLimits)
m.setState(MysqlDead)
m.setState(model.MysqlDead)
}
m.IncMysqlDowns()
m.downs++
Expand All @@ -103,7 +95,7 @@ func (m *Mysql) Ping() {
log.Error("mysql[%v].ping.error[%v].downs:%v,downslimits:%v", m.getConnStr(), err, m.downs, downsLimits)
if m.downs > downsLimits {
log.Error("mysql.dead.downs:%v,downslimits:%v", m.downs, downsLimits)
m.setState(MysqlDead)
m.setState(model.MysqlDead)
}
m.IncMysqlDowns()
m.downs++
Expand All @@ -121,7 +113,7 @@ func (m *Mysql) Ping() {

// reset downs.
m.downs = 0
m.setState(MysqlAlive)
m.setState(model.MysqlAlive)
m.pingEntry = *pe
}

Expand Down
7 changes: 4 additions & 3 deletions src/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package mysql

import (
"config"
"model"
"testing"
"time"
"xbase/common"
Expand All @@ -28,7 +29,7 @@ func TestMysql(t *testing.T) {

time.Sleep(time.Duration(config.DefaultMysqlConfig().PingTimeout*2) * time.Millisecond)
got := mysql.GetState()
want := MysqlAlive
want := model.MysqlAlive
assert.Equal(t, want, got)
mysql.PingStop()
}
Expand All @@ -42,7 +43,7 @@ func TestStateDead(t *testing.T) {

time.Sleep(time.Duration(config.DefaultMysqlConfig().PingTimeout*2) * time.Millisecond)
got := mysql.GetState()
want := MysqlDead
want := model.MysqlDead
assert.Equal(t, want, got)
mysql.PingStop()
}
Expand All @@ -56,7 +57,7 @@ func TestCreateReplUser(t *testing.T) {

time.Sleep(time.Duration(config.DefaultMysqlConfig().PingTimeout*2) * time.Millisecond)
got := mysql.GetState()
want := MysqlAlive
want := model.MysqlAlive
assert.Equal(t, want, got)
mysql.PingStop()
}
Expand Down
9 changes: 8 additions & 1 deletion src/mysql/rpc_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (m *MysqlRPC) StartSlave(req *model.MysqlRPCRequest, rsp *model.MysqlRPCRes

// IsWorking used to check the mysql works or not.
func (m *MysqlRPC) IsWorking(req *model.MysqlRPCRequest, rsp *model.MysqlRPCResponse) error {
if m.mysql.GetState() == MysqlAlive {
if m.mysql.GetState() == model.MysqlAlive {
rsp.RetCode = model.OK
} else {
rsp.RetCode = model.ErrorMySQLDown
Expand Down Expand Up @@ -118,3 +118,10 @@ func (m *MysqlRPC) GTIDSubtract(req *model.MysqlGTIDSubtractRPCRequest, rsp *mod
}
return nil
}

// SetState used to set the mysql state.
func (m *MysqlRPC) SetState(req *model.MysqlSetStateRPCRequest, rsp *model.MysqlSetStateRPCResponse) error {
rsp.RetCode = model.OK
m.mysql.setState(req.State)
return nil
}
41 changes: 40 additions & 1 deletion src/mysql/rpc_mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,53 @@ func TestMysqlRPCStatus(t *testing.T) {
}
want := model.NewMysqlStatusRPCResponse(model.OK)
want.GTID = GTID
want.Status = string(MysqlDead)
want.Status = string(model.MysqlDead)
want.Stats = &model.MysqlStats{}

got := rsp
assert.Equal(t, want, got)
}
}

func TestMysqlRPCSetState(t *testing.T) {
log := xlog.NewStdLog(xlog.Level(xlog.PANIC))
port := common.RandomPort(8000, 9000)
id, _, cleanup := MockMysql(log, port, NewMockGTIDB())
defer cleanup()

// MysqlDead
{
method := model.RPCMysqlSetState
req := model.NewMysqlSetStateRPCRequest()
req.State = model.MysqlDead
rsp := model.NewMysqlSetStateRPCResponse(model.OK)
c, cleanup := MockGetClient(t, id)
defer cleanup()
err := c.Call(method, req, rsp)
assert.Nil(t, err)

want := model.NewMysqlSetStateRPCResponse(model.OK)
got := rsp
assert.Equal(t, want, got)
}

// MysqlAlive
{
method := model.RPCMysqlSetState
req := model.NewMysqlSetStateRPCRequest()
req.State = model.MysqlAlive
rsp := model.NewMysqlSetStateRPCResponse(model.OK)
c, cleanup := MockGetClient(t, id)
defer cleanup()
err := c.Call(method, req, rsp)
assert.Nil(t, err)

want := model.NewMysqlSetStateRPCResponse(model.OK)
got := rsp
assert.Equal(t, want, got)
}
}

func TestMysqlRPCSetSysVar(t *testing.T) {
log := xlog.NewStdLog(xlog.Level(xlog.PANIC))
port := common.RandomPort(8000, 9000)
Expand Down
3 changes: 1 addition & 2 deletions src/raft/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ package raft

import (
"model"
"mysql"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -287,7 +286,7 @@ func (r *Leader) processRequestVoteRequest(req *model.RaftRPCRequest) *model.Raf
// broadcast hearbeat requests to other peers of the cluster
func (r *Leader) sendHeartbeat(mysqlDown *bool, c chan *model.RaftRPCResponse) {
// check MySQL down
if r.mysql.GetState() == mysql.MysqlDead {
if r.mysql.GetState() == model.MysqlDead {
*mysqlDown = true
return
}
Expand Down

0 comments on commit 61f83eb

Please sign in to comment.