From 6ebc5311efcce11e12abd35d7c66dd6cc88b111e Mon Sep 17 00:00:00 2001
From: Wenqi Mou
Date: Mon, 25 Nov 2024 10:53:12 -0500
Subject: [PATCH] run prepare

Signed-off-by: Wenqi Mou
---
 br/pkg/checkpoint/log_restore.go              |   3 +-
 br/pkg/restore/log_client/BUILD.bazel         |   2 +-
 .../log_client/batch_file_processor.go        |   8 +-
 br/pkg/restore/log_client/client.go           |  19 +-
 br/pkg/restore/log_client/client_test.go      | 311 ++++++++----------
 .../log_client/log_file_manager_test.go       |   9 +-
 .../snap_client/systable_restore_test.go      |   2 +-
 br/pkg/stream/BUILD.bazel                     |   2 +-
 br/pkg/stream/rewrite_meta_rawkv.go           |  54 ++-
 br/pkg/task/restore.go                        |   1 -
 br/pkg/task/stream.go                         |   7 +-
 br/pkg/utils/BUILD.bazel                      |   2 +-
 12 files changed, 186 insertions(+), 234 deletions(-)

diff --git a/br/pkg/checkpoint/log_restore.go b/br/pkg/checkpoint/log_restore.go
index e78e35fe6cc85..84ee0eed14281 100644
--- a/br/pkg/checkpoint/log_restore.go
+++ b/br/pkg/checkpoint/log_restore.go
@@ -264,7 +264,8 @@ func ExistsCheckpointProgress(
 		TableExists(pmodel.NewCIStr(LogRestoreCheckpointDatabaseName), pmodel.NewCIStr(checkpointProgressTableName))
 }

-// CheckpointTaskInfoForLogRestore is tied to a specific cluster. It represents the last restore task executed this cluster.
+// CheckpointTaskInfoForLogRestore is tied to a specific cluster.
+// It represents the last restore task executed in this cluster.
 type CheckpointTaskInfoForLogRestore struct {
 	Metadata            *CheckpointMetadataForLogRestore
 	HasSnapshotMetadata bool
diff --git a/br/pkg/restore/log_client/BUILD.bazel b/br/pkg/restore/log_client/BUILD.bazel
index 541edd9b3ea3e..bb9da80979226 100644
--- a/br/pkg/restore/log_client/BUILD.bazel
+++ b/br/pkg/restore/log_client/BUILD.bazel
@@ -105,6 +105,7 @@ go_test(
         "//br/pkg/storage",
         "//br/pkg/stream",
         "//br/pkg/utils",
+        "//br/pkg/utils/consts",
         "//br/pkg/utils/iter",
         "//br/pkg/utiltest",
         "//pkg/domain",
@@ -119,7 +120,6 @@ go_test(
         "//pkg/util/chunk",
         "//pkg/util/codec",
         "//pkg/util/sqlexec",
-        "//pkg/util/table-filter",
         "@com_github_docker_go_units//:go-units",
         "@com_github_pingcap_errors//:errors",
         "@com_github_pingcap_failpoint//:failpoint",
diff --git a/br/pkg/restore/log_client/batch_file_processor.go b/br/pkg/restore/log_client/batch_file_processor.go
index b332a3dcbc1c4..da8a74f1c4937 100644
--- a/br/pkg/restore/log_client/batch_file_processor.go
+++ b/br/pkg/restore/log_client/batch_file_processor.go
@@ -29,8 +29,8 @@ import (

 // BatchFileProcessor defines how to process a batch of files
 type BatchFileProcessor interface {
-	// process a batch of files and with a filterTS and return what's not processed for next iteration
-	processBatch(
+	// ProcessBatch processes a batch of files with a filterTS and returns what's not processed for the next iteration
+	ProcessBatch(
 		ctx context.Context,
 		files []*backuppb.DataFileInfo,
 		entries []*KvEntryWithTS,
@@ -47,7 +47,7 @@ type RestoreProcessor struct {
 	progressInc func()
 }

-func (rp *RestoreProcessor) processBatch(
+func (rp *RestoreProcessor) ProcessBatch(
 	ctx context.Context,
 	files []*backuppb.DataFileInfo,
 	entries []*KvEntryWithTS,
@@ -71,7 +71,7 @@ type DDLCollector struct {
 	tableRenameInfo *stream.LogBackupTableHistory
 }

-func (dc *DDLCollector) processBatch(
+func (dc *DDLCollector) ProcessBatch(
 	ctx context.Context,
 	files []*backuppb.DataFileInfo,
 	entries []*KvEntryWithTS,
diff --git a/br/pkg/restore/log_client/client.go b/br/pkg/restore/log_client/client.go
index b1c96d90c2784..e19aa26a250bd 100644
--- a/br/pkg/restore/log_client/client.go
+++ b/br/pkg/restore/log_client/client.go
@@ -1016,15 +1016,17 @@ func (rc *LogClient) InitSchemasReplaceForDDL(
 		dbReplaces = stream.FromSchemaMaps(dbMaps)
 		if len(dbReplaces) <= 0 {
 			envVal, ok := os.LookupEnv(UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL)
-			if ok && len(envVal) > 0 {
-				log.Info(fmt.Sprintf("the environment variable %s is active, skip loading the base schemas.", UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL))
-			} else {
+			if !ok || len(envVal) <= 0 {
 				return nil, errors.Errorf("miss upstream table information at `start-ts`(%d) but the full backup path is not specified", rc.startTS)
 			}
+			log.Info(fmt.Sprintf("the environment variable %s is active, skip loading the base schemas.", UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL))
 		}
 	} else {
 		log.Info("building table replaces from full backup storage")
 		dbReplaces, err = rc.generateDBReplacesFromFullBackupStorage(ctx, cfg, cipherInfo)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
 	}
 }

@@ -1272,7 +1274,7 @@ func LoadAndProcessMetaKVFilesInBatch(
 		} else {
 			// Either f.MinTS > rangeMax or f.MinTs is the filterTs we need.
 			// So it is ok to pass f.MinTs as filterTs.
-			defaultKvEntries, err = processor.processBatch(ctx, defaultFiles[defaultIdx:i], defaultKvEntries, f.MinTs, consts.DefaultCF)
+			defaultKvEntries, err = processor.ProcessBatch(ctx, defaultFiles[defaultIdx:i], defaultKvEntries, f.MinTs, consts.DefaultCF)
 			if err != nil {
 				return errors.Trace(err)
 			}
@@ -1289,7 +1291,7 @@ func LoadAndProcessMetaKVFilesInBatch(
 					break
 				}
 			}
-			writeKvEntries, err = processor.processBatch(ctx, writeFiles[writeIdx:toWriteIdx], writeKvEntries, f.MinTs, consts.WriteCF)
+			writeKvEntries, err = processor.ProcessBatch(ctx, writeFiles[writeIdx:toWriteIdx], writeKvEntries, f.MinTs, consts.WriteCF)
 			if err != nil {
 				return errors.Trace(err)
 			}
@@ -1301,11 +1303,11 @@ func LoadAndProcessMetaKVFilesInBatch(
 	// restore the left meta kv files and entries
 	// Notice: restoreBatch needs to realize the parameter `files` and `kvEntries` might be empty
 	// Assert: defaultIdx <= len(defaultFiles) && writeIdx <= len(writeFiles)
-	_, err = processor.processBatch(ctx, defaultFiles[defaultIdx:], defaultKvEntries, math.MaxUint64, consts.DefaultCF)
+	_, err = processor.ProcessBatch(ctx, defaultFiles[defaultIdx:], defaultKvEntries, math.MaxUint64, consts.DefaultCF)
 	if err != nil {
 		return errors.Trace(err)
 	}
-	_, err = processor.processBatch(ctx, writeFiles[writeIdx:], writeKvEntries, math.MaxUint64, consts.WriteCF)
+	_, err = processor.ProcessBatch(ctx, writeFiles[writeIdx:], writeKvEntries, math.MaxUint64, consts.WriteCF)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -1327,6 +1329,9 @@ func (rc *LogClient) RestoreBatchMetaKVFiles(
 	cf string,
 ) ([]*KvEntryWithTS, error) {
 	curSortedKvEntries, filteredOutKvEntries, err := rc.filterAndSortKvEntriesFromFiles(ctx, files, kvEntries, filterTS)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}

 	// restore and rewrite these entries to TiKV with rawPut() method.
 	kvCount, size, err := rc.restoreAndRewriteMetaKvEntries(ctx, schemasReplace, curSortedKvEntries, cf)
diff --git a/br/pkg/restore/log_client/client_test.go b/br/pkg/restore/log_client/client_test.go
index 93d9eee1ebf74..7ec880383f28b 100644
--- a/br/pkg/restore/log_client/client_test.go
+++ b/br/pkg/restore/log_client/client_test.go
@@ -48,7 +48,6 @@ import (
 	"github.com/pingcap/tidb/pkg/util/chunk"
 	"github.com/pingcap/tidb/pkg/util/codec"
 	"github.com/pingcap/tidb/pkg/util/sqlexec"
-	filter "github.com/pingcap/tidb/pkg/util/table-filter"
 	"github.com/stretchr/testify/require"
 	"google.golang.org/grpc/keepalive"
 )
@@ -137,21 +136,6 @@ func TestDeleteRangeQuery(t *testing.T) {
 	}
 }

-func MockEmptySchemasReplace() *stream.SchemasReplace {
-	dbMap := make(map[stream.UpstreamID]*stream.DBReplace)
-	return stream.NewSchemasReplace(
-		dbMap,
-		true,
-		nil,
-		1,
-		filter.All(),
-		nil,
-		nil,
-		nil,
-		nil,
-	)
-}
-
 func TestRestoreBatchMetaKVFiles(t *testing.T) {
 	client := logclient.NewLogClient(nil, nil, nil, keepalive.ClientParameters{})
 	files := []*backuppb.DataFileInfo{}
@@ -162,41 +146,35 @@ func TestRestoreMetaKVFilesWithBatchMethod1(t *testing.T) {
-	files_default := []*backuppb.DataFileInfo{}
-	files_write := []*backuppb.DataFileInfo{}
+	var filesDefault []*backuppb.DataFileInfo
+	var filesWrite []*backuppb.DataFileInfo
 	batchCount := 0

-	sr := MockEmptySchemasReplace()
-	err := logclient.LoadAndProcessMetaKVFilesInBatch(
-		context.Background(),
-		files_default,
-		files_write,
-		sr,
-		nil,
-		nil,
-		func(
-			ctx context.Context,
+	mockProcessor := &mockBatchProcessor{
+		processFunc: func(ctx context.Context,
 			files []*backuppb.DataFileInfo,
-			schemasReplace *stream.SchemasReplace,
 			entries []*logclient.KvEntryWithTS,
 			filterTS uint64,
-			updateStats func(kvCount uint64, size uint64),
-			progressInc func(),
-			cf string,
-		) ([]*logclient.KvEntryWithTS, error) {
+			cf string) ([]*logclient.KvEntryWithTS, error) {
 			require.Equal(t, 0, len(entries))
 			require.Equal(t, 0, len(files))
 			batchCount++
 			return nil, nil
 		},
+	}
+	err := logclient.LoadAndProcessMetaKVFilesInBatch(
+		context.Background(),
+		filesDefault,
+		filesWrite,
+		mockProcessor,
 	)
 	require.Nil(t, err)
 	require.Equal(t, batchCount, 2)
 }

 func TestRestoreMetaKVFilesWithBatchMethod2_default_empty(t *testing.T) {
-	files_default := []*backuppb.DataFileInfo{}
-	files_write := []*backuppb.DataFileInfo{
+	var filesDefault []*backuppb.DataFileInfo
+	filesWrite := []*backuppb.DataFileInfo{
 		{
 			Path:  "f1",
 			MinTs: 100,
@@ -205,89 +183,78 @@
 	}
 	batchCount := 0

-	sr := MockEmptySchemasReplace()
-	err := logclient.LoadAndProcessMetaKVFilesInBatch(
-		context.Background(),
-		files_default,
-		files_write,
-		sr,
-		nil,
-		nil,
-		func(
-			ctx context.Context,
+	mockProcessor := &mockBatchProcessor{
+		processFunc: func(ctx context.Context,
 			files []*backuppb.DataFileInfo,
-			schemasReplace *stream.SchemasReplace,
 			entries []*logclient.KvEntryWithTS,
 			filterTS uint64,
-			updateStats func(kvCount uint64, size uint64),
-			progressInc func(),
-			cf string,
-		) ([]*logclient.KvEntryWithTS, error) {
+			cf string) ([]*logclient.KvEntryWithTS, error) {
 			if len(entries) == 0 && len(files) == 0 {
-				require.Equal(t, stream.DefaultCF, cf)
+				require.Equal(t, consts.DefaultCF, cf)
 				batchCount++
 			} else {
 				require.Equal(t, 0, len(entries))
 				require.Equal(t, 1, len(files))
 				require.Equal(t, uint64(100), files[0].MinTs)
-				require.Equal(t, stream.WriteCF, cf)
+				require.Equal(t, consts.WriteCF, cf)
 			}
 			require.Equal(t, uint64(math.MaxUint64), filterTS)
 			return nil, nil
 		},
+	}
+	err := logclient.LoadAndProcessMetaKVFilesInBatch(
+		context.Background(),
+		filesDefault,
+		filesWrite,
+		mockProcessor,
 	)
 	require.Nil(t, err)
 	require.Equal(t, batchCount, 1)
 }

 func TestRestoreMetaKVFilesWithBatchMethod2_write_empty_1(t *testing.T) {
-	files_default := []*backuppb.DataFileInfo{
+	filesDefault := []*backuppb.DataFileInfo{
 		{
 			Path:  "f1",
 			MinTs: 100,
 			MaxTs: 120,
 		},
 	}
-	files_write := []*backuppb.DataFileInfo{}
+	var filesWrite []*backuppb.DataFileInfo
 	batchCount := 0

-	sr := MockEmptySchemasReplace()
-	err := logclient.LoadAndProcessMetaKVFilesInBatch(
-		context.Background(),
-		files_default,
-		files_write,
-		sr,
-		nil,
-		nil,
-		func(
-			ctx context.Context,
+	mockProcessor := &mockBatchProcessor{
+		processFunc: func(ctx context.Context,
 			files []*backuppb.DataFileInfo,
-			schemasReplace *stream.SchemasReplace,
 			entries []*logclient.KvEntryWithTS,
 			filterTS uint64,
-			updateStats func(kvCount uint64, size uint64),
-			progressInc func(),
-			cf string,
-		) ([]*logclient.KvEntryWithTS, error) {
+			cf string) ([]*logclient.KvEntryWithTS, error) {
 			if len(entries) == 0 && len(files) == 0 {
-				require.Equal(t, stream.WriteCF, cf)
+				require.Equal(t, consts.WriteCF, cf)
 				batchCount++
 			} else {
 				require.Equal(t, 0, len(entries))
 				require.Equal(t, 1, len(files))
 				require.Equal(t, uint64(100), files[0].MinTs)
-				require.Equal(t, stream.DefaultCF, cf)
+				require.Equal(t, consts.DefaultCF, cf)
 			}
 			require.Equal(t, uint64(math.MaxUint64), filterTS)
 			return nil, nil
 		},
+	}
+
+	err := logclient.LoadAndProcessMetaKVFilesInBatch(
+		context.Background(),
+		filesDefault,
+		filesWrite,
+		mockProcessor,
 	)
 	require.Nil(t, err)
 	require.Equal(t, batchCount, 1)
 }

 func TestRestoreMetaKVFilesWithBatchMethod2_write_empty_2(t *testing.T) {
-	files_default := []*backuppb.DataFileInfo{
+	filesDefault := []*backuppb.DataFileInfo{
 		{
 			Path:  "f1",
 			MinTs: 100,
@@ -301,31 +268,19 @@ func TestRestoreMetaKVFilesWithBatchMethod2_write_empty_2(t *testing.T) {
 			Length: logclient.MetaKVBatchSize,
 		},
 	}
-	files_write := []*backuppb.DataFileInfo{}
+	var filesWrite []*backuppb.DataFileInfo
 	emptyCount := 0
 	batchCount := 0

-	sr := MockEmptySchemasReplace()
-	err := logclient.LoadAndProcessMetaKVFilesInBatch(
-		context.Background(),
-		files_default,
-		files_write,
-		sr,
-		nil,
-		nil,
-		func(
-			ctx context.Context,
+	mockProcessor := &mockBatchProcessor{
+		processFunc: func(ctx context.Context,
 			files []*backuppb.DataFileInfo,
-			schemasReplace *stream.SchemasReplace,
 			entries []*logclient.KvEntryWithTS,
 			filterTS uint64,
-			updateStats func(kvCount uint64, size uint64),
-			progressInc func(),
-			cf string,
-		) ([]*logclient.KvEntryWithTS, error) {
+			cf string) ([]*logclient.KvEntryWithTS, error) {
 			if len(entries) == 0 && len(files) == 0 {
 				// write - write
-				require.Equal(t, stream.WriteCF, cf)
+				require.Equal(t, consts.WriteCF, cf)
 				emptyCount++
 				if emptyCount == 1 {
 					require.Equal(t, uint64(110), filterTS)
@@ -336,7 +291,7 @@ func TestRestoreMetaKVFilesWithBatchMethod2_write_empty_2(t *testing.T) {
 				// default - default
 				batchCount++
 				require.Equal(t, 1, len(files))
-				require.Equal(t, stream.DefaultCF, cf)
+				require.Equal(t, consts.DefaultCF, cf)
 				if batchCount == 1 {
 					require.Equal(t, uint64(100), files[0].MinTs)
 					require.Equal(t, uint64(110), filterTS)
@@ -346,6 +301,13 @@ func TestRestoreMetaKVFilesWithBatchMethod2_write_empty_2(t *testing.T) {
 			}
 			return nil, nil
 		},
+	}
+
+	err := logclient.LoadAndProcessMetaKVFilesInBatch(
+		context.Background(),
+		filesDefault,
+		filesWrite,
+		mockProcessor,
 	)
 	require.Nil(t, err)
 	require.Equal(t, batchCount, 2)
@@ -353,7 +315,7 @@ func TestRestoreMetaKVFilesWithBatchMethod2_write_empty_2(t *testing.T) {
 }

 func TestRestoreMetaKVFilesWithBatchMethod_with_entries(t *testing.T) {
-	files_default := []*backuppb.DataFileInfo{
+	filesDefault := []*backuppb.DataFileInfo{
 		{
 			Path:  "f1",
 			MinTs: 100,
@@ -367,31 +329,19 @@ func TestRestoreMetaKVFilesWithBatchMethod_with_entries(t *testing.T) {
 			Length: logclient.MetaKVBatchSize,
 		},
 	}
-	files_write := []*backuppb.DataFileInfo{}
+	var filesWrite []*backuppb.DataFileInfo
 	emptyCount := 0
 	batchCount := 0

-	sr := MockEmptySchemasReplace()
-	err := logclient.LoadAndProcessMetaKVFilesInBatch(
-		context.Background(),
-		files_default,
-		files_write,
-		sr,
-		nil,
-		nil,
-		func(
-			ctx context.Context,
+	mockProcessor := &mockBatchProcessor{
+		processFunc: func(ctx context.Context,
 			files []*backuppb.DataFileInfo,
-			schemasReplace *stream.SchemasReplace,
 			entries []*logclient.KvEntryWithTS,
 			filterTS uint64,
-			updateStats func(kvCount uint64, size uint64),
-			progressInc func(),
-			cf string,
-		) ([]*logclient.KvEntryWithTS, error) {
+			cf string) ([]*logclient.KvEntryWithTS, error) {
 			if len(entries) == 0 && len(files) == 0 {
 				// write - write
-				require.Equal(t, stream.WriteCF, cf)
+				require.Equal(t, consts.WriteCF, cf)
 				emptyCount++
 				if emptyCount == 1 {
 					require.Equal(t, uint64(110), filterTS)
@@ -402,7 +352,7 @@ func TestRestoreMetaKVFilesWithBatchMethod_with_entries(t *testing.T) {
 				// default - default
 				batchCount++
 				require.Equal(t, 1, len(files))
-				require.Equal(t, stream.DefaultCF, cf)
+				require.Equal(t, consts.DefaultCF, cf)
 				if batchCount == 1 {
 					require.Equal(t, uint64(100), files[0].MinTs)
 					require.Equal(t, uint64(110), filterTS)
@@ -412,6 +362,13 @@ func TestRestoreMetaKVFilesWithBatchMethod_with_entries(t *testing.T) {
 			}
 			return nil, nil
 		},
+	}
+
+	err := logclient.LoadAndProcessMetaKVFilesInBatch(
+		context.Background(),
+		filesDefault,
+		filesWrite,
+		mockProcessor,
 	)
 	require.Nil(t, err)
 	require.Equal(t, batchCount, 2)
@@ -478,31 +435,27 @@ func TestRestoreMetaKVFilesWithBatchMethod3(t *testing.T) {
 	result := make(map[int][]*backuppb.DataFileInfo)
 	resultKV := make(map[int]int)

-	sr := MockEmptySchemasReplace()
-	err := logclient.LoadAndProcessMetaKVFilesInBatch(
-		context.Background(),
-		defaultFiles,
-		writeFiles,
-		sr,
-		nil,
-		nil,
-		func(
-			ctx context.Context,
-			fs []*backuppb.DataFileInfo,
-			schemasReplace *stream.SchemasReplace,
+	mockProcessor := &mockBatchProcessor{
+		processFunc: func(ctx context.Context,
+			files []*backuppb.DataFileInfo,
 			entries []*logclient.KvEntryWithTS,
 			filterTS uint64,
-			updateStats func(kvCount uint64, size uint64),
-			progressInc func(),
-			cf string,
-		) ([]*logclient.KvEntryWithTS, error) {
-			result[batchCount] = fs
+			cf string) ([]*logclient.KvEntryWithTS, error) {
+			result[batchCount] = files
 			t.Log(filterTS)
 			resultKV[batchCount] = len(entries)
 			batchCount++
 			return make([]*logclient.KvEntryWithTS, batchCount), nil
 		},
+	}
+
+	err := logclient.LoadAndProcessMetaKVFilesInBatch(
+		context.Background(),
+		defaultFiles,
+		writeFiles,
+		mockProcessor,
 	)
+
 	require.Nil(t, err)
 	require.Equal(t, len(result), 4)
 	require.Equal(t, result[0], defaultFiles[0:3])
@@ -564,29 +517,25 @@ func TestRestoreMetaKVFilesWithBatchMethod4(t *testing.T) {
 	batchCount := 0
 	result := make(map[int][]*backuppb.DataFileInfo)

-	sr := MockEmptySchemasReplace()
-	err := logclient.LoadAndProcessMetaKVFilesInBatch(
-		context.Background(),
-		defaultFiles,
-		writeFiles,
-		sr,
-		nil,
-		nil,
-		func(
-			ctx context.Context,
-			fs []*backuppb.DataFileInfo,
-			schemasReplace *stream.SchemasReplace,
+	mockProcessor := &mockBatchProcessor{
+		processFunc: func(ctx context.Context,
+			files []*backuppb.DataFileInfo,
 			entries []*logclient.KvEntryWithTS,
 			filterTS uint64,
-			updateStats func(kvCount uint64, size uint64),
-			progressInc func(),
-			cf string,
-		) ([]*logclient.KvEntryWithTS, error) {
-			result[batchCount] = fs
+			cf string) ([]*logclient.KvEntryWithTS, error) {
+			result[batchCount] = files
 			batchCount++
 			return nil, nil
 		},
+	}
+
+	err := logclient.LoadAndProcessMetaKVFilesInBatch(
+		context.Background(),
+		defaultFiles,
+		writeFiles,
+		mockProcessor,
 	)
+
 	require.Nil(t, err)
 	require.Equal(t, len(result), 4)
 	require.Equal(t, result[0], defaultFiles[0:2])
@@ -644,28 +593,22 @@ func TestRestoreMetaKVFilesWithBatchMethod5(t *testing.T) {
 	batchCount := 0
 	result := make(map[int][]*backuppb.DataFileInfo)

-	sr := MockEmptySchemasReplace()
-	err := logclient.LoadAndProcessMetaKVFilesInBatch(
-		context.Background(),
-		defaultFiles,
-		writeFiles,
-		sr,
-		nil,
-		nil,
-		func(
-			ctx context.Context,
-			fs []*backuppb.DataFileInfo,
-			schemasReplace *stream.SchemasReplace,
+	mockProcessor := &mockBatchProcessor{
+		processFunc: func(ctx context.Context,
+			files []*backuppb.DataFileInfo,
 			entries []*logclient.KvEntryWithTS,
 			filterTS uint64,
-			updateStats func(kvCount uint64, size uint64),
-			progressInc func(),
-			cf string,
-		) ([]*logclient.KvEntryWithTS, error) {
-			result[batchCount] = fs
+			cf string) ([]*logclient.KvEntryWithTS, error) {
+			result[batchCount] = files
 			batchCount++
 			return nil, nil
 		},
+	}
+	err := logclient.LoadAndProcessMetaKVFilesInBatch(
+		context.Background(),
+		defaultFiles,
+		writeFiles,
+		mockProcessor,
 	)
 	require.Nil(t, err)
 	require.Equal(t, len(result), 4)
@@ -741,30 +684,24 @@ func TestRestoreMetaKVFilesWithBatchMethod6(t *testing.T) {
 	result := make(map[int][]*backuppb.DataFileInfo)
 	resultKV := make(map[int]int)

-	sr := MockEmptySchemasReplace()
-	err := logclient.LoadAndProcessMetaKVFilesInBatch(
-		context.Background(),
-		defaultFiles,
-		writeFiles,
-		sr,
-		nil,
-		nil,
-		func(
-			ctx context.Context,
-			fs []*backuppb.DataFileInfo,
-			schemasReplace *stream.SchemasReplace,
+	mockProcessor := &mockBatchProcessor{
+		processFunc: func(ctx context.Context,
+			files []*backuppb.DataFileInfo,
 			entries []*logclient.KvEntryWithTS,
 			filterTS uint64,
-			updateStats func(kvCount uint64, size uint64),
-			progressInc func(),
-			cf string,
-		) ([]*logclient.KvEntryWithTS, error) {
-			result[batchCount] = fs
+			cf string) ([]*logclient.KvEntryWithTS, error) {
+			result[batchCount] = files
 			t.Log(filterTS)
 			resultKV[batchCount] = len(entries)
 			batchCount++
 			return make([]*logclient.KvEntryWithTS, batchCount), nil
 		},
+	}
+	err := logclient.LoadAndProcessMetaKVFilesInBatch(
+		context.Background(),
+		defaultFiles,
+		writeFiles,
+		mockProcessor,
 	)
 	require.Nil(t, err)
 	require.Equal(t, len(result), 6)
@@ -1991,3 +1928,23 @@ func fakeRowKey(tableID, rowID int64) kv.Key {
 func fakeRowRawKey(tableID, rowID int64) kv.Key {
 	return tablecodec.EncodeRecordKey(tablecodec.GenTableRecordPrefix(tableID), kv.IntHandle(rowID))
 }
+
+type mockBatchProcessor struct {
+	processFunc func(
+		ctx context.Context,
+		files []*backuppb.DataFileInfo,
+		entries []*logclient.KvEntryWithTS,
+		filterTS uint64,
+		cf string,
+	) ([]*logclient.KvEntryWithTS, error)
+}
+
+func (m *mockBatchProcessor) ProcessBatch(
+	ctx context.Context,
+	files []*backuppb.DataFileInfo,
+	entries []*logclient.KvEntryWithTS,
+	filterTS uint64,
+	cf string,
+) ([]*logclient.KvEntryWithTS, error) {
+	return m.processFunc(ctx, files, entries, filterTS, cf)
+}
diff --git a/br/pkg/restore/log_client/log_file_manager_test.go b/br/pkg/restore/log_client/log_file_manager_test.go
index b153fb9f57d2a..d9ee2ea18d191 100644
--- a/br/pkg/restore/log_client/log_file_manager_test.go
+++ b/br/pkg/restore/log_client/log_file_manager_test.go
@@ -23,6 +23,7 @@ import (
 	logclient "github.com/pingcap/tidb/br/pkg/restore/log_client"
 	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tidb/br/pkg/stream"
+	"github.com/pingcap/tidb/br/pkg/utils/consts"
 	"github.com/pingcap/tidb/br/pkg/utils/iter"
 	"github.com/pingcap/tidb/pkg/kv"
 	"github.com/pingcap/tidb/pkg/util/codec"
@@ -55,7 +56,7 @@ func wr(start, end uint64, minBegin uint64) *backuppb.DataFileInfo {
 		MinTs:                 start,
 		MaxTs:                 end,
 		MinBeginTsInDefaultCf: minBegin,
-		Cf:                    stream.WriteCF,
+		Cf:                    consts.WriteCF,
 	}
 }

@@ -66,7 +67,7 @@ func dr(start, end uint64) *backuppb.DataFileInfo {
 		Path:  fmt.Sprintf("write-%06d", id),
 		MinTs: start,
 		MaxTs: end,
-		Cf:    stream.DefaultCF,
+		Cf:    consts.DefaultCF,
 	}
 }

@@ -618,7 +619,7 @@ func TestReadAllEntries(t *testing.T) {
 	data, file := generateKvData()
 	fm := logclient.TEST_NewLogFileManager(35, 75, 25, &logclient.FakeStreamMetadataHelper{Data: data})
 	{
-		file.Cf = stream.WriteCF
+		file.Cf = consts.WriteCF
 		kvEntries, nextKvEntries, err := fm.ReadFilteredEntriesFromFiles(ctx, file, 50)
 		require.NoError(t, err)
 		require.Equal(t, []*logclient.KvEntryWithTS{
@@ -631,7 +632,7 @@ func TestReadAllEntries(t *testing.T) {
 		}, nextKvEntries)
 	}
 	{
-		file.Cf = stream.DefaultCF
+		file.Cf = consts.DefaultCF
 		kvEntries, nextKvEntries, err := fm.ReadFilteredEntriesFromFiles(ctx, file, 50)
 		require.NoError(t, err)
 		require.Equal(t, []*logclient.KvEntryWithTS{
diff --git a/br/pkg/restore/snap_client/systable_restore_test.go b/br/pkg/restore/snap_client/systable_restore_test.go
index d5952135dbc5b..4af404293a1e7 100644
--- a/br/pkg/restore/snap_client/systable_restore_test.go
+++ b/br/pkg/restore/snap_client/systable_restore_test.go
@@ -36,7 +36,7 @@ func TestCheckSysTableCompatibility(t *testing.T) {
 	cluster := mc
 	g := gluetidb.New()
 	client := snapclient.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, split.DefaultTestKeepaliveCfg)
-	err := client.Init(g, cluster.Storage)
+	err := client.InitConnections(g, cluster.Storage)
 	require.NoError(t, err)

 	info, err := cluster.Domain.GetSnapshotInfoSchema(math.MaxUint64)
diff --git a/br/pkg/stream/BUILD.bazel b/br/pkg/stream/BUILD.bazel
index 9cfcd0faee316..509361d6f165c 100644
--- a/br/pkg/stream/BUILD.bazel
+++ b/br/pkg/stream/BUILD.bazel
@@ -66,7 +66,7 @@ go_test(
     ],
     embed = [":stream"],
     flaky = True,
-    shard_count = 46,
+    shard_count = 44,
     deps = [
         "//br/pkg/storage",
         "//br/pkg/streamhelper",
diff --git a/br/pkg/stream/rewrite_meta_rawkv.go b/br/pkg/stream/rewrite_meta_rawkv.go
index eff2e4be1d2c8..6e411939a3aff 100644
--- a/br/pkg/stream/rewrite_meta_rawkv.go
+++ b/br/pkg/stream/rewrite_meta_rawkv.go
@@ -262,11 +262,7 @@ func (sr *SchemasReplace) rewriteDBInfo(value []byte) ([]byte, error) {
 		return nil, errors.Trace(err)
 	}

-	shouldProcess, err := sr.shouldProcessDB(dbInfo.ID)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-	if !shouldProcess {
+	if shouldProcess := sr.shouldProcessDB(dbInfo.ID); !shouldProcess {
 		return nil, nil
 	}

@@ -355,11 +351,7 @@ func (sr *SchemasReplace) rewriteKeyForTable(
 		return nil, errors.Trace(err)
 	}

-	shouldProcess, err := sr.shouldProcessTable(dbID, tableID)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-	if !shouldProcess {
+	if shouldProcess := sr.shouldProcessTable(dbID, tableID); !shouldProcess {
 		return nil, nil
 	}

@@ -418,14 +410,10 @@ func (sr *SchemasReplace) rewriteTableInfo(value []byte, dbID int64) ([]byte, er
 		return nil, errors.Trace(err)
 	}

-	shouldProcess, err := sr.shouldProcessTable(dbID, tableInfo.ID)
-
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-	if !shouldProcess {
+	if shouldProcess := sr.shouldProcessTable(dbID, tableInfo.ID); !shouldProcess {
 		return nil, nil
 	}
+
 	// construct or find the id map.
 	dbReplace, exist = sr.DbReplaceMap[dbID]
 	if !exist {
@@ -757,34 +745,32 @@ func (sr *SchemasReplace) processIngestIndexAndDeleteRangeFromJob(job *model.Job
 	return sr.tryRecordIngestIndex(job)
 }

-func (sr *SchemasReplace) shouldProcessDB(dbId int64) (bool, error) {
+func (sr *SchemasReplace) shouldProcessDB(dbId int64) bool {
 	if sr.IsPreConsturctMapStatus() {
 		if sr.PiTRTableFilter == nil {
-			//return false, errors.Annotate(berrors.ErrRestoreInvalidRewrite, "expecting pitr table filter but got none")
-			return true, nil
+			return true
 		}
-		return sr.PiTRTableFilter.ContainsDB(dbId), nil
-	} else {
-		_, ok := sr.DbReplaceMap[dbId]
-		return ok, nil
+		return sr.PiTRTableFilter.ContainsDB(dbId)
 	}
+	// actual rewrite phase
+	_, ok := sr.DbReplaceMap[dbId]
+	return ok
 }

-func (sr *SchemasReplace) shouldProcessTable(dbId, tableId int64) (bool, error) {
+func (sr *SchemasReplace) shouldProcessTable(dbId, tableId int64) bool {
 	if sr.IsPreConsturctMapStatus() {
 		if sr.PiTRTableFilter == nil {
-			//return false, errors.Annotate(berrors.ErrRestoreInvalidRewrite, "expecting pitr table filter but got none")
-			return true, nil
+			return true
 		}
-		return sr.PiTRTableFilter.ContainsTable(dbId, tableId), nil
-	} else {
-		tableReplace, ok := sr.DbReplaceMap[dbId]
-		if !ok {
-			return false, nil
-		}
-		_, ok = tableReplace.TableMap[tableId]
-		return ok, nil
+		return sr.PiTRTableFilter.ContainsTable(dbId, tableId)
+	}
+	// actual rewrite phase
+	tableReplace, ok := sr.DbReplaceMap[dbId]
+	if !ok {
+		return false
 	}
+	_, ok = tableReplace.TableMap[tableId]
+	return ok
 }

 type DelRangeParams struct {
diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go
index 58df02811206b..d20a3c0820a30 100644
--- a/br/pkg/task/restore.go
+++ b/br/pkg/task/restore.go
@@ -1556,7 +1556,6 @@ func adjustTablesToRestoreAndCreateFilter(
 				break
 			}
 		}
-
 	}

 	// store the filter into config
diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go
index 6cb40c7120bdc..3a9d236208da5 100644
--- a/br/pkg/task/stream.go
+++ b/br/pkg/task/stream.go
@@ -1229,7 +1229,11 @@ func RunStreamRestore(
 		return errors.Trace(err)
 	}
 	defer logClient.Close(ctx)
+
 	ddlFiles, err := logClient.LoadDDLFilesAndCountDMLFiles(ctx)
+	if err != nil {
+		return errors.Trace(err)
+	}
 	logBackupTableHistory, err = logClient.LoadMetaKVFilesAndBuildTableRenameInfo(ctx, ddlFiles)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -1315,10 +1319,10 @@ func restoreStream(
 	}

 	client, err := createLogClient(ctx, g, cfg, mgr)
-	defer client.Close(ctx)
 	if err != nil {
 		return errors.Annotate(err, "failed to create log client")
 	}
+	defer client.Close(ctx)

 	if checkpointTaskInfo != nil && checkpointTaskInfo.Metadata != nil {
 		// reuse the checkpoint task's rewrite ts
@@ -1897,7 +1901,6 @@ func isNewRestoreTask(checkpointTaskInfo *checkpoint.CheckpointTaskInfoForLogRes
 func buildSchemaReplace(ctx context.Context, client *logclient.LogClient, cfg *RestoreConfig, isNewRestoreTask bool) (
 	*stream.SchemasReplace, error) {
-
 	// get full backup meta storage to generate rewrite rules.
 	fullBackupStorage, err := parseFullBackupTablesStorage(cfg)
 	if err != nil {
diff --git a/br/pkg/utils/BUILD.bazel b/br/pkg/utils/BUILD.bazel
index f7f7c6eca29c4..4393584951ddb 100644
--- a/br/pkg/utils/BUILD.bazel
+++ b/br/pkg/utils/BUILD.bazel
@@ -88,7 +88,7 @@ go_test(
     ],
     embed = [":utils"],
     flaky = True,
-    shard_count = 34,
+    shard_count = 36,
    deps = [
         "//br/pkg/errors",
         "//pkg/kv",
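Reviewer note: the refactor above replaces the old per-batch callback (and its SchemasReplace, updateStats, and progressInc parameters) with the BatchFileProcessor interface, so LoadAndProcessMetaKVFilesInBatch now drives any implementation of ProcessBatch. Below is a minimal sketch of a custom processor, modeled on the mockBatchProcessor added in client_test.go; countingProcessor is an illustrative name, not part of the patch, and it assumes the same imports client_test.go already uses (context, backuppb, logclient).

type countingProcessor struct {
	batches int
}

// ProcessBatch implements logclient.BatchFileProcessor: it receives one batch
// of meta-KV files for a single column family (cf) together with the entries
// carried over from the previous batch, and returns whatever it did not
// process so the driver can hand it to the next iteration.
func (p *countingProcessor) ProcessBatch(
	ctx context.Context,
	files []*backuppb.DataFileInfo,
	entries []*logclient.KvEntryWithTS,
	filterTS uint64,
	cf string,
) ([]*logclient.KvEntryWithTS, error) {
	p.batches++
	// A real implementation (see RestoreProcessor and DDLCollector in
	// batch_file_processor.go) would consume entries up to filterTS here;
	// returning them unchanged simply defers them to the next batch.
	return entries, nil
}

Usage mirrors the tests above:

err := logclient.LoadAndProcessMetaKVFilesInBatch(ctx, defaultFiles, writeFiles, &countingProcessor{})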