Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V0.2.2 #30

Merged
merged 7 commits into from
Jun 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api_epoll.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ func (e *epollState) apiPoll(tv time.Duration) (retVal int, err error) {
ev := &e.events[i]
conn := e.parent.getConn(int(ev.Fd))
if conn == nil {
e.getMultiEventLoop().Logger.Debug("ev.Fd get conn is nil", "fd", ev.Fd)
unix.Close(int(ev.Fd))
continue
}
Expand Down Expand Up @@ -224,14 +225,18 @@ func (e *epollState) apiPoll(tv time.Duration) (retVal int, err error) {

func (e *epollState) process(conn *Conn, isRead, isWrite bool) bool {
if isRead {

e.getMultiEventLoop().addReadEvNum()
err := conn.processWebsocketFrame()
if err != nil {
e.getMultiEventLoop().Logger.Info("processWebsocketFrame", "err", err.Error())
conn.closeWithLock(err)
return true
}
}

if isWrite {
e.getMultiEventLoop().addWriteEvNum()
// 刷新下直接写入失败的数据
conn.flushOrClose()
}
Expand Down
5 changes: 4 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func ClientOptionToConf(opts ...ClientOption) *DialOption {
for _, o := range opts {
o(&dial)
}
dial.defaultSettingAfter()
return &dial
}

Expand Down Expand Up @@ -87,6 +88,7 @@ func Dial(rawUrl string, opts ...ClientOption) (*Conn, error) {
o(&dial)
}

dial.defaultSettingAfter()
return dial.Dial()
}

Expand Down Expand Up @@ -248,7 +250,8 @@ func (d *DialOption) Dial() (wsCon *Conn, err error) {
return nil, err
}
wsCon.pd = pd
d.Callback.OnOpen(wsCon)
wsCon.Callback = d.cb
wsCon.OnOpen(wsCon)
if br.Buffered() > 0 {
b, err := br.Peek(br.Buffered())
if err != nil {
Expand Down
16 changes: 8 additions & 8 deletions common_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
// 0. CallbackFunc
func WithClientCallbackFunc(open OnOpenFunc, m OnMessageFunc, c OnCloseFunc) ClientOption {
return func(o *DialOption) {
o.Callback = &funcToCallback{
o.cb = &funcToCallback{
onOpen: open,
onMessage: m,
onClose: c,
Expand All @@ -32,7 +32,7 @@ func WithClientCallbackFunc(open OnOpenFunc, m OnMessageFunc, c OnCloseFunc) Cli
// 配置服务端回调函数
func WithServerCallbackFunc(open OnOpenFunc, m OnMessageFunc, c OnCloseFunc) ServerOption {
return func(o *ConnOption) {
o.Callback = &funcToCallback{
o.cb = &funcToCallback{
onOpen: open,
onMessage: m,
onClose: c,
Expand All @@ -44,14 +44,14 @@ func WithServerCallbackFunc(open OnOpenFunc, m OnMessageFunc, c OnCloseFunc) Ser
// 配置客户端callback
func WithClientCallback(cb Callback) ClientOption {
return func(o *DialOption) {
o.Callback = cb
o.cb = cb
}
}

// 配置服务端回调函数
func WithServerCallback(cb Callback) ServerOption {
return func(o *ConnOption) {
o.Callback = cb
o.cb = cb
}
}

Expand Down Expand Up @@ -88,14 +88,14 @@ func WithClientEnableUTF8Check() ClientOption {
// 仅仅配置OnMessae函数
func WithServerOnMessageFunc(cb OnMessageFunc) ServerOption {
return func(o *ConnOption) {
o.Callback = OnMessageFunc(cb)
o.cb = OnMessageFunc(cb)
}
}

// 仅仅配置OnMessae函数
func WithClientOnMessageFunc(cb OnMessageFunc) ClientOption {
return func(o *DialOption) {
o.Callback = OnMessageFunc(cb)
o.cb = OnMessageFunc(cb)
}
}

Expand Down Expand Up @@ -239,14 +239,14 @@ func WithClientReadTimeout(t time.Duration) ClientOption {
// 17.1 配置服务端OnClose
func WithServerOnCloseFunc(onClose func(c *Conn, err error)) ServerOption {
return func(o *ConnOption) {
o.Callback = OnCloseFunc(onClose)
o.cb = OnCloseFunc(onClose)
}
}

// 17.2 配置客户端OnClose
func WithClientOnCloseFunc(onClose func(c *Conn, err error)) ClientOption {
return func(o *DialOption) {
o.Callback = OnCloseFunc(onClose)
o.cb = OnCloseFunc(onClose)
}
}

Expand Down
19 changes: 16 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,15 @@ import (
"github.com/antlabs/wsutil/deflate"
)

// Config 配置
// 有两种方式可以配置相关值
// 1. NewUpgrade, 这通常在只初始化一次的时候使用
// 2. greatws.Upgrade(), 这通常在每次请求的时候使用,每个语法的配置参数不一样
// 这样可以方便的在两种方式中使用, 不需要担心配置参数会有并发修改的情况
type Config struct {
Callback
cb Callback // 静态配置
deflate.PermessageDeflateConf // 静态配置, 从WithXXX函数中获取
tcpNoDelay bool //TODO: 加下这个功能
tcpNoDelay bool // TODO: 加下这个功能
replyPing bool // 开启自动回复
ignorePong bool // 忽略pong消息
disableBufioClearHack bool // 关闭bufio的clear hack优化
Expand All @@ -45,7 +50,7 @@ type Config struct {

// 默认设置
func (c *Config) defaultSetting() {
c.Callback = &DefCallback{}
c.cb = &DefCallback{}
c.maxDelayWriteNum = 10
c.windowsMultipleTimesPayloadSize = 1.0
c.delayWriteInitBufferSize = 8 * 1024
Expand All @@ -56,3 +61,11 @@ func (c *Config) defaultSetting() {
c.utf8Check = func(b []byte) bool { return true }
c.runInGoTask = "stream2" //默认使用stream2模块
}

func (c *Config) defaultSettingAfter() {

if c.multiEventLoop == nil {
c.multiEventLoop = getDefaultMultiEventLoop()
}
c.multiEventLoop.Start()
}
4 changes: 4 additions & 0 deletions conn_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,5 +664,9 @@ func (c *Conn) WritePong(data []byte) (err error) {
}

func (c *Conn) Close() {
if c == nil {
return
}

c.closeWithLock(nil)
}
6 changes: 4 additions & 2 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ func Test_Conn(t *testing.T) {
// 在未加入tls功能时,Conn的大小为160字节够用了。
fmt.Printf("%d\n", unsafe.Sizeof(Conn{}))
// conn大小改变历史
// 新增上下文接管,从< 160到184
if unsafe.Sizeof(Conn{}) > 184 {
// 新增上下文接管,从 小于160到184
// 把Callback移到Conn, 从184到200
fmt.Printf("conn.size = %d\n", unsafe.Sizeof(conn{}))
if unsafe.Sizeof(Conn{}) > 200 {
t.Errorf("Conn size:%d is too large", unsafe.Sizeof(Conn{}))
}
})
Expand Down
65 changes: 35 additions & 30 deletions conn_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,16 @@ var (
type Conn struct {
conn

pd deflate.PermessageDeflateConf // 上下文接管的控制参数, 由于每个comm的配置都可能不一样,所以需要放在Conn里面
mu sync.Mutex // 锁
*Config // 配置
deCtx *deflate.DeCompressContextTakeover // 解压缩上下文
enCtx *deflate.CompressContextTakeover // 压缩上下文
parent *EventLoop // event loop
task driver.TaskExecutor // 任务,该任务会进协程池里面执行
rtime *time.Timer // 控制读超时
wtime *time.Timer // 控制写超时
Callback // callback移至conn中
pd deflate.PermessageDeflateConf // 上下文接管的控制参数, 由于每个comm的配置都可能不一样,所以需要放在Conn里面
mu sync.Mutex // 锁
*Config // 配置
deCtx *deflate.DeCompressContextTakeover // 解压缩上下文
enCtx *deflate.CompressContextTakeover // 压缩上下文
parent *EventLoop // event loop
task driver.TaskExecutor // 任务,该任务会进协程池里面执行
rtime *time.Timer // 控制读超时
wtime *time.Timer // 控制写超时

// mu2由 onCloseOnce使用, 这里使用新锁只是为了简化维护的难度
// 也可以共用mu,区别 优点:节约内存,缺点:容易出现死锁和需要精心调试代码
Expand Down Expand Up @@ -120,45 +121,43 @@ func duplicateSocket(socketFD int) (int, error) {
}

// 没有加锁的版本,有外层已经有锁保护,所以不需要加锁
func (c *Conn) closeInnerWithOnClose(err error, onClose bool) {
func (c *Conn) closeWithoutLockOnClose(err error, onClose bool) {

if c.isClosed() {
return
}

if !c.isClosed() {

if err != nil {
err = io.EOF
}
fd := c.getFd()
c.getLogger().Debug("close conn", slog.Int64("fd", int64(fd)))
c.parent.del(c)
atomic.StoreInt64(&c.fd, -1)
atomic.StoreInt32(&c.closed, 1)
if err != nil {
err = io.EOF
}
fd := c.getFd()
c.getLogger().Debug("close conn", slog.Int64("fd", int64(fd)))
c.parent.del(c)
atomic.StoreInt64(&c.fd, -1)
atomic.StoreInt32(&c.closed, 1)

// 这个必须要放在后面
if onClose {
c.onCloseOnce.Do(&c.mu2, func() {
c.OnClose(c, err)
})
if c.task != nil {
c.task.Close(nil)
}
}

}

func (c *Conn) closeInner(err error) {
func (c *Conn) closeWithoutLock(err error) {

c.closeInnerWithOnClose(err, true)
c.closeWithoutLockOnClose(err, true)
}

func (c *Conn) closeWithLock(err error) {
if c.isClosed() {
return
}

c.task.Close(&c.mu)

c.mu.Lock()
if c.isClosed() {
c.mu.Unlock()
Expand All @@ -168,7 +167,7 @@ func (c *Conn) closeWithLock(err error) {
if err == nil {
err = io.EOF
}
c.closeInnerWithOnClose(err, false)
c.closeWithoutLockOnClose(err, false)

c.mu.Unlock()

Expand Down Expand Up @@ -263,7 +262,7 @@ func (c *Conn) maybeWriteAll(b []byte) (total int, ws writeState, err error) {
}

c.getLogger().Error("writeOrAddPoll", "err", err.Error(), slog.Int64("fd", c.fd), slog.Int("b.len", len(b)))
c.closeInner(err)
c.closeWithoutLock(err)
return total, writeDefault, err
}

Expand Down Expand Up @@ -355,9 +354,10 @@ func (c *Conn) processWebsocketFrame() (err error) {
}

if c.readTimeout > 0 {
if err = c.setReadDeadline(time.Now().Add(c.readTimeout)); err != nil {
return err
}
// if err = c.setReadDeadline(time.Time{}); err != nil {
// return err
// }
c.setReadDeadline(time.Now().Add(c.readTimeout))
}
n := 0
var success bool
Expand Down Expand Up @@ -493,9 +493,14 @@ func (c *Conn) setDeadlineInner(t **time.Timer, tm time.Time, err error) error {
return nil
}
c.mu.Lock()
defer c.mu.Unlock()
// c.getLogger().Error("Conn-lock", "addr", uintptr(unsafe.Pointer(c)))
defer func() {
// c.getLogger().Error("Conn-unlock", "addr", uintptr(unsafe.Pointer(c)))
c.mu.Unlock()
}()
if tm.IsZero() {
if *t != nil {
// c.getLogger().Error("conn-reset", "addr", uintptr(unsafe.Pointer(c)))
(*t).Stop()
*t = nil
}
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
module github.com/antlabs/greatws

go 1.21
go 1.21.1

require (
github.com/antlabs/cpuproc v0.0.0-20240615150837-aa4bcf33806c
github.com/antlabs/wsutil v0.1.10
golang.org/x/sys v0.12.0
golang.org/x/sys v0.20.0
)

require github.com/klauspost/compress v1.17.8 // indirect
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
github.com/antlabs/wsutil v0.1.9 h1:om+dMBbJnfq9aNeCv7A3ip8+McH9UaTjPLEOovyokLI=
github.com/antlabs/wsutil v0.1.9/go.mod h1:Pk7xYOw3o5iEB6ukiOu+2uJMLYeMVVjJLazFD3okI2A=
github.com/antlabs/cpuproc v0.0.0-20240615150837-aa4bcf33806c h1:VHYsrGiynSH8kkauv8Gl+Xd9j883cyNJPUj6m7jEg44=
github.com/antlabs/cpuproc v0.0.0-20240615150837-aa4bcf33806c/go.mod h1:xRJWwB0CjmTPmZ+I92DKy1N6N/7/IqMC9IPNLbE7Je4=
github.com/antlabs/wsutil v0.1.10 h1:86p67dG8/iiQ+yZrHVl73OPHGnXfXopFSU0w84fLOdE=
github.com/antlabs/wsutil v0.1.10/go.mod h1:Pk7xYOw3o5iEB6ukiOu+2uJMLYeMVVjJLazFD3okI2A=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
Loading
Loading