Skip to content

Commit

Permalink
WIP ShowReplicationStatus with context support
Browse files Browse the repository at this point in the history
Signed-off-by: Arthur Schreiber <[email protected]>
  • Loading branch information
arthurschreiber committed Nov 8, 2023
1 parent 1a9119d commit d325dce
Show file tree
Hide file tree
Showing 11 changed files with 109 additions and 19 deletions.
4 changes: 2 additions & 2 deletions go/cmd/vtbackup/cli/vtbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
}

lastStatus = status
status, statusErr = mysqld.ReplicationStatus()
status, statusErr = mysqld.ReplicationStatusWithContext(ctx)
if statusErr != nil {
log.Warningf("Error getting replication status: %v", statusErr)
continue
Expand Down Expand Up @@ -557,7 +557,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
}

// Did we make any progress?
status, statusErr = mysqld.ReplicationStatus()
status, statusErr = mysqld.ReplicationStatusWithContext(ctx)
if statusErr != nil {
return fmt.Errorf("can't get replication status: %v", err)
}
Expand Down
21 changes: 21 additions & 0 deletions go/mysql/endtoend/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,28 @@ func TestReplicationStatus(t *testing.T) {

status, err := conn.ShowReplicationStatus()
assert.Equal(t, mysql.ErrNotReplica, err, "Got unexpected result for ShowReplicationStatus: %v %v", status, err)
}

// TestReplicationStatusWithMysqlHang verifies that ShowReplicationStatusWithContext
// gives up once its context expires while the server is unresponsive: the call
// must return the context's deadline error and leave the connection closed.
func TestReplicationStatusWithMysqlHang(t *testing.T) {
	params := connParams
	// The same short-lived context bounds both the connect and the status query.
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	conn, err := mysql.Connect(ctx, &params)
	require.NoError(t, err)
	defer conn.Close()

	// Make the server stop responding for the duration of the status call.
	require.NoError(t, cluster.SimulateMySQLHang())
	defer cluster.StopSimulateMySQLHang()

	status, err := conn.ShowReplicationStatusWithContext(ctx)
	// Compare against the sentinel instead of calling ctx.Err().Error(): if the
	// query unexpectedly returns before the deadline, ctx.Err() is nil and
	// dereferencing it would panic rather than fail the test cleanly.
	assert.ErrorIs(t, err, context.DeadlineExceeded, "Got unexpected result for ShowReplicationStatus: %v %v", status, err)
	assert.True(t, conn.IsClosed())
}

