Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cluster: dynamic progress time window for offline scene #7722

Merged
merged 20 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 74 additions & 21 deletions pkg/progress/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,14 @@ import (
"github.com/tikv/pd/pkg/utils/syncutil"
)

// speedStatisticalWindow is the speed calculation window
const speedStatisticalWindow = 10 * time.Minute
const (
// speedStatisticalWindowCapacity is the size of the time window used to calculate the speed,
// but it does not mean that all data in it will be used to calculate the speed,
// which data is used depends on the patrol region duration
speedStatisticalWindowCapacity = 2 * time.Hour
rleungx marked this conversation as resolved.
Show resolved Hide resolved
// minSpeedCalculationWindow is the minimum speed calculation window
minSpeedCalculationWindow = 10 * time.Minute
)

// Manager is used to maintain the progresses we care about.
type Manager struct {
Expand All @@ -46,12 +52,22 @@ type progressIndicator struct {
remaining float64
// We use a fixed interval's history to calculate the latest average speed.
history *list.List
// We use speedStatisticalWindow / updateInterval to get the windowLengthLimit.
// Assume that the windowLengthLimit is 3, the init value is 1. after update 3 times with 2, 3, 4 separately. The window will become [1, 2, 3, 4].
// We use (speedStatisticalWindowCapacity / updateInterval + 1) to get the windowCapacity.
rleungx marked this conversation as resolved.
Show resolved Hide resolved
HuSharp marked this conversation as resolved.
Show resolved Hide resolved
// Assume that the windowCapacity is 4, the init value is 1. After update 3 times with 2, 3, 4 separately. The window will become [1, 2, 3, 4].
// Then we update it again with 5, the window will become [2, 3, 4, 5].
windowLengthLimit int
updateInterval time.Duration
lastSpeed float64
windowCapacity int
// windowLength is used to determine what data will be computed.
// Assume that the windowLength is 2, the init value is 1. The value that will be calculated are [1]. After update 3 times with 2, 3, 4 separately. The value that will be calculated are [3,4] and the values in queue are [(1,2),3,4].
HuSharp marked this conversation as resolved.
Show resolved Hide resolved
// It helps us avoid calculation results jumping change when patrol-region-duration changes.
windowLength int
// front is the first element which should be used.
// position indicates where the front is currently in the queue
// Assume that the windowLength is 2, the init value is 1. The front is [1] and position is 1. After update 3 times with 2, 3, 4 separately. The front is [3], and the position is 2.
front *list.Element
position int

updateInterval time.Duration
lastSpeed float64
}

// Reset resets the progress manager.
Expand All @@ -62,52 +78,89 @@ func (m *Manager) Reset() {
m.progresses = make(map[string]*progressIndicator)
}

// Option is used to do some action for progressIndicator.
type Option func(*progressIndicator)

// WindowDurationOption changes the time window size.
func WindowDurationOption(dur time.Duration) func(*progressIndicator) {
return func(pi *progressIndicator) {
if dur < minSpeedCalculationWindow {
dur = minSpeedCalculationWindow
} else if dur > speedStatisticalWindowCapacity {
dur = speedStatisticalWindowCapacity
}
pi.windowLength = int(dur/pi.updateInterval) + 1
}
}

// AddProgress adds a progress into manager if it doesn't exist.
func (m *Manager) AddProgress(progress string, current, total float64, updateInterval time.Duration) (exist bool) {
func (m *Manager) AddProgress(progress string, current, total float64, updateInterval time.Duration, opts ...Option) (exist bool) {
m.Lock()
defer m.Unlock()

history := list.New()
history.PushBack(current)
if _, exist = m.progresses[progress]; !exist {
m.progresses[progress] = &progressIndicator{
total: total,
remaining: total,
history: history,
windowLengthLimit: int(speedStatisticalWindow / updateInterval),
updateInterval: updateInterval,
pi := &progressIndicator{
total: total,
remaining: total,
history: history,
windowCapacity: int(speedStatisticalWindowCapacity/updateInterval) + 1,
windowLength: int(minSpeedCalculationWindow / updateInterval),
updateInterval: updateInterval,
}
for _, op := range opts {
op(pi)
}
m.progresses[progress] = pi
pi.front = history.Front()
pi.position = 1
}
return
}

// UpdateProgress updates the progress if it exists.
func (m *Manager) UpdateProgress(progress string, current, remaining float64, isInc bool) {
func (m *Manager) UpdateProgress(progress string, current, remaining float64, isInc bool, opts ...Option) {
m.Lock()
defer m.Unlock()

if p, exist := m.progresses[progress]; exist {
for _, op := range opts {
op(p)
}
p.remaining = remaining
if p.total < remaining {
p.total = remaining
}

if p.history.Len() > p.windowLengthLimit {
p.history.PushBack(current)
p.position++
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using the current window or something like that?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it


// try to move `front` into correct place.
for p.position > p.windowLength {
p.front = p.front.Next()
p.position--
}
for p.position < p.windowLength && p.front.Prev() != nil {
rleungx marked this conversation as resolved.
Show resolved Hide resolved
p.front = p.front.Prev()
p.position++
}

for p.history.Len() > p.windowCapacity {
rleungx marked this conversation as resolved.
Show resolved Hide resolved
p.history.Remove(p.history.Front())
}
p.history.PushBack(current)

// It means it just init and we haven't update the progress
if p.history.Len() <= 1 {
p.lastSpeed = 0
} else if isInc {
// the value increases, e.g., [1, 2, 3]
p.lastSpeed = (p.history.Back().Value.(float64) - p.history.Front().Value.(float64)) /
(float64(p.history.Len()-1) * p.updateInterval.Seconds())
p.lastSpeed = (p.history.Back().Value.(float64) - p.front.Value.(float64)) /
CabinfeverB marked this conversation as resolved.
Show resolved Hide resolved
(float64(p.position-1) * p.updateInterval.Seconds())
} else {
// the value decreases, e.g., [3, 2, 1]
p.lastSpeed = (p.history.Front().Value.(float64) - p.history.Back().Value.(float64)) /
(float64(p.history.Len()-1) * p.updateInterval.Seconds())
p.lastSpeed = (p.front.Value.(float64) - p.history.Back().Value.(float64)) /
(float64(p.position-1) * p.updateInterval.Seconds())
}
if p.lastSpeed < 0 {
p.lastSpeed = 0
Expand Down
133 changes: 130 additions & 3 deletions pkg/progress/progress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ func TestProgress(t *testing.T) {
// 30/(70/1s+) > 30/70
re.Greater(ls, 30.0/70.0)
// 70/1s+ > 70
re.Less(cs, 70.0)
re.Less(math.Abs(cs-7), 1e-6)
// there is no scheduling
for i := 0; i < 100; i++ {
for i := 0; i < 1000; i++ {
m.UpdateProgress(n, 30, 30, false)
}
re.Equal(61, m.progresses[n].history.Len())
re.Equal(721, m.progresses[n].history.Len())
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.7, p)
Expand Down Expand Up @@ -95,3 +95,130 @@ func TestAbnormal(t *testing.T) {
re.Equal(0.0, ls)
re.Equal(0.0, cs)
}

func TestProgressWithDynamicWindow(t *testing.T) {
// The full capacity of queue is 721.
t.Parallel()
re := require.New(t)
n := "test"
m := NewManager()
re.False(m.AddProgress(n, 100, 100, 10*time.Second))
p, ls, cs, err := m.Status(n)
re.NoError(err)
re.Equal(0.0, p)
re.Equal(math.MaxFloat64, ls)
re.Equal(0.0, cs)
time.Sleep(time.Second)
re.True(m.AddProgress(n, 100, 100, 10*time.Second))

m.UpdateProgress(n, 31, 31, false)
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.69, p)
// 31/(69/1s+) > 31/69
CabinfeverB marked this conversation as resolved.
Show resolved Hide resolved
re.Greater(ls, 31.0/69.0)
// 70/1s+ > 70
re.Less(math.Abs(cs-6.9), 1e-6)
re.Equal(2, m.progresses[n].position)
re.Equal(100.0, m.progresses[n].front.Value.(float64))

m.UpdateProgress(n, 30, 30, false, WindowDurationOption(time.Minute*20))
re.Equal(3, m.progresses[n].position)
re.Equal(100.0, m.progresses[n].front.Value.(float64))
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.7, p)
re.Greater(ls, 30.0/(70.0/2))
re.Less(math.Abs(cs-3.5), 1e-6)

for i := 0; i < 1000; i++ {
m.UpdateProgress(n, 30, 30, false)
}
re.Equal(721, m.progresses[n].history.Len())
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.7, p)
re.Equal(math.MaxFloat64, ls)
re.Equal(0.0, cs)
m.UpdateProgress(n, 29, 29, false, WindowDurationOption(time.Minute*20))
re.Equal(121, m.progresses[n].position)
re.Equal(30.0, m.progresses[n].front.Value.(float64))
re.Equal(721, m.progresses[n].history.Len())

for i := 0; i < 60; i++ {
m.UpdateProgress(n, 28, 28, false)
}
re.Equal(721, m.progresses[n].history.Len())
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.72, p)
re.Equal(float64(28/(2./120)*10.), ls)
re.Equal(float64(2./120/10.), cs)

m.UpdateProgress(n, 28, 28, false, WindowDurationOption(time.Minute*10))
re.Equal(721, m.progresses[n].history.Len())
re.Equal(61, m.progresses[n].position)
re.Equal(28.0, m.progresses[n].front.Value.(float64))
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.72, p)
re.Equal(math.MaxFloat64, ls)
re.Equal(0.0, cs)

m.UpdateProgress(n, 28, 28, false, WindowDurationOption(time.Minute*20))
re.Equal(121, m.progresses[n].position)
re.Equal(30.0, m.progresses[n].front.Value.(float64))
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.72, p)
re.Equal(float64(28/(2./120)*10.), ls)
re.Equal(float64(2./120/10.), cs)

m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*12))
re.Equal(73, m.progresses[n].position)
re.Equal(30.0, m.progresses[n].front.Value.(float64))
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.99, p)
re.Equal(float64(1/(29./72)*10.), ls)
re.Equal(float64(29./72/10.), cs)

m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*5))
re.Equal(61, m.progresses[n].position)
re.Equal(28.0, m.progresses[n].front.Value.(float64))
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.99, p)
re.Equal(float64(1/(27./60)*10.), ls)
re.Equal(float64(27./60/10.), cs)

