Skip to content

Commit

Permalink
Merge pull request #220 from safing/feature/call-limiter
Browse files Browse the repository at this point in the history
Add call limiter
  • Loading branch information
dhaavi authored Oct 2, 2023
2 parents 900a654 + 2ca78b1 commit 433ad6b
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 4 deletions.
87 changes: 87 additions & 0 deletions utils/call_limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package utils

import (
"sync"
"sync/atomic"
"time"
)

// CallLimiter bundles concurrent calls and optionally limits how fast a function is called.
type CallLimiter struct {
pause time.Duration

inLock sync.Mutex
lastExec time.Time

waiters atomic.Int32
outLock sync.Mutex
}

// NewCallLimiter returns a new call limiter.
// Set minPause to zero to disable the minimum pause between calls.
func NewCallLimiter(minPause time.Duration) *CallLimiter {
return &CallLimiter{
pause: minPause,
}
}

// Do executes the given function.
// All concurrent calls to Do are bundled and return when f() finishes.
// Waits until the minimum pause is over before executing f() again.
func (l *CallLimiter) Do(f func()) {
// Wait for the previous waiters to exit.
l.inLock.Lock()

// Defer final unlock to safeguard from panics.
defer func() {
// Execution is finished - leave.
// If we are the last waiter, let the next batch in.
if l.waiters.Add(-1) == 0 {
l.inLock.Unlock()
}
}()

// Check if we are the first waiter.
if l.waiters.Add(1) == 1 {
// Take the lead on this execution run.
l.lead(f)
} else {
// We are not the first waiter, let others in.
l.inLock.Unlock()
}

// Wait for execution to complete.
l.outLock.Lock()
l.outLock.Unlock() //nolint:staticcheck

// Last statement is in defer above.
}

func (l *CallLimiter) lead(f func()) {
// Make all others wait while we execute the function.
l.outLock.Lock()

// Unlock in lock until execution is finished.
l.inLock.Unlock()

// Transition from out lock to in lock when done.
defer func() {
// Update last execution time.
l.lastExec = time.Now().UTC()
// Stop newcomers from waiting on previous execution.
l.inLock.Lock()
// Allow waiters to leave.
l.outLock.Unlock()
}()

// Wait for the minimum duration between executions.
if l.pause > 0 {
sinceLastExec := time.Since(l.lastExec)
if sinceLastExec < l.pause {
time.Sleep(l.pause - sinceLastExec)
}
}

// Execute.
f()
}
71 changes: 71 additions & 0 deletions utils/call_limiter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package utils

import (
"sync"
"sync/atomic"
"testing"
"time"

"github.com/tevino/abool"
)

func TestCallLimiter(t *testing.T) {
t.Parallel()

pause := 10 * time.Millisecond
oa := NewCallLimiter(pause)
executed := abool.New()
var testWg sync.WaitGroup

// One execution should gobble up the whole batch.
// We are doing this without sleep in function, so dummy exec first to trigger first pause.
oa.Do(func() {})
// Start
for i := 0; i < 10; i++ {
testWg.Add(100)
for i := 0; i < 100; i++ {
go func() {
oa.Do(func() {
if !executed.SetToIf(false, true) {
t.Errorf("concurrent execution!")
}
})
testWg.Done()
}()
}
testWg.Wait()
// Check if function was executed at least once.
if executed.IsNotSet() {
t.Errorf("no execution!")
}
executed.UnSet() // reset check
}

// Wait for pause to reset.
time.Sleep(pause)

// Continuous use with re-execution.
// Choose values so that about 10 executions are expected
var execs uint32
testWg.Add(200)
for i := 0; i < 200; i++ {
go func() {
oa.Do(func() {
atomic.AddUint32(&execs, 1)
time.Sleep(10 * time.Millisecond)
})
testWg.Done()
}()

// Start one goroutine every 1ms.
time.Sleep(1 * time.Millisecond)
}

testWg.Wait()
if execs <= 8 {
t.Errorf("unexpected low exec count: %d", execs)
}
if execs >= 12 {
t.Errorf("unexpected high exec count: %d", execs)
}
}
8 changes: 7 additions & 1 deletion utils/onceagain.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@ import (
"sync/atomic"
)

// OnceAgain is an object that will perform only one action "in flight". It's basically the same as sync.Once, but is automatically reused when the function was executed and everyone who waited has left.
// OnceAgain is an object that will perform only one action "in flight". It's
// basically the same as sync.Once, but is automatically reused when the
// function was executed and everyone who waited has left.
// Important: This is somewhat racy when used heavily as it only resets _after_
// everyone who waited has left. So, while some goroutines are waiting to be
// activated again to leave the waiting state, other goroutines will call Do()
// without executing the function again.
type OnceAgain struct {
// done indicates whether the action has been performed.
// It is first in the struct because it is used in the hot path.
Expand Down
10 changes: 7 additions & 3 deletions utils/onceagain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestOnceAgain(t *testing.T) {
executed := abool.New()
var testWg sync.WaitGroup

// basic
// One execution should gobble up the whole batch.
for i := 0; i < 10; i++ {
testWg.Add(100)
for i := 0; i < 100; i++ {
Expand All @@ -34,7 +34,8 @@ func TestOnceAgain(t *testing.T) {
executed.UnSet() // reset check
}

// streaming
// Continuous use with re-execution.
// Choose values so that about 10 executions are expected
var execs uint32
testWg.Add(100)
for i := 0; i < 100; i++ {
Expand All @@ -50,7 +51,10 @@ func TestOnceAgain(t *testing.T) {
}

testWg.Wait()
if execs >= 20 {
if execs <= 8 {
t.Errorf("unexpected low exec count: %d", execs)
}
if execs >= 12 {
t.Errorf("unexpected high exec count: %d", execs)
}
}

0 comments on commit 433ad6b

Please sign in to comment.