Skip to content

Commit

Permalink
backport logical backups
Browse files Browse the repository at this point in the history
Signed-off-by: 'Renan Rangel' <[email protected]>
  • Loading branch information
rvrangel committed Oct 16, 2024
1 parent f552c6f commit 4851511
Show file tree
Hide file tree
Showing 25 changed files with 1,086 additions and 97 deletions.
4 changes: 2 additions & 2 deletions go/test/fuzzing/tablet_manager_fuzzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/vttablet/tabletmanager"
"vitess.io/vitess/go/vt/vttablet/tabletservermock"

Expand All @@ -42,7 +42,7 @@ func FuzzTabletManagerExecuteFetchAsDba(data []byte) int {
cp := mysql.ConnParams{}
db := fakesqldb.New(t)
db.AddQueryPattern(".*", &sqltypes.Result{})
daemon := fakemysqldaemon.NewFakeMysqlDaemon(db)
daemon := mysqlctl.NewFakeMysqlDaemon(db)

dbName := "dbname"
tm := &tabletmanager.TabletManager{
Expand Down
40 changes: 24 additions & 16 deletions go/vt/mysqlctl/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@ limitations under the License.
package mysqlctl

import (
"bufio"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/spf13/pflag"

"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/servenv"

"context"
Expand Down Expand Up @@ -352,21 +354,6 @@ func Restore(ctx context.Context, params RestoreParams) (*BackupManifest, error)
return nil, err
}

// We disable super_read_only, in case it is in the default MySQL startup
// parameters and will be blocking the writes we need to do in
// PopulateMetadataTables(). We do it blindly, since
// this will fail on MariaDB, which doesn't have super_read_only
// This is safe, since we're restarting MySQL after the restore anyway
params.Logger.Infof("Restore: disabling super_read_only")
if err := params.Mysqld.SetSuperReadOnly(false); err != nil {
if strings.Contains(err.Error(), strconv.Itoa(mysql.ERUnknownSystemVariable)) {
params.Logger.Warningf("Restore: server does not know about super_read_only, continuing anyway...")
} else {
params.Logger.Errorf("Restore: unexpected error while trying to set super_read_only: %v", err)
return nil, err
}
}

params.Logger.Infof("Restore: running mysql_upgrade")
if err := params.Mysqld.RunMysqlUpgrade(); err != nil {
return nil, vterrors.Wrap(err, "mysql_upgrade failed")
Expand Down Expand Up @@ -403,3 +390,24 @@ func Restore(ctx context.Context, params RestoreParams) (*BackupManifest, error)
restoreDuration.Set(int64(time.Since(startTs).Seconds()))
return manifest, nil
}

// scanLinesToLogger scans full lines from the given Reader and sends them to
// the given Logger until EOF.
func scanLinesToLogger(prefix string, reader io.Reader, logger logutil.Logger, doneFunc func()) {
defer doneFunc()

scanner := bufio.NewScanner(reader)
for scanner.Scan() {
line := scanner.Text()
logger.Infof("%s: %s", prefix, line)
}
if err := scanner.Err(); err != nil {
// This is usually run in a background goroutine, so there's no point
// returning an error. Just log it.
logger.Warningf("error scanning lines from %s: %v", prefix, err)
}
}

func FormatRFC3339(t time.Time) string {
return t.Format(time.RFC3339)
}
29 changes: 29 additions & 0 deletions go/vt/mysqlctl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,19 @@ limitations under the License.
package mysqlctl

import (
"fmt"
"io"
"os"
"path"
"reflect"
"sort"
"sync"
"testing"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/logutil"
)

func TestFindFilesToBackupWithoutRedoLog(t *testing.T) {
Expand Down Expand Up @@ -214,3 +220,26 @@ type forTest []FileEntry
func (f forTest) Len() int { return len(f) }
func (f forTest) Swap(i, j int) { f[i], f[j] = f[j], f[i] }
func (f forTest) Less(i, j int) bool { return f[i].Base+f[i].Name < f[j].Base+f[j].Name }

func TestScanLinesToLogger(t *testing.T) {
reader, writer := io.Pipe()
logger := logutil.NewMemoryLogger()
var wg sync.WaitGroup

wg.Add(1)
go scanLinesToLogger("test", reader, logger, wg.Done)

for i := range [100]int{} {
_, err := writer.Write([]byte(fmt.Sprintf("foobar %d\n", i)))
require.NoError(t, err)
}

writer.Close()
wg.Wait()

require.Equal(t, 100, len(logger.Events))

for i, event := range logger.Events {
require.Equal(t, fmt.Sprintf("test: foobar %d", i), event.Value)
}
}
1 change: 1 addition & 0 deletions go/vt/mysqlctl/backupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ type RestoreParams struct {
// Returns the manifest of a backup if successful, otherwise returns an error
type RestoreEngine interface {
ExecuteRestore(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle) (*BackupManifest, error)
ShouldStartMySQLAfterRestore() bool
}

// BackupRestoreEngine is a combination of BackupEngine and RestoreEngine.
Expand Down
5 changes: 5 additions & 0 deletions go/vt/mysqlctl/builtinbackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,11 @@ func (be *BuiltinBackupEngine) ShouldDrainForBackup() bool {
return true
}

// ShouldStartMySQLAfterRestore signifies if this backup engine needs to restart MySQL once the restore is completed.
func (be *BuiltinBackupEngine) ShouldStartMySQLAfterRestore() bool {
return true
}

func getPrimaryPosition(ctx context.Context, tmc tmclient.TabletManagerClient, ts *topo.Server, keyspace, shard string) (mysql.Position, error) {
si, err := ts.GetShard(ctx, keyspace, shard)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions go/vt/mysqlctl/builtinbackupengine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon"
"vitess.io/vitess/go/vt/mysqlctl/filebackupstorage"
"vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vttime"
Expand Down Expand Up @@ -104,7 +103,7 @@ func TestExecuteBackup(t *testing.T) {

// Spin up a fake daemon to be used in backups. It needs to be allowed to receive:
// "STOP SLAVE", "START SLAVE", in that order.
mysqld := fakemysqldaemon.NewFakeMysqlDaemon(fakesqldb.New(t))
mysqld := mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t))
mysqld.ExpectedExecuteSuperQueryList = []string{"STOP SLAVE", "START SLAVE"}
// mysqld.ShutdownTime = time.Minute

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package fakemysqldaemon
package mysqlctl

import (
"errors"
"fmt"
"reflect"
"strings"
Expand All @@ -30,7 +31,6 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/dbconnpool"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"

querypb "vitess.io/vitess/go/vt/proto/query"
Expand Down Expand Up @@ -171,6 +171,9 @@ type FakeMysqlDaemon struct {
// TimeoutHook is a func that can be called at the beginning of any method to fake a timeout.
// all a test needs to do is make it { return context.DeadlineExceeded }
TimeoutHook func() error

// Version is the version that will be returned by GetVersionString.
Version string
}

// NewFakeMysqlDaemon returns a FakeMysqlDaemon where mysqld appears
Expand All @@ -181,6 +184,7 @@ func NewFakeMysqlDaemon(db *fakesqldb.DB) *FakeMysqlDaemon {
db: db,
Running: true,
IOThreadRunning: true,
Version: "8.0.32",
}
if db != nil {
result.appPool = dbconnpool.NewConnectionPool("AppConnPool", 5, time.Minute, 0)
Expand All @@ -190,7 +194,7 @@ func NewFakeMysqlDaemon(db *fakesqldb.DB) *FakeMysqlDaemon {
}

// Start is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) Start(ctx context.Context, cnf *mysqlctl.Mycnf, mysqldArgs ...string) error {
func (fmd *FakeMysqlDaemon) Start(ctx context.Context, cnf *Mycnf, mysqldArgs ...string) error {
if fmd.Running {
return fmt.Errorf("fake mysql daemon already running")
}
Expand All @@ -208,7 +212,7 @@ func (fmd *FakeMysqlDaemon) Start(ctx context.Context, cnf *mysqlctl.Mycnf, mysq
}

// Shutdown is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) Shutdown(ctx context.Context, cnf *mysqlctl.Mycnf, waitForMysqld bool) error {
func (fmd *FakeMysqlDaemon) Shutdown(ctx context.Context, cnf *Mycnf, waitForMysqld bool) error {
if !fmd.Running {
return fmt.Errorf("fake mysql daemon not running")
}
Expand All @@ -231,17 +235,17 @@ func (fmd *FakeMysqlDaemon) RunMysqlUpgrade() error {
}

// ReinitConfig is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) ReinitConfig(ctx context.Context, cnf *mysqlctl.Mycnf) error {
func (fmd *FakeMysqlDaemon) ReinitConfig(ctx context.Context, cnf *Mycnf) error {
return nil
}

// RefreshConfig is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) RefreshConfig(ctx context.Context, cnf *mysqlctl.Mycnf) error {
func (fmd *FakeMysqlDaemon) RefreshConfig(ctx context.Context, cnf *Mycnf) error {
return nil
}

// Wait is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) Wait(ctx context.Context, cnf *mysqlctl.Mycnf) error {
func (fmd *FakeMysqlDaemon) Wait(ctx context.Context, cnf *Mycnf) error {
return nil
}

Expand Down Expand Up @@ -345,17 +349,22 @@ func (fmd *FakeMysqlDaemon) IsReadOnly() (bool, error) {
return fmd.ReadOnly, nil
}

// IsSuperReadOnly is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) IsSuperReadOnly(ctx context.Context) (bool, error) {
return fmd.SuperReadOnly, nil
}

// SetReadOnly is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) SetReadOnly(on bool) error {
fmd.ReadOnly = on
return nil
}

// SetSuperReadOnly is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) SetSuperReadOnly(on bool) error {
// SetSuperReadOnly is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) SetSuperReadOnly(ctx context.Context, on bool) (ResetSuperReadOnlyFunc, error) {
fmd.SuperReadOnly = on
fmd.ReadOnly = on
return nil
return nil, nil
}

// StartReplication is part of the MysqlDaemon interface.
Expand Down Expand Up @@ -467,6 +476,11 @@ func (fmd *FakeMysqlDaemon) Promote(hookExtraEnv map[string]string) (mysql.Posit
return fmd.PromoteResult, nil
}

// ExecuteSuperQuery is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) ExecuteSuperQuery(ctx context.Context, query string) error {
return fmd.ExecuteSuperQueryList(ctx, []string{query})
}

// ExecuteSuperQueryList is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) ExecuteSuperQueryList(ctx context.Context, queryList []string) error {
for _, query := range queryList {
Expand Down Expand Up @@ -667,3 +681,13 @@ func (fmd *FakeMysqlDaemon) GetVersionString() string {
func (fmd *FakeMysqlDaemon) GetVersionComment(ctx context.Context) string {
return ""
}

// AcquireGlobalReadLock is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) AcquireGlobalReadLock(ctx context.Context) error {
return errors.New("not implemented")
}

// ReleaseGlobalReadLock is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) ReleaseGlobalReadLock(ctx context.Context) error {
return errors.New("not implemented")
}
13 changes: 12 additions & 1 deletion go/vt/mysqlctl/mysql_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ type MysqlDaemon interface {
ResetReplication(ctx context.Context) error
PrimaryPosition() (mysql.Position, error)
IsReadOnly() (bool, error)
IsSuperReadOnly(ctx context.Context) (bool, error)
SetReadOnly(on bool) error
SetSuperReadOnly(on bool) error
SetSuperReadOnly(ctx context.Context, on bool) (ResetSuperReadOnlyFunc, error)
SetReplicationPosition(ctx context.Context, pos mysql.Position) error
SetReplicationSource(ctx context.Context, host string, port int, stopReplicationBefore bool, startReplicationAfter bool) error
WaitForReparentJournal(ctx context.Context, timeCreatedNS int64) error
Expand Down Expand Up @@ -103,6 +104,9 @@ type MysqlDaemon interface {
// GetVersionComment returns the version comment
GetVersionComment(ctx context.Context) string

// ExecuteSuperQuery executes a single query, no result
ExecuteSuperQuery(ctx context.Context, query string) error

// ExecuteSuperQueryList executes a list of queries, no result
ExecuteSuperQueryList(ctx context.Context, queryList []string) error

Expand All @@ -115,6 +119,13 @@ type MysqlDaemon interface {
// DisableBinlogPlayback disable playback of binlog events
DisableBinlogPlayback() error

// AcquireGlobalReadLock acquires a global read lock and keeps the connection so
// as to release it with the function below.
AcquireGlobalReadLock(ctx context.Context) error

// ReleaseGlobalReadLock release a lock acquired with the connection from the above function.
ReleaseGlobalReadLock(ctx context.Context) error

// Close will close this instance of Mysqld. It will wait for all dba
// queries to be finished.
Close()
Expand Down
7 changes: 4 additions & 3 deletions go/vt/mysqlctl/mysqld.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,10 @@ const maxLogFileSampleSize = 4096

// Mysqld is the object that represents a mysqld daemon running on this server.
type Mysqld struct {
dbcfgs *dbconfigs.DBConfigs
dbaPool *dbconnpool.ConnectionPool
appPool *dbconnpool.ConnectionPool
dbcfgs *dbconfigs.DBConfigs
dbaPool *dbconnpool.ConnectionPool
appPool *dbconnpool.ConnectionPool
lockConn *dbconnpool.PooledDBConnection

capabilities capabilitySet

Expand Down
Loading

0 comments on commit 4851511

Please sign in to comment.