m.UpdateProgress(n, 1, 1, false, WindowDurationOption(time.Minute*180))
p, ls, cs, err = m.Status(n)
re.Equal(721, m.progresses[n].position)
re.Equal(30.0, m.progresses[n].front.Value.(float64))
re.NoError(err)
re.Equal(0.99, p)
re.Equal(float64(1/(29./720)*10.), ls)
re.Equal(float64(29./720/10.), cs)
for i := 0; i < 2000; i++ {
m.UpdateProgress(n, 1, 1, false)
}
re.Equal(721, m.progresses[n].history.Len())
p, ls, cs, err = m.Status(n)
re.NoError(err)
re.Equal(0.99, p)
re.Equal(math.MaxFloat64, ls)
re.Equal(0.0, cs)

ps := m.GetProgresses(func(p string) bool {
return strings.Contains(p, n)
})
re.Len(ps, 1)
re.Equal(n, ps[0])
ps = m.GetProgresses(func(p string) bool {
return strings.Contains(p, "a")
})
re.Empty(ps)
re.True(m.RemoveProgress(n))
re.False(m.RemoveProgress(n))
}
24 changes: 22 additions & 2 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ type Coordinator struct {
ctx context.Context
cancel context.CancelFunc

schedulersInitialized bool
schedulersInitialized bool
patrolCheckRegionsDuration time.Duration

cluster sche.ClusterInformer
prepareChecker *prepareChecker
Expand Down Expand Up @@ -110,6 +111,22 @@ func NewCoordinator(parentCtx context.Context, cluster sche.ClusterInformer, hbS
}
}

