Skip to content

Commit

Permalink
drainer: history job sorted by schema version (#444) (#454)
Browse files Browse the repository at this point in the history
  • Loading branch information
WangXiangUSTC authored and july2993 committed Jan 24, 2019
1 parent 2109109 commit 4ff2707
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 3 deletions.
10 changes: 7 additions & 3 deletions drainer/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func (s *Schema) CreateSchema(db *model.DBInfo) error {
s.schemas[db.ID] = db
s.schemaNameToID[db.Name.O] = db.ID

log.Debugf("create schema %s, schema id %d", db.Name.O, db.ID)
return nil
}

Expand All @@ -150,6 +151,8 @@ func (s *Schema) DropTable(id int64) (string, error) {

delete(s.tables, id)
delete(s.tableIDToName, id)

log.Debugf("drop table %s, table id %d", table.Name.O, id)
return table.Name.O, nil
}

Expand All @@ -168,6 +171,7 @@ func (s *Schema) CreateTable(schema *model.DBInfo, table *model.TableInfo) error
s.tables[table.ID] = table
s.tableIDToName[table.ID] = TableName{Schema: schema.Name.O, Table: table.Name.O}

log.Debugf("create table %s.%s, table id %d", schema.Name.O, table.Name.O, table.ID)
return nil
}

Expand Down Expand Up @@ -213,11 +217,13 @@ func (s *Schema) handlePreviousDDLJobIfNeed(version int64) error {
var i int
for i = 0; i < len(s.jobs); i++ {
if skipJob(s.jobs[i]) {
log.Debugf("skip ddl job %v", s.jobs[i])
continue
}

if s.jobs[i].BinlogInfo.SchemaVersion <= version {
if s.jobs[i].BinlogInfo.SchemaVersion <= s.currentVersion {
log.Warnf("ddl job %v schema version is less than current version %d, skip this ddl job", s.jobs[i], s.currentVersion)
continue
}

Expand All @@ -230,7 +236,7 @@ func (s *Schema) handlePreviousDDLJobIfNeed(version int64) error {

_, _, _, err = s.handleDDL(s.jobs[i])
if err != nil {
return errors.Trace(err)
return errors.Annotatef(err, "handle ddl job %v failed, the schema info: %s", s.jobs[i], s)
}
} else {
break
Expand All @@ -252,7 +258,6 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {
return "", "", "", nil
}

// log.Infof("ddl query %s", job.Query)
sql := job.Query
if sql == "" {
return "", "", "", errors.Errorf("[ddl job sql miss]%+v", job)
Expand Down Expand Up @@ -370,7 +375,6 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {
return schema.Name.O, table.Name.O, sql, nil

default:
log.Infof("get unknown ddl type %v", job.Type)
binlogInfo := job.BinlogInfo
if binlogInfo == nil {
return "", "", "", errors.NotFoundf("table %d", job.TableID)
Expand Down
23 changes: 23 additions & 0 deletions drainer/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/url"
"os"
"path"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -101,6 +102,11 @@ func loadHistoryDDLJobs(tiStore kv.Storage) ([]*model.Job, error) {
if err != nil {
return nil, errors.Trace(err)
}

// jobs from GetAllHistoryDDLJobs are sorted by job id, need sorted by schema version
sorter := &jobsSorter{jobs: jobs}
sort.Sort(sorter)

return jobs, nil
}

Expand Down Expand Up @@ -179,3 +185,20 @@ func filterIgnoreSchema(schema *model.DBInfo, ignoreSchemaNames map[string]struc
_, ok := ignoreSchemaNames[schema.Name.L]
return ok
}

// jobsSorter implements the sort.Interface interface.
type jobsSorter struct {
jobs []*model.Job
}

func (s *jobsSorter) Swap(i, j int) {
s.jobs[i], s.jobs[j] = s.jobs[j], s.jobs[i]
}

func (s *jobsSorter) Len() int {
return len(s.jobs)
}

func (s *jobsSorter) Less(i, j int) bool {
return s.jobs[i].BinlogInfo.SchemaVersion < s.jobs[j].BinlogInfo.SchemaVersion
}

0 comments on commit 4ff2707

Please sign in to comment.