Skip to content

Commit

Permalink
Fix ratelimit counter issue when using multiple descriptors (#443)
Browse files Browse the repository at this point in the history
Signed-off-by: chashikajw <[email protected]>
  • Loading branch information
chashikajw authored Sep 14, 2023
1 parent b1f66f2 commit e0f9f0e
Show file tree
Hide file tree
Showing 8 changed files with 269 additions and 41 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -904,6 +904,12 @@ As well Ratelimit supports TLS connections and authentication. These can be conf
1. `REDIS_AUTH` & `REDIS_PERSECOND_AUTH`: set to `"username:password"` to enable username-password authentication to the redis host.
1. `CACHE_KEY_PREFIX`: a string to prepend to all cache keys
To control how cache keys are incremented when any one of them is already over the limit, you can use the following configuration:
1. `STOP_CACHE_KEY_INCREMENT_WHEN_OVERLIMIT`: Set this configuration to `true` to disallow key incrementation when one of the keys is already over the limit.
`STOP_CACHE_KEY_INCREMENT_WHEN_OVERLIMIT` is useful when multiple descriptors are included in a single request. Setting this to `true` can prevent the incrementation of other descriptors' counters if any of the descriptors is already over the limit.
## Redis type
Ratelimit supports different types of redis deployments:
Expand Down
8 changes: 8 additions & 0 deletions src/limiter/base_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ func (this *BaseRateLimiter) IsOverLimitWithLocalCache(key string) bool {
return false
}

// IsOverLimitThresholdReached records the rule's configured requests-per-unit
// value on limitInfo as the over-limit threshold, then reports whether the
// counter value after the pending increase would exceed that threshold.
func (this *BaseRateLimiter) IsOverLimitThresholdReached(limitInfo *LimitInfo) bool {
	limitInfo.overLimitThreshold = limitInfo.limit.Limit.RequestsPerUnit
	return limitInfo.limitAfterIncrease > limitInfo.overLimitThreshold
}

// Generates response descriptor status based on cache key, over the limit with local cache, over the limit and
// near the limit thresholds. Thresholds are checked in order and are mutually exclusive.
func (this *BaseRateLimiter) GetResponseDescriptorStatus(key string, limitInfo *LimitInfo,
Expand Down
1 change: 1 addition & 0 deletions src/redis/cache_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,6 @@ func NewRateLimiterCacheImplFromSettings(s settings.Settings, localCache *freeca
s.NearLimitRatio,
s.CacheKeyPrefix,
statsManager,
s.StopCacheKeyIncrementWhenOverlimit,
)
}
133 changes: 107 additions & 26 deletions src/redis/fixed_cache_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,20 @@ type fixedRateLimitCacheImpl struct {
// If this client is nil, then the Cache will use the client for all
// limits regardless of unit. If this client is not nil, then it
// is used for limits that have a SECOND unit.
perSecondClient Client
baseRateLimiter *limiter.BaseRateLimiter
perSecondClient Client
stopCacheKeyIncrementWhenOverlimit bool
baseRateLimiter *limiter.BaseRateLimiter
}

// pipelineAppend queues two commands on *pipeline for key: an INCRBY that adds
// hitsAddend to the counter (the post-increment value is written into *result
// when the pipeline runs) and an EXPIRE that sets the key's TTL to
// expirationSeconds. Nothing is sent to Redis until the caller invokes PipeDo.
func pipelineAppend(client Client, pipeline *Pipeline, key string, hitsAddend uint32, result *uint32, expirationSeconds int64) {
	*pipeline = client.PipeAppend(*pipeline, result, "INCRBY", key, hitsAddend)
	*pipeline = client.PipeAppend(*pipeline, nil, "EXPIRE", key, expirationSeconds)
}

// pipelineAppendtoGet queues a read-only GET for key on *pipeline; the fetched
// counter value is written into *result when the pipeline is executed via
// PipeDo. Used to inspect current counts without incrementing them.
func pipelineAppendtoGet(client Client, pipeline *Pipeline, key string, result *uint32) {
	*pipeline = client.PipeAppend(*pipeline, result, "GET", key)
}

func (this *fixedRateLimitCacheImpl) DoLimit(
ctx context.Context,
request *pb.RateLimitRequest,
Expand All @@ -51,31 +56,97 @@ func (this *fixedRateLimitCacheImpl) DoLimit(

isOverLimitWithLocalCache := make([]bool, len(request.Descriptors))
results := make([]uint32, len(request.Descriptors))
var pipeline, perSecondPipeline Pipeline
currentCount := make([]uint32, len(request.Descriptors))
var pipeline, perSecondPipeline, pipelineToGet, perSecondPipelineToGet Pipeline

hitsAddendForRedis := hitsAddend
overlimitIndexes := make([]bool, len(request.Descriptors))
nearlimitIndexes := make([]bool, len(request.Descriptors))
isCacheKeyOverlimit := false

if this.stopCacheKeyIncrementWhenOverlimit {
// Check whether any of the keys are reaching the over-limit threshold in the Redis cache.
for i, cacheKey := range cacheKeys {
if cacheKey.Key == "" {
continue
}

hitAddendForRedis := hitsAddend
overlimitIndex := -1
// Now, actually setup the pipeline, skipping empty cache keys.
for i, cacheKey := range cacheKeys {
if cacheKey.Key == "" {
continue
// Check if key is over the limit in local cache.
if this.baseRateLimiter.IsOverLimitWithLocalCache(cacheKey.Key) {
if limits[i].ShadowMode {
logger.Debugf("Cache key %s would be rate limited but shadow mode is enabled on this rule", cacheKey.Key)
} else {
logger.Debugf("cache key is over the limit: %s", cacheKey.Key)
}
isOverLimitWithLocalCache[i] = true
hitsAddendForRedis = 0
overlimitIndexes[i] = true
isCacheKeyOverlimit = true
continue
} else {
if this.perSecondClient != nil && cacheKey.PerSecond {
if perSecondPipelineToGet == nil {
perSecondPipelineToGet = Pipeline{}
}
pipelineAppendtoGet(this.perSecondClient, &perSecondPipelineToGet, cacheKey.Key, &currentCount[i])
} else {
if pipelineToGet == nil {
pipelineToGet = Pipeline{}
}
pipelineAppendtoGet(this.client, &pipelineToGet, cacheKey.Key, &currentCount[i])
}
}
}

// Check if key is over the limit in local cache.
if this.baseRateLimiter.IsOverLimitWithLocalCache(cacheKey.Key) {
if limits[i].ShadowMode {
logger.Debugf("Cache key %s would be rate limited but shadow mode is enabled on this rule", cacheKey.Key)
} else {
logger.Debugf("cache key is over the limit: %s", cacheKey.Key)
// Only if none of the cache keys are over the limit, call Redis to check whether cache keys are getting overlimited.
if len(cacheKeys) > 1 && !isCacheKeyOverlimit {
if pipelineToGet != nil {
checkError(this.client.PipeDo(pipelineToGet))
}
if perSecondPipelineToGet != nil {
checkError(this.perSecondClient.PipeDo(perSecondPipelineToGet))
}

for i, cacheKey := range cacheKeys {
if cacheKey.Key == "" {
continue
}
// Now fetch the pipeline.
limitBeforeIncrease := currentCount[i]
limitAfterIncrease := limitBeforeIncrease + hitsAddend

limitInfo := limiter.NewRateLimitInfo(limits[i], limitBeforeIncrease, limitAfterIncrease, 0, 0)

if this.baseRateLimiter.IsOverLimitThresholdReached(limitInfo) {
hitsAddendForRedis = 0
nearlimitIndexes[i] = true
}
}
}
} else {
// Check if any of the keys are reaching to the over limit in redis cache.
for i, cacheKey := range cacheKeys {
if cacheKey.Key == "" {
continue
}

// Check if key is over the limit in local cache.
if this.baseRateLimiter.IsOverLimitWithLocalCache(cacheKey.Key) {
if limits[i].ShadowMode {
logger.Debugf("Cache key %s would be rate limited but shadow mode is enabled on this rule", cacheKey.Key)
} else {
logger.Debugf("cache key is over the limit: %s", cacheKey.Key)
}
isOverLimitWithLocalCache[i] = true
overlimitIndexes[i] = true
continue
}
isOverLimitWithLocalCache[i] = true
hitAddendForRedis = 0
overlimitIndex = i
continue
}
}

// Now, actually setup the pipeline, skipping empty cache keys.
for i, cacheKey := range cacheKeys {
if cacheKey.Key == "" || overlimitIndex == i {
if cacheKey.Key == "" || overlimitIndexes[i] {
continue
}

Expand All @@ -91,12 +162,20 @@ func (this *fixedRateLimitCacheImpl) DoLimit(
if perSecondPipeline == nil {
perSecondPipeline = Pipeline{}
}
pipelineAppend(this.perSecondClient, &perSecondPipeline, cacheKey.Key, hitAddendForRedis, &results[i], expirationSeconds)
if nearlimitIndexes[i] {
pipelineAppend(this.perSecondClient, &perSecondPipeline, cacheKey.Key, hitsAddend, &results[i], expirationSeconds)
} else {
pipelineAppend(this.perSecondClient, &perSecondPipeline, cacheKey.Key, hitsAddendForRedis, &results[i], expirationSeconds)
}
} else {
if pipeline == nil {
pipeline = Pipeline{}
}
pipelineAppend(this.client, &pipeline, cacheKey.Key, hitAddendForRedis, &results[i], expirationSeconds)
if nearlimitIndexes[i] {
pipelineAppend(this.client, &pipeline, cacheKey.Key, hitsAddend, &results[i], expirationSeconds)
} else {
pipelineAppend(this.client, &pipeline, cacheKey.Key, hitsAddendForRedis, &results[i], expirationSeconds)
}
}
}

Expand Down Expand Up @@ -138,10 +217,12 @@ func (this *fixedRateLimitCacheImpl) DoLimit(
// Flush is a no-op: this implementation holds no buffered writes to flush.
func (this *fixedRateLimitCacheImpl) Flush() {}

func NewFixedRateLimitCacheImpl(client Client, perSecondClient Client, timeSource utils.TimeSource,
jitterRand *rand.Rand, expirationJitterMaxSeconds int64, localCache *freecache.Cache, nearLimitRatio float32, cacheKeyPrefix string, statsManager stats.Manager) limiter.RateLimitCache {
jitterRand *rand.Rand, expirationJitterMaxSeconds int64, localCache *freecache.Cache, nearLimitRatio float32, cacheKeyPrefix string, statsManager stats.Manager,
stopCacheKeyIncrementWhenOverlimit bool) limiter.RateLimitCache {
return &fixedRateLimitCacheImpl{
client: client,
perSecondClient: perSecondClient,
baseRateLimiter: limiter.NewBaseRateLimit(timeSource, jitterRand, expirationJitterMaxSeconds, localCache, nearLimitRatio, cacheKeyPrefix, statsManager),
client: client,
perSecondClient: perSecondClient,
stopCacheKeyIncrementWhenOverlimit: stopCacheKeyIncrementWhenOverlimit,
baseRateLimiter: limiter.NewBaseRateLimit(timeSource, jitterRand, expirationJitterMaxSeconds, localCache, nearLimitRatio, cacheKeyPrefix, statsManager),
}
}
11 changes: 6 additions & 5 deletions src/settings/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,12 @@ type Settings struct {
RuntimeWatchRoot bool `envconfig:"RUNTIME_WATCH_ROOT" default:"true"`

// Settings for all cache types
ExpirationJitterMaxSeconds int64 `envconfig:"EXPIRATION_JITTER_MAX_SECONDS" default:"300"`
LocalCacheSizeInBytes int `envconfig:"LOCAL_CACHE_SIZE_IN_BYTES" default:"0"`
NearLimitRatio float32 `envconfig:"NEAR_LIMIT_RATIO" default:"0.8"`
CacheKeyPrefix string `envconfig:"CACHE_KEY_PREFIX" default:""`
BackendType string `envconfig:"BACKEND_TYPE" default:"redis"`
ExpirationJitterMaxSeconds int64 `envconfig:"EXPIRATION_JITTER_MAX_SECONDS" default:"300"`
LocalCacheSizeInBytes int `envconfig:"LOCAL_CACHE_SIZE_IN_BYTES" default:"0"`
NearLimitRatio float32 `envconfig:"NEAR_LIMIT_RATIO" default:"0.8"`
CacheKeyPrefix string `envconfig:"CACHE_KEY_PREFIX" default:""`
BackendType string `envconfig:"BACKEND_TYPE" default:"redis"`
StopCacheKeyIncrementWhenOverlimit bool `envconfig:"STOP_CACHE_KEY_INCREMENT_WHEN_OVERLIMIT" default:"false"`

// Settings for optional returning of custom headers
RateLimitResponseHeadersEnabled bool `envconfig:"LIMIT_RESPONSE_HEADERS_ENABLED" default:"false"`
Expand Down
15 changes: 13 additions & 2 deletions test/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ func makeSimpleRedisSettings(redisPort int, perSecondPort int, perSecond bool, l
return s
}

// makeSimpleRedisSettingsWithStopCacheKeyIncrementWhenOverlimit builds the
// standard simple Redis settings and additionally turns on the
// STOP_CACHE_KEY_INCREMENT_WHEN_OVERLIMIT behavior.
func makeSimpleRedisSettingsWithStopCacheKeyIncrementWhenOverlimit(redisPort int, perSecondPort int, perSecond bool, localCacheSize int) settings.Settings {
	cfg := makeSimpleRedisSettings(redisPort, perSecondPort, perSecond, localCacheSize)
	cfg.StopCacheKeyIncrementWhenOverlimit = true
	return cfg
}

func TestBasicConfig(t *testing.T) {
common.WithMultiRedis(t, []common.RedisConfig{
{Port: 6383},
Expand All @@ -93,6 +100,10 @@ func TestBasicConfig(t *testing.T) {
cacheSettings := makeSimpleRedisSettings(6383, 6380, false, 0)
cacheSettings.CacheKeyPrefix = "prefix:"
t.Run("WithoutPerSecondRedisWithCachePrefix", testBasicConfig(cacheSettings))
t.Run("WithoutPerSecondRedisWithstopCacheKeyIncrementWhenOverlimitConfig", testBasicConfig(makeSimpleRedisSettingsWithStopCacheKeyIncrementWhenOverlimit(6383, 6380, false, 0)))
t.Run("WithPerSecondRedisWithstopCacheKeyIncrementWhenOverlimitConfig", testBasicConfig(makeSimpleRedisSettingsWithStopCacheKeyIncrementWhenOverlimit(6383, 6380, true, 0)))
t.Run("WithoutPerSecondRedisWithLocalCacheAndstopCacheKeyIncrementWhenOverlimitConfig", testBasicConfig(makeSimpleRedisSettingsWithStopCacheKeyIncrementWhenOverlimit(6383, 6380, false, 1000)))
t.Run("WithPerSecondRedisWithLocalCacheAndstopCacheKeyIncrementWhenOverlimitConfig", testBasicConfig(makeSimpleRedisSettingsWithStopCacheKeyIncrementWhenOverlimit(6383, 6380, true, 1000)))
})
}

Expand Down Expand Up @@ -594,8 +605,8 @@ func testBasicBaseConfig(s settings.Settings) func(*testing.T) {
limitRemaining2 = 0
// Ceased incrementing cached keys upon exceeding the overall rate limit in the Redis cache flow.
// Consequently, the remaining limit should remain unaltered.
if enable_local_cache && s.BackendType != "memcache" {
limitRemaining1 = 9
if s.StopCacheKeyIncrementWhenOverlimit && s.BackendType != "memcache" {
limitRemaining1 = 10
}
}
durRemaining1 := response.GetStatuses()[0].DurationUntilReset
Expand Down
2 changes: 1 addition & 1 deletion test/redis/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func BenchmarkParallelDoLimit(b *testing.B) {
client := redis.NewClientImpl(statsStore, false, "", "tcp", "single", "127.0.0.1:6379", poolSize, pipelineWindow, pipelineLimit, nil, false, nil)
defer client.Close()

cache := redis.NewFixedRateLimitCacheImpl(client, nil, utils.NewTimeSourceImpl(), rand.New(utils.NewLockedSource(time.Now().Unix())), 10, nil, 0.8, "", sm)
cache := redis.NewFixedRateLimitCacheImpl(client, nil, utils.NewTimeSourceImpl(), rand.New(utils.NewLockedSource(time.Now().Unix())), 10, nil, 0.8, "", sm, true)
request := common.NewRateLimitRequest("domain", [][][2]string{{{"key", "value"}}}, 1)
limits := []*config.RateLimit{config.NewRateLimit(1000000000, pb.RateLimitResponse_RateLimit_SECOND, sm.NewStats("key_value"), false, false, "", nil, false)}

Expand Down
Loading

0 comments on commit e0f9f0e

Please sign in to comment.