Skip to content

Commit

Permalink
limiter: naming refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
msaf1980 committed Feb 20, 2024
1 parent 7f40736 commit a648e3e
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 101 deletions.
63 changes: 32 additions & 31 deletions limiter/alimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ var (
checkDelay = time.Second * 60
)

// calc reserved slots count based on load average (for protect overload)
func getWeighted(n, max int) int {
if n <= 0 {
return 0
Expand All @@ -35,31 +36,31 @@ func getWeighted(n, max int) int {

// ALimiter provide limiter amount of requests/concurrently executing requests (adaptive with load avg)
type ALimiter struct {
l limiter
cL limiter
max int
n int
limiter limiter
concurrentLimiter limiter
concurrent int
n int

m metrics.WaitMetric
}

// NewServerLimiter creates a limiter for specific servers list.
func NewALimiter(l, max, n int, enableMetrics bool, scope, sub string) ServerLimiter {
if l <= 0 && max <= 0 {
func NewALimiter(capacity, concurrent, n int, enableMetrics bool, scope, sub string) ServerLimiter {
if capacity <= 0 && concurrent <= 0 {
return NoopLimiter{}
}
if n >= max {
n = max - 1
if n >= concurrent {
n = concurrent - 1
}
if n <= 0 {
return NewWLimiter(l, max, enableMetrics, scope, sub)
return NewWLimiter(capacity, concurrent, enableMetrics, scope, sub)
}

a := &ALimiter{
m: metrics.NewWaitMetric(enableMetrics, scope, sub), max: max, n: n,
m: metrics.NewWaitMetric(enableMetrics, scope, sub), concurrent: concurrent, n: n,
}
a.cL.ch = make(chan struct{}, max)
a.cL.cap = max
a.concurrentLimiter.ch = make(chan struct{}, concurrent)
a.concurrentLimiter.cap = concurrent

go a.balance()

Expand All @@ -70,17 +71,17 @@ func (sl *ALimiter) balance() int {
var last int
for {
start := time.Now()
n := getWeighted(sl.n, sl.max)
n := getWeighted(sl.n, sl.concurrent)
if n > last {
for i := 0; i < n-last; i++ {
if sl.cL.enter(ctxMain, "balance") != nil {
if sl.concurrentLimiter.enter(ctxMain, "balance") != nil {
break
}
}
last = n
} else if n < last {
for i := 0; i < last-n; i++ {
sl.cL.leave(ctxMain, "balance")
sl.concurrentLimiter.leave(ctxMain, "balance")
}
last = n
}
Expand All @@ -92,20 +93,20 @@ func (sl *ALimiter) balance() int {
}

func (sl *ALimiter) Capacity() int {
return sl.l.capacity()
return sl.limiter.capacity()
}

func (sl *ALimiter) Enter(ctx context.Context, s string) (err error) {
if sl.l.cap > 0 {
if err = sl.l.tryEnter(ctx, s); err != nil {
if sl.limiter.cap > 0 {
if err = sl.limiter.tryEnter(ctx, s); err != nil {
sl.m.WaitErrors.Add(1)
return
}
}
if sl.cL.cap > 0 {
if sl.cL.enter(ctx, s) != nil {
if sl.l.cap > 0 {
sl.l.leave(ctx, s)
if sl.concurrentLimiter.cap > 0 {
if sl.concurrentLimiter.enter(ctx, s) != nil {
if sl.limiter.cap > 0 {
sl.limiter.leave(ctx, s)
}
sl.m.WaitErrors.Add(1)
err = ErrTimeout
Expand All @@ -117,16 +118,16 @@ func (sl *ALimiter) Enter(ctx context.Context, s string) (err error) {

// TryEnter claims one of free slots without blocking.
func (sl *ALimiter) TryEnter(ctx context.Context, s string) (err error) {
if sl.l.cap > 0 {
if err = sl.l.tryEnter(ctx, s); err != nil {
if sl.limiter.cap > 0 {
if err = sl.limiter.tryEnter(ctx, s); err != nil {
sl.m.WaitErrors.Add(1)
return
}
}
if sl.cL.cap > 0 {
if sl.cL.tryEnter(ctx, s) != nil {
if sl.l.cap > 0 {
sl.l.leave(ctx, s)
if sl.concurrentLimiter.cap > 0 {
if sl.concurrentLimiter.tryEnter(ctx, s) != nil {
if sl.limiter.cap > 0 {
sl.limiter.leave(ctx, s)
}
sl.m.WaitErrors.Add(1)
err = ErrTimeout
Expand All @@ -138,10 +139,10 @@ func (sl *ALimiter) TryEnter(ctx context.Context, s string) (err error) {

// Frees a slot in limiter
func (sl *ALimiter) Leave(ctx context.Context, s string) {
if sl.l.cap > 0 {
sl.l.leave(ctx, s)
if sl.limiter.cap > 0 {
sl.limiter.leave(ctx, s)
}
sl.cL.leave(ctx, s)
sl.concurrentLimiter.leave(ctx, s)
}

// SendDuration send StatsD duration iming
Expand Down
24 changes: 12 additions & 12 deletions limiter/alimiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,70 +47,70 @@ func Test_getWeighted(t *testing.T) {
}

func TestNewALimiter(t *testing.T) {
l := 14
c := 12
capacity := 14
concurrent := 12
n := 10
checkDelay = time.Millisecond * 10
limiter := NewALimiter(l, c, n, false, "", "")
limiter := NewALimiter(capacity, concurrent, n, false, "", "")

// inital - load not collected
load_avg.Store(0)

var i int
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)

for i = 0; i < c; i++ {
for i = 0; i < concurrent; i++ {
require.NoError(t, limiter.Enter(ctx, "render"), "try to lock with load_avg = 0 [%d]", i)
}

require.Error(t, limiter.Enter(ctx, "render"))

for i = 0; i < c; i++ {
for i = 0; i < concurrent; i++ {
limiter.Leave(ctx, "render")
}

cancel()

// load_avg 0.5
load_avg.Store(0.5)
k := getWeighted(n, c)
k := getWeighted(n, concurrent)
require.Equal(t, 0, k)

// load_avg 0.6
load_avg.Store(0.6)
k = getWeighted(n, c)
k = getWeighted(n, concurrent)
require.Equal(t, n*6/10, k)

time.Sleep(checkDelay * 2)

ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*100)
for i = 0; i < c-k; i++ {
for i = 0; i < concurrent-k; i++ {
require.NoError(t, limiter.Enter(ctx, "render"), "try to lock with load_avg = 0.5 [%d]", i)
}

require.Error(t, limiter.Enter(ctx, "render"))

for i = 0; i < c-k; i++ {
for i = 0; i < concurrent-k; i++ {
limiter.Leave(ctx, "render")
}

cancel()

// // load_avg 1
load_avg.Store(1)
k = getWeighted(n, c)
k = getWeighted(n, concurrent)
require.Equal(t, n, k)

time.Sleep(checkDelay * 2)

ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10)
for i = 0; i < c-n; i++ {
for i = 0; i < concurrent-n; i++ {
require.NoError(t, limiter.Enter(ctx, "render"), "try to lock with load_avg = 1 [%d]", i)
}

require.Error(t, limiter.Enter(ctx, "render"))

for i = 0; i < c-n; i++ {
for i = 0; i < concurrent-n; i++ {
limiter.Leave(ctx, "render")
}

Expand Down
38 changes: 19 additions & 19 deletions limiter/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,62 +13,62 @@ type limiter struct {

// Limiter provides interface to limit amount of requests
type Limiter struct {
l limiter
m metrics.WaitMetric
limiter limiter
metrics metrics.WaitMetric
}

// NewServerLimiter creates a limiter for specific servers list.
func NewLimiter(l int, enableMetrics bool, scope, sub string) ServerLimiter {
if l <= 0 {
func NewLimiter(capacity int, enableMetrics bool, scope, sub string) ServerLimiter {
if capacity <= 0 {
return NoopLimiter{}
}

return &Limiter{
l: limiter{
ch: make(chan struct{}, l),
cap: l,
limiter: limiter{
ch: make(chan struct{}, capacity),
cap: capacity,
},
m: metrics.NewWaitMetric(enableMetrics, scope, sub),
metrics: metrics.NewWaitMetric(enableMetrics, scope, sub),
}
}

func (sl *Limiter) Capacity() int {
return sl.l.capacity()
return sl.limiter.capacity()
}

// Enter claims one of free slots or blocks until there is one.
func (sl *Limiter) Enter(ctx context.Context, s string) (err error) {
if err = sl.l.enter(ctx, s); err != nil {
sl.m.WaitErrors.Add(1)
if err = sl.limiter.enter(ctx, s); err != nil {
sl.metrics.WaitErrors.Add(1)
}
sl.m.Requests.Add(1)
sl.metrics.Requests.Add(1)
return
}

// TryEnter claims one of free slots without blocking.
func (sl *Limiter) TryEnter(ctx context.Context, s string) (err error) {
if err = sl.l.tryEnter(ctx, s); err != nil {
sl.m.WaitErrors.Add(1)
if err = sl.limiter.tryEnter(ctx, s); err != nil {
sl.metrics.WaitErrors.Add(1)
}
sl.m.Requests.Add(1)
sl.metrics.Requests.Add(1)
return
}

// Frees a slot in limiter
func (sl *Limiter) Leave(ctx context.Context, s string) {
sl.l.leave(ctx, s)
sl.limiter.leave(ctx, s)
}

// SendDuration send StatsD duration iming
func (sl *Limiter) SendDuration(queueMs int64) {
if sl.m.WaitTimeName != "" {
metrics.Gstatsd.Timing(sl.m.WaitTimeName, queueMs, 1.0)
if sl.metrics.WaitTimeName != "" {
metrics.Gstatsd.Timing(sl.metrics.WaitTimeName, queueMs, 1.0)
}
}

// Unregiter unregister graphite metric
func (sl *Limiter) Unregiter() {
sl.m.Unregister()
sl.metrics.Unregister()
}

// Enabled return enabled flag, if false - it's a noop limiter and can be safely skiped
Expand Down
Loading

0 comments on commit a648e3e

Please sign in to comment.