diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 7f47605c3b7e5..ffedb518208c7 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -170,8 +170,8 @@ type Client struct { // this feature is controlled by flag with-sys-table fullClusterRestore bool // the query to insert rows into table `gc_delete_range`, lack of ts. - deleteRangeQuery []string - deleteRangeQueryCh chan string + deleteRangeQuery []*stream.PreDelRangeQuery + deleteRangeQueryCh chan *stream.PreDelRangeQuery deleteRangeQueryWaitGroup sync.WaitGroup // see RestoreCommonConfig.WithSysTable @@ -204,8 +204,8 @@ func NewRestoreClient( tlsConf: tlsConf, keepaliveConf: keepaliveConf, switchCh: make(chan struct{}), - deleteRangeQuery: make([]string, 0), - deleteRangeQueryCh: make(chan string, 10), + deleteRangeQuery: make([]*stream.PreDelRangeQuery, 0), + deleteRangeQueryCh: make(chan *stream.PreDelRangeQuery, 10), } } @@ -2797,7 +2797,7 @@ func (rc *Client) InitSchemasReplaceForDDL( dbReplace.TableMap[t.Info.ID] = &stream.TableReplace{ Name: newTableInfo.Name.O, TableID: newTableInfo.ID, - PartitionMap: getTableIDMap(newTableInfo, t.Info), + PartitionMap: getPartitionIDMap(newTableInfo, t.Info), IndexMap: getIndexIDMap(newTableInfo, t.Info), } } @@ -2824,7 +2824,7 @@ func (rc *Client) InitSchemasReplaceForDDL( rp := stream.NewSchemasReplace( dbReplaces, needConstructIdMap, cfg.TiFlashRecorder, rc.currentTS, cfg.TableFilter, rc.GenGlobalID, rc.GenGlobalIDs, - rc.InsertDeleteRangeForTable, rc.InsertDeleteRangeForIndex) + rc.RecordDeleteRange) return rp, nil } @@ -3428,66 +3428,8 @@ NEXTSQL: return nil } -const ( - insertDeleteRangeSQLPrefix = `INSERT IGNORE INTO mysql.gc_delete_range VALUES ` - insertDeleteRangeSQLValue = "(%d, %d, '%s', '%s', %%[1]d)" - - batchInsertDeleteRangeSize = 256 -) - -// InsertDeleteRangeForTable generates query to insert table delete job into table `gc_delete_range`. 
-func (rc *Client) InsertDeleteRangeForTable(jobID int64, tableIDs []int64) { - var elementID int64 = 1 - var tableID int64 - for i := 0; i < len(tableIDs); i += batchInsertDeleteRangeSize { - batchEnd := len(tableIDs) - if batchEnd > i+batchInsertDeleteRangeSize { - batchEnd = i + batchInsertDeleteRangeSize - } - - var buf strings.Builder - buf.WriteString(insertDeleteRangeSQLPrefix) - for j := i; j < batchEnd; j++ { - tableID = tableIDs[j] - startKey := tablecodec.EncodeTablePrefix(tableID) - endKey := tablecodec.EncodeTablePrefix(tableID + 1) - startKeyEncoded := hex.EncodeToString(startKey) - endKeyEncoded := hex.EncodeToString(endKey) - buf.WriteString(fmt.Sprintf(insertDeleteRangeSQLValue, jobID, elementID, startKeyEncoded, endKeyEncoded)) - if j != batchEnd-1 { - buf.WriteString(",") - } - elementID += 1 - } - rc.deleteRangeQueryCh <- buf.String() - } -} - -// InsertDeleteRangeForIndex generates query to insert index delete job into table `gc_delete_range`. -func (rc *Client) InsertDeleteRangeForIndex(jobID int64, elementID *int64, tableID int64, indexIDs []int64) { - var indexID int64 - for i := 0; i < len(indexIDs); i += batchInsertDeleteRangeSize { - batchEnd := len(indexIDs) - if batchEnd > i+batchInsertDeleteRangeSize { - batchEnd = i + batchInsertDeleteRangeSize - } - - var buf strings.Builder - buf.WriteString(insertDeleteRangeSQLPrefix) - for j := i; j < batchEnd; j++ { - indexID = indexIDs[j] - startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID) - endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1) - startKeyEncoded := hex.EncodeToString(startKey) - endKeyEncoded := hex.EncodeToString(endKey) - buf.WriteString(fmt.Sprintf(insertDeleteRangeSQLValue, jobID, *elementID, startKeyEncoded, endKeyEncoded)) - if j != batchEnd-1 { - buf.WriteString(",") - } - *elementID += 1 - } - rc.deleteRangeQueryCh <- buf.String() - } +func (rc *Client) RecordDeleteRange(sql *stream.PreDelRangeQuery) { + rc.deleteRangeQueryCh <- sql } // use 
channel to save the delete-range query to make it thread-safety. @@ -3518,8 +3460,28 @@ func (rc *Client) InsertGCRows(ctx context.Context) error { if err != nil { return errors.Trace(err) } + jobIDMap := make(map[int64]int64) for _, query := range rc.deleteRangeQuery { - if err := rc.db.se.ExecuteInternal(ctx, fmt.Sprintf(query, ts)); err != nil { + paramsList := make([]interface{}, 0, len(query.ParamsList)*5) + for _, params := range query.ParamsList { + newJobID, exists := jobIDMap[params.JobID] + if !exists { + newJobID, err = rc.GenGlobalID(ctx) + if err != nil { + return errors.Trace(err) + } + jobIDMap[params.JobID] = newJobID + } + log.Info("insert into the delete range", + zap.Int64("jobID", newJobID), + zap.Int64("elemID", params.ElemID), + zap.String("startKey", params.StartKey), + zap.String("endKey", params.EndKey), + zap.Uint64("ts", ts)) + // (job_id, elem_id, start_key, end_key, ts) + paramsList = append(paramsList, newJobID, params.ElemID, params.StartKey, params.EndKey, ts) + } + if err := rc.db.se.ExecuteInternal(ctx, query.Sql, paramsList...); err != nil { return errors.Trace(err) } } @@ -3527,7 +3489,7 @@ func (rc *Client) InsertGCRows(ctx context.Context) error { } // only for unit test -func (rc *Client) GetGCRows() []string { +func (rc *Client) GetGCRows() []*stream.PreDelRangeQuery { close(rc.deleteRangeQueryCh) rc.deleteRangeQueryWaitGroup.Wait() return rc.deleteRangeQuery diff --git a/br/pkg/restore/client_test.go b/br/pkg/restore/client_test.go index eeec487895fe7..7d2cd38789dd6 100644 --- a/br/pkg/restore/client_test.go +++ b/br/pkg/restore/client_test.go @@ -619,18 +619,40 @@ func TestDeleteRangeQuery(t *testing.T) { client.RunGCRowsLoader(ctx) - client.InsertDeleteRangeForTable(2, []int64{3}) - client.InsertDeleteRangeForTable(4, []int64{5, 6}) - - elementID := int64(1) - client.InsertDeleteRangeForIndex(7, &elementID, 8, []int64{1}) - client.InsertDeleteRangeForIndex(9, &elementID, 10, []int64{1, 2}) + 
client.RecordDeleteRange(&stream.PreDelRangeQuery{ + Sql: "INSERT IGNORE INTO mysql.gc_delete_range VALUES (%?, %?, %?, %?, %?), (%?, %?, %?, %?, %?)", + ParamsList: []stream.DelRangeParams{ + { + JobID: 1, + ElemID: 1, + StartKey: "a", + EndKey: "b", + }, + { + JobID: 1, + ElemID: 2, + StartKey: "b", + EndKey: "c", + }, + }, + }) querys := client.GetGCRows() - require.Equal(t, querys[0], "INSERT IGNORE INTO mysql.gc_delete_range VALUES (2, 1, '748000000000000003', '748000000000000004', %[1]d)") - require.Equal(t, querys[1], "INSERT IGNORE INTO mysql.gc_delete_range VALUES (4, 1, '748000000000000005', '748000000000000006', %[1]d),(4, 2, '748000000000000006', '748000000000000007', %[1]d)") - require.Equal(t, querys[2], "INSERT IGNORE INTO mysql.gc_delete_range VALUES (7, 1, '7480000000000000085f698000000000000001', '7480000000000000085f698000000000000002', %[1]d)") - require.Equal(t, querys[3], "INSERT IGNORE INTO mysql.gc_delete_range VALUES (9, 2, '74800000000000000a5f698000000000000001', '74800000000000000a5f698000000000000002', %[1]d),(9, 3, '74800000000000000a5f698000000000000002', '74800000000000000a5f698000000000000003', %[1]d)") + require.Equal(t, len(querys), 1) + require.Equal(t, querys[0].Sql, "INSERT IGNORE INTO mysql.gc_delete_range VALUES (%?, %?, %?, %?, %?), (%?, %?, %?, %?, %?)") + require.Equal(t, len(querys[0].ParamsList), 2) + require.Equal(t, querys[0].ParamsList[0], stream.DelRangeParams{ + JobID: 1, + ElemID: 1, + StartKey: "a", + EndKey: "b", + }) + require.Equal(t, querys[0].ParamsList[1], stream.DelRangeParams{ + JobID: 1, + ElemID: 2, + StartKey: "b", + EndKey: "c", + }) } func MockEmptySchemasReplace() *stream.SchemasReplace { @@ -644,7 +666,6 @@ func MockEmptySchemasReplace() *stream.SchemasReplace { nil, nil, nil, - nil, ) } diff --git a/br/pkg/restore/ingestrec/ingest_recorder.go b/br/pkg/restore/ingestrec/ingest_recorder.go index 277a031b050d3..2a2deed34cb1d 100644 --- a/br/pkg/restore/ingestrec/ingest_recorder.go +++ 
b/br/pkg/restore/ingestrec/ingest_recorder.go @@ -83,8 +83,8 @@ func notSynced(job *model.Job, isSubJob bool) bool { return (job.State != model.JobStateSynced) && !(isSubJob && job.State == model.JobStateDone) } -// AddJob firstly filters the ingest index add operation job, and records it into IngestRecorder. -func (i *IngestRecorder) AddJob(job *model.Job, isSubJob bool) error { +// TryAddJob firstly filters the ingest index add operation job, and records it into IngestRecorder. +func (i *IngestRecorder) TryAddJob(job *model.Job, isSubJob bool) error { if job == nil || notIngestJob(job) || notAddIndexJob(job) || notSynced(job, isSubJob) { return nil } diff --git a/br/pkg/restore/ingestrec/ingest_recorder_test.go b/br/pkg/restore/ingestrec/ingest_recorder_test.go index eaacde6e73c1c..71a6ada559569 100644 --- a/br/pkg/restore/ingestrec/ingest_recorder_test.go +++ b/br/pkg/restore/ingestrec/ingest_recorder_test.go @@ -138,7 +138,7 @@ func TestAddIngestRecorder(t *testing.T) { } recorder := ingestrec.New() // no ingest job, should ignore it - err := recorder.AddJob(fakeJob( + err := recorder.TryAddJob(fakeJob( model.ReorgTypeTxn, model.ActionAddIndex, model.JobStateSynced, @@ -154,7 +154,7 @@ func TestAddIngestRecorder(t *testing.T) { require.NoError(t, err) // no add-index job, should ignore it - err = recorder.AddJob(fakeJob( + err = recorder.TryAddJob(fakeJob( model.ReorgTypeLitMerge, model.ActionDropIndex, model.JobStateSynced, @@ -170,7 +170,7 @@ func TestAddIngestRecorder(t *testing.T) { require.NoError(t, err) // no synced job, should ignore it - err = recorder.AddJob(fakeJob( + err = recorder.TryAddJob(fakeJob( model.ReorgTypeLitMerge, model.ActionAddIndex, model.JobStateRollbackDone, @@ -188,7 +188,7 @@ func TestAddIngestRecorder(t *testing.T) { { recorder := ingestrec.New() // a normal ingest add index job - err = recorder.AddJob(fakeJob( + err = recorder.TryAddJob(fakeJob( model.ReorgTypeLitMerge, model.ActionAddIndex, model.JobStateSynced, @@ -209,7 +209,7 
@@ func TestAddIngestRecorder(t *testing.T) { { recorder := ingestrec.New() // a normal ingest add primary index job - err = recorder.AddJob(fakeJob( + err = recorder.TryAddJob(fakeJob( model.ReorgTypeLitMerge, model.ActionAddPrimaryKey, model.JobStateSynced, @@ -229,7 +229,7 @@ func TestAddIngestRecorder(t *testing.T) { { // a sub job as add primary index job - err = recorder.AddJob(fakeJob( + err = recorder.TryAddJob(fakeJob( model.ReorgTypeLitMerge, model.ActionAddPrimaryKey, model.JobStateDone, @@ -304,7 +304,7 @@ func TestIndexesKind(t *testing.T) { } recorder := ingestrec.New() - err := recorder.AddJob(fakeJob( + err := recorder.TryAddJob(fakeJob( model.ReorgTypeLitMerge, model.ActionAddIndex, model.JobStateSynced, @@ -382,7 +382,7 @@ func TestRewriteTableID(t *testing.T) { }, } recorder := ingestrec.New() - err := recorder.AddJob(fakeJob( + err := recorder.TryAddJob(fakeJob( model.ReorgTypeLitMerge, model.ActionAddIndex, model.JobStateSynced, diff --git a/br/pkg/restore/util.go b/br/pkg/restore/util.go index a94aa4f24f29c..e554f92d21afd 100644 --- a/br/pkg/restore/util.go +++ b/br/pkg/restore/util.go @@ -43,8 +43,8 @@ type AppliedFile interface { GetEndKey() []byte } -// getTableIDMap creates a map maping old tableID to new tableID. -func getTableIDMap(newTable, oldTable *model.TableInfo) map[int64]int64 { +// getPartitionIDMap creates a map mapping old physical ID to new physical ID. +func getPartitionIDMap(newTable, oldTable *model.TableInfo) map[int64]int64 { tableIDMap := make(map[int64]int64) if oldTable.Partition != nil && newTable.Partition != nil { @@ -60,6 +60,12 @@ func getTableIDMap(newTable, oldTable *model.TableInfo) map[int64]int64 { } } + return tableIDMap +} + +// getTableIDMap creates a map mapping old tableID to new tableID. 
+func getTableIDMap(newTable, oldTable *model.TableInfo) map[int64]int64 { + tableIDMap := getPartitionIDMap(newTable, oldTable) tableIDMap[oldTable.ID] = newTable.ID return tableIDMap } diff --git a/br/pkg/stream/BUILD.bazel b/br/pkg/stream/BUILD.bazel index fc013f01bd3a1..4301e64f2973e 100644 --- a/br/pkg/stream/BUILD.bazel +++ b/br/pkg/stream/BUILD.bazel @@ -22,6 +22,7 @@ go_library( "//br/pkg/storage", "//br/pkg/streamhelper", "//br/pkg/utils", + "//pkg/ddl", "//pkg/kv", "//pkg/meta", "//pkg/parser/model", @@ -54,10 +55,11 @@ go_test( ], embed = [":stream"], flaky = True, - shard_count = 24, + shard_count = 25, deps = [ "//br/pkg/storage", "//br/pkg/streamhelper", + "//pkg/ddl", "//pkg/meta", "//pkg/parser/ast", "//pkg/parser/model", diff --git a/br/pkg/stream/rewrite_meta_rawkv.go b/br/pkg/stream/rewrite_meta_rawkv.go index 5366b60150d40..8bda2427fffd4 100644 --- a/br/pkg/stream/rewrite_meta_rawkv.go +++ b/br/pkg/stream/rewrite_meta_rawkv.go @@ -23,9 +23,9 @@ import ( backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" - "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/restore/ingestrec" "github.com/pingcap/tidb/br/pkg/restore/tiflashrec" + "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta" "github.com/pingcap/tidb/pkg/parser/model" @@ -71,15 +71,14 @@ type SchemasReplace struct { globalTableIdMap map[UpstreamID]DownstreamID needConstructIdMap bool - ingestRecorder *ingestrec.IngestRecorder - TiflashRecorder *tiflashrec.TiFlashRecorder - RewriteTS uint64 // used to rewrite commit ts in meta kv. - TableFilter filter.Filter // used to filter schema/table + delRangeRecorder *brDelRangeExecWrapper + ingestRecorder *ingestrec.IngestRecorder + TiflashRecorder *tiflashrec.TiFlashRecorder + RewriteTS uint64 // used to rewrite commit ts in meta kv. 
+ TableFilter filter.Filter // used to filter schema/table - genGenGlobalID func(ctx context.Context) (int64, error) - genGenGlobalIDs func(ctx context.Context, n int) ([]int64, error) - insertDeleteRangeForTable func(jobID int64, tableIDs []int64) - insertDeleteRangeForIndex func(jobID int64, elementID *int64, tableID int64, indexIDs []int64) + genGenGlobalID func(ctx context.Context) (int64, error) + genGenGlobalIDs func(ctx context.Context, n int) ([]int64, error) AfterTableRewritten func(deleted bool, tableInfo *model.TableInfo) } @@ -112,8 +111,7 @@ func NewSchemasReplace( tableFilter filter.Filter, genID func(ctx context.Context) (int64, error), genIDs func(ctx context.Context, n int) ([]int64, error), - insertDeleteRangeForTable func(jobID int64, tableIDs []int64), - insertDeleteRangeForIndex func(jobID int64, elementID *int64, tableID int64, indexIDs []int64), + recordDeleteRange func(*PreDelRangeQuery), ) *SchemasReplace { globalTableIdMap := make(map[UpstreamID]DownstreamID) for _, dr := range dbMap { @@ -126,17 +124,16 @@ func NewSchemasReplace( } return &SchemasReplace{ - DbMap: dbMap, - globalTableIdMap: globalTableIdMap, - needConstructIdMap: needConstructIdMap, - ingestRecorder: ingestrec.New(), - TiflashRecorder: tiflashRecorder, - RewriteTS: restoreTS, - TableFilter: tableFilter, - genGenGlobalID: genID, - genGenGlobalIDs: genIDs, - insertDeleteRangeForTable: insertDeleteRangeForTable, - insertDeleteRangeForIndex: insertDeleteRangeForIndex, + DbMap: dbMap, + globalTableIdMap: globalTableIdMap, + needConstructIdMap: needConstructIdMap, + delRangeRecorder: newDelRangeExecWrapper(globalTableIdMap, recordDeleteRange), + ingestRecorder: ingestrec.New(), + TiflashRecorder: tiflashRecorder, + RewriteTS: restoreTS, + TableFilter: tableFilter, + genGenGlobalID: genID, + genGenGlobalIDs: genIDs, } } @@ -650,7 +647,7 @@ func (sr *SchemasReplace) RewriteKvEntry(e *kv.Entry, cf string) (*kv.Entry, err return nil, nil } - return nil, sr.restoreFromHistory(job, 
false) + return nil, sr.restoreFromHistory(job) } return nil, nil } @@ -679,349 +676,90 @@ func (sr *SchemasReplace) RewriteKvEntry(e *kv.Entry, cf string) (*kv.Entry, err return nil, nil } -func (sr *SchemasReplace) restoreFromHistory(job *model.Job, isSubJob bool) error { - if !job.IsCancelled() { - switch job.Type { - case model.ActionAddIndex, model.ActionAddPrimaryKey: - if job.State == model.JobStateRollbackDone { - return sr.deleteRange(job) - } - err := sr.ingestRecorder.AddJob(job, isSubJob) - return errors.Trace(err) - case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropPrimaryKey, - model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn, - model.ActionDropColumns, model.ActionModifyColumn, model.ActionDropIndexes: - return sr.deleteRange(job) - case model.ActionMultiSchemaChange: - for i, sub := range job.MultiSchemaInfo.SubJobs { - proxyJob := sub.ToProxyJob(job, i) - // ASSERT: the proxyJob can not be MultiSchemaInfo anymore - if err := sr.restoreFromHistory(&proxyJob, true); err != nil { - return err - } - } +func (sr *SchemasReplace) tryRecordIngestIndex(job *model.Job) error { + if job.Type != model.ActionMultiSchemaChange { + return sr.ingestRecorder.TryAddJob(job, false) + } + + for i, sub := range job.MultiSchemaInfo.SubJobs { + proxyJob := sub.ToProxyJob(job, i) + // ASSERT: the proxyJob can not be MultiSchemaInfo anymore + if err := sr.ingestRecorder.TryAddJob(&proxyJob, true); err != nil { + return err } } return nil } -func (sr *SchemasReplace) deleteRange(job *model.Job) error { - lctx := logutil.ContextWithField(context.Background(), logutil.RedactAny("category", "ddl: rewrite delete range")) - dbReplace, exist := sr.DbMap[job.SchemaID] - if !exist { - // skip this mddljob, the same below - logutil.CL(lctx).Warn("try to drop a non-existent range, missing oldDBID", zap.Int64("oldDBID", job.SchemaID)) - return nil - } - - // allocate a new 
fake job id to avoid row conflicts in table `gc_delete_range` - newJobID, err := sr.genGenGlobalID(context.Background()) - if err != nil { - return errors.Trace(err) - } - - switch job.Type { - case model.ActionDropSchema: - var tableIDs []int64 - if err := job.DecodeArgs(&tableIDs); err != nil { - return errors.Trace(err) - } - // Note: tableIDs contains partition ids, cannot directly use dbReplace.TableMap - /* TODO: use global ID replace map - * - * for i := 0; i < len(tableIDs); i++ { - * tableReplace, exist := dbReplace.TableMap[tableIDs[i]] - * if !exist { - * return errors.Errorf("DropSchema: try to drop a non-existent table, missing oldTableID") - * } - * tableIDs[i] = tableReplace.NewTableID - * } - */ - - argsSet := make(map[int64]struct{}, len(tableIDs)) - for _, tableID := range tableIDs { - argsSet[tableID] = struct{}{} - } - - newTableIDs := make([]int64, 0, len(tableIDs)) - for tableID, tableReplace := range dbReplace.TableMap { - if _, exist := argsSet[tableID]; !exist { - logutil.CL(lctx).Warn("DropSchema: record a table, but it doesn't exist in job args", - zap.Int64("oldTableID", tableID)) - continue - } - newTableIDs = append(newTableIDs, tableReplace.TableID) - for partitionID, newPartitionID := range tableReplace.PartitionMap { - if _, exist := argsSet[partitionID]; !exist { - logutil.CL(lctx).Warn("DropSchema: record a partition, but it doesn't exist in job args", - zap.Int64("oldPartitionID", partitionID)) - continue - } - newTableIDs = append(newTableIDs, newPartitionID) - } - } - - if len(newTableIDs) != len(tableIDs) { - logutil.CL(lctx).Warn( - "DropSchema: try to drop a non-existent table/partition, whose oldID doesn't exist in tableReplace") - // only drop newTableIDs' ranges - } - - if len(newTableIDs) > 0 { - sr.insertDeleteRangeForTable(newJobID, newTableIDs) - } - - return nil - // Truncate will generates new id for table or partition, so ts can be large enough - case model.ActionDropTable, model.ActionTruncateTable: - 
tableReplace, exist := dbReplace.TableMap[job.TableID] - if !exist { - logutil.CL(lctx).Warn("DropTable/TruncateTable: try to drop a non-existent table, missing oldTableID", - zap.Int64("oldTableID", job.TableID)) - return nil - } - - // The startKey here is for compatibility with previous versions, old version did not endKey so don't have to deal with. - var startKey kv.Key // unused - var physicalTableIDs []int64 - var ruleIDs []string // unused - if err := job.DecodeArgs(&startKey, &physicalTableIDs, &ruleIDs); err != nil { - return errors.Trace(err) - } - if len(physicalTableIDs) > 0 { - newPhysicalTableIDs := make([]int64, 0, len(physicalTableIDs)) - // delete partition id - for _, oldPid := range physicalTableIDs { - newPid, exist := tableReplace.PartitionMap[oldPid] - if !exist { - logutil.CL(lctx).Warn("DropTable/TruncateTable: try to drop a non-existent table, missing oldPartitionID", - zap.Int64("oldPartitionID", oldPid)) - continue - } - newPhysicalTableIDs = append(newPhysicalTableIDs, newPid) - } - if len(newPhysicalTableIDs) > 0 { - sr.insertDeleteRangeForTable(newJobID, newPhysicalTableIDs) - } - return nil +func (sr *SchemasReplace) restoreFromHistory(job *model.Job) error { + if ddl.JobNeedGC(job) { + if err := ddl.AddDelRangeJobInternal(context.TODO(), sr.delRangeRecorder, job); err != nil { + return err } + } - sr.insertDeleteRangeForTable(newJobID, []int64{tableReplace.TableID}) - return nil - case model.ActionDropTablePartition, model.ActionTruncateTablePartition: - tableReplace, exist := dbReplace.TableMap[job.TableID] - if !exist { - logutil.CL(lctx).Warn( - "DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldTableID", - zap.Int64("oldTableID", job.TableID)) - return nil - } - var physicalTableIDs []int64 - if err := job.DecodeArgs(&physicalTableIDs); err != nil { - return errors.Trace(err) - } + return sr.tryRecordIngestIndex(job) +} - newPhysicalTableIDs := make([]int64, 0, len(physicalTableIDs)) - for 
_, oldPid := range physicalTableIDs { - newPid, exist := tableReplace.PartitionMap[oldPid] - if !exist { - logutil.CL(lctx).Warn( - "DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldPartitionID", - zap.Int64("oldPartitionID", oldPid)) - continue - } - newPhysicalTableIDs = append(newPhysicalTableIDs, newPid) - } - if len(newPhysicalTableIDs) > 0 { - sr.insertDeleteRangeForTable(newJobID, newPhysicalTableIDs) - } - return nil - // ActionAddIndex, ActionAddPrimaryKey needs do it, because it needs to be rolled back when it's canceled. - case model.ActionAddIndex, model.ActionAddPrimaryKey: - // iff job.State = model.JobStateRollbackDone - tableReplace, exist := dbReplace.TableMap[job.TableID] - if !exist { - logutil.CL(lctx).Warn("AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldTableID", - zap.Int64("oldTableID", job.TableID)) - return nil - } - indexIDs := make([]int64, 1) - ifExists := make([]bool, 1) - var partitionIDs []int64 - if err := job.DecodeArgs(&indexIDs[0], &ifExists[0], &partitionIDs); err != nil { - if err = job.DecodeArgs(&indexIDs, &ifExists, &partitionIDs); err != nil { - return errors.Trace(err) - } - } +type DelRangeParams struct { + JobID int64 + ElemID int64 + StartKey string + EndKey string +} - var elementID int64 = 1 +type PreDelRangeQuery struct { + Sql string + ParamsList []DelRangeParams +} - if len(partitionIDs) > 0 { - for _, oldPid := range partitionIDs { - newPid, exist := tableReplace.PartitionMap[oldPid] - if !exist { - logutil.CL(lctx).Warn( - "AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldPartitionID", - zap.Int64("oldPartitionID", oldPid)) - continue - } +type brDelRangeExecWrapper struct { + globalTableIdMap map[UpstreamID]DownstreamID - sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs) - } - } else { - sr.insertDeleteRangeForIndex(newJobID, &elementID, tableReplace.TableID, indexIDs) - } - return nil - case 
model.ActionDropIndex, model.ActionDropPrimaryKey: - tableReplace, exist := dbReplace.TableMap[job.TableID] - if !exist { - logutil.CL(lctx).Warn("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) - return nil - } + recordDeleteRange func(*PreDelRangeQuery) - var indexName interface{} - var ifExists bool - var indexID int64 - var partitionIDs []int64 - if err := job.DecodeArgs(&indexName, &ifExists, &indexID, &partitionIDs); err != nil { - return errors.Trace(err) - } + // temporary values + query *PreDelRangeQuery +} - var elementID int64 = 1 - indexIDs := []int64{indexID} +func newDelRangeExecWrapper( + globalTableIdMap map[UpstreamID]DownstreamID, + recordDeleteRange func(*PreDelRangeQuery), +) *brDelRangeExecWrapper { + return &brDelRangeExecWrapper{ + globalTableIdMap: globalTableIdMap, + recordDeleteRange: recordDeleteRange, - if len(partitionIDs) > 0 { - for _, oldPid := range partitionIDs { - newPid, exist := tableReplace.PartitionMap[oldPid] - if !exist { - logutil.CL(lctx).Warn("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) - continue - } - // len(indexIDs) = 1 - sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs) - } - } else { - sr.insertDeleteRangeForIndex(newJobID, &elementID, tableReplace.TableID, indexIDs) - } - return nil - case model.ActionDropIndexes: // // Deprecated, we use ActionMultiSchemaChange instead. - var indexIDs []int64 - var partitionIDs []int64 - if err := job.DecodeArgs(&[]model.CIStr{}, &[]bool{}, &indexIDs, &partitionIDs); err != nil { - return errors.Trace(err) - } - // Remove data in TiKV. 
- if len(indexIDs) == 0 { - return nil - } + query: nil, + } +} - tableReplace, exist := dbReplace.TableMap[job.TableID] - if !exist { - logutil.CL(lctx).Warn("DropIndexes: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) - return nil - } +// UpdateTSOForJob just does nothing. BR would generate ts after log restore done. +func (bdr *brDelRangeExecWrapper) UpdateTSOForJob() error { + return nil +} - var elementID int64 = 1 - if len(partitionIDs) > 0 { - for _, oldPid := range partitionIDs { - newPid, exist := tableReplace.PartitionMap[oldPid] - if !exist { - logutil.CL(lctx).Warn("DropIndexes: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) - continue - } - sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs) - } - } else { - sr.insertDeleteRangeForIndex(newJobID, &elementID, tableReplace.TableID, indexIDs) - } - return nil - case model.ActionDropColumn: - var colName model.CIStr - var ifExists bool - var indexIDs []int64 - var partitionIDs []int64 - if err := job.DecodeArgs(&colName, &ifExists, &indexIDs, &partitionIDs); err != nil { - return errors.Trace(err) - } - if len(indexIDs) > 0 { - tableReplace, exist := dbReplace.TableMap[job.TableID] - if !exist { - logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) - return nil - } +func (bdr *brDelRangeExecWrapper) PrepareParamsList(sz int) { + bdr.query = &PreDelRangeQuery{ + ParamsList: make([]DelRangeParams, 0, sz), + } +} - var elementID int64 = 1 - if len(partitionIDs) > 0 { - for _, oldPid := range partitionIDs { - newPid, exist := tableReplace.PartitionMap[oldPid] - if !exist { - logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) - continue - } - sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs) - } - } else { - 
sr.insertDeleteRangeForIndex(newJobID, &elementID, tableReplace.TableID, indexIDs) - } - } - return nil - case model.ActionDropColumns: // Deprecated, we use ActionMultiSchemaChange instead. - var colNames []model.CIStr - var ifExists []bool - var indexIDs []int64 - var partitionIDs []int64 - if err := job.DecodeArgs(&colNames, &ifExists, &indexIDs, &partitionIDs); err != nil { - return errors.Trace(err) - } - if len(indexIDs) > 0 { - tableReplace, exist := dbReplace.TableMap[job.TableID] - if !exist { - logutil.CL(lctx).Warn("DropColumns: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) - return nil - } +func (bdr *brDelRangeExecWrapper) RewriteTableID(tableID int64) (int64, bool) { + tableID, exists := bdr.globalTableIdMap[tableID] + if !exists { + log.Warn("failed to find the downstream id when rewrite delete range", zap.Int64("tableID", tableID)) + } + return tableID, exists +} - var elementID int64 = 1 - if len(partitionIDs) > 0 { - for _, oldPid := range partitionIDs { - newPid, exist := tableReplace.PartitionMap[oldPid] - if !exist { - logutil.CL(lctx).Warn("DropColumns: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) - continue - } - sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs) - } - } else { - sr.insertDeleteRangeForIndex(newJobID, &elementID, tableReplace.TableID, indexIDs) - } - } - case model.ActionModifyColumn: - var indexIDs []int64 - var partitionIDs []int64 - if err := job.DecodeArgs(&indexIDs, &partitionIDs); err != nil { - return errors.Trace(err) - } - if len(indexIDs) == 0 { - return nil - } - tableReplace, exist := dbReplace.TableMap[job.TableID] - if !exist { - logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) - return nil - } +func (bdr *brDelRangeExecWrapper) AppendParamsList(jobID, elemID int64, startKey, endKey string) { + bdr.query.ParamsList = 
append(bdr.query.ParamsList, DelRangeParams{jobID, elemID, startKey, endKey}) +} - var elementID int64 = 1 - if len(partitionIDs) > 0 { - for _, oldPid := range partitionIDs { - newPid, exist := tableReplace.PartitionMap[oldPid] - if !exist { - logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) - continue - } - sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs) - } - } else { - sr.insertDeleteRangeForIndex(newJobID, &elementID, tableReplace.TableID, indexIDs) - } - } +func (bdr *brDelRangeExecWrapper) ConsumeDeleteRange(ctx context.Context, sql string) error { + bdr.query.Sql = sql + bdr.recordDeleteRange(bdr.query) + bdr.query = nil return nil } diff --git a/br/pkg/stream/rewrite_meta_rawkv_test.go b/br/pkg/stream/rewrite_meta_rawkv_test.go index d09e137ddae61..104bd3732a113 100644 --- a/br/pkg/stream/rewrite_meta_rawkv_test.go +++ b/br/pkg/stream/rewrite_meta_rawkv_test.go @@ -4,13 +4,16 @@ package stream import ( "context" + "encoding/hex" "encoding/json" "testing" + "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/meta" "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/types" filter "github.com/pingcap/tidb/pkg/util/table-filter" "github.com/stretchr/testify/require" @@ -23,8 +26,10 @@ func mockGenGenGlobalID(ctx context.Context) (int64, error) { return increaseID, nil } -func MockEmptySchemasReplace(midr *mockInsertDeleteRange) *SchemasReplace { - dbMap := make(map[UpstreamID]*DBReplace) +func MockEmptySchemasReplace(midr *mockInsertDeleteRange, dbMap map[UpstreamID]*DBReplace) *SchemasReplace { + if dbMap == nil { + dbMap = make(map[UpstreamID]*DBReplace) + } if midr == nil { midr = newMockInsertDeleteRange() } @@ -36,8 +41,7 @@ func MockEmptySchemasReplace(midr *mockInsertDeleteRange) *SchemasReplace { 
filter.All(), mockGenGenGlobalID, nil, - midr.mockInsertDeleteRangeForTable, - midr.mockInsertDeleteRangeForIndex, + midr.mockRecordDeleteRange, ) } @@ -79,7 +83,7 @@ func TestTidySchemaMaps(t *testing.T) { drs[oldDBID] = dr // create schemas replace and test TidySchemaMaps(). - sr := NewSchemasReplace(drs, true, nil, 0, filter.All(), nil, nil, nil, nil) + sr := NewSchemasReplace(drs, true, nil, 0, filter.All(), nil, nil, nil) globalTableIdMap := sr.globalTableIdMap require.Equal(t, len(globalTableIdMap), 3) require.Equal(t, globalTableIdMap[oldTblID], newTblID) @@ -126,7 +130,7 @@ func TestRewriteKeyForDB(t *testing.T) { encodedKey := encodeTxnMetaKey(mDbs, meta.DBkey(dbID), ts) // create schemasReplace. - sr := MockEmptySchemasReplace(nil) + sr := MockEmptySchemasReplace(nil, nil) // preConstruct Map information. sr.SetPreConstructMapStatus() @@ -169,7 +173,7 @@ func TestRewriteDBInfo(t *testing.T) { require.Nil(t, err) // create schemasReplace. - sr := MockEmptySchemasReplace(nil) + sr := MockEmptySchemasReplace(nil, nil) // rewrite it directly without preConstruct Map, it will get failed result. sr.SetRestoreKVStatus() @@ -237,7 +241,7 @@ func TestRewriteKeyForTable(t *testing.T) { for _, ca := range cases { encodedKey := encodeTxnMetaKey(meta.DBkey(dbID), ca.encodeTableFn(tableID), ts) // create schemasReplace. - sr := MockEmptySchemasReplace(nil) + sr := MockEmptySchemasReplace(nil, nil) // set preConstruct status and construct map information. sr.SetPreConstructMapStatus() @@ -292,7 +296,7 @@ func TestRewriteTableInfo(t *testing.T) { require.Nil(t, err) // create schemasReplace. - sr := MockEmptySchemasReplace(nil) + sr := MockEmptySchemasReplace(nil, nil) tableCount := 0 sr.AfterTableRewritten = func(deleted bool, tableInfo *model.TableInfo) { tableCount++ @@ -370,7 +374,7 @@ func TestRewriteTableInfoForPartitionTable(t *testing.T) { require.Nil(t, err) // create schemasReplace, and preConstructMap. 
- sr := MockEmptySchemasReplace(nil) + sr := MockEmptySchemasReplace(nil, nil) sr.SetPreConstructMapStatus() newValue, err := sr.rewriteTableInfo(value, dbId) require.Nil(t, err) @@ -496,7 +500,6 @@ func TestRewriteTableInfoForExchangePartition(t *testing.T) { mockGenGenGlobalID, nil, nil, - nil, ) sr.SetRestoreKVStatus() //exchange partition, t1 parition0 with the t2 @@ -557,7 +560,7 @@ func TestRewriteTableInfoForTTLTable(t *testing.T) { require.Nil(t, err) // create empty schemasReplace - sr := MockEmptySchemasReplace(nil) + sr := MockEmptySchemasReplace(nil, nil) // preConsutruct Map information. sr.SetPreConstructMapStatus() @@ -583,7 +586,7 @@ func TestRewriteTableInfoForTTLTable(t *testing.T) { func TestIsPreConsturctMapStatus(t *testing.T) { // create empty schemasReplace - sr := MockEmptySchemasReplace(nil) + sr := MockEmptySchemasReplace(nil, nil) sr.SetPreConstructMapStatus() require.True(t, sr.IsPreConsturctMapStatus()) require.False(t, sr.IsRestoreKVStatus()) @@ -626,35 +629,67 @@ var ( mDDLJobPartition2NewID: {}, mDDLJobTable1NewID: {}, } + mDDLJobALLNewTableKeySet = map[string]struct{}{ + encodeTableKey(mDDLJobTable0NewID): {}, + encodeTableKey(mDDLJobPartition0NewID): {}, + encodeTableKey(mDDLJobPartition1NewID): {}, + encodeTableKey(mDDLJobPartition2NewID): {}, + encodeTableKey(mDDLJobTable1NewID): {}, + } mDDLJobALLNewPartitionIDSet = map[int64]struct{}{ mDDLJobPartition0NewID: {}, mDDLJobPartition1NewID: {}, mDDLJobPartition2NewID: {}, } + mDDLJobALLNewPartitionKeySet = map[string]struct{}{ + encodeTableKey(mDDLJobPartition0NewID): {}, + encodeTableKey(mDDLJobPartition1NewID): {}, + encodeTableKey(mDDLJobPartition2NewID): {}, + } + mDDLJobALLNewPartitionIndex2KeySet = map[string]struct{}{ + encodeTableIndexKey(mDDLJobPartition0NewID, 2): {}, + encodeTableIndexKey(mDDLJobPartition1NewID, 2): {}, + encodeTableIndexKey(mDDLJobPartition2NewID, 2): {}, + } + mDDLJobALLNewPartitionIndex3KeySet = map[string]struct{}{ + 
encodeTableIndexKey(mDDLJobPartition0NewID, 3): {}, + encodeTableIndexKey(mDDLJobPartition1NewID, 3): {}, + encodeTableIndexKey(mDDLJobPartition2NewID, 3): {}, + } + tempIndex2 = tablecodec.TempIndexPrefix | int64(2) + mDDLJobALLNewPartitionTempIndex2KeySet = map[string]struct{}{ + encodeTableIndexKey(mDDLJobPartition0NewID, tempIndex2): {}, + encodeTableIndexKey(mDDLJobPartition1NewID, tempIndex2): {}, + encodeTableIndexKey(mDDLJobPartition2NewID, tempIndex2): {}, + } mDDLJobALLIndexesIDSet = map[int64]struct{}{ 2: {}, 3: {}, } + mDDLJobAllIndexesKeySet = []map[string]struct{}{ + mDDLJobALLNewPartitionIndex2KeySet, mDDLJobALLNewPartitionIndex3KeySet, + } ) var ( - dropSchemaJob = &model.Job{Type: model.ActionDropSchema, SchemaID: mDDLJobDBOldID, RawArgs: json.RawMessage(`[[71,72,73,74,75]]`)} - dropTable0Job = &model.Job{Type: model.ActionDropTable, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",[72,73,74],[""]]`)} - dropTable1Job = &model.Job{Type: model.ActionDropTable, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",[],[""]]`)} - dropTable0Partition1Job = &model.Job{Type: model.ActionDropTablePartition, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)} - rollBackTable0IndexJob = &model.Job{Type: model.ActionAddIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[2,false,[72,73,74]]`)} - rollBackTable1IndexJob = &model.Job{Type: model.ActionAddIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[2,false,[]]`)} - dropTable0IndexJob = &model.Job{Type: model.ActionDropIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",false,2,[72,73,74]]`)} - dropTable1IndexJob = &model.Job{Type: model.ActionDropIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",false,2,[]]`)} - dropTable0IndexesJob = 
&model.Job{Type: model.ActionDropIndexes, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[],[],[2,3],[72,73,74]]`)} - dropTable1IndexesJob = &model.Job{Type: model.ActionDropIndexes, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[[],[],[2,3],[]]`)} - dropTable0ColumnJob = &model.Job{Type: model.ActionDropColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",false,[2,3],[72,73,74]]`)} - dropTable1ColumnJob = &model.Job{Type: model.ActionDropColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",false,[2,3],[]]`)} - dropTable0ColumnsJob = &model.Job{Type: model.ActionDropColumns, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[],[],[2,3],[72,73,74]]`)} - dropTable1ColumnsJob = &model.Job{Type: model.ActionDropColumns, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[[],[],[2,3],[]]`)} - modifyTable0ColumnJob = &model.Job{Type: model.ActionModifyColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[2,3],[72,73,74]]`)} - modifyTable1ColumnJob = &model.Job{Type: model.ActionModifyColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[[2,3],[]]`)} - multiSchemaChangeJob0 = &model.Job{ + dropSchemaJob = &model.Job{Type: model.ActionDropSchema, SchemaID: mDDLJobDBOldID, RawArgs: json.RawMessage(`[[71,72,73,74,75]]`)} + dropTable0Job = &model.Job{Type: model.ActionDropTable, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",[72,73,74],[""]]`)} + dropTable1Job = &model.Job{Type: model.ActionDropTable, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",[],[""]]`)} + dropTable0Partition1Job = &model.Job{Type: model.ActionDropTablePartition, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: 
json.RawMessage(`[[73]]`)} + reorganizeTable0Partition1Job = &model.Job{Type: model.ActionReorganizePartition, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)} + removeTable0Partition1Job = &model.Job{Type: model.ActionRemovePartitioning, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)} + alterTable0Partition1Job = &model.Job{Type: model.ActionAlterTablePartitioning, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)} + rollBackTable0IndexJob = &model.Job{Type: model.ActionAddIndex, State: model.JobStateRollbackDone, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[2,false,[72,73,74]]`)} + rollBackTable1IndexJob = &model.Job{Type: model.ActionAddIndex, State: model.JobStateRollbackDone, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[2,false,[]]`)} + addTable0IndexJob = &model.Job{Type: model.ActionAddIndex, State: model.JobStateSynced, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[2,false,[72,73,74]]`)} + addTable1IndexJob = &model.Job{Type: model.ActionAddIndex, State: model.JobStateSynced, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[2,false,[]]`)} + dropTable0IndexJob = &model.Job{Type: model.ActionDropIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",false,2,[72,73,74]]`)} + dropTable1IndexJob = &model.Job{Type: model.ActionDropIndex, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",false,2,[]]`)} + dropTable0ColumnJob = &model.Job{Type: model.ActionDropColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",false,[2,3],[72,73,74]]`)} + dropTable1ColumnJob = &model.Job{Type: model.ActionDropColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: 
json.RawMessage(`["",false,[2,3],[]]`)} + modifyTable0ColumnJob = &model.Job{Type: model.ActionModifyColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[2,3],[72,73,74]]`)} + modifyTable1ColumnJob = &model.Job{Type: model.ActionModifyColumn, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[[2,3],[]]`)} + multiSchemaChangeJob0 = &model.Job{ Type: model.ActionMultiSchemaChange, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, @@ -662,11 +697,11 @@ var ( SubJobs: []*model.SubJob{ { Type: model.ActionDropIndex, - RawArgs: json.RawMessage(`[{"O":"k1","L":"k1"},false,1,[72,73,74]]`), + RawArgs: json.RawMessage(`[{"O":"k1","L":"k1"},false,2,[72,73,74]]`), }, { Type: model.ActionDropIndex, - RawArgs: json.RawMessage(`[{"O":"k2","L":"k2"},false,2,[72,73,74]]`), + RawArgs: json.RawMessage(`[{"O":"k2","L":"k2"},false,3,[72,73,74]]`), }, }, }, @@ -679,55 +714,44 @@ var ( SubJobs: []*model.SubJob{ { Type: model.ActionDropIndex, - RawArgs: json.RawMessage(`[{"O":"k1","L":"k1"},false,1,[]]`), + RawArgs: json.RawMessage(`[{"O":"k1","L":"k1"},false,2,[]]`), }, { Type: model.ActionDropIndex, - RawArgs: json.RawMessage(`[{"O":"k2","L":"k2"},false,2,[]]`), + RawArgs: json.RawMessage(`[{"O":"k2","L":"k2"},false,3,[]]`), }, }, }, } ) -type TableDeletQueryArgs struct { - tableIDs []int64 -} - -type IndexDeleteQueryArgs struct { - tableID int64 - indexIDs []int64 -} - type mockInsertDeleteRange struct { - tableCh chan TableDeletQueryArgs - indexCh chan IndexDeleteQueryArgs + queryCh chan *PreDelRangeQuery } func newMockInsertDeleteRange() *mockInsertDeleteRange { // Since there is only single thread, we need to set the channel buf large enough. 
return &mockInsertDeleteRange{ - tableCh: make(chan TableDeletQueryArgs, 10), - indexCh: make(chan IndexDeleteQueryArgs, 10), + queryCh: make(chan *PreDelRangeQuery, 10), } } -func (midr *mockInsertDeleteRange) mockInsertDeleteRangeForTable(jobID int64, tableIDs []int64) { - midr.tableCh <- TableDeletQueryArgs{ - tableIDs: tableIDs, - } +func (midr *mockInsertDeleteRange) mockRecordDeleteRange(query *PreDelRangeQuery) { + midr.queryCh <- query } -func (midr *mockInsertDeleteRange) mockInsertDeleteRangeForIndex(jobID int64, elementID *int64, tableID int64, indexIDs []int64) { - midr.indexCh <- IndexDeleteQueryArgs{ - tableID: tableID, - indexIDs: indexIDs, - } +func encodeTableKey(tableID int64) string { + key := tablecodec.EncodeTablePrefix(tableID) + return hex.EncodeToString(key) +} + +func encodeTableIndexKey(tableID, indexID int64) string { + key := tablecodec.EncodeTableIndexPrefix(tableID, indexID) + return hex.EncodeToString(key) } func TestDeleteRangeForMDDLJob(t *testing.T) { midr := newMockInsertDeleteRange() - schemaReplace := MockEmptySchemasReplace(midr) partitionMap := map[int64]int64{ mDDLJobPartition0OldID: mDDLJobPartition0NewID, mDDLJobPartition1OldID: mDDLJobPartition1NewID, @@ -748,203 +772,233 @@ func TestDeleteRangeForMDDLJob(t *testing.T) { DbID: mDDLJobDBNewID, TableMap: tableMap, } - schemaReplace.DbMap[mDDLJobDBOldID] = dbReplace + schemaReplace := MockEmptySchemasReplace(midr, map[int64]*DBReplace{ + mDDLJobDBOldID: dbReplace, + }) - var targs TableDeletQueryArgs - var iargs IndexDeleteQueryArgs - var err error + var qargs *PreDelRangeQuery // drop schema - err = schemaReplace.deleteRange(dropSchemaJob) + err := schemaReplace.restoreFromHistory(dropSchemaJob) require.NoError(t, err) - targs = <-midr.tableCh - require.Equal(t, len(targs.tableIDs), len(mDDLJobALLNewTableIDSet)) - for _, tableID := range targs.tableIDs { - _, exist := mDDLJobALLNewTableIDSet[tableID] + qargs = <-midr.queryCh + require.Equal(t, len(qargs.ParamsList), 
len(mDDLJobALLNewTableIDSet)) + for _, params := range qargs.ParamsList { + _, exist := mDDLJobALLNewTableKeySet[params.StartKey] require.True(t, exist) } // drop table0 - err = schemaReplace.deleteRange(dropTable0Job) + err = schemaReplace.restoreFromHistory(dropTable0Job) require.NoError(t, err) - targs = <-midr.tableCh - require.Equal(t, len(targs.tableIDs), len(mDDLJobALLNewPartitionIDSet)) - for _, tableID := range targs.tableIDs { - _, exist := mDDLJobALLNewPartitionIDSet[tableID] + qargs = <-midr.queryCh + require.Equal(t, len(qargs.ParamsList), len(mDDLJobALLNewPartitionIDSet)) + for _, params := range qargs.ParamsList { + _, exist := mDDLJobALLNewPartitionKeySet[params.StartKey] require.True(t, exist) } + qargs = <-midr.queryCh + require.Equal(t, len(qargs.ParamsList), 1) + require.Equal(t, qargs.ParamsList[0].StartKey, encodeTableKey(mDDLJobTable0NewID)) // drop table1 - err = schemaReplace.deleteRange(dropTable1Job) + err = schemaReplace.restoreFromHistory(dropTable1Job) require.NoError(t, err) - targs = <-midr.tableCh - require.Equal(t, len(targs.tableIDs), 1) - require.Equal(t, targs.tableIDs[0], mDDLJobTable1NewID) + qargs = <-midr.queryCh + require.Equal(t, len(qargs.ParamsList), 1) + require.Equal(t, qargs.ParamsList[0].StartKey, encodeTableKey(mDDLJobTable1NewID)) // drop table partition1 - err = schemaReplace.deleteRange(dropTable0Partition1Job) + err = schemaReplace.restoreFromHistory(dropTable0Partition1Job) + require.NoError(t, err) + qargs = <-midr.queryCh + require.Equal(t, len(qargs.ParamsList), 1) + require.Equal(t, qargs.ParamsList[0].StartKey, encodeTableKey(mDDLJobPartition1NewID)) + + // reorganize table partition1 + err = schemaReplace.restoreFromHistory(reorganizeTable0Partition1Job) + require.NoError(t, err) + qargs = <-midr.queryCh + require.Equal(t, len(qargs.ParamsList), 1) + require.Equal(t, encodeTableKey(mDDLJobPartition1NewID), qargs.ParamsList[0].StartKey) + + // remove table partition1 + err = 
schemaReplace.restoreFromHistory(removeTable0Partition1Job) require.NoError(t, err) - targs = <-midr.tableCh - require.Equal(t, len(targs.tableIDs), 1) - require.Equal(t, targs.tableIDs[0], mDDLJobPartition1NewID) + qargs = <-midr.queryCh + require.Equal(t, len(qargs.ParamsList), 1) + require.Equal(t, encodeTableKey(mDDLJobPartition1NewID), qargs.ParamsList[0].StartKey) + + // alter table partition1 + err = schemaReplace.restoreFromHistory(alterTable0Partition1Job) + require.NoError(t, err) + qargs = <-midr.queryCh + require.Equal(t, len(qargs.ParamsList), 1) + require.Equal(t, encodeTableKey(mDDLJobPartition1NewID), qargs.ParamsList[0].StartKey) // roll back add index for table0 - err = schemaReplace.deleteRange(rollBackTable0IndexJob) + err = schemaReplace.restoreFromHistory(rollBackTable0IndexJob) require.NoError(t, err) + oldPartitionIDMap := make(map[string]struct{}) for i := 0; i < len(mDDLJobALLNewPartitionIDSet); i++ { - iargs = <-midr.indexCh - _, exist := mDDLJobALLNewPartitionIDSet[iargs.tableID] + qargs = <-midr.queryCh + require.Equal(t, len(qargs.ParamsList), 2) + for _, params := range qargs.ParamsList { + _, exist := oldPartitionIDMap[params.StartKey] + require.False(t, exist) + oldPartitionIDMap[params.StartKey] = struct{}{} + } + + // index ID + _, exist := mDDLJobALLNewPartitionIndex2KeySet[qargs.ParamsList[0].StartKey] + require.True(t, exist) + // temp index ID + _, exist = mDDLJobALLNewPartitionTempIndex2KeySet[qargs.ParamsList[1].StartKey] require.True(t, exist) - require.Equal(t, len(iargs.indexIDs), 1) - require.Equal(t, iargs.indexIDs[0], int64(2)) } // roll back add index for table1 - err = schemaReplace.deleteRange(rollBackTable1IndexJob) + err = schemaReplace.restoreFromHistory(rollBackTable1IndexJob) require.NoError(t, err) - iargs = <-midr.indexCh - require.Equal(t, iargs.tableID, mDDLJobTable1NewID) - require.Equal(t, len(iargs.indexIDs), 1) - require.Equal(t, iargs.indexIDs[0], int64(2)) + qargs = <-midr.queryCh + require.Equal(t, 
len(qargs.ParamsList), 2) + // index ID + require.Equal(t, encodeTableIndexKey(mDDLJobTable1NewID, int64(2)), qargs.ParamsList[0].StartKey) + // temp index ID + require.Equal(t, encodeTableIndexKey(mDDLJobTable1NewID, int64(tablecodec.TempIndexPrefix|2)), qargs.ParamsList[1].StartKey) // drop index for table0 - err = schemaReplace.deleteRange(dropTable0IndexJob) + err = schemaReplace.restoreFromHistory(dropTable0IndexJob) require.NoError(t, err) + oldPartitionIDMap = make(map[string]struct{}) for i := 0; i < len(mDDLJobALLNewPartitionIDSet); i++ { - iargs = <-midr.indexCh - _, exist := mDDLJobALLNewPartitionIDSet[iargs.tableID] + qargs = <-midr.queryCh + require.Equal(t, len(qargs.ParamsList), 1) + _, exist := oldPartitionIDMap[qargs.ParamsList[0].StartKey] + require.False(t, exist) + oldPartitionIDMap[qargs.ParamsList[0].StartKey] = struct{}{} + _, exist = mDDLJobALLNewPartitionIndex2KeySet[qargs.ParamsList[0].StartKey] require.True(t, exist) - require.Equal(t, len(iargs.indexIDs), 1) - require.Equal(t, iargs.indexIDs[0], int64(2)) } // drop index for table1 - err = schemaReplace.deleteRange(dropTable1IndexJob) + err = schemaReplace.restoreFromHistory(dropTable1IndexJob) require.NoError(t, err) - iargs = <-midr.indexCh - require.Equal(t, iargs.tableID, mDDLJobTable1NewID) - require.Equal(t, len(iargs.indexIDs), 1) - require.Equal(t, iargs.indexIDs[0], int64(2)) + qargs = <-midr.queryCh + require.Equal(t, len(qargs.ParamsList), 1) + require.Equal(t, encodeTableIndexKey(mDDLJobTable1NewID, int64(2)), qargs.ParamsList[0].StartKey) - // drop indexes for table0 - err = schemaReplace.deleteRange(dropTable0IndexesJob) + // add index for table 0 + err = schemaReplace.restoreFromHistory(addTable0IndexJob) require.NoError(t, err) + oldPartitionIDMap = make(map[string]struct{}) for i := 0; i < len(mDDLJobALLNewPartitionIDSet); i++ { - iargs = <-midr.indexCh - _, exist := mDDLJobALLNewPartitionIDSet[iargs.tableID] + qargs = <-midr.queryCh + require.Equal(t, 
len(qargs.ParamsList), 1) + _, exist := oldPartitionIDMap[qargs.ParamsList[0].StartKey] + require.False(t, exist) + oldPartitionIDMap[qargs.ParamsList[0].StartKey] = struct{}{} + _, exist = mDDLJobALLNewPartitionTempIndex2KeySet[qargs.ParamsList[0].StartKey] require.True(t, exist) - require.Equal(t, len(iargs.indexIDs), len(mDDLJobALLIndexesIDSet)) - for _, indexID := range iargs.indexIDs { - _, exist := mDDLJobALLIndexesIDSet[indexID] - require.True(t, exist) - } } - // drop indexes for table1 - err = schemaReplace.deleteRange(dropTable1IndexesJob) + // add index for table 1 + err = schemaReplace.restoreFromHistory(addTable1IndexJob) require.NoError(t, err) - iargs = <-midr.indexCh - require.Equal(t, iargs.tableID, mDDLJobTable1NewID) - require.Equal(t, len(iargs.indexIDs), len(mDDLJobALLIndexesIDSet)) - for _, indexID := range iargs.indexIDs { - _, exist := mDDLJobALLIndexesIDSet[indexID] - require.True(t, exist) - } + qargs = <-midr.queryCh + require.Equal(t, len(qargs.ParamsList), 1) + require.Equal(t, encodeTableIndexKey(mDDLJobTable1NewID, tempIndex2), qargs.ParamsList[0].StartKey) // drop column for table0 - err = schemaReplace.deleteRange(dropTable0ColumnJob) + err = schemaReplace.restoreFromHistory(dropTable0ColumnJob) require.NoError(t, err) + oldPartitionIDMap = make(map[string]struct{}) for i := 0; i < len(mDDLJobALLNewPartitionIDSet); i++ { - iargs = <-midr.indexCh - _, exist := mDDLJobALLNewPartitionIDSet[iargs.tableID] - require.True(t, exist) - require.Equal(t, len(iargs.indexIDs), len(mDDLJobALLIndexesIDSet)) - for _, indexID := range iargs.indexIDs { - _, exist := mDDLJobALLIndexesIDSet[indexID] - require.True(t, exist) + qargs = <-midr.queryCh + require.Equal(t, len(qargs.ParamsList), 2) + for _, params := range qargs.ParamsList { + _, exist := oldPartitionIDMap[params.StartKey] + require.False(t, exist) + oldPartitionIDMap[params.StartKey] = struct{}{} } - } - // drop column for table1 - err = schemaReplace.deleteRange(dropTable1ColumnJob) - 
require.NoError(t, err) - iargs = <-midr.indexCh - require.Equal(t, iargs.tableID, mDDLJobTable1NewID) - require.Equal(t, len(iargs.indexIDs), len(mDDLJobALLIndexesIDSet)) - for _, indexID := range iargs.indexIDs { - _, exist := mDDLJobALLIndexesIDSet[indexID] + // index ID 2 + _, exist := mDDLJobALLNewPartitionIndex2KeySet[qargs.ParamsList[0].StartKey] + require.True(t, exist) + // index ID 3 + _, exist = mDDLJobALLNewPartitionIndex3KeySet[qargs.ParamsList[1].StartKey] require.True(t, exist) } - // drop columns for table0 - err = schemaReplace.deleteRange(dropTable0ColumnsJob) + // drop column for table1 + err = schemaReplace.restoreFromHistory(dropTable1ColumnJob) require.NoError(t, err) + qargs = <-midr.queryCh + require.Equal(t, len(qargs.ParamsList), len(mDDLJobALLIndexesIDSet)) + // index ID 2 + require.Equal(t, encodeTableIndexKey(mDDLJobTable1NewID, int64(2)), qargs.ParamsList[0].StartKey) + // index ID 3 + require.Equal(t, encodeTableIndexKey(mDDLJobTable1NewID, int64(3)), qargs.ParamsList[1].StartKey) + + // modify column for table0 + err = schemaReplace.restoreFromHistory(modifyTable0ColumnJob) + require.NoError(t, err) + oldPartitionIDMap = make(map[string]struct{}) for i := 0; i < len(mDDLJobALLNewPartitionIDSet); i++ { - iargs = <-midr.indexCh - _, exist := mDDLJobALLNewPartitionIDSet[iargs.tableID] - require.True(t, exist) - require.Equal(t, len(iargs.indexIDs), len(mDDLJobALLIndexesIDSet)) - for _, indexID := range iargs.indexIDs { - _, exist := mDDLJobALLIndexesIDSet[indexID] - require.True(t, exist) + qargs = <-midr.queryCh + require.Equal(t, len(qargs.ParamsList), 2) + for _, params := range qargs.ParamsList { + _, exist := oldPartitionIDMap[params.StartKey] + require.False(t, exist) + oldPartitionIDMap[params.StartKey] = struct{}{} } - } - // drop columns for table1 - err = schemaReplace.deleteRange(dropTable1ColumnsJob) - require.NoError(t, err) - iargs = <-midr.indexCh - require.Equal(t, iargs.tableID, mDDLJobTable1NewID) - require.Equal(t, 
len(iargs.indexIDs), len(mDDLJobALLIndexesIDSet)) - for _, indexID := range iargs.indexIDs { - _, exist := mDDLJobALLIndexesIDSet[indexID] + // index ID 2 + _, exist := mDDLJobALLNewPartitionIndex2KeySet[qargs.ParamsList[0].StartKey] require.True(t, exist) - } - - // drop columns for table0 - err = schemaReplace.deleteRange(modifyTable0ColumnJob) - require.NoError(t, err) - for i := 0; i < len(mDDLJobALLNewPartitionIDSet); i++ { - iargs = <-midr.indexCh - _, exist := mDDLJobALLNewPartitionIDSet[iargs.tableID] + // index ID 3 + _, exist = mDDLJobALLNewPartitionIndex3KeySet[qargs.ParamsList[1].StartKey] require.True(t, exist) - require.Equal(t, len(iargs.indexIDs), len(mDDLJobALLIndexesIDSet)) - for _, indexID := range iargs.indexIDs { - _, exist := mDDLJobALLIndexesIDSet[indexID] - require.True(t, exist) - } } - // drop columns for table1 - err = schemaReplace.deleteRange(modifyTable1ColumnJob) + // modify column for table1 + err = schemaReplace.restoreFromHistory(modifyTable1ColumnJob) require.NoError(t, err) - iargs = <-midr.indexCh - require.Equal(t, iargs.tableID, mDDLJobTable1NewID) - require.Equal(t, len(iargs.indexIDs), len(mDDLJobALLIndexesIDSet)) - for _, indexID := range iargs.indexIDs { - _, exist := mDDLJobALLIndexesIDSet[indexID] - require.True(t, exist) - } + qargs = <-midr.queryCh + require.Equal(t, len(qargs.ParamsList), len(mDDLJobALLIndexesIDSet)) + // index ID 2 + require.Equal(t, encodeTableIndexKey(mDDLJobTable1NewID, int64(2)), qargs.ParamsList[0].StartKey) + // index ID 3 + require.Equal(t, encodeTableIndexKey(mDDLJobTable1NewID, int64(3)), qargs.ParamsList[1].StartKey) // drop indexes(multi-schema-change) for table0 - err = schemaReplace.restoreFromHistory(multiSchemaChangeJob0, false) + err = schemaReplace.restoreFromHistory(multiSchemaChangeJob0) require.NoError(t, err) + oldPartitionIDMap = make(map[string]struct{}) for l := 0; l < 2; l++ { for i := 0; i < len(mDDLJobALLNewPartitionIDSet); i++ { - iargs = <-midr.indexCh - _, exist := 
mDDLJobALLNewPartitionIDSet[iargs.tableID] + qargs = <-midr.queryCh + require.Equal(t, len(qargs.ParamsList), 1) + _, exist := oldPartitionIDMap[qargs.ParamsList[0].StartKey] + require.False(t, exist) + oldPartitionIDMap[qargs.ParamsList[0].StartKey] = struct{}{} + _, exist = mDDLJobAllIndexesKeySet[l][qargs.ParamsList[0].StartKey] require.True(t, exist) - require.Equal(t, len(iargs.indexIDs), 1) - require.Equal(t, iargs.indexIDs[0], int64(l+1)) } } // drop indexes(multi-schema-change) for table1 - err = schemaReplace.restoreFromHistory(multiSchemaChangeJob1, false) + err = schemaReplace.restoreFromHistory(multiSchemaChangeJob1) require.NoError(t, err) - for l := 0; l < 2; l++ { - iargs = <-midr.indexCh - require.Equal(t, iargs.tableID, mDDLJobTable1NewID) - require.Equal(t, len(iargs.indexIDs), 1) - require.Equal(t, iargs.indexIDs[0], int64(l+1)) - } + qargs = <-midr.queryCh + require.Equal(t, len(qargs.ParamsList), 1) + require.Equal(t, encodeTableIndexKey(mDDLJobTable1NewID, int64(2)), qargs.ParamsList[0].StartKey) + + qargs = <-midr.queryCh + require.Equal(t, len(qargs.ParamsList), 1) + require.Equal(t, encodeTableIndexKey(mDDLJobTable1NewID, int64(3)), qargs.ParamsList[0].StartKey) +} + +func TestCompatibleAlert(t *testing.T) { + require.Equal(t, ddl.BRInsertDeleteRangeSQLPrefix, `INSERT IGNORE INTO mysql.gc_delete_range VALUES `) + require.Equal(t, ddl.BRInsertDeleteRangeSQLValue, `(%?, %?, %?, %?, %?)`) } diff --git a/br/tests/br_pitr/incremental_data/delete_range.sql b/br/tests/br_pitr/incremental_data/delete_range.sql index f5afde943649e..180a7cd1d0e62 100644 --- a/br/tests/br_pitr/incremental_data/delete_range.sql +++ b/br/tests/br_pitr/incremental_data/delete_range.sql @@ -5,14 +5,23 @@ drop table table_to_be_dropped_or_truncated.t0_dropped; drop table table_to_be_dropped_or_truncated.t1_dropped; truncate table table_to_be_dropped_or_truncated.t0_truncated; truncate table table_to_be_dropped_or_truncated.t1_truncated; --- 3. 
Drop/Truncate Table Partition +-- 3.1. Drop/Truncate Table Partition alter table partition_to_be_dropped_or_truncated.t1_dropped drop partition p0; alter table partition_to_be_dropped_or_truncated.t1_truncated truncate partition p0; +alter table partition_to_be_dropped_or_truncated.t1_truncated reorganize partition p2 INTO (PARTITION p2 VALUES LESS THAN (20), PARTITION p3 VALUES LESS THAN MAXVALUE); +-- 3.2. Remove/Alter Table Partitioning +alter table partition_to_be_removed_or_altered.t_removed remove partitioning; +alter table partition_to_be_removed_or_altered.t_altered partition by range(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (100), PARTITION p2 VALUES LESS THAN MAXVALUE ); +alter table partition_to_be_removed_or_altered.t_altered partition by key(id) partitions 3; -- 4. Drop Table Index/PrimaryKey alter table index_or_primarykey_to_be_dropped.t0 drop index k1; alter table index_or_primarykey_to_be_dropped.t1 drop index k1; alter table index_or_primarykey_to_be_dropped.t0 drop primary key; alter table index_or_primarykey_to_be_dropped.t1 drop primary key; +create index k1 on index_or_primarykey_to_be_dropped.t0 (name); +create index k1 on index_or_primarykey_to_be_dropped.t1 (name); +alter table index_or_primarykey_to_be_dropped.t0 add primary key (id); +alter table index_or_primarykey_to_be_dropped.t1 add primary key (id); -- 5. Drop Table Indexes alter table indexes_to_be_dropped.t0 drop index k1, drop index k2; alter table indexes_to_be_dropped.t1 drop index k1, drop index k2; @@ -23,3 +32,184 @@ alter table column_s_to_be_dropped.t0_columns drop column name, drop column c; alter table column_s_to_be_dropped.t1_columns drop column name, drop column c; -- 7. Modify Table Column alter table column_to_be_modified.t0 modify column name varchar(25); + + +-- CREATE TABLE IN THE LOG RESTORE STAGE +-- 1.
Drop Schema +create database db_to_be_dropped_2; +create table db_to_be_dropped_2.t0(id int primary key, c int, name char(20)); +create table db_to_be_dropped_2.t1(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on db_to_be_dropped_2.t0 (name); +create index k2 on db_to_be_dropped_2.t0(c); +create index k1 on db_to_be_dropped_2.t1(name); +create index k2 on db_to_be_dropped_2.t1(c); +create index k3 on db_to_be_dropped_2.t1 (id, c); + +insert into db_to_be_dropped_2.t0 values (1, 2, "123"), (2, 3, "123"); +insert into db_to_be_dropped_2.t1 values (1, 2, "123"), (2, 3, "123"); +-- 2. Drop/Truncate Table +create database table_to_be_dropped_or_truncated_2; +create table table_to_be_dropped_or_truncated_2.t0_dropped(id int primary key, c int, name char(20)); +create table table_to_be_dropped_or_truncated_2.t1_dropped(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); +create table table_to_be_dropped_or_truncated_2.t0_truncated(id int primary key, c int, name char(20)); +create table table_to_be_dropped_or_truncated_2.t1_truncated(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on table_to_be_dropped_or_truncated_2.t0_dropped (name); +create index k2 on table_to_be_dropped_or_truncated_2.t0_dropped (c); +create index k1 on table_to_be_dropped_or_truncated_2.t1_dropped (name); +create index k2 on table_to_be_dropped_or_truncated_2.t1_dropped (c); +create index k3 on table_to_be_dropped_or_truncated_2.t1_dropped (id, c); + +create index k1 on table_to_be_dropped_or_truncated_2.t0_truncated (name); +create index k2 on 
table_to_be_dropped_or_truncated_2.t0_truncated (c); +create index k1 on table_to_be_dropped_or_truncated_2.t1_truncated (name); +create index k2 on table_to_be_dropped_or_truncated_2.t1_truncated (c); +create index k3 on table_to_be_dropped_or_truncated_2.t1_truncated (id, c); + +insert into table_to_be_dropped_or_truncated_2.t0_dropped values (1, 2, "123"), (2, 3, "123"); +insert into table_to_be_dropped_or_truncated_2.t1_dropped values (1, 2, "123"), (2, 3, "123"); + +insert into table_to_be_dropped_or_truncated_2.t0_truncated values (1, 2, "123"), (2, 3, "123"); +insert into table_to_be_dropped_or_truncated_2.t1_truncated values (1, 2, "123"), (2, 3, "123"); + +-- 3.1. Drop/Truncate Table Partition +create database partition_to_be_dropped_or_truncated_2; +create table partition_to_be_dropped_or_truncated_2.t0_dropped(id int primary key, c int, name char(20)); +create table partition_to_be_dropped_or_truncated_2.t1_dropped(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); +create table partition_to_be_dropped_or_truncated_2.t0_truncated(id int primary key, c int, name char(20)); +create table partition_to_be_dropped_or_truncated_2.t1_truncated(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on partition_to_be_dropped_or_truncated_2.t0_dropped (name); +create index k2 on partition_to_be_dropped_or_truncated_2.t0_dropped (c); +create index k1 on partition_to_be_dropped_or_truncated_2.t1_dropped (name); +create index k2 on partition_to_be_dropped_or_truncated_2.t1_dropped (c); +create index k3 on partition_to_be_dropped_or_truncated_2.t1_dropped (id, c); + +create index k1 on partition_to_be_dropped_or_truncated_2.t0_truncated (name); +create index k2 on 
partition_to_be_dropped_or_truncated_2.t0_truncated (c); +create index k1 on partition_to_be_dropped_or_truncated_2.t1_truncated (name); +create index k2 on partition_to_be_dropped_or_truncated_2.t1_truncated (c); +create index k3 on partition_to_be_dropped_or_truncated_2.t1_truncated (id, c); + +insert into partition_to_be_dropped_or_truncated_2.t0_dropped values (1, 2, "123"), (2, 3, "123"); +insert into partition_to_be_dropped_or_truncated_2.t1_dropped values (1, 2, "123"), (2, 3, "123"); + +insert into partition_to_be_dropped_or_truncated_2.t0_truncated values (1, 2, "123"), (2, 3, "123"); +insert into partition_to_be_dropped_or_truncated_2.t1_truncated values (1, 2, "123"), (2, 3, "123"); + +-- 3.2. Remove/Alter Table Partitioning +create database partition_to_be_removed_or_altered_2; +create table partition_to_be_removed_or_altered_2.t_removed(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); +create table partition_to_be_removed_or_altered_2.t_altered(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on partition_to_be_removed_or_altered_2.t_removed (name); +create index k2 on partition_to_be_removed_or_altered_2.t_removed (c); +create index k3 on partition_to_be_removed_or_altered_2.t_removed (id, c); + +create index k1 on partition_to_be_removed_or_altered_2.t_altered (name); +create index k2 on partition_to_be_removed_or_altered_2.t_altered (c); +create index k3 on partition_to_be_removed_or_altered_2.t_altered (id, c); + +insert into partition_to_be_removed_or_altered_2.t_removed values (1, 2, "123"), (2, 3, "123"); + +insert into partition_to_be_removed_or_altered_2.t_altered values (1, 2, "123"), (2, 3, "123"); + +-- 4. 
Drop Table Index/PrimaryKey +create database index_or_primarykey_to_be_dropped_2; +create table index_or_primarykey_to_be_dropped_2.t0(id int primary key nonclustered, c int, name char(20)); +create table index_or_primarykey_to_be_dropped_2.t1(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on index_or_primarykey_to_be_dropped_2.t0 (name); +create index k2 on index_or_primarykey_to_be_dropped_2.t0 (c); +create index k1 on index_or_primarykey_to_be_dropped_2.t1 (name); +create index k2 on index_or_primarykey_to_be_dropped_2.t1 (c); +create index k3 on index_or_primarykey_to_be_dropped_2.t1 (id, c); + +insert into index_or_primarykey_to_be_dropped_2.t0 values (1, 2, "123"), (2, 3, "123"); +insert into index_or_primarykey_to_be_dropped_2.t1 values (1, 2, "123"), (2, 3, "123"); +-- 5. Drop Table INDEXES +create database indexes_to_be_dropped_2; +create table indexes_to_be_dropped_2.t0(id int primary key nonclustered, c int, name char(20)); +create table indexes_to_be_dropped_2.t1(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on indexes_to_be_dropped_2.t0 (name); +create index k2 on indexes_to_be_dropped_2.t0 (c); +create index k1 on indexes_to_be_dropped_2.t1 (name); +create index k2 on indexes_to_be_dropped_2.t1 (c); +create index k3 on indexes_to_be_dropped_2.t1 (id, c); + +insert into indexes_to_be_dropped_2.t0 values (1, 2, "123"), (2, 3, "123"); +insert into indexes_to_be_dropped_2.t1 values (1, 2, "123"), (2, 3, "123"); +-- 6. 
Drop Table Column/Columns +create database column_s_to_be_dropped_2; +create table column_s_to_be_dropped_2.t0_column(id int primary key nonclustered, c int, name char(20)); +create table column_s_to_be_dropped_2.t1_column(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); +create table column_s_to_be_dropped_2.t0_columns(id int primary key nonclustered, c int, name char(20)); +create table column_s_to_be_dropped_2.t1_columns(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on column_s_to_be_dropped_2.t0_column (name); +create index k2 on column_s_to_be_dropped_2.t0_column (c); +create index k1 on column_s_to_be_dropped_2.t1_column (name); +create index k2 on column_s_to_be_dropped_2.t1_column (c); +create index k3 on column_s_to_be_dropped_2.t1_column (id, c); + +create index k1 on column_s_to_be_dropped_2.t0_columns (name); +create index k2 on column_s_to_be_dropped_2.t0_columns (c); +create index k1 on column_s_to_be_dropped_2.t1_columns (name); +create index k2 on column_s_to_be_dropped_2.t1_columns (c); +-- create index k3 on column_s_to_be_dropped_2.t1_columns (id, c); + +insert into column_s_to_be_dropped_2.t0_column values (1, 2, "123"), (2, 3, "123"); +insert into column_s_to_be_dropped_2.t1_column values (1, 2, "123"), (2, 3, "123"); +insert into column_s_to_be_dropped_2.t0_columns values (1, 2, "123"), (2, 3, "123"); +insert into column_s_to_be_dropped_2.t1_columns values (1, 2, "123"), (2, 3, "123"); +-- 7. 
Modify Table Column +create database column_to_be_modified_2; +create table column_to_be_modified_2.t0(id int primary key nonclustered, c int, name char(20)); +create table column_to_be_modified_2.t1(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on column_to_be_modified_2.t0 (name); +create index k2 on column_to_be_modified_2.t0 (c); +create index k1 on column_to_be_modified_2.t1 (name); +create index k2 on column_to_be_modified_2.t1 (c); +create index k3 on column_to_be_modified_2.t1 (id, c); + +insert into column_to_be_modified_2.t0 values (1, 2, "123"), (2, 3, "123"); +insert into column_to_be_modified_2.t1 values (1, 2, "123"), (2, 3, "123"); + +-- 1. Drop Schema +drop database db_to_be_dropped_2; +-- 2. Drop/Truncate Table +drop table table_to_be_dropped_or_truncated_2.t0_dropped; +drop table table_to_be_dropped_or_truncated_2.t1_dropped; +truncate table table_to_be_dropped_or_truncated_2.t0_truncated; +truncate table table_to_be_dropped_or_truncated_2.t1_truncated; +-- 3.1. Drop/Truncate Table Partition +alter table partition_to_be_dropped_or_truncated_2.t1_dropped drop partition p0; +alter table partition_to_be_dropped_or_truncated_2.t1_truncated truncate partition p0; +alter table partition_to_be_dropped_or_truncated_2.t1_truncated reorganize partition p2 INTO (PARTITION p2 VALUES LESS THAN (20), PARTITION p3 VALUES LESS THAN MAXVALUE); +-- 3.2. Remove/Alter Table Partitioning +alter table partition_to_be_removed_or_altered_2.t_removed remove partitioning; +alter table partition_to_be_removed_or_altered_2.t_altered partition by range(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (100), PARTITION p2 VALUES LESS THAN MAXVALUE ); +alter table partition_to_be_removed_or_altered_2.t_altered partition by key(id) partitions 3; +-- 4. 
Drop Table Index/PrimaryKey +alter table index_or_primarykey_to_be_dropped_2.t0 drop index k1; +alter table index_or_primarykey_to_be_dropped_2.t1 drop index k1; +alter table index_or_primarykey_to_be_dropped_2.t0 drop primary key; +alter table index_or_primarykey_to_be_dropped_2.t1 drop primary key; +create index k1 on index_or_primarykey_to_be_dropped_2.t0 (name); +create index k1 on index_or_primarykey_to_be_dropped_2.t1 (name); +alter table index_or_primarykey_to_be_dropped_2.t0 add primary key (id); +alter table index_or_primarykey_to_be_dropped_2.t1 add primary key (id); +-- 5. Drop Table Indexes +alter table indexes_to_be_dropped_2.t0 drop index k1, drop index k2; +alter table indexes_to_be_dropped_2.t1 drop index k1, drop index k2; +-- 6. Drop Table Column/Columns +alter table column_s_to_be_dropped_2.t0_column drop column name; +alter table column_s_to_be_dropped_2.t1_column drop column name; +alter table column_s_to_be_dropped_2.t0_columns drop column name, drop column c; +alter table column_s_to_be_dropped_2.t1_columns drop column name, drop column c; +-- 7. Modify Table Column +alter table column_to_be_modified_2.t0 modify column name varchar(25); + diff --git a/br/tests/br_pitr/prepare_data/delete_range.sql b/br/tests/br_pitr/prepare_data/delete_range.sql index e2a20be9e45fa..4abb9422c432a 100644 --- a/br/tests/br_pitr/prepare_data/delete_range.sql +++ b/br/tests/br_pitr/prepare_data/delete_range.sql @@ -36,7 +36,7 @@ insert into table_to_be_dropped_or_truncated.t1_dropped values (1, 2, "123"), (2 insert into table_to_be_dropped_or_truncated.t0_truncated values (1, 2, "123"), (2, 3, "123"); insert into table_to_be_dropped_or_truncated.t1_truncated values (1, 2, "123"), (2, 3, "123"); --- 3. Drop/Truncate Table Partition +-- 3.1. 
Drop/Truncate Table Partition create database partition_to_be_dropped_or_truncated; create table partition_to_be_dropped_or_truncated.t0_dropped(id int primary key, c int, name char(20)); create table partition_to_be_dropped_or_truncated.t1_dropped(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); @@ -60,6 +60,24 @@ insert into partition_to_be_dropped_or_truncated.t1_dropped values (1, 2, "123") insert into partition_to_be_dropped_or_truncated.t0_truncated values (1, 2, "123"), (2, 3, "123"); insert into partition_to_be_dropped_or_truncated.t1_truncated values (1, 2, "123"), (2, 3, "123"); + +-- 3.2. Remove/Alter Table Partitioning +create database partition_to_be_removed_or_altered; +create table partition_to_be_removed_or_altered.t_removed(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); +create table partition_to_be_removed_or_altered.t_altered(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on partition_to_be_removed_or_altered.t_removed (name); +create index k2 on partition_to_be_removed_or_altered.t_removed (c); +create index k3 on partition_to_be_removed_or_altered.t_removed (id, c); + +create index k1 on partition_to_be_removed_or_altered.t_altered (name); +create index k2 on partition_to_be_removed_or_altered.t_altered (c); +create index k3 on partition_to_be_removed_or_altered.t_altered (id, c); + +insert into partition_to_be_removed_or_altered.t_removed values (1, 2, "123"), (2, 3, "123"); + +insert into partition_to_be_removed_or_altered.t_altered values (1, 2, "123"), (2, 3, "123"); + -- 4. 
Drop Table Index/PrimaryKey create database index_or_primarykey_to_be_dropped; create table index_or_primarykey_to_be_dropped.t0(id int primary key nonclustered, c int, name char(20)); diff --git a/br/tests/br_pitr/run.sh b/br/tests/br_pitr/run.sh index 25a7fda5588f2..afe400820eb7e 100644 --- a/br/tests/br_pitr/run.sh +++ b/br/tests/br_pitr/run.sh @@ -31,6 +31,10 @@ echo "prepare the data" run_sql_file $CUR/prepare_data/delete_range.sql # ... +# check something after prepare the data +prepare_delete_range_count=$(run_sql "select count(*) DELETE_RANGE_CNT from (select * from mysql.gc_delete_range union all select * from mysql.gc_delete_range_done) del_range;" | tail -n 1 | awk '{print $2}') +echo "prepare_delete_range_count: $prepare_delete_range_count" + # start the log backup task echo "start log task" run_br --pd $PD_ADDR log start --task-name integration_test -s "local://$TEST_DIR/$PREFIX/log" @@ -44,6 +48,10 @@ echo "load the incremental data" run_sql_file $CUR/incremental_data/delete_range.sql # ... 
+# check something after load the incremental data +incremental_delete_range_count=$(run_sql "select count(*) DELETE_RANGE_CNT from (select * from mysql.gc_delete_range union all select * from mysql.gc_delete_range_done) del_range;" | tail -n 1 | awk '{print $2}') +echo "incremental_delete_range_count: $incremental_delete_range_count" + # wait checkpoint advance echo "wait checkpoint advance" sleep 10 @@ -93,8 +101,9 @@ run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-bac # check something in downstream cluster echo "check br log" check_contains "restore log success summary" -# check_not_contains "rewrite delete range" +check_not_contains "rewrite delete range" echo "" > $res_file echo "check sql result" -run_sql "select count(*) DELETE_RANGE_CNT from mysql.gc_delete_range group by ts order by DELETE_RANGE_CNT desc limit 1;" -check_contains "DELETE_RANGE_CNT: 46" +run_sql "select count(*) DELETE_RANGE_CNT from (select * from mysql.gc_delete_range union all select * from mysql.gc_delete_range_done) del_range group by ts order by DELETE_RANGE_CNT desc limit 1;" +expect_delete_range=$(($incremental_delete_range_count-$prepare_delete_range_count)) +check_contains "DELETE_RANGE_CNT: $expect_delete_range" diff --git a/pkg/ddl/ddl_test.go b/pkg/ddl/ddl_test.go index a59c58833fc88..cb0120e3f4718 100644 --- a/pkg/ddl/ddl_test.go +++ b/pkg/ddl/ddl_test.go @@ -73,9 +73,6 @@ func (d *ddl) RemoveReorgCtx(id int64) { d.removeReorgCtx(id) } -// JobNeedGCForTest is only used for test. 
-var JobNeedGCForTest = jobNeedGC - func createMockStore(t *testing.T) kv.Storage { store, err := mockstore.NewMockStore() require.NoError(t, err) diff --git a/pkg/ddl/ddl_worker.go b/pkg/ddl/ddl_worker.go index 25e5acd4db155..cb9a2b66f06f7 100644 --- a/pkg/ddl/ddl_worker.go +++ b/pkg/ddl/ddl_worker.go @@ -538,7 +538,11 @@ func needUpdateRawArgs(job *model.Job, meetErr bool) bool { return true } -func jobNeedGC(job *model.Job) bool { +// JobNeedGC is called to determine whether delete-ranges need to be generated for the provided job. +// +// NOTICE: BR also uses JobNeedGC to determine whether delete-ranges need to be generated for the provided job. +// Therefore, please make sure any modification is compatible with BR. +func JobNeedGC(job *model.Job) bool { if !job.IsCancelled() { if job.Warning != nil && dbterror.ErrCantDropFieldOrKey.Equal(job.Warning) { // For the field/key not exists warnings, there is no need to @@ -558,7 +562,7 @@ func jobNeedGC(job *model.Job) bool { case model.ActionMultiSchemaChange: for i, sub := range job.MultiSchemaInfo.SubJobs { proxyJob := sub.ToProxyJob(job, i) - needGC := jobNeedGC(&proxyJob) + needGC := JobNeedGC(&proxyJob) if needGC { return true } @@ -578,7 +582,7 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) { markJobFinish(job) }() - if jobNeedGC(job) { + if JobNeedGC(job) { err = w.delRangeManager.addDelRangeJob(w.ctx, job) if err != nil { return errors.Trace(err) diff --git a/pkg/ddl/ddl_worker_test.go b/pkg/ddl/ddl_worker_test.go index 9d6abc20dece7..4fe7026959b4a 100644 --- a/pkg/ddl/ddl_worker_test.go +++ b/pkg/ddl/ddl_worker_test.go @@ -263,46 +263,46 @@ func TestParallelDDL(t *testing.T) { func TestJobNeedGC(t *testing.T) { job := &model.Job{Type: model.ActionAddIndex, State: model.JobStateCancelled} - require.False(t, ddl.JobNeedGCForTest(job)) + require.False(t, ddl.JobNeedGC(job)) job = &model.Job{Type: model.ActionAddColumn, State: model.JobStateDone} - require.False(t, 
ddl.JobNeedGCForTest(job)) + require.False(t, ddl.JobNeedGC(job)) job = &model.Job{Type: model.ActionAddIndex, State: model.JobStateDone} - require.True(t, ddl.JobNeedGCForTest(job)) + require.True(t, ddl.JobNeedGC(job)) job = &model.Job{Type: model.ActionAddPrimaryKey, State: model.JobStateDone} - require.True(t, ddl.JobNeedGCForTest(job)) + require.True(t, ddl.JobNeedGC(job)) job = &model.Job{Type: model.ActionAddIndex, State: model.JobStateRollbackDone} - require.True(t, ddl.JobNeedGCForTest(job)) + require.True(t, ddl.JobNeedGC(job)) job = &model.Job{Type: model.ActionAddPrimaryKey, State: model.JobStateRollbackDone} - require.True(t, ddl.JobNeedGCForTest(job)) + require.True(t, ddl.JobNeedGC(job)) job = &model.Job{Type: model.ActionMultiSchemaChange, State: model.JobStateDone, MultiSchemaInfo: &model.MultiSchemaInfo{ SubJobs: []*model.SubJob{ {Type: model.ActionAddColumn, State: model.JobStateDone}, {Type: model.ActionRebaseAutoID, State: model.JobStateDone}, }}} - require.False(t, ddl.JobNeedGCForTest(job)) + require.False(t, ddl.JobNeedGC(job)) job = &model.Job{Type: model.ActionMultiSchemaChange, State: model.JobStateDone, MultiSchemaInfo: &model.MultiSchemaInfo{ SubJobs: []*model.SubJob{ {Type: model.ActionAddIndex, State: model.JobStateDone}, {Type: model.ActionAddColumn, State: model.JobStateDone}, {Type: model.ActionRebaseAutoID, State: model.JobStateDone}, }}} - require.True(t, ddl.JobNeedGCForTest(job)) + require.True(t, ddl.JobNeedGC(job)) job = &model.Job{Type: model.ActionMultiSchemaChange, State: model.JobStateDone, MultiSchemaInfo: &model.MultiSchemaInfo{ SubJobs: []*model.SubJob{ {Type: model.ActionAddIndex, State: model.JobStateDone}, {Type: model.ActionDropColumn, State: model.JobStateDone}, {Type: model.ActionRebaseAutoID, State: model.JobStateDone}, }}} - require.True(t, ddl.JobNeedGCForTest(job)) + require.True(t, ddl.JobNeedGC(job)) job = &model.Job{Type: model.ActionMultiSchemaChange, State: model.JobStateRollbackDone, MultiSchemaInfo: 
&model.MultiSchemaInfo{ SubJobs: []*model.SubJob{ {Type: model.ActionAddIndex, State: model.JobStateRollbackDone}, {Type: model.ActionAddColumn, State: model.JobStateRollbackDone}, {Type: model.ActionRebaseAutoID, State: model.JobStateCancelled}, }}} - require.True(t, ddl.JobNeedGCForTest(job)) + require.True(t, ddl.JobNeedGC(job)) } func TestUsingReorgCtx(t *testing.T) { diff --git a/pkg/ddl/delete_range.go b/pkg/ddl/delete_range.go index c06ebe4a62432..572c9dd35c7f6 100644 --- a/pkg/ddl/delete_range.go +++ b/pkg/ddl/delete_range.go @@ -17,7 +17,6 @@ package ddl import ( "context" "encoding/hex" - "fmt" "math" "strings" "sync" @@ -41,12 +40,17 @@ import ( const ( insertDeleteRangeSQLPrefix = `INSERT IGNORE INTO mysql.gc_delete_range VALUES ` insertDeleteRangeSQLValue = `(%?, %?, %?, %?, %?)` - insertDeleteRangeSQL = insertDeleteRangeSQLPrefix + insertDeleteRangeSQLValue delBatchSize = 65536 delBackLog = 128 ) +// Only used in the BR unit test. Once these const variables are modified, please make sure the change stays compatible with BR. +const ( + BRInsertDeleteRangeSQLPrefix = insertDeleteRangeSQLPrefix + BRInsertDeleteRangeSQLValue = insertDeleteRangeSQLValue +) + var ( // batchInsertDeleteRangeSize is the maximum size for each batch insert statement in the delete-range. 
batchInsertDeleteRangeSize = 256 @@ -96,11 +100,9 @@ func (dr *delRange) addDelRangeJob(ctx context.Context, job *model.Job) error { } defer dr.sessPool.Put(sctx) - if job.MultiSchemaInfo != nil { - err = insertJobIntoDeleteRangeTableMultiSchema(ctx, sctx, job) - } else { - err = insertJobIntoDeleteRangeTable(ctx, sctx, job, &elementIDAlloc{}) - } + // The same Job ID uses the same element ID allocator + wrapper := newDelRangeExecWrapper(sctx) + err = AddDelRangeJobInternal(ctx, wrapper, job) if err != nil { logutil.BgLogger().Error("add job into delete-range table failed", zap.String("category", "ddl"), zap.Int64("jobID", job.ID), zap.String("jobType", job.Type.String()), zap.Error(err)) return errors.Trace(err) @@ -112,12 +114,23 @@ func (dr *delRange) addDelRangeJob(ctx context.Context, job *model.Job) error { return nil } -func insertJobIntoDeleteRangeTableMultiSchema(ctx context.Context, sctx sessionctx.Context, job *model.Job) error { +// AddDelRangeJobInternal generates the delete ranges for the provided job and consumes them through the given DelRangeExecWrapper. 
+func AddDelRangeJobInternal(ctx context.Context, wrapper DelRangeExecWrapper, job *model.Job) error { + var err error var ea elementIDAlloc + if job.MultiSchemaInfo != nil { + err = insertJobIntoDeleteRangeTableMultiSchema(ctx, wrapper, job, &ea) + } else { + err = insertJobIntoDeleteRangeTable(ctx, wrapper, job, &ea) + } + return errors.Trace(err) +} + +func insertJobIntoDeleteRangeTableMultiSchema(ctx context.Context, wrapper DelRangeExecWrapper, job *model.Job, ea *elementIDAlloc) error { for i, sub := range job.MultiSchemaInfo.SubJobs { proxyJob := sub.ToProxyJob(job, i) - if jobNeedGC(&proxyJob) { - err := insertJobIntoDeleteRangeTable(ctx, sctx, &proxyJob, &ea) + if JobNeedGC(&proxyJob) { + err := insertJobIntoDeleteRangeTable(ctx, wrapper, &proxyJob, ea) if err != nil { return errors.Trace(err) } @@ -262,14 +275,12 @@ func (dr *delRange) doTask(sctx sessionctx.Context, r util.DelRangeTask) error { // insertJobIntoDeleteRangeTable parses the job into delete-range arguments, // and inserts a new record into gc_delete_range table. The primary key is // (job ID, element ID), so we ignore key conflict error. 
-func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, job *model.Job, ea *elementIDAlloc) error { - now, err := getNowTSO(sctx) - if err != nil { +func insertJobIntoDeleteRangeTable(ctx context.Context, wrapper DelRangeExecWrapper, job *model.Job, ea *elementIDAlloc) error { + if err := wrapper.UpdateTSOForJob(); err != nil { return errors.Trace(err) } ctx = kv.WithInternalSourceType(ctx, getDDLRequestSource(job.Type)) - s := sctx.(sqlexec.SQLExecutor) switch job.Type { case model.ActionDropSchema: var tableIDs []int64 @@ -281,7 +292,7 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, if batchEnd > i+batchInsertDeleteRangeSize { batchEnd = i + batchInsertDeleteRangeSize } - if err := doBatchInsert(ctx, s, job.ID, tableIDs[i:batchEnd], now, ea); err != nil { + if err := doBatchDeleteTablesRange(ctx, wrapper, job.ID, tableIDs[i:batchEnd], ea, "drop schema: table IDs"); err != nil { return errors.Trace(err) } } @@ -295,24 +306,13 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, return errors.Trace(err) } if len(physicalTableIDs) > 0 { - for _, pid := range physicalTableIDs { - startKey = tablecodec.EncodeTablePrefix(pid) - endKey := tablecodec.EncodeTablePrefix(pid + 1) - elemID := ea.allocForPhysicalID(pid) - if err := doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("partition ID is %d", pid)); err != nil { - return errors.Trace(err) - } + if err := doBatchDeleteTablesRange(ctx, wrapper, job.ID, physicalTableIDs, ea, "drop table: partition table IDs"); err != nil { + return errors.Trace(err) } // logical table may contain global index regions, so delete the logical table range. 
- startKey = tablecodec.EncodeTablePrefix(tableID) - endKey := tablecodec.EncodeTablePrefix(tableID + 1) - elemID := ea.allocForPhysicalID(tableID) - return doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("table ID is %d", tableID)) + return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, []int64{tableID}, ea, "drop table: table ID")) } - startKey = tablecodec.EncodeTablePrefix(tableID) - endKey := tablecodec.EncodeTablePrefix(tableID + 1) - elemID := ea.allocForPhysicalID(tableID) - return doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("table ID is %d", tableID)) + return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, []int64{tableID}, ea, "drop table: table ID")) case model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionReorganizePartition, model.ActionRemovePartitioning, model.ActionAlterTablePartitioning: @@ -324,14 +324,7 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, if err := job.DecodeArgs(&physicalTableIDs, &partInfo); err != nil { return errors.Trace(err) } - for _, physicalTableID := range physicalTableIDs { - startKey := tablecodec.EncodeTablePrefix(physicalTableID) - endKey := tablecodec.EncodeTablePrefix(physicalTableID + 1) - elemID := ea.allocForPhysicalID(physicalTableID) - if err := doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("partition table ID is %d", physicalTableID)); err != nil { - return errors.Trace(err) - } - } + return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, physicalTableIDs, ea, "drop partition: physical table ID(s)")) // ActionAddIndex, ActionAddPrimaryKey needs do it, because it needs to be rolled back when it's canceled. 
case model.ActionAddIndex, model.ActionAddPrimaryKey: allIndexIDs := make([]int64, 1) @@ -357,13 +350,8 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, indexIDs = []int64{tempIdxID} } for _, pid := range physicalIDs { - for _, iid := range indexIDs { - startKey := tablecodec.EncodeTableIndexPrefix(pid, iid) - endKey := tablecodec.EncodeTableIndexPrefix(pid, iid+1) - elemID := ea.allocForIndexID(pid, iid) - if err := doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("physical ID is %d", pid)); err != nil { - return errors.Trace(err) - } + if err := doBatchDeleteIndiceRange(ctx, wrapper, job.ID, pid, indexIDs, ea, "add index: physical table ID(s)"); err != nil { + return errors.Trace(err) } } } @@ -378,29 +366,18 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, return errors.Trace(err) } } - for _, indexID := range allIndexIDs { - // partitionIDs len is 0 if the dropped index is a global index, even if it is a partitioned table. - if len(partitionIDs) == 0 { - startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID) - endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1) - elemID := ea.allocForIndexID(tableID, indexID) - if err := doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("index ID is %d", indexID)); err != nil { - return errors.Trace(err) - } - continue + // partitionIDs len is 0 if the dropped index is a global index, even if it is a partitioned table. 
+ if len(partitionIDs) == 0 { + return errors.Trace(doBatchDeleteIndiceRange(ctx, wrapper, job.ID, tableID, allIndexIDs, ea, "drop index: table ID")) + } + failpoint.Inject("checkDropGlobalIndex", func(val failpoint.Value) { + if val.(bool) { + panic("drop global index must not delete partition index range") } - failpoint.Inject("checkDropGlobalIndex", func(val failpoint.Value) { - if val.(bool) { - panic("drop global index must not delete partition index range") - } - }) - for _, pid := range partitionIDs { - startKey := tablecodec.EncodeTableIndexPrefix(pid, indexID) - endKey := tablecodec.EncodeTableIndexPrefix(pid, indexID+1) - elemID := ea.allocForIndexID(pid, indexID) - if err := doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("partition table ID is %d", pid)); err != nil { - return errors.Trace(err) - } + }) + for _, pid := range partitionIDs { + if err := doBatchDeleteIndiceRange(ctx, wrapper, job.ID, pid, allIndexIDs, ea, "drop index: partition table ID"); err != nil { + return errors.Trace(err) } } case model.ActionDropColumn: @@ -413,10 +390,10 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, } if len(indexIDs) > 0 { if len(partitionIDs) == 0 { - return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now, ea) + return errors.Trace(doBatchDeleteIndiceRange(ctx, wrapper, job.ID, job.TableID, indexIDs, ea, "drop column: table ID")) } for _, pid := range partitionIDs { - if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now, ea); err != nil { + if err := doBatchDeleteIndiceRange(ctx, wrapper, job.ID, pid, indexIDs, ea, "drop column: partition table ID"); err != nil { return errors.Trace(err) } } @@ -431,10 +408,10 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, return nil } if len(partitionIDs) == 0 { - return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now, ea) + return doBatchDeleteIndiceRange(ctx, wrapper, job.ID, 
job.TableID, indexIDs, ea, "modify column: table ID") } for _, pid := range partitionIDs { - if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now, ea); err != nil { + if err := doBatchDeleteIndiceRange(ctx, wrapper, job.ID, pid, indexIDs, ea, "modify column: partition table ID"); err != nil { return errors.Trace(err) } } @@ -442,11 +419,15 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, return nil } -func doBatchDeleteIndiceRange(ctx context.Context, s sqlexec.SQLExecutor, jobID, tableID int64, indexIDs []int64, ts uint64, ea *elementIDAlloc) error { - logutil.BgLogger().Info("batch insert into delete-range indices", zap.String("category", "ddl"), zap.Int64("jobID", jobID), zap.Int64("tableID", tableID), zap.Int64s("indexIDs", indexIDs)) - paramsList := make([]interface{}, 0, len(indexIDs)*5) +func doBatchDeleteIndiceRange(ctx context.Context, wrapper DelRangeExecWrapper, jobID, tableID int64, indexIDs []int64, ea *elementIDAlloc, comment string) error { + logutil.BgLogger().Info("insert into delete-range indices", zap.String("category", "ddl"), zap.Int64("jobID", jobID), zap.Int64("tableID", tableID), zap.Int64s("indexIDs", indexIDs), zap.String("comment", comment)) var buf strings.Builder buf.WriteString(insertDeleteRangeSQLPrefix) + wrapper.PrepareParamsList(len(indexIDs) * 5) + tableID, ok := wrapper.RewriteTableID(tableID) + if !ok { + return nil + } for i, indexID := range indexIDs { startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID) endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1) @@ -457,31 +438,22 @@ func doBatchDeleteIndiceRange(ctx context.Context, s sqlexec.SQLExecutor, jobID, buf.WriteString(",") } elemID := ea.allocForIndexID(tableID, indexID) - paramsList = append(paramsList, jobID, elemID, startKeyEncoded, endKeyEncoded, ts) + wrapper.AppendParamsList(jobID, elemID, startKeyEncoded, endKeyEncoded) } - _, err := s.ExecuteInternal(ctx, buf.String(), paramsList...) 
- return errors.Trace(err) -} -func doInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID, elementID int64, startKey, endKey kv.Key, ts uint64, comment string) error { - logutil.BgLogger().Info("insert into delete-range table", zap.String("category", "ddl"), zap.Int64("jobID", jobID), zap.Int64("elementID", elementID), zap.String("comment", comment)) - startKeyEncoded := hex.EncodeToString(startKey) - endKeyEncoded := hex.EncodeToString(endKey) - // set session disk full opt - // TODO ddl txn func including an session pool txn, there may be a problem? - s.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull) - _, err := s.ExecuteInternal(ctx, insertDeleteRangeSQL, jobID, elementID, startKeyEncoded, endKeyEncoded, ts) - // clear session disk full opt - s.ClearDiskFullOpt() - return errors.Trace(err) + return errors.Trace(wrapper.ConsumeDeleteRange(ctx, buf.String())) } -func doBatchInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID int64, tableIDs []int64, ts uint64, ea *elementIDAlloc) error { - logutil.BgLogger().Info("batch insert into delete-range table", zap.String("category", "ddl"), zap.Int64("jobID", jobID), zap.Int64s("tableIDs", tableIDs)) +func doBatchDeleteTablesRange(ctx context.Context, wrapper DelRangeExecWrapper, jobID int64, tableIDs []int64, ea *elementIDAlloc, comment string) error { + logutil.BgLogger().Info("insert into delete-range table", zap.String("category", "ddl"), zap.Int64("jobID", jobID), zap.Int64s("tableIDs", tableIDs), zap.String("comment", comment)) var buf strings.Builder buf.WriteString(insertDeleteRangeSQLPrefix) - paramsList := make([]interface{}, 0, len(tableIDs)*5) + wrapper.PrepareParamsList(len(tableIDs) * 5) for i, tableID := range tableIDs { + tableID, ok := wrapper.RewriteTableID(tableID) + if !ok { + continue + } startKey := tablecodec.EncodeTablePrefix(tableID) endKey := tablecodec.EncodeTablePrefix(tableID + 1) startKeyEncoded := hex.EncodeToString(startKey) @@ -491,13 +463,79 @@ func doBatchInsert(ctx 
context.Context, s sqlexec.SQLExecutor, jobID int64, tabl buf.WriteString(",") } elemID := ea.allocForPhysicalID(tableID) - paramsList = append(paramsList, jobID, elemID, startKeyEncoded, endKeyEncoded, ts) + wrapper.AppendParamsList(jobID, elemID, startKeyEncoded, endKeyEncoded) } + + return errors.Trace(wrapper.ConsumeDeleteRange(ctx, buf.String())) +} + +// DelRangeExecWrapper consumes the delete ranges with the provided table ID(s) and index ID(s). +type DelRangeExecWrapper interface { + // generate a new TSO for the next job + UpdateTSOForJob() error + + // initialize the paramsList + PrepareParamsList(sz int) + + // rewrite table id if necessary, used for BR + RewriteTableID(tableID int64) (int64, bool) + + // (job_id, element_id, start_key, end_key, ts) + // ts is generated by the DelRangeExecWrapper itself + AppendParamsList(jobID, elemID int64, startKey, endKey string) + + // consume the delete range. For TiDB Server, it inserts rows into mysql.gc_delete_range. + ConsumeDeleteRange(ctx context.Context, sql string) error +} + +// sessionDelRangeExecWrapper is a lightweight wrapper that implements the DelRangeExecWrapper interface and is used for TiDB Server. +// It consumes the delete ranges by directly inserting rows into mysql.gc_delete_range. 
+type sessionDelRangeExecWrapper struct { + sctx sessionctx.Context + s sqlexec.SQLExecutor + ts uint64 + + // temporary values + paramsList []interface{} +} + +func newDelRangeExecWrapper(sctx sessionctx.Context) DelRangeExecWrapper { + return &sessionDelRangeExecWrapper{ + sctx: sctx, + s: sctx.(sqlexec.SQLExecutor), + + paramsList: nil, + } +} + +func (sdr *sessionDelRangeExecWrapper) UpdateTSOForJob() error { + now, err := getNowTSO(sdr.sctx) + if err != nil { + return errors.Trace(err) + } + sdr.ts = now + return nil +} + +func (sdr *sessionDelRangeExecWrapper) PrepareParamsList(sz int) { + sdr.paramsList = make([]interface{}, 0, sz) +} + +func (*sessionDelRangeExecWrapper) RewriteTableID(tableID int64) (int64, bool) { + return tableID, true +} + +func (sdr *sessionDelRangeExecWrapper) AppendParamsList(jobID, elemID int64, startKey, endKey string) { + sdr.paramsList = append(sdr.paramsList, jobID, elemID, startKey, endKey, sdr.ts) +} + +func (sdr *sessionDelRangeExecWrapper) ConsumeDeleteRange(ctx context.Context, sql string) error { // set session disk full opt - s.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull) - _, err := s.ExecuteInternal(ctx, buf.String(), paramsList...) + sdr.s.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull) + _, err := sdr.s.ExecuteInternal(ctx, sql, sdr.paramsList...) // clear session disk full opt - s.ClearDiskFullOpt() + sdr.s.ClearDiskFullOpt() + sdr.paramsList = nil return errors.Trace(err) } diff --git a/pkg/ddl/sanity_check.go b/pkg/ddl/sanity_check.go index d8108e71869cd..afb074161e15a 100644 --- a/pkg/ddl/sanity_check.go +++ b/pkg/ddl/sanity_check.go @@ -194,7 +194,7 @@ func (d *ddl) checkHistoryJobInTest(ctx sessionctx.Context, historyJob *model.Jo } // Check delete range. - if jobNeedGC(historyJob) { + if JobNeedGC(historyJob) { d.checkDeleteRangeCnt(historyJob) }