Skip to content

Commit

Permalink
add some feature
Browse files Browse the repository at this point in the history
  • Loading branch information
PotatoCloud committed Aug 11, 2023
1 parent 6aa516a commit 2c3ffd0
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 24 deletions.
2 changes: 2 additions & 0 deletions proto/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func (e *Engine[T, K]) handler(params *qWebsocket.HandlerParams) {
}

req := &Request[T]{
Context: context.Background(),
OpCode: params.OpCode,
Writer: params.Writer,
Conn: params.WsConn,
Expand Down Expand Up @@ -142,6 +143,7 @@ func (e *Engine[T, K]) handler(params *qWebsocket.HandlerParams) {
}()

handler(req)

}

func (e *Engine[T, K]) UseHandler() qWebsocket.HandlerFunc {
Expand Down
2 changes: 2 additions & 0 deletions proto/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package proto

import (
"bytes"
"context"
qWebsocket "github.com/RealFax/q-websocket"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"io"
)

type Request[T any] struct {
Context context.Context
OpCode ws.OpCode
Writer io.Writer
Conn *qWebsocket.Conn
Expand Down
40 changes: 21 additions & 19 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

type Server struct {
connNum int64
connNum atomic.Int64
addr string

ctx context.Context
Expand Down Expand Up @@ -79,13 +79,13 @@ func (s *Server) setupTimeoutHandler() {
reason ttlcache.EvictionReason,
item *ttlcache.Item[string, gnet.Conn],
) {
atomic.AddInt64(&s.connNum, -1)
s.connNum.Add(-1)
upgraderConn, ok := item.Value().Context().(*Conn)
if !ok {
item.Value().Close()
_ = item.Value().Close()
return
}
s.closeWS(upgraderConn, ws.StatusGoingAway, errors.New("timeout"))
_ = s.closeWS(upgraderConn, ws.StatusGoingAway, errors.New("timeout"))
})

// start monitor connect ttl
Expand All @@ -97,7 +97,7 @@ func (s *Server) setupTimeoutHandler() {
}

func (s *Server) Online() int64 {
return atomic.LoadInt64(&s.connNum)
return s.connNum.Load()
}

func (s *Server) CountConnections() int {
Expand All @@ -108,6 +108,13 @@ func (s *Server) Stop(ctx context.Context) error {
return s.engine.Stop(ctx)
}

func (s *Server) ListenAndServe(opts ...gnet.Option) error {
s.setupTimeoutHandler()
return gnet.Run(s, s.addr, opts...)
}

// ---- gnet event handler ----

func (s *Server) OnBoot(eng gnet.Engine) gnet.Action {
log.Printf("[+] Listen addr: %s", s.addr)
s.engine = eng
Expand All @@ -117,14 +124,14 @@ func (s *Server) OnBoot(eng gnet.Engine) gnet.Action {
func (s *Server) OnShutdown(_ gnet.Engine) {}

func (s *Server) OnOpen(c gnet.Conn) ([]byte, gnet.Action) {
atomic.AddInt64(&s.connNum, 1)
s.connNum.Add(1)
// monitor conn timeout
s.keepConnTable.Set(c.RemoteAddr().String(), c, ttlcache.DefaultTTL)
return nil, gnet.None
}

func (s *Server) OnClose(c gnet.Conn, _ error) gnet.Action {
atomic.AddInt64(&s.connNum, -1)
s.connNum.Add(-1)
// conn closed, remove conn in monitor list
s.keepConnTable.Delete(c.RemoteAddr().String())
return gnet.None
Expand All @@ -149,13 +156,13 @@ func (s *Server) OnTraffic(c gnet.Conn) gnet.Action {
}

// trying upgrader conn
if !upgraderConn.successUpgraded {
if !upgraderConn.successUpgraded.Load() {
if _, err := s.upgrader.Upgrade(upgraderConn); err != nil {
log.Printf("[-] upgrade error: %s, remote: %s\n", err.Error(), c.RemoteAddr())
s.closeWS(upgraderConn, ws.StatusProtocolError, err)
_ = s.closeWS(upgraderConn, ws.StatusProtocolError, err)
return gnet.Close
}
upgraderConn.successUpgraded = true
upgraderConn.successUpgraded.Store(true)
upgraderConn.UpdateActive()
return gnet.None
}
Expand All @@ -164,7 +171,7 @@ func (s *Server) OnTraffic(c gnet.Conn) gnet.Action {
messages, err := wsutil.ReadClientMessage(upgraderConn, nil)
if err != nil {
log.Printf("[-] read client message error: %s, remote: %s\n", err.Error(), c.RemoteAddr())
s.closeWS(upgraderConn, ws.StatusUnsupportedData, err)
_ = s.closeWS(upgraderConn, ws.StatusUnsupportedData, err)
return gnet.Close
}

Expand All @@ -173,13 +180,13 @@ func (s *Server) OnTraffic(c gnet.Conn) gnet.Action {
switch message.OpCode {
case ws.OpPing:
// async handle
s.workerPool.Submit(func() {
_ = s.workerPool.Submit(func() {
s.onPingHandler(upgraderConn)
})
upgraderConn.UpdateActive()
case ws.OpText, ws.OpBinary:
// async handle
s.workerPool.Submit(func() {
_ = s.workerPool.Submit(func() {
s.handler(&HandlerParams{
OpCode: message.OpCode,
Request: message.Payload,
Expand All @@ -189,19 +196,14 @@ func (s *Server) OnTraffic(c gnet.Conn) gnet.Action {
})
upgraderConn.UpdateActive()
case ws.OpClose:
s.closeWS(upgraderConn, ws.StatusNormalClosure, nil)
_ = s.closeWS(upgraderConn, ws.StatusNormalClosure, nil)
return gnet.Close
}

}
return gnet.None
}

func (s *Server) ListenAndServe(opts ...gnet.Option) error {
s.setupTimeoutHandler()
return gnet.Run(s, s.addr, opts...)
}

func NewServer(addr string, opts ...OptionFunc) *Server {
s := &Server{
addr: addr,
Expand Down
12 changes: 7 additions & 5 deletions upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ var (
// Conn is upgraded websocket conn
type Conn struct {
gnet.Conn
LastActive int64 // atomic
successUpgraded bool
LastActive *atomic.Int64 // atomic
successUpgraded *atomic.Bool
ID string

ctx context.Context
Expand All @@ -32,14 +32,16 @@ func (c *Conn) SetContext(ctx context.Context) {
}

func (c *Conn) UpdateActive() {
atomic.StoreInt64(&c.LastActive, time.Now().Unix())
c.LastActive.Store(time.Now().Unix())
}

func NewUpgraderConn(conn gnet.Conn) *Conn {
lastActive := &atomic.Int64{}
lastActive.Store(time.Now().Unix())
return &Conn{
Conn: conn,
LastActive: time.Now().Unix(),
successUpgraded: false,
LastActive: lastActive,
successUpgraded: &atomic.Bool{},
ID: uuid.New().String(),
ctx: context.Background(),
}
Expand Down

0 comments on commit 2c3ffd0

Please sign in to comment.