Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cluster: dynamic progress time window for offline scene (#7722) #8007

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 90 additions & 31 deletions pkg/progress/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,25 @@ import (
"github.com/tikv/pd/pkg/utils/syncutil"
)

// speedStatisticalWindow is the speed calculation window
const speedStatisticalWindow = 10 * time.Minute
const (
// maxSpeedCalculationWindow is the maximum size of the time window used to calculate the speed,
// but it does not mean that all data in it will be used to calculate the speed,
// which data is used depends on the patrol region duration
maxSpeedCalculationWindow = 2 * time.Hour
// minSpeedCalculationWindow is the minimum speed calculation window
minSpeedCalculationWindow = 10 * time.Minute
)

// Manager is used to maintain the progresses we care about.
type Manager struct {
syncutil.RWMutex
progesses map[string]*progressIndicator
progresses map[string]*progressIndicator
}

// NewManager creates a new Manager.
func NewManager() *Manager {
return &Manager{
progesses: make(map[string]*progressIndicator),
progresses: make(map[string]*progressIndicator),
}
}

Expand All @@ -46,68 +52,121 @@ type progressIndicator struct {
remaining float64
// We use a fixed interval's history to calculate the latest average speed.
history *list.List
// We use speedStatisticalWindow / updateInterval to get the windowLengthLimit.
// Assume that the windowLengthLimit is 3, the init value is 1. after update 3 times with 2, 3, 4 separately. The window will become [1, 2, 3, 4].
// We use (maxSpeedCalculationWindow / updateInterval + 1) to get the windowCapacity.
// Assume that the windowCapacity is 4, the init value is 1. After update 3 times with 2, 3, 4 separately. The window will become [1, 2, 3, 4].
// Then we update it again with 5, the window will become [2, 3, 4, 5].
windowLengthLimit int
updateInterval time.Duration
lastSpeed float64
windowCapacity int
// windowLength is used to determine what data will be computed.
// Assume that the windowLength is 2, the init value is 1. The value that will be calculated are [1].
// After update 3 times with 2, 3, 4 separately. The value that will be calculated are [3,4] and the values in queue are [(1,2),3,4].
// It helps us avoid calculation results jumping change when patrol-region-interval changes.
windowLength int
// front is the first element which should be used.
// currentWindowLength indicates where the front is currently in the queue.
// Assume that the windowLength is 2, the init value is 1. The front is [1] and currentWindowLength is 1.
// After update 3 times with 2, 3, 4 separately.
// The front is [3], the currentWindowLength is 2, and values in queue are [(1,2),3,4]
// ^ front
// - - currentWindowLength = len([3,4]) = 2
// We will always keep the currentWindowLength equal to windowLength if the actual size is enough.
front *list.Element
currentWindowLength int

updateInterval time.Duration
lastSpeed float64
}

// Reset resets the progress manager.
func (m *Manager) Reset() {
m.Lock()
defer m.Unlock()

m.progesses = make(map[string]*progressIndicator)
m.progresses = make(map[string]*progressIndicator)
}

// Option is used to do some action for progressIndicator.
type Option func(*progressIndicator)

// WindowDurationOption changes the time window size.
func WindowDurationOption(dur time.Duration) func(*progressIndicator) {
return func(pi *progressIndicator) {
if dur < minSpeedCalculationWindow {
dur = minSpeedCalculationWindow
} else if dur > maxSpeedCalculationWindow {
dur = maxSpeedCalculationWindow
}
pi.windowLength = int(dur/pi.updateInterval) + 1
}
}

// AddProgress adds a progress into manager if it doesn't exist.
func (m *Manager) AddProgress(progress string, current, total float64, updateInterval time.Duration) (exist bool) {
func (m *Manager) AddProgress(progress string, current, total float64, updateInterval time.Duration, opts ...Option) (exist bool) {
m.Lock()
defer m.Unlock()

history := list.New()
history.PushBack(current)
if _, exist = m.progesses[progress]; !exist {
m.progesses[progress] = &progressIndicator{
total: total,
remaining: total,
history: history,
windowLengthLimit: int(speedStatisticalWindow / updateInterval),
updateInterval: updateInterval,
if _, exist = m.progresses[progress]; !exist {
pi := &progressIndicator{
total: total,
remaining: total,
history: history,
windowCapacity: int(maxSpeedCalculationWindow/updateInterval) + 1,
windowLength: int(minSpeedCalculationWindow / updateInterval),
updateInterval: updateInterval,
}
for _, op := range opts {
op(pi)
}
m.progresses[progress] = pi
pi.front = history.Front()
pi.currentWindowLength = 1
}
return
}

// UpdateProgress updates the progress if it exists.
func (m *Manager) UpdateProgress(progress string, current, remaining float64, isInc bool) {
func (m *Manager) UpdateProgress(progress string, current, remaining float64, isInc bool, opts ...Option) {
m.Lock()
defer m.Unlock()

if p, exist := m.progesses[progress]; exist {
if p, exist := m.progresses[progress]; exist {
for _, op := range opts {
op(p)
}
p.remaining = remaining
if p.total < remaining {
p.total = remaining
}

if p.history.Len() > p.windowLengthLimit {
p.history.PushBack(current)
p.currentWindowLength++

// try to move `front` into correct place.
for p.currentWindowLength > p.windowLength {
p.front = p.front.Next()
p.currentWindowLength--
}
for p.currentWindowLength < p.windowLength && p.front.Prev() != nil {
p.front = p.front.Prev()
p.currentWindowLength++
}

for p.history.Len() > p.windowCapacity {
p.history.Remove(p.history.Front())
}
p.history.PushBack(current)

// It means it just init and we haven't update the progress
if p.history.Len() <= 1 {
p.lastSpeed = 0
} else if isInc {
// the value increases, e.g., [1, 2, 3]
p.lastSpeed = (p.history.Back().Value.(float64) - p.history.Front().Value.(float64)) /
(float64(p.history.Len()-1) * p.updateInterval.Seconds())
p.lastSpeed = (current - p.front.Value.(float64)) /
(float64(p.currentWindowLength-1) * p.updateInterval.Seconds())
} else {
// the value decreases, e.g., [3, 2, 1]
p.lastSpeed = (p.history.Front().Value.(float64) - p.history.Back().Value.(float64)) /
(float64(p.history.Len()-1) * p.updateInterval.Seconds())
p.lastSpeed = (p.front.Value.(float64) - current) /
(float64(p.currentWindowLength-1) * p.updateInterval.Seconds())
}
if p.lastSpeed < 0 {
p.lastSpeed = 0
Expand All @@ -120,7 +179,7 @@ func (m *Manager) UpdateProgressTotal(progress string, total float64) {
m.Lock()
defer m.Unlock()

if p, exist := m.progesses[progress]; exist {
if p, exist := m.progresses[progress]; exist {
p.total = total
}
}
Expand All @@ -130,8 +189,8 @@ func (m *Manager) RemoveProgress(progress string) (exist bool) {
m.Lock()
defer m.Unlock()

if _, exist = m.progesses[progress]; exist {
delete(m.progesses, progress)
if _, exist = m.progresses[progress]; exist {
delete(m.progresses, progress)
return
}
return
Expand All @@ -143,7 +202,7 @@ func (m *Manager) GetProgresses(filter func(p string) bool) []string {
defer m.RUnlock()

processes := []string{}
for p := range m.progesses {
for p := range m.progresses {
if filter(p) {
processes = append(processes, p)
}
Expand All @@ -156,7 +215,7 @@ func (m *Manager) Status(progress string) (process, leftSeconds, currentSpeed fl
m.RLock()
defer m.RUnlock()

if p, exist := m.progesses[progress]; exist {
if p, exist := m.progresses[progress]; exist {
process = 1 - p.remaining/p.total
if process < 0 {
process = 0
Expand Down
136 changes: 128 additions & 8 deletions pkg/progress/progress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
)

func TestProgress(t *testing.T) {
t.Parallel()
re := require.New(t)
n := "test"
m := NewManager()
Expand All @@ -41,15 +40,13 @@ func TestProgress(t *testing.T) {
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.7, p)
// 30/(70/1s+) > 30/70
re.Greater(ls, 30.0/70.0)
// 70/1s+ > 70
re.Less(cs, 70.0)
re.Less(math.Abs(ls-30.0/7.0), 1e-6)
re.Less(math.Abs(cs-7), 1e-6)
// there is no scheduling
for i := 0; i < 100; i++ {
for i := 0; i < 1000; i++ {
m.UpdateProgress(n, 30, 30, false)
}
re.Equal(61, m.progesses[n].history.Len())
re.Equal(721, m.progresses[n].history.Len())
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.7, p)
Expand All @@ -70,7 +67,6 @@ func TestProgress(t *testing.T) {
}

func TestAbnormal(t *testing.T) {
t.Parallel()
re := require.New(t)
n := "test"
m := NewManager()
Expand All @@ -95,3 +91,127 @@ func TestAbnormal(t *testing.T) {
re.Equal(0.0, ls)
re.Equal(0.0, cs)
}

func TestProgressWithDynamicWindow(t *testing.T) {
// The full capacity of queue is 721.
re := require.New(t)
n := "test"
m := NewManager()
re.False(m.AddProgress(n, 100, 100, 10*time.Second))
p, ls, cs, err := m.Status(n)
re.NoError(err)
re.Equal(0.0, p)
re.Equal(math.MaxFloat64, ls)
re.Equal(0.0, cs)
time.Sleep(time.Second)
re.True(m.AddProgress(n, 100, 100, 10*time.Second))

m.UpdateProgress(n, 31, 31, false)
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.69, p)
re.Less(math.Abs(ls-31.0/6.9), 1e-6)
re.Less(math.Abs(cs-6.9), 1e-6)
re.Equal(2, m.progresses[n].currentWindowLength)
re.Equal(100.0, m.progresses[n].front.Value.(float64))

m.UpdateProgress(n, 30, 30, false, WindowDurationOption(time.Minute*20))
re.Equal(3, m.progresses[n].currentWindowLength)
re.Equal(100.0, m.progresses[n].front.Value.(float64))
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.7, p)
re.Less(math.Abs(ls-30.0/(7.0/2)), 1e-6)
re.Less(math.Abs(cs-3.5), 1e-6)

for i := 0; i < 1000; i++ {
m.UpdateProgress(n, 30, 30, false)
}
re.Equal(721, m.progresses[n].history.Len())
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.7, p)
re.Equal(math.MaxFloat64, ls)
re.Equal(0.0, cs)
m.UpdateProgress(n, 29, 29, false, WindowDurationOption(time.Minute*20))
re.Equal(121, m.progresses[n].currentWindowLength)
re.Equal(30.0, m.progresses[n].front.Value.(float64))
re.Equal(721, m.progresses[n].history.Len())

for i := 0; i < 60; i++ {
m.UpdateProgress(n, 28, 28, false)
}
re.Equal(721, m.progresses[n].history.Len())
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.72, p)
re.Equal(float64(28/(2./120)*10.), ls)
re.Equal(float64(2./120/10.), cs)

m.UpdateProgress(n, 28, 28, false, WindowDurationOption(time.Minute*10))
re.Equal(721, m.progresses[n].history.Len())
re.Equal(61, m.progresses[n].currentWindowLength)
re.Equal(28.0, m.progresses[n].front.Value.(float64))
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.72, p)
re.Equal(math.MaxFloat64, ls)
re.Equal(0.0, cs)

m.UpdateProgress(n, 28, 28, false, WindowDurationOption(time.Minute*20))
re.Equal(121, m.progresses[n].currentWindowLength)
re.Equal(30.0, m.progresses[n].front.Value.(float64))
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.72, p)
re.Equal(float64(28/(2./120)*10.), ls)
re.Equal(float64(2./120/10.), cs)

m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*12))
re.Equal(73, m.progresses[n].currentWindowLength)
re.Equal(30.0, m.progresses[n].front.Value.(float64))
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.99, p)
re.Equal(float64(1/(29./72)*10.), ls)
re.Equal(float64(29./72/10.), cs)

m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*5))
re.Equal(61, m.progresses[n].currentWindowLength)
re.Equal(28.0, m.progresses[n].front.Value.(float64))
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.99, p)
re.Equal(float64(1/(27./60)*10.), ls)
re.Equal(float64(27./60/10.), cs)

m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*180))
p, ls, cs, err = m.Status(n)
re.Equal(721, m.progresses[n].currentWindowLength)
re.Equal(30.0, m.progresses[n].front.Value.(float64))
re.NoError(err)
re.Equal(0.99, p)
re.Equal(float64(1/(29./720)*10.), ls)
re.Equal(float64(29./720/10.), cs)
for i := 0; i < 2000; i++ {
m.UpdateProgress(n, 1, 1, false)
}
re.Equal(721, m.progresses[n].history.Len())
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.99, p)
re.Equal(math.MaxFloat64, ls)
re.Equal(0.0, cs)

ps := m.GetProgresses(func(p string) bool {
return strings.Contains(p, n)
})
re.Len(ps, 1)
re.Equal(n, ps[0])
ps = m.GetProgresses(func(p string) bool {
return strings.Contains(p, "a")
})
re.Empty(ps)
re.True(m.RemoveProgress(n))
re.False(m.RemoveProgress(n))
}
Loading
Loading