initial
Signed-off-by: Wenqi Mou <[email protected]>
Tristan1900 committed Dec 19, 2024
1 parent 27a9a7d commit 79e7cdb
Showing 54 changed files with 2,312 additions and 1,086 deletions.
1 change: 0 additions & 1 deletion br/pkg/backup/client.go
@@ -827,7 +827,6 @@ func BuildBackupSchemas(
if err != nil {
return errors.Trace(err)
}
-
for _, dbInfo := range dbs {
// skip system databases
if !tableFilter.MatchSchema(dbInfo.Name.O) || util.IsMemDB(dbInfo.Name.L) || utils.IsTemplateSysDB(dbInfo.Name) {
6 changes: 3 additions & 3 deletions br/pkg/checkpoint/checkpoint_test.go
@@ -105,12 +105,12 @@ func TestCheckpointMetaForRestore(t *testing.T) {
exists := checkpoint.ExistsCheckpointProgress(ctx, dom)
require.False(t, exists)
err = checkpoint.SaveCheckpointProgress(ctx, se, &checkpoint.CheckpointProgress{
- Progress: checkpoint.InLogRestoreAndIdMapPersist,
+ Progress: checkpoint.InLogRestoreAndIdMapPersisted,
})
require.NoError(t, err)
progress, err := checkpoint.LoadCheckpointProgress(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor())
require.NoError(t, err)
- require.Equal(t, checkpoint.InLogRestoreAndIdMapPersist, progress.Progress)
+ require.Equal(t, checkpoint.InLogRestoreAndIdMapPersisted, progress.Progress)

taskInfo, err := checkpoint.TryToGetCheckpointTaskInfo(ctx, s.Mock.Domain, se.GetSessionCtx().GetRestrictedSQLExecutor())
require.NoError(t, err)
@@ -120,7 +120,7 @@ func TestCheckpointMetaForRestore(t *testing.T) {
require.Equal(t, uint64(333), taskInfo.Metadata.RewriteTS)
require.Equal(t, "1.0", taskInfo.Metadata.GcRatio)
require.Equal(t, true, taskInfo.HasSnapshotMetadata)
- require.Equal(t, checkpoint.InLogRestoreAndIdMapPersist, taskInfo.Progress)
+ require.Equal(t, checkpoint.InLogRestoreAndIdMapPersisted, taskInfo.Progress)

exists = checkpoint.ExistsCheckpointIngestIndexRepairSQLs(ctx, dom)
require.False(t, exists)
29 changes: 16 additions & 13 deletions br/pkg/checkpoint/log_restore.go
@@ -109,13 +109,16 @@ func StartCheckpointLogRestoreRunnerForTest(
return runner, nil
}

+ // Note that the session is owned by the checkpoint runner, and will also be closed by it.
func StartCheckpointRunnerForLogRestore(
ctx context.Context,
- se glue.Session,
+ createSessionFn func() (glue.Session, error),
) (*CheckpointRunner[LogRestoreKeyType, LogRestoreValueType], error) {
+ session, err := createSessionFn()
+ if err != nil {
+ return nil, errors.Trace(err)
+ }
runner := newCheckpointRunner[LogRestoreKeyType, LogRestoreValueType](
- newTableCheckpointStorage(se, LogRestoreCheckpointDatabaseName),
+ newTableCheckpointStorage(session, LogRestoreCheckpointDatabaseName),
nil, valueMarshalerForLogRestore)

// for restore, no need to set lock
@@ -194,22 +194,22 @@ func ExistsLogRestoreCheckpointMetadata(
TableExists(pmodel.NewCIStr(LogRestoreCheckpointDatabaseName), pmodel.NewCIStr(checkpointMetaTableName))
}

- // A progress type for snapshot + log restore.
+ // RestoreProgress is a progress type for snapshot + log restore.
//
- // Before the id-maps is persist into external storage, the snapshot restore and
- // id-maps constructure can be repeated. So if the progress is in `InSnapshotRestore`,
+ // Before the id-maps is persisted into external storage, the snapshot restore and
+ // id-maps building can be retried. So if the progress is in `InSnapshotRestore`,
// it can retry from snapshot restore.
//
- // After the id-maps is persist into external storage, there are some meta-kvs has
- // been restored into the cluster, such as `rename ddl`. Where would be a situation:
+ // After the id-maps is persisted into external storage, some meta-kvs have already
+ // been restored into the cluster, such as `rename ddl`. A situation could be:
//
// the first execution:
//
// table A created in snapshot restore is renamed to table B in log restore
// table A (id 80) --------------> table B (id 80)
// ( snapshot restore )            ( log restore )
//
- // the second execution if don't skip snasphot restore:
+ // the second execution if snapshot restore is not skipped:
//
// table A is created again in snapshot restore, because there is no table named A
// table A (id 81) --------------> [not in id-maps, so ignored]
@@ -221,8 +224,8 @@ type RestoreProgress int

const (
InSnapshotRestore RestoreProgress = iota
- // Only when the id-maps is persist, status turns into it.
- InLogRestoreAndIdMapPersist
+ // The status turns into this only after the id-maps is persisted.
+ InLogRestoreAndIdMapPersisted
)

type CheckpointProgress struct {
@@ -254,8 +257,8 @@ func ExistsCheckpointProgress(
TableExists(pmodel.NewCIStr(LogRestoreCheckpointDatabaseName), pmodel.NewCIStr(checkpointProgressTableName))
}

- // CheckpointTaskInfo is unique information within the same cluster id. It represents the last
- // restore task executed for this cluster.
+ // CheckpointTaskInfoForLogRestore is tied to a specific cluster.
+ // It represents the last restore task executed in this cluster.
type CheckpointTaskInfoForLogRestore struct {
Metadata *CheckpointMetadataForLogRestore
HasSnapshotMetadata bool
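
The renamed InLogRestoreAndIdMapPersisted state marks the point of no return described above: once the id-maps are persisted, meta-kvs such as rename DDLs may already be in the cluster, so snapshot restore must not be repeated. A minimal sketch of how a caller might gate on the persisted progress (editor's sketch, not from this commit; the executor variable and surrounding wiring are assumed):

    progress, err := checkpoint.LoadCheckpointProgress(ctx, executor)
    if err != nil {
        return errors.Trace(err)
    }
    switch progress.Progress {
    case checkpoint.InSnapshotRestore:
        // id-maps not yet persisted: snapshot restore and id-map building can be retried
    case checkpoint.InLogRestoreAndIdMapPersisted:
        // meta-kvs (e.g. rename DDLs) may already be restored: skip snapshot restore
    }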
9 changes: 6 additions & 3 deletions br/pkg/checkpoint/restore.go
@@ -77,14 +77,17 @@ func StartCheckpointRestoreRunnerForTest(
return runner, nil
}

+ // Note that the session is owned by the checkpoint runner, and will also be closed by it.
func StartCheckpointRunnerForRestore(
ctx context.Context,
- se glue.Session,
+ createSessionFn func() (glue.Session, error),
dbName string,
) (*CheckpointRunner[RestoreKeyType, RestoreValueType], error) {
+ session, err := createSessionFn()
+ if err != nil {
+ return nil, errors.Trace(err)
+ }
runner := newCheckpointRunner[RestoreKeyType, RestoreValueType](
- newTableCheckpointStorage(se, dbName),
+ newTableCheckpointStorage(session, dbName),
nil, valueMarshalerForRestore)

// for restore, no need to set lock
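
Both runner constructors now take a session factory instead of a live session, which makes the ownership rule in the comment explicit: the runner creates its own session and is responsible for closing it. A hedged usage sketch (assuming a glue.Glue value g and a kv.Storage store as in existing callers; names are illustrative):

    runner, err := checkpoint.StartCheckpointRunnerForRestore(ctx, func() (glue.Session, error) {
        // the runner owns and closes this session; do not reuse it elsewhere
        return g.CreateSession(store)
    }, dbName)
    if err != nil {
        return errors.Trace(err)
    }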
1 change: 1 addition & 0 deletions br/pkg/errors/errors.go
@@ -33,6 +33,7 @@ func IsContextCanceled(err error) bool {
var (
ErrUnknown = errors.Normalize("internal error", errors.RFCCodeText("BR:Common:ErrUnknown"))
ErrInvalidArgument = errors.Normalize("invalid argument", errors.RFCCodeText("BR:Common:ErrInvalidArgument"))
+ ErrInvalidState = errors.Normalize("invalid state", errors.RFCCodeText("BR:Common:ErrInvalidState"))
ErrUndefinedRestoreDbOrTable = errors.Normalize("undefined restore databases or tables", errors.RFCCodeText("BR:Common:ErrUndefinedDbOrTable"))
ErrVersionMismatch = errors.Normalize("version mismatch", errors.RFCCodeText("BR:Common:ErrVersionMismatch"))
ErrFailedToConnect = errors.Normalize("failed to make gRPC channels", errors.RFCCodeText("BR:Common:ErrFailedToConnect"))
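
The new ErrInvalidState gives BR a normalized error code for internal-consistency failures, such as a checkpoint progress value that should be unreachable. A typical use might look like this sketch (berrors being the conventional alias for br/pkg/errors; the message is illustrative):

    return errors.Annotatef(berrors.ErrInvalidState,
        "unexpected checkpoint progress %d", progress)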
2 changes: 1 addition & 1 deletion br/pkg/restore/import_mode_switcher.go
@@ -176,7 +176,7 @@ func (switcher *ImportModeSwitcher) GoSwitchToImportMode(
return nil
}

- // RestorePreWork executes some prepare work before restore.
+ // RestorePreWork switches to import mode and removes PD schedulers if needed.
// TODO: make this function return a restore post-work.
func RestorePreWork(
ctx context.Context,
4 changes: 3 additions & 1 deletion br/pkg/restore/log_client/BUILD.bazel
@@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "log_client",
srcs = [
"batch_file_processor.go",
"client.go",
"compacted_file_strategy.go",
"import.go",
@@ -36,6 +37,7 @@ go_library(
"//br/pkg/stream",
"//br/pkg/summary",
"//br/pkg/utils",
"//br/pkg/utils/consts",
"//br/pkg/utils/iter",
"//br/pkg/version",
"//pkg/ddl/util",
@@ -103,6 +105,7 @@ go_test(
"//br/pkg/storage",
"//br/pkg/stream",
"//br/pkg/utils",
"//br/pkg/utils/consts",
"//br/pkg/utils/iter",
"//br/pkg/utiltest",
"//pkg/domain",
@@ -117,7 +120,6 @@ go_test(
"//pkg/util/chunk",
"//pkg/util/codec",
"//pkg/util/sqlexec",
"//pkg/util/table-filter",
"@com_github_docker_go_units//:go-units",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
226 changes: 226 additions & 0 deletions br/pkg/restore/log_client/batch_meta_processor.go
@@ -0,0 +1,226 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package logclient

import (
"context"
"encoding/json"

"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/stream"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/utils/consts"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/meta/model"
"go.uber.org/zap"
)

// BatchMetaKVProcessor defines how to process a batch of files
type BatchMetaKVProcessor interface {
// ProcessBatch processes a batch of files with a filterTS and returns the entries that were not processed, for the next iteration
ProcessBatch(
ctx context.Context,
files []*backuppb.DataFileInfo,
entries []*KvEntryWithTS,
filterTS uint64,
cf string,
) ([]*KvEntryWithTS, error)
}
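
// Editor's sketch (not part of the commit): both concrete types in this file
// are expected to satisfy BatchMetaKVProcessor; compile-time assertions would
// make that explicit:
//
//	var (
//		_ BatchMetaKVProcessor = (*RestoreMetaKVProcessor)(nil)
//		_ BatchMetaKVProcessor = (*MetaKVInfoProcessor)(nil)
//	)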

// RestoreMetaKVProcessor implements BatchMetaKVProcessor for restoring files in batches
type RestoreMetaKVProcessor struct {
client *LogClient
schemasReplace *stream.SchemasReplace
updateStats func(kvCount uint64, size uint64)
progressInc func()
}

func NewRestoreMetaKVProcessor(client *LogClient, schemasReplace *stream.SchemasReplace,
updateStats func(kvCount uint64, size uint64),
progressInc func()) *RestoreMetaKVProcessor {
return &RestoreMetaKVProcessor{
client: client,
schemasReplace: schemasReplace,
updateStats: updateStats,
progressInc: progressInc,
}
}

// RestoreAndRewriteMetaKVFiles tries to restore meta kv-event files from the stream backup.
func (rp *RestoreMetaKVProcessor) RestoreAndRewriteMetaKVFiles(
ctx context.Context,
files []*backuppb.DataFileInfo,
) error {
// starts gc row collector
rp.client.RunGCRowsLoader(ctx)

// separate the files by CF and sort each group by TS
filesInDefaultCF, filesInWriteCF := SeparateAndSortFilesByCF(files)

log.Info("start to restore meta files",
zap.Int("total files", len(files)),
zap.Int("default files", len(filesInDefaultCF)),
zap.Int("write files", len(filesInWriteCF)))

if err := LoadAndProcessMetaKVFilesInBatch(
ctx,
filesInDefaultCF,
filesInWriteCF,
rp,
); err != nil {
return errors.Trace(err)
}

// Update the global schema version to trigger a full reload so that every TiDB node in the
// cluster gets synced with the latest schema update.
if err := rp.client.UpdateSchemaVersionFullReload(ctx); err != nil {
return errors.Trace(err)
}
return nil
}

func (rp *RestoreMetaKVProcessor) ProcessBatch(
ctx context.Context,
files []*backuppb.DataFileInfo,
entries []*KvEntryWithTS,
filterTS uint64,
cf string,
) ([]*KvEntryWithTS, error) {
return rp.client.RestoreBatchMetaKVFiles(
ctx, files, rp.schemasReplace, entries,
filterTS, rp.updateStats, rp.progressInc, cf,
)
}

// MetaKVInfoProcessor implements BatchMetaKVProcessor to iterate meta kv and collect information.
//
// 1. It collects table renaming information. The table rename operation will not change the table id, and the process
// will drop the original table and create a new one with the same table id, so in DDL history there will be two events
// that correspond to the same table id.
//
// 2. It builds the id mapping from upstream to downstream. This logic was previously nested inside the table rewrite
// code and is now separated out into its own component.
type MetaKVInfoProcessor struct {
client *LogClient
tableHistoryManager *stream.LogBackupTableHistoryManager
tableMappingManager *stream.TableMappingManager
}

func NewMetaKVInfoProcessor(client *LogClient) *MetaKVInfoProcessor {
return &MetaKVInfoProcessor{
client: client,
tableHistoryManager: stream.NewTableHistoryManager(),
tableMappingManager: stream.NewTableMappingManager(),
}
}

func (mp *MetaKVInfoProcessor) ReadMetaKVFilesAndBuildInfo(
ctx context.Context,
files []*backuppb.DataFileInfo,
) error {
// separate the files by CF and sort each group by TS
filesInDefaultCF, filesInWriteCF := SeparateAndSortFilesByCF(files)

if err := LoadAndProcessMetaKVFilesInBatch(
ctx,
filesInDefaultCF,
filesInWriteCF,
mp,
); err != nil {
return errors.Trace(err)
}
return nil
}

func (mp *MetaKVInfoProcessor) ProcessBatch(
ctx context.Context,
files []*backuppb.DataFileInfo,
entries []*KvEntryWithTS,
filterTS uint64,
cf string,
) ([]*KvEntryWithTS, error) {
// no need to parse the write CF, as it contains values like "p\XXXX\XXX" that are meaningless;
// the default CF value should contain everything we need for DDL operations
if cf == consts.WriteCF {
return nil, nil
}

curSortedEntries, filteredEntries, err := mp.client.filterAndSortKvEntriesFromFiles(ctx, files, entries, filterTS)
if err != nil {
return nil, errors.Trace(err)
}

// process entries to collect table IDs
for _, entry := range curSortedEntries {
value := entry.E.Value

if utils.IsMetaDBKey(entry.E.Key) {
rawKey, err := stream.ParseTxnMetaKeyFrom(entry.E.Key)
if err != nil {
return nil, errors.Trace(err)
}

if meta.IsDBkey(rawKey.Field) {
var dbInfo model.DBInfo
if err := json.Unmarshal(value, &dbInfo); err != nil {
return nil, errors.Trace(err)
}

// collect the db id -> name mapping during log backup; it will contain information about newly created dbs
mp.tableHistoryManager.RecordDBIdToName(dbInfo.ID, dbInfo.Name.O)

// update the id map
if err = mp.tableMappingManager.ProcessDBEntryAndUpdateIdMapping(dbInfo); err != nil {
return nil, errors.Trace(err)
}
} else if !meta.IsDBkey(rawKey.Key) {
// also see RewriteMetaKvEntry
continue
}

// collect table history indexed by table id, same id may have different table names in history
if meta.IsTableKey(rawKey.Field) {
var tableInfo model.TableInfo
if err := json.Unmarshal(value, &tableInfo); err != nil {
return nil, errors.Trace(err)
}
// cannot use the db id in the parsed table info since it might not be set, so it defaults to 0
dbID, err := meta.ParseDBKey(rawKey.Key)
if err != nil {
return nil, errors.Trace(err)
}

// add to table rename history
mp.tableHistoryManager.AddTableHistory(tableInfo.ID, tableInfo.Name.String(), dbID)

// update the id map
if err = mp.tableMappingManager.ProcessTableEntryAndUpdateIdMapping(dbID, tableInfo); err != nil {
return nil, errors.Trace(err)
}
}
}
}
return filteredEntries, nil
}

func (mp *MetaKVInfoProcessor) GetTableMappingManager() *stream.TableMappingManager {
return mp.tableMappingManager
}

func (mp *MetaKVInfoProcessor) GetTableHistoryManager() *stream.LogBackupTableHistoryManager {
return mp.tableHistoryManager
}
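
Putting the pieces together, a caller that needs the id mappings and table rename history before doing any rewriting might drive the processor like this (a sketch assuming files holds the meta-kv DataFileInfo list; the real call site lives in the log restore flow):

    processor := logclient.NewMetaKVInfoProcessor(client)
    if err := processor.ReadMetaKVFilesAndBuildInfo(ctx, files); err != nil {
        return errors.Trace(err)
    }
    // upstream -> downstream id mapping and rename history are now available
    mapping := processor.GetTableMappingManager()
    history := processor.GetTableHistoryManager()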