forked from zeromq/goczmq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
readwriter.go
123 lines (102 loc) · 2.58 KB
/
readwriter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package goczmq
import (
"C"
"io"
)
// ReadWriter provides an io.ReadWriter compatible
// interface for goczmq.Sock
type ReadWriter struct {
sock *Sock
poller *Poller
clientIDs []string
frame []byte
currentIndex int
timeoutMillis int
}
// NewReadWriter accepts a sock and returns a goczmq.ReadWriter. The
// io.ReadWriter should now be considered responsible for this
// Sock.
func NewReadWriter(sock *Sock) (*ReadWriter, error) {
rw := &ReadWriter{
sock: sock,
timeoutMillis: -1,
}
var err error
rw.poller, err = NewPoller(rw.sock)
return rw, err
}
// SetTimeout sets the timeout on Read in millisecond. If no new
// data is received within the timeout period, Read will return
// an ErrTimeout
func (r *ReadWriter) SetTimeout(ms int) {
r.timeoutMillis = ms
}
// Read satisifies io.Read
func (r *ReadWriter) Read(p []byte) (int, error) {
var totalRead int
var totalFrame int
var flag int
var err error
if r.currentIndex == 0 {
s, err := r.poller.Wait(r.timeoutMillis)
if err != nil {
return totalRead, err
}
if s == nil {
return totalRead, ErrTimeout
}
r.frame, flag, err = s.RecvFrame()
if s.GetType() == Router && r.currentIndex == 0 {
r.clientIDs = append(r.clientIDs, string(r.frame))
r.frame, flag, err = s.RecvFrame()
}
if flag == FlagMore {
return totalRead, ErrMultiPartUnsupported
}
if err != nil {
return totalRead, io.EOF
}
}
totalRead += copy(p[:], r.frame[r.currentIndex:])
totalFrame += len(r.frame)
if totalFrame-r.currentIndex > len(p) {
r.currentIndex = totalRead
} else {
r.currentIndex = 0
err = io.EOF
}
return totalRead, err
}
// Write satisfies io.Write
func (r *ReadWriter) Write(p []byte) (int, error) {
var total int
if r.sock.GetType() == Router {
err := r.sock.SendFrame(r.GetLastClientID(), FlagMore)
if err != nil {
return total, err
}
}
err := r.sock.SendFrame(p, 0)
if err != nil {
return total, err
}
return len(p), nil
}
// GetLastClientID returns the id of the last client you received
// a message from if the underlying socket is a Router socket
func (r *ReadWriter) GetLastClientID() []byte {
id := []byte(r.clientIDs[0])
r.clientIDs = r.clientIDs[1:]
return id
}
// SetLastClientID lets you manually set the id of the client
// you last received a message from if the underlying socket
// is a Router socket
func (r *ReadWriter) SetLastClientID(id []byte) {
r.clientIDs = append(r.clientIDs, string(id))
}
// Destroy destroys both the ReadWriter and the underlying Sock
func (r *ReadWriter) Destroy() {
r.sock.Destroy()
r.poller.Destroy()
}