Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support async replication #88

Merged
merged 43 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
55dd18b
support async replication
Mar 14, 2024
4dfafc3
fix a_sync->async, add async in tests yaml
Mar 17, 2024
afefe5c
fix mysync.yaml syntax err
Mar 18, 2024
3a4dcb0
fix docker compose cfg
Mar 18, 2024
62a430a
fix waitForCatchUp return in async mode
Mar 19, 2024
71f4403
set master online on switchover: phase 5
Mar 19, 2024
091024c
fix PriorityChoiceMaxLag in async mode
Mar 20, 2024
b685481
fix queryCalcMdbReplMonTsDelay query
Mar 22, 2024
2bfb126
async mode: add tests, fix linters
Mar 27, 2024
e88784d
async mode: add tests, fix linters
Mar 27, 2024
a155b32
async mode: add tests, fix linters
Mar 28, 2024
27b7834
async replication refactoring
May 6, 2024
00bdd06
async replication test fix
May 6, 2024
81a0f60
async replication test fix
May 6, 2024
3f44fc7
async replication test fix
May 7, 2024
c741698
Merge branch 'master' into async-replication
teem0n May 7, 2024
9b8d7a3
async replication test fix
May 14, 2024
302ff31
Merge branch 'async-replication' of github.com:yandex/mysync into asy…
May 14, 2024
b8a90db
async replication test fix
May 14, 2024
7425ad9
add mysync-repl-mon feature
May 22, 2024
3481208
add refactor mdb_repl_mon table name to custom configuring name
May 22, 2024
51782c9
add refactor mdb_repl_mon table name to custom configuring name
May 22, 2024
6f9a218
repl_mon fixes
May 27, 2024
5efe22e
repl_mon fixes
May 28, 2024
928e140
fix typo
May 28, 2024
de6a36e
add repl_mon tests
May 28, 2024
26efe42
fix async tests
May 28, 2024
9e9a083
fix async tests
May 29, 2024
7a878b3
fix async tests, add repl_mon.feature launch
May 29, 2024
4faacd9
fix async tests
May 29, 2024
428b272
fix async tests, fix repl_mon tests
May 30, 2024
31cfffb
add switch_helper
Jun 3, 2024
b7b0b32
linters fix
Jun 3, 2024
c5e08fb
fix " too many arguments "
Jun 3, 2024
68c8a96
linters fix
Jun 3, 2024
2de37a7
linters fix
Jun 3, 2024
24f2840
linters fix
Jun 3, 2024
7fddbdd
Merge branch 'master' into async-replication
teem0n Jun 10, 2024
d9ba04f
Merge branch 'master' into async-replication
teem0n Jun 11, 2024
9fd2bd6
Merge branch 'master' into async-replication
teem0n Jun 11, 2024
723d4db
fix async tests
Jun 13, 2024
1468bb5
fix async tests
Jun 13, 2024
6937a8b
refactor switch_helper
Jun 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/docker-tests-8.0.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ jobs:
command:
- 'VERSION=8.0 GODOG_FEATURE=active_nodes.feature make test'
- 'VERSION=8.0 GODOG_FEATURE=async.feature make test'
- 'VERSION=8.0 GODOG_FEATURE=async_setting.feature make test'
- 'VERSION=8.0 GODOG_FEATURE=cascade_replicas.feature make test'
- 'VERSION=8.0 GODOG_FEATURE=CLI.feature make test'
- 'VERSION=8.0 GODOG_FEATURE=crash_recovery.feature make test'
Expand All @@ -57,6 +58,7 @@ jobs:
- 'VERSION=8.0 GODOG_FEATURE=readonly_filesystem.feature make test'
- 'VERSION=8.0 GODOG_FEATURE=recovery.feature make test'
- 'VERSION=8.0 GODOG_FEATURE=repair.feature make test'
- 'VERSION=8.0 GODOG_FEATURE=repl_mon.feature make test'
- 'VERSION=8.0 GODOG_FEATURE=statefile.feature make test'
- 'VERSION=8.0 GODOG_FEATURE=switchover_from.feature make test'
- 'VERSION=8.0 GODOG_FEATURE=switchover_to.feature make test'
Expand Down
9 changes: 5 additions & 4 deletions .github/workflows/docker-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ jobs:
matrix:
command:
- 'GODOG_FEATURE=active_nodes.feature make test'
- 'GODOG_FEATURE=async.feature make test'
- 'GODOG_FEATURE=cascade_replicas.feature make test'
- 'GODOG_FEATURE=async.feature make test'
- 'GODOG_FEATURE=cascade_replicas.feature make test'
- 'GODOG_FEATURE=CLI.feature make test'
- 'GODOG_FEATURE=crash_recovery.feature make test'
- 'GODOG_FEATURE=events_reenable.feature make test'
Expand All @@ -55,8 +55,9 @@ jobs:
- 'GODOG_FEATURE=priority.feature make test'
- 'GODOG_FEATURE=readonly_filesystem.feature make test'
- 'GODOG_FEATURE=recovery.feature make test'
- 'GODOG_FEATURE=repair.feature make test'
- 'GODOG_FEATURE=statefile.feature make test'
- 'GODOG_FEATURE=repair.feature make test'
- 'GODOG_FEATURE=repl_mon.feature make test'
- 'GODOG_FEATURE=statefile.feature make test'
- 'GODOG_FEATURE=switchover_from.feature make test'
- 'GODOG_FEATURE=switchover_to.feature make test'
- 'GODOG_FEATURE=zk_failure.feature make test'
Expand Down
68 changes: 66 additions & 2 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type App struct {
daemonMutex sync.Mutex
replRepairState map[string]*ReplicationRepairState
externalReplication mysql.IExternalReplication
switchHelper mysql.ISwitchHelper
}

