diff --git a/go/cmd/vtbackup/cli/vtbackup.go b/go/cmd/vtbackup/cli/vtbackup.go index 121ba39b8c5..9d9138c5756 100644 --- a/go/cmd/vtbackup/cli/vtbackup.go +++ b/go/cmd/vtbackup/cli/vtbackup.go @@ -520,7 +520,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back } lastStatus = status - status, statusErr = mysqld.ReplicationStatus() + status, statusErr = mysqld.ReplicationStatusWithContext(ctx) if statusErr != nil { log.Warningf("Error getting replication status: %v", statusErr) continue @@ -557,7 +557,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back } // Did we make any progress? - status, statusErr = mysqld.ReplicationStatus() + status, statusErr = mysqld.ReplicationStatusWithContext(ctx) if statusErr != nil { return fmt.Errorf("can't get replication status: %v", err) } diff --git a/go/mysql/endtoend/client_test.go b/go/mysql/endtoend/client_test.go index 6591c454e8a..65d20c11801 100644 --- a/go/mysql/endtoend/client_test.go +++ b/go/mysql/endtoend/client_test.go @@ -287,7 +287,28 @@ func TestReplicationStatus(t *testing.T) { status, err := conn.ShowReplicationStatus() assert.Equal(t, mysql.ErrNotReplica, err, "Got unexpected result for ShowReplicationStatus: %v %v", status, err) +} + +func TestReplicationStatusWithMysqlHang(t *testing.T) { + params := connParams + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + conn, err := mysql.Connect(ctx, ¶ms) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + err = cluster.SimulateMySQLHang() + require.NoError(t, err) + + defer cluster.StopSimulateMySQLHang() + status, err := conn.ShowReplicationStatusWithContext(ctx) + assert.Equal(t, ctx.Err().Error(), "context deadline exceeded") + assert.Equal(t, ctx.Err(), err, "Got unexpected result for ShowReplicationStatus: %v %v", status, err) + assert.True(t, conn.IsClosed()) } func TestSessionTrackGTIDs(t *testing.T) { diff --git a/go/mysql/endtoend/main_test.go b/go/mysql/endtoend/main_test.go index 466735c02e4..c641332b565 100644 --- a/go/mysql/endtoend/main_test.go +++ b/go/mysql/endtoend/main_test.go @@ -40,6 +40,7 @@ import ( var ( connParams mysql.ConnParams + cluster vttest.LocalCluster ) // assertSQLError makes sure we get the right error. @@ -200,8 +201,17 @@ ssl-key=%v/server-key.pem OnlyMySQL: true, ExtraMyCnf: []string{extraMyCnf, maxPacketMyCnf}, } - cluster := vttest.LocalCluster{ + + env, err := vttest.NewLocalTestEnv(0) + if err != nil { + fmt.Fprintf(os.Stderr, "%v", err) + return 1 + } + env.EnableToxiproxy = true + + cluster = vttest.LocalCluster{ Config: cfg, + Env: env, } if err := cluster.Setup(); err != nil { fmt.Fprintf(os.Stderr, "could not launch mysql: %v\n", err) diff --git a/go/mysql/flavor.go b/go/mysql/flavor.go index edb64913c31..7cfa4d8c37e 100644 --- a/go/mysql/flavor.go +++ b/go/mysql/flavor.go @@ -438,7 +438,35 @@ func resultToMap(qr *sqltypes.Result) (map[string]string, error) { // ShowReplicationStatus executes the right command to fetch replication status, // and returns a parsed Position with other fields. func (c *Conn) ShowReplicationStatus() (replication.ReplicationStatus, error) { - return c.flavor.status(c) + return c.ShowReplicationStatusWithContext(context.TODO()) +} + +func (c *Conn) ShowReplicationStatusWithContext(ctx context.Context) (replication.ReplicationStatus, error) { + result := make(chan replication.ReplicationStatus, 1) + errors := make(chan error, 1) + + go func() { + res, err := c.flavor.status(c) + if err != nil { + errors <- err + } else { + result <- res + } + }() + + for { + select { + case <-ctx.Done(): + c.Close() + return replication.ReplicationStatus{}, ctx.Err() + + case err := <-errors: + return replication.ReplicationStatus{}, err + + case res := <-result: + return res, nil + } + } } // ShowPrimaryStatus executes the right SHOW MASTER STATUS command, diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index e46932bcd51..b56f93bfd22 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -388,7 +388,7 @@ func (be *BuiltinBackupEngine) executeFullBackup(ctx context.Context, params Bac // See if we need to restart replication after backup. params.Logger.Infof("getting current replication status") - replicaStatus, err := params.Mysqld.ReplicationStatus() + replicaStatus, err := params.Mysqld.ReplicationStatusWithContext(ctx) switch err { case nil: replicaStartRequired = replicaStatus.Healthy() && !DisableActiveReparents @@ -436,7 +436,7 @@ func (be *BuiltinBackupEngine) executeFullBackup(ctx context.Context, params Bac if err := params.Mysqld.StopReplication(params.HookExtraEnv); err != nil { return false, vterrors.Wrapf(err, "can't stop replica") } - replicaStatus, err := params.Mysqld.ReplicationStatus() + replicaStatus, err := params.Mysqld.ReplicationStatusWithContext(ctx) if err != nil { return false, vterrors.Wrap(err, "can't get replica status") } @@ -534,7 +534,7 @@ func (be *BuiltinBackupEngine) executeFullBackup(ctx context.Context, params Bac if err := ctx.Err(); err != nil { return usable, err } - status, err := params.Mysqld.ReplicationStatus() + status, err := params.Mysqld.ReplicationStatusWithContext(ctx) if err != nil { return usable, err } diff --git a/go/vt/mysqlctl/fakemysqldaemon.go b/go/vt/mysqlctl/fakemysqldaemon.go index 791b43da583..ee513d63da0 100644 --- a/go/vt/mysqlctl/fakemysqldaemon.go +++ b/go/vt/mysqlctl/fakemysqldaemon.go @@ -313,6 +313,10 @@ func (fmd *FakeMysqlDaemon) ReplicationStatus() (replication.ReplicationStatus, }, nil } +func (fmd *FakeMysqlDaemon) ReplicationStatusWithContext(ctx context.Context) (replication.ReplicationStatus, error) { + return fmd.ReplicationStatus() +} + // PrimaryStatus is part of the MysqlDaemon interface. func (fmd *FakeMysqlDaemon) PrimaryStatus(ctx context.Context) (replication.PrimaryStatus, error) { if fmd.PrimaryStatusError != nil { diff --git a/go/vt/mysqlctl/mysql_daemon.go b/go/vt/mysqlctl/mysql_daemon.go index f50d368ed7d..0269d16654a 100644 --- a/go/vt/mysqlctl/mysql_daemon.go +++ b/go/vt/mysqlctl/mysql_daemon.go @@ -56,6 +56,7 @@ type MysqlDaemon interface { StopReplication(hookExtraEnv map[string]string) error StopIOThread(ctx context.Context) error ReplicationStatus() (replication.ReplicationStatus, error) + ReplicationStatusWithContext(ctx context.Context) (replication.ReplicationStatus, error) PrimaryStatus(ctx context.Context) (replication.PrimaryStatus, error) GetGTIDPurged(ctx context.Context) (replication.Position, error) SetSemiSyncEnabled(source, replica bool) error diff --git a/go/vt/mysqlctl/query.go b/go/vt/mysqlctl/query.go index 5e21913c617..e4b26a1607b 100644 --- a/go/vt/mysqlctl/query.go +++ b/go/vt/mysqlctl/query.go @@ -35,20 +35,42 @@ func getPoolReconnect(ctx context.Context, pool *dbconnpool.ConnectionPool) (*db if err != nil { return conn, err } - // Run a test query to see if this connection is still good. - if _, err := conn.Conn.ExecuteFetch("SELECT 1", 1, false); err != nil { + + errChan := make(chan error, 1) + resultChan := make(chan *sqltypes.Result, 1) + + go func() { + result, err := conn.Conn.ExecuteFetch("SELECT 1", 1, false) + if err != nil { + errChan <- err + } else { + resultChan <- result + } + }() + + select { + case <-ctx.Done(): + conn.Close() + conn.Recycle() + return nil, ctx.Err() + + case err := <-errChan: // If we get a connection error, try to reconnect. if sqlErr, ok := err.(*sqlerror.SQLError); ok && (sqlErr.Number() == sqlerror.CRServerGone || sqlErr.Number() == sqlerror.CRServerLost) { if err := conn.Conn.Reconnect(ctx); err != nil { conn.Recycle() return nil, err } + return conn, nil } + conn.Recycle() return nil, err + + case <-resultChan: + return conn, nil } - return conn, nil } // ExecuteSuperQuery allows the user to execute a query as a super user. diff --git a/go/vt/mysqlctl/replication.go b/go/vt/mysqlctl/replication.go index 23b19669f16..90793a1abd1 100644 --- a/go/vt/mysqlctl/replication.go +++ b/go/vt/mysqlctl/replication.go @@ -385,13 +385,17 @@ func (mysqld *Mysqld) WaitSourcePos(ctx context.Context, targetPos replication.P // ReplicationStatus returns the server replication status func (mysqld *Mysqld) ReplicationStatus() (replication.ReplicationStatus, error) { - conn, err := getPoolReconnect(context.TODO(), mysqld.dbaPool) + return mysqld.ReplicationStatusWithContext(context.TODO()) +} + +func (mysqld *Mysqld) ReplicationStatusWithContext(ctx context.Context) (replication.ReplicationStatus, error) { + conn, err := getPoolReconnect(ctx, mysqld.dbaPool) if err != nil { return replication.ReplicationStatus{}, err } - defer conn.Recycle() - return conn.Conn.ShowReplicationStatus() + defer conn.Recycle() + return conn.Conn.ShowReplicationStatusWithContext(ctx) } // PrimaryStatus returns the primary replication statuses diff --git a/go/vt/vttablet/tabletmanager/restore.go b/go/vt/vttablet/tabletmanager/restore.go index 4512b546f2c..335302902be 100644 --- a/go/vt/vttablet/tabletmanager/restore.go +++ b/go/vt/vttablet/tabletmanager/restore.go @@ -649,7 +649,7 @@ func (tm *TabletManager) startReplication(ctx context.Context, pos replication.P if err := ctx.Err(); err != nil { return err } - status, err := tm.MysqlDaemon.ReplicationStatus() + status, err := tm.MysqlDaemon.ReplicationStatusWithContext(ctx) if err != nil { return vterrors.Wrap(err, "can't get replication status") } diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index 9981219e4a2..b11156f8a6c 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -39,7 +39,7 @@ import ( // ReplicationStatus returns the replication status func (tm *TabletManager) ReplicationStatus(ctx context.Context) (*replicationdatapb.Status, error) { - status, err := tm.MysqlDaemon.ReplicationStatus() + status, err := tm.MysqlDaemon.ReplicationStatusWithContext(ctx) if err != nil { return nil, err } @@ -61,7 +61,7 @@ func (tm *TabletManager) FullStatus(ctx context.Context) (*replicationdatapb.Ful } // Replication status - "SHOW REPLICA STATUS" - replicationStatus, err := tm.MysqlDaemon.ReplicationStatus() + replicationStatus, err := tm.MysqlDaemon.ReplicationStatusWithContext(ctx) var replicationStatusProto *replicationdatapb.Status if err != nil && err != mysql.ErrNotReplica { return nil, err @@ -635,7 +635,7 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA // See if we were replicating at all, and should be replicating. wasReplicating := false shouldbeReplicating := false - status, err := tm.MysqlDaemon.ReplicationStatus() + status, err := tm.MysqlDaemon.ReplicationStatusWithContext(ctx) if err == mysql.ErrNotReplica { // This is a special error that means we actually succeeded in reading // the status, but the status is empty because replication is not @@ -758,7 +758,7 @@ func (tm *TabletManager) StopReplicationAndGetStatus(ctx context.Context, stopRe // Get the status before we stop replication. // Doing this first allows us to return the status in the case that stopping replication // returns an error, so a user can optionally inspect the status before a stop was called. - rs, err := tm.MysqlDaemon.ReplicationStatus() + rs, err := tm.MysqlDaemon.ReplicationStatusWithContext(ctx) if err != nil { return StopReplicationAndGetStatusResponse{}, vterrors.Wrap(err, "before status failed") } @@ -800,7 +800,7 @@ func (tm *TabletManager) StopReplicationAndGetStatus(ctx context.Context, stopRe } // Get the status after we stop replication so we have up to date position and relay log positions. - rsAfter, err := tm.MysqlDaemon.ReplicationStatus() + rsAfter, err := tm.MysqlDaemon.ReplicationStatusWithContext(ctx) if err != nil { return StopReplicationAndGetStatusResponse{ Status: &replicationdatapb.StopReplicationStatus{