diff --git a/nocopy.go b/nocopy.go index ad74634a..3d196816 100644 --- a/nocopy.go +++ b/nocopy.go @@ -208,11 +208,19 @@ func NewReader(r io.Reader) Reader { return newZCReader(r) } +func NewReaderReuseBuffer(r io.Reader) Reader { + return newZCReaderWithSize(r, defaultLinkBufferSize) +} + // NewWriter convert io.Writer to nocopy Writer func NewWriter(w io.Writer) Writer { return newZCWriter(w) } +func NewWriterReuseBuffer(w io.Writer) Writer { + return newZCWriterWithSize(w, defaultLinkBufferSize) +} + // NewReadWriter convert io.ReadWriter to nocopy ReadWriter func NewReadWriter(rw io.ReadWriter) ReadWriter { return &zcReadWriter{ diff --git a/nocopy_linkbuffer.go b/nocopy_linkbuffer.go index 89b13eb9..7fb2e0c0 100644 --- a/nocopy_linkbuffer.go +++ b/nocopy_linkbuffer.go @@ -38,6 +38,28 @@ var ( _ Writer = &LinkBuffer{} ) +var linkBufferPool = sync.Pool{ + New: func() interface{} { + return &LinkBuffer{} + }, +} + +func NewLinkBufferFromPool(size ...int) *LinkBuffer { + buf := linkBufferPool.Get().(*LinkBuffer) + var l int + if len(size) > 0 { + l = size[0] + } + node := newLinkBufferNode(l) + buf.head, buf.read, buf.flush, buf.write = node, node, node, node + return buf +} + +func ReleaseLinkBuffer(buf *LinkBuffer) { + buf.Release() + buf.head, buf.read, buf.flush, buf.write = nil, nil, nil, nil +} + // NewLinkBuffer size defines the initial capacity, but there is no readable data. func NewLinkBuffer(size ...int) *LinkBuffer { buf := &LinkBuffer{} @@ -205,6 +227,7 @@ func (b *UnsafeLinkBuffer) Skip(n int) (err error) { // Release the node that has been read. // b.flush == nil indicates that this LinkBuffer is created by LinkBuffer.Slice func (b *UnsafeLinkBuffer) Release() (err error) { + //logger.Printf("Release the buffer\n") for b.read != b.flush && b.read.Len() == 0 { b.read = b.read.next } @@ -225,6 +248,15 @@ func (b *UnsafeLinkBuffer) Release() (err error) { return nil } +func (b *UnsafeLinkBuffer) ReleaseWritten() error { + for b.flush != b.write.next { + node := b.flush + b.flush = b.flush.next + node.Release() + } + return nil +} + // ReadString implements Reader. func (b *UnsafeLinkBuffer) ReadString(n int) (s string, err error) { if n <= 0 { @@ -871,6 +903,7 @@ func (node *linkBufferNode) Release() (err error) { if atomic.AddInt32(&node.refer, -1) == 0 { // readonly nodes cannot recycle node.buf, other node.buf are recycled to mcache. if node.reusable() { + //logger.Printf("reuse the buffer\n") free(node.buf) } node.buf, node.origin, node.next = nil, nil, nil diff --git a/nocopy_linkbuffer_test.go b/nocopy_linkbuffer_test.go index c731d76b..8e42826f 100644 --- a/nocopy_linkbuffer_test.go +++ b/nocopy_linkbuffer_test.go @@ -26,6 +26,10 @@ import ( "testing" ) +func TestLinkBufferReuse(t *testing.T) { + //buf := NewLinkBuffer() +} + func TestLinkBuffer(t *testing.T) { // clean & new LinkBufferCap = 128 diff --git a/nocopy_readwriter.go b/nocopy_readwriter.go index 2a07c04a..b115a9c1 100644 --- a/nocopy_readwriter.go +++ b/nocopy_readwriter.go @@ -28,6 +28,13 @@ func newZCReader(r io.Reader) *zcReader { } } +func newZCReaderWithSize(r io.Reader, s int) *zcReader { + return &zcReader{ + r: r, + buf: NewLinkBufferFromPool(s), + } +} + var _ Reader = &zcReader{} // zcReader implements Reader. @@ -62,7 +69,8 @@ func (r *zcReader) Skip(n int) (err error) { // Release implements Reader. func (r *zcReader) Release() (err error) { - return r.buf.Release() + //return r.buf.Release() + return r.buf.ReleaseWritten() } // Slice implements Reader. @@ -123,7 +131,7 @@ func (r *zcReader) waitRead(n int) (err error) { func (r *zcReader) fill(n int) (err error) { var buf []byte var num int - for i := 0; i < maxReadCycle && r.buf.Len() < n && err == nil; i++ { + for i := 0; i < maxReadCycle && r.buf.Len() < n; i++ { buf, err = r.buf.Malloc(block4k) if err != nil { return err @@ -151,6 +159,13 @@ func newZCWriter(w io.Writer) *zcWriter { } } +func newZCWriterWithSize(w io.Writer, s int) *zcWriter { + return &zcWriter{ + w: w, + buf: NewLinkBufferFromPool(s), + } +} + var _ Writer = &zcWriter{} // zcWriter implements Writer. @@ -172,10 +187,15 @@ func (w *zcWriter) MallocLen() (length int) { // Flush implements Writer. func (w *zcWriter) Flush() (err error) { w.buf.Flush() + bufLen := w.buf.Len() n, err := w.w.Write(w.buf.Bytes()) if n > 0 { w.buf.Skip(n) - w.buf.Release() + } + if n == bufLen { + w.buf.ReleaseWritten() + } else { + logger.Printf("write partial\n") } return err } diff --git a/nocopy_readwriter_test.go b/nocopy_readwriter_test.go index 45d6a7ef..c33b183a 100644 --- a/nocopy_readwriter_test.go +++ b/nocopy_readwriter_test.go @@ -50,6 +50,36 @@ func TestZCReader(t *testing.T) { MustNil(t, err) } +func TestZCReaderReuse(t *testing.T) { + reader := &MockIOReadWriter{ + read: func(p []byte) (n int, err error) { + copy(p, make([]byte, len(p))) + return len(p), nil + }, + } + r := newZCReaderWithSize(reader, 4096) + buf, err := r.buf.Malloc(10) + MustNil(t, err) + buf[0] = 1 + r.buf.write.buf = r.buf.write.buf[:10] + MustTrue(t, r.buf.write.buf[0] == buf[0]) + r.Release() +} + +func TestZCWriterReuse(t *testing.T) { + writer := &MockIOReadWriter{ + write: func(p []byte) (n int, err error) { + return len(p), nil + }, + } + w := newZCWriterWithSize(writer, 4096) + + p, err := w.WriteBinary(make([]byte, 10)) + MustTrue(t, p == 10) + MustNil(t, err) + MustNil(t, w.Flush()) +} + func TestZCWriter(t *testing.T) { writer := &MockIOReadWriter{ write: func(p []byte) (n int, err error) {