import into: add analyze table for import into (#44329)
ref #42930
GMHDBJD authored Jun 14, 2023
1 parent abd9ff5 commit 14ca3ce
Showing 13 changed files with 105 additions and 28 deletions.
5 changes: 5 additions & 0 deletions disttask/importinto/dispatcher.go
@@ -435,10 +435,15 @@ func executeSQL(ctx context.Context, handle dispatcher.TaskHandle, logger *zap.L
}

func updateResult(taskMeta *TaskMeta, subtaskMetas []*SubtaskMeta, logger *zap.Logger) {
columnSizeMap := make(map[int64]int64)
for _, subtaskMeta := range subtaskMetas {
taskMeta.Result.ReadRowCnt += subtaskMeta.Result.ReadRowCnt
taskMeta.Result.LoadedRowCnt += subtaskMeta.Result.LoadedRowCnt
for key, val := range subtaskMeta.Result.ColSizeMap {
columnSizeMap[key] += val
}
}
taskMeta.Result.ColSizeMap = columnSizeMap
logger.Info("update result", zap.Any("task_meta", taskMeta))
}

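For context, the aggregation added in updateResult is a plain map merge across sub-task results; a minimal standalone sketch of the same idea (illustrative names, not the actual TiDB types):

```go
package main

import "fmt"

// mergeColSizes folds per-subtask column-size maps (column ID -> bytes)
// into a single task-level map, mirroring the loop added in updateResult.
func mergeColSizes(subtasks []map[int64]int64) map[int64]int64 {
	total := make(map[int64]int64)
	for _, m := range subtasks {
		for colID, size := range m {
			total[colID] += size
		}
	}
	return total
}

func main() {
	subtasks := []map[int64]int64{
		{1: 100, 2: 40},
		{1: 50, 3: 8},
	}
	fmt.Println(mergeColSizes(subtasks)) // map[1:150 2:40 3:8]
}
```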
5 changes: 3 additions & 2 deletions disttask/importinto/job.go
@@ -130,8 +130,9 @@ func (ti *DistImporter) Result() importer.JobImportResult {
// we can have it when there's duplicate detection.
msg := fmt.Sprintf(mysql.MySQLErrName[mysql.ErrLoadInfo].Raw, numRecords, numDeletes, numSkipped, numWarnings)
return importer.JobImportResult{
Msg: msg,
Affected: taskMeta.Result.ReadRowCnt,
Msg: msg,
Affected: taskMeta.Result.ReadRowCnt,
ColSizeMap: taskMeta.Result.ColSizeMap,
}
}

1 change: 1 addition & 0 deletions disttask/importinto/proto.go
@@ -110,4 +110,5 @@ type Checksum struct {
type Result struct {
ReadRowCnt uint64
LoadedRowCnt uint64
ColSizeMap map[int64]int64
}
1 change: 1 addition & 0 deletions disttask/importinto/scheduler.go
@@ -173,6 +173,7 @@ func (s *ImportScheduler) OnSubtaskFinished(ctx context.Context, subtaskMetaByte
subtaskMeta.Result = Result{
ReadRowCnt: sharedVars.Progress.ReadRowCnt.Load(),
LoadedRowCnt: uint64(dataKVCount),
ColSizeMap: sharedVars.Progress.GetColSize(),
}
s.sharedVars.Delete(subtaskMeta.ID)
return json.Marshal(subtaskMeta)
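The subtask meta returned here embeds the Result defined in proto.go above and is serialized with json.Marshal; Go's encoding/json represents integer-keyed maps such as ColSizeMap as JSON objects with string keys and restores the int64 keys on decode. A small round-trip sketch (the struct below is a stand-in, not the real type):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// result is a stand-in for the Result struct in proto.go; encoding/json turns
// integer map keys into JSON object keys (strings) and restores them on decode.
type result struct {
	ReadRowCnt   uint64
	LoadedRowCnt uint64
	ColSizeMap   map[int64]int64
}

func main() {
	r := result{ReadRowCnt: 2, LoadedRowCnt: 2, ColSizeMap: map[int64]int64{1: 8, 2: 16}}
	b, _ := json.Marshal(r)
	fmt.Println(string(b)) // {"ReadRowCnt":2,"LoadedRowCnt":2,"ColSizeMap":{"1":8,"2":16}}

	var back result
	_ = json.Unmarshal(b, &back)
	fmt.Println(back.ColSizeMap[2]) // 16
}
```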
1 change: 1 addition & 0 deletions executor/asyncloaddata/BUILD.bazel
@@ -19,6 +19,7 @@ go_library(
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_tikv_client_go_v2//util",
"@org_golang_x_exp//maps",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
25 changes: 24 additions & 1 deletion executor/asyncloaddata/progress.go
@@ -16,8 +16,10 @@ package asyncloaddata

import (
"encoding/json"
"sync"

"go.uber.org/atomic"
"golang.org/x/exp/maps"
)

// LogicalImportProgress is the progress info of the logical import mode.
@@ -37,6 +39,9 @@ type PhysicalImportProgress struct {
// EncodeFileSize is the size of the file that has finished KV encoding in bytes.
// it should equal to SourceFileSize eventually.
EncodeFileSize atomic.Int64

colSizeMu sync.Mutex
colSizeMap map[int64]int64
}

// Progress is the progress of the LOAD DATA task.
@@ -64,7 +69,9 @@ func NewProgress(logicalImport bool) *Progress {
if logicalImport {
li = &LogicalImportProgress{}
} else {
pi = &PhysicalImportProgress{}
pi = &PhysicalImportProgress{
colSizeMap: make(map[int64]int64),
}
}
return &Progress{
SourceFileSize: -1,
@@ -73,6 +80,22 @@
}
}

// AddColSize adds the size of the column to the progress.
func (p *Progress) AddColSize(colSizeMap map[int64]int64) {
p.colSizeMu.Lock()
defer p.colSizeMu.Unlock()
for key, value := range colSizeMap {
p.colSizeMap[key] += value
}
}

// GetColSize returns the size of the column.
func (p *Progress) GetColSize() map[int64]int64 {
p.colSizeMu.Lock()
defer p.colSizeMu.Unlock()
return maps.Clone(p.colSizeMap)
}

// String implements the fmt.Stringer interface.
func (p *Progress) String() string {
bs, _ := json.Marshal(p)
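AddColSize and GetColSize let concurrent chunk encoders fold their per-column byte counts into the shared progress, while readers get a defensive copy via maps.Clone so later updates cannot race with them. A self-contained sketch of the same pattern, using the standard-library maps package (Go 1.21+) in place of golang.org/x/exp/maps:

```go
package main

import (
	"fmt"
	"maps" // stdlib since Go 1.21; the commit uses golang.org/x/exp/maps
	"sync"
)

// colSizeTracker mimics the colSizeMu/colSizeMap pair on PhysicalImportProgress.
type colSizeTracker struct {
	mu    sync.Mutex
	sizes map[int64]int64
}

func newColSizeTracker() *colSizeTracker {
	return &colSizeTracker{sizes: make(map[int64]int64)}
}

// Add accumulates per-column byte counts reported by one encoder.
func (t *colSizeTracker) Add(delta map[int64]int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	for colID, size := range delta {
		t.sizes[colID] += size
	}
}

// Snapshot returns a copy, so readers never race with later Add calls.
func (t *colSizeTracker) Snapshot() map[int64]int64 {
	t.mu.Lock()
	defer t.mu.Unlock()
	return maps.Clone(t.sizes)
}

func main() {
	t := newColSizeTracker()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ { // four concurrent "encoders"
		wg.Add(1)
		go func() {
			defer wg.Done()
			t.Add(map[int64]int64{1: 10, 2: 20})
		}()
	}
	wg.Wait()
	fmt.Println(t.Snapshot()) // map[1:40 2:80]
}
```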
33 changes: 29 additions & 4 deletions executor/import_into.go
@@ -33,9 +33,11 @@ import (
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/dbterror/exeerrors"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
@@ -123,18 +125,24 @@ func (e *ImportIntoExec) Next(ctx context.Context, req *chunk.Chunk) (err error)
}

if e.controller.Detached {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalImportInto)
se, err := CreateSession(e.userSctx)
if err != nil {
return err
}
go func() {
defer CloseSession(se)
// todo: there's no need to wait for the import to finish, we can just return here.
// error is stored in system table, so we can ignore it here
//nolint: errcheck
_ = e.doImport(distImporter, task)
_ = e.doImport(ctx, se, distImporter, task)
failpoint.Inject("testDetachedTaskFinished", func() {
TestDetachedTaskFinished.Store(true)
})
}()
return e.fillJobInfo(ctx, jobID, req)
}
if err = e.doImport(distImporter, task); err != nil {
if err = e.doImport(ctx, e.userSctx, distImporter, task); err != nil {
return err
}
return e.fillJobInfo(ctx, jobID, req)
@@ -180,10 +188,14 @@ func (e *ImportIntoExec) getJobImporter(param *importer.JobImportParam) (*import
return importinto.NewDistImporterCurrNode(param, e.importPlan, e.stmt, e.controller.TotalFileSize)
}

func (e *ImportIntoExec) doImport(distImporter *importinto.DistImporter, task *proto.Task) error {
func (e *ImportIntoExec) doImport(ctx context.Context, se sessionctx.Context, distImporter *importinto.DistImporter, task *proto.Task) error {
distImporter.ImportTask(task)
group := distImporter.Param().Group
return group.Wait()
err := group.Wait()
if err2 := flushStats(ctx, se, e.importPlan.TableInfo.ID, distImporter.Result()); err2 != nil {
logutil.BgLogger().Error("flush stats failed", zap.Error(err2))
}
return err
}

// ImportIntoActionExec represents a import into action executor.
@@ -244,3 +256,16 @@ func (e *ImportIntoActionExec) checkPrivilegeAndStatus(ctx context.Context, mana
}
return nil
}

// flushStats flushes the stats of the table.
func flushStats(ctx context.Context, se sessionctx.Context, tableID int64, result importer.JobImportResult) error {
if err := sessiontxn.NewTxn(ctx, se); err != nil {
return err
}
sessionVars := se.GetSessionVars()
sessionVars.TxnCtxMu.Lock()
defer sessionVars.TxnCtxMu.Unlock()
sessionVars.TxnCtx.UpdateDeltaForTable(tableID, int64(result.Affected), int64(result.Affected), result.ColSizeMap)
se.StmtCommit(ctx)
return se.CommitTxn(ctx)
}
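
flushStats is what makes the follow-up analyze possible: it opens a transaction, folds the import's affected row count and accumulated column sizes into the session's transaction context via UpdateDeltaForTable, and commits, so the statistics subsystem sees the row-count/modify-count delta and auto-analyze can pick the table up. A simplified, standalone sketch of that delta bookkeeping (the types below are stand-ins for TiDB's variable.TableDelta, not the real API):

```go
package main

import "fmt"

// tableDelta is a simplified stand-in for the per-table stats delta TiDB keeps
// in the transaction context (variable.TableDelta).
type tableDelta struct {
	Delta   int64           // net change in row count
	Count   int64           // number of modified rows
	ColSize map[int64]int64 // column ID -> added bytes
}

// txnCtx is a minimal stand-in for the transaction context's delta map.
type txnCtx struct {
	tableDeltas map[int64]tableDelta
}

// updateDeltaForTable mirrors what flushStats asks the real TxnCtx to do:
// fold the import's affected-row count and column sizes into the table's delta.
func (tc *txnCtx) updateDeltaForTable(tableID, delta, count int64, colSize map[int64]int64) {
	item := tc.tableDeltas[tableID]
	item.Delta += delta
	item.Count += count
	if item.ColSize == nil {
		item.ColSize = make(map[int64]int64)
	}
	for colID, size := range colSize {
		item.ColSize[colID] += size
	}
	tc.tableDeltas[tableID] = item
}

func main() {
	tc := &txnCtx{tableDeltas: make(map[int64]tableDelta)}
	// e.g. an import that wrote 10000 rows with these accumulated column sizes
	tc.updateDeltaForTable(101, 10000, 10000, map[int64]int64{1: 40000, 2: 50000})
	fmt.Printf("%+v\n", tc.tableDeltas[101])
}
```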
5 changes: 0 additions & 5 deletions executor/import_into_test.go
@@ -126,11 +126,6 @@ func TestImportIntoOptionsNegativeCase(t *testing.T) {
{OptionStr: "checksum_table=false", Err: exeerrors.ErrInvalidOptionVal},
{OptionStr: "checksum_table=null", Err: exeerrors.ErrInvalidOptionVal},

{OptionStr: "analyze_table='aa'", Err: exeerrors.ErrInvalidOptionVal},
{OptionStr: "analyze_table=123", Err: exeerrors.ErrInvalidOptionVal},
{OptionStr: "analyze_table=false", Err: exeerrors.ErrInvalidOptionVal},
{OptionStr: "analyze_table=null", Err: exeerrors.ErrInvalidOptionVal},

{OptionStr: "record_errors='aa'", Err: exeerrors.ErrInvalidOptionVal},
{OptionStr: "record_errors='111aa'", Err: exeerrors.ErrInvalidOptionVal},
{OptionStr: "record_errors=-123", Err: exeerrors.ErrInvalidOptionVal},
1 change: 1 addition & 0 deletions executor/importer/engine_process.go
@@ -101,6 +101,7 @@ func ProcessChunk(
if err != nil {
return err
}
progress.AddColSize(encoder.GetColumnSize())
tableImporter.setLastInsertID(encoder.GetLastInsertID())
return nil
}
14 changes: 1 addition & 13 deletions executor/importer/import.go
@@ -87,7 +87,6 @@ const (
threadOption = "thread"
maxWriteSpeedOption = "max_write_speed"
checksumTableOption = "checksum_table"
analyzeTableOption = "analyze_table"
recordErrorsOption = "record_errors"
detachedOption = "detached"
)
@@ -107,7 +106,6 @@ var (
threadOption: true,
maxWriteSpeedOption: true,
checksumTableOption: true,
analyzeTableOption: true,
recordErrorsOption: true,
detachedOption: false,
}
@@ -176,7 +174,6 @@ type Plan struct {

DiskQuota config.ByteSize
Checksum config.PostOpLevel
Analyze config.PostOpLevel
ThreadCnt int64
MaxWriteSpeed config.ByteSize
SplitFile bool
@@ -465,7 +462,6 @@ func (p *Plan) initDefaultOptions() {
}

p.Checksum = config.OpLevelRequired
p.Analyze = config.OpLevelOptional
p.ThreadCnt = int64(threadCnt)
p.MaxWriteSpeed = unlimitedWriteSpeed
p.SplitFile = false
@@ -614,15 +610,6 @@ func (p *Plan) initOptions(seCtx sessionctx.Context, options []*plannercore.Load
return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
}
}
if opt, ok := specifiedOptions[analyzeTableOption]; ok {
v, err := optAsString(opt)
if err != nil {
return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
}
if err = p.Analyze.FromStringValue(v); err != nil {
return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
}
}
if opt, ok := specifiedOptions[recordErrorsOption]; ok {
vInt, err := optAsInt64(opt)
if err != nil || vInt < -1 {
@@ -1117,6 +1104,7 @@ type JobImportResult struct {
LastInsertID uint64
Affected uint64
Warnings []stmtctx.SQLWarn
ColSizeMap map[int64]int64
}

// JobImporter is the interface for importing a job.
3 changes: 0 additions & 3 deletions executor/importer/import_test.go
@@ -38,7 +38,6 @@ func TestInitDefaultOptions(t *testing.T) {
plan.initDefaultOptions()
require.Equal(t, config.ByteSize(0), plan.DiskQuota)
require.Equal(t, config.OpLevelRequired, plan.Checksum)
require.Equal(t, config.OpLevelOptional, plan.Analyze)
require.Equal(t, int64(runtime.NumCPU()), plan.ThreadCnt)
require.Equal(t, unlimitedWriteSpeed, plan.MaxWriteSpeed)
require.Equal(t, false, plan.SplitFile)
@@ -83,7 +82,6 @@ func TestInitOptionsPositiveCase(t *testing.T) {
skipRowsOption+"=3, "+
diskQuotaOption+"='100gib', "+
checksumTableOption+"='optional', "+
analyzeTableOption+"='required', "+
threadOption+"=100000, "+
maxWriteSpeedOption+"='200mib', "+
splitFileOption+", "+
@@ -102,7 +100,6 @@
require.Equal(t, uint64(3), plan.IgnoreLines, sql)
require.Equal(t, config.ByteSize(100<<30), plan.DiskQuota, sql)
require.Equal(t, config.OpLevelOptional, plan.Checksum, sql)
require.Equal(t, config.OpLevelRequired, plan.Analyze, sql)
require.Equal(t, int64(runtime.NumCPU()), plan.ThreadCnt, sql) // it's adjusted to the number of CPUs
require.Equal(t, config.ByteSize(200<<20), plan.MaxWriteSpeed, sql)
require.True(t, plan.SplitFile, sql)
9 changes: 9 additions & 0 deletions executor/importer/kv_encode.go
@@ -38,6 +38,8 @@ type kvEncoder interface {
// GetLastInsertID returns the first auto-generated ID in the current encoder.
// if there's no auto-generated id column or the column value is not auto-generated, it will be 0.
GetLastInsertID() uint64
// GetColumnSize returns the size of each column in the current encoder.
GetColumnSize() map[int64]int64
io.Closer
}

@@ -97,6 +99,13 @@ func (en *tableKVEncoder) GetLastInsertID() uint64 {
return en.LastInsertID
}

func (en *tableKVEncoder) GetColumnSize() map[int64]int64 {
sessionVars := en.SessionCtx.GetSessionVars()
sessionVars.TxnCtxMu.Lock()
defer sessionVars.TxnCtxMu.Unlock()
return sessionVars.TxnCtx.TableDeltaMap[en.Table.Meta().ID].ColSize
}

// todo merge with code in load_data.go
func (en *tableKVEncoder) parserData2TableData(parserData []types.Datum, rowID int64) ([]types.Datum, error) {
row := make([]types.Datum, 0, len(en.insertColumns))
30 changes: 30 additions & 0 deletions tests/realtikvtest/importintotest/import_into_test.go
@@ -23,6 +23,7 @@ import (
"os"
"path"
"strconv"
"strings"
"sync"
"time"

@@ -1134,3 +1135,32 @@ func (s *mockGCSSuite) TestDiskQuota() {
strconv.Itoa(lineCount),
))
}

func (s *mockGCSSuite) TestAnalyze() {
s.tk.MustExec("DROP DATABASE IF EXISTS load_data;")
s.tk.MustExec("CREATE DATABASE load_data;")

// test auto analyze
s.tk.MustExec("create table load_data.analyze_table(a int, b int, c int, index idx_ac(a,c), index idx_b(b))")
lineCount := 10000
data := make([]byte, 0, 1<<13)
for i := 0; i < lineCount; i++ {
data = append(data, []byte(fmt.Sprintf("1,%d,1\n", i))...)
}
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "test-load", Name: "analyze-1.tsv"},
Content: data,
})

// without analyze, use idx_ac
s.tk.MustExec("SET GLOBAL tidb_enable_auto_analyze=ON;")
s.tk.MustQuery("EXPLAIN SELECT * FROM load_data.analyze_table WHERE a=1 and b=1 and c=1;").CheckContain("idx_ac(a, c)")

sql := fmt.Sprintf(`IMPORT INTO load_data.analyze_table FROM 'gs://test-load/analyze-1.tsv?endpoint=%s'`, gcsEndpoint)
s.tk.MustQuery(sql)
require.Eventually(s.T(), func() bool {
result := s.tk.MustQuery("EXPLAIN SELECT * FROM load_data.analyze_table WHERE a=1 and b=1 and c=1;")
return strings.Contains(result.Rows()[1][3].(string), "idx_b(b)")
}, 60*time.Second, time.Second)
s.tk.MustQuery("SHOW ANALYZE STATUS;").CheckContain("analyze_table")
}
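
The plan flip the test waits for follows from the generated data: every row has a=1 and c=1 while b is unique, so once ANALYZE-derived statistics are in place an equal predicate on b is estimated to match about one row, whereas a=1 AND c=1 matches all of them. A back-of-the-envelope estimate under assumed uniform, independent column statistics (not the optimizer's actual cost model):

```go
package main

import "fmt"

func main() {
	const rows = 10000.0
	// approximate NDV (number of distinct values) per column after ANALYZE
	ndvA, ndvB, ndvC := 1.0, 10000.0, 1.0

	// estimated rows returned by each index under equal predicates,
	// assuming uniform distribution and independent columns
	estIdxAC := rows / (ndvA * ndvC) // a=1 AND c=1 -> ~10000 rows
	estIdxB := rows / ndvB           // b=1         -> ~1 row

	fmt.Printf("idx_ac(a,c) ~ %.0f row(s), idx_b(b) ~ %.0f row(s)\n", estIdxAC, estIdxB)
}
```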
