Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*(dm): move downstream table info logic into sqlmodel #4885

Merged
merged 7 commits into from
Mar 23, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 7 additions & 148 deletions dm/pkg/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/types"
"go.uber.org/zap"

tcontext "github.com/pingcap/tiflow/dm/pkg/context"
Expand All @@ -44,6 +43,7 @@ import (
dmterror "github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/dm/pkg/utils"
"github.com/pingcap/tiflow/dm/syncer/dbconn"
"github.com/pingcap/tiflow/pkg/sqlmodel"
)

const (
Expand Down Expand Up @@ -78,9 +78,8 @@ type downstreamTracker struct {

// DownstreamTableInfo contains tableinfo and index cache.
type DownstreamTableInfo struct {
TableInfo *model.TableInfo // tableInfo which comes from parse create statement syntaxtree
AbsoluteUKIndexInfo *model.IndexInfo // absolute uk index is a pk/uk(not null)
AvailableUKIndexList []*model.IndexInfo // index list which is all uks
TableInfo *model.TableInfo // tableInfo which comes from parse create statement syntaxtree
WhereHandleCache *sqlmodel.WhereHandle
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why call this cache ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's computed once for many row changes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the whole struct is cached in tr.dsTracker.tableInfos which should be called xxxCache.

prefer remove this suffix here.

}

// NewTracker creates a new tracker. `sessionCfg` will be set as tracker's session variables if specified, or retrieve
Expand Down Expand Up @@ -440,39 +439,15 @@ func (tr *Tracker) GetDownStreamTableInfo(tctx *tcontext.Context, tableID string
return nil, err
}

dti = GetDownStreamTI(downstreamTI, originTi)
dti = &DownstreamTableInfo{
TableInfo: downstreamTI,
WhereHandleCache: sqlmodel.GetWhereHandle(originTi, downstreamTI),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe can rename downstreamTI to downstreamTi 😂

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TI = TableInfo

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but another one called originTi 😂

}
tr.dsTracker.tableInfos[tableID] = dti
}
return dti, nil
}

// GetAvailableDownStreamUKIndexInfo gets available downstream UK whose data is not null.
// note. this function will not init downstreamTrack.
func (tr *Tracker) GetAvailableDownStreamUKIndexInfo(tableID string, data []interface{}) *model.IndexInfo {
dti := tr.dsTracker.tableInfos[tableID]

return GetIdentityUKByData(dti, data)
}

// GetIdentityUKByData gets available downstream UK whose data is not null.
func GetIdentityUKByData(downstreamTI *DownstreamTableInfo, data []interface{}) *model.IndexInfo {
if downstreamTI == nil || len(downstreamTI.AvailableUKIndexList) == 0 {
return nil
}
// func for check data is not null
fn := func(i int) bool {
return data[i] != nil
}

for _, uk := range downstreamTI.AvailableUKIndexList {
// check uk's column data is not null
if isSpecifiedIndexColumn(uk, fn) {
return uk
}
}
return nil
}

// RemoveDownstreamSchema just remove schema or table in downstreamTrack.
func (tr *Tracker) RemoveDownstreamSchema(tctx *tcontext.Context, targetTables []*filter.Table) {
if len(targetTables) == 0 {
Expand Down Expand Up @@ -541,119 +516,3 @@ func (tr *Tracker) initDownStreamSQLModeAndParser(tctx *tcontext.Context) error
tr.dsTracker.stmtParser = stmtParser
return nil
}

// GetDownStreamTI constructs downstreamTable index cache by tableinfo.
func GetDownStreamTI(downstreamTI *model.TableInfo, originTi *model.TableInfo) *DownstreamTableInfo {
var (
absoluteUKIndexInfo *model.IndexInfo
availableUKIndexList = []*model.IndexInfo{}
hasPk = false
absoluteUKPosition = -1
)

// func for check not null constraint
fn := func(i int) bool {
return mysql.HasNotNullFlag(downstreamTI.Columns[i].Flag)
}

for i, idx := range downstreamTI.Indices {
if !idx.Primary && !idx.Unique {
continue
}
indexRedirect := redirectIndexKeys(idx, originTi)
if indexRedirect == nil {
continue
}
availableUKIndexList = append(availableUKIndexList, indexRedirect)
if idx.Primary {
absoluteUKIndexInfo = indexRedirect
absoluteUKPosition = i
hasPk = true
} else if absoluteUKIndexInfo == nil && isSpecifiedIndexColumn(idx, fn) {
// second check not null unique key
absoluteUKIndexInfo = indexRedirect
absoluteUKPosition = i
}
}

// handle pk exceptional case.
// e.g. "create table t(a int primary key, b int)".
if !hasPk {
exPk := redirectIndexKeys(handlePkExCase(downstreamTI), originTi)
if exPk != nil {
absoluteUKIndexInfo = exPk
absoluteUKPosition = len(availableUKIndexList)
availableUKIndexList = append(availableUKIndexList, absoluteUKIndexInfo)
}
}

// move absoluteUKIndexInfo to the first in availableUKIndexList
if absoluteUKPosition != -1 && len(availableUKIndexList) > 1 {
availableUKIndexList[0], availableUKIndexList[absoluteUKPosition] = availableUKIndexList[absoluteUKPosition], availableUKIndexList[0]
}

return &DownstreamTableInfo{
TableInfo: downstreamTI,
AbsoluteUKIndexInfo: absoluteUKIndexInfo,
AvailableUKIndexList: availableUKIndexList,
}
}

// redirectIndexKeys redirect index's columns offset in origin tableinfo.
func redirectIndexKeys(index *model.IndexInfo, originTi *model.TableInfo) *model.IndexInfo {
if index == nil || originTi == nil {
return nil
}

columns := make([]*model.IndexColumn, 0, len(index.Columns))
for _, key := range index.Columns {
originColumn := model.FindColumnInfo(originTi.Columns, key.Name.L)
if originColumn == nil {
return nil
}
column := &model.IndexColumn{
Name: key.Name,
Offset: originColumn.Offset,
Length: key.Length,
}
columns = append(columns, column)
}
return &model.IndexInfo{
Table: index.Table,
Unique: index.Unique,
Primary: index.Primary,
State: index.State,
Tp: index.Tp,
Columns: columns,
}
}

// handlePkExCase is handle pk exceptional case.
// e.g. "create table t(a int primary key, b int)".
func handlePkExCase(ti *model.TableInfo) *model.IndexInfo {
if pk := ti.GetPkColInfo(); pk != nil {
return &model.IndexInfo{
Table: ti.Name,
Unique: true,
Primary: true,
State: model.StatePublic,
Tp: model.IndexTypeBtree,
Columns: []*model.IndexColumn{{
Name: pk.Name,
Offset: pk.Offset,
Length: types.UnspecifiedLength,
}},
}
}
return nil
}

// isSpecifiedIndexColumn checks all of index's columns are matching 'fn'.
func isSpecifiedIndexColumn(index *model.IndexInfo, fn func(i int) bool) bool {
for _, col := range index.Columns {
if !fn(col.Offset) {
return false
}
}
return true
}
Loading