Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport schema reload performance fix #85

Merged
merged 1 commit into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion go/mysql/flavor.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ type flavor interface {
enableBinlogPlaybackCommand() string
disableBinlogPlaybackCommand() string

baseShowTables() string
baseShowTablesWithSizes() string

supportsCapability(serverVersion string, capability FlavorCapability) (bool, error)
Expand Down Expand Up @@ -571,8 +572,13 @@ func (c *Conn) DisableBinlogPlaybackCommand() string {
return c.flavor.disableBinlogPlaybackCommand()
}

// BaseShowTables returns a query that shows tables and their sizes
// BaseShowTables returns a query that shows tables
func (c *Conn) BaseShowTables() string {
return c.flavor.baseShowTables()
}

// BaseShowTablesWithSizes returns a query that shows tables and their sizes
func (c *Conn) BaseShowTablesWithSizes() string {
return c.flavor.baseShowTablesWithSizes()
}

Expand Down
5 changes: 5 additions & 0 deletions go/mysql/flavor_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,11 @@ func (*filePosFlavor) disableBinlogPlaybackCommand() string {
return ""
}

// baseShowTables is part of the Flavor interface.
func (*filePosFlavor) baseShowTables() string {
return mysqlFlavor{}.baseShowTables()
}

// baseShowTablesWithSizes is part of the Flavor interface.
func (*filePosFlavor) baseShowTablesWithSizes() string {
return TablesWithSize56
Expand Down
5 changes: 5 additions & 0 deletions go/mysql/flavor_mariadb_binlog_playback.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ func (mariadbFlavor) disableBinlogPlaybackCommand() string {
return ""
}

// baseShowTables is part of the Flavor interface.
func (mariadbFlavor) baseShowTables() string {
return mysqlFlavor{}.baseShowTables()
}

// baseShowTablesWithSizes is part of the Flavor interface.
func (mariadbFlavor101) baseShowTablesWithSizes() string {
return TablesWithSize56
Expand Down
5 changes: 5 additions & 0 deletions go/mysql/flavor_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,11 @@ func (mysqlFlavor) disableBinlogPlaybackCommand() string {
return ""
}

// baseShowTables is part of the Flavor interface.
func (mysqlFlavor) baseShowTables() string {
return "SELECT table_name, table_type, unix_timestamp(create_time), table_comment FROM information_schema.tables WHERE table_schema = database()"
}

// TablesWithSize56 is a query to select table along with size for mysql 5.6
const TablesWithSize56 = `SELECT table_name, table_type, unix_timestamp(create_time), table_comment, SUM( data_length + index_length), SUM( data_length + index_length)
FROM information_schema.tables WHERE table_schema = database() group by table_name`
Expand Down
4 changes: 4 additions & 0 deletions go/mysql/flavor_mysqlgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,10 @@ func (mysqlGRFlavor) primaryStatus(c *Conn) (PrimaryStatus, error) {
return mysqlFlavor{}.primaryStatus(c)
}

func (mysqlGRFlavor) baseShowTables() string {
return mysqlFlavor{}.baseShowTables()
}

func (mysqlGRFlavor) baseShowTablesWithSizes() string {
return TablesWithSize80
}
Expand Down
7 changes: 6 additions & 1 deletion go/vt/vttablet/tabletserver/connpool/dbconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,11 +442,16 @@ func (dbc *DBConn) ID() int64 {
return dbc.conn.ID()
}

// BaseShowTables returns a query that shows tables and their sizes
// BaseShowTables returns a query that shows tables
func (dbc *DBConn) BaseShowTables() string {
return dbc.conn.BaseShowTables()
}

// BaseShowTablesWithSizes returns a query that shows tables and their sizes
func (dbc *DBConn) BaseShowTablesWithSizes() string {
return dbc.conn.BaseShowTablesWithSizes()
}

func (dbc *DBConn) reconnect(ctx context.Context) error {
dbc.conn.Close()
// Reuse MySQLTimings from dbc.conn.
Expand Down
10 changes: 9 additions & 1 deletion go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,15 @@ func (qre *QueryExecutor) execDDL(conn *StatefulConnection) (*sqltypes.Result, e
}

defer func() {
if err := qre.tsv.se.Reload(qre.ctx); err != nil {
// Call se.Reload() with includeStats=false as obtaining table
// size stats involves joining `information_schema.tables`,
// which can be very costly on systems with a large number of
// tables.
//
// Instead of synchronously recalculating table size stats
// after every DDL, let them be outdated until the periodic
// schema reload fixes it.
if err := qre.tsv.se.ReloadAtEx(qre.ctx, mysql.Position{}, false); err != nil {
log.Errorf("failed to reload schema %v", err)
}
}()
Expand Down
56 changes: 41 additions & 15 deletions go/vt/vttablet/tabletserver/schema/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (se *Engine) Open() error {
}
se.notifiers = make(map[string]notifier)

if err := se.reload(ctx); err != nil {
if err := se.reload(ctx, true); err != nil {
return err
}
if !se.SkipMetaCheck {
Expand Down Expand Up @@ -285,6 +285,8 @@ func (se *Engine) EnableHistorian(enabled bool) error {

// Reload reloads the schema info from the db.
// Any tables that have changed since the last load are updated.
// The includeStats argument controls whether table size statistics should be
// emitted, as they can be expensive to calculate for a large number of tables
func (se *Engine) Reload(ctx context.Context) error {
return se.ReloadAt(ctx, mysql.Position{})
}
Expand All @@ -294,25 +296,35 @@ func (se *Engine) Reload(ctx context.Context) error {
// It maintains the position at which the schema was reloaded and if the same position is provided
// (say by multiple vstreams) it returns the cached schema. In case of a newer or empty pos it always reloads the schema
func (se *Engine) ReloadAt(ctx context.Context, pos mysql.Position) error {
return se.ReloadAtEx(ctx, pos, true)
}

// ReloadAtEx reloads the schema info from the db.
// Any tables that have changed since the last load are updated.
// It maintains the position at which the schema was reloaded and if the same position is provided
// (say by multiple vstreams) it returns the cached schema. In case of a newer or empty pos it always reloads the schema
// The includeStats argument controls whether table size statistics should be
// emitted, as they can be expensive to calculate for a large number of tables
func (se *Engine) ReloadAtEx(ctx context.Context, pos mysql.Position, includeStats bool) error {
se.mu.Lock()
defer se.mu.Unlock()
if !se.isOpen {
log.Warning("Schema reload called for an engine that is not yet open")
return nil
}
if !pos.IsZero() && se.reloadAtPos.AtLeast(pos) {
log.V(2).Infof("ReloadAt: found cached schema at %s", mysql.EncodePosition(pos))
log.V(2).Infof("ReloadAtEx: found cached schema at %s", mysql.EncodePosition(pos))
return nil
}
if err := se.reload(ctx); err != nil {
if err := se.reload(ctx, includeStats); err != nil {
return err
}
se.reloadAtPos = pos
return nil
}

// reload reloads the schema. It can also be used to initialize it.
func (se *Engine) reload(ctx context.Context) error {
func (se *Engine) reload(ctx context.Context, includeStats bool) error {
defer func() {
se.env.LogError()
}()
Expand All @@ -332,7 +344,14 @@ func (se *Engine) reload(ctx context.Context) error {
if se.SkipMetaCheck {
return nil
}
tableData, err := conn.Exec(ctx, conn.BaseShowTables(), maxTableCount, false)

var showTablesQuery string
if includeStats {
showTablesQuery = conn.BaseShowTablesWithSizes()
} else {
showTablesQuery = conn.BaseShowTables()
}
tableData, err := conn.Exec(ctx, showTablesQuery, maxTableCount, false)
if err != nil {
return err
}
Expand All @@ -353,12 +372,15 @@ func (se *Engine) reload(ctx context.Context) error {
tableName := row[0].ToString()
curTables[tableName] = true
createTime, _ := evalengine.ToInt64(row[2])
fileSize, _ := evalengine.ToUint64(row[4])
allocatedSize, _ := evalengine.ToUint64(row[5])

// publish the size metrics
se.tableFileSizeGauge.Set(tableName, int64(fileSize))
se.tableAllocatedSizeGauge.Set(tableName, int64(allocatedSize))
var fileSize, allocatedSize uint64

if includeStats {
fileSize, _ = evalengine.ToUint64(row[4])
allocatedSize, _ = evalengine.ToUint64(row[5])
// publish the size metrics
se.tableFileSizeGauge.Set(tableName, int64(fileSize))
se.tableAllocatedSizeGauge.Set(tableName, int64(allocatedSize))
}

// Table schemas are cached by tabletserver. For each table we cache `information_schema.tables.create_time` (`tbl.CreateTime`).
// We also record the last time the schema was loaded (`se.lastChange`). Both are in seconds. We reload a table only when:
Expand All @@ -372,8 +394,10 @@ func (se *Engine) reload(ctx context.Context) error {
// #1 will not identify the renamed table as a changed one.
tbl, isInTablesMap := se.tables[tableName]
if isInTablesMap && createTime == tbl.CreateTime && createTime < se.lastChange {
tbl.FileSize = fileSize
tbl.AllocatedSize = allocatedSize
if includeStats {
tbl.FileSize = fileSize
tbl.AllocatedSize = allocatedSize
}
continue
}

Expand All @@ -389,8 +413,10 @@ func (se *Engine) reload(ctx context.Context) error {
rec.RecordError(vterrors.Wrapf(err, "in Engine.reload(), reading table %s", tableName))
continue
}
table.FileSize = fileSize
table.AllocatedSize = allocatedSize
if includeStats {
table.FileSize = fileSize
table.AllocatedSize = allocatedSize
}
table.CreateTime = createTime
changedTables[tableName] = table
if isInTablesMap {
Expand Down
Loading