Skip to content

Commit

Permalink
cdc: use typed args for renametables and adapt args v2 (#11617)
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter authored and wk989898 committed Sep 24, 2024
1 parent c36db6e commit 3947b71
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 183 deletions.
18 changes: 8 additions & 10 deletions cdc/entry/schema/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/log"
timeta "github.com/pingcap/tidb/pkg/meta"
timodel "github.com/pingcap/tidb/pkg/meta/model"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/filter"
Expand Down Expand Up @@ -1085,28 +1084,27 @@ func (s *snapshot) alterPartitioning(job *timodel.Job) error {
}

func (s *snapshot) renameTables(job *timodel.Job, currentTs uint64) error {
var oldSchemaIDs, newSchemaIDs, oldTableIDs []int64
var newTableNames, oldSchemaNames []*pmodel.CIStr
err := job.DecodeArgs(&oldSchemaIDs, &newSchemaIDs, &newTableNames, &oldTableIDs, &oldSchemaNames)
args, err := timodel.GetRenameTablesArgs(job)
if err != nil {
return errors.Trace(err)
}
if len(job.BinlogInfo.MultipleTableInfos) < len(newTableNames) {
if len(job.BinlogInfo.MultipleTableInfos) < len(args.RenameTableInfos) {
return cerror.ErrInvalidDDLJob.GenWithStackByArgs(job.ID)
}
// NOTE: should handle failures in halfway better.
for _, tableID := range oldTableIDs {
if err := s.dropTable(tableID, currentTs); err != nil {
for _, info := range args.RenameTableInfos {
if err := s.dropTable(info.TableID, currentTs); err != nil {
return errors.Trace(err)
}
}
for i, tableInfo := range job.BinlogInfo.MultipleTableInfos {
newSchema, ok := s.schemaByID(newSchemaIDs[i])
info := args.RenameTableInfos[i]
newSchema, ok := s.schemaByID(info.NewSchemaID)
if !ok {
return cerror.ErrSnapshotSchemaNotFound.GenWithStackByArgs(newSchemaIDs[i])
return cerror.ErrSnapshotSchemaNotFound.GenWithStackByArgs(info.NewSchemaID)
}
newSchemaName := newSchema.Name.O
tbInfo := model.WrapTableInfo(newSchemaIDs[i], newSchemaName, job.BinlogInfo.FinishedTS, tableInfo)
tbInfo := model.WrapTableInfo(info.NewSchemaID, newSchemaName, job.BinlogInfo.FinishedTS, tableInfo)
err = s.createTable(tbInfo, currentTs)
if err != nil {
return errors.Trace(err)
Expand Down
39 changes: 22 additions & 17 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/pingcap/log"
tidbkv "github.com/pingcap/tidb/pkg/kv"
timodel "github.com/pingcap/tidb/pkg/meta/model"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tiflow/cdc/entry/schema"
"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/model"
Expand Down Expand Up @@ -472,28 +471,33 @@ func (s *schemaStorage) BuildDDLEvents(
return ddlEvents, nil
}

// GetNewJobWithArgs returns a new job with the given args
func GetNewJobWithArgs(job *timodel.Job, args timodel.JobArgs) (*timodel.Job, error) {
job.FillArgs(args)
bytes, err := job.Encode(true)
if err != nil {
return nil, errors.Trace(err)
}
encodedJob := &timodel.Job{}
if err = encodedJob.Decode(bytes); err != nil {
return nil, errors.Trace(err)
}
return encodedJob, nil
}

// TODO: find a better way to refactor this function.
// buildRenameEvents gets a list of DDLEvent from a rename tables DDL job.
func (s *schemaStorage) buildRenameEvents(
ctx context.Context, job *timodel.Job,
) ([]*model.DDLEvent, error) {
var (
oldSchemaIDs, newSchemaIDs, oldTableIDs []int64
newTableNames, oldSchemaNames []*pmodel.CIStr
ddlEvents []*model.DDLEvent
)
err := job.DecodeArgs(&oldSchemaIDs, &newSchemaIDs,
&newTableNames, &oldTableIDs, &oldSchemaNames)
var ddlEvents []*model.DDLEvent
args, err := timodel.GetRenameTablesArgs(job)
if err != nil {
return nil, errors.Trace(err)
}

multiTableInfos := job.BinlogInfo.MultipleTableInfos
if len(multiTableInfos) != len(oldSchemaIDs) ||
len(multiTableInfos) != len(newSchemaIDs) ||
len(multiTableInfos) != len(newTableNames) ||
len(multiTableInfos) != len(oldTableIDs) ||
len(multiTableInfos) != len(oldSchemaNames) {
if len(multiTableInfos) != len(args.RenameTableInfos) {
return nil, cerror.ErrInvalidDDLJob.GenWithStackByArgs(job.ID)
}

Expand All @@ -503,21 +507,22 @@ func (s *schemaStorage) buildRenameEvents(
}

for i, tableInfo := range multiTableInfos {
newSchema, ok := preSnap.SchemaByID(newSchemaIDs[i])
info := args.RenameTableInfos[i]
newSchema, ok := preSnap.SchemaByID(info.NewSchemaID)
if !ok {
return nil, cerror.ErrSnapshotSchemaNotFound.GenWithStackByArgs(
newSchemaIDs[i])
info.NewSchemaID)
}
newSchemaName := newSchema.Name.O
oldSchemaName := oldSchemaNames[i].O
oldSchemaName := info.OldSchemaName.O
event := new(model.DDLEvent)
preTableInfo, ok := preSnap.PhysicalTableByID(tableInfo.ID)
if !ok {
return nil, cerror.ErrSchemaStorageTableMiss.GenWithStackByArgs(
job.TableID)
}

tableInfo := model.WrapTableInfo(newSchemaIDs[i], newSchemaName,
tableInfo := model.WrapTableInfo(info.NewSchemaID, newSchemaName,
job.BinlogInfo.FinishedTS, tableInfo)
event.FromJobWithArgs(job, preTableInfo, tableInfo, oldSchemaName, newSchemaName)
ddlEvents = append(ddlEvents, event)
Expand Down
40 changes: 22 additions & 18 deletions cdc/entry/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package entry

import (
"context"
"encoding/json"
"fmt"
"sort"
"testing"
Expand Down Expand Up @@ -328,26 +327,31 @@ func TestBuildDDLEventsFromRenameTablesDDL(t *testing.T) {
// rename test.t1 and test.t2
job = helper.DDL2Job(
"rename table test1.t1 to test1.t10, test1.t2 to test1.t20")
oldSchemaIDs := []int64{schemaID, schemaID}
oldTableIDs := []int64{t1TableID, t2TableID}
newSchemaIDs := oldSchemaIDs
oldSchemaNames := []pmodel.CIStr{
pmodel.NewCIStr("test1"),
pmodel.NewCIStr("test1"),
}
newTableNames := []pmodel.CIStr{
pmodel.NewCIStr("t10"),
pmodel.NewCIStr("t20"),
}
args := []interface{}{
oldSchemaIDs, newSchemaIDs,
newTableNames, oldTableIDs, oldSchemaNames,
args := &timodel.RenameTablesArgs{
RenameTableInfos: []*timodel.RenameTableArgs{
{
OldSchemaID: schemaID,
NewSchemaID: schemaID,
NewTableName: pmodel.NewCIStr("t10"),
TableID: t1TableID,
OldSchemaName: pmodel.NewCIStr("test1"),
OldTableName: pmodel.NewCIStr("oldt10"),
},
{
OldSchemaID: schemaID,
NewSchemaID: schemaID,
NewTableName: pmodel.NewCIStr("t20"),
TableID: t2TableID,
OldSchemaName: pmodel.NewCIStr("test1"),
OldTableName: pmodel.NewCIStr("oldt20"),
},
},
}
rawArgs, err := json.Marshal(args)
require.Nil(t, err)
// the RawArgs field in job fetched from tidb snapshot meta is incorrent,
// so we manually construct `job.RawArgs` to do the workaround.
job.RawArgs = rawArgs
bakJob, err := GetNewJobWithArgs(job, args)
require.Nil(t, err)
job.RawArgs = bakJob.RawArgs
schema.AdvanceResolvedTs(job.BinlogInfo.FinishedTS - 1)
events, err = schema.BuildDDLEvents(ctx, job)
require.Nil(t, err)
Expand Down
68 changes: 24 additions & 44 deletions cdc/entry/schema_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ package entry

import (
"context"
"encoding/json"
"fmt"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -121,37 +121,27 @@ func (s *SchemaTestHelper) DDL2Job(ddl string) *timodel.Job {
return res
}

// the RawArgs field in job fetched from tidb snapshot meta is incorrent,
// the RawArgs field in job fetched from tidb snapshot meta is cleared out after the job is done,
// so we manually construct `job.RawArgs` to do the workaround.
// we assume the old schema name is same as the new schema name here.
// for example, "ALTER TABLE RENAME test.t1 TO test.t1, test.t2 to test.t22", schema name is "test"
schema := strings.Split(strings.Split(strings.Split(res.Query, ",")[1], " ")[1], ".")[0]
tableNum := len(res.BinlogInfo.MultipleTableInfos)
oldSchemaIDs := make([]int64, tableNum)
for i := 0; i < tableNum; i++ {
oldSchemaIDs[i] = res.SchemaID
}
oldTableIDs := make([]int64, tableNum)
for i := 0; i < tableNum; i++ {
oldTableIDs[i] = res.BinlogInfo.MultipleTableInfos[i].ID
args := &timodel.RenameTablesArgs{
RenameTableInfos: make([]*timodel.RenameTableArgs, 0, tableNum),
}
newTableNames := make([]pmodel.CIStr, tableNum)
for i := 0; i < tableNum; i++ {
newTableNames[i] = res.BinlogInfo.MultipleTableInfos[i].Name
}
oldSchemaNames := make([]pmodel.CIStr, tableNum)
for i := 0; i < tableNum; i++ {
oldSchemaNames[i] = pmodel.NewCIStr(schema)
}
newSchemaIDs := oldSchemaIDs

args := []interface{}{
oldSchemaIDs, newSchemaIDs,
newTableNames, oldTableIDs, oldSchemaNames,
args.RenameTableInfos = append(args.RenameTableInfos, &timodel.RenameTableArgs{
OldSchemaID: res.SchemaID,
NewSchemaID: res.SchemaID,
TableID: res.BinlogInfo.MultipleTableInfos[i].ID,
NewTableName: res.BinlogInfo.MultipleTableInfos[i].Name,
OldSchemaName: pmodel.NewCIStr(schema),
OldTableName: pmodel.NewCIStr(fmt.Sprintf("old_%d", i)),
})
}
rawArgs, err := json.Marshal(args)
res, err = GetNewJobWithArgs(res, args)
require.NoError(s.t, err)
res.RawArgs = rawArgs
return res
}

Expand Down Expand Up @@ -234,31 +224,21 @@ func (s *SchemaTestHelper) DDL2Event(ddl string) *model.DDLEvent {
// for example, "ALTER TABLE RENAME test.t1 TO test.t1, test.t2 to test.t22", schema name is "test"
schema := strings.Split(strings.Split(strings.Split(res.Query, ",")[1], " ")[1], ".")[0]
tableNum := len(res.BinlogInfo.MultipleTableInfos)
oldSchemaIDs := make([]int64, tableNum)
for i := 0; i < tableNum; i++ {
oldSchemaIDs[i] = res.SchemaID
}
oldTableIDs := make([]int64, tableNum)
for i := 0; i < tableNum; i++ {
oldTableIDs[i] = res.BinlogInfo.MultipleTableInfos[i].ID
args := &timodel.RenameTablesArgs{
RenameTableInfos: make([]*timodel.RenameTableArgs, 0, tableNum),
}
newTableNames := make([]pmodel.CIStr, tableNum)
for i := 0; i < tableNum; i++ {
newTableNames[i] = res.BinlogInfo.MultipleTableInfos[i].Name
}
oldSchemaNames := make([]pmodel.CIStr, tableNum)
for i := 0; i < tableNum; i++ {
oldSchemaNames[i] = pmodel.NewCIStr(schema)
}
newSchemaIDs := oldSchemaIDs

args := []interface{}{
oldSchemaIDs, newSchemaIDs,
newTableNames, oldTableIDs, oldSchemaNames,
args.RenameTableInfos = append(args.RenameTableInfos, &timodel.RenameTableArgs{
OldSchemaID: res.SchemaID,
NewSchemaID: res.SchemaID,
NewTableName: res.BinlogInfo.MultipleTableInfos[i].Name,
TableID: res.BinlogInfo.MultipleTableInfos[i].ID,
OldSchemaName: pmodel.NewCIStr(schema),
OldTableName: pmodel.NewCIStr("old" + res.BinlogInfo.MultipleTableInfos[i].Name.L),
})
}
rawArgs, err := json.Marshal(args)
res, err = GetNewJobWithArgs(res, args)
require.NoError(s.t, err)
res.RawArgs = rawArgs
}

err = s.schemaStorage.HandleDDLJob(res)
Expand Down
50 changes: 23 additions & 27 deletions cdc/owner/ddl_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package owner

import (
"context"
"encoding/json"
"fmt"
"testing"
"time"
Expand Down Expand Up @@ -172,8 +171,7 @@ func TestExecRenameTablesDDL(t *testing.T) {
dm := createDDLManagerForTest(t, false)
mockDDLSink := dm.ddlSink.(*mockDDLSink)

var oldSchemaIDs, newSchemaIDs, oldTableIDs []int64
var newTableNames, oldSchemaNames []pmodel.CIStr
var oldSchemaIDs, oldTableIDs []int64

execCreateStmt := func(tp, actualDDL, expectedDDL string) {
mockDDLSink.ddlDone = false
Expand Down Expand Up @@ -214,35 +212,33 @@ func TestExecRenameTablesDDL(t *testing.T) {

require.Len(t, oldSchemaIDs, 2)
require.Len(t, oldTableIDs, 2)
newSchemaIDs = []int64{oldSchemaIDs[1], oldSchemaIDs[0]}
oldSchemaNames = []pmodel.CIStr{
pmodel.NewCIStr("test1"),
pmodel.NewCIStr("test2"),
}
newTableNames = []pmodel.CIStr{
pmodel.NewCIStr("tb20"),
pmodel.NewCIStr("tb10"),
}
oldTableNames := []pmodel.CIStr{
pmodel.NewCIStr("oldtb20"),
pmodel.NewCIStr("oldtb10"),
}
require.Len(t, newSchemaIDs, 2)
require.Len(t, oldSchemaNames, 2)
require.Len(t, newTableNames, 2)
args := []interface{}{
oldSchemaIDs, newSchemaIDs, newTableNames,
oldTableIDs, oldSchemaNames, oldTableNames,
args := &timodel.RenameTablesArgs{
RenameTableInfos: []*timodel.RenameTableArgs{
{
OldSchemaID: oldSchemaIDs[0],
NewSchemaID: oldSchemaIDs[1],
NewTableName: pmodel.NewCIStr("tb20"),
TableID: oldTableIDs[0],
OldSchemaName: pmodel.NewCIStr("test1"),
OldTableName: pmodel.NewCIStr("oldtb20"),
},
{
OldSchemaID: oldSchemaIDs[1],
NewSchemaID: oldSchemaIDs[0],
NewTableName: pmodel.NewCIStr("tb10"),
TableID: oldTableIDs[1],
OldSchemaName: pmodel.NewCIStr("test2"),
OldTableName: pmodel.NewCIStr("oldtb10"),
},
},
}
rawArgs, err := json.Marshal(args)
require.Nil(t, err)
job := helper.DDL2Job(
"rename table test1.tb1 to test2.tb10, test2.tb2 to test1.tb20")
// the RawArgs field in job fetched from tidb snapshot meta is incorrent,
// so we manually construct `job.RawArgs` to do the workaround.
job.RawArgs = rawArgs
// TODO REMOVE IT AFTER use args v2 decoder function
job.Version = timodel.JobVersion1
var err error
job, err = entry.GetNewJobWithArgs(job, args)
require.Nil(t, err)

mockDDLSink.recordDDLHistory = true
mockDDLSink.ddlDone = false
Expand Down
Loading

0 comments on commit 3947b71

Please sign in to comment.