cluster: dynamic progress time window for offline scene #7722

Merged (20 commits) on Apr 1, 2024
101 changes: 80 additions & 21 deletions pkg/progress/progress.go
@@ -24,8 +24,14 @@ import (
"github.com/tikv/pd/pkg/utils/syncutil"
)

// speedStatisticalWindow is the speed calculation window
const speedStatisticalWindow = 10 * time.Minute
const (
// maxSpeedCalculationWindow is the maximum size of the time window used to calculate the speed.
// Not all of the data in the window is necessarily used for the calculation;
// which data is used depends on the patrol region duration.
maxSpeedCalculationWindow = 2 * time.Hour
// minSpeedCalculationWindow is the minimum size of the time window used to calculate the speed.
minSpeedCalculationWindow = 10 * time.Minute
)

// Manager is used to maintain the progresses we care about.
type Manager struct {
@@ -46,12 +52,28 @@ type progressIndicator struct {
remaining float64
// We use a fixed interval's history to calculate the latest average speed.
history *list.List
// We use speedStatisticalWindow / updateInterval to get the windowLengthLimit.
// Assume that the windowLengthLimit is 3, the init value is 1. after update 3 times with 2, 3, 4 separately. The window will become [1, 2, 3, 4].
// We use (maxSpeedCalculationWindow / updateInterval + 1) to get the windowCapacity.
// Assume the windowCapacity is 4 and the initial value is 1. After updating 3 times with 2, 3, and 4 respectively, the window becomes [1, 2, 3, 4].
// If we then update it with 5, the window becomes [2, 3, 4, 5].
windowLengthLimit int
updateInterval time.Duration
lastSpeed float64
windowCapacity int
// windowLength is used to determine which data is used in the calculation.
// Assume the windowLength is 2 and the initial value is 1; the values used in the calculation are [1].
// After updating 3 times with 2, 3, and 4 respectively, the values used are [3, 4] and the values in the queue are [(1,2),3,4].
// It helps us avoid the calculation result changing abruptly when patrol-region-interval changes.
windowLength int
// front is the first element that should be used in the calculation.
// currentWindowLength indicates where front currently sits in the queue.
// Assume the windowLength is 2 and the initial value is 1. The front is [1] and currentWindowLength is 1.
// After updating 3 times with 2, 3, and 4 respectively,
// the front is [3], the currentWindowLength is 2, and the values in the queue are [(1,2),3,4]
// ^ front
// - - currentWindowLength = len([3,4]) = 2
// We always keep currentWindowLength equal to windowLength when the queue holds enough elements.
front *list.Element
currentWindowLength int

updateInterval time.Duration
lastSpeed float64
}

// Reset resets the progress manager.
@@ -62,52 +84,89 @@ func (m *Manager) Reset() {
m.progresses = make(map[string]*progressIndicator)
}

// Option is used to perform an action on a progressIndicator.
type Option func(*progressIndicator)

// WindowDurationOption changes the time window size.
func WindowDurationOption(dur time.Duration) func(*progressIndicator) {
return func(pi *progressIndicator) {
if dur < minSpeedCalculationWindow {
dur = minSpeedCalculationWindow
} else if dur > maxSpeedCalculationWindow {
dur = maxSpeedCalculationWindow
}
pi.windowLength = int(dur/pi.updateInterval) + 1
}
}

// AddProgress adds a progress into manager if it doesn't exist.
func (m *Manager) AddProgress(progress string, current, total float64, updateInterval time.Duration) (exist bool) {
func (m *Manager) AddProgress(progress string, current, total float64, updateInterval time.Duration, opts ...Option) (exist bool) {
m.Lock()
defer m.Unlock()

history := list.New()
history.PushBack(current)
if _, exist = m.progresses[progress]; !exist {
m.progresses[progress] = &progressIndicator{
total: total,
remaining: total,
history: history,
windowLengthLimit: int(speedStatisticalWindow / updateInterval),
updateInterval: updateInterval,
pi := &progressIndicator{
total: total,
remaining: total,
history: history,
windowCapacity: int(maxSpeedCalculationWindow/updateInterval) + 1,
windowLength: int(minSpeedCalculationWindow / updateInterval),
updateInterval: updateInterval,
}
for _, op := range opts {
op(pi)
}
m.progresses[progress] = pi
pi.front = history.Front()
pi.currentWindowLength = 1
}
return
}

// UpdateProgress updates the progress if it exists.
func (m *Manager) UpdateProgress(progress string, current, remaining float64, isInc bool) {
func (m *Manager) UpdateProgress(progress string, current, remaining float64, isInc bool, opts ...Option) {
m.Lock()
defer m.Unlock()

if p, exist := m.progresses[progress]; exist {
for _, op := range opts {
op(p)
}
p.remaining = remaining
if p.total < remaining {
p.total = remaining
}

if p.history.Len() > p.windowLengthLimit {
p.history.PushBack(current)
p.currentWindowLength++

// try to move `front` into the correct place.
for p.currentWindowLength > p.windowLength {
p.front = p.front.Next()
p.currentWindowLength--
}
for p.currentWindowLength < p.windowLength && p.front.Prev() != nil {
p.front = p.front.Prev()
p.currentWindowLength++
}

for p.history.Len() > p.windowCapacity {
p.history.Remove(p.history.Front())
}
p.history.PushBack(current)

// It means the progress has just been initialized and has not been updated yet.
if p.history.Len() <= 1 {
p.lastSpeed = 0
} else if isInc {
// the value increases, e.g., [1, 2, 3]
p.lastSpeed = (p.history.Back().Value.(float64) - p.history.Front().Value.(float64)) /
(float64(p.history.Len()-1) * p.updateInterval.Seconds())
p.lastSpeed = (current - p.front.Value.(float64)) /
(float64(p.currentWindowLength-1) * p.updateInterval.Seconds())
} else {
// the value decreases, e.g., [3, 2, 1]
p.lastSpeed = (p.history.Front().Value.(float64) - p.history.Back().Value.(float64)) /
(float64(p.history.Len()-1) * p.updateInterval.Seconds())
p.lastSpeed = (p.front.Value.(float64) - current) /
(float64(p.currentWindowLength-1) * p.updateInterval.Seconds())
}
if p.lastSpeed < 0 {
p.lastSpeed = 0
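For context on the sizing logic above, here is a small standalone sketch (not code from this PR) of how the two durations translate into queue sizes. With the default 10-second update interval, the queue capacity works out to 2h/10s + 1 = 721 samples, and a 20-minute window covers the most recent 20m/10s + 1 = 121 of them, matching the assertions in the tests below; the windowSizes helper is illustrative only.

package main

import (
	"fmt"
	"time"
)

const (
	maxSpeedCalculationWindow = 2 * time.Hour
	minSpeedCalculationWindow = 10 * time.Minute
)

// windowSizes mirrors the arithmetic used above: the queue capacity is fixed by the
// maximum window, while the effective window length follows the clamped requested duration.
func windowSizes(requested, updateInterval time.Duration) (capacity, length int) {
	if requested < minSpeedCalculationWindow {
		requested = minSpeedCalculationWindow
	} else if requested > maxSpeedCalculationWindow {
		requested = maxSpeedCalculationWindow
	}
	capacity = int(maxSpeedCalculationWindow/updateInterval) + 1
	length = int(requested/updateInterval) + 1
	return capacity, length
}

func main() {
	capacity, length := windowSizes(20*time.Minute, 10*time.Second)
	fmt.Println(capacity, length) // prints: 721 121
}

Keeping the capacity fixed at the maximum window means a window-duration change never discards history; only the front pointer moves over the data that is already queued.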
136 changes: 128 additions & 8 deletions pkg/progress/progress_test.go
@@ -24,7 +24,6 @@ import (
)

func TestProgress(t *testing.T) {
t.Parallel()
re := require.New(t)
n := "test"
m := NewManager()
@@ -41,15 +40,13 @@ func TestProgress(t *testing.T) {
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.7, p)
// 30/(70/1s+) > 30/70
re.Greater(ls, 30.0/70.0)
// 70/1s+ > 70
re.Less(cs, 70.0)
re.Less(math.Abs(ls-30.0/7.0), 1e-6)
re.Less(math.Abs(cs-7), 1e-6)
// there is no scheduling
for i := 0; i < 100; i++ {
for i := 0; i < 1000; i++ {
m.UpdateProgress(n, 30, 30, false)
}
re.Equal(61, m.progresses[n].history.Len())
re.Equal(721, m.progresses[n].history.Len())
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.7, p)
@@ -70,7 +67,6 @@ func TestProgress(t *testing.T) {
}

func TestAbnormal(t *testing.T) {
t.Parallel()
re := require.New(t)
n := "test"
m := NewManager()
@@ -95,3 +91,127 @@ func TestAbnormal(t *testing.T) {
re.Equal(0.0, ls)
re.Equal(0.0, cs)
}

func TestProgressWithDynamicWindow(t *testing.T) {
// The full capacity of the queue is 721.
re := require.New(t)
n := "test"
m := NewManager()
re.False(m.AddProgress(n, 100, 100, 10*time.Second))
p, ls, cs, err := m.Status(n)
re.NoError(err)
re.Equal(0.0, p)
re.Equal(math.MaxFloat64, ls)
re.Equal(0.0, cs)
time.Sleep(time.Second)
re.True(m.AddProgress(n, 100, 100, 10*time.Second))

m.UpdateProgress(n, 31, 31, false)
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.69, p)
re.Less(math.Abs(ls-31.0/6.9), 1e-6)
re.Less(math.Abs(cs-6.9), 1e-6)
re.Equal(2, m.progresses[n].currentWindowLength)
re.Equal(100.0, m.progresses[n].front.Value.(float64))

m.UpdateProgress(n, 30, 30, false, WindowDurationOption(time.Minute*20))
re.Equal(3, m.progresses[n].currentWindowLength)
re.Equal(100.0, m.progresses[n].front.Value.(float64))
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.7, p)
re.Less(math.Abs(ls-30.0/(7.0/2)), 1e-6)
re.Less(math.Abs(cs-3.5), 1e-6)

for i := 0; i < 1000; i++ {
m.UpdateProgress(n, 30, 30, false)
}
re.Equal(721, m.progresses[n].history.Len())
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.7, p)
re.Equal(math.MaxFloat64, ls)
re.Equal(0.0, cs)
m.UpdateProgress(n, 29, 29, false, WindowDurationOption(time.Minute*20))
re.Equal(121, m.progresses[n].currentWindowLength)
re.Equal(30.0, m.progresses[n].front.Value.(float64))
re.Equal(721, m.progresses[n].history.Len())

for i := 0; i < 60; i++ {
m.UpdateProgress(n, 28, 28, false)
}
re.Equal(721, m.progresses[n].history.Len())
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.72, p)
re.Equal(float64(28/(2./120)*10.), ls)
re.Equal(float64(2./120/10.), cs)

m.UpdateProgress(n, 28, 28, false, WindowDurationOption(time.Minute*10))
re.Equal(721, m.progresses[n].history.Len())
re.Equal(61, m.progresses[n].currentWindowLength)
re.Equal(28.0, m.progresses[n].front.Value.(float64))
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.72, p)
re.Equal(math.MaxFloat64, ls)
re.Equal(0.0, cs)

m.UpdateProgress(n, 28, 28, false, WindowDurationOption(time.Minute*20))
re.Equal(121, m.progresses[n].currentWindowLength)
re.Equal(30.0, m.progresses[n].front.Value.(float64))
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.72, p)
re.Equal(float64(28/(2./120)*10.), ls)
re.Equal(float64(2./120/10.), cs)

m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*12))
re.Equal(73, m.progresses[n].currentWindowLength)
re.Equal(30.0, m.progresses[n].front.Value.(float64))
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.99, p)
re.Equal(float64(1/(29./72)*10.), ls)
re.Equal(float64(29./72/10.), cs)

m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*5))
re.Equal(61, m.progresses[n].currentWindowLength)
re.Equal(28.0, m.progresses[n].front.Value.(float64))
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.99, p)
re.Equal(float64(1/(27./60)*10.), ls)
re.Equal(float64(27./60/10.), cs)

m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*180))
p, ls, cs, err = m.Status(n)
re.Equal(721, m.progresses[n].currentWindowLength)
re.Equal(30.0, m.progresses[n].front.Value.(float64))
re.NoError(err)
re.Equal(0.99, p)
re.Equal(float64(1/(29./720)*10.), ls)
re.Equal(float64(29./720/10.), cs)
for i := 0; i < 2000; i++ {
m.UpdateProgress(n, 1, 1, false)
}
re.Equal(721, m.progresses[n].history.Len())
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.99, p)
re.Equal(math.MaxFloat64, ls)
re.Equal(0.0, cs)

ps := m.GetProgresses(func(p string) bool {
return strings.Contains(p, n)
})
re.Len(ps, 1)
re.Equal(n, ps[0])
ps = m.GetProgresses(func(p string) bool {
return strings.Contains(p, "a")
})
re.Empty(ps)
re.True(m.RemoveProgress(n))
re.False(m.RemoveProgress(n))
}
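To summarize the API the tests above exercise, here is a minimal usage sketch, assuming the exported progress package API shown in this diff (NewManager, AddProgress, UpdateProgress with WindowDurationOption, and Status); the progress name and the numbers are illustrative only, not taken from PD's call sites.

package main

import (
	"fmt"
	"time"

	"github.com/tikv/pd/pkg/progress"
)

func main() {
	m := progress.NewManager()
	// Track an offline store with 100 regions to move, sampled every 10 seconds,
	// starting with a 20-minute speed window.
	m.AddProgress("store-1-offline", 100, 100, 10*time.Second,
		progress.WindowDurationOption(20*time.Minute))

	// Each update reports the remaining amount; the window can be re-sized on the
	// fly by passing the option again (e.g. with the latest patrol duration).
	m.UpdateProgress("store-1-offline", 70, 70, false,
		progress.WindowDurationOption(20*time.Minute))

	p, leftSeconds, currentSpeed, err := m.Status("store-1-offline")
	if err == nil {
		fmt.Printf("progress=%.2f left=%.0fs speed=%.2f/s\n", p, leftSeconds, currentSpeed)
	}
}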
24 changes: 22 additions & 2 deletions pkg/schedule/coordinator.go
@@ -73,7 +73,8 @@ type Coordinator struct {
ctx context.Context
cancel context.CancelFunc

schedulersInitialized bool
schedulersInitialized bool
patrolCheckRegionsDuration time.Duration

cluster sche.ClusterInformer
prepareChecker *prepareChecker
@@ -110,6 +111,22 @@ func NewCoordinator(parentCtx context.Context, cluster sche.ClusterInformer, hbS
}
}

// GetPatrolRegionsDuration returns the duration of the last patrol region round.
func (c *Coordinator) GetPatrolRegionsDuration() time.Duration {
if c == nil {
return 0
}
c.RLock()
defer c.RUnlock()
return c.patrolCheckRegionsDuration
}

func (c *Coordinator) setPatrolRegionsDuration(dur time.Duration) {
c.Lock()
defer c.Unlock()
c.patrolCheckRegionsDuration = dur
}

// markSchedulersInitialized marks the scheduler initialization is finished.
func (c *Coordinator) markSchedulersInitialized() {
c.Lock()
Expand Down Expand Up @@ -157,6 +174,7 @@ func (c *Coordinator) PatrolRegions() {
ticker.Reset(c.cluster.GetCheckerConfig().GetPatrolRegionInterval())
case <-c.ctx.Done():
patrolCheckRegionsGauge.Set(0)
c.setPatrolRegionsDuration(0)
log.Info("patrol regions has been stopped")
return
}
@@ -178,7 +196,9 @@ func (c *Coordinator) PatrolRegions() {
// Updates the label level isolation statistics.
c.cluster.UpdateRegionsLabelLevelStats(regions)
if len(key) == 0 {
patrolCheckRegionsGauge.Set(time.Since(start).Seconds())
dur := time.Since(start)
patrolCheckRegionsGauge.Set(dur.Seconds())
c.setPatrolRegionsDuration(dur)
start = time.Now()
}
failpoint.Inject("break-patrol", func() {
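The GetPatrolRegionsDuration getter added above is what lets callers size the progress window from the most recent patrol round. The actual call site lives in the cluster's offline-store handling and is not part of this file, so the helper below is only a hypothetical sketch of how the pieces could be wired together; updateOfflineProgress and its arguments are assumptions.

// Assumed imports: "github.com/tikv/pd/pkg/progress" and "github.com/tikv/pd/pkg/schedule".
// Hypothetical helper, not the call site from this PR.
func updateOfflineProgress(c *schedule.Coordinator, m *progress.Manager, name string, current, remaining float64) {
	// WindowDurationOption clamps its argument to [minSpeedCalculationWindow, maxSpeedCalculationWindow],
	// so the raw patrol duration (which is 0 before the first full round) can be passed directly.
	m.UpdateProgress(name, current, remaining, false,
		progress.WindowDurationOption(c.GetPatrolRegionsDuration()))
}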