From 13f1592b7aa1b44b7ec51ad8808c7c4352991a0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E8=BF=8E=E6=9D=BE?= Date: Mon, 16 Oct 2017 15:53:49 +0800 Subject: [PATCH] rename --- tcp_server.go => async_server.go | 0 conn.go | 190 +++++++++++++++++++++++++++++++ 2 files changed, 190 insertions(+) rename tcp_server.go => async_server.go (100%) create mode 100644 conn.go diff --git a/tcp_server.go b/async_server.go similarity index 100% rename from tcp_server.go rename to async_server.go diff --git a/conn.go b/conn.go new file mode 100644 index 0000000..9678ccc --- /dev/null +++ b/conn.go @@ -0,0 +1,190 @@ +package tcp + +import ( + "errors" + "io" + "net" + "strings" + "sync" + "sync/atomic" + "time" +) + +var ( + ErrConnClosing = errors.New("use of closed network connection") + ErrBufferFull = errors.New("the async send buffer is full") +) + +type TCPConn struct { + callback CallBack + protocol Protocol + + conn *net.TCPConn + readChan chan Packet + writeChan chan Packet + errChan chan error + + readDeadline time.Duration + writeDeadline time.Duration + maxPacketSize uint32 + + exitChan chan struct{} + closeOnce sync.Once + exitFlag int32 +} + +func NewTCPConn(conn *net.TCPConn, callback CallBack, protocol Protocol) *TCPConn { + c := &TCPConn{ + conn: conn, + callback: callback, + protocol: protocol, + + readChan: make(chan Packet, readChanSize), + writeChan: make(chan Packet, writeChanSize), + errChan: make(chan error, 10), + + exitChan: make(chan struct{}), + exitFlag: 0, + } + return c +} + +func (c *TCPConn) Serve() { + defer func() { + if r := recover(); r != nil { + logger.Println("tcp conn(%v) Serve error, %v ", c.GetRemoteIPAddress(), r) + } + }() + atomic.StoreInt32(&c.exitFlag, 1) + c.callback.OnConnected(c) + go c.readLoop() + go c.writeLoop() + go c.handleLoop() +} + +func (c *TCPConn) readLoop() { + defer func() { + recover() + c.Close() + }() + + for { + select { + case <-c.exitChan: + return + default: + if c.readDeadline > 0 { + c.conn.SetReadDeadline(time.Now().Add(c.readDeadline)) + } + p, err := c.protocol.ReadPacket(c.conn) + if err != nil { + if err != io.EOF { + c.errChan <- err + } + return + } + c.readChan <- p + } + } +} + +func (c *TCPConn) ReadPacket() (Packet, error) { + if c.protocol == nil { + return nil, errors.New("no protocol impl") + } + return c.protocol.ReadPacket(c.conn) +} + +func (c *TCPConn) writeLoop() { + defer func() { + recover() + c.Close() + }() + + for pkt := range c.writeChan { + if pkt == nil { + continue + } + if c.writeDeadline > 0 { + c.conn.SetWriteDeadline(time.Now().Add(c.writeDeadline)) + } + if err := c.protocol.WritePacket(c.conn, pkt); err != nil { + c.errChan <- err + return + } + } +} + +func (c *TCPConn) handleLoop() { + defer func() { + recover() + c.Close() + }() + for p := range c.readChan { + if p == nil { + continue + } + c.callback.OnMessage(c, p) + } +} + +func (c *TCPConn) AsyncWritePacket(p Packet) error { + if c.IsClosed() { + return ErrConnClosing + } + select { + case c.writeChan <- p: + return nil + default: + return ErrBufferFull + } +} + +func (c *TCPConn) Close() { + c.closeOnce.Do(func() { + close(c.exitChan) + close(c.errChan) + close(c.writeChan) + close(c.readChan) + c.callback.OnDisconnected(c) + atomic.StoreInt32(&c.exitFlag, 0) + c.conn.Close() + }) +} + +func (c *TCPConn) Errors() <-chan error { + return c.errChan +} + +func (c *TCPConn) GetRawConn() *net.TCPConn { + return c.conn +} + +func (c *TCPConn) IsClosed() bool { + return atomic.LoadInt32(&c.exitFlag) == 0 +} + +func (c *TCPConn) GetLocalAddr() net.Addr { + return c.conn.LocalAddr() +} + +//LocalIPAddress 返回socket连接本地的ip地址 +func (c *TCPConn) GetLocalIPAddress() string { + return strings.Split(c.GetLocalAddr().String(), ":")[0] +} + +func (c *TCPConn) GetRemoteAddr() net.Addr { + return c.conn.RemoteAddr() +} + +func (c *TCPConn) GetRemoteIPAddress() string { + return strings.Split(c.GetRemoteAddr().String(), ":")[0] +} + +func (c *TCPConn) setReadDeadline(t time.Duration) { + c.readDeadline = t +} + +func (c *TCPConn) setWriteDeadline(t time.Duration) { + c.writeDeadline = t +}