diff --git a/internal/app/app.go b/internal/app/app.go index f5e6f910..8b87edaf 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -940,7 +940,7 @@ func (app *App) calcActiveNodes(clusterState, clusterStateDcs map[string]*NodeSt continue } sgtids := gtids.ParseGtidSet(sstatus.ExecutedGtidSet) - if sstatus.ReplicationState != mysql.ReplicationRunning || isSplitBrained(sgtids, mgtids, muuid) { + if sstatus.ReplicationState != mysql.ReplicationRunning || gtids.IsSplitBrained(sgtids, mgtids, muuid) { app.logger.Errorf("calc active nodes: %s is not replicating or splitbrained, deleting from active...", host) continue } @@ -1069,7 +1069,7 @@ func (app *App) updateActiveNodes(clusterState, clusterStateDcs map[string]*Node // first, shrink HA-group, if needed if waitSlaveCount > oldWaitSlaveCount { - err := app.adjustSemiSyncOnMaster(masterNode, clusterState[master], waitSlaveCount) + err := app.adjustSemiSyncOnMaster(masterNode, masterState, waitSlaveCount) if err != nil { app.logger.Errorf("failed to adjust semi-sync on master %s to %d: %v", masterNode.Host(), waitSlaveCount, err) return err @@ -1092,13 +1092,13 @@ func (app *App) updateActiveNodes(clusterState, clusterStateDcs map[string]*Node // and finally enlarge HA-group, if needed for _, host := range becomeActive { - err := app.enableSemiSyncOnSlave(host) + err := app.enableSemiSyncOnSlave(host, clusterState[host], masterState) if err != nil { app.logger.Errorf("failed to enable semi-sync on slave %s: %v", host, err) } } if waitSlaveCount < oldWaitSlaveCount { - err := app.adjustSemiSyncOnMaster(masterNode, clusterState[master], waitSlaveCount) + err := app.adjustSemiSyncOnMaster(masterNode, masterState, waitSlaveCount) if err != nil { app.logger.Errorf("failed to adjust semi-sync on master %s to %d: %v", masterNode.Host(), waitSlaveCount, err) } @@ -1136,18 +1136,31 @@ func (app *App) adjustSemiSyncOnMaster(node *mysql.Node, state *NodeState, waitS return nil } -func (app *App) enableSemiSyncOnSlave(host string) error { +func (app *App) enableSemiSyncOnSlave(host string, slaveState, masterState *NodeState) error { node := app.cluster.Get(host) err := node.SemiSyncSetSlave() if err != nil { app.logger.Errorf("failed to enable semi_sync_slave on %s: %s", host, err) return err } - err = node.RestartReplica() - if err != nil { - app.logger.Errorf("failed restart replication after set semi_sync_slave on %s: %s", host, err) - return err + masterGtidSet := gtids.ParseGtidSet(masterState.MasterState.ExecutedGtidSet) + slaveGtidSet := gtids.ParseGtidSet(slaveState.SlaveState.ExecutedGtidSet) + + if gtids.IsSlaveAhead(slaveGtidSet, masterGtidSet) { + // we should restart only replicas ahead of the master + err = node.RestartReplica() + if err != nil { + app.logger.Errorf("failed restart replication after set semi_sync_slave on %s: %s", host, err) + return err + } + } else { + err = node.RestartSlaveIOThread() + if err != nil { + app.logger.Errorf("failed restart slave io thread after set semi_sync_slave on %s: %s", host, err) + return err + } } + return nil } @@ -1859,12 +1872,13 @@ func (app *App) repairCascadeNode(node *mysql.Node, clusterState map[string]*Nod } app.logger.Debugf("repair: %s GTID set = %v, new stream_from GTID set is %v", host, myGITIDs, candidateGTIDs) - if !isGTIDLessOrEqual(myGITIDs, candidateGTIDs) && !isGTIDLessOrEqual(candidateGTIDs, myGITIDs) { + // TODO: replace with IsSplitBrained + if gtids.IsSlaveAhead(myGITIDs, candidateGTIDs) && gtids.IsSlaveAhead(candidateGTIDs, myGITIDs) { app.logger.Errorf("repair: %s and %s are splitbrained...", host, upstreamCandidate) app.writeEmergeFile("cascade replica splitbain detected") return } - if isGTIDLessOrEqual(myGITIDs, candidateGTIDs) { + if gtids.IsSlaveBehindOrEqual(myGITIDs, candidateGTIDs) { app.logger.Infof("repair: new stream_from host GTID set is superset of our GTID set. Switching Master_Host, Starting replication") err = app.performChangeMaster(host, upstreamCandidate) if err != nil { diff --git a/internal/app/replication.go b/internal/app/replication.go index 29488c39..b7f6f32d 100644 --- a/internal/app/replication.go +++ b/internal/app/replication.go @@ -41,7 +41,7 @@ func (app *App) MarkReplicationRunning(node *mysql.Node, channel string) { newGtidSet := gtids.ParseGtidSet(status.GetExecutedGtidSet()) oldGtidSet := gtids.ParseGtidSet(replState.LastGTIDExecuted) - if !isGTIDLessOrEqual(newGtidSet, oldGtidSet) { + if gtids.IsSlaveAhead(newGtidSet, oldGtidSet) { delete(app.replRepairState, key) } } diff --git a/internal/app/util.go b/internal/app/util.go index 3e878b98..6a2feca3 100644 --- a/internal/app/util.go +++ b/internal/app/util.go @@ -4,8 +4,6 @@ import ( "fmt" "time" - gomysql "github.com/go-mysql-org/go-mysql/mysql" - "github.com/google/uuid" "github.com/yandex/mysync/internal/log" "github.com/yandex/mysync/internal/mysql" "github.com/yandex/mysync/internal/mysql/gtids" @@ -228,34 +226,8 @@ func isSlavePermanentlyLost(sstatus mysql.ReplicaStatus, masterGtidSet gtids.GTI return true } slaveGtidSet := gtids.ParseGtidSet(sstatus.GetExecutedGtidSet()) - return !isGTIDLessOrEqual(slaveGtidSet, masterGtidSet) -} - -func isGTIDLessOrEqual(slaveGtidSet, masterGtidSet gtids.GTIDSet) bool { - return masterGtidSet.Contain(slaveGtidSet) || masterGtidSet.Equal(slaveGtidSet) -} - -func isSplitBrained(slaveGtidSet, masterGtidSet gtids.GTIDSet, masterUUID uuid.UUID) bool { - mysqlSlaveGtidSet := slaveGtidSet.(*gomysql.MysqlGTIDSet) - mysqlMasterGtidSet := masterGtidSet.(*gomysql.MysqlGTIDSet) - for _, slaveSet := range mysqlSlaveGtidSet.Sets { - masterSet, ok := mysqlMasterGtidSet.Sets[slaveSet.SID.String()] - if !ok { - return true - } - - if masterSet.Contain(slaveSet) { - continue - } - - if masterSet.SID == masterUUID { - continue - } - - return true - } - - return false + // TODO: why ahead = lost?.. + return gtids.IsSlaveAhead(slaveGtidSet, masterGtidSet) } func validatePriority(priority *int64) error { diff --git a/internal/app/util_test.go b/internal/app/util_test.go index efc21df0..409ed0c2 100644 --- a/internal/app/util_test.go +++ b/internal/app/util_test.go @@ -10,6 +10,7 @@ import ( gomysql "github.com/go-mysql-org/go-mysql/mysql" "github.com/yandex/mysync/internal/log" "github.com/yandex/mysync/internal/mysql" + "github.com/yandex/mysync/internal/mysql/gtids" ) func mustGTIDSet(s string) gomysql.GTIDSet { @@ -338,34 +339,34 @@ func TestIsSplitBrained(t *testing.T) { slaveGTID := mustGTIDSet("6DBC0B04-4B09-43DC-86CC-9AF852DED919:1-100," + "09978591-5754-4710-BF67-062880ABE1B4:1-100," + "AA6890C8-69F8-4BC4-B3A5-5D3FEA8C28CF:1-100") - ok := isSplitBrained(slaveGTID, masterGTID, masterUUID) + ok := gtids.IsSplitBrained(slaveGTID, masterGTID, masterUUID) require.False(t, ok) // the replica is lagging behind the master slaveGTID = mustGTIDSet("6DBC0B04-4B09-43DC-86CC-9AF852DED919:1-99," + "09978591-5754-4710-BF67-062880ABE1B4:1-100," + "AA6890C8-69F8-4BC4-B3A5-5D3FEA8C28CF:1-100") - ok = isSplitBrained(slaveGTID, masterGTID, masterUUID) + ok = gtids.IsSplitBrained(slaveGTID, masterGTID, masterUUID) require.False(t, ok) // the replica is lagging behind the new master slaveGTID = mustGTIDSet("6DBC0B04-4B09-43DC-86CC-9AF852DED919:1-100," + "09978591-5754-4710-BF67-062880ABE1B4:1-100") - ok = isSplitBrained(slaveGTID, masterGTID, masterUUID) + ok = gtids.IsSplitBrained(slaveGTID, masterGTID, masterUUID) require.False(t, ok) // the replica applied the transaction from the master before the master slaveGTID = mustGTIDSet("6DBC0B04-4B09-43DC-86CC-9AF852DED919:1-101," + "09978591-5754-4710-BF67-062880ABE1B4:1-100," + "AA6890C8-69F8-4BC4-B3A5-5D3FEA8C28CF:1-100") - ok = isSplitBrained(slaveGTID, masterGTID, masterUUID) + ok = gtids.IsSplitBrained(slaveGTID, masterGTID, masterUUID) require.False(t, ok) // the replica applied a transaction not from the master slaveGTID = mustGTIDSet("6DBC0B04-4B09-43DC-86CC-9AF852DED919:1-100," + "09978591-5754-4710-BF67-062880ABE1B4:1-100," + "AA6890C8-69F8-4BC4-B3A5-5D3FEA8C28CF:1-101") - ok = isSplitBrained(slaveGTID, masterGTID, masterUUID) + ok = gtids.IsSplitBrained(slaveGTID, masterGTID, masterUUID) require.True(t, ok) // the replica applied a new transaction not from the master @@ -373,6 +374,6 @@ func TestIsSplitBrained(t *testing.T) { "09978591-5754-4710-BF67-062880ABE1B4:1-100," + "AA6890C8-69F8-4BC4-B3A5-5D3FEA8C28CF:1-100," + "BB6890C8-69F8-4BC4-B3A5-5D3FEA8C28CF:1-100") - ok = isSplitBrained(slaveGTID, masterGTID, masterUUID) + ok = gtids.IsSplitBrained(slaveGTID, masterGTID, masterUUID) require.True(t, ok) } diff --git a/internal/mysql/gtids/utils.go b/internal/mysql/gtids/utils.go new file mode 100644 index 00000000..b4ac98b1 --- /dev/null +++ b/internal/mysql/gtids/utils.go @@ -0,0 +1,37 @@ +package gtids + +import ( + gomysql "github.com/go-mysql-org/go-mysql/mysql" + "github.com/google/uuid" +) + +func IsSlaveBehindOrEqual(slaveGtidSet, masterGtidSet GTIDSet) bool { + return masterGtidSet.Contain(slaveGtidSet) || masterGtidSet.Equal(slaveGtidSet) +} + +func IsSlaveAhead(slaveGtidSet, masterGtidSet GTIDSet) bool { + return !IsSlaveBehindOrEqual(slaveGtidSet, masterGtidSet) +} + +func IsSplitBrained(slaveGtidSet, masterGtidSet GTIDSet, masterUUID uuid.UUID) bool { + mysqlSlaveGtidSet := slaveGtidSet.(*gomysql.MysqlGTIDSet) + mysqlMasterGtidSet := masterGtidSet.(*gomysql.MysqlGTIDSet) + for _, slaveSet := range mysqlSlaveGtidSet.Sets { + masterSet, ok := mysqlMasterGtidSet.Sets[slaveSet.SID.String()] + if !ok { + return true + } + + if masterSet.Contain(slaveSet) { + continue + } + + if masterSet.SID == masterUUID { + continue + } + + return true + } + + return false +} diff --git a/internal/mysql/gtids/wrapper.go b/internal/mysql/gtids/wrapper.go index 8933d6a7..683e59dc 100644 --- a/internal/mysql/gtids/wrapper.go +++ b/internal/mysql/gtids/wrapper.go @@ -6,7 +6,7 @@ import ( type GTIDSet = mysql.GTIDSet -func ParseGtidSet(gtidset string) mysql.GTIDSet { +func ParseGtidSet(gtidset string) GTIDSet { parsed, err := mysql.ParseGTIDSet(mysql.MySQLFlavor, gtidset) if err != nil { panic(err)