func TestSessionTrackGTIDs(t *testing.T) {
Expand Down
12 changes: 11 additions & 1 deletion go/mysql/endtoend/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (

var (
// connParams holds the MySQL connection parameters shared by the tests in
// this package; tests copy it and pass the copy to mysql.Connect.
connParams mysql.ConnParams
// cluster is the local test cluster. It is package-level so that individual
// tests can call its methods (e.g. SimulateMySQLHang / StopSimulateMySQLHang).
cluster vttest.LocalCluster
)

// assertSQLError makes sure we get the right error.
Expand Down Expand Up @@ -200,8 +201,17 @@ ssl-key=%v/server-key.pem
OnlyMySQL: true,
ExtraMyCnf: []string{extraMyCnf, maxPacketMyCnf},
}
cluster := vttest.LocalCluster{

env, err := vttest.NewLocalTestEnv(0)
if err != nil {
fmt.Fprintf(os.Stderr, "%v", err)
return 1
}
env.EnableToxiproxy = true

cluster = vttest.LocalCluster{
Config: cfg,
Env: env,
}
if err := cluster.Setup(); err != nil {
fmt.Fprintf(os.Stderr, "could not launch mysql: %v\n", err)
Expand Down
30 changes: 29 additions & 1 deletion go/mysql/flavor.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,35 @@ func resultToMap(qr *sqltypes.Result) (map[string]string, error) {
// ShowReplicationStatus executes the right command to fetch replication status,
// and returns a parsed Position with other fields.
//
// It delegates to ShowReplicationStatusWithContext with context.TODO(), a
// context that is never canceled, so this variant has no way to be interrupted.
func (c *Conn) ShowReplicationStatus() (replication.ReplicationStatus, error) {
	return c.ShowReplicationStatusWithContext(context.TODO())
}

// ShowReplicationStatusWithContext fetches the replication status like
// ShowReplicationStatus, but stops waiting as soon as ctx is done.
//
// The flavor-specific status query runs in a separate goroutine. If ctx is
// canceled or its deadline passes before the query finishes, the connection is
// closed and ctx.Err() is returned together with a zero ReplicationStatus; the
// connection must not be reused after that. Otherwise the query's own result
// or error is returned unchanged.
func (c *Conn) ShowReplicationStatusWithContext(ctx context.Context) (replication.ReplicationStatus, error) {
	type outcome struct {
		status replication.ReplicationStatus
		err    error
	}

	// Buffered with capacity 1 so the goroutine can always deliver its outcome
	// and exit, even when this function has already returned on ctx.Done().
	done := make(chan outcome, 1)
	go func() {
		status, err := c.flavor.status(c)
		done <- outcome{status: status, err: err}
	}()

	select {
	case <-ctx.Done():
		// NOTE(review): closing the connection is presumably what unblocks the
		// in-flight query in the goroutine above — confirm against Conn.Close.
		c.Close()
		return replication.ReplicationStatus{}, ctx.Err()

	case out := <-done:
		if out.err != nil {
			return replication.ReplicationStatus{}, out.err
		}
		return out.status, nil
	}
}

// ShowPrimaryStatus executes the right SHOW MASTER STATUS command,
Expand Down
6 changes: 3 additions & 3 deletions go/vt/mysqlctl/builtinbackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func (be *BuiltinBackupEngine) executeFullBackup(ctx context.Context, params Bac

// See if we need to restart replication after backup.
params.Logger.Infof("getting current replication status")
replicaStatus, err := params.Mysqld.ReplicationStatus()
replicaStatus, err := params.Mysqld.ReplicationStatusWithContext(ctx)
switch err {
case nil:
replicaStartRequired = replicaStatus.Healthy() && !DisableActiveReparents
Expand Down Expand Up @@ -436,7 +436,7 @@ func (be *BuiltinBackupEngine) executeFullBackup(ctx context.Context, params Bac
if err := params.Mysqld.StopReplication(params.HookExtraEnv); err != nil {
return false, vterrors.Wrapf(err, "can't stop replica")
}
replicaStatus, err := params.Mysqld.ReplicationStatus()
replicaStatus, err := params.Mysqld.ReplicationStatusWithContext(ctx)
if err != nil {
return false, vterrors.Wrap(err, "can't get replica status")
}
Expand Down Expand Up @@ -534,7 +534,7 @@ func (be *BuiltinBackupEngine) executeFullBackup(ctx context.Context, params Bac
if err := ctx.Err(); err != nil {
return usable, err
}
status, err := params.Mysqld.ReplicationStatus()
status, err := params.Mysqld.ReplicationStatusWithContext(ctx)
if err != nil {
return usable, err
}
Expand Down
4 changes: 4 additions & 0 deletions go/vt/mysqlctl/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,10 @@ func (fmd *FakeMysqlDaemon) ReplicationStatus() (replication.ReplicationStatus,
}, nil
}

// ReplicationStatusWithContext is part of the MysqlDaemon interface.
// The fake does not consult ctx; it returns exactly what ReplicationStatus returns.
func (fmd *FakeMysqlDaemon) ReplicationStatusWithContext(ctx context.Context) (replication.ReplicationStatus, error) {
return fmd.ReplicationStatus()
}

// PrimaryStatus is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) PrimaryStatus(ctx context.Context) (replication.PrimaryStatus, error) {
if fmd.PrimaryStatusError != nil {
Expand Down
1 change: 1 addition & 0 deletions go/vt/mysqlctl/mysql_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type MysqlDaemon interface {
StopReplication(hookExtraEnv map[string]string) error
StopIOThread(ctx context.Context) error
ReplicationStatus() (replication.ReplicationStatus, error)
ReplicationStatusWithContext(ctx context.Context) (replication.ReplicationStatus, error)
PrimaryStatus(ctx context.Context) (replication.PrimaryStatus, error)
GetGTIDPurged(ctx context.Context) (replication.Position, error)
SetSemiSyncEnabled(source, replica bool) error
Expand Down
28 changes: 25 additions & 3 deletions go/vt/mysqlctl/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,42 @@ func getPoolReconnect(ctx context.Context, pool *dbconnpool.ConnectionPool) (*db
if err != nil {
return conn, err
}
// Run a test query to see if this connection is still good.
if _, err := conn.Conn.ExecuteFetch("SELECT 1", 1, false); err != nil {

errChan := make(chan error, 1)
resultChan := make(chan *sqltypes.Result, 1)

go func() {
result, err := conn.Conn.ExecuteFetch("SELECT 1", 1, false)
if err != nil {
errChan <- err
} else {
resultChan <- result
}
}()

select {
case <-ctx.Done():
conn.Close()
conn.Recycle()
return nil, ctx.Err()

case err := <-errChan:
// If we get a connection error, try to reconnect.
if sqlErr, ok := err.(*sqlerror.SQLError); ok && (sqlErr.Number() == sqlerror.CRServerGone || sqlErr.Number() == sqlerror.CRServerLost) {
if err := conn.Conn.Reconnect(ctx); err != nil {
conn.Recycle()
return nil, err
}

return conn, nil
}

conn.Recycle()
return nil, err

case <-resultChan:
return conn, nil
}
return conn, nil
}

// ExecuteSuperQuery allows the user to execute a query as a super user.
Expand Down
10 changes: 7 additions & 3 deletions go/vt/mysqlctl/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,13 +385,17 @@ func (mysqld *Mysqld) WaitSourcePos(ctx context.Context, targetPos replication.P

// ReplicationStatus returns the server replication status.
//
// It delegates to ReplicationStatusWithContext with context.TODO(), a context
// that is never canceled, so this variant has no way to be interrupted.
func (mysqld *Mysqld) ReplicationStatus() (replication.ReplicationStatus, error) {
	return mysqld.ReplicationStatusWithContext(context.TODO())
}

// ReplicationStatusWithContext returns the server replication status, passing
// ctx both to the acquisition of a dba pool connection and to the status query
// itself.
//
// An error from obtaining the connection is returned as-is with a zero
// ReplicationStatus. Otherwise the result of ShowReplicationStatusWithContext
// on that connection is returned.
func (mysqld *Mysqld) ReplicationStatusWithContext(ctx context.Context) (replication.ReplicationStatus, error) {
	conn, err := getPoolReconnect(ctx, mysqld.dbaPool)
	if err != nil {
		return replication.ReplicationStatus{}, err
	}
	// Recycle exactly once, after the status query has returned.
	defer conn.Recycle()

	return conn.Conn.ShowReplicationStatusWithContext(ctx)
}

// PrimaryStatus returns the primary replication statuses
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ func (tm *TabletManager) startReplication(ctx context.Context, pos replication.P
if err := ctx.Err(); err != nil {
return err
}
status, err := tm.MysqlDaemon.ReplicationStatus()
status, err := tm.MysqlDaemon.ReplicationStatusWithContext(ctx)
if err != nil {
return vterrors.Wrap(err, "can't get replication status")
}
Expand Down
10 changes: 5 additions & 5 deletions go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (

// ReplicationStatus returns the replication status
func (tm *TabletManager) ReplicationStatus(ctx context.Context) (*replicationdatapb.Status, error) {
status, err := tm.MysqlDaemon.ReplicationStatus()
status, err := tm.MysqlDaemon.ReplicationStatusWithContext(ctx)
if err != nil {
return nil, err
}
Expand All @@ -61,7 +61,7 @@ func (tm *TabletManager) FullStatus(ctx context.Context) (*replicationdatapb.Ful
}

// Replication status - "SHOW REPLICA STATUS"
replicationStatus, err := tm.MysqlDaemon.ReplicationStatus()
replicationStatus, err := tm.MysqlDaemon.ReplicationStatusWithContext(ctx)
var replicationStatusProto *replicationdatapb.Status
if err != nil && err != mysql.ErrNotReplica {
return nil, err
Expand Down Expand Up @@ -635,7 +635,7 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA
// See if we were replicating at all, and should be replicating.
wasReplicating := false
shouldbeReplicating := false
status, err := tm.MysqlDaemon.ReplicationStatus()
status, err := tm.MysqlDaemon.ReplicationStatusWithContext(ctx)
if err == mysql.ErrNotReplica {
// This is a special error that means we actually succeeded in reading
// the status, but the status is empty because replication is not
Expand Down Expand Up @@ -758,7 +758,7 @@ func (tm *TabletManager) StopReplicationAndGetStatus(ctx context.Context, stopRe
// Get the status before we stop replication.
// Doing this first allows us to return the status in the case that stopping replication
// returns an error, so a user can optionally inspect the status before a stop was called.
rs, err := tm.MysqlDaemon.ReplicationStatus()
rs, err := tm.MysqlDaemon.ReplicationStatusWithContext(ctx)
if err != nil {
return StopReplicationAndGetStatusResponse{}, vterrors.Wrap(err, "before status failed")
}
Expand Down Expand Up @@ -800,7 +800,7 @@ func (tm *TabletManager) StopReplicationAndGetStatus(ctx context.Context, stopRe
}

// Get the status after we stop replication so we have up to date position and relay log positions.
rsAfter, err := tm.MysqlDaemon.ReplicationStatus()
rsAfter, err := tm.MysqlDaemon.ReplicationStatusWithContext(ctx)
if err != nil {
return StopReplicationAndGetStatusResponse{
Status: &replicationdatapb.StopReplicationStatus{
Expand Down

0 comments on commit d325dce

Please sign in to comment.