Skip to content

Commit

Permalink
importinto: pass kv store directly when rebase ids (#51850)
Browse files Browse the repository at this point in the history
close #51848
  • Loading branch information
D3Hunter authored Mar 18, 2024
1 parent af76c2f commit 5a2b8e8
Show file tree
Hide file tree
Showing 9 changed files with 14 additions and 56 deletions.
4 changes: 2 additions & 2 deletions pkg/disttask/importinto/subtask_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (e *importMinimalTaskExecutor) Run(ctx context.Context, dataWriter, indexWr
}

// postProcess does the post-processing for the task.
func postProcess(ctx context.Context, taskMeta *TaskMeta, subtaskMeta *PostProcessStepMeta, logger *zap.Logger) (err error) {
func postProcess(ctx context.Context, store kv.Storage, taskMeta *TaskMeta, subtaskMeta *PostProcessStepMeta, logger *zap.Logger) (err error) {
failpoint.Inject("syncBeforePostProcess", func() {
TestSyncChan <- struct{}{}
<-TestSyncChan
Expand All @@ -117,7 +117,7 @@ func postProcess(ctx context.Context, taskMeta *TaskMeta, subtaskMeta *PostProce
callLog.End(zap.ErrorLevel, err)
}()

if err = importer.RebaseAllocatorBases(ctx, subtaskMeta.MaxIDs, &taskMeta.Plan, logger); err != nil {
if err = importer.RebaseAllocatorBases(ctx, store, subtaskMeta.MaxIDs, &taskMeta.Plan, logger); err != nil {
return err
}

Expand Down
8 changes: 5 additions & 3 deletions pkg/disttask/importinto/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ func (e *writeAndIngestStepExecutor) Cleanup(_ context.Context) (err error) {
type postProcessStepExecutor struct {
taskexecutor.EmptyStepExecutor
taskID int64
store tidbkv.Storage
taskMeta *TaskMeta
logger *zap.Logger
}
Expand All @@ -470,9 +471,10 @@ var _ execute.StepExecutor = &postProcessStepExecutor{}

// NewPostProcessStepExecutor creates a new post process step executor.
// exported for testing.
func NewPostProcessStepExecutor(taskID int64, taskMeta *TaskMeta, logger *zap.Logger) execute.StepExecutor {
func NewPostProcessStepExecutor(taskID int64, store tidbkv.Storage, taskMeta *TaskMeta, logger *zap.Logger) execute.StepExecutor {
return &postProcessStepExecutor{
taskID: taskID,
store: store,
taskMeta: taskMeta,
logger: logger,
}
Expand All @@ -491,7 +493,7 @@ func (p *postProcessStepExecutor) RunSubtask(ctx context.Context, subtask *proto
failpoint.Inject("waitBeforePostProcess", func() {
time.Sleep(5 * time.Second)
})
return postProcess(ctx, p.taskMeta, &stepMeta, logger)
return postProcess(ctx, p.store, p.taskMeta, &stepMeta, logger)
}

type importExecutor struct {
Expand Down Expand Up @@ -563,7 +565,7 @@ func (e *importExecutor) GetStepExecutor(task *proto.Task, stepResource *proto.S
store: e.store,
}, nil
case proto.ImportStepPostProcess:
return NewPostProcessStepExecutor(task.ID, &taskMeta, logger), nil
return NewPostProcessStepExecutor(task.ID, e.store, &taskMeta, logger), nil
default:
return nil, errors.Errorf("unknown step %d for import task %d", task.Step, task.ID)
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/disttask/importinto/task_executor_testkit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestPostProcessStepExecutor(t *testing.T) {

bytes, err := json.Marshal(stepMeta)
require.NoError(t, err)
executor := importinto.NewPostProcessStepExecutor(1, taskMeta, zap.NewExample())
executor := importinto.NewPostProcessStepExecutor(1, store, taskMeta, zap.NewExample())
err = executor.RunSubtask(context.Background(), &proto.Subtask{Meta: bytes})
require.NoError(t, err)

Expand All @@ -79,17 +79,17 @@ func TestPostProcessStepExecutor(t *testing.T) {
stepMeta.Checksum[-1] = tmp
bytes, err = json.Marshal(stepMeta)
require.NoError(t, err)
executor = importinto.NewPostProcessStepExecutor(1, taskMeta, zap.NewExample())
executor = importinto.NewPostProcessStepExecutor(1, store, taskMeta, zap.NewExample())
err = executor.RunSubtask(context.Background(), &proto.Subtask{Meta: bytes})
require.ErrorContains(t, err, "checksum mismatched remote vs local")

taskMeta.Plan.Checksum = config.OpLevelOptional
executor = importinto.NewPostProcessStepExecutor(1, taskMeta, zap.NewExample())
executor = importinto.NewPostProcessStepExecutor(1, store, taskMeta, zap.NewExample())
err = executor.RunSubtask(context.Background(), &proto.Subtask{Meta: bytes})
require.NoError(t, err)

taskMeta.Plan.Checksum = config.OpLevelOff
executor = importinto.NewPostProcessStepExecutor(1, taskMeta, zap.NewExample())
executor = importinto.NewPostProcessStepExecutor(1, store, taskMeta, zap.NewExample())
err = executor.RunSubtask(context.Background(), &proto.Subtask{Meta: bytes})
require.NoError(t, err)
}
2 changes: 0 additions & 2 deletions pkg/executor/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ go_library(
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_log//:log",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd_client//:client",
Expand Down Expand Up @@ -157,7 +156,6 @@ go_test(
"@com_github_pingcap_log//:log",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd_client//:client",
Expand Down
5 changes: 0 additions & 5 deletions pkg/executor/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/disttask/framework/handle"
"github.com/pingcap/tidb/pkg/expression"
tidbkv "github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
pformat "github.com/pingcap/tidb/pkg/parser/format"
Expand All @@ -60,7 +59,6 @@ import (
"github.com/pingcap/tidb/pkg/util/filter"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/stringutil"
kvconfig "github.com/tikv/client-go/v2/config"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -171,9 +169,6 @@ func (t DataSourceType) String() string {
}

var (
// GetKVStore returns a kv.Storage.
// kv encoder of physical mode needs it.
GetKVStore func(path string, tls kvconfig.Security) (tidbkv.Storage, error)
// NewClientWithContext returns a kv.Client.
NewClientWithContext = pd.NewClientWithContext
)
Expand Down
16 changes: 1 addition & 15 deletions pkg/executor/importer/importer_testkit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"time"

"github.com/ngaut/pools"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/br/pkg/lightning/common"
Expand All @@ -47,7 +46,6 @@ import (
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/stretchr/testify/require"
kvconfig "github.com/tikv/client-go/v2/config"
"go.etcd.io/etcd/tests/v3/integration"
"go.uber.org/mock/gomock"
"go.uber.org/zap"
Expand Down Expand Up @@ -200,10 +198,6 @@ func TestPostProcess(t *testing.T) {
return tk.Session(), nil
}, 1, 1, time.Second)
defer pool.Close()
bak := importer.GetKVStore
defer func() {
importer.GetKVStore = bak
}()

tk.MustExec("create database db")
tk.MustExec("create table db.tb(id int primary key)")
Expand Down Expand Up @@ -231,19 +225,11 @@ func TestPostProcess(t *testing.T) {
localChecksum = verify.NewKVGroupChecksumForAdd()
localChecksum.AddRawGroup(verify.DataKVGroupID, 1, 1, 1)
require.NoError(t, importer.PostProcess(ctx, tk.Session(), nil, plan, localChecksum, logger))
// get KV store failed
importer.GetKVStore = func(path string, tls kvconfig.Security) (kv.Storage, error) {
return nil, errors.New("mock get kv store failed")
}
// rebase success
tk.MustExec("create table db.tb2(id int auto_increment primary key)")
table, err = do.InfoSchema().TableByName(model.NewCIStr("db"), model.NewCIStr("tb2"))
require.NoError(t, err)
plan.TableInfo, plan.DesiredTableInfo = table.Meta(), table.Meta()
require.ErrorContains(t, importer.PostProcess(ctx, tk.Session(), nil, plan, localChecksum, logger), "mock get kv store failed")
// rebase success
importer.GetKVStore = func(path string, tls kvconfig.Security) (kv.Storage, error) {
return store, nil
}
integration.BeforeTestExternal(t)
testEtcdCluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
t.Cleanup(func() {
Expand Down
23 changes: 2 additions & 21 deletions pkg/executor/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package importer

import (
"context"
"fmt"
"io"
"math"
"net"
Expand Down Expand Up @@ -127,19 +126,6 @@ func prepareSortDir(e *LoadDataController, id string, tidbCfg *tidb.Config) (str
return sortDir, nil
}

// GetCachedKVStoreFrom gets a cached kv store from PD address.
// Callers should NOT close the kv store.
func GetCachedKVStoreFrom(pdAddr string, tls *common.TLS) (tidbkv.Storage, error) {
// Disable GC because TiDB enables GC already.
keySpaceName := tidb.GetGlobalKeyspaceName()
// the kv store we get is a cached store, so we can't close it.
kvStore, err := GetKVStore(fmt.Sprintf("tikv://%s?disableGC=true&keyspaceName=%s", pdAddr, keySpaceName), tls.ToTiKVSecurityConfig())
if err != nil {
return nil, errors.Trace(err)
}
return kvStore, nil
}

// GetRegionSplitSizeKeys gets the region split size and keys from PD.
func GetRegionSplitSizeKeys(ctx context.Context) (regionSplitSize int64, regionSplitKeys int64, err error) {
tidbCfg := tidb.GetGlobalConfig()
Expand Down Expand Up @@ -780,7 +766,7 @@ func PostProcess(
callLog.End(zap.ErrorLevel, err)
}()

if err = RebaseAllocatorBases(ctx, maxIDs, plan, logger); err != nil {
if err = RebaseAllocatorBases(ctx, se.GetStore(), maxIDs, plan, logger); err != nil {
return err
}

Expand All @@ -801,7 +787,7 @@ func (r *autoIDRequirement) AutoIDClient() *autoid.ClientDiscover {
}

// RebaseAllocatorBases rebase the allocator bases.
func RebaseAllocatorBases(ctx context.Context, maxIDs map[autoid.AllocatorType]int64, plan *Plan, logger *zap.Logger) (err error) {
func RebaseAllocatorBases(ctx context.Context, kvStore tidbkv.Storage, maxIDs map[autoid.AllocatorType]int64, plan *Plan, logger *zap.Logger) (err error) {
callLog := log.BeginTask(logger, "rebase allocators")
defer func() {
callLog.End(zap.ErrorLevel, err)
Expand All @@ -824,11 +810,6 @@ func RebaseAllocatorBases(ctx context.Context, maxIDs map[autoid.AllocatorType]i
return err2
}

// no need to close kvStore, since it's a cached store.
kvStore, err2 := GetCachedKVStoreFrom(tidbCfg.Path, tls)
if err2 != nil {
return errors.Trace(err2)
}
addrs := strings.Split(tidbCfg.Path, ",")
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: addrs,
Expand Down
1 change: 0 additions & 1 deletion pkg/store/driver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/store/driver",
visibility = ["//visibility:public"],
deps = [
"//pkg/executor/importer",
"//pkg/kv",
"//pkg/metrics",
"//pkg/sessionctx/variable",
Expand Down
3 changes: 0 additions & 3 deletions pkg/store/driver/tikv_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/errors"
deadlockpb "github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/pkg/executor/importer"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
Expand Down Expand Up @@ -60,8 +59,6 @@ func init() {
// Setup the Hooks to dynamic control global resource controller.
variable.EnableGlobalResourceControlFunc = tikv.EnableResourceControl
variable.DisableGlobalResourceControlFunc = tikv.DisableResourceControl
// cannot use this package directly, it causes import cycle
importer.GetKVStore = getKVStore
}

// Option is a function that changes some config of Driver
Expand Down

0 comments on commit 5a2b8e8

Please sign in to comment.