From 123525ae4ae6263b112298814a382bedac40d839 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 24 Sep 2024 11:31:31 +0800 Subject: [PATCH] cdc: use typed args for renametables and adapt args v2 (#11617) --- cdc/entry/schema/snapshot.go | 18 ++++---- cdc/entry/schema_storage.go | 39 +++++++++-------- cdc/entry/schema_test.go | 40 +++++++++-------- cdc/entry/schema_test_helper.go | 68 ++++++++++------------------- cdc/owner/ddl_manager_test.go | 50 ++++++++++----------- cdc/puller/ddl_puller.go | 77 +++++++++------------------------ cdc/puller/ddl_puller_test.go | 10 ----- 7 files changed, 119 insertions(+), 183 deletions(-) diff --git a/cdc/entry/schema/snapshot.go b/cdc/entry/schema/snapshot.go index 6703dc7244a..055179a00a0 100644 --- a/cdc/entry/schema/snapshot.go +++ b/cdc/entry/schema/snapshot.go @@ -26,7 +26,6 @@ import ( "github.com/pingcap/log" timeta "github.com/pingcap/tidb/pkg/meta" timodel "github.com/pingcap/tidb/pkg/meta/model" - pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tiflow/cdc/model" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/filter" @@ -1085,28 +1084,27 @@ func (s *snapshot) alterPartitioning(job *timodel.Job) error { } func (s *snapshot) renameTables(job *timodel.Job, currentTs uint64) error { - var oldSchemaIDs, newSchemaIDs, oldTableIDs []int64 - var newTableNames, oldSchemaNames []*pmodel.CIStr - err := job.DecodeArgs(&oldSchemaIDs, &newSchemaIDs, &newTableNames, &oldTableIDs, &oldSchemaNames) + args, err := timodel.GetRenameTablesArgs(job) if err != nil { return errors.Trace(err) } - if len(job.BinlogInfo.MultipleTableInfos) < len(newTableNames) { + if len(job.BinlogInfo.MultipleTableInfos) < len(args.RenameTableInfos) { return cerror.ErrInvalidDDLJob.GenWithStackByArgs(job.ID) } // NOTE: should handle failures in halfway better. - for _, tableID := range oldTableIDs { - if err := s.dropTable(tableID, currentTs); err != nil { + for _, info := range args.RenameTableInfos { + if err := s.dropTable(info.TableID, currentTs); err != nil { return errors.Trace(err) } } for i, tableInfo := range job.BinlogInfo.MultipleTableInfos { - newSchema, ok := s.schemaByID(newSchemaIDs[i]) + info := args.RenameTableInfos[i] + newSchema, ok := s.schemaByID(info.NewSchemaID) if !ok { - return cerror.ErrSnapshotSchemaNotFound.GenWithStackByArgs(newSchemaIDs[i]) + return cerror.ErrSnapshotSchemaNotFound.GenWithStackByArgs(info.NewSchemaID) } newSchemaName := newSchema.Name.O - tbInfo := model.WrapTableInfo(newSchemaIDs[i], newSchemaName, job.BinlogInfo.FinishedTS, tableInfo) + tbInfo := model.WrapTableInfo(info.NewSchemaID, newSchemaName, job.BinlogInfo.FinishedTS, tableInfo) err = s.createTable(tbInfo, currentTs) if err != nil { return errors.Trace(err) diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index fc1d33f2109..999c8734b5a 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -25,7 +25,6 @@ import ( "github.com/pingcap/log" tidbkv "github.com/pingcap/tidb/pkg/kv" timodel "github.com/pingcap/tidb/pkg/meta/model" - pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tiflow/cdc/entry/schema" "github.com/pingcap/tiflow/cdc/kv" "github.com/pingcap/tiflow/cdc/model" @@ -472,28 +471,33 @@ func (s *schemaStorage) BuildDDLEvents( return ddlEvents, nil } +// GetNewJobWithArgs returns a new job with the given args +func GetNewJobWithArgs(job *timodel.Job, args timodel.JobArgs) (*timodel.Job, error) { + job.FillArgs(args) + bytes, err := job.Encode(true) + if err != nil { + return nil, errors.Trace(err) + } + encodedJob := &timodel.Job{} + if err = encodedJob.Decode(bytes); err != nil { + return nil, errors.Trace(err) + } + return encodedJob, nil +} + // TODO: find a better way to refactor this function. // buildRenameEvents gets a list of DDLEvent from a rename tables DDL job. func (s *schemaStorage) buildRenameEvents( ctx context.Context, job *timodel.Job, ) ([]*model.DDLEvent, error) { - var ( - oldSchemaIDs, newSchemaIDs, oldTableIDs []int64 - newTableNames, oldSchemaNames []*pmodel.CIStr - ddlEvents []*model.DDLEvent - ) - err := job.DecodeArgs(&oldSchemaIDs, &newSchemaIDs, - &newTableNames, &oldTableIDs, &oldSchemaNames) + var ddlEvents []*model.DDLEvent + args, err := timodel.GetRenameTablesArgs(job) if err != nil { return nil, errors.Trace(err) } multiTableInfos := job.BinlogInfo.MultipleTableInfos - if len(multiTableInfos) != len(oldSchemaIDs) || - len(multiTableInfos) != len(newSchemaIDs) || - len(multiTableInfos) != len(newTableNames) || - len(multiTableInfos) != len(oldTableIDs) || - len(multiTableInfos) != len(oldSchemaNames) { + if len(multiTableInfos) != len(args.RenameTableInfos) { return nil, cerror.ErrInvalidDDLJob.GenWithStackByArgs(job.ID) } @@ -503,13 +507,14 @@ func (s *schemaStorage) buildRenameEvents( } for i, tableInfo := range multiTableInfos { - newSchema, ok := preSnap.SchemaByID(newSchemaIDs[i]) + info := args.RenameTableInfos[i] + newSchema, ok := preSnap.SchemaByID(info.NewSchemaID) if !ok { return nil, cerror.ErrSnapshotSchemaNotFound.GenWithStackByArgs( - newSchemaIDs[i]) + info.NewSchemaID) } newSchemaName := newSchema.Name.O - oldSchemaName := oldSchemaNames[i].O + oldSchemaName := info.OldSchemaName.O event := new(model.DDLEvent) preTableInfo, ok := preSnap.PhysicalTableByID(tableInfo.ID) if !ok { @@ -517,7 +522,7 @@ func (s *schemaStorage) buildRenameEvents( job.TableID) } - tableInfo := model.WrapTableInfo(newSchemaIDs[i], newSchemaName, + tableInfo := model.WrapTableInfo(info.NewSchemaID, newSchemaName, job.BinlogInfo.FinishedTS, tableInfo) event.FromJobWithArgs(job, preTableInfo, tableInfo, oldSchemaName, newSchemaName) ddlEvents = append(ddlEvents, event) diff --git a/cdc/entry/schema_test.go b/cdc/entry/schema_test.go index 8fb0d68a12c..e32eba2507c 100644 --- a/cdc/entry/schema_test.go +++ b/cdc/entry/schema_test.go @@ -15,7 +15,6 @@ package entry import ( "context" - "encoding/json" "fmt" "sort" "testing" @@ -328,26 +327,31 @@ func TestBuildDDLEventsFromRenameTablesDDL(t *testing.T) { // rename test.t1 and test.t2 job = helper.DDL2Job( "rename table test1.t1 to test1.t10, test1.t2 to test1.t20") - oldSchemaIDs := []int64{schemaID, schemaID} - oldTableIDs := []int64{t1TableID, t2TableID} - newSchemaIDs := oldSchemaIDs - oldSchemaNames := []pmodel.CIStr{ - pmodel.NewCIStr("test1"), - pmodel.NewCIStr("test1"), - } - newTableNames := []pmodel.CIStr{ - pmodel.NewCIStr("t10"), - pmodel.NewCIStr("t20"), - } - args := []interface{}{ - oldSchemaIDs, newSchemaIDs, - newTableNames, oldTableIDs, oldSchemaNames, + args := &timodel.RenameTablesArgs{ + RenameTableInfos: []*timodel.RenameTableArgs{ + { + OldSchemaID: schemaID, + NewSchemaID: schemaID, + NewTableName: pmodel.NewCIStr("t10"), + TableID: t1TableID, + OldSchemaName: pmodel.NewCIStr("test1"), + OldTableName: pmodel.NewCIStr("oldt10"), + }, + { + OldSchemaID: schemaID, + NewSchemaID: schemaID, + NewTableName: pmodel.NewCIStr("t20"), + TableID: t2TableID, + OldSchemaName: pmodel.NewCIStr("test1"), + OldTableName: pmodel.NewCIStr("oldt20"), + }, + }, } - rawArgs, err := json.Marshal(args) - require.Nil(t, err) // the RawArgs field in job fetched from tidb snapshot meta is incorrent, // so we manually construct `job.RawArgs` to do the workaround. - job.RawArgs = rawArgs + bakJob, err := GetNewJobWithArgs(job, args) + require.Nil(t, err) + job.RawArgs = bakJob.RawArgs schema.AdvanceResolvedTs(job.BinlogInfo.FinishedTS - 1) events, err = schema.BuildDDLEvents(ctx, job) require.Nil(t, err) diff --git a/cdc/entry/schema_test_helper.go b/cdc/entry/schema_test_helper.go index 57cbc1c6ef5..9f141d9c6cc 100644 --- a/cdc/entry/schema_test_helper.go +++ b/cdc/entry/schema_test_helper.go @@ -15,7 +15,7 @@ package entry import ( "context" - "encoding/json" + "fmt" "strings" "testing" "time" @@ -121,37 +121,27 @@ func (s *SchemaTestHelper) DDL2Job(ddl string) *timodel.Job { return res } - // the RawArgs field in job fetched from tidb snapshot meta is incorrent, + // the RawArgs field in job fetched from tidb snapshot meta is cleared out after the job is done, // so we manually construct `job.RawArgs` to do the workaround. // we assume the old schema name is same as the new schema name here. // for example, "ALTER TABLE RENAME test.t1 TO test.t1, test.t2 to test.t22", schema name is "test" schema := strings.Split(strings.Split(strings.Split(res.Query, ",")[1], " ")[1], ".")[0] tableNum := len(res.BinlogInfo.MultipleTableInfos) - oldSchemaIDs := make([]int64, tableNum) - for i := 0; i < tableNum; i++ { - oldSchemaIDs[i] = res.SchemaID - } - oldTableIDs := make([]int64, tableNum) - for i := 0; i < tableNum; i++ { - oldTableIDs[i] = res.BinlogInfo.MultipleTableInfos[i].ID + args := &timodel.RenameTablesArgs{ + RenameTableInfos: make([]*timodel.RenameTableArgs, 0, tableNum), } - newTableNames := make([]pmodel.CIStr, tableNum) for i := 0; i < tableNum; i++ { - newTableNames[i] = res.BinlogInfo.MultipleTableInfos[i].Name - } - oldSchemaNames := make([]pmodel.CIStr, tableNum) - for i := 0; i < tableNum; i++ { - oldSchemaNames[i] = pmodel.NewCIStr(schema) - } - newSchemaIDs := oldSchemaIDs - - args := []interface{}{ - oldSchemaIDs, newSchemaIDs, - newTableNames, oldTableIDs, oldSchemaNames, + args.RenameTableInfos = append(args.RenameTableInfos, &timodel.RenameTableArgs{ + OldSchemaID: res.SchemaID, + NewSchemaID: res.SchemaID, + TableID: res.BinlogInfo.MultipleTableInfos[i].ID, + NewTableName: res.BinlogInfo.MultipleTableInfos[i].Name, + OldSchemaName: pmodel.NewCIStr(schema), + OldTableName: pmodel.NewCIStr(fmt.Sprintf("old_%d", i)), + }) } - rawArgs, err := json.Marshal(args) + res, err = GetNewJobWithArgs(res, args) require.NoError(s.t, err) - res.RawArgs = rawArgs return res } @@ -234,31 +224,21 @@ func (s *SchemaTestHelper) DDL2Event(ddl string) *model.DDLEvent { // for example, "ALTER TABLE RENAME test.t1 TO test.t1, test.t2 to test.t22", schema name is "test" schema := strings.Split(strings.Split(strings.Split(res.Query, ",")[1], " ")[1], ".")[0] tableNum := len(res.BinlogInfo.MultipleTableInfos) - oldSchemaIDs := make([]int64, tableNum) - for i := 0; i < tableNum; i++ { - oldSchemaIDs[i] = res.SchemaID - } - oldTableIDs := make([]int64, tableNum) - for i := 0; i < tableNum; i++ { - oldTableIDs[i] = res.BinlogInfo.MultipleTableInfos[i].ID + args := &timodel.RenameTablesArgs{ + RenameTableInfos: make([]*timodel.RenameTableArgs, 0, tableNum), } - newTableNames := make([]pmodel.CIStr, tableNum) for i := 0; i < tableNum; i++ { - newTableNames[i] = res.BinlogInfo.MultipleTableInfos[i].Name - } - oldSchemaNames := make([]pmodel.CIStr, tableNum) - for i := 0; i < tableNum; i++ { - oldSchemaNames[i] = pmodel.NewCIStr(schema) - } - newSchemaIDs := oldSchemaIDs - - args := []interface{}{ - oldSchemaIDs, newSchemaIDs, - newTableNames, oldTableIDs, oldSchemaNames, + args.RenameTableInfos = append(args.RenameTableInfos, &timodel.RenameTableArgs{ + OldSchemaID: res.SchemaID, + NewSchemaID: res.SchemaID, + NewTableName: res.BinlogInfo.MultipleTableInfos[i].Name, + TableID: res.BinlogInfo.MultipleTableInfos[i].ID, + OldSchemaName: pmodel.NewCIStr(schema), + OldTableName: pmodel.NewCIStr("old" + res.BinlogInfo.MultipleTableInfos[i].Name.L), + }) } - rawArgs, err := json.Marshal(args) + res, err = GetNewJobWithArgs(res, args) require.NoError(s.t, err) - res.RawArgs = rawArgs } err = s.schemaStorage.HandleDDLJob(res) diff --git a/cdc/owner/ddl_manager_test.go b/cdc/owner/ddl_manager_test.go index abbbfd87b7f..b2422bbb9e9 100644 --- a/cdc/owner/ddl_manager_test.go +++ b/cdc/owner/ddl_manager_test.go @@ -15,7 +15,6 @@ package owner import ( "context" - "encoding/json" "fmt" "testing" @@ -170,8 +169,7 @@ func TestExecRenameTablesDDL(t *testing.T) { dm := createDDLManagerForTest(t) mockDDLSink := dm.ddlSink.(*mockDDLSink) - var oldSchemaIDs, newSchemaIDs, oldTableIDs []int64 - var newTableNames, oldSchemaNames []pmodel.CIStr + var oldSchemaIDs, oldTableIDs []int64 execCreateStmt := func(tp, actualDDL, expectedDDL string) { mockDDLSink.ddlDone = false @@ -212,35 +210,33 @@ func TestExecRenameTablesDDL(t *testing.T) { require.Len(t, oldSchemaIDs, 2) require.Len(t, oldTableIDs, 2) - newSchemaIDs = []int64{oldSchemaIDs[1], oldSchemaIDs[0]} - oldSchemaNames = []pmodel.CIStr{ - pmodel.NewCIStr("test1"), - pmodel.NewCIStr("test2"), - } - newTableNames = []pmodel.CIStr{ - pmodel.NewCIStr("tb20"), - pmodel.NewCIStr("tb10"), - } - oldTableNames := []pmodel.CIStr{ - pmodel.NewCIStr("oldtb20"), - pmodel.NewCIStr("oldtb10"), - } - require.Len(t, newSchemaIDs, 2) - require.Len(t, oldSchemaNames, 2) - require.Len(t, newTableNames, 2) - args := []interface{}{ - oldSchemaIDs, newSchemaIDs, newTableNames, - oldTableIDs, oldSchemaNames, oldTableNames, + args := &timodel.RenameTablesArgs{ + RenameTableInfos: []*timodel.RenameTableArgs{ + { + OldSchemaID: oldSchemaIDs[0], + NewSchemaID: oldSchemaIDs[1], + NewTableName: pmodel.NewCIStr("tb20"), + TableID: oldTableIDs[0], + OldSchemaName: pmodel.NewCIStr("test1"), + OldTableName: pmodel.NewCIStr("oldtb20"), + }, + { + OldSchemaID: oldSchemaIDs[1], + NewSchemaID: oldSchemaIDs[0], + NewTableName: pmodel.NewCIStr("tb10"), + TableID: oldTableIDs[1], + OldSchemaName: pmodel.NewCIStr("test2"), + OldTableName: pmodel.NewCIStr("oldtb10"), + }, + }, } - rawArgs, err := json.Marshal(args) - require.Nil(t, err) job := helper.DDL2Job( "rename table test1.tb1 to test2.tb10, test2.tb2 to test1.tb20") // the RawArgs field in job fetched from tidb snapshot meta is incorrent, // so we manually construct `job.RawArgs` to do the workaround. - job.RawArgs = rawArgs - // TODO REMOVE IT AFTER use args v2 decoder function - job.Version = timodel.JobVersion1 + var err error + job, err = entry.GetNewJobWithArgs(job, args) + require.Nil(t, err) mockDDLSink.recordDDLHistory = true mockDDLSink.ddlDone = false diff --git a/cdc/puller/ddl_puller.go b/cdc/puller/ddl_puller.go index 23a80a4aadf..43ce77d9ae7 100644 --- a/cdc/puller/ddl_puller.go +++ b/cdc/puller/ddl_puller.go @@ -15,7 +15,6 @@ package puller import ( "context" - "encoding/json" "fmt" "strings" "sync" @@ -26,7 +25,6 @@ import ( "github.com/pingcap/log" tidbkv "github.com/pingcap/tidb/pkg/kv" timodel "github.com/pingcap/tidb/pkg/meta/model" - pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/entry/schema" @@ -522,28 +520,14 @@ func (p *ddlJobPullerImpl) checkIneligibleTableDDL(snapBefore *schema.Snapshot, // in the DDL job out and filter them one by one, // if all the tables are filtered, skip it. func (p *ddlJobPullerImpl) handleRenameTables(job *timodel.Job) (skip bool, err error) { - var ( - oldSchemaIDs, newSchemaIDs, oldTableIDs []int64 - newTableNames, oldSchemaNames []*pmodel.CIStr - ) - - err = job.DecodeArgs(&oldSchemaIDs, &newSchemaIDs, - &newTableNames, &oldTableIDs, &oldSchemaNames) + var args *timodel.RenameTablesArgs + args, err = timodel.GetRenameTablesArgs(job) if err != nil { return true, errors.Trace(err) } - var ( - remainOldSchemaIDs, remainNewSchemaIDs, remainOldTableIDs []int64 - remainNewTableNames, remainOldSchemaNames []*pmodel.CIStr - ) - multiTableInfos := job.BinlogInfo.MultipleTableInfos - if len(multiTableInfos) != len(oldSchemaIDs) || - len(multiTableInfos) != len(newSchemaIDs) || - len(multiTableInfos) != len(newTableNames) || - len(multiTableInfos) != len(oldTableIDs) || - len(multiTableInfos) != len(oldSchemaNames) { + if len(multiTableInfos) != len(args.RenameTableInfos) { return true, cerror.ErrInvalidDDLJob.GenWithStackByArgs(job.ID) } @@ -553,28 +537,31 @@ func (p *ddlJobPullerImpl) handleRenameTables(job *timodel.Job) (skip bool, err // 3. old table name and new table name do not match the filter rule, skip it. remainTables := make([]*timodel.TableInfo, 0, len(multiTableInfos)) snap := p.schemaStorage.GetLastSnapshot() + + argsForRemaining := &timodel.RenameTablesArgs{} for i, tableInfo := range multiTableInfos { + info := args.RenameTableInfos[i] var shouldDiscardOldTable, shouldDiscardNewTable bool oldTable, ok := snap.PhysicalTableByID(tableInfo.ID) if !ok { shouldDiscardOldTable = true } else { - shouldDiscardOldTable = p.filter.ShouldDiscardDDL(job.Type, oldSchemaNames[i].O, oldTable.Name.O) + shouldDiscardOldTable = p.filter.ShouldDiscardDDL(job.Type, info.OldSchemaName.O, oldTable.Name.O) } - newSchemaName, ok := snap.SchemaByID(newSchemaIDs[i]) + newSchemaName, ok := snap.SchemaByID(info.NewSchemaID) if !ok { // the new table name does not hit the filter rule, so we should discard the table. shouldDiscardNewTable = true } else { - shouldDiscardNewTable = p.filter.ShouldDiscardDDL(job.Type, newSchemaName.Name.O, newTableNames[i].O) + shouldDiscardNewTable = p.filter.ShouldDiscardDDL(job.Type, newSchemaName.Name.O, info.NewTableName.O) } if shouldDiscardOldTable && shouldDiscardNewTable { // skip a rename table ddl only when its old table name and new table name are both filtered. log.Info("RenameTables is filtered", zap.Int64("tableID", tableInfo.ID), - zap.String("schema", oldSchemaNames[i].O), + zap.String("schema", info.OldSchemaName.O), zap.String("query", job.Query)) continue } @@ -583,50 +570,26 @@ func (p *ddlJobPullerImpl) handleRenameTables(job *timodel.Job) (skip bool, err return true, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(tableInfo.ID, job.Query) } // old table name matches the filter rule, remain it. + argsForRemaining.RenameTableInfos = append(argsForRemaining.RenameTableInfos, &timodel.RenameTableArgs{ + OldSchemaID: info.OldSchemaID, + NewSchemaID: info.NewSchemaID, + TableID: info.TableID, + NewTableName: info.NewTableName, + OldSchemaName: info.OldSchemaName, + OldTableName: info.OldTableName, + }) remainTables = append(remainTables, tableInfo) - remainOldSchemaIDs = append(remainOldSchemaIDs, oldSchemaIDs[i]) - remainNewSchemaIDs = append(remainNewSchemaIDs, newSchemaIDs[i]) - remainOldTableIDs = append(remainOldTableIDs, oldTableIDs[i]) - remainNewTableNames = append(remainNewTableNames, newTableNames[i]) - remainOldSchemaNames = append(remainOldSchemaNames, oldSchemaNames[i]) } if len(remainTables) == 0 { return true, nil } - newArgs := make([]json.RawMessage, 5) - v, err := json.Marshal(remainOldSchemaIDs) - if err != nil { - return true, errors.Trace(err) - } - newArgs[0] = v - v, err = json.Marshal(remainNewSchemaIDs) - if err != nil { - return true, errors.Trace(err) - } - newArgs[1] = v - v, err = json.Marshal(remainNewTableNames) - if err != nil { - return true, errors.Trace(err) - } - newArgs[2] = v - v, err = json.Marshal(remainOldTableIDs) - if err != nil { - return true, errors.Trace(err) - } - newArgs[3] = v - v, err = json.Marshal(remainOldSchemaNames) - if err != nil { - return true, errors.Trace(err) - } - newArgs[4] = v - - newRawArgs, err := json.Marshal(newArgs) + bakJob, err := entry.GetNewJobWithArgs(job, argsForRemaining) if err != nil { return true, errors.Trace(err) } - job.RawArgs = newRawArgs + job.RawArgs = bakJob.RawArgs job.BinlogInfo.MultipleTableInfos = remainTables return false, nil } diff --git a/cdc/puller/ddl_puller_test.go b/cdc/puller/ddl_puller_test.go index 63a25dee4e3..44edbc6a44e 100644 --- a/cdc/puller/ddl_puller_test.go +++ b/cdc/puller/ddl_puller_test.go @@ -198,8 +198,6 @@ func TestHandleRenameTable(t *testing.T) { waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) job = helper.DDL2Job("rename table test1.t1 to test1.t11, test1.t3 to test1.t33, test1.t5 to test1.t55, ignore1.a to ignore1.b") - // TODO REMOVE IT AFTER use args v2 decoder function - job.Version = timodel.JobVersion1 skip, err := ddlJobPullerImpl.handleRenameTables(job) require.NoError(t, err) require.False(t, skip) @@ -210,8 +208,6 @@ func TestHandleRenameTable(t *testing.T) { { _ = helper.DDL2Job("create table test1.t6(id int primary key)") job := helper.DDL2Job("rename table test1.t2 to test1.t22, test1.t6 to test1.t66") - // TODO REMOVE IT AFTER use args v2 decoder function - job.Version = timodel.JobVersion1 skip, err := ddlJobPullerImpl.handleRenameTables(job) require.Error(t, err) require.True(t, skip) @@ -243,8 +239,6 @@ func TestHandleRenameTable(t *testing.T) { waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) job = helper.DDL2Job("rename table test2.t1 to test2.t11, test2.t2 to test2.t22, test2.t3 to test2.t33") - // TODO REMOVE IT AFTER use args v2 decoder function - job.Version = timodel.JobVersion1 skip, err := ddlJobPullerImpl.handleRenameTables(job) require.NoError(t, err) require.True(t, skip) @@ -269,8 +263,6 @@ func TestHandleRenameTable(t *testing.T) { waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) job = helper.DDL2Job("rename table Test3.t1 to Test3.t11, Test3.t2 to Test3.t22") - // TODO REMOVE IT AFTER use args v2 decoder function - job.Version = timodel.JobVersion1 skip, err := ddlJobPullerImpl.handleRenameTables(job) require.NoError(t, err) require.False(t, skip) @@ -343,8 +335,6 @@ func TestHandleRenameTable(t *testing.T) { // but now it will throw an error since schema ignore1 are not in schemaStorage // ref: https://github.com/pingcap/tiflow/issues/9488 job = helper.DDL2Job("rename table test1.t202308081 to ignore1.ignore1, test1.t202308082 to ignore1.dongmen") - // TODO REMOVE IT AFTER use args v2 decoder function - job.Version = timodel.JobVersion1 _, err = ddlJobPullerImpl.handleJob(job) require.NotNil(t, err) require.Contains(t, err.Error(), "ErrSnapshotSchemaNotFound")