Skip to content

Commit

Permalink
Merge branch 'panjf2000:dev' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
0-haha committed May 24, 2024
2 parents d78adc6 + 8a0be60 commit 7de6c58
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 62 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ English | [中文](README_ZH.md)

`gnet` is an event-driven networking framework that is ultra-fast and lightweight. It is built from scratch by exploiting [epoll](https://man7.org/linux/man-pages/man7/epoll.7.html) and [kqueue](https://en.wikipedia.org/wiki/Kqueue) and it can achieve much higher performance with lower memory consumption than Go [net](https://golang.org/pkg/net/) in many specific scenarios.

`gnet` and [net](https://golang.org/pkg/net/) don't share the same philosophy about network programming. Thus, building network applications with `gnet` can be significantly different from building them with [net](https://golang.org/pkg/net/), and the philosophies can't be harmonized. There are other similar products written in other programming languages in the community, such as [libevent](https://github.com/libevent/libevent), [libuv](https://github.com/libuv/libuv), [netty](https://github.com/netty/netty), [twisted](https://github.com/twisted/twisted), [tornado](https://github.com/tornadoweb/tornado), etc. which work in a similar pattern as `gnet` under the hood.
`gnet` and [net](https://golang.org/pkg/net/) don't share the same philosophy about network programming. Thus, building network applications with `gnet` can be significantly different from building them with [net](https://golang.org/pkg/net/), and the philosophies can't be reconciled. There are other similar products written in other programming languages in the community, such as [libevent](https://github.com/libevent/libevent), [libuv](https://github.com/libuv/libuv), [netty](https://github.com/netty/netty), [twisted](https://github.com/twisted/twisted), [tornado](https://github.com/tornadoweb/tornado), etc. which work in a similar pattern as `gnet` under the hood.

`gnet` is not designed to displace the Go [net](https://golang.org/pkg/net/), but to create an alternative in the Go ecosystem for building performance-critical network services. As a result of which, `gnet` is not as comprehensive as Go [net](https://golang.org/pkg/net/), it provides only the core functionalities (in a concise API set) required by a network application and it is not planned on being a coverall networking framework, as I think [net](https://golang.org/pkg/net/) has done a good enough job in that area.
`gnet` is not designed to displace the Go [net](https://golang.org/pkg/net/), but to create an alternative in the Go ecosystem for building performance-critical network services. As a result of which, `gnet` is not as comprehensive as Go [net](https://golang.org/pkg/net/), it provides only the core functionalities (in a concise API set) required by a network application and it doesn't plan on being a coverall networking framework, as I think Go [net](https://golang.org/pkg/net/) has done a good enough job in that area.

`gnet` sells itself as a high-performance, lightweight, non-blocking, event-driven networking framework written in pure Go which works on the transport layer with TCP/UDP protocols and Unix Domain Socket. It enables developers to implement their own protocols(HTTP, RPC, WebSocket, Redis, etc.) of application layer upon `gnet` for building diversified network services. For instance, you get an HTTP Server if you implement HTTP protocol upon `gnet` while you have a Redis Server done with the implementation of Redis protocol upon `gnet` and so on.

Expand Down
2 changes: 1 addition & 1 deletion connection_bsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (c *conn) processIO(_ int, filter netpoll.IOEvent, flags netpoll.IOFlags) (
// 1) EVFILT_WRITE|EV_ADD|EV_CLEAR|EV_EOF, 2) EVFILT_READ|EV_ADD|EV_CLEAR|EV_EOF.
err = el.write(c)
default:
c.outboundBuffer.Release() // don't bother to write to a connection with some unknown error
c.outboundBuffer.Release() // don't bother to write to a connection that is already broken
err = el.close(c, io.EOF)
}
}
Expand Down
8 changes: 4 additions & 4 deletions connection_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ func (c *conn) processIO(_ int, ev netpoll.IOEvent, _ netpoll.IOFlags) error {
el := c.loop
// First check for any unexpected non-IO events.
// For these events we just close the connection directly.
if ev&netpoll.ErrEvents != 0 && ev&unix.EPOLLIN == 0 && ev&unix.EPOLLOUT == 0 {
c.outboundBuffer.Release() // don't bother to write to a connection with some unknown error
if ev&(netpoll.ErrEvents|unix.EPOLLRDHUP) != 0 && ev&netpoll.ReadWriteEvents == 0 {
c.outboundBuffer.Release() // don't bother to write to a connection that is already broken
return el.close(c, io.EOF)
}
// Secondly, check for EPOLLOUT before EPOLLIN, the former has a higher priority
Expand All @@ -43,14 +43,14 @@ func (c *conn) processIO(_ int, ev netpoll.IOEvent, _ netpoll.IOFlags) error {
// to the remote first and then close the connection.
//
// We perform eventloop.write for EPOLLOUT because it can take good care of either case.
if ev&(unix.EPOLLOUT|unix.EPOLLERR) != 0 {
if ev&(netpoll.WriteEvents|netpoll.ErrEvents) != 0 {
if err := el.write(c); err != nil {
return err
}
}
// Check for EPOLLIN before EPOLLRDHUP in case that there are pending data in
// the socket buffer.
if ev&(unix.EPOLLIN|unix.EPOLLERR) != 0 {
if ev&(netpoll.ReadEvents|netpoll.ErrEvents) != 0 {
if err := el.read(c); err != nil {
return err
}
Expand Down
41 changes: 36 additions & 5 deletions eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ func (el *eventloop) readTLS(c *conn) error {
}
}

func (el *eventloop) read0(itf interface{}) error {
return el.read(itf.(*conn))
}

const maxBytesTransferET = 1 << 22

func (el *eventloop) read(c *conn) error {
if !c.opened {
return nil
Expand All @@ -171,6 +177,7 @@ func (el *eventloop) read(c *conn) error {
return el.readTLS(c)
}

var recv int
isET := el.engine.opts.EdgeTriggeredIO
loop:
n, err := unix.Read(c.fd, el.buffer)
Expand All @@ -183,6 +190,7 @@ loop:
}
return el.close(c, os.NewSyscallError("read", err))
}
recv += n

if c.tlsconn != nil {
// attach the gnet eventloop.buffer to tlsconn.rawInput.
Expand Down Expand Up @@ -219,13 +227,25 @@ loop:
_, _ = c.inboundBuffer.Write(c.buffer)
c.buffer = c.buffer[:0]

if isET || c.isEOF {
if c.isEOF || (isET && recv < maxBytesTransferET) {
goto loop
}

// To prevent infinite reading in ET mode and starving other events,
// we need to set up threshold for the maximum read bytes per connection
// on each event-loop. If the threshold is reached and there are still
// unread data in the socket buffer, we must issue another read event manually.
if isET && n == len(el.buffer) {
return el.poller.Trigger(queue.LowPriority, el.read0, c)
}

return nil
}

func (el *eventloop) write0(itf interface{}) error {
return el.write(itf.(*conn))
}

// The default value of UIO_MAXIOV/IOV_MAX is 1024 on Linux and most BSD-like OSs.
const iovMax = 1024

Expand All @@ -236,8 +256,9 @@ func (el *eventloop) write(c *conn) error {

isET := el.engine.opts.EdgeTriggeredIO
var (
n int
err error
n int
sent int
err error
)
loop:
iov, _ := c.outboundBuffer.Peek(-1)
Expand All @@ -257,14 +278,24 @@ loop:
default:
return el.close(c, os.NewSyscallError("write", err))
}
if isET && !c.outboundBuffer.IsEmpty() {
sent += n

if isET && !c.outboundBuffer.IsEmpty() && sent < maxBytesTransferET {
goto loop
}

// All data have been sent, it's no need to monitor the writable events for LT mode,
// remove the writable event from poller to help the future event-loops if necessary.
if !isET && c.outboundBuffer.IsEmpty() {
_ = el.poller.ModRead(&c.pollAttachment, false)
return el.poller.ModRead(&c.pollAttachment, false)
}

// To prevent infinite writing in ET mode and starving other events,
// we need to set up threshold for the maximum write bytes per connection
// on each event-loop. If the threshold is reached and there are still
// pending data to write, we must issue another write event manually.
if isET && !c.outboundBuffer.IsEmpty() {
return el.poller.Trigger(queue.HighPriority, el.write0, c)
}

return nil
Expand Down
30 changes: 15 additions & 15 deletions eventloop_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var (
func BenchmarkGC4El100k(b *testing.B) {
oldGc := debug.SetGCPercent(-1)

ts1 := benchServeGC(b, "tcp", ":9001", true, 4, 100000)
ts1 := benchServeGC(b, "tcp", ":0", true, 4, 100000)
b.Run("Run-4-eventloop-100000", func(b *testing.B) {
for i := 0; i < b.N; i++ {
runtime.GC()
Expand All @@ -62,7 +62,7 @@ func BenchmarkGC4El100k(b *testing.B) {
func BenchmarkGC4El200k(b *testing.B) {
oldGc := debug.SetGCPercent(-1)

ts1 := benchServeGC(b, "tcp", ":9001", true, 4, 200000)
ts1 := benchServeGC(b, "tcp", ":0", true, 4, 200000)
b.Run("Run-4-eventloop-200000", func(b *testing.B) {
for i := 0; i < b.N; i++ {
runtime.GC()
Expand All @@ -76,7 +76,7 @@ func BenchmarkGC4El200k(b *testing.B) {
func BenchmarkGC4El500k(b *testing.B) {
oldGc := debug.SetGCPercent(-1)

ts1 := benchServeGC(b, "tcp", ":9001", true, 4, 500000)
ts1 := benchServeGC(b, "tcp", ":0", true, 4, 500000)
b.Run("Run-4-eventloop-500000", func(b *testing.B) {
for i := 0; i < b.N; i++ {
runtime.GC()
Expand Down Expand Up @@ -146,73 +146,73 @@ func TestServeGC(t *testing.T) {
if testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 1, 10000)
testServeGC(t, "tcp", ":0", true, true, 1, 10000)
})
t.Run("1-loop-100000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 1, 100000)
testServeGC(t, "tcp", ":0", true, true, 1, 100000)
})
t.Run("1-loop-1000000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 1, 1000000)
testServeGC(t, "tcp", ":0", true, true, 1, 1000000)
})
t.Run("2-loop-10000", func(t *testing.T) {
if testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 2, 10000)
testServeGC(t, "tcp", ":0", true, true, 2, 10000)
})
t.Run("2-loop-100000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 2, 100000)
testServeGC(t, "tcp", ":0", true, true, 2, 100000)
})
t.Run("2-loop-1000000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 2, 1000000)
testServeGC(t, "tcp", ":0", true, true, 2, 1000000)
})
t.Run("4-loop-10000", func(t *testing.T) {
if testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 4, 10000)
testServeGC(t, "tcp", ":0", true, true, 4, 10000)
})
t.Run("4-loop-100000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 4, 100000)
testServeGC(t, "tcp", ":0", true, true, 4, 100000)
})
t.Run("4-loop-1000000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 4, 1000000)
testServeGC(t, "tcp", ":0", true, true, 4, 1000000)
})
t.Run("16-loop-10000", func(t *testing.T) {
if testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 16, 10000)
testServeGC(t, "tcp", ":0", true, true, 16, 10000)
})
t.Run("16-loop-100000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 16, 100000)
testServeGC(t, "tcp", ":0", true, true, 16, 100000)
})
t.Run("16-loop-1000000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 16, 1000000)
testServeGC(t, "tcp", ":0", true, true, 16, 1000000)
})
})
}
Expand Down
11 changes: 8 additions & 3 deletions internal/netpoll/defs_poller_epoll.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,14 @@ const (
MinPollEventsCap = 32
// MaxAsyncTasksAtOneTime is the maximum amount of asynchronous tasks that the event-loop will process at one time.
MaxAsyncTasksAtOneTime = 256
// ErrEvents represents exceptional events that are not read/write, like socket being closed,
// reading/writing from/to a closed socket, etc.
ErrEvents = unix.EPOLLERR | unix.EPOLLHUP | unix.EPOLLRDHUP
// ReadEvents represents readable events that are polled by epoll.
ReadEvents = unix.EPOLLIN | unix.EPOLLPRI
// WriteEvents represents writeable events that are polled by epoll.
WriteEvents = unix.EPOLLOUT
// ReadWriteEvents represents both readable and writeable events.
ReadWriteEvents = ReadEvents | WriteEvents
// ErrEvents represents exceptional events that occurred on the local side.
ErrEvents = unix.EPOLLERR | unix.EPOLLHUP
)

type eventList struct {
Expand Down
26 changes: 10 additions & 16 deletions internal/netpoll/poller_epoll_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,57 +193,51 @@ func (p *Poller) Polling(callback PollEventHandler) error {
}
}

const (
readEvents = unix.EPOLLIN | unix.EPOLLPRI | unix.EPOLLRDHUP
writeEvents = unix.EPOLLOUT | unix.EPOLLRDHUP
readWriteEvents = readEvents | writeEvents
)

// AddReadWrite registers the given file-descriptor with readable and writable events to the poller.
func (p *Poller) AddReadWrite(pa *PollAttachment, edgeTriggered bool) error {
var ev uint32 = readWriteEvents
var ev uint32 = ReadWriteEvents
if edgeTriggered {
ev |= unix.EPOLLET
ev |= unix.EPOLLET | unix.EPOLLRDHUP
}
return os.NewSyscallError("epoll_ctl add",
unix.EpollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev}))
}

// AddRead registers the given file-descriptor with readable event to the poller.
func (p *Poller) AddRead(pa *PollAttachment, edgeTriggered bool) error {
var ev uint32 = readEvents
var ev uint32 = ReadEvents
if edgeTriggered {
ev |= unix.EPOLLET
ev |= unix.EPOLLET | unix.EPOLLRDHUP
}
return os.NewSyscallError("epoll_ctl add",
unix.EpollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev}))
}

// AddWrite registers the given file-descriptor with writable event to the poller.
func (p *Poller) AddWrite(pa *PollAttachment, edgeTriggered bool) error {
var ev uint32 = writeEvents
var ev uint32 = WriteEvents
if edgeTriggered {
ev |= unix.EPOLLET
ev |= unix.EPOLLET | unix.EPOLLRDHUP
}
return os.NewSyscallError("epoll_ctl add",
unix.EpollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev}))
}

// ModRead renews the given file-descriptor with readable event in the poller.
func (p *Poller) ModRead(pa *PollAttachment, edgeTriggered bool) error {
var ev uint32 = readEvents
var ev uint32 = ReadEvents
if edgeTriggered {
ev |= unix.EPOLLET
ev |= unix.EPOLLET | unix.EPOLLRDHUP
}
return os.NewSyscallError("epoll_ctl mod",
unix.EpollCtl(p.fd, unix.EPOLL_CTL_MOD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev}))
}

// ModReadWrite renews the given file-descriptor with readable and writable events in the poller.
func (p *Poller) ModReadWrite(pa *PollAttachment, edgeTriggered bool) error {
var ev uint32 = readWriteEvents
var ev uint32 = ReadWriteEvents
if edgeTriggered {
ev |= unix.EPOLLET
ev |= unix.EPOLLET | unix.EPOLLRDHUP
}
return os.NewSyscallError("epoll_ctl mod",
unix.EpollCtl(p.fd, unix.EPOLL_CTL_MOD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev}))
Expand Down
Loading

0 comments on commit 7de6c58

Please sign in to comment.