// GetPatrolRegionsDuration returns the duration of the last patrol region round.
func (c *Coordinator) GetPatrolRegionsDuration() time.Duration {
if c == nil {
return 0
}
c.RLock()
defer c.RUnlock()
return c.patrolCheckRegionsDuration
}

func (c *Coordinator) setPatrolRegionsDuration(dur time.Duration) {
c.Lock()
defer c.Unlock()
c.patrolCheckRegionsDuration = dur
HuSharp marked this conversation as resolved.
Show resolved Hide resolved
}

// markSchedulersInitialized marks the scheduler initialization is finished.
func (c *Coordinator) markSchedulersInitialized() {
c.Lock()
Expand Down Expand Up @@ -157,6 +174,7 @@ func (c *Coordinator) PatrolRegions() {
ticker.Reset(c.cluster.GetCheckerConfig().GetPatrolRegionInterval())
case <-c.ctx.Done():
patrolCheckRegionsGauge.Set(0)
c.setPatrolRegionsDuration(0)
log.Info("patrol regions has been stopped")
return
}
Expand All @@ -178,7 +196,9 @@ func (c *Coordinator) PatrolRegions() {
// Updates the label level isolation statistics.
c.cluster.UpdateRegionsLabelLevelStats(regions)
if len(key) == 0 {
patrolCheckRegionsGauge.Set(time.Since(start).Seconds())
dur := time.Since(start)
patrolCheckRegionsGauge.Set(dur.Seconds())
c.setPatrolRegionsDuration(dur)
start = time.Now()
}
failpoint.Inject("break-patrol", func() {
Expand Down
18 changes: 10 additions & 8 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1409,7 +1409,7 @@
if err == nil {
regionSize := float64(c.core.GetStoreRegionSize(storeID))
c.resetProgress(storeID, store.GetAddress())
c.progressManager.AddProgress(encodeRemovingProgressKey(storeID), regionSize, regionSize, nodeStateCheckJobInterval)
c.progressManager.AddProgress(encodeRemovingProgressKey(storeID), regionSize, regionSize, nodeStateCheckJobInterval, progress.WindowDurationOption(c.GetCoordinator().GetPatrolRegionsDuration()))
// record the current store limit in memory
c.prevStoreLimit[storeID] = map[storelimit.Type]float64{
storelimit.AddPeer: c.GetStoreLimitByType(storeID, storelimit.AddPeer),
Expand Down Expand Up @@ -1935,21 +1935,23 @@

func (c *RaftCluster) updateProgress(storeID uint64, storeAddress, action string, current, remaining float64, isInc bool) {
storeLabel := strconv.FormatUint(storeID, 10)
var progress string
var progressName string
var opts []progress.Option
switch action {
case removingAction:
progress = encodeRemovingProgressKey(storeID)
progressName = encodeRemovingProgressKey(storeID)
opts = []progress.Option{progress.WindowDurationOption(c.coordinator.GetPatrolRegionsDuration())}
case preparingAction:
progress = encodePreparingProgressKey(storeID)
progressName = encodePreparingProgressKey(storeID)
}

if exist := c.progressManager.AddProgress(progress, current, remaining, nodeStateCheckJobInterval); !exist {
if exist := c.progressManager.AddProgress(progressName, current, remaining, nodeStateCheckJobInterval, opts...); !exist {
return
}
c.progressManager.UpdateProgress(progress, current, remaining, isInc)
process, ls, cs, err := c.progressManager.Status(progress)
c.progressManager.UpdateProgress(progressName, current, remaining, isInc, opts...)
process, ls, cs, err := c.progressManager.Status(progressName)
if err != nil {
log.Error("get progress status failed", zap.String("progress", progress), zap.Float64("remaining", remaining), errs.ZapError(err))
log.Error("get progress status failed", zap.String("progress", progressName), zap.Float64("remaining", remaining), errs.ZapError(err))

Check warning on line 1954 in server/cluster/cluster.go

View check run for this annotation

Codecov / codecov/patch

server/cluster/cluster.go#L1954

Added line #L1954 was not covered by tests
return
}
storesProgressGauge.WithLabelValues(storeAddress, storeLabel, action).Set(process)
Expand Down
Loading