// NewApp returns new App. Suddenly.
Expand All @@ -62,6 +63,7 @@ func NewApp(configFile, logLevel string, interactive bool) (*App, error) {
if err != nil {
return nil, err
}
switchHelper := mysql.NewSwitchHelper(config)
app := &App{
state: stateFirstRun,
config: config,
Expand All @@ -70,6 +72,7 @@ func NewApp(configFile, logLevel string, interactive bool) (*App, error) {
streamFromFailedAt: make(map[string]time.Time),
replRepairState: make(map[string]*ReplicationRepairState),
externalReplication: externalReplication,
switchHelper: switchHelper,
}
return app, nil
}
Expand Down Expand Up @@ -256,6 +259,51 @@ func (app *App) externalCAFileChecker(ctx context.Context) {
}
}

// replMonWriter periodically refreshes the repl_mon heartbeat row on the local
// node and runs until ctx is cancelled.
//
// On every tick of ReplMonWriteInterval it:
//   - asks the local node for its replica status; a non-nil status means the
//     host is a replica and the write is skipped;
//   - checks the read-only flag; a read-only host is skipped as well;
//   - otherwise calls UpdateReplMonTable, creating the table first when the
//     update fails because the table does not exist (the row itself is then
//     written on a following tick).
//
// Status check errors back off for ReplMonErrorWaitInterval, replica and
// read-only hosts back off for ReplMonSlaveWaitInterval.
func (app *App) replMonWriter(ctx context.Context) {
	ticker := time.NewTicker(app.config.ReplMonWriteInterval)
	// Release the ticker resources once the writer exits.
	defer ticker.Stop()

	// pause waits for d, but gives up early when ctx is cancelled so that
	// shutdown is not delayed by a back-off interval. It reports whether the
	// writer should keep running.
	pause := func(d time.Duration) bool {
		timer := time.NewTimer(d)
		defer timer.Stop()
		select {
		case <-timer.C:
			return true
		case <-ctx.Done():
			return false
		}
	}

	for {
		select {
		case <-ticker.C:
			localNode := app.cluster.Local()
			sstatus, err := localNode.GetReplicaStatus()
			if err != nil {
				app.logger.Errorf("repl mon writer: got error %v while checking replica status", err)
				if !pause(app.config.ReplMonErrorWaitInterval) {
					return
				}
				continue
			}
			if sstatus != nil {
				app.logger.Infof("repl mon writer: host is replica")
				if !pause(app.config.ReplMonSlaveWaitInterval) {
					return
				}
				continue
			}
			readOnly, _, err := localNode.IsReadOnly()
			if err != nil {
				app.logger.Errorf("repl mon writer: got error %v while checking read only status", err)
				if !pause(app.config.ReplMonErrorWaitInterval) {
					return
				}
				continue
			}
			if readOnly {
				app.logger.Infof("repl mon writer: host is read only")
				if !pause(app.config.ReplMonSlaveWaitInterval) {
					return
				}
				continue
			}
			err = localNode.UpdateReplMonTable(app.config.ReplMonSchemeName, app.config.ReplMonTableName)
			if err != nil {
				if mysql.IsErrorTableDoesNotExists(err) {
					// First write on this host: create the table and let the
					// next tick insert the heartbeat row.
					err = localNode.CreateReplMonTable(app.config.ReplMonSchemeName, app.config.ReplMonTableName)
					if err != nil {
						app.logger.Errorf("repl mon writer: got error %v while creating repl mon table", err)
					}
					continue
				}
				app.logger.Errorf("repl mon writer: got error %v while writing in repl mon table", err)
			}
		case <-ctx.Done():
			return
		}
	}
}

