From 84f90f464ae58c26b7eb97a4518e7de97242730f Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Wed, 31 Jul 2024 14:43:18 +0800 Subject: [PATCH] *: add some comments to exported function, part of enable revive.exported (#8459) ref tikv/pd#8458 Signed-off-by: okJiang <819421878@qq.com> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- .golangci.yml | 3 + pkg/core/metrics.go | 77 ++++++++++++++++++++---- pkg/core/region_tree.go | 3 + pkg/core/storelimit/store_limit.go | 1 + pkg/id/id.go | 1 + pkg/ratelimit/metrics.go | 20 +++--- pkg/ratelimit/runner.go | 12 ++-- pkg/schedule/filter/counter.go | 1 + pkg/schedule/filter/filters.go | 35 +++++++++++ pkg/schedule/filter/region_filters.go | 2 + pkg/schedule/operator/builder.go | 1 + pkg/schedule/operator/operator_queue.go | 5 ++ pkg/schedule/schedulers/split_bucket.go | 8 +-- pkg/schedule/splitter/region_splitter.go | 2 + pkg/schedule/type/type.go | 7 ++- pkg/statistics/collector.go | 24 ++++---- pkg/statistics/hot_peer.go | 10 +-- pkg/statistics/hot_peer_cache.go | 7 ++- pkg/statistics/store_collection.go | 11 ++-- pkg/statistics/store_hot_peers_infos.go | 6 +- pkg/statistics/utils/topn.go | 46 +++++++------- pkg/storage/kv/etcd_kv.go | 4 ++ pkg/storage/kv/mem_kv.go | 5 ++ pkg/syncer/client.go | 12 ++-- pkg/syncer/history_buffer.go | 8 +-- pkg/syncer/history_buffer_test.go | 12 ++-- pkg/syncer/server.go | 14 ++--- pkg/window/counter.go | 10 +++ plugin/scheduler_example/evict_leader.go | 14 +++++ 29 files changed, 253 insertions(+), 108 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index d5b2e4e7f5a..bc1ba393f39 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -214,3 +214,6 @@ issues: - path: (pd-analysis|pd-api-bench|pd-backup|pd-ctl|pd-heartbeat-bench|pd-recover|pd-simulator|pd-tso-bench|pd-ut|regions-dump|stores-dump) linters: - errcheck + include: + # remove the comment after the path is ready + # - EXC0012 diff --git a/pkg/core/metrics.go b/pkg/core/metrics.go index 7d2c904f319..65cc8be861e 100644 --- a/pkg/core/metrics.go +++ b/pkg/core/metrics.go @@ -108,19 +108,33 @@ type saveCacheStats struct { // RegionHeartbeatProcessTracer is used to trace the process of handling region heartbeat. type RegionHeartbeatProcessTracer interface { + // Begin starts the tracing. Begin() + // OnPreCheckFinished will be called when the pre-check is finished. OnPreCheckFinished() + // OnAsyncHotStatsFinished will be called when the async hot stats is finished. OnAsyncHotStatsFinished() + // OnRegionGuideFinished will be called when the region guide is finished. OnRegionGuideFinished() + // OnSaveCacheBegin will be called when the save cache begins. OnSaveCacheBegin() + // OnSaveCacheFinished will be called when the save cache is finished. OnSaveCacheFinished() + // OnCheckOverlapsFinished will be called when the check overlaps is finished. OnCheckOverlapsFinished() + // OnValidateRegionFinished will be called when the validate region is finished. OnValidateRegionFinished() + // OnSetRegionFinished will be called when the set region is finished. OnSetRegionFinished() + // OnUpdateSubTreeFinished will be called when the update sub tree is finished. OnUpdateSubTreeFinished() + // OnCollectRegionStatsFinished will be called when the collect region stats is finished. OnCollectRegionStatsFinished() + // OnAllStageFinished will be called when all stages are finished. OnAllStageFinished() + // LogFields returns the log fields. LogFields() []zap.Field + // Release releases the tracer. 
Release() } @@ -131,21 +145,48 @@ func NewNoopHeartbeatProcessTracer() RegionHeartbeatProcessTracer { return &noopHeartbeatProcessTracer{} } -func (*noopHeartbeatProcessTracer) Begin() {} -func (*noopHeartbeatProcessTracer) OnPreCheckFinished() {} -func (*noopHeartbeatProcessTracer) OnAsyncHotStatsFinished() {} -func (*noopHeartbeatProcessTracer) OnRegionGuideFinished() {} -func (*noopHeartbeatProcessTracer) OnSaveCacheBegin() {} -func (*noopHeartbeatProcessTracer) OnSaveCacheFinished() {} -func (*noopHeartbeatProcessTracer) OnCheckOverlapsFinished() {} -func (*noopHeartbeatProcessTracer) OnValidateRegionFinished() {} -func (*noopHeartbeatProcessTracer) OnSetRegionFinished() {} -func (*noopHeartbeatProcessTracer) OnUpdateSubTreeFinished() {} +// Begin implements the RegionHeartbeatProcessTracer interface. +func (*noopHeartbeatProcessTracer) Begin() {} + +// OnPreCheckFinished implements the RegionHeartbeatProcessTracer interface. +func (*noopHeartbeatProcessTracer) OnPreCheckFinished() {} + +// OnAsyncHotStatsFinished implements the RegionHeartbeatProcessTracer interface. +func (*noopHeartbeatProcessTracer) OnAsyncHotStatsFinished() {} + +// OnRegionGuideFinished implements the RegionHeartbeatProcessTracer interface. +func (*noopHeartbeatProcessTracer) OnRegionGuideFinished() {} + +// OnSaveCacheBegin implements the RegionHeartbeatProcessTracer interface. +func (*noopHeartbeatProcessTracer) OnSaveCacheBegin() {} + +// OnSaveCacheFinished implements the RegionHeartbeatProcessTracer interface. +func (*noopHeartbeatProcessTracer) OnSaveCacheFinished() {} + +// OnCheckOverlapsFinished implements the RegionHeartbeatProcessTracer interface. +func (*noopHeartbeatProcessTracer) OnCheckOverlapsFinished() {} + +// OnValidateRegionFinished implements the RegionHeartbeatProcessTracer interface. +func (*noopHeartbeatProcessTracer) OnValidateRegionFinished() {} + +// OnSetRegionFinished implements the RegionHeartbeatProcessTracer interface. +func (*noopHeartbeatProcessTracer) OnSetRegionFinished() {} + +// OnUpdateSubTreeFinished implements the RegionHeartbeatProcessTracer interface. +func (*noopHeartbeatProcessTracer) OnUpdateSubTreeFinished() {} + +// OnCollectRegionStatsFinished implements the RegionHeartbeatProcessTracer interface. func (*noopHeartbeatProcessTracer) OnCollectRegionStatsFinished() {} -func (*noopHeartbeatProcessTracer) OnAllStageFinished() {} + +// OnAllStageFinished implements the RegionHeartbeatProcessTracer interface. +func (*noopHeartbeatProcessTracer) OnAllStageFinished() {} + +// LogFields implements the RegionHeartbeatProcessTracer interface. func (*noopHeartbeatProcessTracer) LogFields() []zap.Field { return nil } + +// Release implements the RegionHeartbeatProcessTracer interface. func (*noopHeartbeatProcessTracer) Release() {} type regionHeartbeatProcessTracer struct { @@ -163,12 +204,14 @@ func NewHeartbeatProcessTracer() RegionHeartbeatProcessTracer { return tracerPool.Get().(*regionHeartbeatProcessTracer) } +// Begin implements the RegionHeartbeatProcessTracer interface. func (h *regionHeartbeatProcessTracer) Begin() { now := time.Now() h.startTime = now h.lastCheckTime = now } +// OnPreCheckFinished implements the RegionHeartbeatProcessTracer interface. func (h *regionHeartbeatProcessTracer) OnPreCheckFinished() { now := time.Now() h.preCheckDuration = now.Sub(h.lastCheckTime) @@ -177,6 +220,7 @@ func (h *regionHeartbeatProcessTracer) OnPreCheckFinished() { preCheckCount.Inc() } +// OnAsyncHotStatsFinished implements the RegionHeartbeatProcessTracer interface. 
func (h *regionHeartbeatProcessTracer) OnAsyncHotStatsFinished() { now := time.Now() h.asyncHotStatsDuration = now.Sub(h.lastCheckTime) @@ -185,6 +229,7 @@ func (h *regionHeartbeatProcessTracer) OnAsyncHotStatsFinished() { asyncHotStatsCount.Inc() } +// OnRegionGuideFinished implements the RegionHeartbeatProcessTracer interface. func (h *regionHeartbeatProcessTracer) OnRegionGuideFinished() { now := time.Now() h.regionGuideDuration = now.Sub(h.lastCheckTime) @@ -193,6 +238,7 @@ func (h *regionHeartbeatProcessTracer) OnRegionGuideFinished() { regionGuideCount.Inc() } +// OnSaveCacheBegin implements the RegionHeartbeatProcessTracer interface. func (h *regionHeartbeatProcessTracer) OnSaveCacheBegin() { now := time.Now() h.saveCacheStats.startTime = now @@ -200,11 +246,13 @@ func (h *regionHeartbeatProcessTracer) OnSaveCacheBegin() { h.lastCheckTime = now } +// OnSaveCacheFinished implements the RegionHeartbeatProcessTracer interface. func (h *regionHeartbeatProcessTracer) OnSaveCacheFinished() { // update the outer checkpoint time h.lastCheckTime = time.Now() } +// OnCollectRegionStatsFinished implements the RegionHeartbeatProcessTracer interface. func (h *regionHeartbeatProcessTracer) OnCollectRegionStatsFinished() { now := time.Now() regionCollectDurationSum.Add(now.Sub(h.lastCheckTime).Seconds()) @@ -212,6 +260,7 @@ func (h *regionHeartbeatProcessTracer) OnCollectRegionStatsFinished() { h.lastCheckTime = now } +// OnCheckOverlapsFinished implements the RegionHeartbeatProcessTracer interface. func (h *regionHeartbeatProcessTracer) OnCheckOverlapsFinished() { now := time.Now() h.saveCacheStats.checkOverlapsDuration = now.Sub(h.lastCheckTime) @@ -220,6 +269,7 @@ func (h *regionHeartbeatProcessTracer) OnCheckOverlapsFinished() { checkOverlapsCount.Inc() } +// OnValidateRegionFinished implements the RegionHeartbeatProcessTracer interface. func (h *regionHeartbeatProcessTracer) OnValidateRegionFinished() { now := time.Now() h.saveCacheStats.validateRegionDuration = now.Sub(h.saveCacheStats.lastCheckTime) @@ -228,6 +278,7 @@ func (h *regionHeartbeatProcessTracer) OnValidateRegionFinished() { validateRegionCount.Inc() } +// OnSetRegionFinished implements the RegionHeartbeatProcessTracer interface. func (h *regionHeartbeatProcessTracer) OnSetRegionFinished() { now := time.Now() h.saveCacheStats.setRegionDuration = now.Sub(h.saveCacheStats.lastCheckTime) @@ -236,6 +287,7 @@ func (h *regionHeartbeatProcessTracer) OnSetRegionFinished() { setRegionCount.Inc() } +// OnUpdateSubTreeFinished implements the RegionHeartbeatProcessTracer interface. func (h *regionHeartbeatProcessTracer) OnUpdateSubTreeFinished() { now := time.Now() h.saveCacheStats.updateSubTreeDuration = now.Sub(h.saveCacheStats.lastCheckTime) @@ -244,6 +296,7 @@ func (h *regionHeartbeatProcessTracer) OnUpdateSubTreeFinished() { updateSubTreeCount.Inc() } +// OnAllStageFinished implements the RegionHeartbeatProcessTracer interface. func (h *regionHeartbeatProcessTracer) OnAllStageFinished() { now := time.Now() h.OtherDuration = now.Sub(h.lastCheckTime) @@ -251,6 +304,7 @@ func (h *regionHeartbeatProcessTracer) OnAllStageFinished() { otherCount.Inc() } +// LogFields implements the RegionHeartbeatProcessTracer interface. func (h *regionHeartbeatProcessTracer) LogFields() []zap.Field { return []zap.Field{ zap.Duration("pre-check-duration", h.preCheckDuration), @@ -264,6 +318,7 @@ func (h *regionHeartbeatProcessTracer) LogFields() []zap.Field { } } +// Release implements the RegionHeartbeatProcessTracer interface. 
// Release puts the tracer back into the pool. func (h *regionHeartbeatProcessTracer) Release() { // Reset the fields of h to their zero values. diff --git a/pkg/core/region_tree.go b/pkg/core/region_tree.go index 9a148eeed18..0be207d515d 100644 --- a/pkg/core/region_tree.go +++ b/pkg/core/region_tree.go @@ -437,6 +437,7 @@ func (t *regionTree) RandomRegions(n int, ranges []KeyRange) []*RegionInfo { return regions } +// TotalSize returns the total size of all regions. func (t *regionTree) TotalSize() int64 { if t.length() == 0 { return 0 @@ -444,6 +445,8 @@ func (t *regionTree) TotalSize() int64 { return t.totalSize } +// TotalWriteRate returns the total write bytes rate and the total write keys +// rate of all regions. func (t *regionTree) TotalWriteRate() (bytesRate, keysRate float64) { if t.length() == 0 { return 0, 0 diff --git a/pkg/core/storelimit/store_limit.go b/pkg/core/storelimit/store_limit.go index e35ec773d80..fb7cad442bb 100644 --- a/pkg/core/storelimit/store_limit.go +++ b/pkg/core/storelimit/store_limit.go @@ -178,6 +178,7 @@ func (l *limit) Take(count int64) bool { return l.limiter.AllowN(int(count)) } +// GetRatePerSec returns the rate per second. func (l *limit) GetRatePerSec() float64 { l.ratePerSecMutex.RLock() defer l.ratePerSecMutex.RUnlock() diff --git a/pkg/id/id.go b/pkg/id/id.go index d0889717242..ea4a2a54341 100644 --- a/pkg/id/id.go +++ b/pkg/id/id.go @@ -105,6 +105,7 @@ func (alloc *allocatorImpl) Alloc() (uint64, error) { return alloc.base, nil } +// SetBase sets the base. func (alloc *allocatorImpl) SetBase(newBase uint64) error { alloc.mu.Lock() defer alloc.mu.Unlock() diff --git a/pkg/ratelimit/metrics.go b/pkg/ratelimit/metrics.go index c5510e66b26..0096a76de4c 100644 --- a/pkg/ratelimit/metrics.go +++ b/pkg/ratelimit/metrics.go @@ -24,35 +24,35 @@ const ( ) var ( - RunnerTaskMaxWaitingDuration = prometheus.NewGaugeVec( + runnerTaskMaxWaitingDuration = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "pd", Subsystem: "ratelimit", Name: "runner_task_max_waiting_duration_seconds", Help: "The duration of tasks waiting in the runner.", }, []string{nameStr}) - RunnerPendingTasks = prometheus.NewGaugeVec( + runnerPendingTasks = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "pd", Subsystem: "ratelimit", Name: "runner_pending_tasks", Help: "The number of pending tasks in the runner.", }, []string{nameStr, taskStr}) - RunnerFailedTasks = prometheus.NewCounterVec( + runnerFailedTasks = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "pd", Subsystem: "ratelimit", Name: "runner_failed_tasks_total", Help: "The number of failed tasks in the runner.", }, []string{nameStr, taskStr}) - RunnerSucceededTasks = prometheus.NewCounterVec( + runnerSucceededTasks = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "pd", Subsystem: "ratelimit", Name: "runner_success_tasks_total", Help: "The number of tasks in the runner.", }, []string{nameStr, taskStr}) - RunnerTaskExecutionDuration = prometheus.NewHistogramVec( + runnerTaskExecutionDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "pd", Subsystem: "ratelimit", @@ -63,9 +63,9 @@ var ( ) func init() { - prometheus.MustRegister(RunnerTaskMaxWaitingDuration) - prometheus.MustRegister(RunnerPendingTasks) - prometheus.MustRegister(RunnerFailedTasks) - prometheus.MustRegister(RunnerTaskExecutionDuration) - prometheus.MustRegister(RunnerSucceededTasks) + prometheus.MustRegister(runnerTaskMaxWaitingDuration) + prometheus.MustRegister(runnerPendingTasks) + 
prometheus.MustRegister(runnerFailedTasks) + prometheus.MustRegister(runnerTaskExecutionDuration) + prometheus.MustRegister(runnerSucceededTasks) } diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index a230177ac73..4b1b51f1768 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -90,7 +90,7 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur pendingTasks: make([]*Task, 0, initialCapacity), pendingTaskCount: make(map[string]int), existTasks: make(map[taskID]*Task), - maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name), + maxWaitingDuration: runnerTaskMaxWaitingDuration.WithLabelValues(name), } return s } @@ -136,7 +136,7 @@ func (cr *ConcurrentRunner) Start(ctx context.Context) { maxDuration = time.Since(cr.pendingTasks[0].submittedAt) } for taskName, cnt := range cr.pendingTaskCount { - RunnerPendingTasks.WithLabelValues(cr.name, taskName).Set(float64(cnt)) + runnerPendingTasks.WithLabelValues(cr.name, taskName).Set(float64(cnt)) } cr.pendingMu.Unlock() cr.maxWaitingDuration.Set(maxDuration.Seconds()) @@ -157,8 +157,8 @@ func (cr *ConcurrentRunner) run(ctx context.Context, task *Task, token *TaskToke cr.limiter.ReleaseToken(token) cr.processPendingTasks() } - RunnerTaskExecutionDuration.WithLabelValues(cr.name, task.name).Observe(time.Since(start).Seconds()) - RunnerSucceededTasks.WithLabelValues(cr.name, task.name).Inc() + runnerTaskExecutionDuration.WithLabelValues(cr.name, task.name).Observe(time.Since(start).Seconds()) + runnerSucceededTasks.WithLabelValues(cr.name, task.name).Inc() } func (cr *ConcurrentRunner) processPendingTasks() { @@ -214,12 +214,12 @@ func (cr *ConcurrentRunner) RunTask(id uint64, name string, f func(context.Conte if !task.retained { maxWait := time.Since(cr.pendingTasks[0].submittedAt) if maxWait > cr.maxPendingDuration { - RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() + runnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() return ErrMaxWaitingTasksExceeded } } if pendingTaskNum > maxPendingTaskNum { - RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() + runnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() return ErrMaxWaitingTasksExceeded } } diff --git a/pkg/schedule/filter/counter.go b/pkg/schedule/filter/counter.go index 9742d2d0c9d..41211c1acce 100644 --- a/pkg/schedule/filter/counter.go +++ b/pkg/schedule/filter/counter.go @@ -128,6 +128,7 @@ func NewCounter(scope string) *Counter { return &Counter{counter: counter, scope: scope} } +// SetScope sets the scope for the counter. func (c *Counter) SetScope(scope string) { c.scope = scope } diff --git a/pkg/schedule/filter/filters.go b/pkg/schedule/filter/filters.go index 1838f0104f4..6c5dd748d17 100644 --- a/pkg/schedule/filter/filters.go +++ b/pkg/schedule/filter/filters.go @@ -181,14 +181,17 @@ func NewExcludedFilter(scope string, sources, targets map[uint64]struct{}) Filte } } +// Scope returns the scheduler or the checker which the filter acts on. func (f *excludedFilter) Scope() string { return f.scope } +// Type returns the type of the filter. func (*excludedFilter) Type() filterType { return excluded } +// Source filters stores when select them as schedule source. 
func (f *excludedFilter) Source(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status { if _, ok := f.sources[store.GetID()]; ok { return statusStoreAlreadyHasPeer @@ -196,6 +199,7 @@ func (f *excludedFilter) Source(_ config.SharedConfigProvider, store *core.Store return statusOK } +// Target filters stores when select them as schedule target. func (f *excludedFilter) Target(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status { if _, ok := f.targets[store.GetID()]; ok { return statusStoreAlreadyHasPeer @@ -211,18 +215,22 @@ func NewStorageThresholdFilter(scope string) Filter { return &storageThresholdFilter{scope: scope} } +// Scope returns the scheduler or the checker which the filter acts on. func (f *storageThresholdFilter) Scope() string { return f.scope } +// Type returns the name of the filter. func (*storageThresholdFilter) Type() filterType { return storageThreshold } +// Source filters stores when select them as schedule source. func (*storageThresholdFilter) Source(config.SharedConfigProvider, *core.StoreInfo) *plan.Status { return statusOK } +// Target filters stores when select them as schedule target. func (*storageThresholdFilter) Target(conf config.SharedConfigProvider, store *core.StoreInfo) *plan.Status { if !store.IsLowSpace(conf.GetLowSpaceRatio()) { return statusOK @@ -279,18 +287,22 @@ func newDistinctScoreFilter(scope string, labels []string, stores []*core.StoreI } } +// Scope returns the scheduler or the checker which the filter acts on. func (f *distinctScoreFilter) Scope() string { return f.scope } +// Type returns the type of the filter. func (*distinctScoreFilter) Type() filterType { return distinctScore } +// Source filters stores when select them as schedule source. func (*distinctScoreFilter) Source(config.SharedConfigProvider, *core.StoreInfo) *plan.Status { return statusOK } +// Target filters stores when select them as schedule target. func (f *distinctScoreFilter) Target(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status { score := core.DistinctScore(f.labels, f.stores, store) switch f.policy { @@ -630,14 +642,17 @@ func newRuleFitFilter(scope string, cluster *core.BasicCluster, ruleManager *pla } } +// Scope returns the scheduler or the checker which the filter acts on. func (f *ruleFitFilter) Scope() string { return f.scope } +// Type returns the name of the filter. func (*ruleFitFilter) Type() filterType { return ruleFit } +// Source filters stores when select them as schedule source. func (*ruleFitFilter) Source(config.SharedConfigProvider, *core.StoreInfo) *plan.Status { return statusOK } @@ -683,18 +698,22 @@ func newRuleLeaderFitFilter(scope string, cluster *core.BasicCluster, ruleManage } } +// Scope returns the scheduler or the checker which the filter acts on. func (f *ruleLeaderFitFilter) Scope() string { return f.scope } +// Type returns the name of the filter. func (*ruleLeaderFitFilter) Type() filterType { return ruleLeader } +// Source filters stores when select them as schedule source. func (*ruleLeaderFitFilter) Source(config.SharedConfigProvider, *core.StoreInfo) *plan.Status { return statusOK } +// Target filters stores when select them as schedule target. 
func (f *ruleLeaderFitFilter) Target(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status { targetStoreID := store.GetID() targetPeer := f.region.GetStorePeer(targetStoreID) @@ -739,18 +758,22 @@ func newRuleWitnessFitFilter(scope string, cluster *core.BasicCluster, ruleManag } } +// Scope returns the scheduler or the checker which the filter acts on. func (f *ruleWitnessFitFilter) Scope() string { return f.scope } +// Type returns the name of the filter. func (*ruleWitnessFitFilter) Type() filterType { return ruleFit } +// Source filters stores when select them as schedule source. func (*ruleWitnessFitFilter) Source(config.SharedConfigProvider, *core.StoreInfo) *plan.Status { return statusOK } +// Target filters stores when select them as schedule target. func (f *ruleWitnessFitFilter) Target(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status { targetStoreID := store.GetID() targetPeer := f.region.GetStorePeer(targetStoreID) @@ -811,14 +834,17 @@ func NewEngineFilter(scope string, constraint placement.LabelConstraint) Filter } } +// Scope returns the scheduler or the checker which the filter acts on. func (f *engineFilter) Scope() string { return f.scope } +// Type returns the name of the filter. func (*engineFilter) Type() filterType { return engine } +// Source filters stores when select them as schedule source. func (f *engineFilter) Source(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status { if f.constraint.MatchStore(store) { return statusOK @@ -826,6 +852,7 @@ func (f *engineFilter) Source(_ config.SharedConfigProvider, store *core.StoreIn return statusStoreNotMatchRule } +// Target filters stores when select them as schedule target. func (f *engineFilter) Target(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status { if f.constraint.MatchStore(store) { return statusOK @@ -854,14 +881,17 @@ func NewSpecialUseFilter(scope string, allowUses ...string) Filter { } } +// Scope returns the scheduler or the checker which the filter acts on. func (f *specialUseFilter) Scope() string { return f.scope } +// Type returns the name of the filter. func (*specialUseFilter) Type() filterType { return specialUse } +// Source filters stores when select them as schedule source. func (f *specialUseFilter) Source(conf config.SharedConfigProvider, store *core.StoreInfo) *plan.Status { if store.IsLowSpace(conf.GetLowSpaceRatio()) || !f.constraint.MatchStore(store) { return statusOK @@ -869,6 +899,7 @@ func (f *specialUseFilter) Source(conf config.SharedConfigProvider, store *core. return statusStoreNotMatchRule } +// Target filters stores when select them as schedule target. func (f *specialUseFilter) Target(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status { if !f.constraint.MatchStore(store) { return statusOK @@ -928,18 +959,22 @@ func NewIsolationFilter(scope, isolationLevel string, locationLabels []string, r return isolationFilter } +// Scope returns the scheduler or the checker which the filter acts on. func (f *isolationFilter) Scope() string { return f.scope } +// Type returns the name of the filter. func (*isolationFilter) Type() filterType { return isolation } +// Source filters stores when select them as schedule source. func (*isolationFilter) Source(config.SharedConfigProvider, *core.StoreInfo) *plan.Status { return statusOK } +// Target filters stores when select them as schedule target. 
func (f *isolationFilter) Target(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status { // No isolation constraint to fit if len(f.constraintSet) == 0 { diff --git a/pkg/schedule/filter/region_filters.go b/pkg/schedule/filter/region_filters.go index 7cd015412c2..e233ec75973 100644 --- a/pkg/schedule/filter/region_filters.go +++ b/pkg/schedule/filter/region_filters.go @@ -76,6 +76,7 @@ func NewRegionPendingFilter() RegionFilter { return ®ionPendingFilter{} } +// Select implements the RegionFilter interface. func (*regionPendingFilter) Select(region *core.RegionInfo) *plan.Status { if hasPendingPeers(region) { return statusRegionPendingPeer @@ -91,6 +92,7 @@ func NewRegionDownFilter() RegionFilter { return ®ionDownFilter{} } +// Select implements the RegionFilter interface. func (*regionDownFilter) Select(region *core.RegionInfo) *plan.Status { if hasDownPeers(region) { return statusRegionDownPeer diff --git a/pkg/schedule/operator/builder.go b/pkg/schedule/operator/builder.go index 1852f292db0..e28e7de973a 100644 --- a/pkg/schedule/operator/builder.go +++ b/pkg/schedule/operator/builder.go @@ -959,6 +959,7 @@ func (p stepPlan) String() string { p.leaderBeforeAdd, p.add, p.promote, p.leaderBeforeRemove, p.demote, p.remove, p.nonWitness, p.promoteNonWitness, p.witness) } +// IsEmpty checks if the plan is empty. func (p stepPlan) IsEmpty() bool { return p.promote == nil && p.demote == nil && p.add == nil && p.remove == nil && p.nonWitness == nil && p.promoteNonWitness == nil && p.witness == nil } diff --git a/pkg/schedule/operator/operator_queue.go b/pkg/schedule/operator/operator_queue.go index 2233845724e..8643717d5ad 100644 --- a/pkg/schedule/operator/operator_queue.go +++ b/pkg/schedule/operator/operator_queue.go @@ -27,21 +27,26 @@ type operatorWithTime struct { type operatorQueue []*operatorWithTime +// Len implements heap.Interface. func (opn operatorQueue) Len() int { return len(opn) } +// Less implements heap.Interface. func (opn operatorQueue) Less(i, j int) bool { return opn[i].time.Before(opn[j].time) } +// Swap implements heap.Interface. func (opn operatorQueue) Swap(i, j int) { opn[i], opn[j] = opn[j], opn[i] } +// Push implements heap.Interface. func (opn *operatorQueue) Push(x any) { item := x.(*operatorWithTime) *opn = append(*opn, item) } +// Pop implements heap.Interface. 
func (opn *operatorQueue) Pop() any { old := *opn n := len(old) diff --git a/pkg/schedule/schedulers/split_bucket.go b/pkg/schedule/schedulers/split_bucket.go index 4516dfe4433..7df3ee8f552 100644 --- a/pkg/schedule/schedulers/split_bucket.go +++ b/pkg/schedule/schedulers/split_bucket.go @@ -98,12 +98,12 @@ type splitBucketHandler struct { rd *render.Render } -func (h *splitBucketHandler) ListConfig(w http.ResponseWriter, _ *http.Request) { +func (h *splitBucketHandler) listConfig(w http.ResponseWriter, _ *http.Request) { conf := h.conf.Clone() h.rd.JSON(w, http.StatusOK, conf) } -func (h *splitBucketHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) { +func (h *splitBucketHandler) updateConfig(w http.ResponseWriter, r *http.Request) { h.conf.Lock() defer h.conf.Unlock() rd := render.New(render.Options{IndentJSON: true}) @@ -148,8 +148,8 @@ func newSplitBucketHandler(conf *splitBucketSchedulerConfig) http.Handler { rd: render.New(render.Options{IndentJSON: true}), } router := mux.NewRouter() - router.HandleFunc("/list", h.ListConfig).Methods(http.MethodGet) - router.HandleFunc("/config", h.UpdateConfig).Methods(http.MethodPost) + router.HandleFunc("/list", h.listConfig).Methods(http.MethodGet) + router.HandleFunc("/config", h.updateConfig).Methods(http.MethodPost) return router } diff --git a/pkg/schedule/splitter/region_splitter.go b/pkg/schedule/splitter/region_splitter.go index aeab4b70cf0..124ad935655 100644 --- a/pkg/schedule/splitter/region_splitter.go +++ b/pkg/schedule/splitter/region_splitter.go @@ -187,6 +187,7 @@ type splitRegionsHandler struct { oc *operator.Controller } +// SplitRegionByKeys split region by keys. func (h *splitRegionsHandler) SplitRegionByKeys(region *core.RegionInfo, splitKeys [][]byte) error { op, err := operator.CreateSplitRegionOperator("region-splitter", region, 0, pdpb.CheckPolicy_USEKEY, splitKeys) if err != nil { @@ -200,6 +201,7 @@ func (h *splitRegionsHandler) SplitRegionByKeys(region *core.RegionInfo, splitKe return nil } +// ScanRegionsByKeyRange scans regions by key range. func (h *splitRegionsHandler) ScanRegionsByKeyRange(groupKeys *regionGroupKeys, results *splitKeyResults) { splitKeys := groupKeys.keys startKey, endKey := groupKeys.region.GetStartKey(), groupKeys.region.GetEndKey() diff --git a/pkg/schedule/type/type.go b/pkg/schedule/type/type.go index 26e1b6a737a..16910c631fd 100644 --- a/pkg/schedule/type/type.go +++ b/pkg/schedule/type/type.go @@ -14,10 +14,12 @@ package types +// CheckerSchedulerType is the type of checker/scheduler. type CheckerSchedulerType string -func (n CheckerSchedulerType) String() string { - return string(n) +// String implements fmt.Stringer. +func (t CheckerSchedulerType) String() string { + return string(t) } const ( @@ -93,6 +95,7 @@ var SchedulerTypeCompatibleMap = map[CheckerSchedulerType]string{ LabelScheduler: "label", } +// SchedulerStr2Type is a map to convert the scheduler string to the CheckerSchedulerType. var SchedulerStr2Type = map[string]CheckerSchedulerType{ "balance-leader-scheduler": BalanceLeaderScheduler, "balance-region-scheduler": BalanceRegionScheduler, diff --git a/pkg/statistics/collector.go b/pkg/statistics/collector.go index 88986b93d4b..4e3e2fa2c7a 100644 --- a/pkg/statistics/collector.go +++ b/pkg/statistics/collector.go @@ -22,12 +22,12 @@ import ( // storeCollector define the behavior of different engines of stores. type storeCollector interface { - // Engine returns the type of Store. 
- Engine() string - // Filter determines whether the Store needs to be handled by itself. - Filter(info *StoreSummaryInfo, kind constant.ResourceKind) bool - // GetLoads obtains available loads from storeLoads and peerLoadSum according to rwTy and kind. - GetLoads(storeLoads, peerLoadSum []float64, rwTy utils.RWType, kind constant.ResourceKind) (loads []float64) + // engine returns the type of Store. + engine() string + // filter determines whether the Store needs to be handled by itself. + filter(info *StoreSummaryInfo, kind constant.ResourceKind) bool + // getLoads obtains available loads from storeLoads and peerLoadSum according to rwTy and kind. + getLoads(storeLoads, peerLoadSum []float64, rwTy utils.RWType, kind constant.ResourceKind) (loads []float64) } type tikvCollector struct{} @@ -36,11 +36,11 @@ func newTikvCollector() storeCollector { return tikvCollector{} } -func (tikvCollector) Engine() string { +func (tikvCollector) engine() string { return core.EngineTiKV } -func (tikvCollector) Filter(info *StoreSummaryInfo, kind constant.ResourceKind) bool { +func (tikvCollector) filter(info *StoreSummaryInfo, kind constant.ResourceKind) bool { if info.IsTiFlash() { return false } @@ -53,7 +53,7 @@ func (tikvCollector) Filter(info *StoreSummaryInfo, kind constant.ResourceKind) return false } -func (tikvCollector) GetLoads(storeLoads, peerLoadSum []float64, rwTy utils.RWType, kind constant.ResourceKind) (loads []float64) { +func (tikvCollector) getLoads(storeLoads, peerLoadSum []float64, rwTy utils.RWType, kind constant.ResourceKind) (loads []float64) { loads = make([]float64, utils.DimLen) switch rwTy { case utils.Read: @@ -87,11 +87,11 @@ func newTiFlashCollector(isTraceRegionFlow bool) storeCollector { return tiflashCollector{isTraceRegionFlow: isTraceRegionFlow} } -func (tiflashCollector) Engine() string { +func (tiflashCollector) engine() string { return core.EngineTiFlash } -func (tiflashCollector) Filter(info *StoreSummaryInfo, kind constant.ResourceKind) bool { +func (tiflashCollector) filter(info *StoreSummaryInfo, kind constant.ResourceKind) bool { switch kind { case constant.LeaderKind: return false @@ -101,7 +101,7 @@ func (tiflashCollector) Filter(info *StoreSummaryInfo, kind constant.ResourceKin return false } -func (c tiflashCollector) GetLoads(storeLoads, peerLoadSum []float64, rwTy utils.RWType, kind constant.ResourceKind) (loads []float64) { +func (c tiflashCollector) getLoads(storeLoads, peerLoadSum []float64, rwTy utils.RWType, kind constant.ResourceKind) (loads []float64) { loads = make([]float64, utils.DimLen) switch rwTy { case utils.Read: diff --git a/pkg/statistics/hot_peer.go b/pkg/statistics/hot_peer.go index 79757d6e27f..8f92fbff542 100644 --- a/pkg/statistics/hot_peer.go +++ b/pkg/statistics/hot_peer.go @@ -41,7 +41,7 @@ func newDimStat(reportInterval time.Duration) *dimStat { } } -func (d *dimStat) Add(delta float64, interval time.Duration) { +func (d *dimStat) add(delta float64, interval time.Duration) { d.Lock() defer d.Unlock() d.lastIntervalSum += int(interval.Seconds()) @@ -74,13 +74,13 @@ func (d *dimStat) clearLastAverage() { d.lastDelta = 0 } -func (d *dimStat) Get() float64 { +func (d *dimStat) get() float64 { d.RLock() defer d.RUnlock() return d.rolling.Get() } -func (d *dimStat) Clone() *dimStat { +func (d *dimStat) clone() *dimStat { d.RLock() defer d.RUnlock() return &dimStat{ @@ -162,7 +162,7 @@ func (stat *HotPeerStat) GetActionType() utils.ActionType { // GetLoad returns denoising load if possible. 
func (stat *HotPeerStat) GetLoad(dim int) float64 { if stat.rollingLoads != nil { - return math.Round(stat.rollingLoads[dim].Get()) + return math.Round(stat.rollingLoads[dim].get()) } return math.Round(stat.Loads[dim]) } @@ -172,7 +172,7 @@ func (stat *HotPeerStat) GetLoads() []float64 { if stat.rollingLoads != nil { ret := make([]float64, len(stat.rollingLoads)) for dim := range ret { - ret[dim] = math.Round(stat.rollingLoads[dim].Get()) + ret[dim] = math.Round(stat.rollingLoads[dim].get()) } return ret } diff --git a/pkg/statistics/hot_peer_cache.go b/pkg/statistics/hot_peer_cache.go index 4db0c304bb9..8d1f64ca540 100644 --- a/pkg/statistics/hot_peer_cache.go +++ b/pkg/statistics/hot_peer_cache.go @@ -102,6 +102,7 @@ func (f *HotPeerCache) RegionStats(minHotDegree int) map[uint64][]*HotPeerStat { return res } +// UpdateStat updates the stat cache. func (f *HotPeerCache) UpdateStat(item *HotPeerStat) { switch item.actionType { case utils.Remove: @@ -439,7 +440,7 @@ func (f *HotPeerCache) updateHotPeerStat(region *core.RegionInfo, newItem, oldIt if source == utils.Inherit { for _, dim := range oldItem.rollingLoads { - newItem.rollingLoads = append(newItem.rollingLoads, dim.Clone()) + newItem.rollingLoads = append(newItem.rollingLoads, dim.clone()) } newItem.allowInherited = false } else { @@ -462,7 +463,7 @@ func (f *HotPeerCache) updateHotPeerStat(region *core.RegionInfo, newItem, oldIt } for i, k := range regionStats { - newItem.rollingLoads[i].Add(deltaLoads[k], interval) + newItem.rollingLoads[i].add(deltaLoads[k], interval) } isFull := newItem.rollingLoads[0].isFull(f.interval()) // The intervals of dims are the same, so it is only necessary to determine whether any of them @@ -505,7 +506,7 @@ func (f *HotPeerCache) updateNewHotPeerStat(newItem *HotPeerStat, deltaLoads []f newItem.rollingLoads = make([]*dimStat, len(regionStats)) for i, k := range regionStats { ds := newDimStat(f.interval()) - ds.Add(deltaLoads[k], interval) + ds.add(deltaLoads[k], interval) if ds.isFull(f.interval()) { ds.clearLastAverage() } diff --git a/pkg/statistics/store_collection.go b/pkg/statistics/store_collection.go index 6d5df0bda62..f55c23b27b7 100644 --- a/pkg/statistics/store_collection.go +++ b/pkg/statistics/store_collection.go @@ -61,7 +61,7 @@ func newStoreStatistics(opt config.ConfProvider) *storeStatistics { } } -func (s *storeStatistics) Observe(store *core.StoreInfo) { +func (s *storeStatistics) observe(store *core.StoreInfo) { for _, k := range s.opt.GetLocationLabels() { v := store.GetLabelValue(k) if v == "" { @@ -147,6 +147,7 @@ func (s *storeStatistics) Observe(store *core.StoreInfo) { } } +// ObserveHotStat records the hot region metrics for the store. func ObserveHotStat(store *core.StoreInfo, stats *StoresStats) { // Store flows. storeAddress := store.GetAddress() @@ -178,7 +179,7 @@ func ObserveHotStat(store *core.StoreInfo, stats *StoresStats) { storeStatusGauge.WithLabelValues(storeAddress, id, "store_regions_write_rate_keys_instant").Set(storeFlowStats.GetInstantLoad(utils.StoreRegionsWriteKeys)) } -func (s *storeStatistics) Collect() { +func (s *storeStatistics) collect() { placementStatusGauge.Reset() metrics := make(map[string]float64) @@ -307,12 +308,14 @@ func NewStoreStatisticsMap(opt config.ConfProvider) *storeStatisticsMap { } } +// Observe observes the store. func (m *storeStatisticsMap) Observe(store *core.StoreInfo) { - m.stats.Observe(store) + m.stats.observe(store) } +// Collect collects the metrics. 
func (m *storeStatisticsMap) Collect() { - m.stats.Collect() + m.stats.collect() } // Reset resets the metrics. diff --git a/pkg/statistics/store_hot_peers_infos.go b/pkg/statistics/store_hot_peers_infos.go index f7873bdd744..f64c7743d16 100644 --- a/pkg/statistics/store_hot_peers_infos.go +++ b/pkg/statistics/store_hot_peers_infos.go @@ -158,7 +158,7 @@ func summaryStoresLoadByEngine( store := info.StoreInfo id := store.GetID() storeLoads, ok := storesLoads[id] - if !ok || !collector.Filter(info, kind) { + if !ok || !collector.filter(info, kind) { continue } @@ -172,7 +172,7 @@ func summaryStoresLoadByEngine( } hotPeers = append(hotPeers, peer.Clone()) } - currentLoads := collector.GetLoads(storeLoads, peerLoadSum, rwTy, kind) + currentLoads := collector.getLoads(storeLoads, peerLoadSum, rwTy, kind) var historyLoads [][]float64 if storesHistoryLoads != nil { @@ -240,7 +240,7 @@ func summaryStoresLoadByEngine( { // Metric for debug. - engine := collector.Engine() + engine := collector.engine() ty := "exp-byte-rate-" + rwTy.String() + "-" + kind.String() hotPeerSummary.WithLabelValues(ty, engine).Set(expectLoads[utils.ByteDim]) ty = "exp-key-rate-" + rwTy.String() + "-" + kind.String() diff --git a/pkg/statistics/utils/topn.go b/pkg/statistics/utils/topn.go index 7ab6c6eaf3e..cb97251edd9 100644 --- a/pkg/statistics/utils/topn.go +++ b/pkg/statistics/utils/topn.go @@ -58,35 +58,35 @@ func NewTopN(k, n int, ttl time.Duration) *TopN { func (tn *TopN) Len() int { tn.rw.RLock() defer tn.rw.RUnlock() - return tn.ttlLst.Len() + return tn.ttlLst.len() } // GetTopNMin returns the min item in top N of the `k`th dimension. func (tn *TopN) GetTopNMin(k int) TopNItem { tn.rw.RLock() defer tn.rw.RUnlock() - return tn.topns[k].GetTopNMin() + return tn.topns[k].getTopNMin() } // GetAllTopN returns the top N items of the `k`th dimension. func (tn *TopN) GetAllTopN(k int) []TopNItem { tn.rw.RLock() defer tn.rw.RUnlock() - return tn.topns[k].GetAllTopN() + return tn.topns[k].getAllTopN() } // GetAll returns all items. func (tn *TopN) GetAll() []TopNItem { tn.rw.RLock() defer tn.rw.RUnlock() - return tn.topns[0].GetAll() + return tn.topns[0].getAll() } // Get returns the item with given id, nil if there is no such item. func (tn *TopN) Get(id uint64) TopNItem { tn.rw.RLock() defer tn.rw.RUnlock() - return tn.topns[0].Get(id) + return tn.topns[0].get(id) } // Put inserts item or updates the old item if it exists. 
@@ -94,9 +94,9 @@ func (tn *TopN) Put(item TopNItem) (isUpdate bool) { tn.rw.Lock() defer tn.rw.Unlock() for _, stn := range tn.topns { - isUpdate = stn.Put(item) + isUpdate = stn.put(item) } - tn.ttlLst.Put(item.ID()) + tn.ttlLst.put(item.ID()) tn.maintain() return } @@ -113,17 +113,17 @@ func (tn *TopN) Remove(id uint64) (item TopNItem) { tn.rw.Lock() defer tn.rw.Unlock() for _, stn := range tn.topns { - item = stn.Remove(id) + item = stn.remove(id) } - _ = tn.ttlLst.Remove(id) + _ = tn.ttlLst.remove(id) tn.maintain() return } func (tn *TopN) maintain() { - for _, id := range tn.ttlLst.TakeExpired() { + for _, id := range tn.ttlLst.takeExpired() { for _, stn := range tn.topns { - stn.Remove(id) + stn.remove(id) } } } @@ -144,31 +144,27 @@ func newSingleTopN(k, n int) *singleTopN { } } -func (stn *singleTopN) Len() int { - return stn.topn.Len() + stn.rest.Len() -} - -func (stn *singleTopN) GetTopNMin() TopNItem { +func (stn *singleTopN) getTopNMin() TopNItem { return stn.topn.Top() } -func (stn *singleTopN) GetAllTopN() []TopNItem { +func (stn *singleTopN) getAllTopN() []TopNItem { return stn.topn.GetAll() } -func (stn *singleTopN) GetAll() []TopNItem { +func (stn *singleTopN) getAll() []TopNItem { topn := stn.topn.GetAll() return append(topn, stn.rest.GetAll()...) } -func (stn *singleTopN) Get(id uint64) TopNItem { +func (stn *singleTopN) get(id uint64) TopNItem { if item := stn.topn.Get(id); item != nil { return item } return stn.rest.Get(id) } -func (stn *singleTopN) Put(item TopNItem) (isUpdate bool) { +func (stn *singleTopN) put(item TopNItem) (isUpdate bool) { if stn.topn.Get(item.ID()) != nil { isUpdate = true stn.topn.Put(item) @@ -179,7 +175,7 @@ func (stn *singleTopN) Put(item TopNItem) (isUpdate bool) { return } -func (stn *singleTopN) Remove(id uint64) TopNItem { +func (stn *singleTopN) remove(id uint64) TopNItem { item := stn.topn.Remove(id) if item == nil { item = stn.rest.Remove(id) @@ -340,11 +336,11 @@ func newTTLList(ttl time.Duration) *ttlList { } } -func (tl *ttlList) Len() int { +func (tl *ttlList) len() int { return tl.lst.Len() } -func (tl *ttlList) TakeExpired() []uint64 { +func (tl *ttlList) takeExpired() []uint64 { expired := []uint64{} now := time.Now() for ele := tl.lst.Front(); ele != nil; ele = tl.lst.Front() { @@ -359,7 +355,7 @@ func (tl *ttlList) TakeExpired() []uint64 { return expired } -func (tl *ttlList) Put(id uint64) (isUpdate bool) { +func (tl *ttlList) put(id uint64) (isUpdate bool) { item := ttlItem{id: id} if ele, ok := tl.index[id]; ok { isUpdate = true @@ -370,7 +366,7 @@ func (tl *ttlList) Put(id uint64) (isUpdate bool) { return } -func (tl *ttlList) Remove(id uint64) (removed bool) { +func (tl *ttlList) remove(id uint64) (removed bool) { if ele, ok := tl.index[id]; ok { _ = tl.lst.Remove(ele) delete(tl.index, id) diff --git a/pkg/storage/kv/etcd_kv.go b/pkg/storage/kv/etcd_kv.go index 767aeff77a6..e2eb8c979eb 100644 --- a/pkg/storage/kv/etcd_kv.go +++ b/pkg/storage/kv/etcd_kv.go @@ -55,6 +55,7 @@ func NewEtcdKVBase(client *clientv3.Client, rootPath string) *etcdKVBase { } } +// Load loads the value of the given key from etcd. func (kv *etcdKVBase) Load(key string) (string, error) { key = path.Join(kv.rootPath, key) @@ -70,6 +71,7 @@ func (kv *etcdKVBase) Load(key string) (string, error) { return string(resp.Kvs[0].Value), nil } +// LoadRange loads a range of keys [key, endKey) from etcd.
func (kv *etcdKVBase) LoadRange(key, endKey string, limit int) ([]string, []string, error) { // Note: reason to use `strings.Join` instead of `path.Join` is that the latter will // removes suffix '/' of the joined string. @@ -99,6 +101,7 @@ func (kv *etcdKVBase) LoadRange(key, endKey string, limit int) ([]string, []stri return keys, values, nil } +// Save puts a key-value pair to etcd. func (kv *etcdKVBase) Save(key, value string) error { failpoint.Inject("etcdSaveFailed", func() { failpoint.Return(errors.New("save failed")) @@ -117,6 +120,7 @@ func (kv *etcdKVBase) Save(key, value string) error { return nil } +// Remove removes the key from etcd. func (kv *etcdKVBase) Remove(key string) error { key = path.Join(kv.rootPath, key) diff --git a/pkg/storage/kv/mem_kv.go b/pkg/storage/kv/mem_kv.go index 91d13c04e61..b97a3d6cfa1 100644 --- a/pkg/storage/kv/mem_kv.go +++ b/pkg/storage/kv/mem_kv.go @@ -41,10 +41,12 @@ type memoryKVItem struct { key, value string } +// Less compares two memoryKVItem. func (s *memoryKVItem) Less(than *memoryKVItem) bool { return s.key < than.key } +// Load loads the value for the key. func (kv *memoryKV) Load(key string) (string, error) { kv.RLock() defer kv.RUnlock() @@ -55,6 +57,7 @@ func (kv *memoryKV) Load(key string) (string, error) { return item.value, nil } +// LoadRange loads the keys in the range of [key, endKey). func (kv *memoryKV) LoadRange(key, endKey string, limit int) ([]string, []string, error) { failpoint.Inject("withRangeLimit", func(val failpoint.Value) { rangeLimit, ok := val.(int) @@ -77,6 +80,7 @@ func (kv *memoryKV) LoadRange(key, endKey string, limit int) ([]string, []string return keys, values, nil } +// Save saves the key-value pair. func (kv *memoryKV) Save(key, value string) error { kv.Lock() defer kv.Unlock() @@ -84,6 +88,7 @@ func (kv *memoryKV) Save(key, value string) error { return nil } +// Remove removes the key. 
func (kv *memoryKV) Remove(key string) error { kv.Lock() defer kv.Unlock() diff --git a/pkg/syncer/client.go b/pkg/syncer/client.go index 00fa8dc389b..a94f5c41f3f 100644 --- a/pkg/syncer/client.go +++ b/pkg/syncer/client.go @@ -69,7 +69,7 @@ func (s *RegionSyncer) syncRegion(ctx context.Context, conn *grpc.ClientConn) (C err = syncStream.Send(&pdpb.SyncRegionRequest{ Header: &pdpb.RequestHeader{ClusterId: s.server.ClusterID()}, Member: s.server.GetMemberInfo(), - StartIndex: s.history.GetNextIndex(), + StartIndex: s.history.getNextIndex(), }) if err != nil { return nil, err @@ -154,7 +154,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { time.Sleep(time.Second) continue } - log.Info("server starts to synchronize with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), zap.Uint64("request-index", s.history.GetNextIndex())) + log.Info("server starts to synchronize with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), zap.Uint64("request-index", s.history.getNextIndex())) for { resp, err := stream.Recv() if err != nil { @@ -166,14 +166,14 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { time.Sleep(time.Second) break } - if s.history.GetNextIndex() != resp.GetStartIndex() { + if s.history.getNextIndex() != resp.GetStartIndex() { log.Warn("server sync index not match the leader", zap.String("server", s.server.Name()), - zap.Uint64("own", s.history.GetNextIndex()), + zap.Uint64("own", s.history.getNextIndex()), zap.Uint64("leader", resp.GetStartIndex()), zap.Int("records-length", len(resp.GetRegions()))) // reset index - s.history.ResetWithIndex(resp.GetStartIndex()) + s.history.resetWithIndex(resp.GetStartIndex()) } stats := resp.GetRegionStats() regions := resp.GetRegions() @@ -224,7 +224,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { err = regionStorage.SaveRegion(r) } if err == nil { - s.history.Record(region) + s.history.record(region) } for _, old := range overlaps { _ = regionStorage.DeleteRegion(old.GetMeta()) diff --git a/pkg/syncer/history_buffer.go b/pkg/syncer/history_buffer.go index 08fe85cc8c5..7ff6f202ad3 100644 --- a/pkg/syncer/history_buffer.go +++ b/pkg/syncer/history_buffer.go @@ -84,7 +84,7 @@ func (h *historyBuffer) firstIndex() uint64 { return h.index - uint64(h.len()) } -func (h *historyBuffer) Record(r *core.RegionInfo) { +func (h *historyBuffer) record(r *core.RegionInfo) { h.Lock() defer h.Unlock() syncIndexGauge.Set(float64(h.index)) @@ -101,7 +101,7 @@ func (h *historyBuffer) Record(r *core.RegionInfo) { } } -func (h *historyBuffer) RecordsFrom(index uint64) []*core.RegionInfo { +func (h *historyBuffer) recordsFrom(index uint64) []*core.RegionInfo { h.RLock() defer h.RUnlock() var pos int @@ -117,7 +117,7 @@ func (h *historyBuffer) RecordsFrom(index uint64) []*core.RegionInfo { return records } -func (h *historyBuffer) ResetWithIndex(index uint64) { +func (h *historyBuffer) resetWithIndex(index uint64) { h.Lock() defer h.Unlock() h.index = index @@ -126,7 +126,7 @@ func (h *historyBuffer) ResetWithIndex(index uint64) { h.flushCount = defaultFlushCount } -func (h *historyBuffer) GetNextIndex() uint64 { +func (h *historyBuffer) getNextIndex() uint64 { h.RLock() defer h.RUnlock() return h.index diff --git a/pkg/syncer/history_buffer_test.go b/pkg/syncer/history_buffer_test.go index 4bca5b7f603..70a1caf13dc 100644 --- a/pkg/syncer/history_buffer_test.go +++ b/pkg/syncer/history_buffer_test.go @@ -34,7 +34,7 @@ func 
TestBufferSize(t *testing.T) { h := newHistoryBuffer(1, kv.NewMemoryKV()) re.Equal(0, h.len()) for _, r := range regions { - h.Record(r) + h.record(r) } re.Equal(1, h.len()) re.Equal(regions[h.nextIndex()-1], h.get(100)) @@ -43,7 +43,7 @@ func TestBufferSize(t *testing.T) { // size equals 2 h = newHistoryBuffer(2, kv.NewMemoryKV()) for _, r := range regions { - h.Record(r) + h.record(r) } re.Equal(2, h.len()) re.Equal(regions[h.nextIndex()-1], h.get(100)) @@ -54,7 +54,7 @@ func TestBufferSize(t *testing.T) { kvMem := kv.NewMemoryKV() h1 := newHistoryBuffer(100, kvMem) for i := 0; i < 6; i++ { - h1.Record(regions[i]) + h1.record(regions[i]) } re.Equal(6, h1.len()) re.Equal(uint64(6), h1.nextIndex()) @@ -68,7 +68,7 @@ func TestBufferSize(t *testing.T) { re.Equal(0, h2.len()) for _, r := range regions { index := h2.nextIndex() - h2.Record(r) + h2.record(r) re.Equal(r, h2.get(index)) } @@ -79,9 +79,9 @@ func TestBufferSize(t *testing.T) { // flush in index 106 re.Equal("106", s) - histories := h2.RecordsFrom(uint64(1)) + histories := h2.recordsFrom(uint64(1)) re.Empty(histories) - histories = h2.RecordsFrom(h2.firstIndex()) + histories = h2.recordsFrom(h2.firstIndex()) re.Len(histories, 100) re.Equal(uint64(7), h2.firstIndex()) re.Equal(regions[1:], histories) diff --git a/pkg/syncer/server.go b/pkg/syncer/server.go index 132b06aec69..2cdc01053f6 100644 --- a/pkg/syncer/server.go +++ b/pkg/syncer/server.go @@ -136,8 +136,8 @@ func (s *RegionSyncer) RunServer(ctx context.Context, regionNotifier <-chan *cor } buckets = append(buckets, bucket) leaders = append(leaders, first.GetLeader()) - startIndex := s.history.GetNextIndex() - s.history.Record(first) + startIndex := s.history.getNextIndex() + s.history.record(first) pending := len(regionNotifier) for i := 0; i < pending && i < maxSyncRegionBatchSize; i++ { region := <-regionNotifier @@ -150,7 +150,7 @@ func (s *RegionSyncer) RunServer(ctx context.Context, regionNotifier <-chan *cor } buckets = append(buckets, bucket) leaders = append(leaders, region.GetLeader()) - s.history.Record(region) + s.history.record(region) } regions := &pdpb.SyncRegionResponse{ Header: &pdpb.ResponseHeader{ClusterId: s.server.ClusterID()}, @@ -164,7 +164,7 @@ func (s *RegionSyncer) RunServer(ctx context.Context, regionNotifier <-chan *cor case <-ticker.C: alive := &pdpb.SyncRegionResponse{ Header: &pdpb.ResponseHeader{ClusterId: s.server.ClusterID()}, - StartIndex: s.history.GetNextIndex(), + StartIndex: s.history.getNextIndex(), } s.broadcast(alive) } @@ -223,9 +223,9 @@ func (s *RegionSyncer) Sync(ctx context.Context, stream pdpb.PD_SyncRegionsServe func (s *RegionSyncer) syncHistoryRegion(ctx context.Context, request *pdpb.SyncRegionRequest, stream pdpb.PD_SyncRegionsServer) error { startIndex := request.GetStartIndex() name := request.GetMember().GetName() - records := s.history.RecordsFrom(startIndex) + records := s.history.recordsFrom(startIndex) if len(records) == 0 { - if s.history.GetNextIndex() == startIndex { + if s.history.getNextIndex() == startIndex { log.Info("requested server has already in sync with server", zap.String("requested-server", name), zap.String("server", s.server.Name()), zap.Uint64("last-index", startIndex)) // still send a response to follower to show the history region sync. 
@@ -306,7 +306,7 @@ func (s *RegionSyncer) syncHistoryRegion(ctx context.Context, request *pdpb.Sync log.Info("sync the history regions with server", zap.String("server", name), zap.Uint64("from-index", startIndex), - zap.Uint64("last-index", s.history.GetNextIndex()), + zap.Uint64("last-index", s.history.getNextIndex()), zap.Int("records-length", len(records))) regions := make([]*metapb.Region, len(records)) stats := make([]*pdpb.RegionStat, len(records)) diff --git a/pkg/window/counter.go b/pkg/window/counter.go index 8eaf164b7c0..84325cdc14b 100644 --- a/pkg/window/counter.go +++ b/pkg/window/counter.go @@ -76,34 +76,44 @@ func NewRollingCounter(opts RollingCounterOpts) RollingCounter { } } +// Add adds the given value to the counter. func (r *rollingCounter) Add(val int64) { r.policy.Add(float64(val)) } +// Reduce applies the reduction function to all buckets within the window. func (r *rollingCounter) Reduce(f func(Iterator) float64) float64 { return r.policy.Reduce(f) } +// Avg computes average value within the window. func (r *rollingCounter) Avg() float64 { return r.policy.Reduce(Avg) } +// Min finds the min value within the window. func (r *rollingCounter) Min() float64 { return r.policy.Reduce(Min) } +// Max finds the max value within the window. func (r *rollingCounter) Max() float64 { return r.policy.Reduce(Max) } +// Sum computes sum value within the window. func (r *rollingCounter) Sum() float64 { return r.policy.Reduce(Sum) } +// Value gets the current value. func (r *rollingCounter) Value() int64 { return int64(r.Sum()) } +// Timespan returns passed bucket number since lastAppendTime, +// if it is one bucket duration earlier than the last recorded +// time, it will return the size. func (r *rollingCounter) Timespan() int { r.policy.mu.RLock() defer r.policy.mu.RUnlock() diff --git a/plugin/scheduler_example/evict_leader.go b/plugin/scheduler_example/evict_leader.go index a37874a8461..9ad797e0ae4 100644 --- a/plugin/scheduler_example/evict_leader.go +++ b/plugin/scheduler_example/evict_leader.go @@ -96,6 +96,7 @@ type evictLeaderSchedulerConfig struct { cluster *core.BasicCluster } +// BuildWithArgs builds the config with the args. func (conf *evictLeaderSchedulerConfig) BuildWithArgs(args []string) error { if len(args) != 1 { return errors.New("should specify the store-id") @@ -115,6 +116,7 @@ func (conf *evictLeaderSchedulerConfig) BuildWithArgs(args []string) error { return nil } +// Clone clones the config. func (conf *evictLeaderSchedulerConfig) Clone() *evictLeaderSchedulerConfig { conf.mu.RLock() defer conf.mu.RUnlock() @@ -123,6 +125,7 @@ func (conf *evictLeaderSchedulerConfig) Clone() *evictLeaderSchedulerConfig { } } +// Persist saves the config. func (conf *evictLeaderSchedulerConfig) Persist() error { name := conf.getScheduleName() conf.mu.RLock() @@ -166,24 +169,29 @@ func newEvictLeaderScheduler(opController *operator.Controller, conf *evictLeade } } +// ServeHTTP implements the http.Handler interface. func (s *evictLeaderScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.handler.ServeHTTP(w, r) } +// GetName returns the scheduler name. func (*evictLeaderScheduler) GetName() string { return EvictLeaderName } +// GetType returns the scheduler type. func (*evictLeaderScheduler) GetType() string { return EvictLeaderType } +// EncodeConfig serializes the config. 
func (s *evictLeaderScheduler) EncodeConfig() ([]byte, error) { s.conf.mu.RLock() defer s.conf.mu.RUnlock() return schedulers.EncodeConfig(s.conf) } +// PrepareConfig ensures the scheduler config is valid. func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { s.conf.mu.RLock() defer s.conf.mu.RUnlock() @@ -196,6 +204,7 @@ func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) erro return res } +// CleanConfig is used to clean the scheduler config. func (s *evictLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) { s.conf.mu.RLock() defer s.conf.mu.RUnlock() @@ -204,6 +213,7 @@ func (s *evictLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) { } } +// IsScheduleAllowed checks if the scheduler is allowed to schedule. func (s *evictLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { allowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit() if !allowed { @@ -212,6 +222,7 @@ func (s *evictLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) return allowed } +// Schedule schedules the evict leader operator. func (s *evictLeaderScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { ops := make([]*operator.Operator, 0, len(s.conf.StoreIDWitRanges)) s.conf.mu.RLock() @@ -246,6 +257,7 @@ type evictLeaderHandler struct { config *evictLeaderSchedulerConfig } +// UpdateConfig updates the config. func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) { var input map[string]any if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil { @@ -286,11 +298,13 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R handler.rd.JSON(w, http.StatusOK, nil) } +// ListConfig lists the config. func (handler *evictLeaderHandler) ListConfig(w http.ResponseWriter, _ *http.Request) { conf := handler.config.Clone() handler.rd.JSON(w, http.StatusOK, conf) } +// DeleteConfig deletes the config. func (handler *evictLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.Request) { idStr := mux.Vars(r)["store_id"] id, err := strconv.ParseUint(idStr, 10, 64)
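
The convention this patch applies, which revive's exported rule will start checking once the commented-out EXC0012 entry in the .golangci.yml hunk is enabled, is simple: every exported identifier gets a doc comment that begins with its own name, methods that exist only to satisfy an interface are documented as implementations of that interface, and identifiers used solely inside their package are unexported (Put becomes put, Collect becomes collect) rather than documented. Below is a minimal Go sketch of that convention; the package and names are hypothetical and do not come from the PD codebase or from this patch.

// Package example is a hypothetical sketch of the doc-comment style that
// revive's exported rule expects; it is not part of tikv/pd.
package example

import "fmt"

// Counter is an exported type, so its comment must begin with "Counter".
type Counter struct {
	total int64
}

// Add adds the given delta to the counter.
func (c *Counter) Add(delta int64) {
	c.total += delta
}

// String implements fmt.Stringer.
func (c *Counter) String() string {
	return fmt.Sprintf("total=%d", c.total)
}

// reset is only used within the package, so it stays unexported and needs
// no exported-style comment, mirroring the Put -> put renames above.
func (c *Counter) reset() {
	c.total = 0
}

// Compile-time check that Counter satisfies fmt.Stringer.
var _ fmt.Stringer = (*Counter)(nil)

With the rule active, dropping the comment on Add, or exporting reset without documenting it, would be reported by golangci-lint in the same way the functions touched in this patch were.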