Skip to content

Commit

Permalink
Make connection killing resilient to MySQL hangs (vitessio#14500)
Browse files Browse the repository at this point in the history
Signed-off-by: Arthur Schreiber <[email protected]>
Signed-off-by: Vicent Marti <[email protected]>
Signed-off-by: Daniel Joos <[email protected]>
Co-authored-by: Vicent Marti <[email protected]>
Co-authored-by: Daniel Joos <[email protected]>
  • Loading branch information
3 people authored and andyedison committed Apr 18, 2024
1 parent a8bc7c0 commit 97b8b90
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 75 deletions.
11 changes: 9 additions & 2 deletions go/mysql/fakesqldb/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,11 +375,11 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R
}
key := strings.ToLower(query)
db.mu.Lock()
defer db.mu.Unlock()
db.queryCalled[key]++
db.querylog = append(db.querylog, key)
// Check if we should close the connection and provoke errno 2013.
if db.shouldClose.Load() {
defer db.mu.Unlock()
c.Close()

//log error
Expand All @@ -393,7 +393,9 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R
// The driver may send this at connection time, and we don't want it to
// interfere.
if key == "set names utf8" || strings.HasPrefix(key, "set collation_connection = ") {
//log error
defer db.mu.Unlock()

// log error
if err := callback(&sqltypes.Result{}); err != nil {
log.Errorf("callback failed : %v", err)
}
Expand All @@ -402,12 +404,14 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R

// check if we should reject it.
if err, ok := db.rejectedData[key]; ok {
db.mu.Unlock()
return err
}

// Check explicit queries from AddQuery().
result, ok := db.data[key]
if ok {
db.mu.Unlock()
if f := result.BeforeFunc; f != nil {
f()
}
Expand All @@ -418,6 +422,7 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R
for _, pat := range db.patternData {
if pat.expr.MatchString(query) {
userCallback, ok := db.queryPatternUserCallback[pat.expr]
db.mu.Unlock()
if ok {
userCallback(query)
}
Expand All @@ -428,6 +433,8 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R
}
}

defer db.mu.Unlock()

if db.neverFail.Load() {
return callback(&sqltypes.Result{})
}
Expand Down
163 changes: 90 additions & 73 deletions go/vt/vttablet/tabletserver/connpool/dbconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"vitess.io/vitess/go/pools"
Expand All @@ -30,7 +31,6 @@ import (

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/dbconnpool"
"vitess.io/vitess/go/vt/log"
Expand All @@ -41,6 +41,8 @@ import (
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

const defaultKillTimeout = 5 * time.Second

// DBConn is a db connection for tabletserver.
// It performs automatic reconnects as needed.
// Its Execute function has a timeout that can kill
Expand All @@ -52,14 +54,16 @@ type DBConn struct {
pool *Pool
dbaPool *dbconnpool.ConnectionPool
stats *tabletenv.Stats
current sync2.AtomicString
current atomic.Pointer[string]
timeCreated time.Time
setting string
resetSetting string

// err will be set if a query is killed through a Kill.
errmu sync.Mutex
err error

killTimeout time.Duration
}

// NewDBConn creates a new DBConn. It triggers a CheckMySQL if creation fails.
Expand All @@ -78,6 +82,7 @@ func NewDBConn(ctx context.Context, cp *Pool, appParams dbconfigs.Connector) (*D
info: appParams,
pool: cp,
dbaPool: cp.dbaPool,
killTimeout: defaultKillTimeout,
timeCreated: time.Now(),
stats: cp.env.Stats(),
}, nil
Expand All @@ -95,6 +100,7 @@ func NewDBConnNoPool(ctx context.Context, params dbconfigs.Connector, dbaPool *d
dbaPool: dbaPool,
pool: nil,
timeCreated: time.Now(),
killTimeout: defaultKillTimeout,
stats: tabletenv.NewStats(servenv.NewExporter("Temp", "Tablet")),
}
if setting == nil {
Expand Down Expand Up @@ -157,8 +163,8 @@ func (dbc *DBConn) Exec(ctx context.Context, query string, maxrows int, wantfiel
}

func (dbc *DBConn) execOnce(ctx context.Context, query string, maxrows int, wantfields bool) (*sqltypes.Result, error) {
dbc.current.Set(query)
defer dbc.current.Set("")
dbc.current.Store(&query)
defer dbc.current.Store(nil)

// Check if the context is already past its deadline before
// trying to execute the query.
Expand All @@ -168,19 +174,33 @@ func (dbc *DBConn) execOnce(ctx context.Context, query string, maxrows int, want
default:
}

defer dbc.stats.MySQLTimings.Record("Exec", time.Now())

done, wg := dbc.setDeadline(ctx)
qr, err := dbc.conn.ExecuteFetch(query, maxrows, wantfields)
now := time.Now()
defer dbc.stats.MySQLTimings.Record("Exec", now)

if done != nil {
close(done)
wg.Wait()
type execResult struct {
result *sqltypes.Result
err error
}
if dbcerr := dbc.Err(); dbcerr != nil {
return nil, dbcerr

ch := make(chan execResult)
go func() {
result, err := dbc.conn.ExecuteFetch(query, maxrows, wantfields)
ch <- execResult{result, err}
}()

select {
case <-ctx.Done():
killCtx, cancel := context.WithTimeout(context.Background(), dbc.killTimeout)
defer cancel()

_ = dbc.KillWithContext(killCtx, ctx.Err().Error(), time.Since(now))
return nil, dbc.Err()
case r := <-ch:
if dbcErr := dbc.Err(); dbcErr != nil {
return nil, dbcErr
}
return r.result, r.err
}
return qr, err
}

// ExecOnce executes the specified query, but does not retry on connection errors.
Expand Down Expand Up @@ -260,22 +280,30 @@ func (dbc *DBConn) Stream(ctx context.Context, query string, callback func(*sqlt
}

func (dbc *DBConn) streamOnce(ctx context.Context, query string, callback func(*sqltypes.Result) error, alloc func() *sqltypes.Result, streamBufferSize int) error {
defer dbc.stats.MySQLTimings.Record("ExecStream", time.Now())
dbc.current.Store(&query)
defer dbc.current.Store(nil)

dbc.current.Set(query)
defer dbc.current.Set("")
now := time.Now()
defer dbc.stats.MySQLTimings.Record("ExecStream", now)

done, wg := dbc.setDeadline(ctx)
err := dbc.conn.ExecuteStreamFetch(query, callback, alloc, streamBufferSize)
ch := make(chan error)
go func() {
ch <- dbc.conn.ExecuteStreamFetch(query, callback, alloc, streamBufferSize)
}()

if done != nil {
close(done)
wg.Wait()
}
if dbcerr := dbc.Err(); dbcerr != nil {
return dbcerr
select {
case <-ctx.Done():
killCtx, cancel := context.WithTimeout(context.Background(), dbc.killTimeout)
defer cancel()

_ = dbc.KillWithContext(killCtx, ctx.Err().Error(), time.Since(now))
return dbc.Err()
case err := <-ch:
if dbcErr := dbc.Err(); dbcErr != nil {
return dbcErr
}
return err
}
return err
}

// StreamOnce executes the query and streams the results. But, does not retry on connection errors.
Expand Down Expand Up @@ -414,6 +442,16 @@ func (dbc *DBConn) Taint() {
// and on the connection side. If no query is executing, it's a no-op.
// Kill will also not kill a query more than once.
func (dbc *DBConn) Kill(reason string, elapsed time.Duration) error {
return dbc.KillWithContext(context.Background(), reason, elapsed)
}

// KillWithContext kills the currently executing query both on MySQL side
// and on the connection side. If no query is executing, it's a no-op.
// Kill will also not kill a query more than once.
func (dbc *DBConn) KillWithContext(ctx context.Context, reason string, elapsed time.Duration) error {
if cause := context.Cause(ctx); cause != nil {
return cause
}
dbc.stats.KillCounters.Add("Queries", 1)
log.Infof("Due to %s, elapsed time: %v, killing query ID %v %s", reason, elapsed, dbc.conn.ID(), dbc.CurrentForLogging())

Expand All @@ -424,25 +462,43 @@ func (dbc *DBConn) Kill(reason string, elapsed time.Duration) error {
dbc.conn.Close()

// Server side action. Kill the session.
killConn, err := dbc.dbaPool.Get(context.TODO())
killConn, err := dbc.dbaPool.Get(ctx)
if err != nil {
log.Warningf("Failed to get conn from dba pool: %v", err)
return err
}
defer killConn.Recycle()

ch := make(chan error)
sql := fmt.Sprintf("kill %d", dbc.conn.ID())
_, err = killConn.ExecuteFetch(sql, 10000, false)
if err != nil {
log.Errorf("Could not kill query ID %v %s: %v", dbc.conn.ID(),
dbc.CurrentForLogging(), err)
return err
go func() {
_, err := killConn.ExecuteFetch(sql, -1, false)
ch <- err
}()

select {
case <-ctx.Done():
killConn.Close()

dbc.stats.InternalErrors.Add("HungQuery", 1)
log.Warningf("Query may be hung: %s", dbc.CurrentForLogging())

return context.Cause(ctx)
case err := <-ch:
if err != nil {
log.Errorf("Could not kill query ID %v %s: %v", dbc.conn.ID(), dbc.CurrentForLogging(), err)
return err
}
return nil
}
return nil
}

// Current returns the currently executing query.
func (dbc *DBConn) Current() string {
return dbc.current.Get()
if q := dbc.current.Load(); q != nil {
return *q
}
return ""
}

// ID returns the connection id.
Expand Down Expand Up @@ -480,45 +536,6 @@ func (dbc *DBConn) reconnect(ctx context.Context) error {
return nil
}

// setDeadline starts a goroutine that will kill the currently executing query
// if the deadline is exceeded. It returns a channel and a waitgroup. After the
// query is done executing, the caller is required to close the done channel
// and wait for the waitgroup to make sure that the necessary cleanup is done.
func (dbc *DBConn) setDeadline(ctx context.Context) (chan bool, *sync.WaitGroup) {
if ctx.Done() == nil {
return nil, nil
}
done := make(chan bool)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
startTime := time.Now()
select {
case <-ctx.Done():
dbc.Kill(ctx.Err().Error(), time.Since(startTime))
case <-done:
return
}
elapsed := time.Since(startTime)

// Give 2x the elapsed time and some buffer as grace period
// for the query to get killed.
tmr2 := time.NewTimer(2*elapsed + 5*time.Second)
defer tmr2.Stop()
select {
case <-tmr2.C:
dbc.stats.InternalErrors.Add("HungQuery", 1)
log.Warningf("Query may be hung: %s", dbc.CurrentForLogging())
case <-done:
return
}
<-done
log.Warningf("Hung query returned")
}()
return done, &wg
}

// CurrentForLogging applies transformations to the query making it suitable to log.
// It applies sanitization rules based on tablet settings and limits the max length of
// queries.
Expand Down
Loading

0 comments on commit 97b8b90

Please sign in to comment.