Skip to content

Commit

Permalink
feat: Add ratelimit support (#11)
Browse files Browse the repository at this point in the history
* feat: Add ratelimit support

Signed-off-by: Xuanwo <[email protected]>

* Fix build

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Aug 23, 2021
1 parent 63f0abd commit 199770a
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 8 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ integration_test:
popd

tidy:
go mod tidy
go mod verify
go mod tidy && go mod verify
pushd tests && go mod tidy && go mod verify && popd

clean:
find . -type f -name 'generated.go' -delete
44 changes: 42 additions & 2 deletions branch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"fmt"
"io"
"sync"
"time"

"github.com/beyondstorage/go-storage/v4/pairs"
"github.com/beyondstorage/go-storage/v4/types"
"go.uber.org/atomic"
)
Expand Down Expand Up @@ -35,10 +37,29 @@ type Branch struct {
}

func (br *Branch) Write(idx uint64, data []byte) (n int64, err error) {
var ps []types.Pair

// FIXME: we need to add default pari instead.
if br.s.limit != nil {
ps = append(ps, pairs.WithIoCallback(func(bs []byte) {
l := len(bs)

for l > 0 {
n := br.s.limit.Burst()
if n > l {
n = l
}
r := br.s.limit.ReserveN(time.Now(), n)
time.Sleep(r.Delay())
l -= n
}
}))
}

p := formatPath(br.id, idx)

size := int64(len(data))
n, err = br.s.upper.Write(p, bytes.NewReader(data), size)
n, err = br.s.upper.Write(p, bytes.NewReader(data), size, ps...)
if err != nil {
return
}
Expand All @@ -52,6 +73,25 @@ func (br *Branch) Write(idx uint64, data []byte) (n int64, err error) {
}

func (br *Branch) ReadFrom(r io.Reader) (n int64, err error) {
var ps []types.Pair

// FIXME: we need to add default pari instead.
if br.s.limit != nil {
ps = append(ps, pairs.WithIoCallback(func(bs []byte) {
l := len(bs)

for l > 0 {
n := br.s.limit.Burst()
if n > l {
n = l
}
r := br.s.limit.ReserveN(time.Now(), n)
time.Sleep(r.Delay())
l -= n
}
}))
}

// Use 4mb as chunk.
chunkSize := int64(4 * 1024 * 1024)
idx := uint64(0)
Expand All @@ -63,7 +103,7 @@ func (br *Branch) ReadFrom(r io.Reader) (n int64, err error) {
// Update index.
idx++

size, err := br.s.upper.Write(p, r, chunkSize)
size, err := br.s.upper.Write(p, r, chunkSize, ps...)
// No matter we read success or not, we both need to send data.
n += size
br.wg.Add(1)
Expand Down
7 changes: 7 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/beyondstorage/go-storage/v4/types"
"github.com/panjf2000/ants/v2"
"golang.org/x/time/rate"
)

const (
Expand All @@ -17,6 +18,8 @@ type Config struct {
Upper types.Storager
Under types.Storager

// SpeedLimit for upper storager, unit is B/s
SpeedLimit int
PersistMethod string
}

Expand Down Expand Up @@ -50,6 +53,10 @@ func NewWithConfig(cfg *Config) (s *Stream, err error) {
return nil, fmt.Errorf("not supported persis method: %v", cfg.PersistMethod)
}

if cfg.SpeedLimit != 0 {
s.limit = rate.NewLimiter(rate.Limit(cfg.SpeedLimit), cfg.SpeedLimit)
}

// FIXME: we will support setting workers later.
s.p, err = ants.NewPool(10)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ require (
github.com/panjf2000/ants/v2 v2.4.6
github.com/stretchr/testify v1.7.0
go.uber.org/atomic v1.9.0
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs=
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200509030707-2212a7e161a5/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
Expand Down
2 changes: 2 additions & 0 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stream

import (
"fmt"
"golang.org/x/time/rate"
"io"
"sync"

Expand All @@ -19,6 +20,7 @@ type Stream struct {
// Only valid if method is "append" and under supports.
underAppend types.Appender

limit *rate.Limiter
p *ants.Pool
ch chan op
errch chan error
Expand Down
4 changes: 2 additions & 2 deletions tests/go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
module github.com/beyondstorage/go-stream/tests

go 1.16
go 1.15

require (
github.com/beyondstorage/go-service-memory v0.2.0
github.com/beyondstorage/go-service-memory v0.2.1-0.20210823092007-eb22951d1ec2
github.com/beyondstorage/go-service-s3/v2 v2.3.0
github.com/beyondstorage/go-storage/v4 v4.5.0
github.com/beyondstorage/go-stream v0.1.0
Expand Down
6 changes: 4 additions & 2 deletions tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ github.com/beyondstorage/go-endpoint v1.0.1 h1:F8x2dGLMu9je6g7zPbKoxCXDlug97K26S
github.com/beyondstorage/go-endpoint v1.0.1/go.mod h1:P2hknaGrziOJJKySv/XnAiVw/d3v12/LZu2gSxEx4nM=
github.com/beyondstorage/go-integration-test/v4 v4.2.0/go.mod h1:jLyYWSGUjQRH7U1HdaLbXE5sxBgqrtK73q+Q7PGIuSs=
github.com/beyondstorage/go-integration-test/v4 v4.3.0/go.mod h1:HKgzemQZpxoHBL49JYEUnLTb5eteUhzcvmmPL7EDT/Y=
github.com/beyondstorage/go-service-memory v0.2.0 h1:VYIgkKc25XpudmBozu5FJWjuW0+WhBn/uAb+RDhxpKQ=
github.com/beyondstorage/go-service-memory v0.2.0/go.mod h1:XQwDQ2AwNxt2ImUHgPyPedHcQxUUgxY3EwUEJUvfY1o=
github.com/beyondstorage/go-service-memory v0.2.1-0.20210823092007-eb22951d1ec2 h1:zrZioFFso7bm/ChxUFMldC3NXFNyvvwSkuqsWrkMNks=
github.com/beyondstorage/go-service-memory v0.2.1-0.20210823092007-eb22951d1ec2/go.mod h1:XQwDQ2AwNxt2ImUHgPyPedHcQxUUgxY3EwUEJUvfY1o=
github.com/beyondstorage/go-service-s3/v2 v2.3.0 h1:mCdCuGmQ/26A1Xhj9tnFcRy3bXf1h5MT/TyFrJW8CAI=
github.com/beyondstorage/go-service-s3/v2 v2.3.0/go.mod h1:B4gLy8g/uhjkLkVymkRES/tIKH2UhB0zmkzqzGaN5LU=
github.com/beyondstorage/go-storage/v4 v4.3.0/go.mod h1:0fdcRCzLKMQe7Ve4zPlyTGgoPYwuINiV79Gx9tCt9tQ=
Expand Down Expand Up @@ -101,6 +101,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs=
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
Expand Down
1 change: 1 addition & 0 deletions tests/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func setup(t *testing.T) (s *stream.Stream, upper, under types.Storager) {
s, err = stream.NewWithConfig(&stream.Config{
Upper: upperStore,
Under: underStore,
SpeedLimit: 10 * 1024 * 1024, // Use 10M/s for test.
PersistMethod: stream.PersistMethodMultipart,
})
if err != nil {
Expand Down

0 comments on commit 199770a

Please sign in to comment.