br: add table filter for log restore #57394

Open · wants to merge 1 commit into master
6 changes: 3 additions & 3 deletions br/pkg/checkpoint/checkpoint_test.go
@@ -105,12 +105,12 @@ func TestCheckpointMetaForRestore(t *testing.T) {
exists := checkpoint.ExistsCheckpointProgress(ctx, dom)
require.False(t, exists)
err = checkpoint.SaveCheckpointProgress(ctx, se, &checkpoint.CheckpointProgress{
-Progress: checkpoint.InLogRestoreAndIdMapPersist,
+Progress: checkpoint.InLogRestoreAndIdMapPersisted,
})
require.NoError(t, err)
progress, err := checkpoint.LoadCheckpointProgress(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor())
require.NoError(t, err)
-require.Equal(t, checkpoint.InLogRestoreAndIdMapPersist, progress.Progress)
+require.Equal(t, checkpoint.InLogRestoreAndIdMapPersisted, progress.Progress)

taskInfo, err := checkpoint.TryToGetCheckpointTaskInfo(ctx, s.Mock.Domain, se.GetSessionCtx().GetRestrictedSQLExecutor())
require.NoError(t, err)
@@ -120,7 +120,7 @@ func TestCheckpointMetaForRestore(t *testing.T) {
require.Equal(t, uint64(333), taskInfo.Metadata.RewriteTS)
require.Equal(t, "1.0", taskInfo.Metadata.GcRatio)
require.Equal(t, true, taskInfo.HasSnapshotMetadata)
-require.Equal(t, checkpoint.InLogRestoreAndIdMapPersist, taskInfo.Progress)
+require.Equal(t, checkpoint.InLogRestoreAndIdMapPersisted, taskInfo.Progress)

exists = checkpoint.ExistsCheckpointIngestIndexRepairSQLs(ctx, dom)
require.False(t, exists)
20 changes: 10 additions & 10 deletions br/pkg/checkpoint/log_restore.go
@@ -194,22 +194,22 @@ func ExistsLogRestoreCheckpointMetadata(
TableExists(pmodel.NewCIStr(LogRestoreCheckpointDatabaseName), pmodel.NewCIStr(checkpointMetaTableName))
}

-// A progress type for snapshot + log restore.
+// RestoreProgress is a progress type for snapshot + log restore.
//
-// Before the id-maps is persist into external storage, the snapshot restore and
-// id-maps constructure can be repeated. So if the progress is in `InSnapshotRestore`,
+// Before the id-maps is persisted into external storage, the snapshot restore and
+// id-maps building can be retried. So if the progress is in `InSnapshotRestore`,
// it can retry from snapshot restore.
//
-// After the id-maps is persist into external storage, there are some meta-kvs has
-// been restored into the cluster, such as `rename ddl`. Where would be a situation:
+// After the id-maps is persisted into external storage, some meta-kvs have already
+// been restored into the cluster, such as `rename ddl`. A situation could be:
//
// the first execution:
//
// table A created in snapshot restore is renamed to table B in log restore
// table A (id 80) --------------> table B (id 80)
// ( snapshot restore ) ( log restore )
//
-// the second execution if don't skip snasphot restore:
+// the second execution if snapshot restore is not skipped:
//
// table A is created again in snapshot restore, because there is no table named A
// table A (id 81) --------------> [not in id-maps, so ignored]
@@ -221,8 +221,8 @@ type RestoreProgress int

const (
InSnapshotRestore RestoreProgress = iota
-// Only when the id-maps is persist, status turns into it.
-InLogRestoreAndIdMapPersist
+// Only after the id-maps is persisted does the status turn into this.
+InLogRestoreAndIdMapPersisted
)

type CheckpointProgress struct {
@@ -254,8 +254,8 @@ func ExistsCheckpointProgress(
TableExists(pmodel.NewCIStr(LogRestoreCheckpointDatabaseName), pmodel.NewCIStr(checkpointProgressTableName))
}

-// CheckpointTaskInfo is unique information within the same cluster id. It represents the last
-// restore task executed for this cluster.
+// CheckpointTaskInfoForLogRestore is tied to a specific cluster.
+// It represents the last restore task executed in this cluster.
type CheckpointTaskInfoForLogRestore struct {
Metadata *CheckpointMetadataForLogRestore
HasSnapshotMetadata bool
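The two `RestoreProgress` values gate whether a resumed task may repeat snapshot restore. Below is a minimal sketch of that gating, assuming hypothetical `runSnapshotRestore`/`runLogRestore` helpers; the `LoadCheckpointProgress` call follows the signature used in the test diff above:

```go
import (
	"context"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/br/pkg/checkpoint"
	"github.com/pingcap/tidb/pkg/util/sqlexec"
)

// resumeRestore is illustrative only, not part of this PR.
func resumeRestore(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) error {
	progress, err := checkpoint.LoadCheckpointProgress(ctx, exec)
	if err != nil {
		return errors.Trace(err)
	}
	if progress.Progress == checkpoint.InSnapshotRestore {
		// id-maps not persisted yet: snapshot restore and id-map building are
		// still repeatable, so the task can safely start over from here.
		if err := runSnapshotRestore(ctx); err != nil { // hypothetical helper
			return errors.Trace(err)
		}
	}
	// InLogRestoreAndIdMapPersisted: meta KVs such as rename DDLs may already be
	// applied, so snapshot restore must be skipped; re-running it would recreate
	// a table under a new id that the persisted id-maps would then ignore.
	return runLogRestore(ctx) // hypothetical helper
}
```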
1 change: 0 additions & 1 deletion br/pkg/checkpoint/restore.go
@@ -77,7 +77,6 @@ func StartCheckpointRestoreRunnerForTest(
return runner, nil
}

-// Notice that the session is owned by the checkpoint runner, and it will be also closed by it.
func StartCheckpointRunnerForRestore(
ctx context.Context,
se glue.Session,
2 changes: 1 addition & 1 deletion br/pkg/restore/import_mode_switcher.go
@@ -176,7 +176,7 @@ func (switcher *ImportModeSwitcher) GoSwitchToImportMode(
return nil
}

-// RestorePreWork executes some prepare work before restore.
+// RestorePreWork switches to import mode and removes pd schedulers if needed.
// TODO make this function return a restore post work.
func RestorePreWork(
ctx context.Context,
5 changes: 3 additions & 2 deletions br/pkg/restore/log_client/BUILD.bazel
@@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "log_client",
srcs = [
"batch_meta_processor.go",
"client.go",
"compacted_file_strategy.go",
"import.go",
@@ -36,6 +37,7 @@ go_library(
"//br/pkg/stream",
"//br/pkg/summary",
"//br/pkg/utils",
"//br/pkg/utils/consts",
"//br/pkg/utils/iter",
"//br/pkg/version",
"//pkg/ddl/util",
@@ -71,7 +73,6 @@ go_library(
"@org_golang_x_sync//errgroup",
"@org_uber_go_multierr//:multierr",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
],
)

@@ -103,6 +104,7 @@ go_test(
"//br/pkg/storage",
"//br/pkg/stream",
"//br/pkg/utils",
"//br/pkg/utils/consts",
"//br/pkg/utils/iter",
"//br/pkg/utiltest",
"//pkg/domain",
@@ -117,7 +119,6 @@
"//pkg/util/chunk",
"//pkg/util/codec",
"//pkg/util/sqlexec",
"//pkg/util/table-filter",
"@com_github_docker_go_units//:go-units",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
228 changes: 228 additions & 0 deletions br/pkg/restore/log_client/batch_meta_processor.go
@@ -0,0 +1,228 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package logclient

import (
	"context"
	"encoding/json"

	"github.com/pingcap/errors"
	backuppb "github.com/pingcap/kvproto/pkg/brpb"
	"github.com/pingcap/log"
	"github.com/pingcap/tidb/br/pkg/stream"
	"github.com/pingcap/tidb/br/pkg/utils"
	"github.com/pingcap/tidb/pkg/meta"
	"github.com/pingcap/tidb/pkg/meta/model"
	"go.uber.org/zap"
)

// BatchMetaKVProcessor defines how to process a batch of meta KV files.
type BatchMetaKVProcessor interface {
	// ProcessBatch processes a batch of files up to filterTS and returns the
	// entries not yet processed, to be carried into the next iteration.
	ProcessBatch(
		ctx context.Context,
		files []*backuppb.DataFileInfo,
		entries []*KvEntryWithTS,
		filterTS uint64,
		cf string,
	) ([]*KvEntryWithTS, error)
}

// RestoreMetaKVProcessor implements BatchMetaKVProcessor for restoring files in batches.
type RestoreMetaKVProcessor struct {
	client         *LogClient
	schemasReplace *stream.SchemasReplace
	updateStats    func(kvCount uint64, size uint64)
	progressInc    func()
}

func NewRestoreMetaKVProcessor(client *LogClient, schemasReplace *stream.SchemasReplace,
	updateStats func(kvCount uint64, size uint64),
	progressInc func()) *RestoreMetaKVProcessor {
	return &RestoreMetaKVProcessor{
		client:         client,
		schemasReplace: schemasReplace,
		updateStats:    updateStats,
		progressInc:    progressInc,
	}
}

// RestoreAndRewriteMetaKVFiles tries to restore the meta kv-event files from the stream backup.
func (rp *RestoreMetaKVProcessor) RestoreAndRewriteMetaKVFiles(
	ctx context.Context,
	files []*backuppb.DataFileInfo,
) error {
	// start the gc row collector
	rp.client.RunGCRowsLoader(ctx)

	// separate the files by CF and sort each group by TS
	filesInDefaultCF, filesInWriteCF := SeparateAndSortFilesByCF(files)

	log.Info("start to restore meta files",
		zap.Int("total files", len(files)),
		zap.Int("default files", len(filesInDefaultCF)),
		zap.Int("write files", len(filesInWriteCF)))

	if err := LoadAndProcessMetaKVFilesInBatch(
		ctx,
		filesInDefaultCF,
		filesInWriteCF,
		rp,
	); err != nil {
		return errors.Trace(err)
	}

	// update the global schema version to trigger a full reload so every TiDB
	// node in the cluster gets synced with the latest schema update.
	if err := rp.client.UpdateSchemaVersionFullReload(ctx); err != nil {
		return errors.Trace(err)
	}
	return nil
}

func (rp *RestoreMetaKVProcessor) ProcessBatch(
	ctx context.Context,
	files []*backuppb.DataFileInfo,
	entries []*KvEntryWithTS,
	filterTS uint64,
	cf string,
) ([]*KvEntryWithTS, error) {
	return rp.client.RestoreBatchMetaKVFiles(
		ctx, files, rp.schemasReplace, entries,
		filterTS, rp.updateStats, rp.progressInc, cf,
	)
}

// MetaKVInfoProcessor implements BatchMetaKVProcessor to iterate meta kv and collect information.
//
// 1. It collects table renaming information. The table rename operation does not change the table
// id; the process drops the original table and creates a new one with the same table id, so the
// DDL history will contain two events that correspond to the same table id.
//
// 2. It builds the id mapping from upstream to downstream. This logic was previously nested inside
// table rewrite and is now separated out into its own component.
type MetaKVInfoProcessor struct {
	client              *LogClient
	tableHistoryManager *stream.LogBackupTableHistoryManager
	tableMappingManager *stream.TableMappingManager
}

func NewMetaKVInfoProcessor(client *LogClient) *MetaKVInfoProcessor {
	return &MetaKVInfoProcessor{
		client:              client,
		tableHistoryManager: stream.NewTableHistoryManager(),
		tableMappingManager: stream.NewTableMappingManager(),
	}
}

func (mp *MetaKVInfoProcessor) ReadMetaKVFilesAndBuildInfo(
	ctx context.Context,
	files []*backuppb.DataFileInfo,
) error {
	// separate the files by CF and sort each group by TS
	filesInDefaultCF, filesInWriteCF := SeparateAndSortFilesByCF(files)

	if err := LoadAndProcessMetaKVFilesInBatch(
		ctx,
		filesInDefaultCF,
		filesInWriteCF,
		mp,
	); err != nil {
		return errors.Trace(err)
	}
	return nil
}

func (mp *MetaKVInfoProcessor) ProcessBatch(
	ctx context.Context,
	files []*backuppb.DataFileInfo,
	entries []*KvEntryWithTS,
	filterTS uint64,
	cf string,
) ([]*KvEntryWithTS, error) {
	curSortedEntries, filteredEntries, err := mp.client.filterAndSortKvEntriesFromFiles(ctx, files, entries, filterTS)
	if err != nil {
		return nil, errors.Trace(err)
	}

	// process entries to collect table IDs
	for _, entry := range curSortedEntries {
		// get value from default cf and get the short value if possible from write cf
		value, err := stream.ExtractValue(&entry.E, cf)
		if err != nil {
			return nil, errors.Trace(err)
		}

		// write cf doesn't have a short value in it
		if value == nil {
			continue
		}

		if utils.IsMetaDBKey(entry.E.Key) {
			rawKey, err := stream.ParseTxnMetaKeyFrom(entry.E.Key)
			if err != nil {
				return nil, errors.Trace(err)
			}

			if meta.IsDBkey(rawKey.Field) {
				var dbInfo model.DBInfo
				if err := json.Unmarshal(value, &dbInfo); err != nil {
					return nil, errors.Trace(err)
				}

				// collect the db id -> name mapping during log backup; it contains
				// information about newly created dbs
				mp.tableHistoryManager.RecordDBIdToName(dbInfo.ID, dbInfo.Name.O)

				// update the id map
				if err = mp.tableMappingManager.ProcessDBValueAndUpdateIdMapping(dbInfo); err != nil {
					return nil, errors.Trace(err)
				}
			} else if !meta.IsDBkey(rawKey.Key) {
				// also see RewriteMetaKvEntry
				continue
			}

			// collect table history indexed by table id; the same id may map to
			// different table names over time
			if meta.IsTableKey(rawKey.Field) {
				var tableInfo model.TableInfo
				if err := json.Unmarshal(value, &tableInfo); err != nil {
					return nil, errors.Trace(err)
				}
				// can't use the db id from the parsed table info since it might not be
				// set (it defaults to 0), so parse the db id from the meta key instead
				dbID, err := meta.ParseDBKey(rawKey.Key)
				if err != nil {
					return nil, errors.Trace(err)
				}

				// add to table rename history
				mp.tableHistoryManager.AddTableHistory(tableInfo.ID, tableInfo.Name.String(), dbID)

				// update the id map
				if err = mp.tableMappingManager.ProcessTableValueAndUpdateIdMapping(dbID, tableInfo); err != nil {
					return nil, errors.Trace(err)
				}
			}
		}
	}
	return filteredEntries, nil
}

func (mp *MetaKVInfoProcessor) GetTableMappingManager() *stream.TableMappingManager {
	return mp.tableMappingManager
}

func (mp *MetaKVInfoProcessor) GetTableHistoryManager() *stream.LogBackupTableHistoryManager {
	return mp.tableHistoryManager
}
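The call pattern visible from this file is: build the info in one pass over the meta-kv files, then read the two managers back. A hedged usage sketch follows (it assumes a `*LogClient` and the meta-kv file list are already in hand; how the table filter ultimately consumes the managers is outside this diff):

```go
import (
	"context"

	"github.com/pingcap/errors"
	backuppb "github.com/pingcap/kvproto/pkg/brpb"
	logclient "github.com/pingcap/tidb/br/pkg/restore/log_client"
	"github.com/pingcap/tidb/br/pkg/stream"
)

// buildMetaKVInfo is illustrative only, not part of this PR.
func buildMetaKVInfo(
	ctx context.Context,
	client *logclient.LogClient,
	files []*backuppb.DataFileInfo,
) (*stream.TableMappingManager, *stream.LogBackupTableHistoryManager, error) {
	proc := logclient.NewMetaKVInfoProcessor(client)
	// One pass over the sorted default/write CF meta files fills both managers.
	if err := proc.ReadMetaKVFilesAndBuildInfo(ctx, files); err != nil {
		return nil, nil, errors.Trace(err)
	}
	return proc.GetTableMappingManager(), proc.GetTableHistoryManager(), nil
}
```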