Skip to content

Commit

Permalink
Improve std server
Browse files Browse the repository at this point in the history
  • Loading branch information
Allenxuxu committed Dec 3, 2022
1 parent 7faaba8 commit 1d87764
Showing 1 changed file with 38 additions and 39 deletions.
77 changes: 38 additions & 39 deletions server_std.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build windows
// +build windows

package gev
Expand Down Expand Up @@ -28,7 +29,7 @@ type Handler interface {
type Server struct {
listener net.Listener
callback Handler
connections []*Connection
connections stdsync.Map

timingWheel *timingwheel.TimingWheel
opts *Options
Expand Down Expand Up @@ -71,33 +72,39 @@ func (s *Server) Start() {
sw := sync.WaitGroupWrapper{}
s.timingWheel.Start()

sw.AddAndRun(func() {
for {
select {
case <-s.dying:
return
if s.opts.NumLoops <= 0 {
s.opts.NumLoops = 1
}
for i := 0; i < s.opts.NumLoops; i++ {
sw.AddAndRun(func() {
for {
select {
case <-s.dying:
return

default:
conn, err := s.listener.Accept()
if err != nil {
log.Errorf("accept error: %v", err)
continue
default:
conn, err := s.listener.Accept()
if err != nil {
log.Errorf("accept error: %v", err)
continue
}

connection := NewConnection(conn, s.opts.Protocol, s.timingWheel, s.opts.IdleTime, s.callback)
s.connections.Store(connection, struct{}{})

sw.AddAndRun(func() {
connection.readLoop()
})
sw.AddAndRun(func() {
connection.writeLoop()
})
sw.AddAndRun(func() {
s.callback.OnConnect(connection)
})
}

connection := NewConnection(conn, s.opts.Protocol, s.timingWheel, s.opts.IdleTime, s.callback)
s.connections = append(s.connections, connection)

s.callback.OnConnect(connection)

sw.AddAndRun(func() {
connection.readLoop()
})
sw.AddAndRun(func() {
connection.writeLoop()
})
}
}
})
})
}

s.running.Set(true)

Expand All @@ -116,9 +123,13 @@ func (s *Server) Stop() {
log.Error(err)
}

for _, c := range s.connections {
s.connections.Range(func(key, value interface{}) bool {
c := key.(*Connection)
c.Close()
}

return true
})

}
}

Expand Down Expand Up @@ -253,8 +264,6 @@ func (c *Connection) Send(data interface{}, opts ...ConnectionOption) error {
// Close 关闭连接
func (c *Connection) Close() error {
if c.connected.Get() {
log.Info("Close ", c.PeerAddr())

close(c.dying)
c.connected.Set(false)
c.callBack.OnClose(c)
Expand All @@ -272,9 +281,6 @@ func (c *Connection) Close() error {

// ShutdownWrite 关闭可写端,等待读取完接收缓冲区所有数据
func (c *Connection) ShutdownWrite() error {
log.Info("ShutdownWrite ", c.PeerAddr())

//return nil
return c.Close()
}

Expand Down Expand Up @@ -302,7 +308,6 @@ func (c *Connection) readLoop() {
return

default:
//c.conn.SetReadDeadline(time.Now().Add(time.Second))
n, err := c.conn.Read(buf)
if err != nil {
if err != io.EOF {
Expand Down Expand Up @@ -345,8 +350,6 @@ func (c *Connection) writeLoop() {
continue
}

c.conn.SetWriteDeadline(time.Now().Add(time.Second))

first, end := c.outBuffer.PeekAll()
n, err := c.conn.Write(first)
if err != nil {
Expand Down Expand Up @@ -429,14 +432,10 @@ func (c *Connection) closeTimeoutConn() func() {
return func() {
now := time.Now()
intervals := now.Sub(time.Unix(c.activeTime.Get(), 0))
log.Info("closeTimeoutConn ", intervals)

if intervals >= c.idleTime {
log.Info("closeTimeoutConn ", c.conn.RemoteAddr())
_ = c.Close()
} else {
log.Info("timingWheel.AfterFunc ", c.idleTime-intervals)

timer := c.timingWheel.AfterFunc(c.idleTime-intervals, c.closeTimeoutConn())
c.timer.Store(timer)
}
Expand Down

0 comments on commit 1d87764

Please sign in to comment.