-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathio.go
309 lines (277 loc) · 8.9 KB
/
io.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
// Copyright © 2023 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bwlimit
import (
"context"
"errors"
"io"
"math"
"os"
"time"
"golang.org/x/time/rate"
)
// Reader wraps an io.Reader and imposes a bandwidth limit on calls to Read.
type Reader struct {
io.Reader
limiter *rate.Limiter
reservation *rate.Reservation
deadline time.Time
}
// NewReader wraps an existing io.Reader and returns a Reader that limits the
// bandwidth of reads.
// A zero value for bytesPerSecond means that Read will not have a bandwidth
// limit.
func NewReader(r io.Reader, bytesPerSecond Byte) *Reader {
bwr := &Reader{
Reader: r,
limiter: rate.NewLimiter(toLimit(bytesPerSecond), toBurst(bytesPerSecond)),
}
return bwr
}
// Deadline returns the configured deadline (see SetDeadline).
func (r *Reader) Deadline() time.Time {
return r.deadline
}
// SetDeadline sets the read deadline associated with the reader.
//
// A deadline is an absolute time after which Read fails instead of blocking.
// The deadline applies to all future and pending calls to Read, not just the
// immediately following call to Read. After a deadline has been exceeded, the
// reader can be refreshed by setting a deadline in the future.
//
// If the deadline is exceeded a call to Read will return an error that wraps
// os.ErrDeadlineExceeded. This can be tested using
// errors.Is(err, os.ErrDeadlineExceeded).
//
// An idle timeout can be implemented by repeatedly extending the deadline after
// successful Read calls.
//
// A zero value for t means that calls to Read will not time out.
func (r *Reader) SetDeadline(t time.Time) {
r.deadline = t
}
// BandwidthLimit returns the current bandwidth limit.
func (r *Reader) BandwidthLimit() Byte {
return Byte(r.limiter.Limit())
}
// SetBandwidthLimit sets the bandwidth limit for future Read calls and
// any currently-blocked Read call.
// A zero value for bytesPerSecond means the bandwidth limit is removed.
func (r *Reader) SetBandwidthLimit(bytesPerSecond Byte) {
r.limiter.SetLimit(toLimit(bytesPerSecond))
r.limiter.SetBurst(toBurst(bytesPerSecond))
}
// Close forwards the call to the wrapped io.Reader if it implements io.Closer,
// otherwise it is a noop.
func (r *Reader) Close() error {
if rc, ok := r.Reader.(io.Closer); ok {
return rc.Close()
}
return nil
}
// Read reads up to len(p) bytes into p. It returns the number of bytes
// read (0 <= n <= len(p)) and any error encountered.
//
// Read will limit the speed of the reads if a bandwidth limit is configured. If
// the size of p is bigger than the rate of bytes per second, reads will be
// split into smaller chunks.
// Note that since it's not known in advance how many bytes will be read, the
// bandwidth can burst up to 2x of the configured limit when reading the first 2
// chunks.
func (r *Reader) Read(p []byte) (n int, err error) {
if r.limiter.Limit() == rate.Inf {
// no limit, just pass the call through to the connection
return r.Reader.Read(p)
}
ctx := context.Background()
if !r.deadline.IsZero() {
var cancel context.CancelFunc
ctx, cancel = context.WithDeadline(ctx, r.deadline)
defer cancel()
}
for _, chunk := range split(p, int(r.limiter.Limit())) {
bytesRead, err := r.readWithRateLimit(ctx, chunk)
n += bytesRead
if err != nil {
return n, err
}
if bytesRead < len(p) {
// we did not read a whole chunk, we need to return and let the
// caller call Read again, so we split the bytes again in new chunks
return n, nil
}
}
return n, nil
}
// readWithRateLimit will delay the read if needed to match the configured
// bandwidth limit.
func (r *Reader) readWithRateLimit(ctx context.Context, p []byte) (int, error) {
// we first need to delay the read if there was a read that happened before
if r.reservation != nil && r.reservation.OK() {
err := r.wait(ctx, r.reservation.Delay())
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
// according to net.Conn the error should be os.ErrDeadlineExceeded
err = os.ErrDeadlineExceeded
}
return 0, err
}
r.reservation = nil
}
at := time.Now()
n, err := r.Reader.Read(p)
if n > 0 {
// reserve the number of actually read bytes to delay future reads
r.reservation = r.limiter.ReserveN(at, n)
}
return n, err
}
func (r *Reader) wait(ctx context.Context, d time.Duration) error {
if d == 0 {
return nil
}
timer := time.NewTimer(d)
defer timer.Stop()
select {
case <-timer.C:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
// Writer wraps an io.Writer and imposes a bandwidth limit on calls to Write.
type Writer struct {
io.Writer
limiter *rate.Limiter
deadline time.Time
}
// NewWriter wraps an existing io.Writer and returns a Writer that limits the
// bandwidth of writes.
// A zero value for bytesPerSecond means that Write will not have a bandwidth
// limit.
func NewWriter(w io.Writer, bytesPerSecond Byte) *Writer {
bwr := &Writer{
Writer: w,
limiter: rate.NewLimiter(toLimit(bytesPerSecond), toBurst(bytesPerSecond)),
}
return bwr
}
// Deadline returns the configured deadline (see SetDeadline).
func (w *Writer) Deadline() time.Time {
return w.deadline
}
// SetDeadline sets the write deadline associated with the writer.
//
// A deadline is an absolute time after which Write fails instead of blocking.
// The deadline applies to all future and pending calls to Write, not just the
// immediately following call to Write. After a deadline has been exceeded, the
// writer can be refreshed by setting a deadline in the future.
//
// If the deadline is exceeded a call to Write will return an error that wraps
// os.ErrDeadlineExceeded. This can be tested using
// errors.Is(err, os.ErrDeadlineExceeded).
//
// An idle timeout can be implemented by repeatedly extending the deadline after
// successful Write calls.
//
// A zero value for t means that calls to Write will not time out.
func (w *Writer) SetDeadline(t time.Time) {
w.deadline = t
}
// BandwidthLimit returns the current bandwidth limit.
func (w *Writer) BandwidthLimit() Byte {
return Byte(w.limiter.Limit())
}
// SetBandwidthLimit sets the bandwidth limit for future Write calls and
// any currently-blocked Write call.
// A zero value for bytesPerSecond means the bandwidth limit is removed.
func (w *Writer) SetBandwidthLimit(bytesPerSecond Byte) {
w.limiter.SetLimit(toLimit(bytesPerSecond))
w.limiter.SetBurst(toBurst(bytesPerSecond))
}
// Close forwards the call to the wrapped io.Writer if it implements io.Closer,
// otherwise it is a noop.
func (w *Writer) Close() error {
if wc, ok := w.Writer.(io.Closer); ok {
return wc.Close()
}
return nil
}
// Write writes len(p) bytes from p to the underlying data stream.
// It returns the number of bytes written from p (0 <= n <= len(p))
// and any error encountered that caused the write to stop early.
//
// Write will limit the speed of the writes if a bandwidth limit is configured.
// If the size of p is bigger than the rate of bytes per second, writes will be
// split into smaller chunks.
func (w *Writer) Write(p []byte) (n int, err error) {
if w.limiter.Limit() == rate.Inf {
// no limit, just pass the call through to the connection
return w.Writer.Write(p)
}
ctx := context.Background()
if !w.deadline.IsZero() {
var cancel context.CancelFunc
ctx, cancel = context.WithDeadline(ctx, w.deadline)
defer cancel()
}
for _, chunk := range split(p, int(w.limiter.Limit())) {
bytesWritten, err := w.writeWithRateLimit(ctx, chunk)
n += bytesWritten
if err != nil {
return n, err
}
}
return n, nil
}
// writeWithRateLimit will delay the write if needed to match the configured
// bandwidth limit.
func (w *Writer) writeWithRateLimit(ctx context.Context, b []byte) (int, error) {
err := w.limiter.WaitN(ctx, len(b))
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
// according to net.Conn the error should be os.ErrDeadlineExceeded
err = os.ErrDeadlineExceeded
}
return 0, err
}
return w.Writer.Write(b)
}
// split takes a byte slice and splits it into chunks of size maxSize. If b is
// not divisible by maxSize, the last chunk will be smaller.
func split(b []byte, maxSize int) [][]byte {
var end int
out := make([][]byte, ((len(b)-1)/maxSize)+1)
for i := range out {
start := end
end += maxSize
if end > len(b) {
end = len(b)
}
out[i] = b[start:end]
}
return out
}
func toLimit(b Byte) rate.Limit {
if b <= 0 {
return rate.Inf
}
return rate.Limit(b)
}
func toBurst(b Byte) int {
if b <= 0 {
return math.MaxInt
}
return int(b)
}