Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*(dm): move downstream table info logic into sqlmodel #4885

Merged
merged 7 commits into from
Mar 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 8 additions & 149 deletions dm/pkg/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/types"
"go.uber.org/zap"

tcontext "github.com/pingcap/tiflow/dm/pkg/context"
Expand All @@ -44,6 +43,7 @@ import (
dmterror "github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/dm/pkg/utils"
"github.com/pingcap/tiflow/dm/syncer/dbconn"
"github.com/pingcap/tiflow/pkg/sqlmodel"
)

const (
Expand Down Expand Up @@ -78,9 +78,8 @@ type downstreamTracker struct {

// DownstreamTableInfo contains tableinfo and index cache.
type DownstreamTableInfo struct {
TableInfo *model.TableInfo // tableInfo which comes from parse create statement syntaxtree
AbsoluteUKIndexInfo *model.IndexInfo // absolute uk index is a pk/uk(not null)
AvailableUKIndexList []*model.IndexInfo // index list which is all uks
TableInfo *model.TableInfo // tableInfo which comes from parse create statement syntaxtree
WhereHandle *sqlmodel.WhereHandle
}

// NewTracker creates a new tracker. `sessionCfg` will be set as tracker's session variables if specified, or retrieve
Expand Down Expand Up @@ -430,7 +429,7 @@ func (tr *Tracker) GetSystemVar(name string) (string, bool) {

// GetDownStreamTableInfo gets downstream table info.
// note. this function will init downstreamTrack's table info.
func (tr *Tracker) GetDownStreamTableInfo(tctx *tcontext.Context, tableID string, originTi *model.TableInfo) (*DownstreamTableInfo, error) {
func (tr *Tracker) GetDownStreamTableInfo(tctx *tcontext.Context, tableID string, originTI *model.TableInfo) (*DownstreamTableInfo, error) {
dti, ok := tr.dsTracker.tableInfos[tableID]
if !ok {
tctx.Logger.Info("Downstream schema tracker init. ", zap.String("tableID", tableID))
Expand All @@ -440,39 +439,15 @@ func (tr *Tracker) GetDownStreamTableInfo(tctx *tcontext.Context, tableID string
return nil, err
}

dti = GetDownStreamTI(downstreamTI, originTi)
dti = &DownstreamTableInfo{
TableInfo: downstreamTI,
WhereHandle: sqlmodel.GetWhereHandle(originTI, downstreamTI),
}
tr.dsTracker.tableInfos[tableID] = dti
}
return dti, nil
}

// GetAvailableDownStreamUKIndexInfo gets available downstream UK whose data is not null.
// note. this function will not init downstreamTrack.
func (tr *Tracker) GetAvailableDownStreamUKIndexInfo(tableID string, data []interface{}) *model.IndexInfo {
dti := tr.dsTracker.tableInfos[tableID]

return GetIdentityUKByData(dti, data)
}

// GetIdentityUKByData gets available downstream UK whose data is not null.
func GetIdentityUKByData(downstreamTI *DownstreamTableInfo, data []interface{}) *model.IndexInfo {
if downstreamTI == nil || len(downstreamTI.AvailableUKIndexList) == 0 {
return nil
}
// func for check data is not null
fn := func(i int) bool {
return data[i] != nil
}

for _, uk := range downstreamTI.AvailableUKIndexList {
// check uk's column data is not null
if isSpecifiedIndexColumn(uk, fn) {
return uk
}
}
return nil
}

// RemoveDownstreamSchema just remove schema or table in downstreamTrack.
func (tr *Tracker) RemoveDownstreamSchema(tctx *tcontext.Context, targetTables []*filter.Table) {
if len(targetTables) == 0 {
Expand Down Expand Up @@ -541,119 +516,3 @@ func (tr *Tracker) initDownStreamSQLModeAndParser(tctx *tcontext.Context) error
tr.dsTracker.stmtParser = stmtParser
return nil
}

// GetDownStreamTI constructs downstreamTable index cache by tableinfo.
func GetDownStreamTI(downstreamTI *model.TableInfo, originTi *model.TableInfo) *DownstreamTableInfo {
var (
absoluteUKIndexInfo *model.IndexInfo
availableUKIndexList = []*model.IndexInfo{}
hasPk = false
absoluteUKPosition = -1
)

// func for check not null constraint
fn := func(i int) bool {
return mysql.HasNotNullFlag(downstreamTI.Columns[i].Flag)
}

for i, idx := range downstreamTI.Indices {
if !idx.Primary && !idx.Unique {
continue
}
indexRedirect := redirectIndexKeys(idx, originTi)
if indexRedirect == nil {
continue
}
availableUKIndexList = append(availableUKIndexList, indexRedirect)
if idx.Primary {
absoluteUKIndexInfo = indexRedirect
absoluteUKPosition = i
hasPk = true
} else if absoluteUKIndexInfo == nil && isSpecifiedIndexColumn(idx, fn) {
// second check not null unique key
absoluteUKIndexInfo = indexRedirect
absoluteUKPosition = i
}
}

// handle pk exceptional case.
// e.g. "create table t(a int primary key, b int)".
if !hasPk {
exPk := redirectIndexKeys(handlePkExCase(downstreamTI), originTi)
if exPk != nil {
absoluteUKIndexInfo = exPk
absoluteUKPosition = len(availableUKIndexList)
availableUKIndexList = append(availableUKIndexList, absoluteUKIndexInfo)
}
}

// move absoluteUKIndexInfo to the first in availableUKIndexList
if absoluteUKPosition != -1 && len(availableUKIndexList) > 1 {
availableUKIndexList[0], availableUKIndexList[absoluteUKPosition] = availableUKIndexList[absoluteUKPosition], availableUKIndexList[0]
}

return &DownstreamTableInfo{
TableInfo: downstreamTI,
AbsoluteUKIndexInfo: absoluteUKIndexInfo,
AvailableUKIndexList: availableUKIndexList,
}
}

// redirectIndexKeys redirect index's columns offset in origin tableinfo.
func redirectIndexKeys(index *model.IndexInfo, originTi *model.TableInfo) *model.IndexInfo {
if index == nil || originTi == nil {
return nil
}

columns := make([]*model.IndexColumn, 0, len(index.Columns))
for _, key := range index.Columns {
originColumn := model.FindColumnInfo(originTi.Columns, key.Name.L)
if originColumn == nil {
return nil
}
column := &model.IndexColumn{
Name: key.Name,
Offset: originColumn.Offset,
Length: key.Length,
}
columns = append(columns, column)
}
return &model.IndexInfo{
Table: index.Table,
Unique: index.Unique,
Primary: index.Primary,
State: index.State,
Tp: index.Tp,
Columns: columns,
}
}

// handlePkExCase is handle pk exceptional case.
// e.g. "create table t(a int primary key, b int)".
func handlePkExCase(ti *model.TableInfo) *model.IndexInfo {
if pk := ti.GetPkColInfo(); pk != nil {
return &model.IndexInfo{
Table: ti.Name,
Unique: true,
Primary: true,
State: model.StatePublic,
Tp: model.IndexTypeBtree,
Columns: []*model.IndexColumn{{
Name: pk.Name,
Offset: pk.Offset,
Length: types.UnspecifiedLength,
}},
}
}
return nil
}

// isSpecifiedIndexColumn checks all of index's columns are matching 'fn'.
func isSpecifiedIndexColumn(index *model.IndexInfo, fn func(i int) bool) bool {
for _, col := range index.Columns {
if !fn(col.Offset) {
return false
}
}
return true
}
Loading