diff --git a/daemon/pool.go b/daemon/pool.go index 09362c7..376eb46 100644 --- a/daemon/pool.go +++ b/daemon/pool.go @@ -43,14 +43,16 @@ var ( errDriverStopped = errors.NewKind("driver stopped") ) +const defaultPolicyTargetWindow = 5 // enough to prevent flickering + var ( // policyDefaultWindow is a window for the average function used in the default scaling // policy. The window will be divided by policyDefaultTick intervals to calculate the // size of the window buffer, so this should ideally be a multiple of policyDefaultTick. - policyDefaultWindow = mustEnvDur("BBLFSHD_POLICY_WINDOW", time.Second*3) + policyDefaultWindow = mustEnvDur("BBLFSHD_POLICY_WINDOW", 5*time.Second) // policyDefaultTick is a tick rate for the goroutine that re-evaluates the scaling // policy for each driver pool. - policyDefaultTick = mustEnvDur("BBLFSHD_POLICY_TICK", time.Millisecond*100) + policyDefaultTick = mustEnvDur("BBLFSHD_POLICY_TICK", 500*time.Millisecond) // policyDefaultScale is a default increment for an additive increase scaling. // // See AIMD for more details. @@ -253,9 +255,9 @@ func (dp *DriverPool) runPolicy(ctx context.Context) { return case <-ticker.C: } + dp.drivers.RLock() total := dp.running.Value() load := dp.requests.Value() - dp.drivers.RLock() idle := len(dp.drivers.idle) dp.drivers.RUnlock() @@ -847,45 +849,71 @@ func defaultScalingPolicy() ScalingPolicy { // DefaultScalingPolicy returns a new instance of the default scaling policy. // Instances returned by this function should not be reused. func DefaultScalingPolicy() ScalingPolicy { - window := int(policyDefaultWindow / policyDefaultTick) - return MovingAverage(window, defaultScalingPolicy()) + windowIn := int(policyDefaultWindow / policyDefaultTick) + return TargetMovingAverage(defaultPolicyTargetWindow, MovingAverage(windowIn, defaultScalingPolicy())) +} + +func newMovingAverage(window int) *movingAverage { + return &movingAverage{samples: make([]int, 0, window)} } type movingAverage struct { + samples []int + next int + sum int +} + +func (m *movingAverage) AddSample(v int) int { + if len(m.samples) < cap(m.samples) { + m.sum += v + m.samples = append(m.samples, v) + m.next++ + } else { + next := m.next % cap(m.samples) + m.sum = (m.sum + v) - m.samples[next] + m.samples[next] = v + m.next = next + 1 + } + return int(math.Ceil(float64(m.sum) / float64(len(m.samples)))) +} + +type loadMovingAverage struct { sub ScalingPolicy // policy that will use an average - loads []int - pos int + loads *movingAverage } // MovingAverage computes a moving average of the load and forwards it to the // underlying scaling policy. This policy is stateful and not thread-safe, do not // reuse its instances for multiple pools. func MovingAverage(window int, p ScalingPolicy) ScalingPolicy { - return &movingAverage{ + return &loadMovingAverage{ sub: p, - loads: make([]int, 0, window), - pos: 0, + loads: newMovingAverage(window), } } -func (p *movingAverage) Scale(total, idle, load int) int { - if len(p.loads) < cap(p.loads) { - p.loads = append(p.loads, load) - } else { - p.loads[p.pos] = load - } - p.pos++ - if p.pos >= cap(p.loads) { - p.pos = 0 - } +func (p *loadMovingAverage) Scale(total, idle, load int) int { + avg := p.loads.AddSample(load) + return p.sub.Scale(total, idle, avg) +} + +type targetMovingAverage struct { + sub ScalingPolicy // policy that we will average + targets *movingAverage +} - var sum int - for _, v := range p.loads { - sum += v +// TargetMovingAverage computes a moving average of the target instance count. +// This policy is stateful and not thread-safe, do not reuse its instances for multiple pools. +func TargetMovingAverage(window int, p ScalingPolicy) ScalingPolicy { + return &targetMovingAverage{ + sub: p, + targets: newMovingAverage(window), } +} - avg := int(float64(sum) / float64(len(p.loads))) - return p.sub.Scale(total, idle, avg) +func (p *targetMovingAverage) Scale(total, idle, load int) int { + target := p.sub.Scale(total, idle, load) + return p.targets.AddSample(target) } type minMax struct { diff --git a/daemon/pool_test.go b/daemon/pool_test.go index f36f1d3..86cc8d5 100644 --- a/daemon/pool_test.go +++ b/daemon/pool_test.go @@ -205,7 +205,8 @@ func TestDriverPoolExecute_Parallel(t *testing.T) { wg.Wait() require.Len(dp.Current(), runtime.NumCPU()) - time.Sleep(policyDefaultWindow) + // need approximately two full windows, times the inverse downscale factor + time.Sleep(policyDefaultWindow * defaultPolicyTargetWindow * time.Duration(1/policyDefaultDownscale)) require.Equal(1, dp.State().Running) err = dp.Stop()