Skip to content

Commit

Permalink
ddl: wrap the sessionctx to public delete range logic to BR (pingcap#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Leavrth authored Dec 7, 2023
1 parent 83f9209 commit be62f75
Show file tree
Hide file tree
Showing 16 changed files with 783 additions and 744 deletions.
98 changes: 30 additions & 68 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ type Client struct {
// this feature is controlled by flag with-sys-table
fullClusterRestore bool
// the query to insert rows into table `gc_delete_range`, lack of ts.
deleteRangeQuery []string
deleteRangeQueryCh chan string
deleteRangeQuery []*stream.PreDelRangeQuery
deleteRangeQueryCh chan *stream.PreDelRangeQuery
deleteRangeQueryWaitGroup sync.WaitGroup

// see RestoreCommonConfig.WithSysTable
Expand Down Expand Up @@ -204,8 +204,8 @@ func NewRestoreClient(
tlsConf: tlsConf,
keepaliveConf: keepaliveConf,
switchCh: make(chan struct{}),
deleteRangeQuery: make([]string, 0),
deleteRangeQueryCh: make(chan string, 10),
deleteRangeQuery: make([]*stream.PreDelRangeQuery, 0),
deleteRangeQueryCh: make(chan *stream.PreDelRangeQuery, 10),
}
}

Expand Down Expand Up @@ -2797,7 +2797,7 @@ func (rc *Client) InitSchemasReplaceForDDL(
dbReplace.TableMap[t.Info.ID] = &stream.TableReplace{
Name: newTableInfo.Name.O,
TableID: newTableInfo.ID,
PartitionMap: getTableIDMap(newTableInfo, t.Info),
PartitionMap: getPartitionIDMap(newTableInfo, t.Info),
IndexMap: getIndexIDMap(newTableInfo, t.Info),
}
}
Expand All @@ -2824,7 +2824,7 @@ func (rc *Client) InitSchemasReplaceForDDL(

rp := stream.NewSchemasReplace(
dbReplaces, needConstructIdMap, cfg.TiFlashRecorder, rc.currentTS, cfg.TableFilter, rc.GenGlobalID, rc.GenGlobalIDs,
rc.InsertDeleteRangeForTable, rc.InsertDeleteRangeForIndex)
rc.RecordDeleteRange)
return rp, nil
}

Expand Down Expand Up @@ -3428,66 +3428,8 @@ NEXTSQL:
return nil
}

const (
insertDeleteRangeSQLPrefix = `INSERT IGNORE INTO mysql.gc_delete_range VALUES `
insertDeleteRangeSQLValue = "(%d, %d, '%s', '%s', %%[1]d)"

batchInsertDeleteRangeSize = 256
)

// InsertDeleteRangeForTable generates query to insert table delete job into table `gc_delete_range`.
func (rc *Client) InsertDeleteRangeForTable(jobID int64, tableIDs []int64) {
var elementID int64 = 1
var tableID int64
for i := 0; i < len(tableIDs); i += batchInsertDeleteRangeSize {
batchEnd := len(tableIDs)
if batchEnd > i+batchInsertDeleteRangeSize {
batchEnd = i + batchInsertDeleteRangeSize
}

var buf strings.Builder
buf.WriteString(insertDeleteRangeSQLPrefix)
for j := i; j < batchEnd; j++ {
tableID = tableIDs[j]
startKey := tablecodec.EncodeTablePrefix(tableID)
endKey := tablecodec.EncodeTablePrefix(tableID + 1)
startKeyEncoded := hex.EncodeToString(startKey)
endKeyEncoded := hex.EncodeToString(endKey)
buf.WriteString(fmt.Sprintf(insertDeleteRangeSQLValue, jobID, elementID, startKeyEncoded, endKeyEncoded))
if j != batchEnd-1 {
buf.WriteString(",")
}
elementID += 1
}
rc.deleteRangeQueryCh <- buf.String()
}
}

