-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlimiter_writer.go
140 lines (128 loc) · 3.41 KB
/
limiter_writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
// Copyright (c) 2020 Hirotsuna Mizuno. All rights reserved.
// Use of this source code is governed by the MIT license that can be found in
// the LICENSE file.
package speedio
import (
"io"
"sync"
"time"
"github.com/tunabay/go-infounit"
)
// LimiterWriter implements bit rate limiting for an io.Writer object.
type LimiterWriter struct {
wr io.Writer // underlying writer provided by the client
rate infounit.BitRate
resolution time.Duration
maxWait time.Duration
lim *limiter
closed bool
closedChan chan struct{}
mu sync.RWMutex
}
// NewLimiterWriter creates a new LimiterWriter with default configuration. The
// default configuration has a resolution of 1s and a max-wait duration of 500ms,
// which means that average bit rate for the last 1s is limited, and a partial
// writing is performed at least every 500ms.
func NewLimiterWriter(wr io.Writer, rate infounit.BitRate) (*LimiterWriter, error) {
return NewLimiterWriterWithConfig(wr, rate, nil)
}
// NewLimiterWriterWithConfig creates a new LimiterWriter with the specified
// configuration. If conf is nil, the default configuration will be used.
func NewLimiterWriterWithConfig(wr io.Writer, rate infounit.BitRate, conf *LimiterConfig) (*LimiterWriter, error) {
if conf == nil {
conf = DefaultLimiterConfig
}
w := &LimiterWriter{
wr: wr,
rate: rate,
resolution: conf.Resolution,
maxWait: conf.MaxWait,
closedChan: make(chan struct{}),
}
lim, err := newLimiter(w.rate, w.resolution, w.maxWait)
if err != nil {
return nil, err
}
w.lim = lim
return w, nil
}
// LimitingBitRate returns the current limiting bit rate.
func (w *LimiterWriter) LimitingBitRate() infounit.BitRate {
w.mu.RLock()
defer w.mu.RUnlock()
return w.rate
}
// SetBitRate sets a new limiting bit rate.
func (w *LimiterWriter) SetBitRate(rate infounit.BitRate) error {
w.mu.Lock()
defer w.mu.Unlock()
if err := w.lim.set(time.Now(), rate, w.resolution, w.maxWait); err != nil {
return err
}
w.rate = rate
return nil
}
// Close closes the writer.
// If the underlying writer implements io.WriteCloser, its Close method
// is also called.
func (w *LimiterWriter) Close() error {
return w.close(true)
}
// CloseSingle is the same as Close except that it does not close the underlying writer.
func (w *LimiterWriter) CloseSingle() error {
return w.close(false)
}
//
func (w *LimiterWriter) close(chain bool) error {
w.mu.Lock()
defer w.mu.Unlock()
if w.closed {
return nil
}
w.closed = true
close(w.closedChan)
if !chain {
return nil
}
if wrc, ok := w.wr.(io.Closer); ok {
if err := wrc.Close(); err != nil {
return err
}
}
return nil
}
// Write writes len(p) bytes from p to the underlying writer.
// It blocks until all the data in p is written, but it does not just wait
// until all the data is written. It repeatedly writes part of the divided p
// to the underlying writer.
func (w *LimiterWriter) Write(p []byte) (int, error) {
written := 0
for 0 < len(p) {
tc := time.Now()
wd, abc := w.lim.request(tc, len(p))
if 0 < wd {
timer := time.NewTimer(wd)
select {
case <-timer.C:
case <-w.closedChan:
if !timer.Stop() {
<-timer.C
}
return written, ErrClosed
}
}
n, err := w.wr.Write(p[:abc])
if n < abc {
w.lim.refund(abc - n)
}
if err != nil {
return written, err
}
if n == 0 {
return written, ErrZeroWrite
}
written += n
p = p[n:]
}
return written, nil
}