From 2974a3d2785753e1543fa43c7967b592cff6af3a Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Tue, 16 Jan 2024 23:59:20 +0800 Subject: [PATCH 01/16] dynamic progress time window Signed-off-by: Cabinfever_B --- pkg/progress/progress.go | 38 ++++++++++++--- pkg/progress/progress_test.go | 87 ++++++++++++++++++++++++++++++++++- pkg/schedule/coordinator.go | 20 +++++++- server/cluster/cluster.go | 31 +++++++------ 4 files changed, 152 insertions(+), 24 deletions(-) diff --git a/pkg/progress/progress.go b/pkg/progress/progress.go index 345e4928c41..beece5c60bf 100644 --- a/pkg/progress/progress.go +++ b/pkg/progress/progress.go @@ -24,8 +24,12 @@ import ( "github.com/tikv/pd/pkg/utils/syncutil" ) -// speedStatisticalWindow is the speed calculation window -const speedStatisticalWindow = 10 * time.Minute +const ( + // minSpeedStatisticalWindow is the minimum speed calculation window + minSpeedStatisticalWindow = 10 * time.Minute + // maxSpeedStatisticalWindow is the maximum speed calculation window + maxSpeedStatisticalWindow = 2 * time.Hour +) // Manager is used to maintain the progresses we care about. type Manager struct { @@ -62,37 +66,57 @@ func (m *Manager) Reset() { m.progesses = make(map[string]*progressIndicator) } +type Option func(*progressIndicator) + +func WindowDurationOption(dur time.Duration) func(*progressIndicator) { + return func(pi *progressIndicator) { + if dur < minSpeedStatisticalWindow { + dur = minSpeedStatisticalWindow + } else if dur > maxSpeedStatisticalWindow { + dur = maxSpeedStatisticalWindow + } + pi.windowLengthLimit = int(dur / pi.updateInterval) + } +} + // AddProgress adds a progress into manager if it doesn't exist. -func (m *Manager) AddProgress(progress string, current, total float64, updateInterval time.Duration) (exist bool) { +func (m *Manager) AddProgress(progress string, current, total float64, updateInterval time.Duration, opts ...Option) (exist bool) { m.Lock() defer m.Unlock() history := list.New() history.PushBack(current) if _, exist = m.progesses[progress]; !exist { - m.progesses[progress] = &progressIndicator{ + pi := &progressIndicator{ total: total, remaining: total, history: history, - windowLengthLimit: int(speedStatisticalWindow / updateInterval), + windowLengthLimit: int(minSpeedStatisticalWindow / updateInterval), updateInterval: updateInterval, } + for _, op := range opts { + op(pi) + } + m.progesses[progress] = pi } return } // UpdateProgress updates the progress if it exists. -func (m *Manager) UpdateProgress(progress string, current, remaining float64, isInc bool) { +func (m *Manager) UpdateProgress(progress string, current, remaining float64, isInc bool, opts ...Option) { m.Lock() defer m.Unlock() if p, exist := m.progesses[progress]; exist { + for _, op := range opts { + op(p) + } p.remaining = remaining if p.total < remaining { p.total = remaining } - if p.history.Len() > p.windowLengthLimit { + for p.history.Len() > p.windowLengthLimit { p.history.Remove(p.history.Front()) } p.history.PushBack(current) diff --git a/pkg/progress/progress_test.go b/pkg/progress/progress_test.go index e6799fb0ff8..5cf9b121da6 100644 --- a/pkg/progress/progress_test.go +++ b/pkg/progress/progress_test.go @@ -44,7 +44,7 @@ func TestProgress(t *testing.T) { // 30/(70/1s+) > 30/70 re.Greater(ls, 30.0/70.0) // 70/1s+ > 70 - re.Less(cs, 70.0) + re.Less(math.Abs(cs-7), 1e-6) // there is no scheduling for i := 0; i < 100; i++ { m.UpdateProgress(n, 30, 30, false) @@ -95,3 +95,88 @@ func TestAbnormal(t *testing.T) { re.Equal(0.0, ls) re.Equal(0.0, cs) } + +func TestProgressWithDynamicWindow(t *testing.T) { + t.Parallel() + re := require.New(t) + n := "test" + m := NewManager() + re.False(m.AddProgress(n, 100, 100, 10*time.Second)) + p, ls, cs, err := m.Status(n) + re.NoError(err) + re.Equal(0.0, p) + re.Equal(math.MaxFloat64, ls) + re.Equal(0.0, cs) + time.Sleep(time.Second) + re.True(m.AddProgress(n, 100, 100, 10*time.Second)) + + m.UpdateProgress(n, 30, 30, false) + p, ls, cs, err = m.Status(n) + re.NoError(err) + re.Equal(0.7, p) + // 30/(70/1s+) > 30/70 + re.Greater(ls, 30.0/70.0) + // 70/1s+ > 70 + re.Less(math.Abs(cs-7), 1e-6) + // there is no scheduling + for i := 0; i < 100; i++ { + m.UpdateProgress(n, 30, 30, false) + } + re.Equal(61, m.progesses[n].history.Len()) + p, ls, cs, err = m.Status(n) + re.NoError(err) + re.Equal(0.7, p) + re.Equal(math.MaxFloat64, ls) + re.Equal(0.0, cs) + m.UpdateProgress(n, 29, 29, false, WindowDurationOption(time.Minute*20)) + re.Equal(62, m.progesses[n].history.Len()) + for i := 0; i < 60; i++ { + m.UpdateProgress(n, 28, 28, false) + } + re.Equal(121, m.progesses[n].history.Len()) + p, ls, cs, err = m.Status(n) + re.NoError(err) + re.Equal(0.72, p) + re.Equal(float64(28/(2./120)*10.), ls) + re.Equal(float64(2./120/10.), cs) + m.UpdateProgress(n, 28, 28, false, WindowDurationOption(time.Minute*10)) + re.Equal(61, m.progesses[n].history.Len()) + m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*10)) + p, ls, cs, err = m.Status(n) + re.NoError(err) + re.Equal(0.99, p) + re.Equal(float64(1/(27./60)*10.), ls) + re.Equal(float64(27./60/10.), cs) + + m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*5)) + re.Equal(61, m.progesses[n].history.Len()) + p, ls, cs, err = m.Status(n) + re.NoError(err) + re.Equal(0.99, p) + re.Equal(float64(1/(27./60)*10.), ls) + re.Equal(float64(27./60/10.), cs) + + re.Equal(61, m.progesses[n].history.Len()) + m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*180)) + for i := 0; i < 2000; i++ { + m.UpdateProgress(n, 1, 1, false) + } + re.Equal(721, m.progesses[n].history.Len()) + p, ls, cs, err = m.Status(n) + re.NoError(err) + re.Equal(0.99, p) + re.Equal(math.MaxFloat64, ls) + re.Equal(0.0, cs) + + ps := m.GetProgresses(func(p string) bool { + return strings.Contains(p, n) + }) + re.Len(ps, 1) + re.Equal(n, ps[0]) + ps = m.GetProgresses(func(p string) bool { + return strings.Contains(p, "a") + }) + re.Empty(ps) + re.True(m.RemoveProgress(n)) + re.False(m.RemoveProgress(n)) +} diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index c7c77cabf3f..305f0c28712 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -73,7 +73,8 @@ type Coordinator struct { ctx context.Context cancel context.CancelFunc - schedulersInitialized bool + schedulersInitialized bool + patrolCheckRegionsDuration time.Duration cluster sche.ClusterInformer prepareChecker *prepareChecker @@ -110,6 +111,18 @@ func NewCoordinator(parentCtx context.Context, cluster sche.ClusterInformer, hbS } } +func (c *Coordinator) GetPatrolRegionsDuration() time.Duration { + c.RLock() + defer c.RUnlock() + return c.patrolCheckRegionsDuration +} + +func (c *Coordinator) setPatrolRegionsDuration(dur time.Duration) { + c.RLock() + defer c.RUnlock() + c.patrolCheckRegionsDuration = dur +} + // markSchedulersInitialized marks the scheduler initialization is finished. func (c *Coordinator) markSchedulersInitialized() { c.Lock() @@ -157,6 +170,7 @@ func (c *Coordinator) PatrolRegions() { ticker.Reset(c.cluster.GetCheckerConfig().GetPatrolRegionInterval()) case <-c.ctx.Done(): patrolCheckRegionsGauge.Set(0) + c.setPatrolRegionsDuration(0) log.Info("patrol regions has been stopped") return } @@ -178,7 +192,9 @@ func (c *Coordinator) PatrolRegions() { // Updates the label level isolation statistics. c.cluster.UpdateRegionsLabelLevelStats(regions) if len(key) == 0 { - patrolCheckRegionsGauge.Set(time.Since(start).Seconds()) + dur := time.Since(start) + patrolCheckRegionsGauge.Set(dur.Seconds()) + c.setPatrolRegionsDuration(dur) start = time.Now() } failpoint.Inject("break-patrol", func() { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 20a4a7f0bfc..6abcdc4117b 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -139,12 +139,13 @@ type RaftCluster struct { etcdClient *clientv3.Client httpClient *http.Client - running bool - isAPIServiceMode bool - meta *metapb.Cluster - storage storage.Storage - minResolvedTS uint64 - externalTS uint64 + running bool + isAPIServiceMode bool + meta *metapb.Cluster + storage storage.Storage + minResolvedTS uint64 + externalTS uint64 + lastPatrolRegionsDuration time.Duration // Keep the previous store limit settings when removing a store. prevStoreLimit map[uint64]map[storelimit.Type]float64 @@ -1409,7 +1410,7 @@ func (c *RaftCluster) RemoveStore(storeID uint64, physicallyDestroyed bool) erro if err == nil { regionSize := float64(c.core.GetStoreRegionSize(storeID)) c.resetProgress(storeID, store.GetAddress()) - c.progressManager.AddProgress(encodeRemovingProgressKey(storeID), regionSize, regionSize, nodeStateCheckJobInterval) + c.progressManager.AddProgress(encodeRemovingProgressKey(storeID), regionSize, regionSize, nodeStateCheckJobInterval, progress.WindowDurationOption(c.coordinator.GetPatrolRegionsDuration())) // record the current store limit in memory c.prevStoreLimit[storeID] = map[storelimit.Type]float64{ storelimit.AddPeer: c.GetStoreLimitByType(storeID, storelimit.AddPeer), @@ -1935,21 +1936,23 @@ func updateTopology(topology map[string]interface{}, sortedLabels []*metapb.Stor func (c *RaftCluster) updateProgress(storeID uint64, storeAddress, action string, current, remaining float64, isInc bool) { storeLabel := strconv.FormatUint(storeID, 10) - var progress string + var progressName string + var opts []progress.Option switch action { case removingAction: - progress = encodeRemovingProgressKey(storeID) + progressName = encodeRemovingProgressKey(storeID) + opts = []progress.Option{progress.WindowDurationOption(c.coordinator.GetPatrolRegionsDuration())} case preparingAction: - progress = encodePreparingProgressKey(storeID) + progressName = encodePreparingProgressKey(storeID) } - if exist := c.progressManager.AddProgress(progress, current, remaining, nodeStateCheckJobInterval); !exist { + if exist := c.progressManager.AddProgress(progressName, current, remaining, nodeStateCheckJobInterval, opts...); !exist { return } - c.progressManager.UpdateProgress(progress, current, remaining, isInc) - process, ls, cs, err := c.progressManager.Status(progress) + c.progressManager.UpdateProgress(progressName, current, remaining, isInc, opts...) + process, ls, cs, err := c.progressManager.Status(progressName) if err != nil { - log.Error("get progress status failed", zap.String("progress", progress), zap.Float64("remaining", remaining), errs.ZapError(err)) + log.Error("get progress status failed", zap.String("progress", progressName), zap.Float64("remaining", remaining), errs.ZapError(err)) return } storesProgressGauge.WithLabelValues(storeAddress, storeLabel, action).Set(process) From 1740480de0e3fc903c43882347144d192115d18a Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Wed, 17 Jan 2024 15:54:45 +0800 Subject: [PATCH 02/16] dynamic progress time window Signed-off-by: Cabinfever_B --- pkg/progress/progress.go | 2 ++ pkg/schedule/coordinator.go | 1 + server/cluster/cluster.go | 13 ++++++------- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/pkg/progress/progress.go b/pkg/progress/progress.go index beece5c60bf..082325b56b6 100644 --- a/pkg/progress/progress.go +++ b/pkg/progress/progress.go @@ -66,8 +66,10 @@ func (m *Manager) Reset() { m.progesses = make(map[string]*progressIndicator) } +// Option is used to do some action for progressIndicator. type Option func(*progressIndicator) +// WindowDurationOption changes the time window size. func WindowDurationOption(dur time.Duration) func(*progressIndicator) { return func(pi *progressIndicator) { if dur < minSpeedStatisticalWindow { diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 305f0c28712..82a4ab8504f 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -111,6 +111,7 @@ func NewCoordinator(parentCtx context.Context, cluster sche.ClusterInformer, hbS } } +// GetPatrolRegionsDuration returns the duration of the last patrol region round. func (c *Coordinator) GetPatrolRegionsDuration() time.Duration { c.RLock() defer c.RUnlock() diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 6abcdc4117b..763de934cce 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -139,13 +139,12 @@ type RaftCluster struct { etcdClient *clientv3.Client httpClient *http.Client - running bool - isAPIServiceMode bool - meta *metapb.Cluster - storage storage.Storage - minResolvedTS uint64 - externalTS uint64 - lastPatrolRegionsDuration time.Duration + running bool + isAPIServiceMode bool + meta *metapb.Cluster + storage storage.Storage + minResolvedTS uint64 + externalTS uint64 // Keep the previous store limit settings when removing a store. prevStoreLimit map[uint64]map[storelimit.Type]float64 From 0fb44f9a3986f76bd2dd812231062d89c567bc3e Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Wed, 17 Jan 2024 16:05:22 +0800 Subject: [PATCH 03/16] dynamic progress time window Signed-off-by: Cabinfever_B --- pkg/schedule/coordinator.go | 3 +++ server/cluster/cluster.go | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 82a4ab8504f..b3617162951 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -113,6 +113,9 @@ func NewCoordinator(parentCtx context.Context, cluster sche.ClusterInformer, hbS // GetPatrolRegionsDuration returns the duration of the last patrol region round. func (c *Coordinator) GetPatrolRegionsDuration() time.Duration { + if c == nil { + return 0 + } c.RLock() defer c.RUnlock() return c.patrolCheckRegionsDuration diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 763de934cce..aad09a25c79 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1409,7 +1409,7 @@ func (c *RaftCluster) RemoveStore(storeID uint64, physicallyDestroyed bool) erro if err == nil { regionSize := float64(c.core.GetStoreRegionSize(storeID)) c.resetProgress(storeID, store.GetAddress()) - c.progressManager.AddProgress(encodeRemovingProgressKey(storeID), regionSize, regionSize, nodeStateCheckJobInterval, progress.WindowDurationOption(c.coordinator.GetPatrolRegionsDuration())) + c.progressManager.AddProgress(encodeRemovingProgressKey(storeID), regionSize, regionSize, nodeStateCheckJobInterval, progress.WindowDurationOption(c.GetCoordinator().GetPatrolRegionsDuration())) // record the current store limit in memory c.prevStoreLimit[storeID] = map[storelimit.Type]float64{ storelimit.AddPeer: c.GetStoreLimitByType(storeID, storelimit.AddPeer), From d572853f16acd9101e63b8e982ef38faa0d9a934 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Wed, 17 Jan 2024 17:23:36 +0800 Subject: [PATCH 04/16] address comment Signed-off-by: Cabinfever_B --- pkg/statistics/region_collection.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/statistics/region_collection.go b/pkg/statistics/region_collection.go index 21af8e152fd..b7ec51c936e 100644 --- a/pkg/statistics/region_collection.go +++ b/pkg/statistics/region_collection.go @@ -130,8 +130,8 @@ func (r *RegionStatistics) GetRegionStatsByType(typ RegionStatisticType) []*core // IsRegionStatsType returns whether the status of the region is the given type. func (r *RegionStatistics) IsRegionStatsType(regionID uint64, typ RegionStatisticType) bool { - r.RLock() - defer r.RUnlock() + r.Lock() + defer r.Unlock() _, exist := r.stats[typ][regionID] return exist } From 32b7aa663fb91188507668a7db4126ffe0ec8a26 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Mon, 22 Jan 2024 09:45:18 +0800 Subject: [PATCH 05/16] address comment Signed-off-by: Cabinfever_B --- pkg/schedule/coordinator.go | 4 ++-- pkg/statistics/region_collection.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index b3617162951..ecfac22a277 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -122,8 +122,8 @@ func (c *Coordinator) GetPatrolRegionsDuration() time.Duration { } func (c *Coordinator) setPatrolRegionsDuration(dur time.Duration) { - c.RLock() - defer c.RUnlock() + c.Lock() + defer c.Unlock() c.patrolCheckRegionsDuration = dur } diff --git a/pkg/statistics/region_collection.go b/pkg/statistics/region_collection.go index b7ec51c936e..21af8e152fd 100644 --- a/pkg/statistics/region_collection.go +++ b/pkg/statistics/region_collection.go @@ -130,8 +130,8 @@ func (r *RegionStatistics) GetRegionStatsByType(typ RegionStatisticType) []*core // IsRegionStatsType returns whether the status of the region is the given type. func (r *RegionStatistics) IsRegionStatsType(regionID uint64, typ RegionStatisticType) bool { - r.Lock() - defer r.Unlock() + r.RLock() + defer r.RUnlock() _, exist := r.stats[typ][regionID] return exist } From 1a49ec9614cf41ddd79641cbc55c9feab6e2cdef Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Thu, 1 Feb 2024 17:30:57 +0800 Subject: [PATCH 06/16] address comment Signed-off-by: Cabinfever_B --- pkg/progress/progress.go | 73 +++++++++++++++++++++++------------ pkg/progress/progress_test.go | 67 +++++++++++++++++++++++++------- 2 files changed, 100 insertions(+), 40 deletions(-) diff --git a/pkg/progress/progress.go b/pkg/progress/progress.go index 082325b56b6..49dbc3d9c4f 100644 --- a/pkg/progress/progress.go +++ b/pkg/progress/progress.go @@ -25,10 +25,12 @@ import ( ) const ( - // minSpeedStatisticalWindow is the minimum speed calculation window - minSpeedStatisticalWindow = 10 * time.Minute - // maxSpeedStatisticalWindow is the maximum speed calculation window - maxSpeedStatisticalWindow = 2 * time.Hour + // speedStatisticalWindowCapacity is the size of the time window used to calculate the speed, + // but it does not mean that all data in it will be used to calculate the speed, + // which data is used depends on the patrol region duration + speedStatisticalWindowCapacity = 2 * time.Hour + // minSpeedCalculationWindow is the minimum speed calculation window + minSpeedCalculationWindow = 10 * time.Minute ) // Manager is used to maintain the progresses we care about. @@ -50,12 +52,19 @@ type progressIndicator struct { remaining float64 // We use a fixed interval's history to calculate the latest average speed. history *list.List - // We use speedStatisticalWindow / updateInterval to get the windowLengthLimit. - // Assume that the windowLengthLimit is 3, the init value is 1. after update 3 times with 2, 3, 4 separately. The window will become [1, 2, 3, 4]. + // We use speedStatisticalWindowCapacity / updateInterval to get the windowCapacity. + // Assume that the windowCapacity is 3, the init value is 1. after update 3 times with 2, 3, 4 separately. The window will become [1, 2, 3, 4]. // Then we update it again with 5, the window will become [2, 3, 4, 5]. - windowLengthLimit int - updateInterval time.Duration - lastSpeed float64 + windowCapacity int + // windowLength is used to determine what data will be computed. + windowLength int + // front is the first element which should be used. + front *list.Element + // position indicates where the front is currently in the queue + position int + + updateInterval time.Duration + lastSpeed float64 } // Reset resets the progress manager. @@ -72,12 +81,12 @@ type Option func(*progressIndicator) // WindowDurationOption changes the time window size. func WindowDurationOption(dur time.Duration) func(*progressIndicator) { return func(pi *progressIndicator) { - if dur < minSpeedStatisticalWindow { - dur = minSpeedStatisticalWindow - } else if dur > maxSpeedStatisticalWindow { - dur = maxSpeedStatisticalWindow + if dur < minSpeedCalculationWindow { + dur = minSpeedCalculationWindow + } else if dur > speedStatisticalWindowCapacity { + dur = speedStatisticalWindowCapacity } - pi.windowLengthLimit = int(dur / pi.updateInterval) + pi.windowLength = int(dur/pi.updateInterval) + 1 } } @@ -90,16 +99,19 @@ func (m *Manager) AddProgress(progress string, current, total float64, updateInt history.PushBack(current) if _, exist = m.progesses[progress]; !exist { pi := &progressIndicator{ - total: total, - remaining: total, - history: history, - windowLengthLimit: int(minSpeedStatisticalWindow / updateInterval), - updateInterval: updateInterval, + total: total, + remaining: total, + history: history, + windowCapacity: int(speedStatisticalWindowCapacity/updateInterval) + 1, + windowLength: int(minSpeedCalculationWindow / updateInterval), + updateInterval: updateInterval, } for _, op := range opts { op(pi) } m.progesses[progress] = pi + pi.front = history.Front() + pi.position = 1 } return } @@ -118,22 +130,33 @@ func (m *Manager) UpdateProgress(progress string, current, remaining float64, is p.total = remaining } - for p.history.Len() > p.windowLengthLimit { + p.history.PushBack(current) + p.position++ + + for p.position > p.windowLength { + p.front = p.front.Next() + p.position-- + } + for p.position < p.windowLength && p.front.Prev() != nil { + p.front = p.front.Prev() + p.position++ + } + + for p.history.Len() > p.windowCapacity { p.history.Remove(p.history.Front()) } - p.history.PushBack(current) // It means it just init and we haven't update the progress if p.history.Len() <= 1 { p.lastSpeed = 0 } else if isInc { // the value increases, e.g., [1, 2, 3] - p.lastSpeed = (p.history.Back().Value.(float64) - p.history.Front().Value.(float64)) / - (float64(p.history.Len()-1) * p.updateInterval.Seconds()) + p.lastSpeed = (p.history.Back().Value.(float64) - p.front.Value.(float64)) / + (float64(p.position-1) * p.updateInterval.Seconds()) } else { // the value decreases, e.g., [3, 2, 1] - p.lastSpeed = (p.history.Front().Value.(float64) - p.history.Back().Value.(float64)) / - (float64(p.history.Len()-1) * p.updateInterval.Seconds()) + p.lastSpeed = (p.front.Value.(float64) - p.history.Back().Value.(float64)) / + (float64(p.position-1) * p.updateInterval.Seconds()) } if p.lastSpeed < 0 { p.lastSpeed = 0 diff --git a/pkg/progress/progress_test.go b/pkg/progress/progress_test.go index 5cf9b121da6..c908a418081 100644 --- a/pkg/progress/progress_test.go +++ b/pkg/progress/progress_test.go @@ -46,10 +46,10 @@ func TestProgress(t *testing.T) { // 70/1s+ > 70 re.Less(math.Abs(cs-7), 1e-6) // there is no scheduling - for i := 0; i < 100; i++ { + for i := 0; i < 1000; i++ { m.UpdateProgress(n, 30, 30, false) } - re.Equal(61, m.progesses[n].history.Len()) + re.Equal(721, m.progesses[n].history.Len()) p, ls, cs, err = m.Status(n) re.NoError(err) re.Equal(0.7, p) @@ -110,37 +110,69 @@ func TestProgressWithDynamicWindow(t *testing.T) { time.Sleep(time.Second) re.True(m.AddProgress(n, 100, 100, 10*time.Second)) - m.UpdateProgress(n, 30, 30, false) + m.UpdateProgress(n, 31, 31, false) p, ls, cs, err = m.Status(n) re.NoError(err) - re.Equal(0.7, p) - // 30/(70/1s+) > 30/70 - re.Greater(ls, 30.0/70.0) + re.Equal(0.69, p) + // 31/(69/1s+) > 31/69 + re.Greater(ls, 31.0/69.0) // 70/1s+ > 70 - re.Less(math.Abs(cs-7), 1e-6) - // there is no scheduling - for i := 0; i < 100; i++ { + re.Less(math.Abs(cs-6.9), 1e-6) + re.Equal(2, m.progesses[n].position) + re.Equal(100.0, m.progesses[n].front.Value.(float64)) + + m.UpdateProgress(n, 30, 30, false, WindowDurationOption(time.Minute*20)) + re.Equal(3, m.progesses[n].position) + re.Equal(100.0, m.progesses[n].front.Value.(float64)) + p, ls, cs, err = m.Status(n) + re.NoError(err) + re.Equal(0.7, p) + re.Greater(ls, 30.0/(70.0/2)) + re.Less(math.Abs(cs-3.5), 1e-6) + + for i := 0; i < 1000; i++ { m.UpdateProgress(n, 30, 30, false) } - re.Equal(61, m.progesses[n].history.Len()) + re.Equal(721, m.progesses[n].history.Len()) p, ls, cs, err = m.Status(n) re.NoError(err) re.Equal(0.7, p) re.Equal(math.MaxFloat64, ls) re.Equal(0.0, cs) m.UpdateProgress(n, 29, 29, false, WindowDurationOption(time.Minute*20)) - re.Equal(62, m.progesses[n].history.Len()) + re.Equal(121, m.progesses[n].position) + re.Equal(30.0, m.progesses[n].front.Value.(float64)) + re.Equal(721, m.progesses[n].history.Len()) + for i := 0; i < 60; i++ { m.UpdateProgress(n, 28, 28, false) } - re.Equal(121, m.progesses[n].history.Len()) + re.Equal(721, m.progesses[n].history.Len()) p, ls, cs, err = m.Status(n) re.NoError(err) re.Equal(0.72, p) re.Equal(float64(28/(2./120)*10.), ls) re.Equal(float64(2./120/10.), cs) + m.UpdateProgress(n, 28, 28, false, WindowDurationOption(time.Minute*10)) - re.Equal(61, m.progesses[n].history.Len()) + re.Equal(721, m.progesses[n].history.Len()) + re.Equal(61, m.progesses[n].position) + re.Equal(28.0, m.progesses[n].front.Value.(float64)) + p, ls, cs, err = m.Status(n) + re.NoError(err) + re.Equal(0.72, p) + re.Equal(math.MaxFloat64, ls) + re.Equal(0.0, cs) + + m.UpdateProgress(n, 28, 28, false, WindowDurationOption(time.Minute*20)) + re.Equal(121, m.progesses[n].position) + re.Equal(30.0, m.progesses[n].front.Value.(float64)) + p, ls, cs, err = m.Status(n) + re.NoError(err) + re.Equal(0.72, p) + re.Equal(float64(28/(2./120)*10.), ls) + re.Equal(float64(2./120/10.), cs) + m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*10)) p, ls, cs, err = m.Status(n) re.NoError(err) @@ -149,15 +181,20 @@ func TestProgressWithDynamicWindow(t *testing.T) { re.Equal(float64(27./60/10.), cs) m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*5)) - re.Equal(61, m.progesses[n].history.Len()) p, ls, cs, err = m.Status(n) re.NoError(err) re.Equal(0.99, p) re.Equal(float64(1/(27./60)*10.), ls) re.Equal(float64(27./60/10.), cs) - re.Equal(61, m.progesses[n].history.Len()) m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*180)) + p, ls, cs, err = m.Status(n) + re.Equal(721, m.progesses[n].position) + re.Equal(30.0, m.progesses[n].front.Value.(float64)) + re.NoError(err) + re.Equal(0.99, p) + re.Equal(float64(1/(29./720)*10.), ls) + re.Equal(float64(29./720/10.), cs) for i := 0; i < 2000; i++ { m.UpdateProgress(n, 1, 1, false) } From e2e3641da9b378cd69223a99224644743442c2b6 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Fri, 2 Feb 2024 14:32:01 +0800 Subject: [PATCH 07/16] address comment Signed-off-by: Cabinfever_B --- pkg/progress/progress.go | 10 +++++++--- pkg/progress/progress_test.go | 11 ++++++++--- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/pkg/progress/progress.go b/pkg/progress/progress.go index 49dbc3d9c4f..6caa7a76219 100644 --- a/pkg/progress/progress.go +++ b/pkg/progress/progress.go @@ -52,15 +52,18 @@ type progressIndicator struct { remaining float64 // We use a fixed interval's history to calculate the latest average speed. history *list.List - // We use speedStatisticalWindowCapacity / updateInterval to get the windowCapacity. - // Assume that the windowCapacity is 3, the init value is 1. after update 3 times with 2, 3, 4 separately. The window will become [1, 2, 3, 4]. + // We use (speedStatisticalWindowCapacity / updateInterval + 1) to get the windowCapacity. + // Assume that the windowCapacity is 4, the init value is 1. After update 3 times with 2, 3, 4 separately. The window will become [1, 2, 3, 4]. // Then we update it again with 5, the window will become [2, 3, 4, 5]. windowCapacity int // windowLength is used to determine what data will be computed. + // Assume that the windowLength is 2, the init value is 1. The value that will be calculated are [1]. After update 3 times with 2, 3, 4 separately. The value that will be calculated are [3,4] and the values in queue are [(1,2),3,4]. + // It helps us avoid calculation results jumping change when patrol-region-duration changes. windowLength int // front is the first element which should be used. - front *list.Element // position indicates where the front is currently in the queue + // Assume that the windowLength is 2, the init value is 1. The front is [1] and position is 1. After update 3 times with 2, 3, 4 separately. The front is [3], and the position is 2. + front *list.Element position int updateInterval time.Duration @@ -133,6 +136,7 @@ func (m *Manager) UpdateProgress(progress string, current, remaining float64, is p.history.PushBack(current) p.position++ + // try to move `front` into correct place. for p.position > p.windowLength { p.front = p.front.Next() p.position-- diff --git a/pkg/progress/progress_test.go b/pkg/progress/progress_test.go index c908a418081..f3acd128d74 100644 --- a/pkg/progress/progress_test.go +++ b/pkg/progress/progress_test.go @@ -97,6 +97,7 @@ func TestAbnormal(t *testing.T) { } func TestProgressWithDynamicWindow(t *testing.T) { + // The full capacity of queue is 721. t.Parallel() re := require.New(t) n := "test" @@ -173,14 +174,18 @@ func TestProgressWithDynamicWindow(t *testing.T) { re.Equal(float64(28/(2./120)*10.), ls) re.Equal(float64(2./120/10.), cs) - m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*10)) + m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*12)) + re.Equal(73, m.progesses[n].position) + re.Equal(30.0, m.progesses[n].front.Value.(float64)) p, ls, cs, err = m.Status(n) re.NoError(err) re.Equal(0.99, p) - re.Equal(float64(1/(27./60)*10.), ls) - re.Equal(float64(27./60/10.), cs) + re.Equal(float64(1/(29./72)*10.), ls) + re.Equal(float64(29./72/10.), cs) m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*5)) + re.Equal(61, m.progesses[n].position) + re.Equal(28.0, m.progesses[n].front.Value.(float64)) p, ls, cs, err = m.Status(n) re.NoError(err) re.Equal(0.99, p) From fa816f5c5b0c4afd273fdc4c956da8cb68d72412 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Thu, 28 Mar 2024 14:03:45 +0800 Subject: [PATCH 08/16] address comment Signed-off-by: Cabinfever_B --- pkg/progress/progress.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/progress/progress.go b/pkg/progress/progress.go index 9306e172d83..b13d4524a26 100644 --- a/pkg/progress/progress.go +++ b/pkg/progress/progress.go @@ -25,10 +25,10 @@ import ( ) const ( - // speedStatisticalWindowCapacity is the size of the time window used to calculate the speed, + // maxSpeedCalculationWindow is the maximum size of the time window used to calculate the speed, // but it does not mean that all data in it will be used to calculate the speed, // which data is used depends on the patrol region duration - speedStatisticalWindowCapacity = 2 * time.Hour + maxSpeedCalculationWindow = 2 * time.Hour // minSpeedCalculationWindow is the minimum speed calculation window minSpeedCalculationWindow = 10 * time.Minute ) @@ -86,8 +86,8 @@ func WindowDurationOption(dur time.Duration) func(*progressIndicator) { return func(pi *progressIndicator) { if dur < minSpeedCalculationWindow { dur = minSpeedCalculationWindow - } else if dur > speedStatisticalWindowCapacity { - dur = speedStatisticalWindowCapacity + } else if dur > maxSpeedCalculationWindow { + dur = maxSpeedCalculationWindow } pi.windowLength = int(dur/pi.updateInterval) + 1 } @@ -105,7 +105,7 @@ func (m *Manager) AddProgress(progress string, current, total float64, updateInt total: total, remaining: total, history: history, - windowCapacity: int(speedStatisticalWindowCapacity/updateInterval) + 1, + windowCapacity: int(maxSpeedCalculationWindow/updateInterval) + 1, windowLength: int(minSpeedCalculationWindow / updateInterval), updateInterval: updateInterval, } From d83b3fe42917bf1a536769ec0ff33c1da4f04fc7 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Thu, 28 Mar 2024 16:52:50 +0800 Subject: [PATCH 09/16] address comment Signed-off-by: Cabinfever_B --- pkg/progress/progress.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/progress/progress.go b/pkg/progress/progress.go index b13d4524a26..d3231a8ce0c 100644 --- a/pkg/progress/progress.go +++ b/pkg/progress/progress.go @@ -61,8 +61,13 @@ type progressIndicator struct { // It helps us avoid calculation results jumping change when patrol-region-duration changes. windowLength int // front is the first element which should be used. - // position indicates where the front is currently in the queue - // Assume that the windowLength is 2, the init value is 1. The front is [1] and position is 1. After update 3 times with 2, 3, 4 separately. The front is [3], and the position is 2. + // position indicates where the front is currently in the queue. + // Assume that the windowLength is 2, the init value is 1. The front is [1] and position is 1. + // After update 3 times with 2, 3, 4 separately. + // The front is [3], the position is 2, and values in queue are [(1,2),3,4] + // ^ front + // - - position = len([3,4]) = 2 + // We will always keep the position equal to windowLength if the actual size is enough. front *list.Element position int From 562b30df5aefb8a7771cd9ccabab6320ae545591 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Fri, 29 Mar 2024 16:07:50 +0800 Subject: [PATCH 10/16] address comment Signed-off-by: Cabinfever_B --- pkg/progress/progress.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/pkg/progress/progress.go b/pkg/progress/progress.go index d3231a8ce0c..f70400eb9ce 100644 --- a/pkg/progress/progress.go +++ b/pkg/progress/progress.go @@ -61,15 +61,15 @@ type progressIndicator struct { // It helps us avoid calculation results jumping change when patrol-region-duration changes. windowLength int // front is the first element which should be used. - // position indicates where the front is currently in the queue. + // currentWindowLength indicates where the front is currently in the queue. // Assume that the windowLength is 2, the init value is 1. The front is [1] and position is 1. // After update 3 times with 2, 3, 4 separately. // The front is [3], the position is 2, and values in queue are [(1,2),3,4] // ^ front // - - position = len([3,4]) = 2 // We will always keep the position equal to windowLength if the actual size is enough. - front *list.Element - position int + front *list.Element + currentWindowLength int updateInterval time.Duration lastSpeed float64 @@ -119,7 +119,7 @@ func (m *Manager) AddProgress(progress string, current, total float64, updateInt } m.progresses[progress] = pi pi.front = history.Front() - pi.position = 1 + pi.currentWindowLength = 1 } return } @@ -139,16 +139,16 @@ func (m *Manager) UpdateProgress(progress string, current, remaining float64, is } p.history.PushBack(current) - p.position++ + p.currentWindowLength++ // try to move `front` into correct place. - for p.position > p.windowLength { + for p.currentWindowLength > p.windowLength { p.front = p.front.Next() - p.position-- + p.currentWindowLength-- } - for p.position < p.windowLength && p.front.Prev() != nil { + for p.currentWindowLength < p.windowLength && p.front.Prev() != nil { p.front = p.front.Prev() - p.position++ + p.currentWindowLength++ } for p.history.Len() > p.windowCapacity { @@ -161,11 +161,11 @@ func (m *Manager) UpdateProgress(progress string, current, remaining float64, is } else if isInc { // the value increases, e.g., [1, 2, 3] p.lastSpeed = (p.history.Back().Value.(float64) - p.front.Value.(float64)) / - (float64(p.position-1) * p.updateInterval.Seconds()) + (float64(p.currentWindowLength-1) * p.updateInterval.Seconds()) } else { // the value decreases, e.g., [3, 2, 1] p.lastSpeed = (p.front.Value.(float64) - p.history.Back().Value.(float64)) / - (float64(p.position-1) * p.updateInterval.Seconds()) + (float64(p.currentWindowLength-1) * p.updateInterval.Seconds()) } if p.lastSpeed < 0 { p.lastSpeed = 0 From d7514c70a938ef40548b2bf1f2785025fd73d1fc Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Fri, 29 Mar 2024 16:22:52 +0800 Subject: [PATCH 11/16] address comment Signed-off-by: Cabinfever_B --- pkg/progress/progress.go | 8 ++++---- pkg/progress/progress_test.go | 16 ++++++++-------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/pkg/progress/progress.go b/pkg/progress/progress.go index f70400eb9ce..67406a223d3 100644 --- a/pkg/progress/progress.go +++ b/pkg/progress/progress.go @@ -62,12 +62,12 @@ type progressIndicator struct { windowLength int // front is the first element which should be used. // currentWindowLength indicates where the front is currently in the queue. - // Assume that the windowLength is 2, the init value is 1. The front is [1] and position is 1. + // Assume that the windowLength is 2, the init value is 1. The front is [1] and currentWindowLength is 1. // After update 3 times with 2, 3, 4 separately. - // The front is [3], the position is 2, and values in queue are [(1,2),3,4] + // The front is [3], the currentWindowLength is 2, and values in queue are [(1,2),3,4] // ^ front - // - - position = len([3,4]) = 2 - // We will always keep the position equal to windowLength if the actual size is enough. + // - - currentWindowLength = len([3,4]) = 2 + // We will always keep the currentWindowLength equal to windowLength if the actual size is enough. front *list.Element currentWindowLength int diff --git a/pkg/progress/progress_test.go b/pkg/progress/progress_test.go index 720599ebe11..882094e85a2 100644 --- a/pkg/progress/progress_test.go +++ b/pkg/progress/progress_test.go @@ -119,11 +119,11 @@ func TestProgressWithDynamicWindow(t *testing.T) { re.Greater(ls, 31.0/69.0) // 70/1s+ > 70 re.Less(math.Abs(cs-6.9), 1e-6) - re.Equal(2, m.progresses[n].position) + re.Equal(2, m.progresses[n].currentWindowLength) re.Equal(100.0, m.progresses[n].front.Value.(float64)) m.UpdateProgress(n, 30, 30, false, WindowDurationOption(time.Minute*20)) - re.Equal(3, m.progresses[n].position) + re.Equal(3, m.progresses[n].currentWindowLength) re.Equal(100.0, m.progresses[n].front.Value.(float64)) p, ls, cs, err = m.Status(n) re.NoError(err) @@ -141,7 +141,7 @@ func TestProgressWithDynamicWindow(t *testing.T) { re.Equal(math.MaxFloat64, ls) re.Equal(0.0, cs) m.UpdateProgress(n, 29, 29, false, WindowDurationOption(time.Minute*20)) - re.Equal(121, m.progresses[n].position) + re.Equal(121, m.progresses[n].currentWindowLength) re.Equal(30.0, m.progresses[n].front.Value.(float64)) re.Equal(721, m.progresses[n].history.Len()) @@ -157,7 +157,7 @@ func TestProgressWithDynamicWindow(t *testing.T) { m.UpdateProgress(n, 28, 28, false, WindowDurationOption(time.Minute*10)) re.Equal(721, m.progresses[n].history.Len()) - re.Equal(61, m.progresses[n].position) + re.Equal(61, m.progresses[n].currentWindowLength) re.Equal(28.0, m.progresses[n].front.Value.(float64)) p, ls, cs, err = m.Status(n) re.NoError(err) @@ -166,7 +166,7 @@ func TestProgressWithDynamicWindow(t *testing.T) { re.Equal(0.0, cs) m.UpdateProgress(n, 28, 28, false, WindowDurationOption(time.Minute*20)) - re.Equal(121, m.progresses[n].position) + re.Equal(121, m.progresses[n].currentWindowLength) re.Equal(30.0, m.progresses[n].front.Value.(float64)) p, ls, cs, err = m.Status(n) re.NoError(err) @@ -175,7 +175,7 @@ func TestProgressWithDynamicWindow(t *testing.T) { re.Equal(float64(2./120/10.), cs) m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*12)) - re.Equal(73, m.progresses[n].position) + re.Equal(73, m.progresses[n].currentWindowLength) re.Equal(30.0, m.progresses[n].front.Value.(float64)) p, ls, cs, err = m.Status(n) re.NoError(err) @@ -184,7 +184,7 @@ func TestProgressWithDynamicWindow(t *testing.T) { re.Equal(float64(29./72/10.), cs) m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*5)) - re.Equal(61, m.progresses[n].position) + re.Equal(61, m.progresses[n].currentWindowLength) re.Equal(28.0, m.progresses[n].front.Value.(float64)) p, ls, cs, err = m.Status(n) re.NoError(err) @@ -194,7 +194,7 @@ func TestProgressWithDynamicWindow(t *testing.T) { m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*180)) p, ls, cs, err = m.Status(n) - re.Equal(721, m.progresses[n].position) + re.Equal(721, m.progresses[n].currentWindowLength) re.Equal(30.0, m.progresses[n].front.Value.(float64)) re.NoError(err) re.Equal(0.99, p) From 9cf28eec86c3f1b15228e02013f9291004ec910e Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Fri, 29 Mar 2024 16:28:29 +0800 Subject: [PATCH 12/16] address comment Signed-off-by: Cabinfever_B --- pkg/progress/progress.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/progress/progress.go b/pkg/progress/progress.go index 67406a223d3..a3ab971cf06 100644 --- a/pkg/progress/progress.go +++ b/pkg/progress/progress.go @@ -65,8 +65,8 @@ type progressIndicator struct { // Assume that the windowLength is 2, the init value is 1. The front is [1] and currentWindowLength is 1. // After update 3 times with 2, 3, 4 separately. // The front is [3], the currentWindowLength is 2, and values in queue are [(1,2),3,4] - // ^ front - // - - currentWindowLength = len([3,4]) = 2 + // ^ front + // - - currentWindowLength = len([3,4]) = 2 // We will always keep the currentWindowLength equal to windowLength if the actual size is enough. front *list.Element currentWindowLength int From 5cccb7edb8e2a1003aa143636a5780e171b4e119 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Fri, 29 Mar 2024 17:20:58 +0800 Subject: [PATCH 13/16] address comment Signed-off-by: Cabinfever_B --- pkg/progress/progress.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/progress/progress.go b/pkg/progress/progress.go index a3ab971cf06..855aa793a83 100644 --- a/pkg/progress/progress.go +++ b/pkg/progress/progress.go @@ -52,13 +52,14 @@ type progressIndicator struct { remaining float64 // We use a fixed interval's history to calculate the latest average speed. history *list.List - // We use (speedStatisticalWindowCapacity / updateInterval + 1) to get the windowCapacity. + // We use (maxSpeedCalculationWindow / updateInterval + 1) to get the windowCapacity. // Assume that the windowCapacity is 4, the init value is 1. After update 3 times with 2, 3, 4 separately. The window will become [1, 2, 3, 4]. // Then we update it again with 5, the window will become [2, 3, 4, 5]. windowCapacity int // windowLength is used to determine what data will be computed. - // Assume that the windowLength is 2, the init value is 1. The value that will be calculated are [1]. After update 3 times with 2, 3, 4 separately. The value that will be calculated are [3,4] and the values in queue are [(1,2),3,4]. - // It helps us avoid calculation results jumping change when patrol-region-duration changes. + // Assume that the windowLength is 2, the init value is 1. The value that will be calculated are [1]. + // After update 3 times with 2, 3, 4 separately. The value that will be calculated are [3,4] and the values in queue are [(1,2),3,4]. + // It helps us avoid calculation results jumping change when patrol-region-interval changes. windowLength int // front is the first element which should be used. // currentWindowLength indicates where the front is currently in the queue. @@ -160,11 +161,11 @@ func (m *Manager) UpdateProgress(progress string, current, remaining float64, is p.lastSpeed = 0 } else if isInc { // the value increases, e.g., [1, 2, 3] - p.lastSpeed = (p.history.Back().Value.(float64) - p.front.Value.(float64)) / + p.lastSpeed = (current - p.front.Value.(float64)) / (float64(p.currentWindowLength-1) * p.updateInterval.Seconds()) } else { // the value decreases, e.g., [3, 2, 1] - p.lastSpeed = (p.front.Value.(float64) - p.history.Back().Value.(float64)) / + p.lastSpeed = (p.front.Value.(float64) - current) / (float64(p.currentWindowLength-1) * p.updateInterval.Seconds()) } if p.lastSpeed < 0 { From f67bdafebd769da2516fcefdd13c86470574f274 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Fri, 29 Mar 2024 17:48:26 +0800 Subject: [PATCH 14/16] address comment Signed-off-by: Cabinfever_B --- pkg/progress/progress_test.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/pkg/progress/progress_test.go b/pkg/progress/progress_test.go index 882094e85a2..94b9239ef83 100644 --- a/pkg/progress/progress_test.go +++ b/pkg/progress/progress_test.go @@ -41,9 +41,7 @@ func TestProgress(t *testing.T) { p, ls, cs, err = m.Status(n) re.NoError(err) re.Equal(0.7, p) - // 30/(70/1s+) > 30/70 - re.Greater(ls, 30.0/70.0) - // 70/1s+ > 70 + re.Less(math.Abs(ls-30.0/7.0), 1e-6) re.Less(math.Abs(cs-7), 1e-6) // there is no scheduling for i := 0; i < 1000; i++ { @@ -115,9 +113,7 @@ func TestProgressWithDynamicWindow(t *testing.T) { p, ls, cs, err = m.Status(n) re.NoError(err) re.Equal(0.69, p) - // 31/(69/1s+) > 31/69 - re.Greater(ls, 31.0/69.0) - // 70/1s+ > 70 + re.Less(math.Abs(ls-31.0/6.9), 1e-6) re.Less(math.Abs(cs-6.9), 1e-6) re.Equal(2, m.progresses[n].currentWindowLength) re.Equal(100.0, m.progresses[n].front.Value.(float64)) @@ -128,7 +124,7 @@ func TestProgressWithDynamicWindow(t *testing.T) { p, ls, cs, err = m.Status(n) re.NoError(err) re.Equal(0.7, p) - re.Greater(ls, 30.0/(70.0/2)) + re.Less(math.Abs(ls-30.0/(7.0/2)), 1e-6) re.Less(math.Abs(cs-3.5), 1e-6) for i := 0; i < 1000; i++ { From 8a8838f2ead78095e2da11db934aa6e445cee8ca Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Fri, 29 Mar 2024 17:48:56 +0800 Subject: [PATCH 15/16] address comment Signed-off-by: Cabinfever_B --- pkg/progress/progress_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/progress/progress_test.go b/pkg/progress/progress_test.go index 94b9239ef83..a7b159bc907 100644 --- a/pkg/progress/progress_test.go +++ b/pkg/progress/progress_test.go @@ -24,7 +24,6 @@ import ( ) func TestProgress(t *testing.T) { - t.Parallel() re := require.New(t) n := "test" m := NewManager() @@ -68,7 +67,6 @@ func TestProgress(t *testing.T) { } func TestAbnormal(t *testing.T) { - t.Parallel() re := require.New(t) n := "test" m := NewManager() @@ -96,7 +94,6 @@ func TestAbnormal(t *testing.T) { func TestProgressWithDynamicWindow(t *testing.T) { // The full capacity of queue is 721. - t.Parallel() re := require.New(t) n := "test" m := NewManager() From 7e3cf2809fe0a616b1c0f810281278b0778c40ae Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Fri, 29 Mar 2024 20:39:02 +0800 Subject: [PATCH 16/16] address comment Signed-off-by: Cabinfever_B --- pkg/schedule/coordinator.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index ecfac22a277..35d9c2029a1 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -73,8 +73,8 @@ type Coordinator struct { ctx context.Context cancel context.CancelFunc - schedulersInitialized bool - patrolCheckRegionsDuration time.Duration + schedulersInitialized bool + patrolRegionsDuration time.Duration cluster sche.ClusterInformer prepareChecker *prepareChecker @@ -118,13 +118,13 @@ func (c *Coordinator) GetPatrolRegionsDuration() time.Duration { } c.RLock() defer c.RUnlock() - return c.patrolCheckRegionsDuration + return c.patrolRegionsDuration } func (c *Coordinator) setPatrolRegionsDuration(dur time.Duration) { c.Lock() defer c.Unlock() - c.patrolCheckRegionsDuration = dur + c.patrolRegionsDuration = dur } // markSchedulersInitialized marks the scheduler initialization is finished.