func (app *App) SetResetupStatus() {
err := app.setResetupStatus(app.cluster.Local().Host(), app.doesResetupFileExist())
if err != nil {
Expand Down Expand Up @@ -710,6 +758,11 @@ func (app *App) stateManager() appState {
app.logger.Errorf("failed to update active nodes in dcs: %v", err)
}

err = app.updateReplMonTS(master)
if err != nil {
app.logger.Errorf("failed to update repl_mon timestamp: %v", err)
}

return stateManager
}

Expand Down Expand Up @@ -1246,7 +1299,7 @@ func (app *App) performSwitchover(clusterState map[string]*NodeState, activeNode
} else if switchover.From != "" {
positions2 := filterOutNodeFromPositions(positions, switchover.From)
// we ignore splitbrain flag as it should be handled during searching most recent host
newMaster, err = getMostDesirableNode(app.logger, positions2, app.config.PriorityChoiceMaxLag)
newMaster, err = getMostDesirableNode(app.logger, positions2, app.switchHelper.GetPriorityChoiceMaxLag())
if err != nil {
return fmt.Errorf("switchover: error while looking for highest priority node: %s", switchover.From)
}
Expand Down Expand Up @@ -1297,6 +1350,10 @@ func (app *App) performSwitchover(clusterState map[string]*NodeState, activeNode

// turn slaves to the new master
app.logger.Info("switchover: phase 5: turn to the new master")
err = app.cluster.Get(newMaster).SetOnline()
if err != nil {
return fmt.Errorf("got error on setting new master %s online %v", newMaster, err)
}
errs = util.RunParallel(func(host string) error {
if host == newMaster || !clusterState[host].PingOk {
return nil
Expand Down Expand Up @@ -2145,9 +2202,13 @@ func (app *App) waitForCatchUp(node *mysql.Node, gtidset gtids.GTIDSet, timeout
if gtidExecuted.Contain(gtidset) {
return true, nil
}
if app.dcs.Get(pathCurrentSwitch, new(Switchover)) == dcs.ErrNotFound {
switchover := new(Switchover)
if app.dcs.Get(pathCurrentSwitch, switchover) == dcs.ErrNotFound {
return false, nil
}
if app.CheckAsyncSwitchAllowed(node, switchover) {
return true, nil
}
time.Sleep(sleep)
if time.Now().After(deadline) {
break
Expand Down Expand Up @@ -2288,6 +2349,9 @@ func (app *App) Run() int {
if app.config.ExternalReplicationType != util.Disabled {
go app.externalCAFileChecker(ctx)
}
if app.config.ReplMon {
go app.replMonWriter(ctx)
}

handlers := map[appState](func() appState){
stateFirstRun: app.stateFirstRun,
Expand Down
21 changes: 21 additions & 0 deletions internal/app/app_dcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,24 @@ func (app *App) GetMasterHostFromDcs() (string, error) {
}
return "", nil
}

// SetReplMonTS stores ts under pathMasterReplMonTS in DCS.
// The key is created first; dcs.ErrExists from the create step is tolerated,
// any other create error is returned. The value is then written with Set and
// the result of that call is returned.
func (app *App) SetReplMonTS(ts string) error {
	if createErr := app.dcs.Create(pathMasterReplMonTS, ts); createErr != nil && createErr != dcs.ErrExists {
		return createErr
	}
	return app.dcs.Set(pathMasterReplMonTS, ts)
}

// GetReplMonTS reads the value stored under pathMasterReplMonTS in DCS.
// A missing key (dcs.ErrNotFound) is not an error: an empty string and a nil
// error are returned in that case. Any other error is passed to the caller
// together with whatever value was read.
func (app *App) GetReplMonTS() (string, error) {
	var stored string
	if err := app.dcs.Get(pathMasterReplMonTS, &stored); !errors.Is(err, dcs.ErrNotFound) {
		return stored, err
	}
	return "", nil
}
38 changes: 38 additions & 0 deletions internal/app/async.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package app

import (
"fmt"

"github.com/yandex/mysync/internal/mysql"
)

// CheckAsyncSwitchAllowed reports whether the switch to node may proceed
// without waiting for the node to fully catch up.
//
// It can only return true when async mode is enabled, the switchover cause is
// CauseAuto and AsyncAllowedLag is positive. In that case the repl_mon
// timestamp saved in DCS is compared with the repl_mon timestamp on node (via
// CalcReplMonTSDelay) and the switch is allowed when the resulting delay is
// below AsyncAllowedLag. Any failure to obtain the timestamp or the delay
// yields false.
func (app *App) CheckAsyncSwitchAllowed(node *mysql.Node, switchover *Switchover) bool {
	if !app.config.ASync || switchover.Cause != CauseAuto || app.config.AsyncAllowedLag <= 0 {
		return false
	}
	app.logger.Infof("async mode is active and this is auto switch so we checking new master delay")

	ts, err := app.GetReplMonTS()
	if err != nil {
		app.logger.Errorf("failed to get mdb repl mon ts: %v", err)
		return false
	}
	if ts == "" {
		// GetReplMonTS returns an empty string when nothing has been stored in
		// DCS yet. There is no master timestamp to compare with, so a delay
		// computed from it would be meaningless and must not permit the switch.
		app.logger.Infof("master repl mon ts is not found in dcs, so async switch is not allowed")
		return false
	}

	delay, err := node.CalcReplMonTSDelay(app.config.ReplMonSchemeName, app.config.ReplMonTableName, ts)
	if err != nil {
		app.logger.Errorf("failed to calc mdb repl mon ts: %v", err)
		return false
	}
	if delay >= app.config.AsyncAllowedLag {
		return false
	}

	app.logger.Infof("async allowed lag is %d and current lag on host %s is %d, so we don't wait for catch up any more",
		app.config.AsyncAllowedLag, node.Host(), delay)
	return true
}

// updateReplMonTS reads the repl_mon timestamp from the given master host and
// saves it in DCS via SetReplMonTS.
// A failure to read the timestamp is returned wrapped, so callers can still
// inspect the underlying error with errors.Is / errors.As.
func (app *App) updateReplMonTS(master string) error {
	masterNode := app.cluster.Get(master)
	ts, err := masterNode.GetReplMonTS(app.config.ReplMonSchemeName, app.config.ReplMonTableName)
	if err != nil {
		return fmt.Errorf("failed to get master repl_mon timestamp: %w", err)
	}
	return app.SetReplMonTS(ts)
}
2 changes: 1 addition & 1 deletion internal/app/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (app *App) CliSwitch(switchFrom, switchTo string, waitTimeout time.Duration
app.logger.Errorf(err.Error())
return 1
}
toHost, err = getMostDesirableNode(app.logger, positions, app.config.PriorityChoiceMaxLag)
toHost, err = getMostDesirableNode(app.logger, positions, app.switchHelper.GetPriorityChoiceMaxLag())
if err != nil {
app.logger.Errorf(err.Error())
return 1
Expand Down
3 changes: 3 additions & 0 deletions internal/app/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ const (
pathResetupStatus = "resetup_status"

pathLastShutdownNodeTime = "last_shutdown_node_time"

// last known timestamp from repl_mon table
pathMasterReplMonTS = "master_repl_mon_ts"
)

var (
Expand Down
19 changes: 19 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@ type Config struct {
ReplicationChannel string `config:"replication_channel" yaml:"replication_channel"`
ExternalReplicationChannel string `config:"external_replication_channel" yaml:"external_replication_channel"`
ExternalReplicationType util.ExternalReplicationType `config:"external_replication_type" yaml:"external_replication_type"`
ASync bool `config:"async" yaml:"async"`
AsyncAllowedLag int64 `config:"async_allowed_lag" yaml:"async_allowed_lag"`
ReplMon bool `config:"repl_mon" yaml:"repl_mon"`
ReplMonSchemeName string `config:"repl_mon_scheme_name" yaml:"repl_mon_scheme_name"`
ReplMonTableName string `config:"repl_mon_table_name" yaml:"repl_mon_table_name"`
ReplMonWriteInterval time.Duration `config:"repl_mon_write_interval" yaml:"repl_mon_write_interval"`
ReplMonErrorWaitInterval time.Duration `config:"repl_mon_error_wait_interval" yaml:"repl_mon_error_wait_interval"`
ReplMonSlaveWaitInterval time.Duration `config:"repl_mon_slave_wait_interval" yaml:"repl_mon_slave_wait_interval"`
}

// DefaultConfig returns default configuration for MySync
Expand Down Expand Up @@ -164,6 +172,14 @@ func DefaultConfig() (Config, error) {
ReplicationChannel: "",
ExternalReplicationChannel: "external",
ExternalReplicationType: util.Disabled,
ASync: false,
AsyncAllowedLag: 0,
ReplMon: false,
ReplMonSchemeName: "mysql",
ReplMonTableName: "mysync_repl_mon",
ReplMonWriteInterval: 1 * time.Second,
ReplMonErrorWaitInterval: 10 * time.Second,
ReplMonSlaveWaitInterval: 10 * time.Second,
}
return config, nil
}
Expand Down Expand Up @@ -205,5 +221,8 @@ func (cfg *Config) Validate() error {
if cfg.NotCriticalDiskUsage > cfg.CriticalDiskUsage {
return fmt.Errorf("not_critical_disk_usage should be <= critical_disk_usage")
}
if cfg.SemiSync && cfg.ASync {
return fmt.Errorf("can't run in both semisync and async mode")
}
return nil
}
8 changes: 8 additions & 0 deletions internal/mysql/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,14 @@ type Version struct {
PatchVersion int `db:"PatchVersion"`
}

// ReplMonTS is the row type used to read the result of the repl_mon timestamp
// query: Timestamp is filled from the "ts" column.
type ReplMonTS struct {
Timestamp string `db:"ts"`
}

// ReplMonTSDelay is the row type used to read the result of the repl_mon delay
// query: Delay is filled from the "delay" column.
type ReplMonTSDelay struct {
Delay int64 `db:"delay"`
}

const (
Version80Major = 8
Version80Minor = 0
Expand Down
41 changes: 41 additions & 0 deletions internal/mysql/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,10 @@ func (n *Node) queryRowMogrifyWithTimeout(queryName string, arg map[string]inter
return err
}

// queryRowMogrify is a shorthand for queryRowMogrifyWithTimeout that uses the
// DBTimeout value from the node configuration.
func (n *Node) queryRowMogrify(queryName string, arg map[string]interface{}, result interface{}) error {
	timeout := n.config.DBTimeout
	return n.queryRowMogrifyWithTimeout(queryName, arg, result, timeout)
}

// IsRunning checks if daemon process is running
func (n *Node) IsRunning() (bool, error) {
if !n.IsLocal() {
Expand Down Expand Up @@ -1013,3 +1017,40 @@ func (n *Node) SetDefaultReplicationSettings(masterNode *Node) error {
}
return nil
}

// GetReplMonTS runs the queryGetReplMonTS query against the given repl_mon
// table and returns the value of its "ts" column.
// Both names are passed to the query wrapped in schemaname. The timestamp read
// so far is returned together with any query error.
func (n *Node) GetReplMonTS(replMonSchemeName string, replMonTable string) (string, error) {
	var row ReplMonTS
	params := map[string]interface{}{
		"replMonSchemeName": schemaname(replMonSchemeName),
		"replMonTable":      schemaname(replMonTable),
	}
	err := n.queryRowMogrify(queryGetReplMonTS, params, &row)
	return row.Timestamp, err
}

// CalcReplMonTSDelay runs the queryCalcReplMonTSDelay query, which compares
// ts with the timestamp stored in the given repl_mon table on this node, and
// returns the value of the resulting "delay" column.
// Both names are passed to the query wrapped in schemaname. The delay read so
// far is returned together with any query error.
func (n *Node) CalcReplMonTSDelay(replMonSchemeName string, replMonTable string, ts string) (int64, error) {
	var row ReplMonTSDelay
	params := map[string]interface{}{
		"ts":                ts,
		"replMonSchemeName": schemaname(replMonSchemeName),
		"replMonTable":      schemaname(replMonTable),
	}
	err := n.queryRowMogrify(queryCalcReplMonTSDelay, params, &row)
	return row.Delay, err
}

// CreateReplMonTable executes the queryCreateReplMonTable statement for the
// given scheme and table names (both wrapped in schemaname) and returns the
// execution error, if any.
func (n *Node) CreateReplMonTable(replMonSchemeName string, replMonTable string) error {
	params := map[string]interface{}{
		"replMonSchemeName": schemaname(replMonSchemeName),
		"replMonTable":      schemaname(replMonTable),
	}
	return n.execMogrify(queryCreateReplMonTable, params)
}

// UpdateReplMonTable executes the queryUpdateReplMon statement for the given
// scheme and table names (both wrapped in schemaname) and returns the
// execution error, if any.
func (n *Node) UpdateReplMonTable(replMonSchemeName string, replMonTable string) error {
	params := map[string]interface{}{
		"replMonSchemeName": schemaname(replMonSchemeName),
		"replMonTable":      schemaname(replMonTable),
	}
	return n.execMogrify(queryUpdateReplMon, params)
}
17 changes: 17 additions & 0 deletions internal/mysql/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ const (
querySetInnodbFlushLogAtTrxCommit = "set_innodb_flush_log_at_trx_commit"
querySetSyncBinlog = "set_sync_binlog"
queryGetReplicationSettings = "get_replication_settings"
queryGetReplMonTS = "get_repl_mon_ts"
queryCalcReplMonTSDelay = "calc_repl_mon_ts_delay"
queryCreateReplMonTable = "create_repl_mon_table"
queryUpdateReplMon = "update_repl_mon"
)

var DefaultQueries = map[string]string{
Expand Down Expand Up @@ -125,4 +129,17 @@ var DefaultQueries = map[string]string{
querySetInnodbFlushLogAtTrxCommit: `SET GLOBAL innodb_flush_log_at_trx_commit = :level`,
queryGetReplicationSettings: `SELECT @@innodb_flush_log_at_trx_commit as InnodbFlushLogAtTrxCommit, @@sync_binlog as SyncBinlog`,
querySetSyncBinlog: `SET GLOBAL sync_binlog = :sync_binlog`,
queryGetReplMonTS: `SELECT UNIX_TIMESTAMP(ts) AS ts FROM :replMonSchemeName.:replMonTable`,
queryCalcReplMonTSDelay: `SELECT FLOOR(CAST(:ts AS DECIMAL(20,3)) - UNIX_TIMESTAMP(ts)) AS delay FROM :replMonSchemeName.:replMonTable`,
queryCreateReplMonTable: `CREATE TABLE IF NOT EXISTS :replMonSchemeName.:replMonTable(
id INT NOT NULL PRIMARY KEY,
ts TIMESTAMP(3)
)
ENGINE=INNODB`,
queryUpdateReplMon: `INSERT INTO :replMonSchemeName.:replMonTable(id, ts)
(
SELECT 1, CURRENT_TIMESTAMP(3)
WHERE @@read_only = 0
)
ON DUPLICATE KEY UPDATE ts = CURRENT_TIMESTAMP(3)`,
}
Loading
Loading