From 451f015f9d0bd9bb8660754969c7020d58722263 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Sat, 16 Nov 2024 22:59:56 +0800 Subject: [PATCH] opt: eliminate the inuse eventloop.cache and memory leaks for idle connections (#660) Fixes #659 Updates #420 --- connection_unix.go | 45 +++++++++------------ connection_windows.go | 45 ++++++++++----------- eventloop_unix.go | 2 - eventloop_windows.go | 2 - pkg/buffer/linkedlist/linked_list_buffer.go | 18 ++++----- 5 files changed, 49 insertions(+), 63 deletions(-) diff --git a/connection_unix.go b/connection_unix.go index dc568ad80..7babbeeac 100644 --- a/connection_unix.go +++ b/connection_unix.go @@ -49,6 +49,7 @@ type conn struct { pollAttachment netpoll.PollAttachment // connection attachment for poller inboundBuffer elastic.RingBuffer // buffer for leftover data from the remote buffer []byte // buffer for the latest bytes + cache []byte // temporary cache for the inbound data isDatagram bool // UDP protocol opened bool // connection opened event fired isEOF bool // whether the connection has reached EOF @@ -290,6 +291,7 @@ func (c *conn) sendTo(buf []byte) error { func (c *conn) resetBuffer() { c.buffer = c.buffer[:0] c.inboundBuffer.Reset() + c.inboundBuffer.Done() } func (c *conn) Read(p []byte) (n int, err error) { @@ -325,22 +327,9 @@ func (c *conn) Next(n int) (buf []byte, err error) { return } - head, tail := c.inboundBuffer.Peek(n) - defer c.inboundBuffer.Discard(n) //nolint:errcheck - c.loop.cache.Reset() - c.loop.cache.Write(head) - if len(head) == n { - return c.loop.cache.Bytes(), err - } - c.loop.cache.Write(tail) - if inBufferLen >= n { - return c.loop.cache.Bytes(), err - } - - remaining := n - inBufferLen - c.loop.cache.Write(c.buffer[:remaining]) - c.buffer = c.buffer[remaining:] - return c.loop.cache.Bytes(), err + buf = bsPool.Get(n) + _, err = c.Read(buf) + return } func (c *conn) Peek(n int) (buf []byte, err error) { @@ -359,25 +348,31 @@ func (c *conn) Peek(n int) (buf []byte, err error) { if len(head) == n { return head, err } - c.loop.cache.Reset() - c.loop.cache.Write(head) - c.loop.cache.Write(tail) + buf = bsPool.Get(n)[:0] + buf = append(buf, head...) + buf = append(buf, tail...) if inBufferLen >= n { - return c.loop.cache.Bytes(), err + return } remaining := n - inBufferLen - c.loop.cache.Write(c.buffer[:remaining]) - return c.loop.cache.Bytes(), err + buf = append(buf, c.buffer[:remaining]...) + c.cache = buf + return } func (c *conn) Discard(n int) (int, error) { + if len(c.cache) > 0 { + bsPool.Put(c.cache) + c.cache = nil + } + inBufferLen := c.inboundBuffer.Buffered() - tempBufferLen := len(c.buffer) - if inBufferLen+tempBufferLen < n || n <= 0 { + if totalLen := inBufferLen + len(c.buffer); n >= totalLen || n <= 0 { c.resetBuffer() - return inBufferLen + tempBufferLen, nil + return totalLen, nil } + if c.inboundBuffer.IsEmpty() { c.buffer = c.buffer[n:] return n, nil diff --git a/connection_windows.go b/connection_windows.go index 443404c4a..fe8ccb187 100644 --- a/connection_windows.go +++ b/connection_windows.go @@ -27,6 +27,7 @@ import ( "github.com/panjf2000/gnet/v2/pkg/buffer/elastic" errorx "github.com/panjf2000/gnet/v2/pkg/errors" bbPool "github.com/panjf2000/gnet/v2/pkg/pool/bytebuffer" + bsPool "github.com/panjf2000/gnet/v2/pkg/pool/byteslice" goPool "github.com/panjf2000/gnet/v2/pkg/pool/goroutine" ) @@ -54,6 +55,7 @@ type conn struct { ctx any // user-defined context loop *eventloop // owner event-loop buffer *bbPool.ByteBuffer // reuse memory of inbound data as a temporary buffer + cache []byte // temporary cache for the inbound data rawConn net.Conn // original connection localAddr net.Addr // local server addr remoteAddr net.Addr // remote addr @@ -116,6 +118,7 @@ func newUDPConn(el *eventloop, pc net.PacketConn, localAddr, remoteAddr net.Addr func (c *conn) resetBuffer() { c.buffer.Reset() c.inboundBuffer.Reset() + c.inboundBuffer.Done() } func (c *conn) Read(p []byte) (n int, err error) { @@ -149,22 +152,10 @@ func (c *conn) Next(n int) (buf []byte, err error) { c.buffer.B = c.buffer.B[n:] return } - head, tail := c.inboundBuffer.Peek(n) - defer c.inboundBuffer.Discard(n) //nolint:errcheck - c.loop.cache.Reset() - c.loop.cache.Write(head) - if len(head) == n { - return c.loop.cache.Bytes(), err - } - c.loop.cache.Write(tail) - if inBufferLen >= n { - return c.loop.cache.Bytes(), err - } - remaining := n - inBufferLen - c.loop.cache.Write(c.buffer.B[:remaining]) - c.buffer.B = c.buffer.B[remaining:] - return c.loop.cache.Bytes(), err + buf = bsPool.Get(n) + _, err = c.Read(buf) + return } func (c *conn) Peek(n int) (buf []byte, err error) { @@ -181,25 +172,31 @@ func (c *conn) Peek(n int) (buf []byte, err error) { if len(head) == n { return head, err } - c.loop.cache.Reset() - c.loop.cache.Write(head) - c.loop.cache.Write(tail) + buf = bsPool.Get(n)[:0] + buf = append(buf, head...) + buf = append(buf, tail...) if inBufferLen >= n { - return c.loop.cache.Bytes(), err + return } remaining := n - inBufferLen - c.loop.cache.Write(c.buffer.B[:remaining]) - return c.loop.cache.Bytes(), err + buf = append(buf, c.buffer.B[:remaining]...) + c.cache = buf + return } func (c *conn) Discard(n int) (int, error) { + if len(c.cache) > 0 { + bsPool.Put(c.cache) + c.cache = nil + } + inBufferLen := c.inboundBuffer.Buffered() - tempBufferLen := c.buffer.Len() - if inBufferLen+tempBufferLen < n || n <= 0 { + if totalLen := inBufferLen + c.buffer.Len(); n >= totalLen || n <= 0 { c.resetBuffer() - return inBufferLen + tempBufferLen, nil + return totalLen, nil } + if c.inboundBuffer.IsEmpty() { c.buffer.B = c.buffer.B[n:] return n, nil diff --git a/eventloop_unix.go b/eventloop_unix.go index ff73ef18b..dd43bc172 100644 --- a/eventloop_unix.go +++ b/eventloop_unix.go @@ -18,7 +18,6 @@ package gnet import ( - "bytes" "context" "errors" "fmt" @@ -39,7 +38,6 @@ import ( type eventloop struct { listeners map[int]*listener // listeners idx int // loop index in the engine loops list - cache bytes.Buffer // temporary buffer for scattered bytes engine *engine // engine in loop poller *netpoll.Poller // epoll or kqueue buffer []byte // read packet buffer whose capacity is set by user, default value is 64KB diff --git a/eventloop_windows.go b/eventloop_windows.go index 460652981..95da083af 100644 --- a/eventloop_windows.go +++ b/eventloop_windows.go @@ -15,7 +15,6 @@ package gnet import ( - "bytes" "context" "errors" "fmt" @@ -31,7 +30,6 @@ type eventloop struct { ch chan any // channel for event-loop idx int // index of event-loop in event-loops eng *engine // engine in loop - cache bytes.Buffer // temporary buffer for scattered bytes connCount int32 // number of active connections in event-loop connections map[*conn]struct{} // TCP connection map: fd -> conn eventHandler EventHandler // user eventHandler diff --git a/pkg/buffer/linkedlist/linked_list_buffer.go b/pkg/buffer/linkedlist/linked_list_buffer.go index 0095422ae..8916e9182 100644 --- a/pkg/buffer/linkedlist/linked_list_buffer.go +++ b/pkg/buffer/linkedlist/linked_list_buffer.go @@ -32,7 +32,6 @@ func (b *node) len() int { // Buffer is a linked list of node. type Buffer struct { - bs [][]byte head *node tail *node size int @@ -123,19 +122,19 @@ func (llb *Buffer) Peek(maxBytes int) ([][]byte, error) { } else if maxBytes > llb.Buffered() { return nil, io.ErrShortBuffer } - llb.bs = llb.bs[:0] + var bs [][]byte var cum int for iter := llb.head; iter != nil; iter = iter.next { offset := iter.len() if cum+offset > maxBytes { offset = maxBytes - cum } - llb.bs = append(llb.bs, iter.buf[:offset]) + bs = append(bs, iter.buf[:offset]) if cum += offset; cum == maxBytes { break } } - return llb.bs, nil + return bs, nil } // PeekWithBytes is like Peek but accepts [][]byte and puts them onto head. @@ -145,7 +144,7 @@ func (llb *Buffer) PeekWithBytes(maxBytes int, bs ...[]byte) ([][]byte, error) { } else if maxBytes > llb.Buffered() { return nil, io.ErrShortBuffer } - llb.bs = llb.bs[:0] + var bss [][]byte var cum int for _, b := range bs { if n := len(b); n > 0 { @@ -153,9 +152,9 @@ func (llb *Buffer) PeekWithBytes(maxBytes int, bs ...[]byte) ([][]byte, error) { if cum+offset > maxBytes { offset = maxBytes - cum } - llb.bs = append(llb.bs, b[:offset]) + bss = append(bss, b[:offset]) if cum += offset; cum == maxBytes { - return llb.bs, nil + return bss, nil } } } @@ -164,12 +163,12 @@ func (llb *Buffer) PeekWithBytes(maxBytes int, bs ...[]byte) ([][]byte, error) { if cum+offset > maxBytes { offset = maxBytes - cum } - llb.bs = append(llb.bs, iter.buf[:offset]) + bss = append(bss, iter.buf[:offset]) if cum += offset; cum == maxBytes { break } } - return llb.bs, nil + return bss, nil } // Discard removes some nodes based on n bytes. @@ -266,7 +265,6 @@ func (llb *Buffer) Reset() { llb.tail = nil llb.size = 0 llb.bytes = 0 - llb.bs = nil } // pop returns and removes the head of l. If l is empty, it returns nil.