From 74806d061bb22cfb62e41e230d57c2ee5273a4c4 Mon Sep 17 00:00:00 2001 From: pupu Date: Mon, 14 Nov 2022 10:45:31 +0000 Subject: [PATCH] Speedup DDLs by not reloading table size stats (#11601) Currently, obtaining table sizes from mysql involves joining `information_schema.tables`, which can be very costly on systems with a large number of tables. My tests on a system with 13k tables took around 20s without this patch, and only 4s with it. Instead of synchronously recalculating table size stats after every DDL, let them be outdated until the periodic schema reload fixes it. Signed-off-by: pupu Signed-off-by: pupu --- go/mysql/flavor.go | 8 ++- go/mysql/flavor_filepos.go | 5 ++ go/mysql/flavor_mariadb_binlog_playback.go | 5 ++ go/mysql/flavor_mysql.go | 5 ++ go/mysql/flavor_mysqlgr.go | 4 ++ .../vttablet/tabletserver/connpool/dbconn.go | 7 ++- go/vt/vttablet/tabletserver/query_executor.go | 10 +++- go/vt/vttablet/tabletserver/schema/engine.go | 56 ++++++++++++++----- 8 files changed, 82 insertions(+), 18 deletions(-) diff --git a/go/mysql/flavor.go b/go/mysql/flavor.go index 85c1247e678..268a0d5e601 100644 --- a/go/mysql/flavor.go +++ b/go/mysql/flavor.go @@ -155,6 +155,7 @@ type flavor interface { enableBinlogPlaybackCommand() string disableBinlogPlaybackCommand() string + baseShowTables() string baseShowTablesWithSizes() string supportsCapability(serverVersion string, capability FlavorCapability) (bool, error) @@ -571,8 +572,13 @@ func (c *Conn) DisableBinlogPlaybackCommand() string { return c.flavor.disableBinlogPlaybackCommand() } -// BaseShowTables returns a query that shows tables and their sizes +// BaseShowTables returns a query that shows tables func (c *Conn) BaseShowTables() string { + return c.flavor.baseShowTables() +} + +// BaseShowTablesWithSizes returns a query that shows tables and their sizes +func (c *Conn) BaseShowTablesWithSizes() string { return c.flavor.baseShowTablesWithSizes() } diff --git a/go/mysql/flavor_filepos.go b/go/mysql/flavor_filepos.go index a66af7f9f3d..85e76a92c6b 100644 --- a/go/mysql/flavor_filepos.go +++ b/go/mysql/flavor_filepos.go @@ -326,6 +326,11 @@ func (*filePosFlavor) disableBinlogPlaybackCommand() string { return "" } +// baseShowTables is part of the Flavor interface. +func (*filePosFlavor) baseShowTables() string { + return mysqlFlavor{}.baseShowTables() +} + // baseShowTablesWithSizes is part of the Flavor interface. func (*filePosFlavor) baseShowTablesWithSizes() string { return TablesWithSize56 diff --git a/go/mysql/flavor_mariadb_binlog_playback.go b/go/mysql/flavor_mariadb_binlog_playback.go index e862e744d04..f8ce0053b56 100644 --- a/go/mysql/flavor_mariadb_binlog_playback.go +++ b/go/mysql/flavor_mariadb_binlog_playback.go @@ -30,6 +30,11 @@ func (mariadbFlavor) disableBinlogPlaybackCommand() string { return "" } +// baseShowTables is part of the Flavor interface. +func (mariadbFlavor) baseShowTables() string { + return mysqlFlavor{}.baseShowTables() +} + // baseShowTablesWithSizes is part of the Flavor interface. func (mariadbFlavor101) baseShowTablesWithSizes() string { return TablesWithSize56 diff --git a/go/mysql/flavor_mysql.go b/go/mysql/flavor_mysql.go index 8e7ed44b957..08523321741 100644 --- a/go/mysql/flavor_mysql.go +++ b/go/mysql/flavor_mysql.go @@ -308,6 +308,11 @@ func (mysqlFlavor) disableBinlogPlaybackCommand() string { return "" } +// baseShowTables is part of the Flavor interface. +func (mysqlFlavor) baseShowTables() string { + return "SELECT table_name, table_type, unix_timestamp(create_time), table_comment FROM information_schema.tables WHERE table_schema = database()" +} + // TablesWithSize56 is a query to select table along with size for mysql 5.6 const TablesWithSize56 = `SELECT table_name, table_type, unix_timestamp(create_time), table_comment, SUM( data_length + index_length), SUM( data_length + index_length) FROM information_schema.tables WHERE table_schema = database() group by table_name` diff --git a/go/mysql/flavor_mysqlgr.go b/go/mysql/flavor_mysqlgr.go index 0094c563b7b..f8e3fd16abf 100644 --- a/go/mysql/flavor_mysqlgr.go +++ b/go/mysql/flavor_mysqlgr.go @@ -239,6 +239,10 @@ func (mysqlGRFlavor) primaryStatus(c *Conn) (PrimaryStatus, error) { return mysqlFlavor{}.primaryStatus(c) } +func (mysqlGRFlavor) baseShowTables() string { + return mysqlFlavor{}.baseShowTables() +} + func (mysqlGRFlavor) baseShowTablesWithSizes() string { return TablesWithSize80 } diff --git a/go/vt/vttablet/tabletserver/connpool/dbconn.go b/go/vt/vttablet/tabletserver/connpool/dbconn.go index 971e09c77cf..c40e58d8941 100644 --- a/go/vt/vttablet/tabletserver/connpool/dbconn.go +++ b/go/vt/vttablet/tabletserver/connpool/dbconn.go @@ -442,11 +442,16 @@ func (dbc *DBConn) ID() int64 { return dbc.conn.ID() } -// BaseShowTables returns a query that shows tables and their sizes +// BaseShowTables returns a query that shows tables func (dbc *DBConn) BaseShowTables() string { return dbc.conn.BaseShowTables() } +// BaseShowTablesWithSizes returns a query that shows tables and their sizes +func (dbc *DBConn) BaseShowTablesWithSizes() string { + return dbc.conn.BaseShowTablesWithSizes() +} + func (dbc *DBConn) reconnect(ctx context.Context) error { dbc.conn.Close() // Reuse MySQLTimings from dbc.conn. diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 4c85c947bac..f751d819364 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -510,7 +510,15 @@ func (qre *QueryExecutor) execDDL(conn *StatefulConnection) (*sqltypes.Result, e } defer func() { - if err := qre.tsv.se.Reload(qre.ctx); err != nil { + // Call se.Reload() with includeStats=false as obtaining table + // size stats involves joining `information_schema.tables`, + // which can be very costly on systems with a large number of + // tables. + // + // Instead of synchronously recalculating table size stats + // after every DDL, let them be outdated until the periodic + // schema reload fixes it. + if err := qre.tsv.se.ReloadAtEx(qre.ctx, mysql.Position{}, false); err != nil { log.Errorf("failed to reload schema %v", err) } }() diff --git a/go/vt/vttablet/tabletserver/schema/engine.go b/go/vt/vttablet/tabletserver/schema/engine.go index a23dbfe1277..c8fdb0365c4 100644 --- a/go/vt/vttablet/tabletserver/schema/engine.go +++ b/go/vt/vttablet/tabletserver/schema/engine.go @@ -194,7 +194,7 @@ func (se *Engine) Open() error { } se.notifiers = make(map[string]notifier) - if err := se.reload(ctx); err != nil { + if err := se.reload(ctx, true); err != nil { return err } if !se.SkipMetaCheck { @@ -285,6 +285,8 @@ func (se *Engine) EnableHistorian(enabled bool) error { // Reload reloads the schema info from the db. // Any tables that have changed since the last load are updated. +// The includeStats argument controls whether table size statistics should be +// emitted, as they can be expensive to calculate for a large number of tables func (se *Engine) Reload(ctx context.Context) error { return se.ReloadAt(ctx, mysql.Position{}) } @@ -294,6 +296,16 @@ func (se *Engine) Reload(ctx context.Context) error { // It maintains the position at which the schema was reloaded and if the same position is provided // (say by multiple vstreams) it returns the cached schema. In case of a newer or empty pos it always reloads the schema func (se *Engine) ReloadAt(ctx context.Context, pos mysql.Position) error { + return se.ReloadAtEx(ctx, pos, true) +} + +// ReloadAtEx reloads the schema info from the db. +// Any tables that have changed since the last load are updated. +// It maintains the position at which the schema was reloaded and if the same position is provided +// (say by multiple vstreams) it returns the cached schema. In case of a newer or empty pos it always reloads the schema +// The includeStats argument controls whether table size statistics should be +// emitted, as they can be expensive to calculate for a large number of tables +func (se *Engine) ReloadAtEx(ctx context.Context, pos mysql.Position, includeStats bool) error { se.mu.Lock() defer se.mu.Unlock() if !se.isOpen { @@ -301,10 +313,10 @@ func (se *Engine) ReloadAt(ctx context.Context, pos mysql.Position) error { return nil } if !pos.IsZero() && se.reloadAtPos.AtLeast(pos) { - log.V(2).Infof("ReloadAt: found cached schema at %s", mysql.EncodePosition(pos)) + log.V(2).Infof("ReloadAtEx: found cached schema at %s", mysql.EncodePosition(pos)) return nil } - if err := se.reload(ctx); err != nil { + if err := se.reload(ctx, includeStats); err != nil { return err } se.reloadAtPos = pos @@ -312,7 +324,7 @@ func (se *Engine) ReloadAt(ctx context.Context, pos mysql.Position) error { } // reload reloads the schema. It can also be used to initialize it. -func (se *Engine) reload(ctx context.Context) error { +func (se *Engine) reload(ctx context.Context, includeStats bool) error { defer func() { se.env.LogError() }() @@ -332,7 +344,14 @@ func (se *Engine) reload(ctx context.Context) error { if se.SkipMetaCheck { return nil } - tableData, err := conn.Exec(ctx, conn.BaseShowTables(), maxTableCount, false) + + var showTablesQuery string + if includeStats { + showTablesQuery = conn.BaseShowTablesWithSizes() + } else { + showTablesQuery = conn.BaseShowTables() + } + tableData, err := conn.Exec(ctx, showTablesQuery, maxTableCount, false) if err != nil { return err } @@ -353,12 +372,15 @@ func (se *Engine) reload(ctx context.Context) error { tableName := row[0].ToString() curTables[tableName] = true createTime, _ := evalengine.ToInt64(row[2]) - fileSize, _ := evalengine.ToUint64(row[4]) - allocatedSize, _ := evalengine.ToUint64(row[5]) - - // publish the size metrics - se.tableFileSizeGauge.Set(tableName, int64(fileSize)) - se.tableAllocatedSizeGauge.Set(tableName, int64(allocatedSize)) + var fileSize, allocatedSize uint64 + + if includeStats { + fileSize, _ = evalengine.ToUint64(row[4]) + allocatedSize, _ = evalengine.ToUint64(row[5]) + // publish the size metrics + se.tableFileSizeGauge.Set(tableName, int64(fileSize)) + se.tableAllocatedSizeGauge.Set(tableName, int64(allocatedSize)) + } // Table schemas are cached by tabletserver. For each table we cache `information_schema.tables.create_time` (`tbl.CreateTime`). // We also record the last time the schema was loaded (`se.lastChange`). Both are in seconds. We reload a table only when: @@ -372,8 +394,10 @@ func (se *Engine) reload(ctx context.Context) error { // #1 will not identify the renamed table as a changed one. tbl, isInTablesMap := se.tables[tableName] if isInTablesMap && createTime == tbl.CreateTime && createTime < se.lastChange { - tbl.FileSize = fileSize - tbl.AllocatedSize = allocatedSize + if includeStats { + tbl.FileSize = fileSize + tbl.AllocatedSize = allocatedSize + } continue } @@ -389,8 +413,10 @@ func (se *Engine) reload(ctx context.Context) error { rec.RecordError(vterrors.Wrapf(err, "in Engine.reload(), reading table %s", tableName)) continue } - table.FileSize = fileSize - table.AllocatedSize = allocatedSize + if includeStats { + table.FileSize = fileSize + table.AllocatedSize = allocatedSize + } table.CreateTime = createTime changedTables[tableName] = table if isInTablesMap {