client/controller: wait for tokens on response to reduce the debt (#8460)
close #8457

- Reduce the debt ceiling: a big request should wait for the tokens
- Add more metrics
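
For context, a minimal runnable sketch of the behavior change (the toy bucket and every name in it are hypothetical; the real logic is acquireTokens in controller.go below):

package main

import (
	"fmt"
	"time"
)

// toyBucket is a deliberately simplified token bucket, used only to
// illustrate the old debt path versus the new wait path.
type toyBucket struct {
	tokens   float64 // current balance; may go negative (debt)
	fillRate float64 // tokens refilled per second
}

// removeTokens mirrors the old OnResponse path: consume immediately and
// let the balance go arbitrarily negative, i.e. unbounded debt.
func (b *toyBucket) removeTokens(n float64) {
	b.tokens -= n
}

// waitTokens mirrors the new OnResponseWait path for big responses: if
// the balance cannot cover the cost, block until the fill rate has made
// up the shortfall, so the debt stays small and smooth.
func (b *toyBucket) waitTokens(n float64) time.Duration {
	var wait time.Duration
	if b.tokens < n {
		shortfall := n - b.tokens
		wait = time.Duration(shortfall / b.fillRate * float64(time.Second))
		time.Sleep(wait)
		b.tokens += shortfall // tokens accrued while waiting
	}
	b.tokens -= n
	return wait
}

func main() {
	old := &toyBucket{tokens: 10, fillRate: 100}
	old.removeTokens(500) // old path: balance drops straight to -490
	fmt.Println("debt after removeTokens:", old.tokens)

	nb := &toyBucket{tokens: 10, fillRate: 100}
	d := nb.waitTokens(500) // new path: waits ~4.9s for the shortfall
	fmt.Println("waited:", d.Round(time.Millisecond), "balance:", nb.tokens)
}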

Signed-off-by: nolouch <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
nolouch and ti-chi-bot[bot] committed Aug 5, 2024
1 parent f758f8e commit 39d8159
Showing 4 changed files with 185 additions and 43 deletions.
162 changes: 123 additions & 39 deletions client/resource_group/controller/controller.go
@@ -46,6 +46,8 @@ const (
slowNotifyFilterDuration = 10 * time.Millisecond

watchRetryInterval = 30 * time.Second

bigRequestThreshold = 4 * 1024 * 1024 // 4MB -> 16 RRU
)

type selectType int
@@ -69,6 +71,9 @@ type ResourceGroupKVInterceptor interface {
OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error)
// OnResponse is used to consume tokens after receiving response.
OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error)
// OnResponseWait is used to consume tokens after receiving a response. If the response requires many tokens, we need to wait for the tokens.
// This is an optimized version of OnResponse for cases where the response requires many tokens, making the debt smaller and smoother.
OnResponseWait(ctx context.Context, resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, time.Duration, error)
// IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it.
IsBackgroundRequest(ctx context.Context, resourceGroupName, requestResource string) bool
}
@@ -642,7 +647,7 @@ func (c *ResourceGroupsController) OnRequestWait(
if err != nil {
return nil, nil, time.Duration(0), 0, err
}
return gc.onRequestWait(ctx, info)
return gc.onRequestWaitImpl(ctx, info)
}

// OnResponse is used to consume tokens after receiving response
@@ -654,7 +659,19 @@ func (c *ResourceGroupsController) OnResponse(
log.Warn("[resource group controller] resource group name does not exist", zap.String("name", resourceGroupName))
return &rmpb.Consumption{}, nil
}
return gc.onResponse(req, resp)
return gc.onResponseImpl(req, resp)
}

// OnResponseWait is used to consume tokens after receiving a response
func (c *ResourceGroupsController) OnResponseWait(
ctx context.Context, resourceGroupName string, req RequestInfo, resp ResponseInfo,
) (*rmpb.Consumption, time.Duration, error) {
gc, ok := c.loadGroupController(resourceGroupName)
if !ok {
log.Warn("[resource group controller] resource group name does not exist", zap.String("name", resourceGroupName))
return &rmpb.Consumption{}, time.Duration(0), nil
}
return gc.onResponseWaitImpl(ctx, req, resp)
}

// IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it.
@@ -722,6 +739,8 @@ type groupCostController struct {

// fast path to make once token limit with un-limit burst.
burstable *atomic.Bool
// isThrottled indicates whether the group is currently throttled
isThrottled *atomic.Bool

lowRUNotifyChan chan<- notifyMsg
tokenBucketUpdateChan chan<- *groupCostController
@@ -770,6 +789,8 @@ type groupMetricsCollection struct {
failedRequestCounterWithOthers prometheus.Counter
failedRequestCounterWithThrottled prometheus.Counter
tokenRequestCounter prometheus.Counter
runningKVRequestCounter prometheus.Gauge
consumeTokenHistogram prometheus.Observer
}

func initMetrics(oldName, name string) *groupMetricsCollection {
@@ -784,6 +805,8 @@ func initMetrics(oldName, name string) *groupMetricsCollection {
failedRequestCounterWithThrottled: failedRequestCounter.WithLabelValues(oldName, name, throttledType),
requestRetryCounter: requestRetryCounter.WithLabelValues(oldName, name),
tokenRequestCounter: resourceGroupTokenRequestCounter.WithLabelValues(oldName, name),
runningKVRequestCounter: groupRunningKVRequestCounter.WithLabelValues(name),
consumeTokenHistogram: tokenConsumedHistogram.WithLabelValues(name),
}
}

@@ -841,6 +864,7 @@ func newGroupCostController(
tokenBucketUpdateChan: tokenBucketUpdateChan,
lowRUNotifyChan: lowRUNotifyChan,
burstable: &atomic.Bool{},
isThrottled: &atomic.Bool{},
}

switch gc.mode {
@@ -937,7 +961,7 @@ func (gc *groupCostController) updateRunState() {
}
*gc.run.consumption = *gc.mu.consumption
gc.mu.Unlock()
logControllerTrace("[resource group controller] update run state", zap.String("name", gc.name), zap.Any("request-unit-consumption", gc.run.consumption))
logControllerTrace("[resource group controller] update run state", zap.String("name", gc.name), zap.Any("request-unit-consumption", gc.run.consumption), zap.Bool("is-throttled", gc.isThrottled.Load()))
gc.run.now = newTime
}

@@ -1018,7 +1042,7 @@ func (gc *groupCostController) updateAvgRaWResourcePerSec() {
if !gc.calcAvg(counter, getRawResourceValueFromConsumption(gc.run.consumption, typ)) {
continue
}
logControllerTrace("[resource group controller] update avg raw resource per sec", zap.String("name", gc.name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec))
logControllerTrace("[resource group controller] update avg raw resource per sec", zap.String("name", gc.name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec), zap.Bool("is-throttled", gc.isThrottled.Load()))
}
gc.burstable.Store(isBurstable)
}
@@ -1032,7 +1056,7 @@ func (gc *groupCostController) updateAvgRUPerSec() {
if !gc.calcAvg(counter, getRUValueFromConsumption(gc.run.consumption, typ)) {
continue
}
logControllerTrace("[resource group controller] update avg ru per sec", zap.String("name", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec))
logControllerTrace("[resource group controller] update avg ru per sec", zap.String("name", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec), zap.Bool("is-throttled", gc.isThrottled.Load()))
}
gc.burstable.Store(isBurstable)
}
@@ -1179,6 +1203,7 @@ func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket
if cfg.NewBurst < 0 {
cfg.NewTokens = float64(counter.getTokenBucketFunc().Settings.FillRate)
}
gc.isThrottled.Store(false)
} else {
// Otherwise the granted token is delivered to the client by fill rate.
cfg.NewTokens = 0
@@ -1199,6 +1224,7 @@
counter.notify.setupNotificationThreshold = 1
counter.notify.mu.Unlock()
counter.lastDeadline = deadline
gc.isThrottled.Store(true)
select {
case gc.tokenBucketUpdateChan <- gc:
default:
Expand Down Expand Up @@ -1317,7 +1343,55 @@ func (gc *groupCostController) calcRequest(counter *tokenCounter) float64 {
return value
}

func (gc *groupCostController) onRequestWait(
func (gc *groupCostController) acquireTokens(ctx context.Context, delta *rmpb.Consumption, waitDuration *time.Duration, allowDebt bool) (time.Duration, error) {
gc.metrics.runningKVRequestCounter.Inc()
defer gc.metrics.runningKVRequestCounter.Dec()
var (
err error
d time.Duration
)
retryLoop:
for i := 0; i < gc.mainCfg.WaitRetryTimes; i++ {
now := time.Now()
switch gc.mode {
case rmpb.GroupMode_RawMode:
res := make([]*Reservation, 0, len(requestResourceLimitTypeList))
for typ, counter := range gc.run.resourceTokens {
if v := getRawResourceValueFromConsumption(delta, typ); v > 0 {
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) {
break retryLoop
}
case rmpb.GroupMode_RUMode:
res := make([]*Reservation, 0, len(requestUnitLimitTypeList))
for typ, counter := range gc.run.requestUnitTokens {
if v := getRUValueFromConsumption(delta, typ); v > 0 {
// record the consumed tokens in a histogram if controller debug mode is enabled.
if enableControllerTraceLog.Load() {
gc.metrics.consumeTokenHistogram.Observe(v)
}
// allow debt for a small request, or when the group is not throttled: remove tokens directly.
if allowDebt {
counter.limiter.RemoveTokens(now, v)
break retryLoop
}
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) {
break retryLoop
}
}
gc.metrics.requestRetryCounter.Inc()
time.Sleep(gc.mainCfg.WaitRetryInterval)
*waitDuration += gc.mainCfg.WaitRetryInterval
}
return d, err
}

func (gc *groupCostController) onRequestWaitImpl(
ctx context.Context, info RequestInfo,
) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error) {
delta := &rmpb.Consumption{}
@@ -1331,38 +1405,7 @@ func (gc *groupCostController) onRequestWait(
var waitDuration time.Duration

if !gc.burstable.Load() {
var err error
now := time.Now()
var i int
var d time.Duration
retryLoop:
for i = 0; i < gc.mainCfg.WaitRetryTimes; i++ {
switch gc.mode {
case rmpb.GroupMode_RawMode:
res := make([]*Reservation, 0, len(requestResourceLimitTypeList))
for typ, counter := range gc.run.resourceTokens {
if v := getRawResourceValueFromConsumption(delta, typ); v > 0 {
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) {
break retryLoop
}
case rmpb.GroupMode_RUMode:
res := make([]*Reservation, 0, len(requestUnitLimitTypeList))
for typ, counter := range gc.run.requestUnitTokens {
if v := getRUValueFromConsumption(delta, typ); v > 0 {
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) {
break retryLoop
}
}
gc.metrics.requestRetryCounter.Inc()
time.Sleep(gc.mainCfg.WaitRetryInterval)
waitDuration += gc.mainCfg.WaitRetryInterval
}
d, err := gc.acquireTokens(ctx, delta, &waitDuration, false)
if err != nil {
if errs.ErrClientResourceGroupThrottled.Equal(err) {
gc.metrics.failedRequestCounterWithThrottled.Inc()
@@ -1399,7 +1442,7 @@ func (gc *groupCostController) onRequestWait(
return delta, penalty, waitDuration, gc.getMeta().GetPriority(), nil
}

func (gc *groupCostController) onResponse(
func (gc *groupCostController) onResponseImpl(
req RequestInfo, resp ResponseInfo,
) (*rmpb.Consumption, error) {
delta := &rmpb.Consumption{}
@@ -1440,6 +1483,47 @@ func (gc *groupCostController) onResponse(
return delta, nil
}

func (gc *groupCostController) onResponseWaitImpl(
ctx context.Context, req RequestInfo, resp ResponseInfo,
) (*rmpb.Consumption, time.Duration, error) {
delta := &rmpb.Consumption{}
for _, calc := range gc.calculators {
calc.AfterKVRequest(delta, req, resp)
}
var waitDuration time.Duration
if !gc.burstable.Load() {
allowDebt := delta.ReadBytes+delta.WriteBytes < bigRequestThreshold || !gc.isThrottled.Load()
d, err := gc.acquireTokens(ctx, delta, &waitDuration, allowDebt)
if err != nil {
if errs.ErrClientResourceGroupThrottled.Equal(err) {
gc.metrics.failedRequestCounterWithThrottled.Inc()
gc.metrics.failedLimitReserveDuration.Observe(d.Seconds())
} else {
gc.metrics.failedRequestCounterWithOthers.Inc()
}
return nil, waitDuration, err
}
gc.metrics.successfulRequestDuration.Observe(d.Seconds())
waitDuration += d
}

gc.mu.Lock()
// Record the consumption of the request
add(gc.mu.consumption, delta)
// Record the consumption of the request by store
count := &rmpb.Consumption{}
*count = *delta
// As the penalty is only counted when the request is completed, the write cost added in `BeforeKVRequest` needs to be calculated here as well
for _, calc := range gc.calculators {
calc.BeforeKVRequest(count, req)
}
add(gc.mu.storeCounter[req.StoreID()], count)
add(gc.mu.globalCounter, count)
gc.mu.Unlock()

return delta, waitDuration, nil
}

// GetActiveResourceGroup is used to get active resource group.
// This is used for test only.
func (c *ResourceGroupsController) GetActiveResourceGroup(resourceGroupName string) *rmpb.ResourceGroup {
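For orientation, a hedged caller-side sketch of where the new hook slots in (doKVRequest and send are assumptions made for illustration; only the OnRequestWait and OnResponseWait signatures come from this diff). Note that acquireTokens still takes the RemoveTokens fast path, i.e. debt, when the response is below bigRequestThreshold or the group is not currently throttled:

// Sketch of a caller driving the interceptor; doKVRequest and send are
// hypothetical, the interceptor methods are from this diff.
func doKVRequest(
	ctx context.Context,
	interceptor ResourceGroupKVInterceptor,
	group string,
	req RequestInfo,
	send func() (ResponseInfo, error),
) error {
	// Pre-pay the estimated (write) cost before sending the request.
	if _, _, _, _, err := interceptor.OnRequestWait(ctx, group, req); err != nil {
		return err
	}
	resp, err := send()
	if err != nil {
		return err
	}
	// Settle the actual cost. A big response on a throttled group waits
	// for tokens here instead of piling up debt.
	_, _, err = interceptor.OnResponseWait(ctx, group, req, resp)
	return err
}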
46 changes: 43 additions & 3 deletions client/resource_group/controller/controller_test.go
@@ -104,15 +104,15 @@ func TestRequestAndResponseConsumption(t *testing.T) {
kvCalculator := gc.getKVCalculator()
for idx, testCase := range testCases {
caseNum := fmt.Sprintf("case %d", idx)
consumption, _, _, priority, err := gc.onRequestWait(context.TODO(), testCase.req)
consumption, _, _, priority, err := gc.onRequestWaitImpl(context.TODO(), testCase.req)
re.NoError(err, caseNum)
re.Equal(priority, gc.meta.Priority)
expectedConsumption := &rmpb.Consumption{}
if testCase.req.IsWrite() {
kvCalculator.calculateWriteCost(expectedConsumption, testCase.req)
re.Equal(expectedConsumption.WRU, consumption.WRU)
}
consumption, err = gc.onResponse(testCase.req, testCase.resp)
consumption, err = gc.onResponseImpl(testCase.req, testCase.resp)
re.NoError(err, caseNum)
kvCalculator.calculateReadCost(expectedConsumption, testCase.resp)
kvCalculator.calculateCPUCost(expectedConsumption, testCase.resp)
@@ -121,6 +121,46 @@
}
}

func TestOnResponseWaitConsumption(t *testing.T) {
re := require.New(t)
gc := createTestGroupCostController(re)

req := &TestRequestInfo{
isWrite: false,
}
resp := &TestResponseInfo{
readBytes: 2000 * 64 * 1024, // 2000RU
succeed: true,
}

consumption, waitTime, err := gc.onResponseWaitImpl(context.TODO(), req, resp)
re.NoError(err)
re.Zero(waitTime)
verify := func() {
expectedConsumption := &rmpb.Consumption{}
kvCalculator := gc.getKVCalculator()
kvCalculator.calculateReadCost(expectedConsumption, resp)
re.Equal(expectedConsumption.RRU, consumption.RRU)
}
verify()

// modify the counter, then the response should have a wait time.
counter := gc.run.requestUnitTokens[rmpb.RequestUnitType_RU]
gc.modifyTokenCounter(counter, &rmpb.TokenBucket{
Settings: &rmpb.TokenLimitSettings{
FillRate: 1000,
BurstLimit: 1000,
},
},
int64(5*time.Second/time.Millisecond),
)

consumption, waitTime, err = gc.onResponseWaitImpl(context.TODO(), req, resp)
re.NoError(err)
re.NotZero(waitTime)
verify()
}

func TestResourceGroupThrottledError(t *testing.T) {
re := require.New(t)
gc := createTestGroupCostController(re)
@@ -129,7 +169,7 @@ func TestResourceGroupThrottledError(t *testing.T) {
writeBytes: 10000000,
}
// The group is throttled
_, _, _, _, err := gc.onRequestWait(context.TODO(), req)
_, _, _, _, err := gc.onRequestWaitImpl(context.TODO(), req)
re.Error(err)
re.True(errs.ErrClientResourceGroupThrottled.Equal(err))
}
2 changes: 1 addition & 1 deletion client/resource_group/controller/limiter.go
@@ -335,7 +335,7 @@ func (lim *Limiter) Reconfigure(now time.Time,
) {
lim.mu.Lock()
defer lim.mu.Unlock()
logControllerTrace("[resource group controller] before reconfigure", zap.String("name", lim.name), zap.Float64("old-tokens", lim.tokens), zap.Float64("old-rate", float64(lim.limit)), zap.Float64("old-notify-threshold", args.NotifyThreshold), zap.Int64("old-burst", lim.burst))
logControllerTrace("[resource group controller] before reconfigure", zap.String("name", lim.name), zap.Float64("old-tokens", lim.tokens), zap.Float64("old-rate", float64(lim.limit)), zap.Float64("old-notify-threshold", lim.notifyThreshold), zap.Int64("old-burst", lim.burst))
if args.NewBurst < 0 {
lim.last = now
lim.tokens = args.NewTokens
(diff for the fourth changed file was not loaded)
