Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Do Not Merge] v16.x #78

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
38eccb9
add new lock syntax for mysql8
patrickcarnahan Aug 7, 2023
4a0e038
Add shutdown state in MySQL server plugin
davidpiegza Sep 25, 2023
43a0748
Merge branch 'release-16.0' of https://github.com/vitessio/vitess int…
arthurschreiber Nov 28, 2023
9376417
Expose the `--tablet_types_to_wait` flag in `vtcombo`.
arthurschreiber Feb 19, 2024
ea0e19c
Merge pull request #93 from github/arthur/vtcombo-tablet-type-counts
arthurschreiber Feb 21, 2024
bab4912
Backport https://github.com/vitessio/vitess/pull/15275
hmaurer Mar 21, 2024
a8bc7c0
Merge pull request #99 from github/hm/backport-vtexplain-fix
hmaurer Mar 21, 2024
97b8b90
Make connection killing resilient to MySQL hangs (#14500)
arthurschreiber Mar 4, 2024
7f33ae6
Merge pull request #105 from github/release-16.0
arthurschreiber Apr 19, 2024
c8a7b66
Merge branch 'release-16.0-github' of https://github.com/github/vites…
arthurschreiber Apr 19, 2024
d8df704
Merge pull request #104 from github/backport-mysql-hangs
arthurschreiber Apr 23, 2024
580806b
Do not load table stats when booting `vttablet`. (#15715)
arthurschreiber Jun 10, 2024
703fa1c
Merge pull request #109 from github/backport-15715-to-release-16.0-gi…
arthurschreiber Jun 11, 2024
dd81005
Fix `vtexplain` not handling `UNION` queries with `weight_string` res…
vitess-bot[bot] Jun 13, 2024
f6087bf
Fix flakiness in `vtexplain` unit test case.
arthurschreiber Jun 13, 2024
62ee42d
Merge pull request #110 from github/backport-16129-16159-to-release-1…
arthurschreiber Jun 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions go/cmd/vtcombo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/vtcombo"
"vitess.io/vitess/go/vt/vtctld"
Expand All @@ -63,15 +64,18 @@ var (
"If true, vtcombo will use the flags defined in topo/server.go to open topo server")
plannerName = flags.String("planner-version", "", "Sets the default planner to use when the session has not changed it. Valid values are: V3, Gen4, Gen4Greedy and Gen4Fallback. Gen4Fallback tries the gen4 planner and falls back to the V3 planner if the gen4 fails.")

tpb vttestpb.VTTestTopology
ts *topo.Server
resilientServer *srvtopo.ResilientServer
tpb vttestpb.VTTestTopology
ts *topo.Server
resilientServer *srvtopo.ResilientServer
tabletTypesToWait []topodatapb.TabletType
)

func init() {
flags.Var(vttest.TextTopoData(&tpb), "proto_topo", "vttest proto definition of the topology, encoded in compact text format. See vttest.proto for more information.")
flags.Var(vttest.JSONTopoData(&tpb), "json_topo", "vttest proto definition of the topology, encoded in json format. See vttest.proto for more information.")

flags.Var((*topoproto.TabletTypeListFlag)(&tabletTypesToWait), "tablet_types_to_wait", "Wait till connected for specified tablet types during Gateway initialization. Should be provided as a comma-separated set of tablet types.")

servenv.RegisterDefaultFlags()
servenv.RegisterFlags()
servenv.RegisterGRPCServerFlags()
Expand Down Expand Up @@ -273,18 +277,29 @@ func main() {

// vtgate configuration and init
resilientServer = srvtopo.NewResilientServer(ts, "ResilientSrvTopoServer")
tabletTypesToWait := []topodatapb.TabletType{
topodatapb.TabletType_PRIMARY,
topodatapb.TabletType_REPLICA,
topodatapb.TabletType_RDONLY,

tabletTypes := make([]topodatapb.TabletType, 0, 1)
if len(tabletTypesToWait) != 0 {
for _, tt := range tabletTypesToWait {
if topoproto.IsServingType(tt) {
tabletTypes = append(tabletTypes, tt)
}
}

if len(tabletTypes) == 0 {
log.Exitf("tablet_types_to_wait should contain at least one serving tablet type")
}
} else {
tabletTypes = append(tabletTypes, topodatapb.TabletType_PRIMARY, topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY)
}

plannerVersion, _ := plancontext.PlannerNameToVersion(*plannerName)

vtgate.QueryLogHandler = "/debug/vtgate/querylog"
vtgate.QueryLogzHandler = "/debug/vtgate/querylogz"
vtgate.QueryzHandler = "/debug/vtgate/queryz"
// pass nil for healthcheck, it will get created
vtg := vtgate.Init(context.Background(), nil, resilientServer, tpb.Cells[0], tabletTypesToWait, plannerVersion)
vtg := vtgate.Init(context.Background(), nil, resilientServer, tpb.Cells[0], tabletTypes, plannerVersion)

// vtctld configuration and init
err = vtctld.InitVtctld(ts)
Expand Down
29 changes: 29 additions & 0 deletions go/mysql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,12 @@ type Conn struct {
// enableQueryInfo controls whether we parse the INFO field in QUERY_OK packets
// See: ConnParams.EnableQueryInfo
enableQueryInfo bool

// mu protects the fields below
mu sync.Mutex
// this is used to mark the connection to be closed so that the command phase for the connection can be stopped and
// the connection gets closed.
closing bool
}

// splitStatementFunciton is the function that is used to split the statement in case of a multi-statement query.
Expand Down Expand Up @@ -899,6 +905,11 @@ func (c *Conn) handleNextCommand(handler Handler) bool {
return false
}

// before continue to process the packet, check if the connection should be closed or not.
if c.IsMarkedForClose() {
return false
}

switch data[0] {
case ComQuit:
c.recycleReadPacket()
Expand Down Expand Up @@ -1634,3 +1645,21 @@ func (c *Conn) IsUnixSocket() bool {
func (c *Conn) GetRawConn() net.Conn {
return c.conn
}

// MarkForClose marks the connection for close.
func (c *Conn) MarkForClose() {
c.mu.Lock()
defer c.mu.Unlock()
c.closing = true
}

// IsMarkedForClose return true if the connection should be closed.
func (c *Conn) IsMarkedForClose() bool {
c.mu.Lock()
defer c.mu.Unlock()
return c.closing
}

func (c *Conn) IsShuttingDown() bool {
return c.listener.isShutdown()
}
11 changes: 9 additions & 2 deletions go/mysql/fakesqldb/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,11 +375,11 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R
}
key := strings.ToLower(query)
db.mu.Lock()
defer db.mu.Unlock()
db.queryCalled[key]++
db.querylog = append(db.querylog, key)
// Check if we should close the connection and provoke errno 2013.
if db.shouldClose.Load() {
defer db.mu.Unlock()
c.Close()

//log error
Expand All @@ -393,7 +393,9 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R
// The driver may send this at connection time, and we don't want it to
// interfere.
if key == "set names utf8" || strings.HasPrefix(key, "set collation_connection = ") {
//log error
defer db.mu.Unlock()

// log error
if err := callback(&sqltypes.Result{}); err != nil {
log.Errorf("callback failed : %v", err)
}
Expand All @@ -402,12 +404,14 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R

// check if we should reject it.
if err, ok := db.rejectedData[key]; ok {
db.mu.Unlock()
return err
}

// Check explicit queries from AddQuery().
result, ok := db.data[key]
if ok {
db.mu.Unlock()
if f := result.BeforeFunc; f != nil {
f()
}
Expand All @@ -418,6 +422,7 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R
for _, pat := range db.patternData {
if pat.expr.MatchString(query) {
userCallback, ok := db.queryPatternUserCallback[pat.expr]
db.mu.Unlock()
if ok {
userCallback(query)
}
Expand All @@ -428,6 +433,8 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R
}
}

defer db.mu.Unlock()

if db.neverFail.Load() {
return callback(&sqltypes.Result{})
}
Expand Down
11 changes: 10 additions & 1 deletion go/mysql/flavor_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,9 +310,18 @@ func (mysqlFlavor) disableBinlogPlaybackCommand() string {

// baseShowTables is part of the Flavor interface.
func (mysqlFlavor) baseShowTables() string {
return "SELECT table_name, table_type, unix_timestamp(create_time), table_comment FROM information_schema.tables WHERE table_schema = database()"
return BaseShowTables
}

const BaseShowTables = `SELECT t.table_name,
t.table_type,
UNIX_TIMESTAMP(t.create_time),
t.table_comment
FROM information_schema.tables t
WHERE
t.table_schema = database()
`

// TablesWithSize56 is a query to select table along with size for mysql 5.6
const TablesWithSize56 = `SELECT table_name,
table_type,
Expand Down
15 changes: 11 additions & 4 deletions go/mysql/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,19 +166,21 @@ var BaseShowTablesFields = []*querypb.Field{{
ColumnLength: 6144,
Charset: collations.CollationUtf8ID,
Flags: uint32(querypb.MySqlFlag_NOT_NULL_FLAG),
}, {
}}

var BaseShowTablesWithSizesFields = append(BaseShowTablesFields, &querypb.Field{
Name: "i.file_size",
Type: querypb.Type_INT64,
ColumnLength: 11,
Charset: collations.CollationBinaryID,
Flags: uint32(querypb.MySqlFlag_BINARY_FLAG | querypb.MySqlFlag_NUM_FLAG),
}, {
}, &querypb.Field{
Name: "i.allocated_size",
Type: querypb.Type_INT64,
ColumnLength: 11,
Charset: collations.CollationBinaryID,
Flags: uint32(querypb.MySqlFlag_BINARY_FLAG | querypb.MySqlFlag_NUM_FLAG),
}}
})

// BaseShowTablesRow returns the fields from a BaseShowTables or
// BaseShowTablesForTable command.
Expand All @@ -192,9 +194,14 @@ func BaseShowTablesRow(tableName string, isView bool, comment string) []sqltypes
sqltypes.MakeTrusted(sqltypes.VarChar, []byte(tableType)),
sqltypes.MakeTrusted(sqltypes.Int64, []byte("1427325875")), // unix_timestamp(create_time)
sqltypes.MakeTrusted(sqltypes.VarChar, []byte(comment)),
}
}

func BaseShowTablesWithSizesRow(tableName string, isView bool, comment string) []sqltypes.Value {
return append(BaseShowTablesRow(tableName, isView, comment),
sqltypes.MakeTrusted(sqltypes.Int64, []byte("100")), // file_size
sqltypes.MakeTrusted(sqltypes.Int64, []byte("150")), // allocated_size
}
)
}

// ShowPrimaryFields contains the fields for a BaseShowPrimary.
Expand Down
3 changes: 2 additions & 1 deletion go/mysql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,8 @@ func (l *Listener) handle(conn net.Conn, connectionID uint32, acceptTime time.Ti

for {
kontinue := c.handleNextCommand(l.handler)
if !kontinue {
// before going for next command check if the connection should be closed or not.
if !kontinue || c.IsMarkedForClose() {
return
}
}
Expand Down
18 changes: 6 additions & 12 deletions go/vt/mysqlctl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,6 @@ func (mysqld *Mysqld) executeSchemaCommands(sql string) error {
return mysqld.executeMysqlScript(params, strings.NewReader(sql))
}

func encodeEntityName(name string) string {
var buf strings.Builder
sqltypes.NewVarChar(name).EncodeSQL(&buf)
return buf.String()
}

// tableListSQL returns an IN clause "('t1', 't2'...) for a list of tables."
func tableListSQL(tables []string) (string, error) {
if len(tables) == 0 {
Expand All @@ -74,7 +68,7 @@ func tableListSQL(tables []string) (string, error) {

encodedTables := make([]string, len(tables))
for i, tableName := range tables {
encodedTables[i] = encodeEntityName(tableName)
encodedTables[i] = sqltypes.EncodeStringSQL(tableName)
}

return "(" + strings.Join(encodedTables, ", ") + ")", nil
Expand Down Expand Up @@ -304,9 +298,9 @@ func GetColumnsList(dbName, tableName string, exec func(string, int, bool) (*sql
if dbName == "" {
dbName2 = "database()"
} else {
dbName2 = encodeEntityName(dbName)
dbName2 = sqltypes.EncodeStringSQL(dbName)
}
query := fmt.Sprintf(GetColumnNamesQuery, dbName2, encodeEntityName(sqlescape.UnescapeID(tableName)))
query := fmt.Sprintf(GetColumnNamesQuery, dbName2, sqltypes.EncodeStringSQL(sqlescape.UnescapeID(tableName)))
qr, err := exec(query, -1, true)
if err != nil {
return "", err
Expand Down Expand Up @@ -393,7 +387,7 @@ func (mysqld *Mysqld) getPrimaryKeyColumns(ctx context.Context, dbName string, t
FROM information_schema.STATISTICS
WHERE TABLE_SCHEMA = %s AND TABLE_NAME IN %s AND LOWER(INDEX_NAME) = 'primary'
ORDER BY table_name, SEQ_IN_INDEX`
sql = fmt.Sprintf(sql, encodeEntityName(dbName), tableList)
sql = fmt.Sprintf(sql, sqltypes.EncodeStringSQL(dbName), tableList)
qr, err := conn.ExecuteFetch(sql, len(tables)*100, true)
if err != nil {
return nil, err
Expand Down Expand Up @@ -622,8 +616,8 @@ func (mysqld *Mysqld) GetPrimaryKeyEquivalentColumns(ctx context.Context, dbName
) AS pke ON index_cols.INDEX_NAME = pke.INDEX_NAME
WHERE index_cols.TABLE_SCHEMA = %s AND index_cols.TABLE_NAME = %s AND NON_UNIQUE = 0 AND NULLABLE != 'YES'
ORDER BY SEQ_IN_INDEX ASC`
encodedDbName := encodeEntityName(dbName)
encodedTable := encodeEntityName(table)
encodedDbName := sqltypes.EncodeStringSQL(dbName)
encodedTable := sqltypes.EncodeStringSQL(table)
sql = fmt.Sprintf(sql, encodedDbName, encodedTable, encodedDbName, encodedTable, encodedDbName, encodedTable)
qr, err := conn.ExecuteFetch(sql, 1000, true)
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions go/vt/sqlparser/ast_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1177,6 +1177,16 @@ func (lock Lock) ToString() string {
return NoLockStr
case ForUpdateLock:
return ForUpdateStr
case ForUpdateLockNoWait:
return ForUpdateNoWaitStr
case ForUpdateLockSkipLocked:
return ForUpdateSkipLockedStr
case ForShareLock:
return ForShareStr
case ForShareLockNoWait:
return ForShareNoWaitStr
case ForShareLockSkipLocked:
return ForShareSkipLockedStr
case ShareModeLock:
return ShareModeStr
default:
Expand Down
16 changes: 13 additions & 3 deletions go/vt/sqlparser/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,14 @@ const (
SQLCalcFoundRowsStr = "sql_calc_found_rows "

// Select.Lock
NoLockStr = ""
ForUpdateStr = " for update"
ShareModeStr = " lock in share mode"
NoLockStr = ""
ForUpdateStr = " for update"
ForUpdateNoWaitStr = " for update nowait"
ForUpdateSkipLockedStr = " for update skip locked"
ForShareStr = " for share"
ForShareNoWaitStr = " for share nowait"
ForShareSkipLockedStr = " for share skip locked"
ShareModeStr = " lock in share mode"

// Select.Cache
SQLCacheStr = "sql_cache "
Expand Down Expand Up @@ -466,6 +471,11 @@ const (
NoLock Lock = iota
ForUpdateLock
ShareModeLock
ForShareLock
ForShareLockNoWait
ForShareLockSkipLocked
ForUpdateLockNoWait
ForUpdateLockSkipLocked
)

// Constants for Enum Type - TrimType
Expand Down
3 changes: 3 additions & 0 deletions go/vt/sqlparser/keywords.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ var keywords = []keyword{
{"localtimestamp", LOCALTIMESTAMP},
{"locate", LOCATE},
{"lock", LOCK},
{"locked", LOCKED},
{"logs", LOGS},
{"long", UNUSED},
{"longblob", LONGBLOB},
Expand Down Expand Up @@ -448,6 +449,7 @@ var keywords = []keyword{
{"none", NONE},
{"not", NOT},
{"now", NOW},
{"nowait", NOWAIT},
{"no_write_to_binlog", NO_WRITE_TO_BINLOG},
{"nth_value", NTH_VALUE},
{"ntile", NTILE},
Expand Down Expand Up @@ -563,6 +565,7 @@ var keywords = []keyword{
{"signal", UNUSED},
{"signed", SIGNED},
{"simple", SIMPLE},
{"skip", SKIP},
{"slow", SLOW},
{"smallint", SMALLINT},
{"snapshot", SNAPSHOT},
Expand Down
10 changes: 10 additions & 0 deletions go/vt/sqlparser/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,18 @@ var (
input: "select /* distinct */ distinct 1 from t",
}, {
input: "select /* straight_join */ straight_join 1 from t",
}, {
input: "select /* for share */ 1 from t for share",
}, {
input: "select /* for share */ 1 from t for share nowait",
}, {
input: "select /* for share */ 1 from t for share skip locked",
}, {
input: "select /* for update */ 1 from t for update",
}, {
input: "select /* for update */ 1 from t for update nowait",
}, {
input: "select /* for update */ 1 from t for update skip locked",
}, {
input: "select /* lock in share mode */ 1 from t lock in share mode",
}, {
Expand Down
Loading
Loading