// InsertDeleteRangeForIndex generates query to insert index delete job into table `gc_delete_range`.
func (rc *Client) InsertDeleteRangeForIndex(jobID int64, elementID *int64, tableID int64, indexIDs []int64) {
var indexID int64
for i := 0; i < len(indexIDs); i += batchInsertDeleteRangeSize {
batchEnd := len(indexIDs)
if batchEnd > i+batchInsertDeleteRangeSize {
batchEnd = i + batchInsertDeleteRangeSize
}

var buf strings.Builder
buf.WriteString(insertDeleteRangeSQLPrefix)
for j := i; j < batchEnd; j++ {
indexID = indexIDs[j]
startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
startKeyEncoded := hex.EncodeToString(startKey)
endKeyEncoded := hex.EncodeToString(endKey)
buf.WriteString(fmt.Sprintf(insertDeleteRangeSQLValue, jobID, *elementID, startKeyEncoded, endKeyEncoded))
if j != batchEnd-1 {
buf.WriteString(",")
}
*elementID += 1
}
rc.deleteRangeQueryCh <- buf.String()
}
// RecordDeleteRange queues a pre-built delete-range query on
// deleteRangeQueryCh; the channel is used so that recording from DDL
// replay is thread-safe (queries are drained into deleteRangeQuery by
// the loader goroutine — presumably RunGCRowsLoader; confirm in caller).
func (rc *Client) RecordDeleteRange(sql *stream.PreDelRangeQuery) {
rc.deleteRangeQueryCh <- sql
}

// use a channel to save the delete-range query so that access is thread-safe.
Expand Down Expand Up @@ -3518,16 +3460,36 @@ func (rc *Client) InsertGCRows(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
jobIDMap := make(map[int64]int64)
for _, query := range rc.deleteRangeQuery {
if err := rc.db.se.ExecuteInternal(ctx, fmt.Sprintf(query, ts)); err != nil {
paramsList := make([]interface{}, 0, len(query.ParamsList)*5)
for _, params := range query.ParamsList {
newJobID, exists := jobIDMap[params.JobID]
if !exists {
newJobID, err = rc.GenGlobalID(ctx)
if err != nil {
return errors.Trace(err)
}
jobIDMap[params.JobID] = newJobID
}
log.Info("insert into the delete range",
zap.Int64("jobID", newJobID),
zap.Int64("elemID", params.ElemID),
zap.String("startKey", params.StartKey),
zap.String("endKey", params.EndKey),
zap.Uint64("ts", ts))
// (job_id, elem_id, start_key, end_key, ts)
paramsList = append(paramsList, newJobID, params.ElemID, params.StartKey, params.EndKey, ts)
}
if err := rc.db.se.ExecuteInternal(ctx, query.Sql, paramsList...); err != nil {
return errors.Trace(err)
}
}
return nil
}

// only for unit test
func (rc *Client) GetGCRows() []string {
func (rc *Client) GetGCRows() []*stream.PreDelRangeQuery {
close(rc.deleteRangeQueryCh)
rc.deleteRangeQueryWaitGroup.Wait()
return rc.deleteRangeQuery
Expand Down
43 changes: 32 additions & 11 deletions br/pkg/restore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,18 +619,40 @@ func TestDeleteRangeQuery(t *testing.T) {

client.RunGCRowsLoader(ctx)

client.InsertDeleteRangeForTable(2, []int64{3})
client.InsertDeleteRangeForTable(4, []int64{5, 6})

elementID := int64(1)
client.InsertDeleteRangeForIndex(7, &elementID, 8, []int64{1})
client.InsertDeleteRangeForIndex(9, &elementID, 10, []int64{1, 2})
client.RecordDeleteRange(&stream.PreDelRangeQuery{
Sql: "INSERT IGNORE INTO mysql.gc_delete_range VALUES (%?, %?, %?, %?, %?), (%?, %?, %?, %?, %?)",
ParamsList: []stream.DelRangeParams{
{
JobID: 1,
ElemID: 1,
StartKey: "a",
EndKey: "b",
},
{
JobID: 1,
ElemID: 2,
StartKey: "b",
EndKey: "c",
},
},
})

querys := client.GetGCRows()
require.Equal(t, querys[0], "INSERT IGNORE INTO mysql.gc_delete_range VALUES (2, 1, '748000000000000003', '748000000000000004', %[1]d)")
require.Equal(t, querys[1], "INSERT IGNORE INTO mysql.gc_delete_range VALUES (4, 1, '748000000000000005', '748000000000000006', %[1]d),(4, 2, '748000000000000006', '748000000000000007', %[1]d)")
require.Equal(t, querys[2], "INSERT IGNORE INTO mysql.gc_delete_range VALUES (7, 1, '7480000000000000085f698000000000000001', '7480000000000000085f698000000000000002', %[1]d)")
require.Equal(t, querys[3], "INSERT IGNORE INTO mysql.gc_delete_range VALUES (9, 2, '74800000000000000a5f698000000000000001', '74800000000000000a5f698000000000000002', %[1]d),(9, 3, '74800000000000000a5f698000000000000002', '74800000000000000a5f698000000000000003', %[1]d)")
require.Equal(t, len(querys), 1)
require.Equal(t, querys[0].Sql, "INSERT IGNORE INTO mysql.gc_delete_range VALUES (%?, %?, %?, %?, %?), (%?, %?, %?, %?, %?)")
require.Equal(t, len(querys[0].ParamsList), 2)
require.Equal(t, querys[0].ParamsList[0], stream.DelRangeParams{
JobID: 1,
ElemID: 1,
StartKey: "a",
EndKey: "b",
})
require.Equal(t, querys[0].ParamsList[1], stream.DelRangeParams{
JobID: 1,
ElemID: 2,
StartKey: "b",
EndKey: "c",
})
}

func MockEmptySchemasReplace() *stream.SchemasReplace {
Expand All @@ -644,7 +666,6 @@ func MockEmptySchemasReplace() *stream.SchemasReplace {
nil,
nil,
nil,
nil,
)
}

Expand Down
4 changes: 2 additions & 2 deletions br/pkg/restore/ingestrec/ingest_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ func notSynced(job *model.Job, isSubJob bool) bool {
return (job.State != model.JobStateSynced) && !(isSubJob && job.State == model.JobStateDone)
}

// AddJob firstly filters the ingest index add operation job, and records it into IngestRecorder.
func (i *IngestRecorder) AddJob(job *model.Job, isSubJob bool) error {
// TryAddJob first filters the ingest index add operation job, and records it into IngestRecorder.
func (i *IngestRecorder) TryAddJob(job *model.Job, isSubJob bool) error {
if job == nil || notIngestJob(job) || notAddIndexJob(job) || notSynced(job, isSubJob) {
return nil
}
Expand Down
16 changes: 8 additions & 8 deletions br/pkg/restore/ingestrec/ingest_recorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func TestAddIngestRecorder(t *testing.T) {
}
recorder := ingestrec.New()
// no ingest job, should ignore it
err := recorder.AddJob(fakeJob(
err := recorder.TryAddJob(fakeJob(
model.ReorgTypeTxn,
model.ActionAddIndex,
model.JobStateSynced,
Expand All @@ -154,7 +154,7 @@ func TestAddIngestRecorder(t *testing.T) {
require.NoError(t, err)

// no add-index job, should ignore it
err = recorder.AddJob(fakeJob(
err = recorder.TryAddJob(fakeJob(
model.ReorgTypeLitMerge,
model.ActionDropIndex,
model.JobStateSynced,
Expand All @@ -170,7 +170,7 @@ func TestAddIngestRecorder(t *testing.T) {
require.NoError(t, err)

// no synced job, should ignore it
err = recorder.AddJob(fakeJob(
err = recorder.TryAddJob(fakeJob(
model.ReorgTypeLitMerge,
model.ActionAddIndex,
model.JobStateRollbackDone,
Expand All @@ -188,7 +188,7 @@ func TestAddIngestRecorder(t *testing.T) {
{
recorder := ingestrec.New()
// a normal ingest add index job
err = recorder.AddJob(fakeJob(
err = recorder.TryAddJob(fakeJob(
model.ReorgTypeLitMerge,
model.ActionAddIndex,
model.JobStateSynced,
Expand All @@ -209,7 +209,7 @@ func TestAddIngestRecorder(t *testing.T) {
{
recorder := ingestrec.New()
// a normal ingest add primary index job
err = recorder.AddJob(fakeJob(
err = recorder.TryAddJob(fakeJob(
model.ReorgTypeLitMerge,
model.ActionAddPrimaryKey,
model.JobStateSynced,
Expand All @@ -229,7 +229,7 @@ func TestAddIngestRecorder(t *testing.T) {

{
// a sub job as add primary index job
err = recorder.AddJob(fakeJob(
err = recorder.TryAddJob(fakeJob(
model.ReorgTypeLitMerge,
model.ActionAddPrimaryKey,
model.JobStateDone,
Expand Down Expand Up @@ -304,7 +304,7 @@ func TestIndexesKind(t *testing.T) {
}

recorder := ingestrec.New()
err := recorder.AddJob(fakeJob(
err := recorder.TryAddJob(fakeJob(
model.ReorgTypeLitMerge,
model.ActionAddIndex,
model.JobStateSynced,
Expand Down Expand Up @@ -382,7 +382,7 @@ func TestRewriteTableID(t *testing.T) {
},
}
recorder := ingestrec.New()
err := recorder.AddJob(fakeJob(
err := recorder.TryAddJob(fakeJob(
model.ReorgTypeLitMerge,
model.ActionAddIndex,
model.JobStateSynced,
Expand Down
10 changes: 8 additions & 2 deletions br/pkg/restore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ type AppliedFile interface {
GetEndKey() []byte
}

// getTableIDMap creates a map maping old tableID to new tableID.
func getTableIDMap(newTable, oldTable *model.TableInfo) map[int64]int64 {
// getPartitionIDMap creates a map mapping old physical ID to new physical ID.
func getPartitionIDMap(newTable, oldTable *model.TableInfo) map[int64]int64 {
tableIDMap := make(map[int64]int64)

if oldTable.Partition != nil && newTable.Partition != nil {
Expand All @@ -60,6 +60,12 @@ func getTableIDMap(newTable, oldTable *model.TableInfo) map[int64]int64 {
}
}

return tableIDMap
}

// getTableIDMap creates a map mapping old tableID to new tableID.
// The result also contains the partition ID mapping from
// getPartitionIDMap, plus the table's own old-ID -> new-ID entry.
func getTableIDMap(newTable, oldTable *model.TableInfo) map[int64]int64 {
tableIDMap := getPartitionIDMap(newTable, oldTable)
tableIDMap[oldTable.ID] = newTable.ID
return tableIDMap
}
Expand Down
4 changes: 3 additions & 1 deletion br/pkg/stream/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"//br/pkg/storage",
"//br/pkg/streamhelper",
"//br/pkg/utils",
"//pkg/ddl",
"//pkg/kv",
"//pkg/meta",
"//pkg/parser/model",
Expand Down Expand Up @@ -54,10 +55,11 @@ go_test(
],
embed = [":stream"],
flaky = True,
shard_count = 24,
shard_count = 25,
deps = [
"//br/pkg/storage",
"//br/pkg/streamhelper",
"//pkg/ddl",
"//pkg/meta",
"//pkg/parser/ast",
"//pkg/parser/model",
Expand Down
Loading

0 comments on commit be62f75

Please sign in to comment.