Skip to content

Commit

Permalink
restart only replicas ahead of the master (#113)
Browse files Browse the repository at this point in the history
* restart only replicas ahead of the master

* fix isSlavePermanentlyLost

---------

Co-authored-by: Aleksandr Shevchuk <[email protected]>
  • Loading branch information
teem0n and Aleksandr Shevchuk authored Jun 27, 2024
1 parent eff3563 commit a5a64da
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 49 deletions.
36 changes: 25 additions & 11 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,7 @@ func (app *App) calcActiveNodes(clusterState, clusterStateDcs map[string]*NodeSt
continue
}
sgtids := gtids.ParseGtidSet(sstatus.ExecutedGtidSet)
if sstatus.ReplicationState != mysql.ReplicationRunning || isSplitBrained(sgtids, mgtids, muuid) {
if sstatus.ReplicationState != mysql.ReplicationRunning || gtids.IsSplitBrained(sgtids, mgtids, muuid) {
app.logger.Errorf("calc active nodes: %s is not replicating or splitbrained, deleting from active...", host)
continue
}
Expand Down Expand Up @@ -1069,7 +1069,7 @@ func (app *App) updateActiveNodes(clusterState, clusterStateDcs map[string]*Node

// first, shrink HA-group, if needed
if waitSlaveCount > oldWaitSlaveCount {
err := app.adjustSemiSyncOnMaster(masterNode, clusterState[master], waitSlaveCount)
err := app.adjustSemiSyncOnMaster(masterNode, masterState, waitSlaveCount)
if err != nil {
app.logger.Errorf("failed to adjust semi-sync on master %s to %d: %v", masterNode.Host(), waitSlaveCount, err)
return err
Expand All @@ -1092,13 +1092,13 @@ func (app *App) updateActiveNodes(clusterState, clusterStateDcs map[string]*Node

// and finally enlarge HA-group, if needed
for _, host := range becomeActive {
err := app.enableSemiSyncOnSlave(host)
err := app.enableSemiSyncOnSlave(host, clusterState[host], masterState)
if err != nil {
app.logger.Errorf("failed to enable semi-sync on slave %s: %v", host, err)
}
}
if waitSlaveCount < oldWaitSlaveCount {
err := app.adjustSemiSyncOnMaster(masterNode, clusterState[master], waitSlaveCount)
err := app.adjustSemiSyncOnMaster(masterNode, masterState, waitSlaveCount)
if err != nil {
app.logger.Errorf("failed to adjust semi-sync on master %s to %d: %v", masterNode.Host(), waitSlaveCount, err)
}
Expand Down Expand Up @@ -1136,18 +1136,31 @@ func (app *App) adjustSemiSyncOnMaster(node *mysql.Node, state *NodeState, waitS
return nil
}

func (app *App) enableSemiSyncOnSlave(host string) error {
func (app *App) enableSemiSyncOnSlave(host string, slaveState, masterState *NodeState) error {
node := app.cluster.Get(host)
err := node.SemiSyncSetSlave()
if err != nil {
app.logger.Errorf("failed to enable semi_sync_slave on %s: %s", host, err)
return err
}
err = node.RestartReplica()
if err != nil {
app.logger.Errorf("failed restart replication after set semi_sync_slave on %s: %s", host, err)
return err
masterGtidSet := gtids.ParseGtidSet(masterState.MasterState.ExecutedGtidSet)
slaveGtidSet := gtids.ParseGtidSet(slaveState.SlaveState.ExecutedGtidSet)

if gtids.IsSlaveAhead(slaveGtidSet, masterGtidSet) {
// we should restart only replicas ahead of the master
err = node.RestartReplica()
if err != nil {
app.logger.Errorf("failed restart replication after set semi_sync_slave on %s: %s", host, err)
return err
}
} else {
err = node.RestartSlaveIOThread()
if err != nil {
app.logger.Errorf("failed restart slave io thread after set semi_sync_slave on %s: %s", host, err)
return err
}
}

return nil
}

Expand Down Expand Up @@ -1859,12 +1872,13 @@ func (app *App) repairCascadeNode(node *mysql.Node, clusterState map[string]*Nod
}
app.logger.Debugf("repair: %s GTID set = %v, new stream_from GTID set is %v", host, myGITIDs, candidateGTIDs)

if !isGTIDLessOrEqual(myGITIDs, candidateGTIDs) && !isGTIDLessOrEqual(candidateGTIDs, myGITIDs) {
// TODO: replace with IsSplitBrained
if gtids.IsSlaveAhead(myGITIDs, candidateGTIDs) && gtids.IsSlaveAhead(candidateGTIDs, myGITIDs) {
app.logger.Errorf("repair: %s and %s are splitbrained...", host, upstreamCandidate)
app.writeEmergeFile("cascade replica splitbain detected")
return
}
if isGTIDLessOrEqual(myGITIDs, candidateGTIDs) {
if gtids.IsSlaveBehindOrEqual(myGITIDs, candidateGTIDs) {
app.logger.Infof("repair: new stream_from host GTID set is superset of our GTID set. Switching Master_Host, Starting replication")
err = app.performChangeMaster(host, upstreamCandidate)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/app/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (app *App) MarkReplicationRunning(node *mysql.Node, channel string) {
newGtidSet := gtids.ParseGtidSet(status.GetExecutedGtidSet())
oldGtidSet := gtids.ParseGtidSet(replState.LastGTIDExecuted)

if !isGTIDLessOrEqual(newGtidSet, oldGtidSet) {
if gtids.IsSlaveAhead(newGtidSet, oldGtidSet) {
delete(app.replRepairState, key)
}
}
Expand Down
32 changes: 2 additions & 30 deletions internal/app/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"fmt"
"time"

gomysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/google/uuid"
"github.com/yandex/mysync/internal/log"
"github.com/yandex/mysync/internal/mysql"
"github.com/yandex/mysync/internal/mysql/gtids"
Expand Down Expand Up @@ -228,34 +226,8 @@ func isSlavePermanentlyLost(sstatus mysql.ReplicaStatus, masterGtidSet gtids.GTI
return true
}
slaveGtidSet := gtids.ParseGtidSet(sstatus.GetExecutedGtidSet())
return !isGTIDLessOrEqual(slaveGtidSet, masterGtidSet)
}

func isGTIDLessOrEqual(slaveGtidSet, masterGtidSet gtids.GTIDSet) bool {
return masterGtidSet.Contain(slaveGtidSet) || masterGtidSet.Equal(slaveGtidSet)
}

func isSplitBrained(slaveGtidSet, masterGtidSet gtids.GTIDSet, masterUUID uuid.UUID) bool {
mysqlSlaveGtidSet := slaveGtidSet.(*gomysql.MysqlGTIDSet)
mysqlMasterGtidSet := masterGtidSet.(*gomysql.MysqlGTIDSet)
for _, slaveSet := range mysqlSlaveGtidSet.Sets {
masterSet, ok := mysqlMasterGtidSet.Sets[slaveSet.SID.String()]
if !ok {
return true
}

if masterSet.Contain(slaveSet) {
continue
}

if masterSet.SID == masterUUID {
continue
}

return true
}

return false
// TODO: why ahead = lost?..
return gtids.IsSlaveAhead(slaveGtidSet, masterGtidSet)
}

func validatePriority(priority *int64) error {
Expand Down
13 changes: 7 additions & 6 deletions internal/app/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
gomysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/yandex/mysync/internal/log"
"github.com/yandex/mysync/internal/mysql"
"github.com/yandex/mysync/internal/mysql/gtids"
)

func mustGTIDSet(s string) gomysql.GTIDSet {
Expand Down Expand Up @@ -338,41 +339,41 @@ func TestIsSplitBrained(t *testing.T) {
slaveGTID := mustGTIDSet("6DBC0B04-4B09-43DC-86CC-9AF852DED919:1-100," +
"09978591-5754-4710-BF67-062880ABE1B4:1-100," +
"AA6890C8-69F8-4BC4-B3A5-5D3FEA8C28CF:1-100")
ok := isSplitBrained(slaveGTID, masterGTID, masterUUID)
ok := gtids.IsSplitBrained(slaveGTID, masterGTID, masterUUID)
require.False(t, ok)

// the replica is lagging behind the master
slaveGTID = mustGTIDSet("6DBC0B04-4B09-43DC-86CC-9AF852DED919:1-99," +
"09978591-5754-4710-BF67-062880ABE1B4:1-100," +
"AA6890C8-69F8-4BC4-B3A5-5D3FEA8C28CF:1-100")
ok = isSplitBrained(slaveGTID, masterGTID, masterUUID)
ok = gtids.IsSplitBrained(slaveGTID, masterGTID, masterUUID)
require.False(t, ok)

// the replica is lagging behind the new master
slaveGTID = mustGTIDSet("6DBC0B04-4B09-43DC-86CC-9AF852DED919:1-100," +
"09978591-5754-4710-BF67-062880ABE1B4:1-100")
ok = isSplitBrained(slaveGTID, masterGTID, masterUUID)
ok = gtids.IsSplitBrained(slaveGTID, masterGTID, masterUUID)
require.False(t, ok)

// the replica applied the transaction from the master before the master
slaveGTID = mustGTIDSet("6DBC0B04-4B09-43DC-86CC-9AF852DED919:1-101," +
"09978591-5754-4710-BF67-062880ABE1B4:1-100," +
"AA6890C8-69F8-4BC4-B3A5-5D3FEA8C28CF:1-100")
ok = isSplitBrained(slaveGTID, masterGTID, masterUUID)
ok = gtids.IsSplitBrained(slaveGTID, masterGTID, masterUUID)
require.False(t, ok)

// the replica applied a transaction not from the master
slaveGTID = mustGTIDSet("6DBC0B04-4B09-43DC-86CC-9AF852DED919:1-100," +
"09978591-5754-4710-BF67-062880ABE1B4:1-100," +
"AA6890C8-69F8-4BC4-B3A5-5D3FEA8C28CF:1-101")
ok = isSplitBrained(slaveGTID, masterGTID, masterUUID)
ok = gtids.IsSplitBrained(slaveGTID, masterGTID, masterUUID)
require.True(t, ok)

// the replica applied a new transaction not from the master
slaveGTID = mustGTIDSet("6DBC0B04-4B09-43DC-86CC-9AF852DED919:1-101," +
"09978591-5754-4710-BF67-062880ABE1B4:1-100," +
"AA6890C8-69F8-4BC4-B3A5-5D3FEA8C28CF:1-100," +
"BB6890C8-69F8-4BC4-B3A5-5D3FEA8C28CF:1-100")
ok = isSplitBrained(slaveGTID, masterGTID, masterUUID)
ok = gtids.IsSplitBrained(slaveGTID, masterGTID, masterUUID)
require.True(t, ok)
}
37 changes: 37 additions & 0 deletions internal/mysql/gtids/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package gtids

import (
gomysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/google/uuid"
)

func IsSlaveBehindOrEqual(slaveGtidSet, masterGtidSet GTIDSet) bool {
return masterGtidSet.Contain(slaveGtidSet) || masterGtidSet.Equal(slaveGtidSet)
}

func IsSlaveAhead(slaveGtidSet, masterGtidSet GTIDSet) bool {
return !IsSlaveBehindOrEqual(slaveGtidSet, masterGtidSet)
}

func IsSplitBrained(slaveGtidSet, masterGtidSet GTIDSet, masterUUID uuid.UUID) bool {
mysqlSlaveGtidSet := slaveGtidSet.(*gomysql.MysqlGTIDSet)
mysqlMasterGtidSet := masterGtidSet.(*gomysql.MysqlGTIDSet)
for _, slaveSet := range mysqlSlaveGtidSet.Sets {
masterSet, ok := mysqlMasterGtidSet.Sets[slaveSet.SID.String()]
if !ok {
return true
}

if masterSet.Contain(slaveSet) {
continue
}

if masterSet.SID == masterUUID {
continue
}

return true
}

return false
}
2 changes: 1 addition & 1 deletion internal/mysql/gtids/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

type GTIDSet = mysql.GTIDSet

func ParseGtidSet(gtidset string) mysql.GTIDSet {
func ParseGtidSet(gtidset string) GTIDSet {
parsed, err := mysql.ParseGTIDSet(mysql.MySQLFlavor, gtidset)
if err != nil {
panic(err)
Expand Down

0 comments on commit a5a64da

Please sign in to comment.