From 4851511b250d9bd287cf0537d4a71e0dafd9f2d7 Mon Sep 17 00:00:00 2001 From: Renan Rangel Date: Wed, 16 Oct 2024 03:50:09 -0700 Subject: [PATCH] backport logical backups Signed-off-by: 'Renan Rangel' --- go/test/fuzzing/tablet_manager_fuzzer.go | 4 +- go/vt/mysqlctl/backup.go | 40 +- go/vt/mysqlctl/backup_test.go | 29 + go/vt/mysqlctl/backupengine.go | 1 + go/vt/mysqlctl/builtinbackupengine.go | 5 + go/vt/mysqlctl/builtinbackupengine_test.go | 3 +- .../{fakemysqldaemon => }/fakemysqldaemon.go | 44 +- go/vt/mysqlctl/mysql_daemon.go | 13 +- go/vt/mysqlctl/mysqld.go | 7 +- go/vt/mysqlctl/mysqlshellbackupengine.go | 552 ++++++++++++++++++ go/vt/mysqlctl/mysqlshellbackupengine_test.go | 278 +++++++++ go/vt/mysqlctl/query.go | 37 ++ go/vt/mysqlctl/replication.go | 66 ++- go/vt/mysqlctl/xtrabackupengine.go | 22 +- .../vttablet/tabletmanager/rpc_query_test.go | 4 +- .../vttablet/tabletmanager/rpc_replication.go | 4 +- .../tabletmanager/rpc_replication_test.go | 6 +- go/vt/vttablet/tabletmanager/tm_init_test.go | 8 +- go/vt/vttablet/tabletmanager/tm_state_test.go | 4 +- .../vreplication/controller_test.go | 12 +- .../tabletmanager/vreplication/engine_test.go | 22 +- .../tabletserver/repltracker/poller_test.go | 4 +- .../repltracker/repltracker_test.go | 6 +- go/vt/wrangler/fake_tablet_test.go | 6 +- go/vt/wrangler/testlib/fake_tablet.go | 6 +- 25 files changed, 1086 insertions(+), 97 deletions(-) rename go/vt/mysqlctl/{fakemysqldaemon => }/fakemysqldaemon.go (94%) create mode 100644 go/vt/mysqlctl/mysqlshellbackupengine.go create mode 100644 go/vt/mysqlctl/mysqlshellbackupengine_test.go diff --git a/go/test/fuzzing/tablet_manager_fuzzer.go b/go/test/fuzzing/tablet_manager_fuzzer.go index 0e6b6aaece7..316cf75fb82 100644 --- a/go/test/fuzzing/tablet_manager_fuzzer.go +++ b/go/test/fuzzing/tablet_manager_fuzzer.go @@ -22,7 +22,7 @@ import ( "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/dbconfigs" - "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" + "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/vttablet/tabletmanager" "vitess.io/vitess/go/vt/vttablet/tabletservermock" @@ -42,7 +42,7 @@ func FuzzTabletManagerExecuteFetchAsDba(data []byte) int { cp := mysql.ConnParams{} db := fakesqldb.New(t) db.AddQueryPattern(".*", &sqltypes.Result{}) - daemon := fakemysqldaemon.NewFakeMysqlDaemon(db) + daemon := mysqlctl.NewFakeMysqlDaemon(db) dbName := "dbname" tm := &tabletmanager.TabletManager{ diff --git a/go/vt/mysqlctl/backup.go b/go/vt/mysqlctl/backup.go index 7d62681c5dc..0547f6f104d 100644 --- a/go/vt/mysqlctl/backup.go +++ b/go/vt/mysqlctl/backup.go @@ -17,16 +17,18 @@ limitations under the License. package mysqlctl import ( + "bufio" "errors" "fmt" + "io" "os" "path/filepath" - "strconv" "strings" "time" "github.com/spf13/pflag" + "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/servenv" "context" @@ -352,21 +354,6 @@ func Restore(ctx context.Context, params RestoreParams) (*BackupManifest, error) return nil, err } - // We disable super_read_only, in case it is in the default MySQL startup - // parameters and will be blocking the writes we need to do in - // PopulateMetadataTables(). We do it blindly, since - // this will fail on MariaDB, which doesn't have super_read_only - // This is safe, since we're restarting MySQL after the restore anyway - params.Logger.Infof("Restore: disabling super_read_only") - if err := params.Mysqld.SetSuperReadOnly(false); err != nil { - if strings.Contains(err.Error(), strconv.Itoa(mysql.ERUnknownSystemVariable)) { - params.Logger.Warningf("Restore: server does not know about super_read_only, continuing anyway...") - } else { - params.Logger.Errorf("Restore: unexpected error while trying to set super_read_only: %v", err) - return nil, err - } - } - params.Logger.Infof("Restore: running mysql_upgrade") if err := params.Mysqld.RunMysqlUpgrade(); err != nil { return nil, vterrors.Wrap(err, "mysql_upgrade failed") @@ -403,3 +390,24 @@ func Restore(ctx context.Context, params RestoreParams) (*BackupManifest, error) restoreDuration.Set(int64(time.Since(startTs).Seconds())) return manifest, nil } + +// scanLinesToLogger scans full lines from the given Reader and sends them to +// the given Logger until EOF. +func scanLinesToLogger(prefix string, reader io.Reader, logger logutil.Logger, doneFunc func()) { + defer doneFunc() + + scanner := bufio.NewScanner(reader) + for scanner.Scan() { + line := scanner.Text() + logger.Infof("%s: %s", prefix, line) + } + if err := scanner.Err(); err != nil { + // This is usually run in a background goroutine, so there's no point + // returning an error. Just log it. + logger.Warningf("error scanning lines from %s: %v", prefix, err) + } +} + +func FormatRFC3339(t time.Time) string { + return t.Format(time.RFC3339) +} diff --git a/go/vt/mysqlctl/backup_test.go b/go/vt/mysqlctl/backup_test.go index 08d5e31a116..d68af0e044a 100644 --- a/go/vt/mysqlctl/backup_test.go +++ b/go/vt/mysqlctl/backup_test.go @@ -17,13 +17,19 @@ limitations under the License. package mysqlctl import ( + "fmt" + "io" "os" "path" "reflect" "sort" + "sync" "testing" + "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/vt/logutil" ) func TestFindFilesToBackupWithoutRedoLog(t *testing.T) { @@ -214,3 +220,26 @@ type forTest []FileEntry func (f forTest) Len() int { return len(f) } func (f forTest) Swap(i, j int) { f[i], f[j] = f[j], f[i] } func (f forTest) Less(i, j int) bool { return f[i].Base+f[i].Name < f[j].Base+f[j].Name } + +func TestScanLinesToLogger(t *testing.T) { + reader, writer := io.Pipe() + logger := logutil.NewMemoryLogger() + var wg sync.WaitGroup + + wg.Add(1) + go scanLinesToLogger("test", reader, logger, wg.Done) + + for i := range [100]int{} { + _, err := writer.Write([]byte(fmt.Sprintf("foobar %d\n", i))) + require.NoError(t, err) + } + + writer.Close() + wg.Wait() + + require.Equal(t, 100, len(logger.Events)) + + for i, event := range logger.Events { + require.Equal(t, fmt.Sprintf("test: foobar %d", i), event.Value) + } +} diff --git a/go/vt/mysqlctl/backupengine.go b/go/vt/mysqlctl/backupengine.go index 2b0b08f7e14..68f5db069df 100644 --- a/go/vt/mysqlctl/backupengine.go +++ b/go/vt/mysqlctl/backupengine.go @@ -99,6 +99,7 @@ type RestoreParams struct { // Returns the manifest of a backup if successful, otherwise returns an error type RestoreEngine interface { ExecuteRestore(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle) (*BackupManifest, error) + ShouldStartMySQLAfterRestore() bool } // BackupRestoreEngine is a combination of BackupEngine and RestoreEngine. diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index f4c0a0b5161..cd5ee811c5d 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -770,6 +770,11 @@ func (be *BuiltinBackupEngine) ShouldDrainForBackup() bool { return true } +// ShouldStartMySQLAfterRestore signifies if this backup engine needs to restart MySQL once the restore is completed. +func (be *BuiltinBackupEngine) ShouldStartMySQLAfterRestore() bool { + return true +} + func getPrimaryPosition(ctx context.Context, tmc tmclient.TabletManagerClient, ts *topo.Server, keyspace, shard string) (mysql.Position, error) { si, err := ts.GetShard(ctx, keyspace, shard) if err != nil { diff --git a/go/vt/mysqlctl/builtinbackupengine_test.go b/go/vt/mysqlctl/builtinbackupengine_test.go index b6837380db7..280de5ac18f 100644 --- a/go/vt/mysqlctl/builtinbackupengine_test.go +++ b/go/vt/mysqlctl/builtinbackupengine_test.go @@ -19,7 +19,6 @@ import ( "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/mysqlctl" - "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" "vitess.io/vitess/go/vt/mysqlctl/filebackupstorage" "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/proto/vttime" @@ -104,7 +103,7 @@ func TestExecuteBackup(t *testing.T) { // Spin up a fake daemon to be used in backups. It needs to be allowed to receive: // "STOP SLAVE", "START SLAVE", in that order. - mysqld := fakemysqldaemon.NewFakeMysqlDaemon(fakesqldb.New(t)) + mysqld := mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t)) mysqld.ExpectedExecuteSuperQueryList = []string{"STOP SLAVE", "START SLAVE"} // mysqld.ShutdownTime = time.Minute diff --git a/go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go b/go/vt/mysqlctl/fakemysqldaemon.go similarity index 94% rename from go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go rename to go/vt/mysqlctl/fakemysqldaemon.go index 3effa9309c2..dab1f6b07f2 100644 --- a/go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go +++ b/go/vt/mysqlctl/fakemysqldaemon.go @@ -14,9 +14,10 @@ See the License for the specific language governing permissions and limitations under the License. */ -package fakemysqldaemon +package mysqlctl import ( + "errors" "fmt" "reflect" "strings" @@ -30,7 +31,6 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/dbconnpool" - "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/mysqlctl/tmutils" querypb "vitess.io/vitess/go/vt/proto/query" @@ -171,6 +171,9 @@ type FakeMysqlDaemon struct { // TimeoutHook is a func that can be called at the beginning of any method to fake a timeout. // all a test needs to do is make it { return context.DeadlineExceeded } TimeoutHook func() error + + // Version is the version that will be returned by GetVersionString. + Version string } // NewFakeMysqlDaemon returns a FakeMysqlDaemon where mysqld appears @@ -181,6 +184,7 @@ func NewFakeMysqlDaemon(db *fakesqldb.DB) *FakeMysqlDaemon { db: db, Running: true, IOThreadRunning: true, + Version: "8.0.32", } if db != nil { result.appPool = dbconnpool.NewConnectionPool("AppConnPool", 5, time.Minute, 0) @@ -190,7 +194,7 @@ func NewFakeMysqlDaemon(db *fakesqldb.DB) *FakeMysqlDaemon { } // Start is part of the MysqlDaemon interface -func (fmd *FakeMysqlDaemon) Start(ctx context.Context, cnf *mysqlctl.Mycnf, mysqldArgs ...string) error { +func (fmd *FakeMysqlDaemon) Start(ctx context.Context, cnf *Mycnf, mysqldArgs ...string) error { if fmd.Running { return fmt.Errorf("fake mysql daemon already running") } @@ -208,7 +212,7 @@ func (fmd *FakeMysqlDaemon) Start(ctx context.Context, cnf *mysqlctl.Mycnf, mysq } // Shutdown is part of the MysqlDaemon interface -func (fmd *FakeMysqlDaemon) Shutdown(ctx context.Context, cnf *mysqlctl.Mycnf, waitForMysqld bool) error { +func (fmd *FakeMysqlDaemon) Shutdown(ctx context.Context, cnf *Mycnf, waitForMysqld bool) error { if !fmd.Running { return fmt.Errorf("fake mysql daemon not running") } @@ -231,17 +235,17 @@ func (fmd *FakeMysqlDaemon) RunMysqlUpgrade() error { } // ReinitConfig is part of the MysqlDaemon interface -func (fmd *FakeMysqlDaemon) ReinitConfig(ctx context.Context, cnf *mysqlctl.Mycnf) error { +func (fmd *FakeMysqlDaemon) ReinitConfig(ctx context.Context, cnf *Mycnf) error { return nil } // RefreshConfig is part of the MysqlDaemon interface -func (fmd *FakeMysqlDaemon) RefreshConfig(ctx context.Context, cnf *mysqlctl.Mycnf) error { +func (fmd *FakeMysqlDaemon) RefreshConfig(ctx context.Context, cnf *Mycnf) error { return nil } // Wait is part of the MysqlDaemon interface. -func (fmd *FakeMysqlDaemon) Wait(ctx context.Context, cnf *mysqlctl.Mycnf) error { +func (fmd *FakeMysqlDaemon) Wait(ctx context.Context, cnf *Mycnf) error { return nil } @@ -345,17 +349,22 @@ func (fmd *FakeMysqlDaemon) IsReadOnly() (bool, error) { return fmd.ReadOnly, nil } +// IsSuperReadOnly is part of the MysqlDaemon interface. +func (fmd *FakeMysqlDaemon) IsSuperReadOnly(ctx context.Context) (bool, error) { + return fmd.SuperReadOnly, nil +} + // SetReadOnly is part of the MysqlDaemon interface func (fmd *FakeMysqlDaemon) SetReadOnly(on bool) error { fmd.ReadOnly = on return nil } -// SetSuperReadOnly is part of the MysqlDaemon interface -func (fmd *FakeMysqlDaemon) SetSuperReadOnly(on bool) error { +// SetSuperReadOnly is part of the MysqlDaemon interface. +func (fmd *FakeMysqlDaemon) SetSuperReadOnly(ctx context.Context, on bool) (ResetSuperReadOnlyFunc, error) { fmd.SuperReadOnly = on fmd.ReadOnly = on - return nil + return nil, nil } // StartReplication is part of the MysqlDaemon interface. @@ -467,6 +476,11 @@ func (fmd *FakeMysqlDaemon) Promote(hookExtraEnv map[string]string) (mysql.Posit return fmd.PromoteResult, nil } +// ExecuteSuperQuery is part of the MysqlDaemon interface +func (fmd *FakeMysqlDaemon) ExecuteSuperQuery(ctx context.Context, query string) error { + return fmd.ExecuteSuperQueryList(ctx, []string{query}) +} + // ExecuteSuperQueryList is part of the MysqlDaemon interface func (fmd *FakeMysqlDaemon) ExecuteSuperQueryList(ctx context.Context, queryList []string) error { for _, query := range queryList { @@ -667,3 +681,13 @@ func (fmd *FakeMysqlDaemon) GetVersionString() string { func (fmd *FakeMysqlDaemon) GetVersionComment(ctx context.Context) string { return "" } + +// AcquireGlobalReadLock is part of the MysqlDaemon interface. +func (fmd *FakeMysqlDaemon) AcquireGlobalReadLock(ctx context.Context) error { + return errors.New("not implemented") +} + +// ReleaseGlobalReadLock is part of the MysqlDaemon interface. +func (fmd *FakeMysqlDaemon) ReleaseGlobalReadLock(ctx context.Context) error { + return errors.New("not implemented") +} diff --git a/go/vt/mysqlctl/mysql_daemon.go b/go/vt/mysqlctl/mysql_daemon.go index ec96eee7b2e..185aac327ac 100644 --- a/go/vt/mysqlctl/mysql_daemon.go +++ b/go/vt/mysqlctl/mysql_daemon.go @@ -70,8 +70,9 @@ type MysqlDaemon interface { ResetReplication(ctx context.Context) error PrimaryPosition() (mysql.Position, error) IsReadOnly() (bool, error) + IsSuperReadOnly(ctx context.Context) (bool, error) SetReadOnly(on bool) error - SetSuperReadOnly(on bool) error + SetSuperReadOnly(ctx context.Context, on bool) (ResetSuperReadOnlyFunc, error) SetReplicationPosition(ctx context.Context, pos mysql.Position) error SetReplicationSource(ctx context.Context, host string, port int, stopReplicationBefore bool, startReplicationAfter bool) error WaitForReparentJournal(ctx context.Context, timeCreatedNS int64) error @@ -103,6 +104,9 @@ type MysqlDaemon interface { // GetVersionComment returns the version comment GetVersionComment(ctx context.Context) string + // ExecuteSuperQuery executes a single query, no result + ExecuteSuperQuery(ctx context.Context, query string) error + // ExecuteSuperQueryList executes a list of queries, no result ExecuteSuperQueryList(ctx context.Context, queryList []string) error @@ -115,6 +119,13 @@ type MysqlDaemon interface { // DisableBinlogPlayback disable playback of binlog events DisableBinlogPlayback() error + // AcquireGlobalReadLock acquires a global read lock and keeps the connection so + // as to release it with the function below. + AcquireGlobalReadLock(ctx context.Context) error + + // ReleaseGlobalReadLock release a lock acquired with the connection from the above function. + ReleaseGlobalReadLock(ctx context.Context) error + // Close will close this instance of Mysqld. It will wait for all dba // queries to be finished. Close() diff --git a/go/vt/mysqlctl/mysqld.go b/go/vt/mysqlctl/mysqld.go index 45ec3b7bd73..a3df3172aa5 100644 --- a/go/vt/mysqlctl/mysqld.go +++ b/go/vt/mysqlctl/mysqld.go @@ -87,9 +87,10 @@ const maxLogFileSampleSize = 4096 // Mysqld is the object that represents a mysqld daemon running on this server. type Mysqld struct { - dbcfgs *dbconfigs.DBConfigs - dbaPool *dbconnpool.ConnectionPool - appPool *dbconnpool.ConnectionPool + dbcfgs *dbconfigs.DBConfigs + dbaPool *dbconnpool.ConnectionPool + appPool *dbconnpool.ConnectionPool + lockConn *dbconnpool.PooledDBConnection capabilities capabilitySet diff --git a/go/vt/mysqlctl/mysqlshellbackupengine.go b/go/vt/mysqlctl/mysqlshellbackupengine.go new file mode 100644 index 00000000000..02c82b66e25 --- /dev/null +++ b/go/vt/mysqlctl/mysqlshellbackupengine.go @@ -0,0 +1,552 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mysqlctl + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "os" + "os/exec" + "path" + "slices" + "strings" + "sync" + "time" + + "github.com/spf13/pflag" + + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/mysqlctl/backupstorage" + "vitess.io/vitess/go/vt/servenv" + "vitess.io/vitess/go/vt/vterrors" +) + +var ( + // location to store the mysql shell backup + mysqlShellBackupLocation = "" + // flags passed to the mysql shell utility, used both on dump/restore + mysqlShellFlags = "--defaults-file=/dev/null --js -h localhost" + // flags passed to the Dump command, as a JSON string + mysqlShellDumpFlags = `{"threads": 4}` + // flags passed to the Load command, as a JSON string + mysqlShellLoadFlags = `{"threads": 4, "loadUsers": true, "updateGtidSet": "replace", "skipBinlog": true, "progressFile": ""}` + // drain a tablet when taking a backup + mysqlShellBackupShouldDrain = false + // disable redo logging and double write buffer + mysqlShellSpeedUpRestore = false + + // use when checking if we need to create the directory on the local filesystem or not. + knownObjectStoreParams = []string{"s3BucketName", "osBucketName", "azureContainerName"} + + MySQLShellPreCheckError = errors.New("MySQLShellPreCheckError") + + // internal databases not backed up by MySQL Shell + internalDBs = []string{ + "information_schema", "mysql", "ndbinfo", "performance_schema", "sys", + } + // reserved MySQL users https://dev.mysql.com/doc/refman/8.0/en/reserved-accounts.html + reservedUsers = []string{ + "mysql.sys@localhost", "mysql.session@localhost", "mysql.infoschema@localhost", + } +) + +// MySQLShellBackupManifest represents a backup. +type MySQLShellBackupManifest struct { + // BackupManifest is an anonymous embedding of the base manifest struct. + // Note that the manifest itself doesn't fill the Position field, as we have + // no way of fetching that information from mysqlsh at the moment. + BackupManifest + + // Location of the backup directory + BackupLocation string + // Params are the parameters that backup was created with + Params string +} + +func init() { + BackupRestoreEngineMap[mysqlShellBackupEngineName] = &MySQLShellBackupEngine{} + + for _, cmd := range []string{"vtcombo", "vttablet", "vtbackup", "vttestserver", "vtctldclient"} { + servenv.OnParseFor(cmd, registerMysqlShellBackupEngineFlags) + } +} + +func registerMysqlShellBackupEngineFlags(fs *pflag.FlagSet) { + fs.StringVar(&mysqlShellBackupLocation, "mysql-shell-backup-location", mysqlShellBackupLocation, "location where the backup will be stored") + fs.StringVar(&mysqlShellFlags, "mysql-shell-flags", mysqlShellFlags, "execution flags to pass to mysqlsh binary to be used during dump/load") + fs.StringVar(&mysqlShellDumpFlags, "mysql-shell-dump-flags", mysqlShellDumpFlags, "flags to pass to mysql shell dump utility. This should be a JSON string and will be saved in the MANIFEST") + fs.StringVar(&mysqlShellLoadFlags, "mysql-shell-load-flags", mysqlShellLoadFlags, "flags to pass to mysql shell load utility. This should be a JSON string") + fs.BoolVar(&mysqlShellBackupShouldDrain, "mysql-shell-should-drain", mysqlShellBackupShouldDrain, "decide if we should drain while taking a backup or continue to serving traffic") + fs.BoolVar(&mysqlShellSpeedUpRestore, "mysql-shell-speedup-restore", mysqlShellSpeedUpRestore, "speed up restore by disabling redo logging and double write buffer during the restore process") +} + +// MySQLShellBackupEngine encapsulates the logic to implement the restoration +// of a mysql-shell based backup. +type MySQLShellBackupEngine struct { +} + +const ( + mysqlShellBackupBinaryName = "mysqlsh" + mysqlShellBackupEngineName = "mysqlshell" +) + +func (be *MySQLShellBackupEngine) ExecuteBackup(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle) (result bool, finalErr error) { + params.Logger.Infof("Starting ExecuteBackup in %s", params.TabletAlias) + + location := path.Join(mysqlShellBackupLocation, bh.Directory(), bh.Name()) + + err := be.backupPreCheck(location) + if err != nil { + return false, vterrors.Wrap(err, "failed backup precheck") + } + + args := []string{} + if mysqlShellFlags != "" { + args = append(args, strings.Fields(mysqlShellFlags)...) + } + + args = append(args, "-e", fmt.Sprintf("util.dumpInstance(%q, %s)", + location, + mysqlShellDumpFlags, + )) + + // to be able to get the consistent GTID sets, we will acquire a global read lock before starting mysql shell. + // oncce we have the lock, we start it and wait unti it has acquired and release its global read lock, which + // should guarantee that both use and mysql shell are seeing the same executed GTID sets. + // after this we release the lock so that replication can continue. this usually should take just a few seconds. + params.Logger.Infof("acquiring a global read lock before fetching the executed GTID sets") + err = params.Mysqld.AcquireGlobalReadLock(ctx) + if err != nil { + return false, vterrors.Wrap(err, "failed to acquire read lock to start backup") + } + lockAcquired := time.Now() // we will report how long we hold the lock for + + posBeforeBackup, err := params.Mysqld.PrimaryPosition() + if err != nil { + return false, vterrors.Wrap(err, "failed to fetch position") + } + + cmd := exec.CommandContext(ctx, mysqlShellBackupBinaryName, args...) + + params.Logger.Infof("running %s", cmd.String()) + + cmdOut, err := cmd.StdoutPipe() + if err != nil { + return false, vterrors.Wrap(err, "cannot create stdout pipe") + } + cmdOriginalErr, err := cmd.StderrPipe() + if err != nil { + return false, vterrors.Wrap(err, "cannot create stderr pipe") + } + if err := cmd.Start(); err != nil { + return false, vterrors.Wrap(err, "can't start mysqlshell") + } + + pipeReader, pipeWriter := io.Pipe() + cmdErr := io.TeeReader(cmdOriginalErr, pipeWriter) + + cmdWg := &sync.WaitGroup{} + cmdWg.Add(3) + go releaseReadLock(ctx, pipeReader, params, cmdWg, lockAcquired) + go scanLinesToLogger(mysqlShellBackupEngineName+" stdout", cmdOut, params.Logger, cmdWg.Done) + go scanLinesToLogger(mysqlShellBackupEngineName+" stderr", cmdErr, params.Logger, cmdWg.Done) + + // Get exit status. + if err := cmd.Wait(); err != nil { + return false, vterrors.Wrap(err, mysqlShellBackupEngineName+" failed") + } + + // close the pipeWriter and wait for the goroutines to have read all the logs + pipeWriter.Close() + cmdWg.Wait() + + // open the MANIFEST + params.Logger.Infof("Writing backup MANIFEST") + mwc, err := bh.AddFile(ctx, backupManifestFileName, backupstorage.FileSizeUnknown) + if err != nil { + return false, vterrors.Wrapf(err, "cannot add %v to backup", backupManifestFileName) + } + defer closeFile(mwc, backupManifestFileName, params.Logger, &finalErr) + + // JSON-encode and write the MANIFEST + bm := &MySQLShellBackupManifest{ + // Common base fields + BackupManifest: BackupManifest{ + BackupMethod: mysqlShellBackupEngineName, + // the position is empty here because we have no way of capturing it from mysqlsh + // we will capture it when doing the restore as mysqlsh can replace the GTIDs with + // what it has stored in the backup. + Position: posBeforeBackup, + // PurgedPosition: posBeforeBackup, + BackupTime: FormatRFC3339(params.BackupTime.UTC()), + FinishedTime: FormatRFC3339(time.Now().UTC()), + // ServerUUID: serverUUID, + // TabletAlias: params.TabletAlias, + // Keyspace: params.Keyspace, + // Shard: params.Shard, + // MySQLVersion: mysqlVersion, + // UpgradeSafe: true, + }, + + // mysql shell backup specific fields + BackupLocation: location, + Params: mysqlShellLoadFlags, + } + + data, err := json.MarshalIndent(bm, "", " ") + if err != nil { + return false, vterrors.Wrapf(err, "cannot JSON encode %v", backupManifestFileName) + } + if _, err := mwc.Write([]byte(data)); err != nil { + return false, vterrors.Wrapf(err, "cannot write %v", backupManifestFileName) + } + + params.Logger.Infof("Backup completed") + return true, nil +} + +func (be *MySQLShellBackupEngine) ExecuteRestore(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle) (*BackupManifest, error) { + params.Logger.Infof("Calling ExecuteRestore for %s (DeleteBeforeRestore: %v)", params.DbName, params.DeleteBeforeRestore) + + shouldDeleteUsers, err := be.restorePreCheck(ctx, params) + if err != nil { + return nil, vterrors.Wrap(err, "failed restore precheck") + } + + var bm MySQLShellBackupManifest + if err := getBackupManifestInto(ctx, bh, &bm); err != nil { + return nil, err + } + + // mark restore as in progress + if err := createStateFile(params.Cnf); err != nil { + return nil, err + } + + // make sure semi-sync is disabled, otherwise we will wait forever for acknowledgements + err = params.Mysqld.SetSemiSyncEnabled(false, false) + if err != nil { + return nil, vterrors.Wrap(err, "disable semi-sync failed") + } + + params.Logger.Infof("restoring on an existing tablet, so dropping database %q", params.DbName) + + readonly, err := params.Mysqld.IsSuperReadOnly(ctx) + if err != nil { + return nil, vterrors.Wrap(err, fmt.Sprintf("checking if mysqld has super_read_only=enable: %v", err)) + } + + if readonly { + resetFunc, err := params.Mysqld.SetSuperReadOnly(ctx, false) + if err != nil { + return nil, vterrors.Wrap(err, fmt.Sprintf("unable to disable super-read-only: %v", err)) + } + + defer func() { + err := resetFunc() + if err != nil { + params.Logger.Errorf("Not able to set super_read_only to its original value after restore") + } + }() + } + + err = cleanupMySQL(ctx, params, shouldDeleteUsers) + if err != nil { + log.Errorf(err.Error()) + // time.Sleep(time.Minute * 2) + return nil, vterrors.Wrap(err, "error cleaning MySQL") + } + + // we need to get rid of all the current replication information on the host. + err = params.Mysqld.ResetReplication(ctx) + if err != nil { + return nil, vterrors.Wrap(err, "unable to reset replication") + } + + // this is required so we can load the backup generated by MySQL Shell. we will disable it afterwards. + err = params.Mysqld.ExecuteSuperQuery(ctx, "SET GLOBAL LOCAL_INFILE=1") + if err != nil { + return nil, vterrors.Wrap(err, "unable to set local_infile=1") + } + + if mysqlShellSpeedUpRestore { + // disable redo logging and double write buffer if we are configured to do so. + err = params.Mysqld.ExecuteSuperQuery(ctx, "ALTER INSTANCE DISABLE INNODB REDO_LOG") + if err != nil { + return nil, vterrors.Wrap(err, "unable to disable REDO_LOG") + } + params.Logger.Infof("Disabled REDO_LOG") + + defer func() { // re-enable once we are done with the restore. + err := params.Mysqld.ExecuteSuperQuery(ctx, "ALTER INSTANCE ENABLE INNODB REDO_LOG") + if err != nil { + params.Logger.Errorf("unable to re-enable REDO_LOG: %v", err) + } else { + params.Logger.Infof("Disabled REDO_LOG") + } + }() + } + + // we need to disable SuperReadOnly otherwise we won't be able to restore the backup properly. + // once the backups is complete, we will restore it to its previous state. + resetFunc, err := be.handleSuperReadOnly(ctx, params) + if err != nil { + return nil, vterrors.Wrap(err, "unable to disable super-read-only") + } + defer resetFunc() + + args := []string{} + + if mysqlShellFlags != "" { + args = append(args, strings.Fields(mysqlShellFlags)...) + } + + args = append(args, "-e", fmt.Sprintf("util.loadDump(%q, %s)", + bm.BackupLocation, + mysqlShellLoadFlags, + )) + + cmd := exec.CommandContext(ctx, "mysqlsh", args...) + + params.Logger.Infof("running %s", cmd.String()) + + cmdOut, err := cmd.StdoutPipe() + if err != nil { + return nil, vterrors.Wrap(err, "cannot create stdout pipe") + } + cmdErr, err := cmd.StderrPipe() + if err != nil { + return nil, vterrors.Wrap(err, "cannot create stderr pipe") + } + if err := cmd.Start(); err != nil { + return nil, vterrors.Wrap(err, "can't start xbstream") + } + + cmdWg := &sync.WaitGroup{} + cmdWg.Add(2) + go scanLinesToLogger(mysqlShellBackupEngineName+" stdout", cmdOut, params.Logger, cmdWg.Done) + go scanLinesToLogger(mysqlShellBackupEngineName+" stderr", cmdErr, params.Logger, cmdWg.Done) + cmdWg.Wait() + + // Get the exit status. + if err := cmd.Wait(); err != nil { + return nil, vterrors.Wrap(err, mysqlShellBackupEngineName+" failed") + } + params.Logger.Infof("%s completed successfully", mysqlShellBackupBinaryName) + + // disable local_infile now that the restore is done. + err = params.Mysqld.ExecuteSuperQuery(ctx, "SET GLOBAL LOCAL_INFILE=0") + if err != nil { + return nil, vterrors.Wrap(err, "unable to set local_infile=0") + } + params.Logger.Infof("set local_infile=0") + + params.Logger.Infof("Restore completed") + + return &bm.BackupManifest, nil +} + +// ShouldDrainForBackup satisfies the BackupEngine interface +// MySQL Shell backups can be taken while MySQL is running so we can control this via a flag. +func (be *MySQLShellBackupEngine) ShouldDrainForBackup() bool { + return mysqlShellBackupShouldDrain +} + +// ShouldStartMySQLAfterRestore signifies if this backup engine needs to restart MySQL once the restore is completed. +// Since MySQL Shell operates on a live MySQL instance, there is no need to start it once the restore is completed +func (be *MySQLShellBackupEngine) ShouldStartMySQLAfterRestore() bool { + return false +} + +func (be *MySQLShellBackupEngine) backupPreCheck(location string) error { + if mysqlShellBackupLocation == "" { + return fmt.Errorf("%w: no backup location set via --mysql-shell-backup-location", MySQLShellPreCheckError) + } + + if mysqlShellFlags == "" || !strings.Contains(mysqlShellFlags, "--js") { + return fmt.Errorf("%w: at least the --js flag is required in the value of the flag --mysql-shell-flags", MySQLShellPreCheckError) + } + + // make sure the targe directory exists if the target location for the backup is not an object store + // (e.g. is the local filesystem) as MySQL Shell doesn't create the entire path beforehand: + isObjectStorage := false + for _, objStore := range knownObjectStoreParams { + if strings.Contains(mysqlShellDumpFlags, objStore) { + isObjectStorage = true + break + } + } + + if !isObjectStorage { + err := os.MkdirAll(location, 0o750) + if err != nil { + return fmt.Errorf("failure creating directory %s: %w", location, err) + } + } + + return nil +} + +func (be *MySQLShellBackupEngine) restorePreCheck(ctx context.Context, params RestoreParams) (shouldDeleteUsers bool, err error) { + if mysqlShellFlags == "" { + return shouldDeleteUsers, fmt.Errorf("%w: at least the --js flag is required in the value of the flag --mysql-shell-flags", MySQLShellPreCheckError) + } + + loadFlags := map[string]interface{}{} + err = json.Unmarshal([]byte(mysqlShellLoadFlags), &loadFlags) + if err != nil { + return false, fmt.Errorf("%w: unable to parse JSON of load flags", MySQLShellPreCheckError) + } + + if val, ok := loadFlags["updateGtidSet"]; !ok || val != "replace" { + return false, fmt.Errorf("%w: mysql-shell needs to restore with updateGtidSet set to \"replace\" to work with Vitess", MySQLShellPreCheckError) + } + + if val, ok := loadFlags["progressFile"]; !ok || val != "" { + return false, fmt.Errorf("%w: \"progressFile\" needs to be empty as vitess always starts a restore from scratch", MySQLShellPreCheckError) + } + + if val, ok := loadFlags["skipBinlog"]; !ok || val != true { + return false, fmt.Errorf("%w: \"skipBinlog\" needs to set to true", MySQLShellPreCheckError) + } + + if val, ok := loadFlags["loadUsers"]; ok && val == true { + shouldDeleteUsers = true + } + + return shouldDeleteUsers, nil +} + +func (be *MySQLShellBackupEngine) handleSuperReadOnly(ctx context.Context, params RestoreParams) (func(), error) { + readonly, err := params.Mysqld.IsSuperReadOnly(ctx) + if err != nil { + return nil, vterrors.Wrap(err, fmt.Sprintf("checking if mysqld has super_read_only=enable: %v", err)) + } + + params.Logger.Infof("Is Super Read Only: %v", readonly) + + if readonly { + resetFunc, err := params.Mysqld.SetSuperReadOnly(ctx, false) + if err != nil { + return nil, vterrors.Wrap(err, fmt.Sprintf("unable to disable super-read-only: %v", err)) + } + + return func() { + err := resetFunc() + if err != nil { + params.Logger.Errorf("Not able to set super_read_only to its original value after restore") + } + }, nil + } + + return func() {}, nil +} + +// releaseReadLock will keep reading the MySQL Shell STDERR waiting until the point it has acquired its lock +func releaseReadLock(ctx context.Context, reader io.Reader, params BackupParams, wg *sync.WaitGroup, lockAcquired time.Time) { + defer wg.Done() + + scanner := bufio.NewScanner(reader) + released := false + for scanner.Scan() { + line := scanner.Text() + + if !released { + + if !strings.Contains(line, "Global read lock has been released") { + continue + } + released = true + + params.Logger.Infof("mysql shell released its global read lock, doing the same") + + err := params.Mysqld.ReleaseGlobalReadLock(ctx) + if err != nil { + params.Logger.Errorf("unable to release global read lock: %v", err) + } + + params.Logger.Infof("global read lock released after %v", time.Since(lockAcquired)) + } + } + if err := scanner.Err(); err != nil { + params.Logger.Errorf("error reading from reader: %v", err) + } +} + +func cleanupMySQL(ctx context.Context, params RestoreParams, shouldDeleteUsers bool) error { + params.Logger.Infof("Cleaning up MySQL ahead of a restore") + result, err := params.Mysqld.FetchSuperQuery(ctx, "SHOW DATABASES") + if err != nil { + return err + } + + // drop all databases + for _, row := range result.Rows { + dbName := row[0].ToString() + if slices.Contains(internalDBs, dbName) { + continue // not dropping internal DBs + } + + params.Logger.Infof("Dropping DB %q", dbName) + err = params.Mysqld.ExecuteSuperQuery(ctx, fmt.Sprintf("DROP DATABASE IF EXISTS `%s`", row[0].ToString())) + if err != nil { + return fmt.Errorf("error droppping database %q: %w", row[0].ToString(), err) + } + } + + if shouldDeleteUsers { + // get current user + var currentUser string + result, err = params.Mysqld.FetchSuperQuery(ctx, "SELECT user()") + if err != nil { + return fmt.Errorf("error fetching current user: %w", err) + } + + for _, row := range result.Rows { + currentUser = row[0].ToString() + } + + // drop all users except reserved ones + result, err = params.Mysqld.FetchSuperQuery(ctx, "SELECT user, host FROM mysql.user") + if err != nil { + return err + } + + for _, row := range result.Rows { + user := fmt.Sprintf("%s@%s", row[0].ToString(), row[1].ToString()) + + if user == currentUser { + continue // we don't drop the current user + } + if slices.Contains(reservedUsers, user) { + continue // we skip reserved MySQL users + } + + params.Logger.Infof("Dropping User %q", user) + err = params.Mysqld.ExecuteSuperQuery(ctx, fmt.Sprintf("DROP USER '%s'@'%s'", row[0].ToString(), row[1].ToString())) + if err != nil { + return fmt.Errorf("error droppping user %q: %w", user, err) + } + } + } + + return err +} diff --git a/go/vt/mysqlctl/mysqlshellbackupengine_test.go b/go/vt/mysqlctl/mysqlshellbackupengine_test.go new file mode 100644 index 00000000000..8713e5e5a9b --- /dev/null +++ b/go/vt/mysqlctl/mysqlshellbackupengine_test.go @@ -0,0 +1,278 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mysqlctl + +import ( + "context" + "fmt" + "path" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql/fakesqldb" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/logutil" +) + +func TestMySQLShellBackupBackupPreCheck(t *testing.T) { + originalLocation := mysqlShellBackupLocation + originalFlags := mysqlShellFlags + defer func() { + mysqlShellBackupLocation = originalLocation + mysqlShellFlags = originalFlags + }() + + engine := MySQLShellBackupEngine{} + tests := []struct { + name string + location string + flags string + err error + }{ + { + "empty flags", + "", + `{}`, + MySQLShellPreCheckError, + }, + { + "only location", + "/dev/null", + "", + MySQLShellPreCheckError, + }, + { + "only flags", + "", + "--js", + MySQLShellPreCheckError, + }, + { + "both values present but without --js", + "", + "-h localhost", + MySQLShellPreCheckError, + }, + { + "supported values", + t.TempDir(), + "--js -h localhost", + nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + mysqlShellBackupLocation = tt.location + mysqlShellFlags = tt.flags + assert.ErrorIs(t, engine.backupPreCheck(path.Join(mysqlShellBackupLocation, "test")), tt.err) + }) + } + +} + +func TestMySQLShellBackupRestorePreCheck(t *testing.T) { + original := mysqlShellLoadFlags + defer func() { mysqlShellLoadFlags = original }() + + engine := MySQLShellBackupEngine{} + tests := []struct { + name string + flags string + err error + shouldDeleteUsers bool + }{ + { + "empty load flags", + `{}`, + MySQLShellPreCheckError, + false, + }, + { + "only updateGtidSet", + `{"updateGtidSet": "replace"}`, + MySQLShellPreCheckError, + false, + }, + { + "only progressFile", + `{"progressFile": ""}`, + MySQLShellPreCheckError, + false, + }, + { + "both values but unsupported values", + `{"updateGtidSet": "append", "progressFile": "/tmp/test1"}`, + MySQLShellPreCheckError, + false, + }, + { + "supported values", + `{"updateGtidSet": "replace", "progressFile": "", "skipBinlog": true, "loadUsers": false}`, + nil, + false, + }, + { + "should delete users", + `{"updateGtidSet": "replace", "progressFile": "", "skipBinlog": true, "loadUsers": true}`, + nil, + true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mysqlShellLoadFlags = tt.flags + shouldDeleteUsers, err := engine.restorePreCheck(context.Background(), RestoreParams{}) + assert.ErrorIs(t, err, tt.err) + assert.Equal(t, tt.shouldDeleteUsers, shouldDeleteUsers) + }) + } + +} + +func TestShouldDrainForBackupMySQLShell(t *testing.T) { + original := mysqlShellBackupShouldDrain + defer func() { mysqlShellBackupShouldDrain = original }() + + engine := MySQLShellBackupEngine{} + + mysqlShellBackupShouldDrain = false + + assert.False(t, engine.ShouldDrainForBackup()) + + mysqlShellBackupShouldDrain = true + + assert.True(t, engine.ShouldDrainForBackup()) +} + +func TestCleanupMySQL(t *testing.T) { + type userRecord struct { + user, host string + } + + tests := []struct { + name string + existingDBs []string + expectedDropDBs []string + currentUser string + existingUsers []userRecord + expectedDropUsers []string + shouldDeleteUsers bool + }{ + { + name: "testing only specific DBs", + existingDBs: []string{"_vt", "vt_test"}, + expectedDropDBs: []string{"_vt", "vt_test"}, + }, + { + name: "testing with internal dbs", + existingDBs: []string{"_vt", "mysql", "vt_test", "performance_schema"}, + expectedDropDBs: []string{"_vt", "vt_test"}, + }, + { + name: "with users but without delete", + existingDBs: []string{"_vt", "mysql", "vt_test", "performance_schema"}, + expectedDropDBs: []string{"_vt", "vt_test"}, + existingUsers: []userRecord{ + {"test", "localhost"}, + {"app", "10.0.0.1"}, + }, + expectedDropUsers: []string{}, + shouldDeleteUsers: false, + }, + { + name: "with users and delete", + existingDBs: []string{"_vt", "mysql", "vt_test", "performance_schema"}, + expectedDropDBs: []string{"_vt", "vt_test"}, + existingUsers: []userRecord{ + {"test", "localhost"}, + {"app", "10.0.0.1"}, + }, + expectedDropUsers: []string{"'test'@'localhost'", "'app'@'10.0.0.1'"}, + shouldDeleteUsers: true, + }, + { + name: "with reserved users", + existingDBs: []string{"_vt", "mysql", "vt_test", "performance_schema"}, + expectedDropDBs: []string{"_vt", "vt_test"}, + existingUsers: []userRecord{ + {"mysql.sys", "localhost"}, + {"mysql.infoschema", "localhost"}, + {"mysql.session", "localhost"}, + {"test", "localhost"}, + {"app", "10.0.0.1"}, + }, + expectedDropUsers: []string{"'test'@'localhost'", "'app'@'10.0.0.1'"}, + shouldDeleteUsers: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fakedb := fakesqldb.New(t) + defer fakedb.Close() + mysql := NewFakeMysqlDaemon(fakedb) + defer mysql.Close() + + databases := [][]sqltypes.Value{} + for _, db := range tt.existingDBs { + databases = append(databases, []sqltypes.Value{sqltypes.NewVarChar(db)}) + } + + users := [][]sqltypes.Value{} + for _, record := range tt.existingUsers { + users = append(users, []sqltypes.Value{sqltypes.NewVarChar(record.user), sqltypes.NewVarChar(record.host)}) + } + + mysql.FetchSuperQueryMap = map[string]*sqltypes.Result{ + "SHOW DATABASES": {Rows: databases}, + "SELECT user()": {Rows: [][]sqltypes.Value{{sqltypes.NewVarChar(tt.currentUser)}}}, + "SELECT user, host FROM mysql.user": {Rows: users}, + } + + for _, drop := range tt.expectedDropDBs { + mysql.ExpectedExecuteSuperQueryList = append(mysql.ExpectedExecuteSuperQueryList, + fmt.Sprintf("DROP DATABASE IF EXISTS `%s`", drop), + ) + } + + if tt.shouldDeleteUsers { + for _, drop := range tt.expectedDropUsers { + mysql.ExpectedExecuteSuperQueryList = append(mysql.ExpectedExecuteSuperQueryList, + fmt.Sprintf("DROP USER %s", drop), + ) + } + } + + params := RestoreParams{ + Mysqld: mysql, + Logger: logutil.NewMemoryLogger(), + } + + err := cleanupMySQL(context.Background(), params, tt.shouldDeleteUsers) + require.NoError(t, err) + + require.Equal(t, len(tt.expectedDropDBs)+len(tt.expectedDropUsers), mysql.ExpectedExecuteSuperQueryCurrent, + "unexpected number of queries executed") + }) + } + +} diff --git a/go/vt/mysqlctl/query.go b/go/vt/mysqlctl/query.go index 311828b4535..69799e7c717 100644 --- a/go/vt/mysqlctl/query.go +++ b/go/vt/mysqlctl/query.go @@ -17,6 +17,7 @@ limitations under the License. package mysqlctl import ( + "errors" "fmt" "strings" "time" @@ -228,6 +229,42 @@ func (mysqld *Mysqld) fetchStatuses(ctx context.Context, pattern string) (map[st return varMap, nil } +// ExecuteSuperQuery allows the user to execute a query as a super user. +func (mysqld *Mysqld) AcquireGlobalReadLock(ctx context.Context) error { + if mysqld.lockConn != nil { + return errors.New("lock already acquired") + } + + conn, err := getPoolReconnect(ctx, mysqld.dbaPool) + if err != nil { + return err + } + + err = mysqld.executeSuperQueryListConn(ctx, conn, []string{"FLUSH TABLES WITH READ LOCK"}) + if err != nil { + conn.Recycle() + return err + } + + mysqld.lockConn = conn + return nil +} + +func (mysqld *Mysqld) ReleaseGlobalReadLock(ctx context.Context) error { + if mysqld.lockConn == nil { + return errors.New("no read locks acquired yet") + } + + err := mysqld.executeSuperQueryListConn(ctx, mysqld.lockConn, []string{"UNLOCK TABLES"}) + if err != nil { + return err + } + + mysqld.lockConn.Recycle() + mysqld.lockConn = nil + return nil +} + const ( masterPasswordStart = " MASTER_PASSWORD = '" masterPasswordEnd = "',\n" diff --git a/go/vt/mysqlctl/replication.go b/go/vt/mysqlctl/replication.go index 3c866019c63..28970e1362d 100644 --- a/go/vt/mysqlctl/replication.go +++ b/go/vt/mysqlctl/replication.go @@ -38,6 +38,8 @@ import ( "vitess.io/vitess/go/vt/log" ) +type ResetSuperReadOnlyFunc func() error + // WaitForReplicationStart waits until the deadline for replication to start. // This validates the current primary is correct and can be connected to. func WaitForReplicationStart(mysqld MysqlDaemon, replicaStartDeadline int) error { @@ -231,6 +233,23 @@ func (mysqld *Mysqld) IsReadOnly() (bool, error) { return false, nil } +// IsSuperReadOnly return true if the instance is super read only +func (mysqld *Mysqld) IsSuperReadOnly(ctx context.Context) (bool, error) { + qr, err := mysqld.FetchSuperQuery(ctx, "SELECT @@global.super_read_only") + if err != nil { + return false, err + } + + if len(qr.Rows) == 1 { + sro := qr.Rows[0][0].ToString() + if sro == "1" || sro == "ON" { + return true, nil + } + } + + return false, nil +} + // SetReadOnly set/unset the read_only flag func (mysqld *Mysqld) SetReadOnly(on bool) error { query := "SET GLOBAL read_only = " @@ -242,15 +261,52 @@ func (mysqld *Mysqld) SetReadOnly(on bool) error { return mysqld.ExecuteSuperQuery(context.TODO(), query) } -// SetSuperReadOnly set/unset the super_read_only flag -func (mysqld *Mysqld) SetSuperReadOnly(on bool) error { +// SetSuperReadOnly set/unset the super_read_only flag. +// Returns a function which is called to set super_read_only back to its original value. +func (mysqld *Mysqld) SetSuperReadOnly(ctx context.Context, on bool) (ResetSuperReadOnlyFunc, error) { + // return function for switching `OFF` super_read_only + var resetFunc ResetSuperReadOnlyFunc + var disableFunc = func() error { + query := "SET GLOBAL super_read_only = 'OFF'" + err := mysqld.ExecuteSuperQuery(context.Background(), query) + return err + } + + // return function for switching `ON` super_read_only. + var enableFunc = func() error { + query := "SET GLOBAL super_read_only = 'ON'" + err := mysqld.ExecuteSuperQuery(context.Background(), query) + return err + } + + superReadOnlyEnabled, err := mysqld.IsSuperReadOnly(ctx) + if err != nil { + return nil, err + } + + // If non-idempotent then set the right call-back. + // We are asked to turn on super_read_only but original value is false, + // therefore return disableFunc, that can be used as defer by caller. + if on && !superReadOnlyEnabled { + resetFunc = disableFunc + } + // We are asked to turn off super_read_only but original value is true, + // therefore return enableFunc, that can be used as defer by caller. + if !on && superReadOnlyEnabled { + resetFunc = enableFunc + } + query := "SET GLOBAL super_read_only = " if on { - query += "ON" + query += "'ON'" } else { - query += "OFF" + query += "'OFF'" } - return mysqld.ExecuteSuperQuery(context.TODO(), query) + if err := mysqld.ExecuteSuperQuery(context.Background(), query); err != nil { + return nil, err + } + + return resetFunc, nil } // WaitSourcePos lets replicas wait to given replication position diff --git a/go/vt/mysqlctl/xtrabackupengine.go b/go/vt/mysqlctl/xtrabackupengine.go index 38f29cee699..77250afce39 100644 --- a/go/vt/mysqlctl/xtrabackupengine.go +++ b/go/vt/mysqlctl/xtrabackupengine.go @@ -719,23 +719,6 @@ func findReplicationPosition(input, flavor string, logger logutil.Logger) (mysql return replicationPosition, nil } -// scanLinesToLogger scans full lines from the given Reader and sends them to -// the given Logger until EOF. -func scanLinesToLogger(prefix string, reader io.Reader, logger logutil.Logger, doneFunc func()) { - defer doneFunc() - - scanner := bufio.NewScanner(reader) - for scanner.Scan() { - line := scanner.Text() - logger.Infof("%s: %s", prefix, line) - } - if err := scanner.Err(); err != nil { - // This is usually run in a background goroutine, so there's no point - // returning an error. Just log it. - logger.Warningf("error scanning lines from %s: %v", prefix, err) - } -} - func stripeFileName(baseFileName string, index int) string { return fmt.Sprintf("%s-%03d", baseFileName, index) } @@ -899,6 +882,11 @@ func (be *XtrabackupEngine) ShouldDrainForBackup() bool { return false } +// ShouldStartMySQLAfterRestore signifies if this backup engine needs to restart MySQL once the restore is completed. +func (be *XtrabackupEngine) ShouldStartMySQLAfterRestore() bool { + return true +} + func init() { BackupRestoreEngineMap[xtrabackupEngineName] = &XtrabackupEngine{} } diff --git a/go/vt/vttablet/tabletmanager/rpc_query_test.go b/go/vt/vttablet/tabletmanager/rpc_query_test.go index f6167e24917..87a64b2d8b7 100644 --- a/go/vt/vttablet/tabletmanager/rpc_query_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_query_test.go @@ -27,7 +27,7 @@ import ( "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/dbconfigs" - "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" + "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/vttablet/tabletservermock" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" @@ -38,7 +38,7 @@ func TestTabletManager_ExecuteFetchAsDba(t *testing.T) { cp := mysql.ConnParams{} db := fakesqldb.New(t) db.AddQueryPattern(".*", &sqltypes.Result{}) - daemon := fakemysqldaemon.NewFakeMysqlDaemon(db) + daemon := mysqlctl.NewFakeMysqlDaemon(db) dbName := " escap`e me " tm := &TabletManager{ diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index 2adde1a1bed..b6c54d6a526 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -357,7 +357,7 @@ func (tm *TabletManager) InitPrimary(ctx context.Context, semiSync bool) (string if setSuperReadOnly { // Setting super_read_only off so that we can run the DDL commands - if err := tm.MysqlDaemon.SetSuperReadOnly(false); err != nil { + if _, err := tm.MysqlDaemon.SetSuperReadOnly(ctx, false); err != nil { if strings.Contains(err.Error(), strconv.Itoa(mysql.ERUnknownSystemVariable)) { log.Warningf("server does not know about super_read_only, continuing anyway...") } else { @@ -561,7 +561,7 @@ func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure // idempotent. if setSuperReadOnly { // Setting super_read_only also sets read_only - if err := tm.MysqlDaemon.SetSuperReadOnly(true); err != nil { + if _, err := tm.MysqlDaemon.SetSuperReadOnly(ctx, true); err != nil { if strings.Contains(err.Error(), strconv.Itoa(mysql.ERUnknownSystemVariable)) { log.Warningf("server does not know about super_read_only, continuing anyway...") } else { diff --git a/go/vt/vttablet/tabletmanager/rpc_replication_test.go b/go/vt/vttablet/tabletmanager/rpc_replication_test.go index 0aac8c971ec..d69a8414927 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication_test.go @@ -24,7 +24,7 @@ import ( "github.com/stretchr/testify/require" - "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" + "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/topo/memorytopo" ) @@ -46,7 +46,7 @@ func TestPromoteReplicaReplicationManagerSuccess(t *testing.T) { numTicksRan++ }) // Set the promotion lag to a second and then run PromoteReplica - tm.MysqlDaemon.(*fakemysqldaemon.FakeMysqlDaemon).PromoteLag = time.Second + tm.MysqlDaemon.(*mysqlctl.FakeMysqlDaemon).PromoteLag = time.Second _, err := tm.PromoteReplica(ctx, false) require.NoError(t, err) // At the end we expect the replication manager to be stopped. @@ -68,7 +68,7 @@ func TestPromoteReplicaReplicationManagerFailure(t *testing.T) { require.True(t, tm.replManager.ticks.Running()) // Set the promotion lag to a second and then run PromoteReplica - tm.MysqlDaemon.(*fakemysqldaemon.FakeMysqlDaemon).PromoteError = fmt.Errorf("promote error") + tm.MysqlDaemon.(*mysqlctl.FakeMysqlDaemon).PromoteError = fmt.Errorf("promote error") _, err := tm.PromoteReplica(ctx, false) require.Error(t, err) // At the end we expect the replication manager to be stopped. diff --git a/go/vt/vttablet/tabletmanager/tm_init_test.go b/go/vt/vttablet/tabletmanager/tm_init_test.go index 56ebbbd32e1..af93b6b73fd 100644 --- a/go/vt/vttablet/tabletmanager/tm_init_test.go +++ b/go/vt/vttablet/tabletmanager/tm_init_test.go @@ -33,7 +33,7 @@ import ( "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/logutil" - "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" + "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" @@ -379,7 +379,7 @@ func TestCheckPrimaryShip(t *testing.T) { tablet.Type = topodatapb.TabletType_REPLICA tablet.PrimaryTermStartTime = nil // Get the fakeMySQL and set it up to expect a set replication source command - fakeMysql := tm.MysqlDaemon.(*fakemysqldaemon.FakeMysqlDaemon) + fakeMysql := tm.MysqlDaemon.(*mysqlctl.FakeMysqlDaemon) fakeMysql.SetReplicationSourceInputs = append(fakeMysql.SetReplicationSourceInputs, fmt.Sprintf("%v:%v", otherTablet.MysqlHostname, otherTablet.MysqlPort)) fakeMysql.ExpectedExecuteSuperQueryList = []string{ "STOP SLAVE", @@ -638,7 +638,7 @@ func TestGetBuildTags(t *testing.T) { } } -func newTestMysqlDaemon(t *testing.T, port int32) *fakemysqldaemon.FakeMysqlDaemon { +func newTestMysqlDaemon(t *testing.T, port int32) *mysqlctl.FakeMysqlDaemon { t.Helper() db := fakesqldb.New(t) @@ -659,7 +659,7 @@ func newTestMysqlDaemon(t *testing.T, port int32) *fakemysqldaemon.FakeMysqlDaem db.AddQueryPattern("UPDATE _vt\\.(local|shard)_metadata SET db_name='.+' WHERE db_name=''", &sqltypes.Result{}) db.AddQueryPattern("INSERT INTO _vt\\.local_metadata \\(.+\\) VALUES \\(.+\\) ON DUPLICATE KEY UPDATE value ?= ?'.+'.*", &sqltypes.Result{}) - mysqld := fakemysqldaemon.NewFakeMysqlDaemon(db) + mysqld := mysqlctl.NewFakeMysqlDaemon(db) mysqld.MysqlPort = sync2.NewAtomicInt32(port) return mysqld diff --git a/go/vt/vttablet/tabletmanager/tm_state_test.go b/go/vt/vttablet/tabletmanager/tm_state_test.go index 48e2123554f..537580d4853 100644 --- a/go/vt/vttablet/tabletmanager/tm_state_test.go +++ b/go/vt/vttablet/tabletmanager/tm_state_test.go @@ -28,7 +28,7 @@ import ( "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/vt/key" - "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" + "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/faketopo" @@ -105,7 +105,7 @@ func TestStateDenyList(t *testing.T) { tm := newTestTM(t, ts, 1, "ks", "0") defer tm.Stop() - fmd := tm.MysqlDaemon.(*fakemysqldaemon.FakeMysqlDaemon) + fmd := tm.MysqlDaemon.(*mysqlctl.FakeMysqlDaemon) fmd.Schema = &tabletmanagerdatapb.SchemaDefinition{ TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ Name: "t1", diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller_test.go b/go/vt/vttablet/tabletmanager/vreplication/controller_test.go index ebea9b8225f..4886d595d10 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller_test.go @@ -23,12 +23,12 @@ import ( "testing" "time" + "vitess.io/vitess/go/vt/mysqlctl" querypb "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/binlog/binlogplayer" - "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" "vitess.io/vitess/go/vt/mysqlctl/tmutils" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" @@ -87,7 +87,7 @@ func TestControllerKeyRange(t *testing.T) { dbClient.ExpectRequest("commit", nil, nil) dbClientFactory := func() binlogplayer.DBClient { return dbClient } - mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} vre := NewTestEngine(nil, wantTablet.GetAlias().Cell, mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil, vre) @@ -124,7 +124,7 @@ func TestControllerTables(t *testing.T) { dbClient.ExpectRequest("commit", nil, nil) dbClientFactory := func() binlogplayer.DBClient { return dbClient } - mysqld := &fakemysqldaemon.FakeMysqlDaemon{ + mysqld := &mysqlctl.FakeMysqlDaemon{ MysqlPort: sync2.NewAtomicInt32(3306), Schema: &tabletmanagerdatapb.SchemaDefinition{ DatabaseSchema: "", @@ -219,7 +219,7 @@ func TestControllerOverrides(t *testing.T) { dbClient.ExpectRequest("commit", nil, nil) dbClientFactory := func() binlogplayer.DBClient { return dbClient } - mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} vre := NewTestEngine(nil, wantTablet.GetAlias().Cell, mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "rdonly", nil, vre) @@ -291,7 +291,7 @@ func TestControllerRetry(t *testing.T) { dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil) dbClient.ExpectRequest("commit", nil, nil) dbClientFactory := func() binlogplayer.DBClient { return dbClient } - mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} vre := NewTestEngine(nil, env.Cells[0], mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "rdonly", nil, vre) @@ -350,7 +350,7 @@ func TestControllerStopPosition(t *testing.T) { dbClient.ExpectRequest("update _vt.vreplication set state='Stopped', message='Reached stopping position, done playing logs' where id=1", testDMLResponse, nil) dbClientFactory := func() binlogplayer.DBClient { return dbClient } - mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} vre := NewTestEngine(nil, wantTablet.GetAlias().Cell, mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil, vre) diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine_test.go b/go/vt/vttablet/tabletmanager/vreplication/engine_test.go index 4c57f21dca1..0b61d732ce9 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine_test.go @@ -32,7 +32,7 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/binlog/binlogplayer" - "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" + "vitess.io/vitess/go/vt/mysqlctl" ) func TestEngineOpen(t *testing.T) { @@ -42,7 +42,7 @@ func TestEngineOpen(t *testing.T) { resetBinlogClient() dbClient := binlogplayer.NewMockDBClient(t) dbClientFactory := func() binlogplayer.DBClient { return dbClient } - mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} vre := NewTestEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) require.False(t, vre.IsOpen()) @@ -82,7 +82,7 @@ func TestEngineOpenRetry(t *testing.T) { resetBinlogClient() dbClient := binlogplayer.NewMockDBClient(t) dbClientFactory := func() binlogplayer.DBClient { return dbClient } - mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} vre := NewTestEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) @@ -143,7 +143,7 @@ func TestEngineExec(t *testing.T) { resetBinlogClient() dbClient := binlogplayer.NewMockDBClient(t) dbClientFactory := func() binlogplayer.DBClient { return dbClient } - mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} // Test Insert @@ -305,7 +305,7 @@ func TestEngineBadInsert(t *testing.T) { dbClient := binlogplayer.NewMockDBClient(t) dbClientFactory := func() binlogplayer.DBClient { return dbClient } - mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} vre := NewTestEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) @@ -333,7 +333,7 @@ func TestEngineSelect(t *testing.T) { dbClient := binlogplayer.NewMockDBClient(t) dbClientFactory := func() binlogplayer.DBClient { return dbClient } - mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} vre := NewTestEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) @@ -366,7 +366,7 @@ func TestWaitForPos(t *testing.T) { waitRetryTime = 10 * time.Millisecond dbClient := binlogplayer.NewMockDBClient(t) - mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} dbClientFactory := func() binlogplayer.DBClient { return dbClient } vre := NewTestEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) @@ -394,7 +394,7 @@ func TestWaitForPos(t *testing.T) { func TestWaitForPosError(t *testing.T) { dbClient := binlogplayer.NewMockDBClient(t) - mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} dbClientFactory := func() binlogplayer.DBClient { return dbClient } vre := NewTestEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) @@ -430,7 +430,7 @@ func TestWaitForPosError(t *testing.T) { func TestWaitForPosCancel(t *testing.T) { dbClient := binlogplayer.NewMockDBClient(t) - mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} dbClientFactory := func() binlogplayer.DBClient { return dbClient } vre := NewTestEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil) @@ -474,7 +474,7 @@ func TestCreateDBAndTable(t *testing.T) { resetBinlogClient() dbClient := binlogplayer.NewMockDBClient(t) dbClientFactory := func() binlogplayer.DBClient { return dbClient } - mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} // Test Insert @@ -570,7 +570,7 @@ func TestGetDBClient(t *testing.T) { dbClientFactoryDba := func() binlogplayer.DBClient { return dbClientDba } dbClientFactoryFiltered := func() binlogplayer.DBClient { return dbClientFiltered } - mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} + mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)} vre := NewTestEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactoryFiltered, dbClientFactoryDba, dbClientDba.DBName(), nil) shouldBeDbaClient := vre.getDBClient(true /*runAsAdmin*/) diff --git a/go/vt/vttablet/tabletserver/repltracker/poller_test.go b/go/vt/vttablet/tabletserver/repltracker/poller_test.go index 3dc27c771ca..e0734118160 100644 --- a/go/vt/vttablet/tabletserver/repltracker/poller_test.go +++ b/go/vt/vttablet/tabletserver/repltracker/poller_test.go @@ -23,12 +23,12 @@ import ( "github.com/stretchr/testify/assert" - "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" + "vitess.io/vitess/go/vt/mysqlctl" ) func TestPoller(t *testing.T) { poller := &poller{} - mysqld := fakemysqldaemon.NewFakeMysqlDaemon(nil) + mysqld := mysqlctl.NewFakeMysqlDaemon(nil) poller.InitDBConfig(mysqld) mysqld.ReplicationStatusError = errors.New("err") diff --git a/go/vt/vttablet/tabletserver/repltracker/repltracker_test.go b/go/vt/vttablet/tabletserver/repltracker/repltracker_test.go index 0695a079b82..33fe1a39146 100644 --- a/go/vt/vttablet/tabletserver/repltracker/repltracker_test.go +++ b/go/vt/vttablet/tabletserver/repltracker/repltracker_test.go @@ -25,7 +25,7 @@ import ( "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/vt/dbconfigs" - "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" + "vitess.io/vitess/go/vt/mysqlctl" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" @@ -47,7 +47,7 @@ func TestReplTracker(t *testing.T) { Uid: 1, } target := &querypb.Target{} - mysqld := fakemysqldaemon.NewFakeMysqlDaemon(nil) + mysqld := mysqlctl.NewFakeMysqlDaemon(nil) rt := NewReplTracker(env, alias) rt.InitDBConfig(target, mysqld) @@ -143,7 +143,7 @@ func TestStatusHeartbeatFallBack(t *testing.T) { Cell: "cell", Uid: 1, } - mysqld := fakemysqldaemon.NewFakeMysqlDaemon(nil) + mysqld := mysqlctl.NewFakeMysqlDaemon(nil) mysqld.ReplicationLagSeconds = theCase.mysqldLag mysqld.Replicating = true mysqld.ReplicationStatusError = theCase.mysqldErr diff --git a/go/vt/wrangler/fake_tablet_test.go b/go/vt/wrangler/fake_tablet_test.go index 254e1813d8d..37aeeed6fa3 100644 --- a/go/vt/wrangler/fake_tablet_test.go +++ b/go/vt/wrangler/fake_tablet_test.go @@ -29,7 +29,7 @@ import ( "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/netutil" "vitess.io/vitess/go/vt/dbconfigs" - "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" + "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vttablet/grpctmserver" "vitess.io/vitess/go/vt/vttablet/queryservice" @@ -75,7 +75,7 @@ type fakeTablet struct { // We also create the RPCServer, so users can register more services // before calling StartActionLoop(). Tablet *topodatapb.Tablet - FakeMysqlDaemon *fakemysqldaemon.FakeMysqlDaemon + FakeMysqlDaemon *mysqlctl.FakeMysqlDaemon RPCServer *grpc.Server // The following fields are created when we start the event loop for @@ -144,7 +144,7 @@ func newFakeTablet(t *testing.T, wr *Wrangler, cell string, uid uint32, tabletTy } // create a FakeMysqlDaemon with the right information by default - fakeMysqlDaemon := fakemysqldaemon.NewFakeMysqlDaemon(db) + fakeMysqlDaemon := mysqlctl.NewFakeMysqlDaemon(db) fakeMysqlDaemon.MysqlPort.Set(mysqlPort) return &fakeTablet{ diff --git a/go/vt/wrangler/testlib/fake_tablet.go b/go/vt/wrangler/testlib/fake_tablet.go index 99c43015d3e..5c85e37d43a 100644 --- a/go/vt/wrangler/testlib/fake_tablet.go +++ b/go/vt/wrangler/testlib/fake_tablet.go @@ -34,7 +34,7 @@ import ( "vitess.io/vitess/go/netutil" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/dbconfigs" - "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon" + "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vttablet/grpctmserver" @@ -69,7 +69,7 @@ type FakeTablet struct { // We also create the RPCServer, so users can register more services // before calling StartActionLoop(). Tablet *topodatapb.Tablet - FakeMysqlDaemon *fakemysqldaemon.FakeMysqlDaemon + FakeMysqlDaemon *mysqlctl.FakeMysqlDaemon RPCServer *grpc.Server // The following fields are created when we start the event loop for @@ -159,7 +159,7 @@ func NewFakeTablet(t *testing.T, wr *wrangler.Wrangler, cell string, uid uint32, } // create a FakeMysqlDaemon with the right information by default - fakeMysqlDaemon := fakemysqldaemon.NewFakeMysqlDaemon(db) + fakeMysqlDaemon := mysqlctl.NewFakeMysqlDaemon(db) fakeMysqlDaemon.MysqlPort.Set(mysqlPort) return &FakeTablet{