Skip to content

Commit

Permalink
daemon: properly tune scaling policy and prevent flickering; fixes #194
Browse files Browse the repository at this point in the history
Signed-off-by: Denys Smirnov <[email protected]>
  • Loading branch information
Denys Smirnov authored and dennwc committed May 1, 2019
1 parent aa19e0f commit ba00b7e
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 26 deletions.
78 changes: 53 additions & 25 deletions daemon/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,16 @@ var (
errDriverStopped = errors.NewKind("driver stopped")
)

const defaultPolicyTargetWindow = 5 // enough to prevent flickering

var (
// policyDefaultWindow is a window for the average function used in the default scaling
// policy. The window will be divided by policyDefaultTick intervals to calculate the
// size of the window buffer, so this should ideally be a multiple of policyDefaultTick.
policyDefaultWindow = mustEnvDur("BBLFSHD_POLICY_WINDOW", time.Second*3)
policyDefaultWindow = mustEnvDur("BBLFSHD_POLICY_WINDOW", 5*time.Second)
// policyDefaultTick is a tick rate for the goroutine that re-evaluates the scaling
// policy for each driver pool.
policyDefaultTick = mustEnvDur("BBLFSHD_POLICY_TICK", time.Millisecond*100)
policyDefaultTick = mustEnvDur("BBLFSHD_POLICY_TICK", 500*time.Millisecond)
// policyDefaultScale is a default increment for an additive increase scaling.
//
// See AIMD for more details.
Expand Down Expand Up @@ -253,9 +255,9 @@ func (dp *DriverPool) runPolicy(ctx context.Context) {
return
case <-ticker.C:
}
dp.drivers.RLock()
total := dp.running.Value()
load := dp.requests.Value()
dp.drivers.RLock()
idle := len(dp.drivers.idle)
dp.drivers.RUnlock()

Expand Down Expand Up @@ -847,45 +849,71 @@ func defaultScalingPolicy() ScalingPolicy {
// DefaultScalingPolicy returns a new instance of the default scaling policy.
// Instances returned by this function should not be reused.
func DefaultScalingPolicy() ScalingPolicy {
window := int(policyDefaultWindow / policyDefaultTick)
return MovingAverage(window, defaultScalingPolicy())
windowIn := int(policyDefaultWindow / policyDefaultTick)
return TargetMovingAverage(defaultPolicyTargetWindow, MovingAverage(windowIn, defaultScalingPolicy()))
}

func newMovingAverage(window int) *movingAverage {
return &movingAverage{samples: make([]int, 0, window)}
}

type movingAverage struct {
samples []int
next int
sum int
}

func (m *movingAverage) AddSample(v int) int {
if len(m.samples) < cap(m.samples) {
m.sum += v
m.samples = append(m.samples, v)
m.next++
} else {
next := m.next % cap(m.samples)
m.sum = (m.sum + v) - m.samples[next]
m.samples[next] = v
m.next = next + 1
}
return int(math.Ceil(float64(m.sum) / float64(len(m.samples))))
}

type loadMovingAverage struct {
sub ScalingPolicy // policy that will use an average
loads []int
pos int
loads *movingAverage
}

// MovingAverage computes a moving average of the load and forwards it to the
// underlying scaling policy. This policy is stateful and not thread-safe, do not
// reuse its instances for multiple pools.
func MovingAverage(window int, p ScalingPolicy) ScalingPolicy {
return &movingAverage{
return &loadMovingAverage{
sub: p,
loads: make([]int, 0, window),
pos: 0,
loads: newMovingAverage(window),
}
}

func (p *movingAverage) Scale(total, idle, load int) int {
if len(p.loads) < cap(p.loads) {
p.loads = append(p.loads, load)
} else {
p.loads[p.pos] = load
}
p.pos++
if p.pos >= cap(p.loads) {
p.pos = 0
}
func (p *loadMovingAverage) Scale(total, idle, load int) int {
avg := p.loads.AddSample(load)
return p.sub.Scale(total, idle, avg)
}

type targetMovingAverage struct {
sub ScalingPolicy // policy that we will average
targets *movingAverage
}

var sum int
for _, v := range p.loads {
sum += v
// TargetMovingAverage computes a moving average of the target instance count.
// This policy is stateful and not thread-safe, do not reuse its instances for multiple pools.
func TargetMovingAverage(window int, p ScalingPolicy) ScalingPolicy {
return &targetMovingAverage{
sub: p,
targets: newMovingAverage(window),
}
}

avg := int(float64(sum) / float64(len(p.loads)))
return p.sub.Scale(total, idle, avg)
func (p *targetMovingAverage) Scale(total, idle, load int) int {
target := p.sub.Scale(total, idle, load)
return p.targets.AddSample(target)
}

type minMax struct {
Expand Down
3 changes: 2 additions & 1 deletion daemon/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ func TestDriverPoolExecute_Parallel(t *testing.T) {
wg.Wait()
require.Len(dp.Current(), runtime.NumCPU())

time.Sleep(policyDefaultWindow)
// need approximately two full windows, times the inverse downscale factor
time.Sleep(policyDefaultWindow * defaultPolicyTargetWindow * time.Duration(1/policyDefaultDownscale))
require.Equal(1, dp.State().Running)

err = dp.Stop()
Expand Down

0 comments on commit ba00b7e

Please sign in to comment.