Commit

This is an automated cherry-pick of #7722
close #7726

Signed-off-by: ti-chi-bot <[email protected]>
CabinfeverB authored and ti-chi-bot committed Apr 1, 2024
1 parent 8783927 commit dc62563
Showing 4 changed files with 250 additions and 31 deletions.
103 changes: 88 additions & 15 deletions pkg/progress/progress.go
@@ -24,8 +24,14 @@ import (
"github.com/tikv/pd/pkg/utils/syncutil"
)

// speedStatisticalWindow is the speed calculation window
const speedStatisticalWindow = 10 * time.Minute
const (
// maxSpeedCalculationWindow is the maximum size of the time window used to calculate the speed.
// It does not mean that all data in the window will be used to calculate the speed;
// how much of it is used depends on the patrol region duration.
maxSpeedCalculationWindow = 2 * time.Hour
// minSpeedCalculationWindow is the minimum speed calculation window
minSpeedCalculationWindow = 10 * time.Minute
)

// Manager is used to maintain the progresses we care about.
type Manager struct {
@@ -46,12 +52,28 @@ type progressIndicator struct {
remaining float64
// We use a fixed interval's history to calculate the latest average speed.
history *list.List
// We use speedStatisticalWindow / updateInterval to get the windowLengthLimit.
// Assume that the windowLengthLimit is 3, the init value is 1. after update 3 times with 2, 3, 4 separately. The window will become [1, 2, 3, 4].
// We use (maxSpeedCalculationWindow / updateInterval + 1) to get the windowCapacity.
// Assume that the windowCapacity is 4 and the initial value is 1. After updating 3 times with 2, 3, and 4 respectively, the window becomes [1, 2, 3, 4].
// If we then update it again with 5, the window becomes [2, 3, 4, 5].
windowLengthLimit int
updateInterval time.Duration
lastSpeed float64
windowCapacity int
// windowLength is used to determine which data is used in the calculation.
// Assume that the windowLength is 2 and the initial value is 1. The values used in the calculation are [1].
// After updating 3 times with 2, 3, and 4 respectively, the values used are [3, 4] and the values in the queue are [(1,2),3,4].
// This helps us avoid sudden jumps in the calculated speed when patrol-region-interval changes.
windowLength int
// front is the first element that should be used in the calculation.
// currentWindowLength indicates where front currently sits in the queue.
// Assume that the windowLength is 2 and the initial value is 1. Then front is [1] and currentWindowLength is 1.
// After updating 3 times with 2, 3, and 4 respectively,
// front is [3], currentWindowLength is 2, and the values in the queue are [(1,2),3,4]
// ^ front
// - - currentWindowLength = len([3,4]) = 2
// We always keep currentWindowLength equal to windowLength as long as there is enough data in the queue.
front *list.Element
currentWindowLength int

updateInterval time.Duration
lastSpeed float64
}
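A minimal standalone sketch of the bookkeeping described above (illustrative only, not part of this commit): the queue keeps at most windowCapacity samples, while only the last windowLength samples, starting from front, feed the speed calculation.

package main

import (
	"container/list"
	"fmt"
)

func main() {
	const windowCapacity, windowLength = 4, 2

	history := list.New()
	history.PushBack(1.0)
	front := history.Front()
	currentWindowLength := 1

	for _, v := range []float64{2, 3, 4, 5} {
		history.PushBack(v)
		currentWindowLength++
		// Advance front until it covers only the last windowLength samples.
		for currentWindowLength > windowLength {
			front = front.Next()
			currentWindowLength--
		}
		// Trim the queue down to windowCapacity samples.
		for history.Len() > windowCapacity {
			history.Remove(history.Front())
		}
	}
	fmt.Println(front.Value, currentWindowLength, history.Len()) // 4 2 4
}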

// Reset resets the progress manager.
@@ -62,52 +84,103 @@ func (m *Manager) Reset() {
m.progesses = make(map[string]*progressIndicator)
}

// Option is used to perform an action on a progressIndicator.
type Option func(*progressIndicator)

// WindowDurationOption changes the time window size.
func WindowDurationOption(dur time.Duration) func(*progressIndicator) {
return func(pi *progressIndicator) {
if dur < minSpeedCalculationWindow {
dur = minSpeedCalculationWindow
} else if dur > maxSpeedCalculationWindow {
dur = maxSpeedCalculationWindow
}
pi.windowLength = int(dur/pi.updateInterval) + 1
}
}
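A hedged sketch of how the option behaves given the constants above, assuming it lives inside pkg/progress (for example in a _test.go file importing "fmt" and "time") and a 10-second updateInterval; the numbers mirror the ones used in the unit test below.

func ExampleWindowDurationOption() {
	pi := &progressIndicator{updateInterval: 10 * time.Second}

	WindowDurationOption(20 * time.Minute)(pi) // within [10m, 2h]
	fmt.Println(pi.windowLength) // 20m/10s + 1 = 121

	WindowDurationOption(time.Minute)(pi) // clamped up to minSpeedCalculationWindow
	fmt.Println(pi.windowLength) // 10m/10s + 1 = 61

	WindowDurationOption(3 * time.Hour)(pi) // clamped down to maxSpeedCalculationWindow
	fmt.Println(pi.windowLength) // 2h/10s + 1 = 721

	// Output:
	// 121
	// 61
	// 721
}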

// AddProgress adds a progress into manager if it doesn't exist.
func (m *Manager) AddProgress(progress string, current, total float64, updateInterval time.Duration) (exist bool) {
func (m *Manager) AddProgress(progress string, current, total float64, updateInterval time.Duration, opts ...Option) (exist bool) {
m.Lock()
defer m.Unlock()

history := list.New()
history.PushBack(current)
<<<<<<< HEAD
if _, exist = m.progesses[progress]; !exist {
m.progesses[progress] = &progressIndicator{
total: total,
remaining: total,
history: history,
windowLengthLimit: int(speedStatisticalWindow / updateInterval),
updateInterval: updateInterval,
=======
if _, exist = m.progresses[progress]; !exist {
pi := &progressIndicator{
total: total,
remaining: total,
history: history,
windowCapacity: int(maxSpeedCalculationWindow/updateInterval) + 1,
windowLength: int(minSpeedCalculationWindow / updateInterval),
updateInterval: updateInterval,
>>>>>>> 945e29c03 (cluster: dynamic progress time window for offline scene (#7722))
}
for _, op := range opts {
op(pi)
}
m.progresses[progress] = pi
pi.front = history.Front()
pi.currentWindowLength = 1
}
return
}

// UpdateProgress updates the progress if it exists.
func (m *Manager) UpdateProgress(progress string, current, remaining float64, isInc bool) {
func (m *Manager) UpdateProgress(progress string, current, remaining float64, isInc bool, opts ...Option) {
m.Lock()
defer m.Unlock()

<<<<<<< HEAD
if p, exist := m.progesses[progress]; exist {
=======
if p, exist := m.progresses[progress]; exist {
for _, op := range opts {
op(p)
}
>>>>>>> 945e29c03 (cluster: dynamic progress time window for offline scene (#7722))
p.remaining = remaining
if p.total < remaining {
p.total = remaining
}

if p.history.Len() > p.windowLengthLimit {
p.history.PushBack(current)
p.currentWindowLength++

// try to move `front` into the correct place.
for p.currentWindowLength > p.windowLength {
p.front = p.front.Next()
p.currentWindowLength--
}
for p.currentWindowLength < p.windowLength && p.front.Prev() != nil {
p.front = p.front.Prev()
p.currentWindowLength++
}

for p.history.Len() > p.windowCapacity {
p.history.Remove(p.history.Front())
}
p.history.PushBack(current)

// It means the progress has just been initialized and has not been updated yet
if p.history.Len() <= 1 {
p.lastSpeed = 0
} else if isInc {
// the value increases, e.g., [1, 2, 3]
p.lastSpeed = (p.history.Back().Value.(float64) - p.history.Front().Value.(float64)) /
(float64(p.history.Len()-1) * p.updateInterval.Seconds())
p.lastSpeed = (current - p.front.Value.(float64)) /
(float64(p.currentWindowLength-1) * p.updateInterval.Seconds())
} else {
// the value decreases, e.g., [3, 2, 1]
p.lastSpeed = (p.history.Front().Value.(float64) - p.history.Back().Value.(float64)) /
(float64(p.history.Len()-1) * p.updateInterval.Seconds())
p.lastSpeed = (p.front.Value.(float64) - current) /
(float64(p.currentWindowLength-1) * p.updateInterval.Seconds())
}
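// Worked example (illustrative, using the numbers from TestProgressWithDynamicWindow below):
// with a 10s updateInterval, front value 100, current value 30, and currentWindowLength 3,
// a decreasing progress gives lastSpeed = (100 - 30) / ((3 - 1) * 10) = 3.5 per second,
// which is the value the test asserts after resizing the window to 20 minutes.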
if p.lastSpeed < 0 {
p.lastSpeed = 0
138 changes: 131 additions & 7 deletions pkg/progress/progress_test.go
@@ -24,7 +24,6 @@ import (
)

func TestProgress(t *testing.T) {
t.Parallel()
re := require.New(t)
n := "test"
m := NewManager()
@@ -41,15 +40,17 @@ func TestProgress(t *testing.T) {
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.7, p)
// 30/(70/1s+) > 30/70
re.Greater(ls, 30.0/70.0)
// 70/1s+ > 70
re.Less(cs, 70.0)
re.Less(math.Abs(ls-30.0/7.0), 1e-6)
re.Less(math.Abs(cs-7), 1e-6)
// there is no scheduling
for i := 0; i < 100; i++ {
for i := 0; i < 1000; i++ {
m.UpdateProgress(n, 30, 30, false)
}
<<<<<<< HEAD
re.Equal(61, m.progesses[n].history.Len())
=======
re.Equal(721, m.progresses[n].history.Len())
>>>>>>> 945e29c03 (cluster: dynamic progress time window for offline scene (#7722))
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.7, p)
@@ -70,7 +71,6 @@ func TestProgress(t *testing.T) {
}

func TestAbnormal(t *testing.T) {
t.Parallel()
re := require.New(t)
n := "test"
m := NewManager()
@@ -95,3 +95,127 @@ func TestAbnormal(t *testing.T) {
re.Equal(0.0, ls)
re.Equal(0.0, cs)
}

func TestProgressWithDynamicWindow(t *testing.T) {
// The full capacity of the queue is 721.
re := require.New(t)
n := "test"
m := NewManager()
re.False(m.AddProgress(n, 100, 100, 10*time.Second))
p, ls, cs, err := m.Status(n)
re.NoError(err)
re.Equal(0.0, p)
re.Equal(math.MaxFloat64, ls)
re.Equal(0.0, cs)
time.Sleep(time.Second)
re.True(m.AddProgress(n, 100, 100, 10*time.Second))

m.UpdateProgress(n, 31, 31, false)
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.69, p)
re.Less(math.Abs(ls-31.0/6.9), 1e-6)
re.Less(math.Abs(cs-6.9), 1e-6)
re.Equal(2, m.progresses[n].currentWindowLength)
re.Equal(100.0, m.progresses[n].front.Value.(float64))

m.UpdateProgress(n, 30, 30, false, WindowDurationOption(time.Minute*20))
re.Equal(3, m.progresses[n].currentWindowLength)
re.Equal(100.0, m.progresses[n].front.Value.(float64))
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.7, p)
re.Less(math.Abs(ls-30.0/(7.0/2)), 1e-6)
re.Less(math.Abs(cs-3.5), 1e-6)

for i := 0; i < 1000; i++ {
m.UpdateProgress(n, 30, 30, false)
}
re.Equal(721, m.progresses[n].history.Len())
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.7, p)
re.Equal(math.MaxFloat64, ls)
re.Equal(0.0, cs)
m.UpdateProgress(n, 29, 29, false, WindowDurationOption(time.Minute*20))
re.Equal(121, m.progresses[n].currentWindowLength)
re.Equal(30.0, m.progresses[n].front.Value.(float64))
re.Equal(721, m.progresses[n].history.Len())

for i := 0; i < 60; i++ {
m.UpdateProgress(n, 28, 28, false)
}
re.Equal(721, m.progresses[n].history.Len())
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.72, p)
re.Equal(float64(28/(2./120)*10.), ls)
re.Equal(float64(2./120/10.), cs)

m.UpdateProgress(n, 28, 28, false, WindowDurationOption(time.Minute*10))
re.Equal(721, m.progresses[n].history.Len())
re.Equal(61, m.progresses[n].currentWindowLength)
re.Equal(28.0, m.progresses[n].front.Value.(float64))
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.72, p)
re.Equal(math.MaxFloat64, ls)
re.Equal(0.0, cs)

m.UpdateProgress(n, 28, 28, false, WindowDurationOption(time.Minute*20))
re.Equal(121, m.progresses[n].currentWindowLength)
re.Equal(30.0, m.progresses[n].front.Value.(float64))
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.72, p)
re.Equal(float64(28/(2./120)*10.), ls)
re.Equal(float64(2./120/10.), cs)

m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*12))
re.Equal(73, m.progresses[n].currentWindowLength)
re.Equal(30.0, m.progresses[n].front.Value.(float64))
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.99, p)
re.Equal(float64(1/(29./72)*10.), ls)
re.Equal(float64(29./72/10.), cs)

m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*5))
re.Equal(61, m.progresses[n].currentWindowLength)
re.Equal(28.0, m.progresses[n].front.Value.(float64))
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.99, p)
re.Equal(float64(1/(27./60)*10.), ls)
re.Equal(float64(27./60/10.), cs)

m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*180))
p, ls, cs, err = m.Status(n)
re.Equal(721, m.progresses[n].currentWindowLength)
re.Equal(30.0, m.progresses[n].front.Value.(float64))
re.NoError(err)
re.Equal(0.99, p)
re.Equal(float64(1/(29./720)*10.), ls)
re.Equal(float64(29./720/10.), cs)
for i := 0; i < 2000; i++ {
m.UpdateProgress(n, 1, 1, false)
}
re.Equal(721, m.progresses[n].history.Len())
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.99, p)
re.Equal(math.MaxFloat64, ls)
re.Equal(0.0, cs)

ps := m.GetProgresses(func(p string) bool {
return strings.Contains(p, n)
})
re.Len(ps, 1)
re.Equal(n, ps[0])
ps = m.GetProgresses(func(p string) bool {
return strings.Contains(p, "a")
})
re.Empty(ps)
re.True(m.RemoveProgress(n))
re.False(m.RemoveProgress(n))
}
22 changes: 21 additions & 1 deletion pkg/schedule/coordinator.go
@@ -74,6 +74,7 @@ type Coordinator struct {
cancel context.CancelFunc

schedulersInitialized bool
patrolRegionsDuration time.Duration

cluster sche.ClusterInformer
prepareChecker *prepareChecker
@@ -110,6 +111,22 @@ func NewCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbStreams
}
}

// GetPatrolRegionsDuration returns the duration of the last patrol region round.
func (c *Coordinator) GetPatrolRegionsDuration() time.Duration {
if c == nil {
return 0
}
c.RLock()
defer c.RUnlock()
return c.patrolRegionsDuration
}

func (c *Coordinator) setPatrolRegionsDuration(dur time.Duration) {
c.Lock()
defer c.Unlock()
c.patrolRegionsDuration = dur
}

// markSchedulersInitialized marks the scheduler initialization is finished.
func (c *Coordinator) markSchedulersInitialized() {
c.Lock()
@@ -157,6 +174,7 @@ func (c *Coordinator) PatrolRegions() {
ticker.Reset(c.cluster.GetCheckerConfig().GetPatrolRegionInterval())
case <-c.ctx.Done():
patrolCheckRegionsGauge.Set(0)
c.setPatrolRegionsDuration(0)
log.Info("patrol regions has been stopped")
return
}
@@ -178,7 +196,9 @@ func (c *Coordinator) PatrolRegions() {
// Updates the label level isolation statistics.
c.cluster.UpdateRegionsLabelLevelStats(regions)
if len(key) == 0 {
patrolCheckRegionsGauge.Set(time.Since(start).Seconds())
dur := time.Since(start)
patrolCheckRegionsGauge.Set(dur.Seconds())
c.setPatrolRegionsDuration(dur)
start = time.Now()
}
failpoint.Inject("break-patrol", func() {
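For context, a hedged sketch (not taken from this diff; the package layout, the helper name, and the progress-name format are assumptions) of how a caller could feed the last patrol-regions duration into the progress manager so that the speed window follows the patrol cadence:

package cluster // hypothetical call site

import (
	"fmt"

	"github.com/tikv/pd/pkg/progress"
	"github.com/tikv/pd/pkg/schedule"
)

// updateOfflineProgress updates an offline-store progress entry and resizes its
// speed window to the duration of the last patrol-regions round.
func updateOfflineProgress(m *progress.Manager, co *schedule.Coordinator, storeID uint64, current, remaining float64) {
	opt := progress.WindowDurationOption(co.GetPatrolRegionsDuration())
	name := fmt.Sprintf("remove-store-%d", storeID) // hypothetical progress name
	m.UpdateProgress(name, current, remaining, false /* isInc */, opt)
}

Because WindowDurationOption clamps its argument to [minSpeedCalculationWindow, maxSpeedCalculationWindow], a zero duration (patrol round not finished yet) still yields the previous 10-minute window.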
