forked from inconshreveable/muxado
-
Notifications
You must be signed in to change notification settings - Fork 0
/
buffer.go
106 lines (90 loc) · 1.57 KB
/
buffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package muxado
import (
"bytes"
"errors"
"io"
"io/ioutil"
"sync"
"time"
)
var (
bufferFull = errors.New("buffer is full")
bufferClosed = errors.New("buffer closed previously")
)
type buffer interface {
Read([]byte) (int, error)
ReadFrom(io.Reader) (int, error)
SetError(error)
SetDeadline(time.Time)
}
type inboundBuffer struct {
cond sync.Cond
mu sync.Mutex
bytes.Buffer
err error
maxSize int
}
func (b *inboundBuffer) Init(maxSize int) {
b.cond.L = &b.mu
b.maxSize = maxSize
}
func (b *inboundBuffer) ReadFrom(rd io.Reader) (n int, err error) {
var n64 int64
b.mu.Lock()
if b.err != nil {
if _, err = ioutil.ReadAll(rd); err == nil {
err = bufferClosed
}
goto DONE
}
n64, err = b.Buffer.ReadFrom(rd)
n += int(n64)
if b.Buffer.Len() > b.maxSize {
err = bufferFull
b.err = bufferFull
}
b.cond.Broadcast()
DONE:
b.mu.Unlock()
return int(n), err
}
func (b *inboundBuffer) Read(p []byte) (n int, err error) {
b.mu.Lock()
for {
if b.Len() != 0 {
n, err = b.Buffer.Read(p)
break
}
if b.err != nil {
err = b.err
break
}
b.cond.Wait()
}
b.mu.Unlock()
return
}
func (b *inboundBuffer) SetError(err error) {
b.mu.Lock()
b.err = err
b.mu.Unlock()
b.cond.Broadcast()
}
func (b *inboundBuffer) SetDeadline(t time.Time) {
// XXX: implement
/*
b.L.Lock()
// set the deadline
b.deadline = t
// how long until the deadline
delay := t.Sub(time.Now())
if b.timer != nil {
b.timer.Stop()
}
// after the delay, wake up waiters
b.timer = time.AfterFunc(delay, func() {
b.Broadcast()
})
b.L.Unlock()
*/
}