Skip to content

Commit

Permalink
Add option to reuse buffers for interleaved frames
Browse files Browse the repository at this point in the history
  • Loading branch information
kevmo314 committed Aug 6, 2024
1 parent 208d828 commit 2e3e0f6
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 4 deletions.
8 changes: 7 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ type Client struct {
BytesReceived *uint64
// pointer to a variable that stores sent bytes.
BytesSent *uint64
// enable reuse of buffers for incoming packets.
BufferReuseEnable bool

//
// system functions (all optional)
Expand Down Expand Up @@ -883,7 +885,11 @@ func (c *Client) connOpen() error {

c.nconn = nconn
bc := bytecounter.New(c.nconn, c.BytesReceived, c.BytesSent)
c.conn = conn.NewConn(bc)
if c.BufferReuseEnable {
c.conn = conn.NewConn(bc, conn.ConnOptionFrameBufferReuseEnable(true))
} else {
c.conn = conn.NewConn(bc)
}
c.reader = &clientReader{
c: c,
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/base/interleaved_frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,13 @@ func (f *InterleavedFrame) Unmarshal(br *bufio.Reader) error {
payloadLen := int(uint16(header[2])<<8 | uint16(header[3]))

f.Channel = int(header[1])
f.Payload = make([]byte, payloadLen)

if cap(f.Payload) < payloadLen {
// if there's not enough space, extend the buffer
f.Payload = append(f.Payload[:cap(f.Payload)], make([]byte, payloadLen-cap(f.Payload))...)
} else {
// otherwise, set the len of the buffer to the payloadLen
f.Payload = f.Payload[:payloadLen]
}
_, err = io.ReadFull(br, f.Payload)
return err
}
Expand Down
17 changes: 16 additions & 1 deletion pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,28 @@ type Conn struct {

// reuse interleaved frames. they should never be passed to secondary routines
fr base.InterleavedFrame

frameBufferReuseEnable bool
}

// NewConn allocates a Conn.
func NewConn(rw io.ReadWriter) *Conn {
func NewConn(rw io.ReadWriter, opts ...ConnOption) *Conn {
return &Conn{
w: rw,
br: bufio.NewReaderSize(rw, readBufferSize),
}
}

// ConnOption is an option for Conn.
type ConnOption func(c *Conn)

// ConnOptionFrameBufferReuseEnable enables buffer reuse.
func ConnOptionFrameBufferReuseEnable(v bool) ConnOption {
return func(c *Conn) {
c.frameBufferReuseEnable = v
}
}

// Read reads a Request, a Response or an Interleaved frame.
func (c *Conn) Read() (interface{}, error) {
byts, err := c.br.Peek(2)
Expand Down Expand Up @@ -63,6 +75,9 @@ func (c *Conn) ReadResponse() (*base.Response, error) {

// ReadInterleavedFrame reads a InterleavedFrame.
func (c *Conn) ReadInterleavedFrame() (*base.InterleavedFrame, error) {
if !c.frameBufferReuseEnable {
c.fr.Payload = nil // reset the payload, causing a new buffer to be allocated
}
err := c.fr.Unmarshal(c.br)
return &c.fr, err
}
Expand Down
32 changes: 32 additions & 0 deletions pkg/conn/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,38 @@ func TestRead(t *testing.T) {
}
}

func TestReadInterleavedFrameWithBufferReuse(t *testing.T) {
buf := bytes.NewBuffer([]byte{0x24, 0x6, 0x0, 0x4, 0x1, 0x2, 0x3, 0x4, 0x24, 0x6, 0x0, 0x3, 0x2, 0x3, 0x4, 0x24, 0x6, 0x0, 0x5, 0x3, 0x4, 0x5, 0x6, 0x7})
conn := NewConn(buf)
// read first packet
dec1, err1 := conn.Read()
require.NoError(t, err1)
require.Equal(t, &base.InterleavedFrame{
Channel: 6,
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}, dec1)
p1 := &dec1.(*base.InterleavedFrame).Payload
// read second packet
dec2, err2 := conn.Read()
require.NoError(t, err2)
require.Equal(t, &base.InterleavedFrame{
Channel: 6,
Payload: []byte{0x02, 0x03, 0x04},
}, dec2)
p2 := &dec2.(*base.InterleavedFrame).Payload
// read third packet
dec3, err3 := conn.Read()
require.NoError(t, err3)
require.Equal(t, &base.InterleavedFrame{
Channel: 6,
Payload: []byte{0x03, 0x04, 0x05, 0x06, 0x07},
}, dec3)
p3 := &dec3.(*base.InterleavedFrame).Payload
// assert that the buffer was reused
require.Equal(t, p1, p2)
require.Equal(t, p2, p3)
}

func TestReadError(t *testing.T) {
var buf bytes.Buffer
conn := NewConn(&buf)
Expand Down

0 comments on commit 2e3e0f6

Please sign in to comment.