Skip to content

Commit

Permalink
Merge pull request #54 from Allenxuxu/feat-ringbuf-len
Browse files Browse the repository at this point in the history
add buffer length
  • Loading branch information
Allenxuxu authored Feb 20, 2021
2 parents d13351d + dd6a2d8 commit 48fe835
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 8 deletions.
31 changes: 23 additions & 8 deletions connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@ type CallBack interface {

// Connection TCP 连接
type Connection struct {
fd int
connected atomic.Bool
outBuffer *ringbuffer.RingBuffer // write buffer
inBuffer *ringbuffer.RingBuffer // read buffer
callBack CallBack
loop *eventloop.EventLoop
peerAddr string
ctx interface{}
fd int
connected atomic.Bool
outBuffer *ringbuffer.RingBuffer // write buffer
outBufferLen atomic.Int64
inBuffer *ringbuffer.RingBuffer // read buffer
inBufferLen atomic.Int64
callBack CallBack
loop *eventloop.EventLoop
peerAddr string
ctx interface{}
KeyValueContext

idleTime time.Duration
Expand Down Expand Up @@ -129,6 +131,16 @@ func (c *Connection) ShutdownWrite() error {
return unix.Shutdown(c.fd, unix.SHUT_WR)
}

// ReadBufferLength read buffer 当前积压的数据长度
func (c *Connection) ReadBufferLength() int64 {
return c.inBufferLen.Get()
}

// WriteBufferLength write buffer 当前积压的数据长度
func (c *Connection) WriteBufferLength() int64 {
return c.outBufferLen.Get()
}

// HandleEvent 内部使用,event loop 回调
func (c *Connection) HandleEvent(fd int, events poller.Event) {
if c.idleTime > 0 {
Expand All @@ -147,6 +159,9 @@ func (c *Connection) HandleEvent(fd int, events poller.Event) {
} else if events&poller.EventRead != 0 {
c.handleRead(fd)
}

c.inBufferLen.Swap(int64(c.inBuffer.Length()))
c.outBufferLen.Swap(int64(c.outBuffer.Length()))
}

func (c *Connection) handlerProtocol(buffer *ringbuffer.RingBuffer) []byte {
Expand Down
106 changes: 106 additions & 0 deletions example/bufferlength/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package main

import (
"container/list"
"log"
"sync"
"time"

"github.com/Allenxuxu/gev"
"github.com/Allenxuxu/gev/connection"
)

const clientsKey = "demo_push_message_key"

// Server example
type Server struct {
conn *list.List
mu sync.RWMutex
server *gev.Server
}

// New server
func New(ip, port string) (*Server, error) {
var err error
s := new(Server)
s.conn = list.New()
s.server, err = gev.NewServer(s,
gev.Address(ip+":"+port))
if err != nil {
return nil, err
}

return s, nil
}

// Start server
func (s *Server) Start() {
s.server.RunEvery(1*time.Second, s.RunPush)
s.server.Start()
}

// Stop server
func (s *Server) Stop() {
s.server.Stop()
}

// RunPush push message
func (s *Server) RunPush() {
var next *list.Element

s.mu.RLock()
defer s.mu.RUnlock()

for e := s.conn.Front(); e != nil; e = next {
next = e.Next()

c := e.Value.(*connection.Connection)
if c.WriteBufferLength() > 1024*10 {
log.Printf("write buffer length > 1024*10")
continue
}
_ = c.Send([]byte("hello\n"))
}
}

// OnConnect callback
func (s *Server) OnConnect(c *connection.Connection) {
log.Println(" OnConnect : ", c.PeerAddr())

s.mu.Lock()
e := s.conn.PushBack(c)
s.mu.Unlock()
c.Set(clientsKey, e)
}

// OnMessage callback
func (s *Server) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
log.Printf("OnMessage, read buffer len %d, write buffer len %d \n", c.ReadBufferLength(), c.WriteBufferLength())

out = data
return
}

// OnClose callback
func (s *Server) OnClose(c *connection.Connection) {
log.Println("OnClose")
v, ok := c.Get(clientsKey)
if !ok {
log.Println("OnClose : get key fail")
return
}

s.mu.Lock()
s.conn.Remove(v.(*list.Element))
s.mu.Unlock()
}

func main() {
s, err := New("", "1833")
if err != nil {
panic(err)
}
defer s.Stop()

s.Start()
}

0 comments on commit 48fe835

Please sign in to comment.