diff --git a/pkg/progress/progress.go b/pkg/progress/progress.go
index 345e4928c41..855aa793a83 100644
--- a/pkg/progress/progress.go
+++ b/pkg/progress/progress.go
@@ -24,19 +24,25 @@ import (
 	"github.com/tikv/pd/pkg/utils/syncutil"
 )
 
-// speedStatisticalWindow is the speed calculation window
-const speedStatisticalWindow = 10 * time.Minute
+const (
+	// maxSpeedCalculationWindow is the maximum size of the time window used to calculate the speed.
+	// Not all of the data within it is necessarily used for the calculation;
+	// which data is used depends on the patrol region duration.
+	maxSpeedCalculationWindow = 2 * time.Hour
+	// minSpeedCalculationWindow is the minimum size of the time window used to calculate the speed.
+	minSpeedCalculationWindow = 10 * time.Minute
+)
 
 // Manager is used to maintain the progresses we care about.
 type Manager struct {
 	syncutil.RWMutex
-	progesses map[string]*progressIndicator
+	progresses map[string]*progressIndicator
 }
 
 // NewManager creates a new Manager.
 func NewManager() *Manager {
 	return &Manager{
-		progesses: make(map[string]*progressIndicator),
+		progresses: make(map[string]*progressIndicator),
 	}
 }
 
@@ -46,12 +52,28 @@ type progressIndicator struct {
 	remaining float64
 	// We use a fixed interval's history to calculate the latest average speed.
 	history *list.List
-	// We use speedStatisticalWindow / updateInterval to get the windowLengthLimit.
-	// Assume that the windowLengthLimit is 3, the init value is 1. after update 3 times with 2, 3, 4 separately. The window will become [1, 2, 3, 4].
+	// We use (maxSpeedCalculationWindow / updateInterval + 1) to get the windowCapacity.
+	// Assume the windowCapacity is 4 and the initial value is 1. After updating 3 times with 2, 3, and 4 respectively, the window becomes [1, 2, 3, 4].
 	// Then we update it again with 5, the window will become [2, 3, 4, 5].
-	windowLengthLimit int
-	updateInterval    time.Duration
-	lastSpeed         float64
+	windowCapacity int
+	// windowLength determines which data in the window is used to compute the speed.
+	// Assume the windowLength is 2 and the initial value is 1. The values used for the calculation are [1].
+	// After updating 3 times with 2, 3, and 4 respectively, the values used for the calculation are [3,4] and the values in the queue are [(1,2),3,4].
+	// This prevents the calculated speed from jumping when the patrol-region-interval changes.
+	windowLength int
+	// front is the first element that should be used.
+	// currentWindowLength is the number of elements from front to the back of the queue, i.e. the current window size.
+	// Assume the windowLength is 2 and the initial value is 1. The front is [1] and currentWindowLength is 1.
+	// After updating 3 times with 2, 3, and 4 respectively,
+	// the front is [3], the currentWindowLength is 2, and the values in the queue are [(1,2),3,4]
+	// (front points at the element 3)
+	// (currentWindowLength = len([3,4]) = 2)
+	// We always keep currentWindowLength equal to windowLength as long as the queue holds enough elements.
+	front               *list.Element
+	currentWindowLength int
+
+	updateInterval time.Duration
+	lastSpeed      float64
 }
 
 // Reset resets the progress manager.
@@ -59,55 +81,92 @@ func (m *Manager) Reset() {
 	m.Lock()
 	defer m.Unlock()
 
-	m.progesses = make(map[string]*progressIndicator)
+	m.progresses = make(map[string]*progressIndicator)
+}
+
+// Option is used to configure a progressIndicator.
+type Option func(*progressIndicator)
+
+// WindowDurationOption changes the size of the time window used to calculate the speed.
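+// The given duration is clamped to the range [minSpeedCalculationWindow, maxSpeedCalculationWindow].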
+func WindowDurationOption(dur time.Duration) func(*progressIndicator) {
+	return func(pi *progressIndicator) {
+		if dur < minSpeedCalculationWindow {
+			dur = minSpeedCalculationWindow
+		} else if dur > maxSpeedCalculationWindow {
+			dur = maxSpeedCalculationWindow
+		}
+		pi.windowLength = int(dur/pi.updateInterval) + 1
+	}
 }
 
 // AddProgress adds a progress into manager if it doesn't exist.
-func (m *Manager) AddProgress(progress string, current, total float64, updateInterval time.Duration) (exist bool) {
+func (m *Manager) AddProgress(progress string, current, total float64, updateInterval time.Duration, opts ...Option) (exist bool) {
 	m.Lock()
 	defer m.Unlock()
 
 	history := list.New()
 	history.PushBack(current)
-	if _, exist = m.progesses[progress]; !exist {
-		m.progesses[progress] = &progressIndicator{
-			total:             total,
-			remaining:         total,
-			history:           history,
-			windowLengthLimit: int(speedStatisticalWindow / updateInterval),
-			updateInterval:    updateInterval,
+	if _, exist = m.progresses[progress]; !exist {
+		pi := &progressIndicator{
+			total:          total,
+			remaining:      total,
+			history:        history,
+			windowCapacity: int(maxSpeedCalculationWindow/updateInterval) + 1,
+			windowLength:   int(minSpeedCalculationWindow / updateInterval),
+			updateInterval: updateInterval,
+		}
+		for _, op := range opts {
+			op(pi)
 		}
+		m.progresses[progress] = pi
+		pi.front = history.Front()
+		pi.currentWindowLength = 1
 	}
 	return
 }
 
 // UpdateProgress updates the progress if it exists.
-func (m *Manager) UpdateProgress(progress string, current, remaining float64, isInc bool) {
+func (m *Manager) UpdateProgress(progress string, current, remaining float64, isInc bool, opts ...Option) {
 	m.Lock()
 	defer m.Unlock()
 
-	if p, exist := m.progesses[progress]; exist {
+	if p, exist := m.progresses[progress]; exist {
+		for _, op := range opts {
+			op(p)
+		}
 		p.remaining = remaining
 		if p.total < remaining {
 			p.total = remaining
 		}
-		if p.history.Len() > p.windowLengthLimit {
+		p.history.PushBack(current)
+		p.currentWindowLength++
+
+		// Try to move `front` to the correct place.
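+		// Shrink the window if it is longer than windowLength, or grow it while older
+		// history is still available, so that front points at the oldest element in use.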
+		for p.currentWindowLength > p.windowLength {
+			p.front = p.front.Next()
+			p.currentWindowLength--
+		}
+		for p.currentWindowLength < p.windowLength && p.front.Prev() != nil {
+			p.front = p.front.Prev()
+			p.currentWindowLength++
+		}
+
+		for p.history.Len() > p.windowCapacity {
 			p.history.Remove(p.history.Front())
 		}
-		p.history.PushBack(current)
 
 		// It means it just init and we haven't update the progress
 		if p.history.Len() <= 1 {
 			p.lastSpeed = 0
 		} else if isInc {
 			// the value increases, e.g., [1, 2, 3]
-			p.lastSpeed = (p.history.Back().Value.(float64) - p.history.Front().Value.(float64)) /
-				(float64(p.history.Len()-1) * p.updateInterval.Seconds())
+			p.lastSpeed = (current - p.front.Value.(float64)) /
+				(float64(p.currentWindowLength-1) * p.updateInterval.Seconds())
 		} else {
 			// the value decreases, e.g., [3, 2, 1]
-			p.lastSpeed = (p.history.Front().Value.(float64) - p.history.Back().Value.(float64)) /
-				(float64(p.history.Len()-1) * p.updateInterval.Seconds())
+			p.lastSpeed = (p.front.Value.(float64) - current) /
+				(float64(p.currentWindowLength-1) * p.updateInterval.Seconds())
 		}
 		if p.lastSpeed < 0 {
 			p.lastSpeed = 0
@@ -120,7 +179,7 @@ func (m *Manager) UpdateProgressTotal(progress string, total float64) {
 	m.Lock()
 	defer m.Unlock()
 
-	if p, exist := m.progesses[progress]; exist {
+	if p, exist := m.progresses[progress]; exist {
 		p.total = total
 	}
 }
@@ -130,8 +189,8 @@ func (m *Manager) RemoveProgress(progress string) (exist bool) {
 	m.Lock()
 	defer m.Unlock()
 
-	if _, exist = m.progesses[progress]; exist {
-		delete(m.progesses, progress)
+	if _, exist = m.progresses[progress]; exist {
+		delete(m.progresses, progress)
 		return
 	}
 	return
@@ -143,7 +202,7 @@ func (m *Manager) GetProgresses(filter func(p string) bool) []string {
 	defer m.RUnlock()
 
 	processes := []string{}
-	for p := range m.progesses {
+	for p := range m.progresses {
 		if filter(p) {
 			processes = append(processes, p)
 		}
@@ -156,7 +215,7 @@ func (m *Manager) Status(progress string) (process, leftSeconds, currentSpeed fl
 	m.RLock()
 	defer m.RUnlock()
 
-	if p, exist := m.progesses[progress]; exist {
+	if p, exist := m.progresses[progress]; exist {
 		process = 1 - p.remaining/p.total
 		if process < 0 {
 			process = 0
diff --git a/pkg/progress/progress_test.go b/pkg/progress/progress_test.go
index e6799fb0ff8..a7b159bc907 100644
--- a/pkg/progress/progress_test.go
+++ b/pkg/progress/progress_test.go
@@ -24,7 +24,6 @@ import (
 )
 
 func TestProgress(t *testing.T) {
-	t.Parallel()
 	re := require.New(t)
 	n := "test"
 	m := NewManager()
@@ -41,15 +40,13 @@ func TestProgress(t *testing.T) {
 	p, ls, cs, err = m.Status(n)
 	re.NoError(err)
 	re.Equal(0.7, p)
-	// 30/(70/1s+) > 30/70
-	re.Greater(ls, 30.0/70.0)
-	// 70/1s+ > 70
-	re.Less(cs, 70.0)
+	re.Less(math.Abs(ls-30.0/7.0), 1e-6)
+	re.Less(math.Abs(cs-7), 1e-6)
 	// there is no scheduling
-	for i := 0; i < 100; i++ {
+	for i := 0; i < 1000; i++ {
 		m.UpdateProgress(n, 30, 30, false)
 	}
-	re.Equal(61, m.progesses[n].history.Len())
+	re.Equal(721, m.progresses[n].history.Len())
 	p, ls, cs, err = m.Status(n)
 	re.NoError(err)
 	re.Equal(0.7, p)
@@ -70,7 +67,6 @@ func TestProgress(t *testing.T) {
 }
 
 func TestAbnormal(t *testing.T) {
-	t.Parallel()
 	re := require.New(t)
 	n := "test"
 	m := NewManager()
@@ -95,3 +91,127 @@ func TestAbnormal(t *testing.T) {
 	re.Equal(0.0, ls)
 	re.Equal(0.0, cs)
 }
+
+func TestProgressWithDynamicWindow(t *testing.T) {
+	// The full capacity of the queue is 721.
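+	// 721 = maxSpeedCalculationWindow/updateInterval + 1 = 2*time.Hour/(10*time.Second) + 1.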
+	re := require.New(t)
+	n := "test"
+	m := NewManager()
+	re.False(m.AddProgress(n, 100, 100, 10*time.Second))
+	p, ls, cs, err := m.Status(n)
+	re.NoError(err)
+	re.Equal(0.0, p)
+	re.Equal(math.MaxFloat64, ls)
+	re.Equal(0.0, cs)
+	time.Sleep(time.Second)
+	re.True(m.AddProgress(n, 100, 100, 10*time.Second))
+
+	m.UpdateProgress(n, 31, 31, false)
+	p, ls, cs, err = m.Status(n)
+	re.NoError(err)
+	re.Equal(0.69, p)
+	re.Less(math.Abs(ls-31.0/6.9), 1e-6)
+	re.Less(math.Abs(cs-6.9), 1e-6)
+	re.Equal(2, m.progresses[n].currentWindowLength)
+	re.Equal(100.0, m.progresses[n].front.Value.(float64))
+
+	m.UpdateProgress(n, 30, 30, false, WindowDurationOption(time.Minute*20))
+	re.Equal(3, m.progresses[n].currentWindowLength)
+	re.Equal(100.0, m.progresses[n].front.Value.(float64))
+	p, ls, cs, err = m.Status(n)
+	re.NoError(err)
+	re.Equal(0.7, p)
+	re.Less(math.Abs(ls-30.0/(7.0/2)), 1e-6)
+	re.Less(math.Abs(cs-3.5), 1e-6)
+
+	for i := 0; i < 1000; i++ {
+		m.UpdateProgress(n, 30, 30, false)
+	}
+	re.Equal(721, m.progresses[n].history.Len())
+	p, ls, cs, err = m.Status(n)
+	re.NoError(err)
+	re.Equal(0.7, p)
+	re.Equal(math.MaxFloat64, ls)
+	re.Equal(0.0, cs)
+	m.UpdateProgress(n, 29, 29, false, WindowDurationOption(time.Minute*20))
+	re.Equal(121, m.progresses[n].currentWindowLength)
+	re.Equal(30.0, m.progresses[n].front.Value.(float64))
+	re.Equal(721, m.progresses[n].history.Len())
+
+	for i := 0; i < 60; i++ {
+		m.UpdateProgress(n, 28, 28, false)
+	}
+	re.Equal(721, m.progresses[n].history.Len())
+	p, ls, cs, err = m.Status(n)
+	re.NoError(err)
+	re.Equal(0.72, p)
+	re.Equal(float64(28/(2./120)*10.), ls)
+	re.Equal(float64(2./120/10.), cs)
+
+	m.UpdateProgress(n, 28, 28, false, WindowDurationOption(time.Minute*10))
+	re.Equal(721, m.progresses[n].history.Len())
+	re.Equal(61, m.progresses[n].currentWindowLength)
+	re.Equal(28.0, m.progresses[n].front.Value.(float64))
+	p, ls, cs, err = m.Status(n)
+	re.NoError(err)
+	re.Equal(0.72, p)
+	re.Equal(math.MaxFloat64, ls)
+	re.Equal(0.0, cs)
+
+	m.UpdateProgress(n, 28, 28, false, WindowDurationOption(time.Minute*20))
+	re.Equal(121, m.progresses[n].currentWindowLength)
+	re.Equal(30.0, m.progresses[n].front.Value.(float64))
+	p, ls, cs, err = m.Status(n)
+	re.NoError(err)
+	re.Equal(0.72, p)
+	re.Equal(float64(28/(2./120)*10.), ls)
+	re.Equal(float64(2./120/10.), cs)
+
+	m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*12))
+	re.Equal(73, m.progresses[n].currentWindowLength)
+	re.Equal(30.0, m.progresses[n].front.Value.(float64))
+	p, ls, cs, err = m.Status(n)
+	re.NoError(err)
+	re.Equal(0.99, p)
+	re.Equal(float64(1/(29./72)*10.), ls)
+	re.Equal(float64(29./72/10.), cs)
+
+	m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*5))
+	re.Equal(61, m.progresses[n].currentWindowLength)
+	re.Equal(28.0, m.progresses[n].front.Value.(float64))
+	p, ls, cs, err = m.Status(n)
+	re.NoError(err)
+	re.Equal(0.99, p)
+	re.Equal(float64(1/(27./60)*10.), ls)
+	re.Equal(float64(27./60/10.), cs)
+
+	m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*180))
+	p, ls, cs, err = m.Status(n)
+	re.Equal(721, m.progresses[n].currentWindowLength)
+	re.Equal(30.0, m.progresses[n].front.Value.(float64))
+	re.NoError(err)
+	re.Equal(0.99, p)
+	re.Equal(float64(1/(29./720)*10.), ls)
+	re.Equal(float64(29./720/10.), cs)
+	for i := 0; i < 2000; i++ {
+		m.UpdateProgress(n, 1, 1, false)
+	}
+	re.Equal(721, m.progresses[n].history.Len())
+	p, ls, cs, err = m.Status(n)
+	re.NoError(err)
+	re.Equal(0.99, p)
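+	// The window is now full of identical values, so the speed is 0 and the
+	// estimated remaining time is reported as math.MaxFloat64.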
+	re.Equal(math.MaxFloat64, ls)
+	re.Equal(0.0, cs)
+
+	ps := m.GetProgresses(func(p string) bool {
+		return strings.Contains(p, n)
+	})
+	re.Len(ps, 1)
+	re.Equal(n, ps[0])
+	ps = m.GetProgresses(func(p string) bool {
+		return strings.Contains(p, "a")
+	})
+	re.Empty(ps)
+	re.True(m.RemoveProgress(n))
+	re.False(m.RemoveProgress(n))
+}
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 1637c2a8612..660e7347bd2 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -1430,7 +1430,7 @@ func (c *RaftCluster) RemoveStore(storeID uint64, physicallyDestroyed bool) erro
 	if err == nil {
 		regionSize := float64(c.core.GetStoreRegionSize(storeID))
 		c.resetProgress(storeID, store.GetAddress())
-		c.progressManager.AddProgress(encodeRemovingProgressKey(storeID), regionSize, regionSize, nodeStateCheckJobInterval)
+		c.progressManager.AddProgress(encodeRemovingProgressKey(storeID), regionSize, regionSize, nodeStateCheckJobInterval, progress.WindowDurationOption(c.GetCoordinator().GetPatrolRegionsDuration()))
 		// record the current store limit in memory
 		c.prevStoreLimit[storeID] = map[storelimit.Type]float64{
 			storelimit.AddPeer:    c.GetStoreLimitByType(storeID, storelimit.AddPeer),
@@ -1969,21 +1969,23 @@ func updateTopology(topology map[string]interface{}, sortedLabels []*metapb.Stor
 
 func (c *RaftCluster) updateProgress(storeID uint64, storeAddress, action string, current, remaining float64, isInc bool) {
 	storeLabel := strconv.FormatUint(storeID, 10)
-	var progress string
+	var progressName string
+	var opts []progress.Option
 	switch action {
 	case removingAction:
-		progress = encodeRemovingProgressKey(storeID)
+		progressName = encodeRemovingProgressKey(storeID)
+		opts = []progress.Option{progress.WindowDurationOption(c.coordinator.GetPatrolRegionsDuration())}
 	case preparingAction:
-		progress = encodePreparingProgressKey(storeID)
+		progressName = encodePreparingProgressKey(storeID)
 	}
 
-	if exist := c.progressManager.AddProgress(progress, current, remaining, nodeStateCheckJobInterval); !exist {
+	if exist := c.progressManager.AddProgress(progressName, current, remaining, nodeStateCheckJobInterval, opts...); !exist {
 		return
 	}
-	c.progressManager.UpdateProgress(progress, current, remaining, isInc)
-	process, ls, cs, err := c.progressManager.Status(progress)
+	c.progressManager.UpdateProgress(progressName, current, remaining, isInc, opts...)
+	process, ls, cs, err := c.progressManager.Status(progressName)
 	if err != nil {
-		log.Error("get progress status failed", zap.String("progress", progress), zap.Float64("remaining", remaining), errs.ZapError(err))
+		log.Error("get progress status failed", zap.String("progress", progressName), zap.Float64("remaining", remaining), errs.ZapError(err))
 		return
 	}
 	storesProgressGauge.WithLabelValues(storeAddress, storeLabel, action).Set(process)
diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go
index 157931c9db4..2f8ebcb3a45 100644
--- a/server/cluster/coordinator.go
+++ b/server/cluster/coordinator.go
@@ -81,6 +81,8 @@ type coordinator struct {
 	hbStreams         *hbstream.HeartbeatStreams
 	pluginInterface   *schedule.PluginInterface
 	diagnosticManager *diagnosticManager
+
+	patrolRegionsDuration time.Duration
 }
 
 // newCoordinator creates a new coordinator.
@@ -104,6 +106,22 @@ func newCoordinator(ctx context.Context, cluster *RaftCluster, hbStreams *hbstre
 	}
 }
 
+// GetPatrolRegionsDuration returns the duration of the last patrol region round.
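+// It returns 0 when the coordinator is nil or no complete patrol round has been recorded yet.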
+func (c *coordinator) GetPatrolRegionsDuration() time.Duration {
+	if c == nil {
+		return 0
+	}
+	c.RLock()
+	defer c.RUnlock()
+	return c.patrolRegionsDuration
+}
+
+func (c *coordinator) setPatrolRegionsDuration(dur time.Duration) {
+	c.Lock()
+	defer c.Unlock()
+	c.patrolRegionsDuration = dur
+}
+
 func (c *coordinator) GetWaitingRegions() []*cache.Item {
 	return c.checkers.GetWaitingRegions()
 }
@@ -132,6 +150,7 @@ func (c *coordinator) patrolRegions() {
 		case <-timer.C:
 			timer.Reset(c.cluster.GetOpts().GetPatrolRegionInterval())
 		case <-c.ctx.Done():
+			c.setPatrolRegionsDuration(0)
 			log.Info("patrol regions has been stopped")
 			return
 		}
@@ -154,7 +173,9 @@ func (c *coordinator) patrolRegions() {
 		// Updates the label level isolation statistics.
 		c.cluster.updateRegionsLabelLevelStats(regions)
 		if len(key) == 0 {
-			patrolCheckRegionsGauge.Set(time.Since(start).Seconds())
+			dur := time.Since(start)
+			patrolCheckRegionsGauge.Set(dur.Seconds())
+			c.setPatrolRegionsDuration(dur)
 			start = time.Now()
 		}
 		failpoint.Inject("break-patrol", func() {