Skip to content

Commit

Permalink
rate limiter support
Browse files Browse the repository at this point in the history
  • Loading branch information
Neur0toxine authored Feb 16, 2024
2 parents 08cb056 + f278b73 commit c80d4bf
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 112 deletions.
4 changes: 0 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,12 @@ go 1.22

require (
github.com/google/go-querystring v1.0.0
github.com/maypok86/otter v1.0.0
github.com/stretchr/testify v1.8.1
gopkg.in/h2non/gock.v1 v1.1.2
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dolthub/maphash v0.1.0 // indirect
github.com/dolthub/swiss v0.2.1 // indirect
github.com/gammazero/deque v0.2.1 // indirect
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
8 changes: 0 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,18 +1,10 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dolthub/maphash v0.1.0 h1:bsQ7JsF4FkkWyrP3oCnFJgrCUAFbFf3kOl4L/QxPDyQ=
github.com/dolthub/maphash v0.1.0/go.mod h1:gkg4Ch4CdCDu5h6PMriVLawB7koZ+5ijb9puGMV50a4=
github.com/dolthub/swiss v0.2.1 h1:gs2osYs5SJkAaH5/ggVJqXQxRXtWshF6uE0lgR/Y3Gw=
github.com/dolthub/swiss v0.2.1/go.mod h1:8AhKZZ1HK7g18j7v7k6c5cYIGEZJcPn0ARsai8cUrh0=
github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0=
github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU=
github.com/google/go-querystring v1.0.0 h1:Xkwi/a1rcvNg1PPYe5vI8GbeBY/jrVuDX5ASuANWTrk=
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 h1:2VTzZjLZBgl62/EtslCrtky5vbi9dd7HrQPQIx6wqiw=
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI=
github.com/maypok86/otter v1.0.0 h1:nP13eaFQrfRQHD1vxEgdlqR9gLHvfW2VcS0hFitglIY=
github.com/maypok86/otter v1.0.0/go.mod h1:koSPT30yWtqMNrFohaywMlgSHCuUg6IVqeDerwIM/Mg=
github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32 h1:W6apQkHrMkS0Muv8G/TipAy/FJl/rCYT0+EuS8+Z0z4=
github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32/go.mod h1:9wM+0iRr9ahx58uYLpLIr5fm8diHn0JbqRycJi6w0Ms=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
6 changes: 6 additions & 0 deletions v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ func (c *MgClient) WithLogger(logger BasicLogger) *MgClient {
return c
}

// WithLimiter sets the provided limiter instance into the Client.
func (c *MgClient) WithLimiter(limiter *TokensBucket) *MgClient {
c.limiter = limiter
return c
}

// writeLog writes a message to the log.
func (c *MgClient) writeLog(format string, v ...interface{}) {
if c.logger != nil {
Expand Down
81 changes: 81 additions & 0 deletions v1/rate_limit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package v1

import (
"runtime"
"sync"
"sync/atomic"
"time"
)

type token struct {
rps uint32
lastUse time.Time
}

type TokensBucket struct {
maxRPS uint32
mux sync.Mutex
tokens map[string]*token
unusedTokenTime time.Duration
checkTokenTime time.Duration
cancel atomic.Bool
}

func NewTokensBucket(maxRPS uint32, unusedTokenTime, checkTokenTime time.Duration) *TokensBucket {
bucket := &TokensBucket{
maxRPS: maxRPS,
tokens: map[string]*token{},
unusedTokenTime: unusedTokenTime,
checkTokenTime: checkTokenTime,
}

go bucket.deleteUnusedToken()
runtime.SetFinalizer(bucket, destructBasket)
return bucket
}

func (m *TokensBucket) Obtain(id string) {
m.mux.Lock()
defer m.mux.Unlock()

if _, ok := m.tokens[id]; !ok {
m.tokens[id] = &token{
lastUse: time.Now(),
rps: 1,
}
return
}

sleepTime := time.Second - time.Since(m.tokens[id].lastUse)
if sleepTime < 0 {
m.tokens[id].lastUse = time.Now()
m.tokens[id].rps = 0
} else if m.tokens[id].rps >= m.maxRPS {
time.Sleep(sleepTime)
m.tokens[id].lastUse = time.Now()
m.tokens[id].rps = 0
}
m.tokens[id].rps++
}

func destructBasket(m *TokensBucket) {
m.cancel.Store(true)
}

func (m *TokensBucket) deleteUnusedToken() {
for {
if m.cancel.Load() {
return
}
m.mux.Lock()

for id, token := range m.tokens {
if time.Since(token.lastUse) >= m.unusedTokenTime {
delete(m.tokens, id)
}
}
m.mux.Unlock()

time.Sleep(m.checkTokenTime)
}
}
27 changes: 12 additions & 15 deletions v1/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"io"
"net/http"
"strings"
"time"
)

const MaxRPS = 100
Expand Down Expand Up @@ -53,6 +52,12 @@ func (c *MgClient) DeleteRequest(url string, parameters []byte) ([]byte, int, er
)
}

func (c *MgClient) WaitForRateLimit() {
if c.limiter != nil && c.Token != "" {
c.limiter.Obtain(c.Token)
}
}

func makeRequest(reqType, url string, buf io.Reader, c *MgClient) ([]byte, int, error) {
var res []byte
req, err := http.NewRequest(reqType, url, buf)
Expand All @@ -63,22 +68,14 @@ func makeRequest(reqType, url string, buf io.Reader, c *MgClient) ([]byte, int,
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Transport-Token", c.Token)

defer c.mux.Unlock()
c.mux.Lock()
maxAttempt := 1
if c.limiter != nil && c.Token != "" {
maxAttempt = 3
}

attempt := 0
tryAgain:
sleepTime := time.Second - time.Since(c.lastTime)
if sleepTime < 0 {
c.lastTime = time.Now()
c.rps = 0
} else if c.rps == MaxRPS {
time.Sleep(sleepTime)
c.lastTime = time.Now()
c.rps = 0
}
c.rps++

c.WaitForRateLimit()
if c.Debug {
if strings.Contains(url, "/files/upload") {
c.writeLog("MG TRANSPORT API Request: %s %s %s [file data]", reqType, url, c.Token)
Expand All @@ -92,7 +89,7 @@ tryAgain:
return res, 0, NewCriticalHTTPError(err)
}

if resp.StatusCode == http.StatusTooManyRequests && attempt < 3 {
if resp.StatusCode == http.StatusTooManyRequests && attempt < maxAttempt {
attempt++
c.writeLog("MG TRANSPORT API Request rate limit hit on attempt %d, retrying", attempt)
goto tryAgain
Expand Down
45 changes: 0 additions & 45 deletions v1/storage.go

This file was deleted.

31 changes: 0 additions & 31 deletions v1/storage_test.go

This file was deleted.

15 changes: 6 additions & 9 deletions v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"errors"
"fmt"
"net/http"
"sync"
"time"
)

Expand Down Expand Up @@ -79,14 +78,12 @@ const (

// MgClient type.
type MgClient struct {
URL string `json:"url"`
Token string `json:"token"`
Debug bool `json:"debug"`
httpClient *http.Client `json:"-"`
logger BasicLogger `json:"-"`
mux sync.Mutex `json:"-"`
lastTime time.Time `json:"-"`
rps int `json:"-"`
URL string `json:"url"`
Token string `json:"token"`
Debug bool `json:"debug"`
httpClient *http.Client `json:"-"`
logger BasicLogger `json:"-"`
limiter *TokensBucket `json:"-"`
}

// Channel type.
Expand Down

0 comments on commit c80d4bf

Please sign in to comment.