Commit
Add throttle limit distribution (#587)
* Add distributed throttling

* Fix linter

* Rename test

* Fix comment

* Optimizations

* Beautify test

* limit in distribution

* rework rebuildBuckets

* Sync distribution with redis

* Fix distribution receivers

* fix update key limit

* rename sharded to distributed

* recreate buckets in updateDistribution

* fix race

* Beautify

* Add limit distribution metrics

* Fix

* Fix linter
kirillov6 authored Aug 15, 2024
1 parent 11cbb41 commit 2533c1b
Showing 13 changed files with 1,715 additions and 412 deletions.
2 changes: 2 additions & 0 deletions .golangci.yml
@@ -7,6 +7,8 @@ linters-settings:
  govet:
    enable:
      - composites
    disable:
      - printf
  dupl:
    threshold: 120
  goconst:
65 changes: 63 additions & 2 deletions plugin/action/throttle/README.md
@@ -65,10 +65,11 @@ Time interval to check event throughput.
**`rules`** *`[]RuleConfig`*

Rules to override the `default_limit` for different groups of events. It's a list of objects.
Each object has the `limit`, `limit_kind` and `conditions` fields, as well as an optional `limit_distribution` field.
* `limit` – the value which will override the `default_limit`, if `conditions` are met.
* `limit_kind` – the type of limit: `count` - the number of messages, `size` - the total size of all messages
* `conditions` – the map of `event field name => event field value`. The conditions are checked using `AND` operator.
* `limit_distribution` – see `LimitDistributionConfig` for details.

<br>

@@ -78,6 +79,41 @@ Time interval after which unused limiters are removed.

<br>

**`limit_distribution`** *`LimitDistributionConfig`*

Allows distributing the `default_limit` between events by condition.

`LimitDistributionConfig` params:
* `field` - the event field on which the distribution will be based.
* `ratios` - the list of objects. Each object has:
  * `ratio` - distribution ratio, value must be in the range [0.0;1.0].
  * `values` - the list of strings containing all `field` values that fall into this distribution.
* `metric_labels` - list of metric labels.

> Notes:
> 1. The sum of ratios must be in the range [0.0;1.0].
> 2. If the sum of ratios is less than 1, a **default distribution** with ratio **1-sum** is added;
> otherwise the **default distribution** isn't used.
> All events whose `field` value doesn't fall into any of the distributions:
> * fall into the default distribution, if it exists
> * are throttled otherwise

`LimitDistributionConfig` example:
```yaml
field: log.level
ratios:
  - ratio: 0.5
    values: ['error']
  - ratio: 0.3
    values: ['warn', 'info']
```
For this config and the `default_limit=100`:
* events with `log.level=error` will have a limit of 50
* events with `log.level=warn` or `log.level=info` will have a limit of 30
* all other events will have a limit of 20
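
As a rough illustration of this arithmetic (a standalone sketch; the type names and the rounding behavior are assumptions, not the plugin's actual code), the per-distribution limits can be derived from the ratios like this:

```go
package main

import (
	"fmt"
	"math"
)

// distribution mirrors one entry of `ratios` from the config above (illustrative names).
type distribution struct {
	ratio  float64
	values []string
}

func main() {
	defaultLimit := 100.0
	distributions := []distribution{
		{ratio: 0.5, values: []string{"error"}},
		{ratio: 0.3, values: []string{"warn", "info"}},
	}

	sum := 0.0
	for _, d := range distributions {
		sum += d.ratio
		fmt.Printf("%v -> limit %d\n", d.values, int64(math.Round(d.ratio*defaultLimit)))
	}
	// the remaining ratio forms the default distribution, if any
	if rest := 1 - sum; rest > 0 {
		fmt.Printf("default -> limit %d\n", int64(math.Round(rest*defaultLimit)))
	}
	// Prints:
	// [error] -> limit 50
	// [warn info] -> limit 30
	// default -> limit 20
}
```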

<br>

**`endpoint`** *`string`*


@@ -140,5 +176,30 @@ If not set limiter values are considered as non-json data.

<br>

**`limiter_distribution_field`** *`string`*

Defines the field with the limit distribution inside the JSON object stored in the value
(e.g. if set to "distribution", the value must be of the form `{"distribution":{<object>},...}`).
Distribution object example:
```json
{
  "field": "log.level",
  "ratios": [
    {
      "ratio": 0.5,
      "values": ["error"]
    },
    {
      "ratio": 0.3,
      "values": ["warn", "info"]
    }
  ],
  "enabled": true
}
```
> If `limiter_value_field` and `limiter_distribution_field` are not set, the distribution will not be stored.
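
For reference, a minimal sketch of reading such a limiter value with plain `encoding/json` (the struct names and the `"limit"` key below are assumptions taken from the examples, not necessarily the plugin's actual types):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ratio and limitDistribution mirror the JSON example above.
type ratio struct {
	Ratio  float64  `json:"ratio"`
	Values []string `json:"values"`
}

type limitDistribution struct {
	Field   string  `json:"field"`
	Ratios  []ratio `json:"ratios"`
	Enabled bool    `json:"enabled"`
}

func main() {
	// a limiter value as it could be stored, assuming limiter_value_field="limit"
	// and limiter_distribution_field="distribution"
	raw := []byte(`{
		"limit": 100,
		"distribution": {
			"field": "log.level",
			"ratios": [
				{"ratio": 0.5, "values": ["error"]},
				{"ratio": 0.3, "values": ["warn", "info"]}
			],
			"enabled": true
		}
	}`)

	var v struct {
		Limit        int64             `json:"limit"`
		Distribution limitDistribution `json:"distribution"`
	}
	if err := json.Unmarshal(raw, &v); err != nil {
		panic(err)
	}
	fmt.Println(v.Limit, v.Distribution.Field, len(v.Distribution.Ratios)) // 100 log.level 2
}
```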

<br>


<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
225 changes: 225 additions & 0 deletions plugin/action/throttle/buckets.go
@@ -0,0 +1,225 @@
package throttle

import (
	"time"
)

type buckets interface {
	get(index, distrIndex int) int64
	getAll(index int, buf []int64) []int64
	set(index, distrIndex int, value int64)
	reset(index int)
	add(index, distrIndex int, value int64)
	isEmpty(index int) bool
	// rebuild rebuilds the buckets and returns the actual bucket id
	rebuild(currentTs, ts time.Time) int

	// actualizeIndex checks for a possible shift of the buckets and returns the actual bucket index and whether it is still valid
	actualizeIndex(maxID, index int) (int, bool)
	getCount() int
	getInterval() time.Duration
	getMinID() int

	// for testing only
	isSimple() bool
}

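// newBuckets returns simple buckets when there is a single distribution
// and distributed buckets when counters are split across several distributions.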
func newBuckets(count, distributionSize int, interval time.Duration) buckets {
	if distributionSize == 1 {
		return newSimpleBuckets(count, interval)
	}
	return newDistributedBuckets(count, distributionSize, interval)
}

type bucketsMeta struct {
	count    int
	interval time.Duration
	minID    int // min bucket id
	maxID    int // max bucket id
}

func newBucketsMeta(count int, interval time.Duration) bucketsMeta {
	return bucketsMeta{
		count:    count,
		interval: interval,
	}
}

func (m bucketsMeta) getCount() int {
	return m.count
}

func (m bucketsMeta) getInterval() time.Duration {
	return m.interval
}

func (m bucketsMeta) getMinID() int {
	return m.minID
}

func (m bucketsMeta) actualizeIndex(maxID, index int) (int, bool) {
	if m.maxID == maxID {
		return index, true
	}

	// buckets were rebuilt during some operations, calc the actual index
	shift := m.maxID - maxID
	actualIndex := index - shift
	return actualIndex, actualIndex > 0
}

// timeToBucketID converts time to bucketID
func (m bucketsMeta) timeToBucketID(t time.Time) int {
	return int(t.UnixNano() / m.interval.Nanoseconds())
}

type simpleBuckets struct {
	bucketsMeta
	b []int64
}

func newSimpleBuckets(count int, interval time.Duration) *simpleBuckets {
	return &simpleBuckets{
		bucketsMeta: newBucketsMeta(count, interval),
		b:           make([]int64, count),
	}
}

func (b *simpleBuckets) get(index, _ int) int64 {
	return b.b[index]
}

func (b *simpleBuckets) getAll(index int, buf []int64) []int64 {
	return append(buf, b.b[index])
}

func (b *simpleBuckets) set(index, _ int, value int64) {
	b.b[index] = value
}

func (b *simpleBuckets) reset(index int) {
	b.b[index] = 0
}

func (b *simpleBuckets) add(index, _ int, value int64) {
	b.b[index] += value
}

func (b *simpleBuckets) isEmpty(index int) bool {
	return b.b[index] == 0
}

func (b *simpleBuckets) rebuild(currentTs, ts time.Time) int {
	resetFn := func(count int) {
		b.b = append(b.b[count:], b.b[:count]...)
		for i := 0; i < count; i++ {
			b.reset(b.getCount() - 1 - i)
		}
	}

	return rebuildBuckets(&b.bucketsMeta, resetFn, currentTs, ts)
}

func (b *simpleBuckets) isSimple() bool {
	return true
}

type distributedBuckets struct {
	bucketsMeta
	b []distributedBucket
}

func newDistributedBuckets(count, distributionSize int, interval time.Duration) *distributedBuckets {
	db := &distributedBuckets{
		bucketsMeta: newBucketsMeta(count, interval),
		b:           make([]distributedBucket, count),
	}
	for i := 0; i < count; i++ {
		db.b[i] = newDistributedBucket(distributionSize)
	}
	return db
}

func (b *distributedBuckets) get(index, distrIndex int) int64 {
	return b.b[index][distrIndex]
}

func (b *distributedBuckets) getAll(index int, buf []int64) []int64 {
	return b.b[index].copyTo(buf)
}

func (b *distributedBuckets) set(index, distrIndex int, value int64) {
	b.b[index][distrIndex] = value
}

func (b *distributedBuckets) reset(index int) {
	for i := range b.b[index] {
		b.b[index][i] = 0
	}
}

func (b *distributedBuckets) add(index, distrIndex int, value int64) {
	b.b[index][distrIndex] += value
}

func (b *distributedBuckets) isEmpty(index int) bool {
	for _, v := range b.b[index] {
		if v > 0 {
			return false
		}
	}
	return true
}

func (b *distributedBuckets) rebuild(currentTs, ts time.Time) int {
	resetFn := func(count int) {
		b.b = append(b.b[count:], b.b[:count]...)
		for i := 0; i < count; i++ {
			b.reset(b.getCount() - 1 - i)
		}
	}

	return rebuildBuckets(&b.bucketsMeta, resetFn, currentTs, ts)
}

func (b *distributedBuckets) isSimple() bool {
	return false
}

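// distributedBucket holds one counter per distribution within a single time bucket.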
type distributedBucket []int64

func newDistributedBucket(size int) distributedBucket {
	return make(distributedBucket, size)
}

func (s distributedBucket) copyTo(buf distributedBucket) distributedBucket {
	return append(buf, s...)
}

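// rebuildBuckets shifts the window of buckets forward when currentTs has moved past
// the current maximum bucket id, resets the buckets that fell out of the window,
// and returns the bucket id for ts inside the (possibly shifted) window.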
func rebuildBuckets(meta *bucketsMeta, resetFn func(int), currentTs, ts time.Time) int {
	currentID := meta.timeToBucketID(currentTs)
	if meta.minID == 0 {
		// min id wasn't set yet. It MUST be derived from currentTs, because the ts from an event can be invalid (e.g. from year 1970 or 2077)
		meta.maxID = currentID
		meta.minID = meta.maxID - meta.count + 1
	}
	maxID := meta.minID + meta.count - 1

	// currentID exceeds maxID, actualize the buckets
	if currentID > maxID {
		dif := currentID - maxID
		n := min(dif, meta.count)
		// reset old buckets
		resetFn(n)
		// update ids
		meta.minID += dif
		meta.maxID = currentID
	}

	id := meta.timeToBucketID(ts)
	// events from the past or future go to the latest bucket
	if id < meta.minID || id > meta.maxID {
		id = meta.maxID
	}
	return id
}
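
To illustrate how these buckets fit together, here is a minimal, hypothetical test sketch (not part of this commit) that drives the distributed buckets through the `buckets` interface:

```go
// illustrative only; assumes it lives alongside buckets.go in the throttle package
package throttle

import (
	"testing"
	"time"
)

func TestBucketsUsageSketch(t *testing.T) {
	// 3 buckets of 1 second each, 2 distributions
	b := newBuckets(3, 2, time.Second)

	now := time.Now()
	// rebuild actualizes the window against the wall clock and
	// returns the bucket id for the event timestamp
	id := b.rebuild(now, now)
	index := id - b.getMinID() // absolute id -> slice index

	b.add(index, 0, 1) // account one event in distribution 0
	if got := b.get(index, 0); got != 1 {
		t.Fatalf("expected 1, got %d", got)
	}
}
```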