Skip to content

Commit

Permalink
Merge branch 'master' into improve-column
Browse files Browse the repository at this point in the history
Signed-off-by: qupeng <[email protected]>
  • Loading branch information
hicqu committed Sep 24, 2024
2 parents d7758c0 + 25676cf commit 5b3d086
Show file tree
Hide file tree
Showing 117 changed files with 590 additions and 603 deletions.
2 changes: 1 addition & 1 deletion cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/kv"
timodel "github.com/pingcap/tidb/pkg/parser/model"
timodel "github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/tablecodec"
Expand Down
4 changes: 2 additions & 2 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ import (
"github.com/pingcap/tidb/pkg/executor"
tidbkv "github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/autoid"
timodel "github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
timodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/store/mockstore"
Expand Down Expand Up @@ -269,7 +269,7 @@ func testMounterDisableOldValue(t *testing.T, tc struct {
ticonfig.UpdateGlobal(func(conf *ticonfig.Config) {
// we can update the tidb config here
})
session.SetSchemaLease(0)
session.SetSchemaLease(time.Second)
session.DisableStats4Test()
domain, err := session.BootstrapSession(store)
require.Nil(t, err)
Expand Down
19 changes: 9 additions & 10 deletions cdc/entry/schema/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
timeta "github.com/pingcap/tidb/pkg/meta"
timodel "github.com/pingcap/tidb/pkg/parser/model"
timodel "github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/filter"
Expand Down Expand Up @@ -1084,28 +1084,27 @@ func (s *snapshot) alterPartitioning(job *timodel.Job) error {
}

func (s *snapshot) renameTables(job *timodel.Job, currentTs uint64) error {
var oldSchemaIDs, newSchemaIDs, oldTableIDs []int64
var newTableNames, oldSchemaNames []*timodel.CIStr
err := job.DecodeArgs(&oldSchemaIDs, &newSchemaIDs, &newTableNames, &oldTableIDs, &oldSchemaNames)
args, err := timodel.GetRenameTablesArgs(job)
if err != nil {
return errors.Trace(err)
}
if len(job.BinlogInfo.MultipleTableInfos) < len(newTableNames) {
if len(job.BinlogInfo.MultipleTableInfos) < len(args.RenameTableInfos) {
return cerror.ErrInvalidDDLJob.GenWithStackByArgs(job.ID)
}
// NOTE: should handle failures in halfway better.
for _, tableID := range oldTableIDs {
if err := s.dropTable(tableID, currentTs); err != nil {
for _, info := range args.RenameTableInfos {
if err := s.dropTable(info.TableID, currentTs); err != nil {
return errors.Trace(err)
}
}
for i, tableInfo := range job.BinlogInfo.MultipleTableInfos {
newSchema, ok := s.schemaByID(newSchemaIDs[i])
info := args.RenameTableInfos[i]
newSchema, ok := s.schemaByID(info.NewSchemaID)
if !ok {
return cerror.ErrSnapshotSchemaNotFound.GenWithStackByArgs(newSchemaIDs[i])
return cerror.ErrSnapshotSchemaNotFound.GenWithStackByArgs(info.NewSchemaID)
}
newSchemaName := newSchema.Name.O
tbInfo := model.WrapTableInfo(newSchemaIDs[i], newSchemaName, job.BinlogInfo.FinishedTS, tableInfo)
tbInfo := model.WrapTableInfo(info.NewSchemaID, newSchemaName, job.BinlogInfo.FinishedTS, tableInfo)
err = s.createTable(tbInfo, currentTs)
if err != nil {
return errors.Trace(err)
Expand Down
11 changes: 6 additions & 5 deletions cdc/entry/schema/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (
"fmt"
"testing"

timodel "github.com/pingcap/tidb/pkg/parser/model"
timodel "github.com/pingcap/tidb/pkg/meta/model"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tiflow/cdc/model"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -79,15 +80,15 @@ func TestSchema(t *testing.T) {
snap := NewEmptySnapshot(true)

// createSchema fails if the schema ID or name already exist.
dbName := timodel.CIStr{O: "DB_1", L: "db_1"}
dbName := pmodel.CIStr{O: "DB_1", L: "db_1"}
require.Nil(t, snap.inner.createSchema(&timodel.DBInfo{ID: 1, Name: dbName}, 100))
require.Nil(t, snap.inner.createTable(newTbInfo(1, "DB_1", 11), 100))
require.Error(t, snap.inner.createSchema(&timodel.DBInfo{ID: 1}, 110))
require.Error(t, snap.inner.createSchema(&timodel.DBInfo{ID: 2, Name: dbName}, 120))
snap1 := snap.Copy()

// replaceSchema only success if the schema ID exists.
dbName = timodel.CIStr{O: "DB_2", L: "db_2"}
dbName = pmodel.CIStr{O: "DB_2", L: "db_2"}
require.Error(t, snap.inner.replaceSchema(&timodel.DBInfo{ID: 2}, 130))
require.Nil(t, snap.inner.replaceSchema(&timodel.DBInfo{ID: 1, Name: dbName}, 140))
snap2 := snap.Copy()
Expand Down Expand Up @@ -354,7 +355,7 @@ func TestDrop(t *testing.T) {
func newDBInfo(id int64) *timodel.DBInfo {
return &timodel.DBInfo{
ID: id,
Name: timodel.CIStr{
Name: pmodel.CIStr{
O: fmt.Sprintf("DB_%d", id),
L: fmt.Sprintf("db_%d", id),
},
Expand All @@ -367,7 +368,7 @@ func newTbInfo(schemaID int64, schemaName string, tableID int64) *model.TableInf
return &model.TableInfo{
TableInfo: &timodel.TableInfo{
ID: tableID,
Name: timodel.CIStr{
Name: pmodel.CIStr{
O: fmt.Sprintf("TB_%d", tableID),
L: fmt.Sprintf("TB_%d", tableID),
},
Expand Down
40 changes: 23 additions & 17 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
tidbkv "github.com/pingcap/tidb/pkg/kv"
timodel "github.com/pingcap/tidb/pkg/parser/model"
timodel "github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tiflow/cdc/entry/schema"
"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/model"
Expand Down Expand Up @@ -471,28 +471,33 @@ func (s *schemaStorage) BuildDDLEvents(
return ddlEvents, nil
}

// GetNewJobWithArgs returns a new job with the given args
func GetNewJobWithArgs(job *timodel.Job, args timodel.JobArgs) (*timodel.Job, error) {
job.FillArgs(args)
bytes, err := job.Encode(true)
if err != nil {
return nil, errors.Trace(err)
}
encodedJob := &timodel.Job{}
if err = encodedJob.Decode(bytes); err != nil {
return nil, errors.Trace(err)
}
return encodedJob, nil
}

// TODO: find a better way to refactor this function.
// buildRenameEvents gets a list of DDLEvent from a rename tables DDL job.
func (s *schemaStorage) buildRenameEvents(
ctx context.Context, job *timodel.Job,
) ([]*model.DDLEvent, error) {
var (
oldSchemaIDs, newSchemaIDs, oldTableIDs []int64
newTableNames, oldSchemaNames []*timodel.CIStr
ddlEvents []*model.DDLEvent
)
err := job.DecodeArgs(&oldSchemaIDs, &newSchemaIDs,
&newTableNames, &oldTableIDs, &oldSchemaNames)
var ddlEvents []*model.DDLEvent
args, err := timodel.GetRenameTablesArgs(job)
if err != nil {
return nil, errors.Trace(err)
}

multiTableInfos := job.BinlogInfo.MultipleTableInfos
if len(multiTableInfos) != len(oldSchemaIDs) ||
len(multiTableInfos) != len(newSchemaIDs) ||
len(multiTableInfos) != len(newTableNames) ||
len(multiTableInfos) != len(oldTableIDs) ||
len(multiTableInfos) != len(oldSchemaNames) {
if len(multiTableInfos) != len(args.RenameTableInfos) {
return nil, cerror.ErrInvalidDDLJob.GenWithStackByArgs(job.ID)
}

Expand All @@ -502,21 +507,22 @@ func (s *schemaStorage) buildRenameEvents(
}

for i, tableInfo := range multiTableInfos {
newSchema, ok := preSnap.SchemaByID(newSchemaIDs[i])
info := args.RenameTableInfos[i]
newSchema, ok := preSnap.SchemaByID(info.NewSchemaID)
if !ok {
return nil, cerror.ErrSnapshotSchemaNotFound.GenWithStackByArgs(
newSchemaIDs[i])
info.NewSchemaID)
}
newSchemaName := newSchema.Name.O
oldSchemaName := oldSchemaNames[i].O
oldSchemaName := info.OldSchemaName.O
event := new(model.DDLEvent)
preTableInfo, ok := preSnap.PhysicalTableByID(tableInfo.ID)
if !ok {
return nil, cerror.ErrSchemaStorageTableMiss.GenWithStackByArgs(
job.TableID)
}

tableInfo := model.WrapTableInfo(newSchemaIDs[i], newSchemaName,
tableInfo := model.WrapTableInfo(info.NewSchemaID, newSchemaName,
job.BinlogInfo.FinishedTS, tableInfo)
event.FromJobWithArgs(job, preTableInfo, tableInfo, oldSchemaName, newSchemaName)
ddlEvents = append(ddlEvents, event)
Expand Down
54 changes: 29 additions & 25 deletions cdc/entry/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"
"fmt"
"testing"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand All @@ -26,7 +27,8 @@ import (
"github.com/pingcap/tidb/pkg/domain"
tidbkv "github.com/pingcap/tidb/pkg/kv"
timeta "github.com/pingcap/tidb/pkg/meta"
timodel "github.com/pingcap/tidb/pkg/parser/model"
timodel "github.com/pingcap/tidb/pkg/meta/model"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/sessionctx"
Expand All @@ -45,7 +47,7 @@ import (
)

func TestSchema(t *testing.T) {
dbName := timodel.NewCIStr("Test")
dbName := pmodel.NewCIStr("Test")
// db and ignoreDB info
dbInfo := &timodel.DBInfo{
ID: 1,
Expand Down Expand Up @@ -113,10 +115,10 @@ func TestSchema(t *testing.T) {

func TestTable(t *testing.T) {
var jobs []*timodel.Job
dbName := timodel.NewCIStr("Test")
tbName := timodel.NewCIStr("T")
colName := timodel.NewCIStr("A")
idxName := timodel.NewCIStr("idx")
dbName := pmodel.NewCIStr("Test")
tbName := pmodel.NewCIStr("T")
colName := pmodel.NewCIStr("A")
idxName := pmodel.NewCIStr("idx")
// column info
colInfo := &timodel.ColumnInfo{
ID: 1,
Expand Down Expand Up @@ -293,10 +295,10 @@ func TestTable(t *testing.T) {

func TestHandleDDL(t *testing.T) {
snap := schema.NewEmptySnapshot(false)
dbName := timodel.NewCIStr("Test")
colName := timodel.NewCIStr("A")
tbName := timodel.NewCIStr("T")
newTbName := timodel.NewCIStr("RT")
dbName := pmodel.NewCIStr("Test")
colName := pmodel.NewCIStr("A")
tbName := pmodel.NewCIStr("T")
newTbName := pmodel.NewCIStr("RT")

// db info
dbInfo := &timodel.DBInfo{
Expand Down Expand Up @@ -398,7 +400,7 @@ func TestHandleRenameTables(t *testing.T) {
for i = 1; i < 3; i++ {
dbInfo := &timodel.DBInfo{
ID: i,
Name: timodel.NewCIStr(fmt.Sprintf("db_%d", i)),
Name: pmodel.NewCIStr(fmt.Sprintf("db_%d", i)),
State: timodel.StatePublic,
}
job := &timodel.Job{
Expand All @@ -415,7 +417,7 @@ func TestHandleRenameTables(t *testing.T) {
for i = 1; i < 3; i++ {
tblInfo := &timodel.TableInfo{
ID: 10 + i,
Name: timodel.NewCIStr(fmt.Sprintf("table_%d", i)),
Name: pmodel.NewCIStr(fmt.Sprintf("table_%d", i)),
State: timodel.StatePublic,
}
job := &timodel.Job{
Expand All @@ -435,12 +437,14 @@ func TestHandleRenameTables(t *testing.T) {
oldSchemaIDs := []int64{1, 2}
newSchemaIDs := []int64{2, 1}
oldTableIDs := []int64{11, 12}
newTableNames := []timodel.CIStr{timodel.NewCIStr("x"), timodel.NewCIStr("y")}
oldSchemaNames := []timodel.CIStr{timodel.NewCIStr("db_1"), timodel.NewCIStr("db_2")}
args := []interface{}{oldSchemaIDs, newSchemaIDs, newTableNames, oldTableIDs, oldSchemaNames}
newTableNames := []pmodel.CIStr{pmodel.NewCIStr("x"), pmodel.NewCIStr("y")}
oldTableNames := []pmodel.CIStr{pmodel.NewCIStr("oldx"), pmodel.NewCIStr("oldy")}
oldSchemaNames := []pmodel.CIStr{pmodel.NewCIStr("db_1"), pmodel.NewCIStr("db_2")}
args := []interface{}{oldSchemaIDs, newSchemaIDs, newTableNames, oldTableIDs, oldSchemaNames, oldTableNames}
rawArgs, err := json.Marshal(args)
require.Nil(t, err)
var job *timodel.Job = &timodel.Job{
job := &timodel.Job{
Version: timodel.JobVersion1,
Type: timodel.ActionRenameTables,
RawArgs: rawArgs,
BinlogInfo: &timodel.HistoryInfo{
Expand All @@ -450,13 +454,13 @@ func TestHandleRenameTables(t *testing.T) {
job.BinlogInfo.MultipleTableInfos = append(job.BinlogInfo.MultipleTableInfos,
&timodel.TableInfo{
ID: 13,
Name: timodel.NewCIStr("x"),
Name: pmodel.NewCIStr("x"),
State: timodel.StatePublic,
})
job.BinlogInfo.MultipleTableInfos = append(job.BinlogInfo.MultipleTableInfos,
&timodel.TableInfo{
ID: 14,
Name: timodel.NewCIStr("y"),
Name: pmodel.NewCIStr("y"),
State: timodel.StatePublic,
})
testDoDDLAndCheck(t, snap, job, false)
Expand Down Expand Up @@ -485,8 +489,8 @@ func testDoDDLAndCheck(t *testing.T, snap *schema.Snapshot, job *timodel.Job, is

func TestMultiVersionStorage(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
dbName := timodel.NewCIStr("Test")
tbName := timodel.NewCIStr("T1")
dbName := pmodel.NewCIStr("Test")
tbName := pmodel.NewCIStr("T1")
// db and ignoreDB info
dbInfo := &timodel.DBInfo{
ID: 11,
Expand Down Expand Up @@ -525,7 +529,7 @@ func TestMultiVersionStorage(t *testing.T) {

jobs = append(jobs, job)

tbName = timodel.NewCIStr("T2")
tbName = pmodel.NewCIStr("T2")
// table info
tblInfo = &timodel.TableInfo{
ID: 13,
Expand Down Expand Up @@ -673,7 +677,7 @@ func TestCreateSnapFromMeta(t *testing.T) {
require.Nil(t, err)
defer store.Close() //nolint:errcheck

session.SetSchemaLease(0)
session.SetSchemaLease(time.Second)
session.DisableStats4Test()
domain, err := session.BootstrapSession(store)
require.Nil(t, err)
Expand Down Expand Up @@ -708,7 +712,7 @@ func TestExplicitTables(t *testing.T) {
require.Nil(t, err)
defer store.Close() //nolint:errcheck

session.SetSchemaLease(0)
session.SetSchemaLease(time.Second)
session.DisableStats4Test()
domain, err := session.BootstrapSession(store)
require.Nil(t, err)
Expand Down Expand Up @@ -859,7 +863,7 @@ func TestSchemaStorage(t *testing.T) {
ticonfig.UpdateGlobal(func(conf *ticonfig.Config) {
conf.AlterPrimaryKey = true
})
session.SetSchemaLease(0)
session.SetSchemaLease(time.Second)
session.DisableStats4Test()
domain, err := session.BootstrapSession(store)
require.Nil(t, err)
Expand Down Expand Up @@ -952,7 +956,7 @@ func TestHandleKey(t *testing.T) {
require.Nil(t, err)
defer store.Close() //nolint:errcheck

session.SetSchemaLease(0)
session.SetSchemaLease(time.Second)
session.DisableStats4Test()
domain, err := session.BootstrapSession(store)
require.Nil(t, err)
Expand Down
Loading

0 comments on commit 5b3d086

Please sign in to comment.