syncer: refactor ddl.go (pingcap#2090) (pingcap#2264)
ti-chi-bot authored Oct 26, 2021
1 parent 540cdde commit b05cbcc
Showing 14 changed files with 448 additions and 470 deletions.
248 changes: 71 additions & 177 deletions syncer/ddl.go
@@ -14,9 +14,10 @@
 package syncer
 
 import (
+	"fmt"
+	"strings"
 	"time"
 
-	"github.com/go-mysql-org/go-mysql/replication"
 	"github.com/pingcap/tidb-tools/pkg/filter"
 	"github.com/pingcap/tidb/parser"
 	"github.com/pingcap/tidb/parser/ast"
@@ -29,153 +30,82 @@ import (
 	"github.com/pingcap/dm/syncer/metrics"
 )

-// parseDDLResult represents the result of parseDDLSQL.
-type parseDDLResult struct {
-	stmt     ast.StmtNode
-	needSkip bool
-	isDDL    bool
-}
-
-func (s *Syncer) parseDDLSQL(sql string, p *parser.Parser, schema string) (result parseDDLResult, err error) {
-	// check skip before parse (used to skip some un-supported DDLs)
-	needSkip, err := s.skipSQLByPattern(sql)
-	if err != nil || needSkip {
-		return parseDDLResult{
-			stmt:     nil,
-			needSkip: needSkip,
-			isDDL:    false,
-		}, err
-	}
-
+func parseOneStmt(qec *queryEventContext) (stmt ast.StmtNode, err error) {
 	// We use Parse not ParseOneStmt here, because sometimes we got a commented out ddl which can't be parsed
 	// by ParseOneStmt(it's a limitation of tidb parser.)
-	stmts, err := parserpkg.Parse(p, sql, "", "")
+	qec.tctx.L().Info("parse ddl", zap.String("event", "query"), zap.Stringer("query event context", qec))
+	stmts, err := parserpkg.Parse(qec.p, qec.originSQL, "", "")
 	if err != nil {
 		// log error rather than fatal, so other defer can be executed
-		s.tctx.L().Error("parse ddl", zap.String("sql", sql))
-		return parseDDLResult{
-			stmt:     nil,
-			needSkip: false,
-			isDDL:    false,
-		}, terror.ErrSyncerParseDDL.Delegate(err, sql)
+		qec.tctx.L().Error("parse ddl", zap.String("event", "query"), zap.Stringer("query event context", qec))
+		return nil, terror.ErrSyncerParseDDL.Delegate(err, qec.originSQL)
 	}
 
 	if len(stmts) == 0 {
-		return parseDDLResult{
-			stmt:     nil,
-			needSkip: false,
-			isDDL:    false,
-		}, nil
-	}
-
-	stmt := stmts[0]
-	switch node := stmt.(type) {
-	case ast.DDLNode:
-		return parseDDLResult{
-			stmt:     stmt,
-			needSkip: false,
-			isDDL:    true,
-		}, nil
-	case ast.DMLNode:
-		// if DML can be ignored, we do not report an error
-		table, err2 := getTableByDML(node)
-		if err2 == nil {
-			if len(table.Schema) == 0 {
-				table.Schema = schema
-			}
-			needSkip, err2 := s.skipRowsEvent(table, replication.QUERY_EVENT)
-			if err2 == nil && needSkip {
-				return parseDDLResult{
-					stmt:     nil,
-					needSkip: true,
-					isDDL:    false,
-				}, nil
-			}
-		}
-		return parseDDLResult{
-			stmt:     nil,
-			needSkip: false,
-			isDDL:    false,
-		}, terror.Annotatef(terror.ErrSyncUnitDMLStatementFound.Generate(), "query %s", sql)
-	default:
-		// BEGIN statement is included here.
-		// let sqls be empty
-		return parseDDLResult{
-			stmt:     nil,
-			needSkip: false,
-			isDDL:    false,
-		}, nil
+		return nil, nil
 	}
+	return stmts[0], nil
 }
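Note: as the comment in `parseOneStmt` says, `Parse` tolerates input that yields zero statements, while `ParseOneStmt` insists on exactly one; that is why a fully commented-out DDL must go through `Parse`. A minimal standalone sketch against the tidb parser (illustrative only, not code from this commit):

    package main

    import (
    	"fmt"

    	"github.com/pingcap/tidb/parser"
    	_ "github.com/pingcap/tidb/parser/test_driver" // value-expression driver required by the parser
    )

    func main() {
    	p := parser.New()
    	sql := "-- ALTER TABLE t ADD COLUMN c INT" // the whole DDL is commented out

    	// Parse succeeds and simply returns zero statements,
    	// which parseOneStmt above maps to (nil, nil).
    	stmts, _, err := p.Parse(sql, "", "")
    	fmt.Println(len(stmts), err) // 0 <nil>

    	// ParseOneStmt reports an error for the same input, because it
    	// requires the input to contain exactly one statement.
    	_, err = p.ParseOneStmt(sql, "", "")
    	fmt.Println(err != nil) // true
    }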

-// splitAndFilterDDL will split multi-schema change DDL into multiple one schema change DDL due to TiDB's limitation.
-// the returned DDLs is split from `stmt`, or DDLs that online DDL component has saved before (which means the input
-// `stmt` is a RENAME statement which often generated by online DDL tools).
-// return @spilt sqls, @online ddl table names, @error.
-func (s *Syncer) splitAndFilterDDL(
-	ec eventContext,
-	p *parser.Parser,
-	stmt ast.StmtNode,
-	schema string,
-) (sqls []string, tableMap map[string]*filter.Table, err error) {
-	sqls, err = parserpkg.SplitDDL(stmt, schema)
+// processOneDDL processes already split ddl as following step:
+// 1. track ddl no matter whether skip it; (TODO: will implement in https://github.com/pingcap/dm/pull/1975)
+// 2. skip sql by skipQueryEvent;
+// 3. apply online ddl if onlineDDL is not nil:
+//    * specially, if skip, apply empty string;
+func (s *Syncer) processOneDDL(qec *queryEventContext, sql string) ([]string, error) {
+	ddlInfo, err := s.routeDDL(qec.p, qec.ddlSchema, sql)
 	if err != nil {
-		return nil, nil, err
+		return nil, err
 	}
 
-	statements := make([]string, 0, len(sqls))
-	tableMap = make(map[string]*filter.Table)
+	// TODO: add track ddl
+	// will implement in https://github.com/pingcap/dm/pull/1975
 
 	if s.onlineDDL != nil {
-		if err = s.onlineDDL.CheckRegex(stmt, schema, s.SourceTableNamesFlavor); err != nil {
-			return nil, nil, err
+		if err = s.onlineDDL.CheckRegex(ddlInfo.originStmt, qec.ddlSchema, s.SourceTableNamesFlavor); err != nil {
+			return nil, err
 		}
 	}
-	for _, sql := range sqls {
-		stmt2, err2 := p.ParseOneStmt(sql, "", "")
-		if err2 != nil {
-			return nil, nil, terror.Annotatef(terror.ErrSyncerUnitParseStmt.New(err2.Error()), "ddl %s", sql)
-		}
-
-		tables, err2 := parserpkg.FetchDDLTables(schema, stmt2, s.SourceTableNamesFlavor)
-		if err2 != nil {
-			return nil, nil, err2
-		}
-
-		// get real tableNames before apply block-allow list
-		if s.onlineDDL != nil {
-			for _, table := range tables {
-				table.Name = s.onlineDDL.RealName(table.Name)
-			}
-		}
-
-		needSkip, err2 := s.skipQueryEvent(tables, stmt2, sql)
-		if err2 != nil {
-			return nil, nil, err2
-		}
-		if needSkip {
-			metrics.SkipBinlogDurationHistogram.WithLabelValues("query", s.cfg.Name, s.cfg.SourceID).Observe(time.Since(ec.startTime).Seconds())
-			ec.tctx.L().Warn("skip event", zap.String("event", "query"), zap.String("statement", sql), zap.String("schema", schema))
-			continue
-		}
-
-		// filter and store ghost table ddl, transform online ddl
-		ss, tableName, err2 := s.handleOnlineDDL(ec.tctx, p, schema, sql, stmt2)
-		if err2 != nil {
-			return nil, nil, err2
+	qec.tctx.L().Debug("will skip query event", zap.String("event", "query"), zap.String("statement", sql), zap.Stringer("ddlInfo", ddlInfo))
+	shouldSkip, err := s.skipQueryEvent(qec.originSQL, ddlInfo)
+	if err != nil {
+		return nil, err
 	}
+	if shouldSkip {
+		metrics.SkipBinlogDurationHistogram.WithLabelValues("query", s.cfg.Name, s.cfg.SourceID).Observe(time.Since(qec.startTime).Seconds())
+		qec.tctx.L().Warn("skip event", zap.String("event", "query"), zap.String("statement", sql), zap.Stringer("query event context", qec))
+		if s.onlineDDL == nil || len(ddlInfo.originDDL) != 0 {
+			return nil, nil
+		}
 	}
 
-		if tableName != nil {
-			tableMap[tableName.String()] = tableName
-		}
+	if s.onlineDDL == nil {
+		return []string{ddlInfo.originDDL}, nil
+	}
+	// filter and save ghost table ddl
+	sqls, err := s.onlineDDL.Apply(qec.tctx, ddlInfo.sourceTables, ddlInfo.originDDL, ddlInfo.originStmt, qec.p)
+	if err != nil {
+		return nil, err
+	}
+	// represent saved in onlineDDL.Storage
+	if len(sqls) == 0 {
+		return nil, nil
+	}
+	// represent this sql is not online DDL.
+	if sqls[0] == sql {
+		return sqls, nil
+	}
 
-		statements = append(statements, ss...)
+	if qec.onlineDDLTable == nil {
+		qec.onlineDDLTable = ddlInfo.sourceTables[0]
+	} else if qec.onlineDDLTable.String() != ddlInfo.sourceTables[0].String() {
+		return nil, terror.ErrSyncerUnitOnlineDDLOnMultipleTable.Generate(qec.originSQL)
 	}
-	return statements, tableMap, nil
+	return sqls, nil
 }
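Note: for orientation, the intended call pattern is roughly the following sketch (not code from this commit; the splitDDLs/appliedDDLs names are illustrative, and splitting still happens in the caller via parserpkg.SplitDDL, just as the deleted splitAndFilterDDL did internally):

    // Sketch of a caller driving the refactored pieces; assumes a
    // queryEventContext qec that already carries p, ddlSchema and originSQL.
    stmt, err := parseOneStmt(qec)
    if err != nil || stmt == nil {
    	return err // nil stmt: empty or fully commented-out query, nothing to do
    }
    // TiDB cannot apply multi-schema-change DDL, so split it first.
    splitDDLs, err := parserpkg.SplitDDL(stmt, qec.ddlSchema)
    if err != nil {
    	return err
    }
    appliedDDLs := make([]string, 0, len(splitDDLs))
    for _, sql := range splitDDLs {
    	sqls, err := s.processOneDDL(qec, sql) // route + skip check + online-DDL handling
    	if err != nil {
    		return err
    	}
    	// an empty result means the DDL was skipped or buffered by the online-DDL component
    	appliedDDLs = append(appliedDDLs, sqls...)
    }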

 // routeDDL route DDL from sourceTables to targetTables.
 func (s *Syncer) routeDDL(p *parser.Parser, schema, sql string) (*ddlInfo, error) {
 	s.tctx.L().Debug("route ddl", zap.String("event", "query"), zap.String("statement", sql))
 	stmt, err := p.ParseOneStmt(sql, "", "")
 	if err != nil {
 		return nil, terror.Annotatef(terror.ErrSyncerUnitParseStmt.New(err.Error()), "ddl %s", sql)
@@ -194,62 +124,14 @@ func (s *Syncer) routeDDL(p *parser.Parser, schema, sql string) (*ddlInfo, error
 
 	routedDDL, err := parserpkg.RenameDDLTable(stmt, targetTables)
 	return &ddlInfo{
-		sql:          routedDDL,
-		stmt:         stmt,
+		originDDL:    sql,
+		routedDDL:    routedDDL,
+		originStmt:   stmt,
 		sourceTables: sourceTables,
 		targetTables: targetTables,
 	}, err
 }
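Note: to make the field renaming concrete, here is what routeDDL might produce under a hypothetical router rule mapping `shard_01`.`t1` to `db`.`t` (values illustrative; the routed SQL is regenerated from the AST by parserpkg.RenameDDLTable, so its exact formatting may differ):

    info, err := s.routeDDL(p, "shard_01", "ALTER TABLE t1 ADD COLUMN c INT")
    // info.originDDL    == "ALTER TABLE t1 ADD COLUMN c INT"         (input kept verbatim)
    // info.routedDDL    == "ALTER TABLE `db`.`t` ADD COLUMN `c` INT" (tables renamed)
    // info.originStmt   == the parsed ast.StmtNode of the input
    // info.sourceTables == one *filter.Table for `shard_01`.`t1`
    // info.targetTables == one *filter.Table for `db`.`t`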

-// handleOnlineDDL checks if the input `sql` is came from online DDL tools.
-// If so, it will save actual DDL or return the actual DDL depending on online DDL types of `sql`.
-// If not, it returns original SQL and no table names.
-func (s *Syncer) handleOnlineDDL(tctx *tcontext.Context, p *parser.Parser, schema, sql string, stmt ast.StmtNode) ([]string, *filter.Table, error) {
-	if s.onlineDDL == nil {
-		return []string{sql}, nil, nil
-	}
-
-	tables, err := parserpkg.FetchDDLTables(schema, stmt, s.SourceTableNamesFlavor)
-	if err != nil {
-		return nil, nil, err
-	}
-
-	sqls, err := s.onlineDDL.Apply(tctx, tables, sql, stmt)
-	if err != nil {
-		return nil, nil, err
-	}
-
-	// skip or origin sqls
-	if len(sqls) == 0 || (len(sqls) == 1 && sqls[0] == sql) {
-		return sqls, nil, nil
-	}
-
-	// remove empty sqls which inserted because online DDL is filtered
-	end := 0
-	for _, sql2 := range sqls {
-		if sql2 != "" {
-			sqls[end] = sql2
-			end++
-		}
-	}
-	sqls = sqls[:end]
-
-	// tableNames[1:] is the real table name
-	targetTables := tables[1:2]
-	for i := range sqls {
-		stmt, err := p.ParseOneStmt(sqls[i], "", "")
-		if err != nil {
-			return nil, nil, terror.ErrSyncerUnitParseStmt.New(err.Error())
-		}
-
-		sqls[i], err = parserpkg.RenameDDLTable(stmt, targetTables)
-		if err != nil {
-			return nil, nil, err
-		}
-	}
-	return sqls, tables[0], nil
-}
-
 func (s *Syncer) dropSchemaInSharding(tctx *tcontext.Context, sourceSchema string) error {
 	sources := make(map[string][]*filter.Table)
 	sgs := s.sgk.Groups()
@@ -313,8 +195,20 @@ func (s *Syncer) clearOnlineDDL(tctx *tcontext.Context, targetTable *filter.Table
 }
 
 type ddlInfo struct {
-	sql          string
-	stmt         ast.StmtNode
+	originDDL    string
+	routedDDL    string
+	originStmt   ast.StmtNode
 	sourceTables []*filter.Table
 	targetTables []*filter.Table
 }
+
+func (d *ddlInfo) String() string {
+	sourceTables := make([]string, 0, len(d.sourceTables))
+	targetTables := make([]string, 0, len(d.targetTables))
+	for i := range d.sourceTables {
+		sourceTables = append(sourceTables, d.sourceTables[i].String())
+		targetTables = append(targetTables, d.targetTables[i].String())
+	}
+	return fmt.Sprintf("{originDDL: %s, routedDDL: %s, sourceTables: %s, targetTables: %s}",
+		d.originDDL, d.routedDDL, strings.Join(sourceTables, ","), strings.Join(targetTables, ","))
+}
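Note: the new Stringer is what lets ddlInfo be logged through zap.Stringer, as processOneDDL does above. Assuming filter.Table renders as a backquoted `schema`.`table` pair, the output would look roughly like this sketch:

    info := &ddlInfo{
    	originDDL:    "ALTER TABLE t1 ADD COLUMN c INT",
    	routedDDL:    "ALTER TABLE `db`.`t` ADD COLUMN `c` INT",
    	sourceTables: []*filter.Table{{Schema: "shard_01", Name: "t1"}},
    	targetTables: []*filter.Table{{Schema: "db", Name: "t"}},
    }
    fmt.Println(info)
    // {originDDL: ALTER TABLE t1 ADD COLUMN c INT, routedDDL: ALTER TABLE `db`.`t` ADD COLUMN `c` INT, sourceTables: `shard_01`.`t1`, targetTables: `db`.`t`}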
