statistics: stop loading too many stats when initializing stats (#53999) (#…
ti-chi-bot authored Jul 4, 2024
1 parent 3501fea commit c6e52c0
Showing 2 changed files with 72 additions and 34 deletions.
1 change: 1 addition & 0 deletions pkg/statistics/handle/BUILD.bazel
@@ -38,6 +38,7 @@ go_library(
"//pkg/util/chunk",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/memory",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_tiancaiamao_gp//:gp",
105 changes: 71 additions & 34 deletions pkg/statistics/handle/bootstrap.go
@@ -30,13 +30,15 @@ import (
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/cache"
"github.com/pingcap/tidb/pkg/statistics/handle/initstats"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/memory"
"go.uber.org/zap"
)

@@ -191,7 +193,7 @@ func (h *Handle) initStatsHistograms4ChunkLite(is infoschema.InfoSchema, cache u
}
}

func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache util.StatsCache, iter *chunk.Iterator4Chunk) {
func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache util.StatsCache, iter *chunk.Iterator4Chunk, isCacheFull bool) {
var table *statistics.Table
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
tblID, statsVer := row.GetInt64(0), row.GetInt64(8)
@@ -220,10 +222,17 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache util.
if idxInfo == nil {
continue
}
cms, topN, err := statistics.DecodeCMSketchAndTopN(row.GetBytes(6), nil)
if err != nil {
cms = nil
terror.Log(errors.Trace(err))

var cms *statistics.CMSketch
var topN *statistics.TopN
var err error
if !isCacheFull {
// Only decode the CMSketch and TopN when the stats cache is not full; if it is full, skip them to save memory, but still build the index meta below so the last analyze information is recorded.
cms, topN, err = statistics.DecodeCMSketchAndTopN(row.GetBytes(6), nil)
if err != nil {
cms = nil
terror.Log(errors.Trace(err))
}
}
hist := statistics.NewHistogram(id, ndv, nullCount, version, types.NewFieldType(mysql.TypeBlob), chunk.InitialCapacity, 0)
index := &statistics.Index{
@@ -236,7 +245,8 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache util.
PhysicalID: tblID,
}
if statsVer != statistics.Version0 {
index.StatsLoadedStatus = statistics.NewStatsFullLoadStatus()
// Set the StatsLoadedStatus to AllEvicted first; it is switched to full load once the buckets finish loading.
index.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
}
lastAnalyzePos.Copy(&index.LastAnalyzePos)
table.Indices[hist.ID] = index
@@ -261,6 +271,8 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache util.
Flag: row.GetInt64(10),
StatsVer: statsVer,
}
// The primary key column's TopN is never loaded here because its is_index flag is false, so mark the column as all evicted.
col.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
lastAnalyzePos.Copy(&col.LastAnalyzePos)
table.Columns[hist.ID] = col
}
@@ -311,12 +323,12 @@ func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache util.StatsC
if req.NumRows() == 0 {
break
}
h.initStatsHistograms4Chunk(is, cache, iter)
h.initStatsHistograms4Chunk(is, cache, iter, false)
}
return nil
}

func (h *Handle) initStatsHistogramsByPaging(is infoschema.InfoSchema, cache util.StatsCache, task initstats.Task) error {
func (h *Handle) initStatsHistogramsByPaging(is infoschema.InfoSchema, cache util.StatsCache, task initstats.Task, totalMemory uint64) error {
se, err := h.SPool().Get()
if err != nil {
return err
@@ -326,6 +338,7 @@ func (h *Handle) initStatsHistogramsByPaging(is infoschema.InfoSchema, cache uti
h.SPool().Put(se)
}
}()

sctx := se.(sessionctx.Context)
// Why do we need to add `is_index=1` in the SQL?
// Because it keeps this query aligned with the `initStatsTopN` function, which also only loads the TopN of indexes.
@@ -347,16 +360,16 @@ func (h *Handle) initStatsHistogramsByPaging(is infoschema.InfoSchema, cache uti
if req.NumRows() == 0 {
break
}
h.initStatsHistograms4Chunk(is, cache, iter)
h.initStatsHistograms4Chunk(is, cache, iter, isFullCache(cache, totalMemory))
}
return nil
}

func (h *Handle) initStatsHistogramsConcurrency(is infoschema.InfoSchema, cache util.StatsCache) error {
func (h *Handle) initStatsHistogramsConcurrency(is infoschema.InfoSchema, cache util.StatsCache, totalMemory uint64) error {
var maxTid = maxTidRecord.tid.Load()
tid := int64(0)
ls := initstats.NewRangeWorker(func(task initstats.Task) error {
return h.initStatsHistogramsByPaging(is, cache, task)
return h.initStatsHistogramsByPaging(is, cache, task, totalMemory)
})
ls.LoadStats()
for tid <= maxTid {
@@ -370,7 +383,10 @@ func (h *Handle) initStatsHistogramsConcurrency(is infoschema.InfoSchema, cache
return nil
}

func (*Handle) initStatsTopN4Chunk(cache util.StatsCache, iter *chunk.Iterator4Chunk) {
func (*Handle) initStatsTopN4Chunk(cache util.StatsCache, iter *chunk.Iterator4Chunk, totalMemory uint64) {
if isFullCache(cache, totalMemory) {
return
}
affectedIndexes := make(map[*statistics.Index]struct{})
var table *statistics.Table
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
@@ -406,7 +422,7 @@ func (*Handle) initStatsTopN4Chunk(cache util.StatsCache, iter *chunk.Iterator4C
}
}

func (h *Handle) initStatsTopN(cache util.StatsCache) error {
func (h *Handle) initStatsTopN(cache util.StatsCache, totalMemory uint64) error {
sql := "select /*+ ORDER_INDEX(mysql.stats_top_n,tbl)*/ HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1 order by table_id"
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
@@ -424,12 +440,12 @@ func (h *Handle) initStatsTopN(cache util.StatsCache) error {
if req.NumRows() == 0 {
break
}
h.initStatsTopN4Chunk(cache, iter)
h.initStatsTopN4Chunk(cache, iter, totalMemory)
}
return nil
}

func (h *Handle) initStatsTopNByPaging(cache util.StatsCache, task initstats.Task) error {
func (h *Handle) initStatsTopNByPaging(cache util.StatsCache, task initstats.Task, totalMemory uint64) error {
se, err := h.SPool().Get()
if err != nil {
return err
@@ -457,19 +473,25 @@ func (h *Handle) initStatsTopNByPaging(cache util.StatsCache, task initstats.Tas
if req.NumRows() == 0 {
break
}
h.initStatsTopN4Chunk(cache, iter)
h.initStatsTopN4Chunk(cache, iter, totalMemory)
}
return nil
}

func (h *Handle) initStatsTopNConcurrency(cache util.StatsCache) error {
func (h *Handle) initStatsTopNConcurrency(cache util.StatsCache, totalMemory uint64) error {
var maxTid = maxTidRecord.tid.Load()
tid := int64(0)
ls := initstats.NewRangeWorker(func(task initstats.Task) error {
return h.initStatsTopNByPaging(cache, task)
if isFullCache(cache, totalMemory) {
return nil
}
return h.initStatsTopNByPaging(cache, task, totalMemory)
})
ls.LoadStats()
for tid <= maxTid {
if isFullCache(cache, totalMemory) {
break
}
ls.SendTask(initstats.Task{
StartTid: tid,
EndTid: tid + initStatsStep,
@@ -536,6 +558,9 @@ func (*Handle) initStatsBuckets4Chunk(cache util.StatsCache, iter *chunk.Iterato
tableID, isIndex, histID := row.GetInt64(0), row.GetInt64(1), row.GetInt64(2)
if table == nil || table.PhysicalID != tableID {
if table != nil {
for _, index := range table.Indices {
index.StatsLoadedStatus = statistics.NewStatsFullLoadStatus()
}
cache.Put(table.PhysicalID, table) // put this table in the cache because all statistics of the table have been read.
}
var ok bool
@@ -591,9 +616,12 @@ func (*Handle) initStatsBuckets4Chunk(cache util.StatsCache, iter *chunk.Iterato
}
}

func (h *Handle) initStatsBuckets(cache util.StatsCache) error {
func (h *Handle) initStatsBuckets(cache util.StatsCache, totalMemory uint64) error {
if isFullCache(cache, totalMemory) {
return nil
}
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
err := h.initStatsBucketsConcurrency(cache)
err := h.initStatsBucketsConcurrency(cache, totalMemory)
if err != nil {
return errors.Trace(err)
}
@@ -670,10 +698,16 @@ func (h *Handle) initStatsBucketsByPaging(cache util.StatsCache, task initstats.
return nil
}

func (h *Handle) initStatsBucketsConcurrency(cache util.StatsCache) error {
func (h *Handle) initStatsBucketsConcurrency(cache util.StatsCache, totalMemory uint64) error {
if isFullCache(cache, totalMemory) {
return nil
}
var maxTid = maxTidRecord.tid.Load()
tid := int64(0)
ls := initstats.NewRangeWorker(func(task initstats.Task) error {
if isFullCache(cache, totalMemory) {
return nil
}
return h.initStatsBucketsByPaging(cache, task)
})
ls.LoadStats()
@@ -683,6 +717,9 @@ func (h *Handle) initStatsBucketsConcurrency(cache util.StatsCache) error {
EndTid: tid + initStatsStep,
})
tid += initStatsStep
if isFullCache(cache, totalMemory) {
break
}
}
ls.Wait()
return nil
@@ -718,6 +755,10 @@ func (h *Handle) InitStatsLite(is infoschema.InfoSchema) (err error) {
// Index/PK stats are fully loaded.
// Column stats are not loaded, i.e., we only load scalars such as NDV, NullCount, Correlation and don't load CMSketch/Histogram/TopN.
func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
totalMemory, err := memory.MemTotal()
if err != nil {
return err
}
loadFMSketch := config.GetGlobalConfig().Performance.EnableLoadFMSketch
defer func() {
_, err1 := util.Exec(h.initStatsCtx, "commit")
Expand All @@ -735,17 +776,17 @@ func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
return errors.Trace(err)
}
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
err = h.initStatsHistogramsConcurrency(is, cache)
err = h.initStatsHistogramsConcurrency(is, cache, totalMemory)
} else {
err = h.initStatsHistograms(is, cache)
}
if err != nil {
return errors.Trace(err)
}
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
err = h.initStatsTopNConcurrency(cache)
err = h.initStatsTopNConcurrency(cache, totalMemory)
} else {
err = h.initStatsTopN(cache)
err = h.initStatsTopN(cache, totalMemory)
}
if err != nil {
return err
@@ -756,19 +797,15 @@ func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
return err
}
}
err = h.initStatsBuckets(cache)
err = h.initStatsBuckets(cache, totalMemory)
if err != nil {
return errors.Trace(err)
}
// Set columns' stats status.
for _, table := range cache.Values() {
for _, col := range table.Columns {
if col.StatsAvailable() {
// primary key column has no stats info, because primary key's is_index is false. so it cannot load the topn
col.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
}
}
}
h.Replace(cache)
return nil
}

func isFullCache(cache util.StatsCache, total uint64) bool {
memQuota := variable.StatsCacheMemQuota.Load()
return (uint64(cache.MemConsumed()) >= total/4) || (cache.MemConsumed() >= memQuota && memQuota != 0)
}
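
For reference, a minimal, self-contained sketch of the cache-fullness rule that the new isFullCache helper encodes. The standalone function signature, parameter names, and main wrapper below are illustrative only; the real code reads variable.StatsCacheMemQuota and cache.MemConsumed() instead of taking plain integers.

package main

import "fmt"

// isFullCache mirrors the threshold added in this commit: the stats cache is
// treated as full once it consumes a quarter of total system memory, or once
// it reaches a non-zero configured quota.
func isFullCache(memConsumed, totalMemory, memQuota uint64) bool {
	return memConsumed >= totalMemory/4 || (memQuota != 0 && memConsumed >= memQuota)
}

func main() {
	const gib = uint64(1) << 30
	// On a 16 GiB host with no explicit quota, the cutoff is 4 GiB.
	fmt.Println(isFullCache(3*gib, 16*gib, 0)) // false
	fmt.Println(isFullCache(5*gib, 16*gib, 0)) // true
	// A 2 GiB quota takes effect even below the quarter-of-memory mark.
	fmt.Println(isFullCache(2*gib, 16*gib, 2*gib)) // true
}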
