diff --git a/proto/engine.go b/proto/engine.go index dcde9ea..04b3cba 100644 --- a/proto/engine.go +++ b/proto/engine.go @@ -99,6 +99,7 @@ func (e *Engine[T, K]) handler(params *qWebsocket.HandlerParams) { } req := &Request[T]{ + Context: context.Background(), OpCode: params.OpCode, Writer: params.Writer, Conn: params.WsConn, @@ -142,6 +143,7 @@ func (e *Engine[T, K]) handler(params *qWebsocket.HandlerParams) { }() handler(req) + } func (e *Engine[T, K]) UseHandler() qWebsocket.HandlerFunc { diff --git a/proto/request.go b/proto/request.go index 72ad70c..7d515fc 100644 --- a/proto/request.go +++ b/proto/request.go @@ -2,6 +2,7 @@ package proto import ( "bytes" + "context" qWebsocket "github.com/RealFax/q-websocket" "github.com/gobwas/ws" "github.com/gobwas/ws/wsutil" @@ -9,6 +10,7 @@ import ( ) type Request[T any] struct { + Context context.Context OpCode ws.OpCode Writer io.Writer Conn *qWebsocket.Conn diff --git a/server.go b/server.go index c25125a..02dc960 100644 --- a/server.go +++ b/server.go @@ -14,7 +14,7 @@ import ( ) type Server struct { - connNum int64 + connNum atomic.Int64 addr string ctx context.Context @@ -79,13 +79,13 @@ func (s *Server) setupTimeoutHandler() { reason ttlcache.EvictionReason, item *ttlcache.Item[string, gnet.Conn], ) { - atomic.AddInt64(&s.connNum, -1) + s.connNum.Add(-1) upgraderConn, ok := item.Value().Context().(*Conn) if !ok { - item.Value().Close() + _ = item.Value().Close() return } - s.closeWS(upgraderConn, ws.StatusGoingAway, errors.New("timeout")) + _ = s.closeWS(upgraderConn, ws.StatusGoingAway, errors.New("timeout")) }) // start monitor connect ttl @@ -97,7 +97,7 @@ func (s *Server) setupTimeoutHandler() { } func (s *Server) Online() int64 { - return atomic.LoadInt64(&s.connNum) + return s.connNum.Load() } func (s *Server) CountConnections() int { @@ -108,6 +108,13 @@ func (s *Server) Stop(ctx context.Context) error { return s.engine.Stop(ctx) } +func (s *Server) ListenAndServe(opts ...gnet.Option) error { + s.setupTimeoutHandler() + return gnet.Run(s, s.addr, opts...) +} + +// ---- gnet event handler ---- + func (s *Server) OnBoot(eng gnet.Engine) gnet.Action { log.Printf("[+] Listen addr: %s", s.addr) s.engine = eng @@ -117,14 +124,14 @@ func (s *Server) OnBoot(eng gnet.Engine) gnet.Action { func (s *Server) OnShutdown(_ gnet.Engine) {} func (s *Server) OnOpen(c gnet.Conn) ([]byte, gnet.Action) { - atomic.AddInt64(&s.connNum, 1) + s.connNum.Add(1) // monitor conn timeout s.keepConnTable.Set(c.RemoteAddr().String(), c, ttlcache.DefaultTTL) return nil, gnet.None } func (s *Server) OnClose(c gnet.Conn, _ error) gnet.Action { - atomic.AddInt64(&s.connNum, -1) + s.connNum.Add(-1) // conn closed, remove conn in monitor list s.keepConnTable.Delete(c.RemoteAddr().String()) return gnet.None @@ -149,13 +156,13 @@ func (s *Server) OnTraffic(c gnet.Conn) gnet.Action { } // trying upgrader conn - if !upgraderConn.successUpgraded { + if !upgraderConn.successUpgraded.Load() { if _, err := s.upgrader.Upgrade(upgraderConn); err != nil { log.Printf("[-] upgrade error: %s, remote: %s\n", err.Error(), c.RemoteAddr()) - s.closeWS(upgraderConn, ws.StatusProtocolError, err) + _ = s.closeWS(upgraderConn, ws.StatusProtocolError, err) return gnet.Close } - upgraderConn.successUpgraded = true + upgraderConn.successUpgraded.Store(true) upgraderConn.UpdateActive() return gnet.None } @@ -164,7 +171,7 @@ func (s *Server) OnTraffic(c gnet.Conn) gnet.Action { messages, err := wsutil.ReadClientMessage(upgraderConn, nil) if err != nil { log.Printf("[-] read client message error: %s, remote: %s\n", err.Error(), c.RemoteAddr()) - s.closeWS(upgraderConn, ws.StatusUnsupportedData, err) + _ = s.closeWS(upgraderConn, ws.StatusUnsupportedData, err) return gnet.Close } @@ -173,13 +180,13 @@ func (s *Server) OnTraffic(c gnet.Conn) gnet.Action { switch message.OpCode { case ws.OpPing: // async handle - s.workerPool.Submit(func() { + _ = s.workerPool.Submit(func() { s.onPingHandler(upgraderConn) }) upgraderConn.UpdateActive() case ws.OpText, ws.OpBinary: // async handle - s.workerPool.Submit(func() { + _ = s.workerPool.Submit(func() { s.handler(&HandlerParams{ OpCode: message.OpCode, Request: message.Payload, @@ -189,7 +196,7 @@ func (s *Server) OnTraffic(c gnet.Conn) gnet.Action { }) upgraderConn.UpdateActive() case ws.OpClose: - s.closeWS(upgraderConn, ws.StatusNormalClosure, nil) + _ = s.closeWS(upgraderConn, ws.StatusNormalClosure, nil) return gnet.Close } @@ -197,11 +204,6 @@ func (s *Server) OnTraffic(c gnet.Conn) gnet.Action { return gnet.None } -func (s *Server) ListenAndServe(opts ...gnet.Option) error { - s.setupTimeoutHandler() - return gnet.Run(s, s.addr, opts...) -} - func NewServer(addr string, opts ...OptionFunc) *Server { s := &Server{ addr: addr, diff --git a/upgrader.go b/upgrader.go index a032c48..fd29a8f 100644 --- a/upgrader.go +++ b/upgrader.go @@ -16,8 +16,8 @@ var ( // Conn is upgraded websocket conn type Conn struct { gnet.Conn - LastActive int64 // atomic - successUpgraded bool + LastActive *atomic.Int64 // atomic + successUpgraded *atomic.Bool ID string ctx context.Context @@ -32,14 +32,16 @@ func (c *Conn) SetContext(ctx context.Context) { } func (c *Conn) UpdateActive() { - atomic.StoreInt64(&c.LastActive, time.Now().Unix()) + c.LastActive.Store(time.Now().Unix()) } func NewUpgraderConn(conn gnet.Conn) *Conn { + lastActive := &atomic.Int64{} + lastActive.Store(time.Now().Unix()) return &Conn{ Conn: conn, - LastActive: time.Now().Unix(), - successUpgraded: false, + LastActive: lastActive, + successUpgraded: &atomic.Bool{}, ID: uuid.New().String(), ctx: context.Background(), }