From e800fbd0a717bb428de08969a8b4f761f9d64087 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Tue, 23 Apr 2024 11:17:49 +0800 Subject: [PATCH 1/6] opt: reduce duplicate code for I/O reactors (#590) --- ...tor_epoll_default.go => reactor_default.go | 23 +++- reactor_epoll_ultimate.go | 84 -------------- reactor_kqueue_default.go | 109 ------------------ ..._kqueue_ultimate.go => reactor_ultimate.go | 4 +- 4 files changed, 19 insertions(+), 201 deletions(-) rename reactor_epoll_default.go => reactor_default.go (68%) delete mode 100644 reactor_epoll_ultimate.go delete mode 100644 reactor_kqueue_default.go rename reactor_kqueue_ultimate.go => reactor_ultimate.go (93%) diff --git a/reactor_epoll_default.go b/reactor_default.go similarity index 68% rename from reactor_epoll_default.go rename to reactor_default.go index 4f150e6ec..a8935c879 100644 --- a/reactor_epoll_default.go +++ b/reactor_default.go @@ -12,8 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build linux && !poll_opt -// +build linux,!poll_opt +//go:build (linux || freebsd || dragonfly || netbsd || openbsd || darwin) && !poll_opt +// +build linux freebsd dragonfly netbsd openbsd darwin +// +build !poll_opt package gnet @@ -53,8 +54,13 @@ func (el *eventloop) orbit() error { err := el.poller.Polling(func(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) error { c := el.connections.getConn(fd) if c == nil { - // Somehow epoll notified with an event for a stale fd that is not in our connection set. - // We need to delete it from the epoll set. + // For kqueue, this might happen when the connection has already been closed, + // the file descriptor will be deleted from kqueue automatically as documented + // in the manual pages. + // For epoll, it somehow notified with an event for a stale fd that is not in + // our connection set. We need to explicitly delete it from the epoll set. + // Also print a warning log for this kind of irregularity. + el.getLogger().Warnf("received event[fd=%d|ev=%d|flags=%d] of a stale connection from event-loop(%d)", fd, ev, flags, el.idx) return el.poller.Delete(fd) } return c.processIO(fd, ev, flags) @@ -84,8 +90,13 @@ func (el *eventloop) run() error { if _, ok := el.listeners[fd]; ok { return el.accept(fd, ev, flags) } - // Somehow epoll notified with an event for a stale fd that is not in our connection set. - // We need to delete it from the epoll set. + // For kqueue, this might happen when the connection has already been closed, + // the file descriptor will be deleted from kqueue automatically as documented + // in the manual pages. + // For epoll, it somehow notified with an event for a stale fd that is not in + // our connection set. We need to explicitly delete it from the epoll set. + // Also print a warning log for this kind of irregularity. + el.getLogger().Warnf("received event[fd=%d|ev=%d|flags=%d] of a stale connection from event-loop(%d)", fd, ev, flags, el.idx) return el.poller.Delete(fd) } return c.processIO(fd, ev, flags) diff --git a/reactor_epoll_ultimate.go b/reactor_epoll_ultimate.go deleted file mode 100644 index 2fd3dd693..000000000 --- a/reactor_epoll_ultimate.go +++ /dev/null @@ -1,84 +0,0 @@ -// Copyright (c) 2021 The Gnet Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build linux && poll_opt -// +build linux,poll_opt - -package gnet - -import ( - "errors" - "runtime" - - errorx "github.com/panjf2000/gnet/v2/pkg/errors" -) - -func (el *eventloop) rotate() error { - if el.engine.opts.LockOSThread { - runtime.LockOSThread() - defer runtime.UnlockOSThread() - } - - err := el.poller.Polling() - if errors.Is(err, errorx.ErrEngineShutdown) { - el.getLogger().Debugf("main reactor is exiting in terms of the demand from user, %v", err) - err = nil - } else if err != nil { - el.getLogger().Errorf("main reactor is exiting due to error: %v", err) - } - - el.engine.shutdown(err) - - return err -} - -func (el *eventloop) orbit() error { - if el.engine.opts.LockOSThread { - runtime.LockOSThread() - defer runtime.UnlockOSThread() - } - - err := el.poller.Polling() - if errors.Is(err, errorx.ErrEngineShutdown) { - el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err) - err = nil - } else if err != nil { - el.getLogger().Errorf("event-loop(%d) is exiting due to error: %v", el.idx, err) - } - - el.closeConns() - el.engine.shutdown(err) - - return err -} - -func (el *eventloop) run() error { - if el.engine.opts.LockOSThread { - runtime.LockOSThread() - defer runtime.UnlockOSThread() - } - - err := el.poller.Polling() - if errors.Is(err, errorx.ErrEngineShutdown) { - el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err) - err = nil - } else if err != nil { - el.getLogger().Errorf("event-loop(%d) is exiting due to error: %v", el.idx, err) - } - - el.closeConns() - el.engine.shutdown(err) - - return err -} diff --git a/reactor_kqueue_default.go b/reactor_kqueue_default.go deleted file mode 100644 index 7426c74e0..000000000 --- a/reactor_kqueue_default.go +++ /dev/null @@ -1,109 +0,0 @@ -// Copyright (c) 2019 The Gnet Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build (freebsd || dragonfly || netbsd || openbsd || darwin) && !poll_opt -// +build freebsd dragonfly netbsd openbsd darwin -// +build !poll_opt - -package gnet - -import ( - "errors" - "runtime" - - "github.com/panjf2000/gnet/v2/internal/netpoll" - errorx "github.com/panjf2000/gnet/v2/pkg/errors" -) - -func (el *eventloop) rotate() error { - if el.engine.opts.LockOSThread { - runtime.LockOSThread() - defer runtime.UnlockOSThread() - } - - err := el.poller.Polling(el.accept0) - if errors.Is(err, errorx.ErrEngineShutdown) { - el.getLogger().Debugf("main reactor is exiting in terms of the demand from user, %v", err) - err = nil - } else if err != nil { - el.getLogger().Errorf("main reactor is exiting due to error: %v", err) - } - - el.engine.shutdown(err) - - return err -} - -func (el *eventloop) orbit() error { - if el.engine.opts.LockOSThread { - runtime.LockOSThread() - defer runtime.UnlockOSThread() - } - - err := el.poller.Polling(func(fd int, filter netpoll.IOEvent, flags netpoll.IOFlags) error { - c := el.connections.getConn(fd) - if c == nil { - // This might happen when the connection has already been closed, - // the file descriptor will be deleted from kqueue automatically - // as documented in the manual pages, So we just print a warning log. - el.getLogger().Warnf("received event[fd=%d|filter=%d|flags=%d] of a stale connection from event-loop(%d)", fd, filter, flags, el.idx) - return nil - } - return c.processIO(fd, filter, flags) - }) - if errors.Is(err, errorx.ErrEngineShutdown) { - el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err) - err = nil - } else if err != nil { - el.getLogger().Errorf("event-loop(%d) is exiting due to error: %v", el.idx, err) - } - - el.closeConns() - el.engine.shutdown(err) - - return err -} - -func (el *eventloop) run() error { - if el.engine.opts.LockOSThread { - runtime.LockOSThread() - defer runtime.UnlockOSThread() - } - - err := el.poller.Polling(func(fd int, filter netpoll.IOEvent, flags netpoll.IOFlags) error { - c := el.connections.getConn(fd) - if c == nil { - if _, ok := el.listeners[fd]; ok { - return el.accept(fd, filter, flags) - } - // This might happen when the connection has already been closed, - // the file descriptor will be deleted from kqueue automatically - // as documented in the manual pages, So we just print a warning log. - el.getLogger().Warnf("received event[fd=%d|filter=%d|flags=%d] of a stale connection from event-loop(%d)", fd, filter, flags, el.idx) - return nil - } - return c.processIO(fd, filter, flags) - }) - if errors.Is(err, errorx.ErrEngineShutdown) { - el.getLogger().Debugf("event-loop(%d) is exiting in terms of the demand from user, %v", el.idx, err) - err = nil - } else if err != nil { - el.getLogger().Errorf("event-loop(%d) is exiting due to error: %v", el.idx, err) - } - - el.closeConns() - el.engine.shutdown(err) - - return err -} diff --git a/reactor_kqueue_ultimate.go b/reactor_ultimate.go similarity index 93% rename from reactor_kqueue_ultimate.go rename to reactor_ultimate.go index 93b998625..08ab1c8c6 100644 --- a/reactor_kqueue_ultimate.go +++ b/reactor_ultimate.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build (freebsd || dragonfly || netbsd || openbsd || darwin) && poll_opt -// +build freebsd dragonfly netbsd openbsd darwin +//go:build (linux || freebsd || dragonfly || netbsd || openbsd || darwin) && poll_opt +// +build linux freebsd dragonfly netbsd openbsd darwin // +build poll_opt package gnet From f6980ec948dd26839fcd210dbc18c76272633fe3 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Wed, 24 Apr 2024 20:28:45 +0800 Subject: [PATCH 2/6] opt: fix compilation errors on *BSD (#593) --- .github/workflows/cross-compile-bsd.yml | 85 ++++++++++++++++++++++ acceptor_unix.go | 4 +- client_test.go | 4 +- client_unix.go | 4 +- conn_map.go | 4 +- conn_matrix.go | 4 +- conn_matrix_test.go | 4 +- connection_bsd.go | 4 +- connection_unix.go | 4 +- engine_unix.go | 4 +- eventloop_unix.go | 4 +- eventloop_unix_test.go | 4 +- go.mod | 4 +- go.sum | 8 +- internal/io/io_bsd.go | 4 +- internal/netpoll/defs_bsd_32bit.go | 4 +- internal/netpoll/defs_bsd_64bit.go | 4 +- internal/netpoll/defs_poller.go | 7 +- internal/netpoll/defs_poller_bsd.go | 24 ++++++ internal/netpoll/defs_poller_epoll.go | 3 + internal/netpoll/defs_poller_kqueue.go | 7 +- internal/netpoll/defs_poller_netbsd.go | 21 ++++++ internal/netpoll/poller_epoll_default.go | 6 +- internal/netpoll/poller_epoll_ultimate.go | 18 ++--- internal/netpoll/poller_kqueue_default.go | 42 +++++------ internal/netpoll/poller_kqueue_ultimate.go | 55 ++++++-------- internal/netpoll/poller_kqueue_wakeup.go | 55 ++++++++++++++ internal/netpoll/poller_kqueue_wakeup1.go | 62 ++++++++++++++++ internal/netpoll/poller_unix_ultimate.go | 29 ++++++++ internal/socket/fd_unix.go | 4 +- internal/socket/sock_bsd.go | 4 +- internal/socket/sock_cloexec.go | 4 +- internal/socket/sock_posix.go | 4 +- internal/socket/socket.go | 4 +- internal/socket/sockopts_posix.go | 4 +- internal/socket/sockopts_unix.go | 4 +- internal/socket/sockopts_unix1.go | 4 +- internal/socket/socktoaddr.go | 4 +- internal/socket/sys_cloexec.go | 4 +- internal/socket/tcp_socket.go | 4 +- internal/socket/udp_socket.go | 4 +- internal/socket/unix_socket.go | 4 +- listener_unix.go | 4 +- os_unix_test.go | 4 +- reactor_default.go | 4 +- reactor_ultimate.go | 4 +- 46 files changed, 401 insertions(+), 149 deletions(-) create mode 100644 .github/workflows/cross-compile-bsd.yml create mode 100644 internal/netpoll/defs_poller_bsd.go create mode 100644 internal/netpoll/defs_poller_netbsd.go create mode 100644 internal/netpoll/poller_kqueue_wakeup.go create mode 100644 internal/netpoll/poller_kqueue_wakeup1.go create mode 100644 internal/netpoll/poller_unix_ultimate.go diff --git a/.github/workflows/cross-compile-bsd.yml b/.github/workflows/cross-compile-bsd.yml new file mode 100644 index 000000000..37a65afe2 --- /dev/null +++ b/.github/workflows/cross-compile-bsd.yml @@ -0,0 +1,85 @@ +name: Cross-compile for *BSD + +on: + push: + branches: + - master + - dev + - 1.x + paths-ignore: + - '**.md' + - '**.yml' + - '**.yaml' + - '!.github/workflows/cross-compile-bsd.yml' + pull_request: + branches: + - master + - dev + - 1.x + paths-ignore: + - '**.md' + - '**.yml' + - '**.yaml' + - '!.github/workflows/cross-compile-bsd.yml' + +env: + GO111MODULE: on + GOPROXY: "https://proxy.golang.org" + +jobs: + build: + strategy: + fail-fast: false + matrix: + go: ['1.17', '1.21'] + os: + - ubuntu-latest + name: Go ${{ matrix.go }} @ ${{ matrix.os }} + runs-on: ${{ matrix.os }} + steps: + - name: Checkout repository + uses: actions/checkout@v4 + with: + ref: ${{ github.ref }} + + - name: Setup Go + uses: actions/setup-go@v5 + with: + go-version: ${{ matrix.go }} + + - name: Print Go environment + id: go-env + run: | + printf "Using go at: $(which go)\n" + printf "Go version: $(go version)\n" + printf "\n\nGo environment:\n\n" + go env + printf "\n\nSystem environment:\n\n" + env + # Calculate the short SHA1 hash of the git commit + echo "SHORT_SHA=$(git rev-parse --short HEAD)" >> $GITHUB_OUTPUT + echo "GO_CACHE=$(go env GOCACHE)" >> $GITHUB_OUTPUT + + - name: Cross-compiling for DragonFlyBSD + run: GOOS=dragonfly GOARCH=amd64 go build + + - name: Cross-compiling for DragonFlyBSD -tags=poll_opt,gc_opt + run: GOOS=dragonfly GOARCH=amd64 go build -tags=poll_opt,gc_opt + + - name: Cross-compiling for FreeBSD + run: GOOS=freebsd GOARCH=amd64 go build + + - name: Cross-compiling for FreeBSD -tags=poll_opt,gc_opt + run: GOOS=freebsd GOARCH=amd64 go build -tags=poll_opt,gc_opt + + - name: Cross-compiling for NetBSD + run: GOOS=netbsd GOARCH=amd64 go build + + - name: Cross-compiling for NetBSD -tags=poll_opt,gc_opt + run: GOOS=netbsd GOARCH=amd64 go build -tags=poll_opt,gc_opt + + - name: Cross-compiling for OpenBSD + run: GOOS=openbsd GOARCH=amd64 go build + + - name: Cross-compiling for OpenBSD -tags=poll_opt,gc_opt + run: GOOS=openbsd GOARCH=amd64 go build -tags=poll_opt,gc_opt diff --git a/acceptor_unix.go b/acceptor_unix.go index fe7d67cab..ae6e9a35e 100644 --- a/acceptor_unix.go +++ b/acceptor_unix.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build linux || freebsd || dragonfly || netbsd || openbsd || darwin -// +build linux freebsd dragonfly netbsd openbsd darwin +//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd +// +build darwin dragonfly freebsd linux netbsd openbsd package gnet diff --git a/client_test.go b/client_test.go index dccbf10dc..8b56f5494 100644 --- a/client_test.go +++ b/client_test.go @@ -1,5 +1,5 @@ -//go:build linux || freebsd || dragonfly || netbsd || openbsd || darwin || windows -// +build linux freebsd dragonfly netbsd openbsd darwin windows +//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd || windows +// +build darwin dragonfly freebsd linux netbsd openbsd windows package gnet diff --git a/client_unix.go b/client_unix.go index 10bf99c07..597d73592 100644 --- a/client_unix.go +++ b/client_unix.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build linux || freebsd || dragonfly || netbsd || openbsd || darwin -// +build linux freebsd dragonfly netbsd openbsd darwin +//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd +// +build darwin dragonfly freebsd linux netbsd openbsd package gnet diff --git a/conn_map.go b/conn_map.go index e6e45a050..6b1b02aa1 100644 --- a/conn_map.go +++ b/conn_map.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build (linux || freebsd || dragonfly || netbsd || openbsd || darwin) && !gc_opt -// +build linux freebsd dragonfly netbsd openbsd darwin +//go:build (darwin || dragonfly || freebsd || linux || netbsd || openbsd) && !gc_opt +// +build darwin dragonfly freebsd linux netbsd openbsd // +build !gc_opt package gnet diff --git a/conn_matrix.go b/conn_matrix.go index 70ba4affa..6076a41fb 100644 --- a/conn_matrix.go +++ b/conn_matrix.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build (linux || freebsd || dragonfly || netbsd || openbsd || darwin) && gc_opt -// +build linux freebsd dragonfly netbsd openbsd darwin +//go:build (darwin || dragonfly || freebsd || linux || netbsd || openbsd) && gc_opt +// +build darwin dragonfly freebsd linux netbsd openbsd // +build gc_opt package gnet diff --git a/conn_matrix_test.go b/conn_matrix_test.go index c081cfa67..c13e39e83 100644 --- a/conn_matrix_test.go +++ b/conn_matrix_test.go @@ -1,5 +1,5 @@ -//go:build (linux || freebsd || dragonfly || netbsd || openbsd || darwin) && gc_opt -// +build linux freebsd dragonfly netbsd openbsd darwin +//go:build (darwin || dragonfly || freebsd || linux || netbsd || openbsd) && gc_opt +// +build darwin dragonfly freebsd linux netbsd openbsd // +build gc_opt package gnet diff --git a/connection_bsd.go b/connection_bsd.go index 3223f01e4..716a00180 100644 --- a/connection_bsd.go +++ b/connection_bsd.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build freebsd || dragonfly || netbsd || openbsd || darwin -// +build freebsd dragonfly netbsd openbsd darwin +//go:build darwin || dragonfly || freebsd || netbsd || openbsd +// +build darwin dragonfly freebsd netbsd openbsd package gnet diff --git a/connection_unix.go b/connection_unix.go index 101a51e96..1e59f5e77 100644 --- a/connection_unix.go +++ b/connection_unix.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build linux || freebsd || dragonfly || netbsd || openbsd || darwin -// +build linux freebsd dragonfly netbsd openbsd darwin +//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd +// +build darwin dragonfly freebsd linux netbsd openbsd package gnet diff --git a/engine_unix.go b/engine_unix.go index 18f46c89d..83a1ae864 100644 --- a/engine_unix.go +++ b/engine_unix.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build linux || freebsd || dragonfly || netbsd || openbsd || darwin -// +build linux freebsd dragonfly netbsd openbsd darwin +//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd +// +build darwin dragonfly freebsd linux netbsd openbsd package gnet diff --git a/eventloop_unix.go b/eventloop_unix.go index b7530a966..d2f1c5c22 100644 --- a/eventloop_unix.go +++ b/eventloop_unix.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build linux || freebsd || dragonfly || netbsd || openbsd || darwin -// +build linux freebsd dragonfly netbsd openbsd darwin +//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd +// +build darwin dragonfly freebsd linux netbsd openbsd package gnet diff --git a/eventloop_unix_test.go b/eventloop_unix_test.go index 25e8ae64e..b9cc2121e 100644 --- a/eventloop_unix_test.go +++ b/eventloop_unix_test.go @@ -1,5 +1,5 @@ -//go:build linux || freebsd || dragonfly || netbsd || openbsd || darwin -// +build linux freebsd dragonfly netbsd openbsd darwin +//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd +// +build darwin dragonfly freebsd linux netbsd openbsd package gnet diff --git a/go.mod b/go.mod index 9c94e27ed..02ad938c6 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,8 @@ require ( github.com/stretchr/testify v1.8.4 github.com/valyala/bytebufferpool v1.0.0 go.uber.org/zap v1.21.0 // don't upgrade this one - golang.org/x/sync v0.6.0 - golang.org/x/sys v0.16.0 + golang.org/x/sync v0.7.0 + golang.org/x/sys v0.19.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 ) diff --git a/go.sum b/go.sum index 3a9505260..0823447e5 100644 --- a/go.sum +++ b/go.sum @@ -46,15 +46,15 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= -golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= -golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= -golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= +golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= diff --git a/internal/io/io_bsd.go b/internal/io/io_bsd.go index 661bb1606..c32109059 100644 --- a/internal/io/io_bsd.go +++ b/internal/io/io_bsd.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build freebsd || dragonfly || netbsd || openbsd || darwin -// +build freebsd dragonfly netbsd openbsd darwin +//go:build darwin || dragonfly || freebsd || netbsd || openbsd +// +build darwin dragonfly freebsd netbsd openbsd package io diff --git a/internal/netpoll/defs_bsd_32bit.go b/internal/netpoll/defs_bsd_32bit.go index 2b0fa0b66..1a42c4a76 100644 --- a/internal/netpoll/defs_bsd_32bit.go +++ b/internal/netpoll/defs_bsd_32bit.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build (freebsd || dragonfly || netbsd || openbsd || darwin) && (386 || arm || mips || mipsle) -// +build freebsd dragonfly netbsd openbsd darwin +//go:build (darwin || dragonfly || freebsd || netbsd || openbsd) && (386 || arm || mips || mipsle) +// +build darwin dragonfly freebsd netbsd openbsd // +build 386 arm mips mipsle package netpoll diff --git a/internal/netpoll/defs_bsd_64bit.go b/internal/netpoll/defs_bsd_64bit.go index 1bcab7c35..7248bcceb 100644 --- a/internal/netpoll/defs_bsd_64bit.go +++ b/internal/netpoll/defs_bsd_64bit.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build (freebsd || dragonfly || netbsd || openbsd || darwin) && (amd64 || arm64 || ppc64 || ppc64le || mips64 || mips64le || riscv64) -// +build freebsd dragonfly netbsd openbsd darwin +//go:build (darwin || dragonfly || freebsd || netbsd || openbsd) && (amd64 || arm64 || ppc64 || ppc64le || mips64 || mips64le || riscv64) +// +build darwin dragonfly freebsd netbsd openbsd // +build amd64 arm64 ppc64 ppc64le mips64 mips64le riscv64 package netpoll diff --git a/internal/netpoll/defs_poller.go b/internal/netpoll/defs_poller.go index 79f27d2ea..ed12ca80d 100644 --- a/internal/netpoll/defs_poller.go +++ b/internal/netpoll/defs_poller.go @@ -12,14 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build linux || freebsd || dragonfly || netbsd || openbsd || darwin -// +build linux freebsd dragonfly netbsd openbsd darwin +//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd +// +build darwin dragonfly freebsd linux netbsd openbsd package netpoll -// IOFlags represents the flags of IO events. -type IOFlags = uint16 - // PollEventHandler is the callback for I/O events notified by the poller. type PollEventHandler func(int, IOEvent, IOFlags) error diff --git a/internal/netpoll/defs_poller_bsd.go b/internal/netpoll/defs_poller_bsd.go new file mode 100644 index 000000000..772e8e4e2 --- /dev/null +++ b/internal/netpoll/defs_poller_bsd.go @@ -0,0 +1,24 @@ +// Copyright (c) 2024 The Gnet Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build darwin || dragonfly || freebsd || openbsd +// +build darwin dragonfly freebsd openbsd + +package netpoll + +// IOFlags represents the flags of IO events. +type IOFlags = uint16 + +// IOEvent is the integer type of I/O events on BSD's. +type IOEvent = int16 diff --git a/internal/netpoll/defs_poller_epoll.go b/internal/netpoll/defs_poller_epoll.go index 81501524b..423439cbe 100644 --- a/internal/netpoll/defs_poller_epoll.go +++ b/internal/netpoll/defs_poller_epoll.go @@ -19,6 +19,9 @@ package netpoll import "golang.org/x/sys/unix" +// IOFlags represents the flags of IO events. +type IOFlags = uint16 + // IOEvent is the integer type of I/O events on Linux. type IOEvent = uint32 diff --git a/internal/netpoll/defs_poller_kqueue.go b/internal/netpoll/defs_poller_kqueue.go index 89ea29ae8..af7b0adda 100644 --- a/internal/netpoll/defs_poller_kqueue.go +++ b/internal/netpoll/defs_poller_kqueue.go @@ -12,16 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build freebsd || dragonfly || netbsd || openbsd || darwin -// +build freebsd dragonfly netbsd openbsd darwin +//go:build darwin || dragonfly || freebsd || netbsd || openbsd +// +build darwin dragonfly freebsd netbsd openbsd package netpoll import "golang.org/x/sys/unix" -// IOEvent is the integer type of I/O events on BSD's. -type IOEvent = int16 - const ( // InitPollEventsCap represents the initial capacity of poller event-list. InitPollEventsCap = 64 diff --git a/internal/netpoll/defs_poller_netbsd.go b/internal/netpoll/defs_poller_netbsd.go new file mode 100644 index 000000000..b29f5f9c3 --- /dev/null +++ b/internal/netpoll/defs_poller_netbsd.go @@ -0,0 +1,21 @@ +// Copyright (c) 2024 The Gnet Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package netpoll + +// IOEvent is the integer type of I/O events on BSD's. +type IOEvent = uint32 + +// IOFlags represents the flags of IO events. +type IOFlags = uint32 diff --git a/internal/netpoll/poller_epoll_default.go b/internal/netpoll/poller_epoll_default.go index 20b9cb213..12a094664 100644 --- a/internal/netpoll/poller_epoll_default.go +++ b/internal/netpoll/poller_epoll_default.go @@ -70,10 +70,8 @@ func OpenPoller() (poller *Poller, err error) { // Close closes the poller. func (p *Poller) Close() error { - if err := os.NewSyscallError("close", unix.Close(p.fd)); err != nil { - return err - } - return os.NewSyscallError("close", unix.Close(p.efd)) + _ = unix.Close(p.efd) + return os.NewSyscallError("close", unix.Close(p.fd)) } // Make the endianness of bytes compatible with more linux OSs under different processor-architectures, diff --git a/internal/netpoll/poller_epoll_ultimate.go b/internal/netpoll/poller_epoll_ultimate.go index 844914d8a..84d415f87 100644 --- a/internal/netpoll/poller_epoll_ultimate.go +++ b/internal/netpoll/poller_epoll_ultimate.go @@ -71,10 +71,8 @@ func OpenPoller() (poller *Poller, err error) { // Close closes the poller. func (p *Poller) Close() error { - if err := os.NewSyscallError("close", unix.Close(p.fd)); err != nil { - return err - } - return os.NewSyscallError("close", unix.Close(p.epa.FD)) + _ = unix.Close(p.epa.FD) + return os.NewSyscallError("close", unix.Close(p.fd)) } // Make the endianness of bytes compatible with more linux OSs under different processor-architectures, @@ -133,7 +131,7 @@ func (p *Poller) Polling() error { for i := 0; i < n; i++ { ev := &el.events[i] - pollAttachment := *(**PollAttachment)(unsafe.Pointer(&ev.data)) + pollAttachment := restorePollAttachment(unsafe.Pointer(&ev.data)) if pollAttachment.FD == p.epa.FD { // poller is awakened to run tasks in queues. doChores = true } else { @@ -210,7 +208,7 @@ func (p *Poller) AddReadWrite(pa *PollAttachment, edgeTriggered bool) error { if edgeTriggered { ev.events |= unix.EPOLLET } - *(**PollAttachment)(unsafe.Pointer(&ev.data)) = pa + convertPollAttachment(unsafe.Pointer(&ev.data), pa) return os.NewSyscallError("epoll_ctl add", epollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &ev)) } @@ -221,7 +219,7 @@ func (p *Poller) AddRead(pa *PollAttachment, edgeTriggered bool) error { if edgeTriggered { ev.events |= unix.EPOLLET } - *(**PollAttachment)(unsafe.Pointer(&ev.data)) = pa + convertPollAttachment(unsafe.Pointer(&ev.data), pa) return os.NewSyscallError("epoll_ctl add", epollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &ev)) } @@ -232,7 +230,7 @@ func (p *Poller) AddWrite(pa *PollAttachment, edgeTriggered bool) error { if edgeTriggered { ev.events |= unix.EPOLLET } - *(**PollAttachment)(unsafe.Pointer(&ev.data)) = pa + convertPollAttachment(unsafe.Pointer(&ev.data), pa) return os.NewSyscallError("epoll_ctl add", epollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &ev)) } @@ -243,7 +241,7 @@ func (p *Poller) ModRead(pa *PollAttachment, edgeTriggered bool) error { if edgeTriggered { ev.events |= unix.EPOLLET } - *(**PollAttachment)(unsafe.Pointer(&ev.data)) = pa + convertPollAttachment(unsafe.Pointer(&ev.data), pa) return os.NewSyscallError("epoll_ctl mod", epollCtl(p.fd, unix.EPOLL_CTL_MOD, pa.FD, &ev)) } @@ -254,7 +252,7 @@ func (p *Poller) ModReadWrite(pa *PollAttachment, edgeTriggered bool) error { if edgeTriggered { ev.events |= unix.EPOLLET } - *(**PollAttachment)(unsafe.Pointer(&ev.data)) = pa + convertPollAttachment(unsafe.Pointer(&ev.data), pa) return os.NewSyscallError("epoll_ctl mod", epollCtl(p.fd, unix.EPOLL_CTL_MOD, pa.FD, &ev)) } diff --git a/internal/netpoll/poller_kqueue_default.go b/internal/netpoll/poller_kqueue_default.go index 4a04c3d80..c44be9027 100644 --- a/internal/netpoll/poller_kqueue_default.go +++ b/internal/netpoll/poller_kqueue_default.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build (freebsd || dragonfly || netbsd || openbsd || darwin) && !poll_opt -// +build freebsd dragonfly netbsd openbsd darwin +//go:build (darwin || dragonfly || freebsd || netbsd || openbsd) && !poll_opt +// +build darwin dragonfly freebsd netbsd openbsd // +build !poll_opt package netpoll @@ -33,6 +33,7 @@ import ( // Poller represents a poller which is in charge of monitoring file-descriptors. type Poller struct { fd int + pipe []int wakeupCall int32 asyncTaskQueue queue.AsyncTaskQueue // queue with low priority urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority @@ -47,14 +48,10 @@ func OpenPoller() (poller *Poller, err error) { err = os.NewSyscallError("kqueue", err) return } - if _, err = unix.Kevent(poller.fd, []unix.Kevent_t{{ - Ident: 0, - Filter: unix.EVFILT_USER, - Flags: unix.EV_ADD | unix.EV_CLEAR, - }}, nil, nil); err != nil { + if err = poller.addWakeupEvent(); err != nil { _ = poller.Close() poller = nil - err = os.NewSyscallError("kevent add|clear", err) + err = os.NewSyscallError("kevent | pipe2", err) return } poller.asyncTaskQueue = queue.NewLockFreeQueue() @@ -65,15 +62,13 @@ func OpenPoller() (poller *Poller, err error) { // Close closes the poller. func (p *Poller) Close() error { + if len(p.pipe) == 2 { + _ = unix.Close(p.pipe[0]) + _ = unix.Close(p.pipe[1]) + } return os.NewSyscallError("close", unix.Close(p.fd)) } -var note = []unix.Kevent_t{{ - Ident: 0, - Filter: unix.EVFILT_USER, - Fflags: unix.NOTE_TRIGGER, -}} - // Trigger enqueues task and wakes up the poller to process pending tasks. // By default, any incoming task will enqueued into urgentAsyncTaskQueue // before the threshold of high-priority events is reached. When it happens, @@ -91,11 +86,9 @@ func (p *Poller) Trigger(priority queue.EventPriority, fn queue.TaskFunc, arg in p.urgentAsyncTaskQueue.Enqueue(task) } if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) { - if _, err = unix.Kevent(p.fd, note, nil, nil); err == unix.EAGAIN { - err = nil - } + err = p.wakePoller() } - return os.NewSyscallError("kevent trigger", err) + return os.NewSyscallError("kevent | write", err) } // Polling blocks the current goroutine, waiting for network-events. @@ -123,6 +116,7 @@ func (p *Poller) Polling(callback PollEventHandler) error { ev := &el.events[i] if fd := int(ev.Ident); fd == 0 { // poller is awakened to run tasks in queues doChores = true + p.drainWakeupEvent() } else { switch err = callback(fd, ev.Filter, ev.Flags); err { case nil: @@ -162,9 +156,7 @@ func (p *Poller) Polling(callback PollEventHandler) error { } atomic.StoreInt32(&p.wakeupCall, 0) if (!p.asyncTaskQueue.IsEmpty() || !p.urgentAsyncTaskQueue.IsEmpty()) && atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) { - switch _, err = unix.Kevent(p.fd, note, nil, nil); err { - case nil, unix.EAGAIN: - default: + if err = p.wakePoller(); err != nil { doChores = true } } @@ -180,7 +172,7 @@ func (p *Poller) Polling(callback PollEventHandler) error { // AddReadWrite registers the given file-descriptor with readable and writable events to the poller. func (p *Poller) AddReadWrite(pa *PollAttachment, edgeTriggered bool) error { - var flags uint16 = unix.EV_ADD + var flags IOFlags = unix.EV_ADD if edgeTriggered { flags |= unix.EV_CLEAR } @@ -193,7 +185,7 @@ func (p *Poller) AddReadWrite(pa *PollAttachment, edgeTriggered bool) error { // AddRead registers the given file-descriptor with readable event to the poller. func (p *Poller) AddRead(pa *PollAttachment, edgeTriggered bool) error { - var flags uint16 = unix.EV_ADD + var flags IOFlags = unix.EV_ADD if edgeTriggered { flags |= unix.EV_CLEAR } @@ -205,7 +197,7 @@ func (p *Poller) AddRead(pa *PollAttachment, edgeTriggered bool) error { // AddWrite registers the given file-descriptor with writable event to the poller. func (p *Poller) AddWrite(pa *PollAttachment, edgeTriggered bool) error { - var flags uint16 = unix.EV_ADD + var flags IOFlags = unix.EV_ADD if edgeTriggered { flags |= unix.EV_CLEAR } @@ -225,7 +217,7 @@ func (p *Poller) ModRead(pa *PollAttachment, _ bool) error { // ModReadWrite renews the given file-descriptor with readable and writable events in the poller. func (p *Poller) ModReadWrite(pa *PollAttachment, edgeTriggered bool) error { - var flags uint16 = unix.EV_ADD + var flags IOFlags = unix.EV_ADD if edgeTriggered { flags |= unix.EV_CLEAR } diff --git a/internal/netpoll/poller_kqueue_ultimate.go b/internal/netpoll/poller_kqueue_ultimate.go index 397dad052..fc50c6521 100644 --- a/internal/netpoll/poller_kqueue_ultimate.go +++ b/internal/netpoll/poller_kqueue_ultimate.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build (freebsd || dragonfly || netbsd || openbsd || darwin) && poll_opt -// +build freebsd dragonfly netbsd openbsd darwin +//go:build (darwin || dragonfly || freebsd || netbsd || openbsd) && poll_opt +// +build darwin dragonfly freebsd netbsd openbsd // +build poll_opt package netpoll @@ -34,6 +34,7 @@ import ( // Poller represents a poller which is in charge of monitoring file-descriptors. type Poller struct { fd int + pipe []int wakeupCall int32 asyncTaskQueue queue.AsyncTaskQueue // queue with low priority urgentAsyncTaskQueue queue.AsyncTaskQueue // queue with high priority @@ -48,14 +49,10 @@ func OpenPoller() (poller *Poller, err error) { err = os.NewSyscallError("kqueue", err) return } - if _, err = unix.Kevent(poller.fd, []unix.Kevent_t{{ - Ident: 0, - Filter: unix.EVFILT_USER, - Flags: unix.EV_ADD | unix.EV_CLEAR, - }}, nil, nil); err != nil { + if err = poller.addWakeupEvent(); err != nil { _ = poller.Close() poller = nil - err = os.NewSyscallError("kevent add|clear", err) + err = os.NewSyscallError("kevent | pipe2", err) return } poller.asyncTaskQueue = queue.NewLockFreeQueue() @@ -66,15 +63,13 @@ func OpenPoller() (poller *Poller, err error) { // Close closes the poller. func (p *Poller) Close() error { + if len(p.pipe) == 2 { + _ = unix.Close(p.pipe[0]) + _ = unix.Close(p.pipe[1]) + } return os.NewSyscallError("close", unix.Close(p.fd)) } -var note = []unix.Kevent_t{{ - Ident: 0, - Filter: unix.EVFILT_USER, - Fflags: unix.NOTE_TRIGGER, -}} - // Trigger enqueues task and wakes up the poller to process pending tasks. // By default, any incoming task will enqueued into urgentAsyncTaskQueue // before the threshold of high-priority events is reached. When it happens, @@ -92,11 +87,9 @@ func (p *Poller) Trigger(priority queue.EventPriority, fn queue.TaskFunc, arg in p.urgentAsyncTaskQueue.Enqueue(task) } if atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) { - if _, err = unix.Kevent(p.fd, note, nil, nil); err == unix.EAGAIN { - err = nil - } + err = p.wakePoller() } - return os.NewSyscallError("kevent trigger", err) + return os.NewSyscallError("kevent | write", err) } // Polling blocks the current goroutine, waiting for network-events. @@ -124,8 +117,9 @@ func (p *Poller) Polling() error { ev := &el.events[i] if ev.Ident == 0 { // poller is awakened to run tasks in queues doChores = true + p.drainWakeupEvent() } else { - pollAttachment := (*PollAttachment)(unsafe.Pointer(ev.Udata)) + pollAttachment := restorePollAttachment(unsafe.Pointer(&ev.Udata)) switch err = pollAttachment.Callback(int(ev.Ident), ev.Filter, ev.Flags); err { case nil: case errors.ErrAcceptSocket, errors.ErrEngineShutdown: @@ -164,9 +158,7 @@ func (p *Poller) Polling() error { } atomic.StoreInt32(&p.wakeupCall, 0) if (!p.asyncTaskQueue.IsEmpty() || !p.urgentAsyncTaskQueue.IsEmpty()) && atomic.CompareAndSwapInt32(&p.wakeupCall, 0, 1) { - switch _, err = unix.Kevent(p.fd, note, nil, nil); err { - case nil, unix.EAGAIN: - default: + if err = p.wakePoller(); err != nil { doChores = true } } @@ -184,12 +176,12 @@ func (p *Poller) Polling() error { func (p *Poller) AddReadWrite(pa *PollAttachment, edgeTriggered bool) error { var evs [2]unix.Kevent_t evs[0].Ident = keventIdent(pa.FD) + evs[0].Filter = unix.EVFILT_READ evs[0].Flags = unix.EV_ADD if edgeTriggered { evs[0].Flags |= unix.EV_CLEAR } - evs[0].Filter = unix.EVFILT_READ - evs[0].Udata = (*byte)(unsafe.Pointer(pa)) + convertPollAttachment(unsafe.Pointer(&evs[0].Udata), pa) evs[1] = evs[0] evs[1].Filter = unix.EVFILT_WRITE _, err := unix.Kevent(p.fd, evs[:], nil, nil) @@ -200,12 +192,12 @@ func (p *Poller) AddReadWrite(pa *PollAttachment, edgeTriggered bool) error { func (p *Poller) AddRead(pa *PollAttachment, edgeTriggered bool) error { var evs [1]unix.Kevent_t evs[0].Ident = keventIdent(pa.FD) + evs[0].Filter = unix.EVFILT_READ evs[0].Flags = unix.EV_ADD if edgeTriggered { evs[0].Flags |= unix.EV_CLEAR } - evs[0].Filter = unix.EVFILT_READ - evs[0].Udata = (*byte)(unsafe.Pointer(pa)) + convertPollAttachment(unsafe.Pointer(&evs[0].Udata), pa) _, err := unix.Kevent(p.fd, evs[:], nil, nil) return os.NewSyscallError("kevent add", err) } @@ -214,12 +206,12 @@ func (p *Poller) AddRead(pa *PollAttachment, edgeTriggered bool) error { func (p *Poller) AddWrite(pa *PollAttachment, edgeTriggered bool) error { var evs [1]unix.Kevent_t evs[0].Ident = keventIdent(pa.FD) + evs[0].Filter = unix.EVFILT_WRITE evs[0].Flags = unix.EV_ADD if edgeTriggered { evs[0].Flags |= unix.EV_CLEAR } - evs[0].Filter = unix.EVFILT_WRITE - evs[0].Udata = (*byte)(unsafe.Pointer(pa)) + convertPollAttachment(unsafe.Pointer(&evs[0].Udata), pa) _, err := unix.Kevent(p.fd, evs[:], nil, nil) return os.NewSyscallError("kevent add", err) } @@ -228,9 +220,8 @@ func (p *Poller) AddWrite(pa *PollAttachment, edgeTriggered bool) error { func (p *Poller) ModRead(pa *PollAttachment, _ bool) error { var evs [1]unix.Kevent_t evs[0].Ident = keventIdent(pa.FD) - evs[0].Flags = unix.EV_DELETE evs[0].Filter = unix.EVFILT_WRITE - evs[0].Udata = (*byte)(unsafe.Pointer(pa)) + evs[0].Flags = unix.EV_DELETE _, err := unix.Kevent(p.fd, evs[:], nil, nil) return os.NewSyscallError("kevent delete", err) } @@ -239,12 +230,12 @@ func (p *Poller) ModRead(pa *PollAttachment, _ bool) error { func (p *Poller) ModReadWrite(pa *PollAttachment, edgeTriggered bool) error { var evs [1]unix.Kevent_t evs[0].Ident = keventIdent(pa.FD) + evs[0].Filter = unix.EVFILT_WRITE evs[0].Flags = unix.EV_ADD if edgeTriggered { evs[0].Flags |= unix.EV_CLEAR } - evs[0].Filter = unix.EVFILT_WRITE - evs[0].Udata = (*byte)(unsafe.Pointer(pa)) + convertPollAttachment(unsafe.Pointer(&evs[0].Udata), pa) _, err := unix.Kevent(p.fd, evs[:], nil, nil) return os.NewSyscallError("kevent add", err) } diff --git a/internal/netpoll/poller_kqueue_wakeup.go b/internal/netpoll/poller_kqueue_wakeup.go new file mode 100644 index 000000000..2f9f93298 --- /dev/null +++ b/internal/netpoll/poller_kqueue_wakeup.go @@ -0,0 +1,55 @@ +// Copyright (c) 2024 The Gnet Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build darwin || dragonfly || freebsd +// +build darwin dragonfly freebsd + +package netpoll + +import ( + "golang.org/x/sys/unix" + + "github.com/panjf2000/gnet/v2/pkg/logging" +) + +func (p *Poller) addWakeupEvent() error { + _, err := unix.Kevent(p.fd, []unix.Kevent_t{{ + Ident: 0, + Filter: unix.EVFILT_USER, + Flags: unix.EV_ADD | unix.EV_CLEAR, + }}, nil, nil) + return err +} + +func (p *Poller) wakePoller() error { +retry: + _, err := unix.Kevent(p.fd, []unix.Kevent_t{{ + Ident: 0, + Filter: unix.EVFILT_USER, + Fflags: unix.NOTE_TRIGGER, + }}, nil, nil) + if err == nil { + return nil + } + if err == unix.EINTR { + // All changes contained in the changelist should have been applied + // before returning EINTR. But let's be skeptical and retry it anyway, + // to make a 100% commitment. + goto retry + } + logging.Warnf("failed to wake up the poller: %v", err) + return err +} + +func (p *Poller) drainWakeupEvent() {} diff --git a/internal/netpoll/poller_kqueue_wakeup1.go b/internal/netpoll/poller_kqueue_wakeup1.go new file mode 100644 index 000000000..4505b5087 --- /dev/null +++ b/internal/netpoll/poller_kqueue_wakeup1.go @@ -0,0 +1,62 @@ +// Copyright (c) 2024 The Gnet Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build netbsd || openbsd +// +build netbsd openbsd + +package netpoll + +import ( + "golang.org/x/sys/unix" + + "github.com/panjf2000/gnet/v2/pkg/logging" +) + +// TODO(panjf2000): NetBSD didn't implement EVFILT_USER for user-established events +// until NetBSD 10.0, check out https://www.netbsd.org/releases/formal-10/NetBSD-10.0.html +// Therefore we use the pipe to wake up the kevent on NetBSD at this point. Get back here +// and switch to EVFILT_USER when we bump up the minimal requirement of NetBSD to 10.0. +// Alternatively, maybe we can use EVFILT_USER on the NetBSD by checking the kernel version +// via uname(3) and fall back to the pipe if the kernel version is older than 10.0. + +func (p *Poller) addWakeupEvent() error { + p.pipe = make([]int, 2) + if err := unix.Pipe2(p.pipe[:], unix.O_NONBLOCK|unix.O_CLOEXEC); err != nil { + logging.Fatalf("failed to create pipe for wakeup event: %v", err) + } + _, err := unix.Kevent(p.fd, []unix.Kevent_t{{ + Ident: uint64(p.pipe[0]), + Filter: unix.EVFILT_READ, + Flags: unix.EV_ADD, + }}, nil, nil) + return err +} + +func (p *Poller) wakePoller() error { +retry: + _, err := unix.Write(p.pipe[1], []byte("x")) + if err == nil || err == unix.EAGAIN { + return nil + } + if err == unix.EINTR { + goto retry + } + logging.Warnf("failed to write to the wakeup pipe: %v", err) + return err +} + +func (p *Poller) drainWakeupEvent() { + var buf [8]byte + _, _ = unix.Read(p.pipe[0], buf[:]) +} diff --git a/internal/netpoll/poller_unix_ultimate.go b/internal/netpoll/poller_unix_ultimate.go new file mode 100644 index 000000000..7404e77bb --- /dev/null +++ b/internal/netpoll/poller_unix_ultimate.go @@ -0,0 +1,29 @@ +// Copyright (c) 2024 The Gnet Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build (darwin || dragonfly || freebsd || linux || netbsd || openbsd) && poll_opt +// +build darwin dragonfly freebsd linux netbsd openbsd +// +build poll_opt + +package netpoll + +import "unsafe" + +func convertPollAttachment(ptr unsafe.Pointer, attachment *PollAttachment) { + *(**PollAttachment)(ptr) = attachment +} + +func restorePollAttachment(ptr unsafe.Pointer) *PollAttachment { + return *(**PollAttachment)(ptr) +} diff --git a/internal/socket/fd_unix.go b/internal/socket/fd_unix.go index 82de8ce27..7b89591d9 100644 --- a/internal/socket/fd_unix.go +++ b/internal/socket/fd_unix.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build linux || freebsd || dragonfly || netbsd || openbsd || darwin -// +build linux freebsd dragonfly netbsd openbsd darwin +//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd +// +build darwin dragonfly freebsd linux netbsd openbsd package socket diff --git a/internal/socket/sock_bsd.go b/internal/socket/sock_bsd.go index 6c9de5627..9d97ef417 100644 --- a/internal/socket/sock_bsd.go +++ b/internal/socket/sock_bsd.go @@ -13,8 +13,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build freebsd || dragonfly || netbsd || openbsd || darwin -// +build freebsd dragonfly netbsd openbsd darwin +//go:build darwin || dragonfly || freebsd || netbsd || openbsd +// +build darwin dragonfly freebsd netbsd openbsd package socket diff --git a/internal/socket/sock_cloexec.go b/internal/socket/sock_cloexec.go index 4be0813a1..334ad0a3b 100644 --- a/internal/socket/sock_cloexec.go +++ b/internal/socket/sock_cloexec.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build linux || freebsd || dragonfly || netbsd || openbsd -// +build linux freebsd dragonfly netbsd openbsd +//go:build dragonfly || freebsd || linux +// +build dragonfly freebsd linux package socket diff --git a/internal/socket/sock_posix.go b/internal/socket/sock_posix.go index 5235c6406..8b7dd24c4 100644 --- a/internal/socket/sock_posix.go +++ b/internal/socket/sock_posix.go @@ -13,8 +13,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build linux || freebsd || dragonfly || netbsd || openbsd || darwin -// +build linux freebsd dragonfly netbsd openbsd darwin +//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd +// +build darwin dragonfly freebsd linux netbsd openbsd package socket diff --git a/internal/socket/socket.go b/internal/socket/socket.go index e49f143f3..a6d9a9448 100644 --- a/internal/socket/socket.go +++ b/internal/socket/socket.go @@ -13,8 +13,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build linux || freebsd || dragonfly || netbsd || openbsd || darwin -// +build linux freebsd dragonfly netbsd openbsd darwin +//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd +// +build darwin dragonfly freebsd linux netbsd openbsd // Package socket provides functions that return fd and net.Addr based on // given the protocol and address with a SO_REUSEPORT option set to the socket. diff --git a/internal/socket/sockopts_posix.go b/internal/socket/sockopts_posix.go index 1280285f2..61fc8d31f 100644 --- a/internal/socket/sockopts_posix.go +++ b/internal/socket/sockopts_posix.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build linux || freebsd || dragonfly || netbsd || openbsd || darwin -// +build linux freebsd dragonfly netbsd openbsd darwin +//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd +// +build darwin dragonfly freebsd linux netbsd openbsd package socket diff --git a/internal/socket/sockopts_unix.go b/internal/socket/sockopts_unix.go index 1d1a2d559..4b63d8389 100644 --- a/internal/socket/sockopts_unix.go +++ b/internal/socket/sockopts_unix.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build linux || freebsd || dragonfly || netbsd -// +build linux freebsd dragonfly netbsd +//go:build dragonfly || freebsd || linux || netbsd +// +build dragonfly freebsd linux netbsd package socket diff --git a/internal/socket/sockopts_unix1.go b/internal/socket/sockopts_unix1.go index 0f7be1da9..d4c666f95 100644 --- a/internal/socket/sockopts_unix1.go +++ b/internal/socket/sockopts_unix1.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build linux || dragonfly || netbsd || openbsd || darwin -// +build linux dragonfly netbsd openbsd darwin +//go:build darwin || dragonfly || linux || netbsd || openbsd +// +build darwin dragonfly linux netbsd openbsd package socket diff --git a/internal/socket/socktoaddr.go b/internal/socket/socktoaddr.go index 36caa6f9c..068a37213 100644 --- a/internal/socket/socktoaddr.go +++ b/internal/socket/socktoaddr.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build linux || freebsd || dragonfly || netbsd || openbsd || darwin -// +build linux freebsd dragonfly netbsd openbsd darwin +//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd +// +build darwin dragonfly freebsd linux netbsd openbsd package socket diff --git a/internal/socket/sys_cloexec.go b/internal/socket/sys_cloexec.go index b526c100d..abd4ee378 100644 --- a/internal/socket/sys_cloexec.go +++ b/internal/socket/sys_cloexec.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build darwin -// +build darwin +//go:build darwin || netbsd || openbsd +// +build darwin netbsd openbsd package socket diff --git a/internal/socket/tcp_socket.go b/internal/socket/tcp_socket.go index 337b13c65..21d4af32e 100644 --- a/internal/socket/tcp_socket.go +++ b/internal/socket/tcp_socket.go @@ -13,8 +13,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build linux || freebsd || dragonfly || netbsd || openbsd || darwin -// +build linux freebsd dragonfly netbsd openbsd darwin +//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd +// +build darwin dragonfly freebsd linux netbsd openbsd package socket diff --git a/internal/socket/udp_socket.go b/internal/socket/udp_socket.go index 10c3ba304..6205c986b 100644 --- a/internal/socket/udp_socket.go +++ b/internal/socket/udp_socket.go @@ -13,8 +13,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build linux || freebsd || dragonfly || netbsd || openbsd || darwin -// +build linux freebsd dragonfly netbsd openbsd darwin +//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd +// +build darwin dragonfly freebsd linux netbsd openbsd package socket diff --git a/internal/socket/unix_socket.go b/internal/socket/unix_socket.go index b999aaf06..688672d09 100644 --- a/internal/socket/unix_socket.go +++ b/internal/socket/unix_socket.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build linux || freebsd || dragonfly || netbsd || openbsd || darwin -// +build linux freebsd dragonfly netbsd openbsd darwin +//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd +// +build darwin dragonfly freebsd linux netbsd openbsd package socket diff --git a/listener_unix.go b/listener_unix.go index e2d497b61..18fde857a 100644 --- a/listener_unix.go +++ b/listener_unix.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build linux || freebsd || dragonfly || netbsd || openbsd || darwin -// +build linux freebsd dragonfly netbsd openbsd darwin +//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd +// +build darwin dragonfly freebsd linux netbsd openbsd package gnet diff --git a/os_unix_test.go b/os_unix_test.go index f3d0f0724..0612cfd06 100644 --- a/os_unix_test.go +++ b/os_unix_test.go @@ -1,5 +1,5 @@ -//go:build linux || freebsd || dragonfly || netbsd || openbsd || darwin -// +build linux freebsd dragonfly netbsd openbsd darwin +//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd +// +build darwin dragonfly freebsd linux netbsd openbsd package gnet diff --git a/reactor_default.go b/reactor_default.go index a8935c879..8927f471e 100644 --- a/reactor_default.go +++ b/reactor_default.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build (linux || freebsd || dragonfly || netbsd || openbsd || darwin) && !poll_opt -// +build linux freebsd dragonfly netbsd openbsd darwin +//go:build (darwin || dragonfly || freebsd || linux || netbsd || openbsd) && !poll_opt +// +build darwin dragonfly freebsd linux netbsd openbsd // +build !poll_opt package gnet diff --git a/reactor_ultimate.go b/reactor_ultimate.go index 08ab1c8c6..ce307f7c1 100644 --- a/reactor_ultimate.go +++ b/reactor_ultimate.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build (linux || freebsd || dragonfly || netbsd || openbsd || darwin) && poll_opt -// +build linux freebsd dragonfly netbsd openbsd darwin +//go:build (darwin || dragonfly || freebsd || linux || netbsd || openbsd) && poll_opt +// +build darwin dragonfly freebsd linux netbsd openbsd // +build poll_opt package gnet From c2a7318880416b41ea8ea76196ddc3552a0dde09 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Wed, 24 Apr 2024 20:59:57 +0800 Subject: [PATCH 3/6] chore: update READMEs --- README.md | 7 ++++--- README_ZH.md | 7 ++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 5934bf1b9..d68ed1e52 100644 --- a/README.md +++ b/README.md @@ -38,15 +38,16 @@ English | [中文](README_ZH.md) - [x] Efficient, reusable, and elastic memory buffer: (Elastic-)Ring-Buffer, Linked-List-Buffer and Elastic-Mixed-Buffer - [x] Multiple protocols/IPC mechanisms: `TCP`, `UDP`, and `Unix Domain Socket` - [x] Multiple load-balancing algorithms: `Round-Robin`, `Source-Addr-Hash`, and `Least-Connections` -- [x] Two event-driven mechanisms: `epoll` on **Linux** and `kqueue` on **FreeBSD/DragonFly/Darwin** - [x] Flexible ticker event -- [x] Implementation of `gnet` Client -- [x] **Windows** platform support (For compatibility in development only, do not use it in production) +- [x] `gnet` client +- [x] Running on `Linux`, `macOS`, `Windows`, and *BSD: `Darwin`/`DragonFlyBSD`/`FreeBSD`/`NetBSD`/`OpenBSD` - [x] **Edge-triggered** I/O support - [x] Multiple network addresses binding - [ ] **TLS** support - [ ] [io_uring](https://kernel.dk/io_uring.pdf) support +***Windows version of `gnet` should only be used in development for developing and testing, it shouldn't be used in production.*** + # 🎬 Getting started `gnet` is available as a Go module and we highly recommend that you use `gnet` via [Go Modules](https://go.dev/blog/using-go-modules), with Go 1.11 Modules enabled (Go 1.11+), you can just simply add `import "github.com/panjf2000/gnet/v2"` to the codebase and run `go mod download/go mod tidy` or `go [build|run|test]` to download the necessary dependencies automatically. diff --git a/README_ZH.md b/README_ZH.md index 43db28d00..70f1c2c4d 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -38,15 +38,16 @@ - [x] 高效、可重用而且自动伸缩的内存 buffer:(Elastic-)Ring-Buffer, Linked-List-Buffer and Elastic-Mixed-Buffer - [x] 多种网络协议/IPC 机制:`TCP`、`UDP` 和 `Unix Domain Socket` - [x] 多种负载均衡算法:`Round-Robin(轮询)`、`Source-Addr-Hash(源地址哈希)` 和 `Least-Connections(最少连接数)` -- [x] 两种事件驱动机制:**Linux** 里的 `epoll` 以及 **FreeBSD/DragonFly/Darwin** 里的 `kqueue` - [x] 灵活的事件定时器 -- [x] 实现 `gnet` 客户端 -- [x] 支持 **Windows** 平台 (仅用于开发环境的兼容性,不要在生产环境中使用) +- [x] `gnet` 客户端支持 +- [x] 支持 `Linux`, `macOS`, `Windows` 和 *BSD 操作系统: `Darwin`/`DragonFlyBSD`/`FreeBSD`/`NetBSD`/`OpenBSD` - [x] **Edge-triggered** I/O 支持 - [x] 多网络地址绑定 - [ ] **TLS** 支持 - [ ] [io_uring](https://kernel.dk/io_uring.pdf) 支持 +***`gnet` 的 Windows 版本应该仅用于开发阶段的开发和测试,切勿用于生产环境***。 + # 🎬 开始 `gnet` 是一个 Go module,而且我们也强烈推荐通过 [Go Modules](https://go.dev/blog/using-go-modules) 来使用 `gnet`,在开启 Go Modules 支持(Go 1.11+)之后可以通过简单地在代码中写 `import "github.com/panjf2000/gnet/v2"` 来引入 `gnet`,然后执行 `go mod download/go mod tidy` 或者 `go [build|run|test]` 这些命令来自动下载所依赖的包。 From 31a153fd04d90b6c66be8bfc064ac064dd2e5326 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Wed, 24 Apr 2024 23:34:45 +0800 Subject: [PATCH 4/6] chore: update the comments of SO_REUSEPORT on AF_UNIX --- gnet.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/gnet.go b/gnet.go index a5214eede..3e2d9ab9a 100644 --- a/gnet.go +++ b/gnet.go @@ -488,8 +488,11 @@ func createListeners(addrs []string, opts ...Option) ([]*listener, *Options, err // with the capability of load balancing, it's the equivalent of Linux's SO_REUSEPORT. // Also note that DragonFlyBSD 3.6.0 extended SO_REUSEPORT to distribute workload to // available sockets, which make it the same as Linux's SO_REUSEPORT. - // AF_LOCAL with SO_REUSEPORT enables duplicate address and port bindings without - // load balancing on Linux and *BSD. Therefore, disable it for Unix domain sockets. + // + // Despite the fact that SO_REUSEPORT can be set on a Unix domain socket + // via setsockopt() without reporting an error, SO_REUSEPORT is actually + // not supported for sockets of AF_UNIX. Thus, we avoid setting it on the + // Unix domain sockets. goos := runtime.GOOS if (options.Multicore || options.NumEventLoop > 1) && options.ReusePort && ((goos != "linux" && goos != "dragonfly" && goos != "freebsd") || hasUnix) { From 4609d5075d0ea9f164b396aba65003220425c334 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Wed, 24 Apr 2024 23:45:43 +0800 Subject: [PATCH 5/6] opt: make it fail when binding to the same AF_UNIX path --- listener_unix.go | 1 - 1 file changed, 1 deletion(-) diff --git a/listener_unix.go b/listener_unix.go index 18fde857a..956c84890 100644 --- a/listener_unix.go +++ b/listener_unix.go @@ -58,7 +58,6 @@ func (ln *listener) normalize() (err error) { ln.fd, ln.addr, err = socket.UDPSocket(ln.network, ln.address, false, ln.sockOpts...) ln.network = "udp" case "unix": - _ = os.RemoveAll(ln.address) ln.fd, ln.addr, err = socket.UnixSocket(ln.network, ln.address, true, ln.sockOpts...) default: err = errors.ErrUnsupportedProtocol From 4c3b84f54b33f34015a41838b14a9e52b9cb5959 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Thu, 25 Apr 2024 00:23:04 +0800 Subject: [PATCH 6/6] Revert "opt: make it fail when binding to the same AF_UNIX path" This reverts commit 4609d5075d0ea9f164b396aba65003220425c334. --- listener_unix.go | 1 + 1 file changed, 1 insertion(+) diff --git a/listener_unix.go b/listener_unix.go index 956c84890..18fde857a 100644 --- a/listener_unix.go +++ b/listener_unix.go @@ -58,6 +58,7 @@ func (ln *listener) normalize() (err error) { ln.fd, ln.addr, err = socket.UDPSocket(ln.network, ln.address, false, ln.sockOpts...) ln.network = "udp" case "unix": + _ = os.RemoveAll(ln.address) ln.fd, ln.addr, err = socket.UnixSocket(ln.network, ln.address, true, ln.sockOpts...) default: err = errors.ErrUnsupportedProtocol