Skip to content

Commit

Permalink
Add sendInLoopFinish callback (#97)
Browse files Browse the repository at this point in the history
  • Loading branch information
Allenxuxu authored Jul 5, 2021
1 parent 979940f commit 0d5fcc0
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 1 deletion.
11 changes: 10 additions & 1 deletion connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,23 @@ func (c *Connection) Connected() bool {
}

// Send 用来在非 loop 协程发送
func (c *Connection) Send(data interface{}) error {
func (c *Connection) Send(data interface{}, opts ...Option) error {
if !c.connected.Get() {
return ErrConnectionClosed
}

opt := Options{}
for _, o := range opts {
o(&opt)
}

c.loop.QueueInLoop(func() {
if c.connected.Get() {
c.sendInLoop(c.protocol.Packet(c, data))

if opt.sendInLoopFinish != nil {
opt.sendInLoopFinish(data)
}
}
})
return nil
Expand Down
15 changes: 15 additions & 0 deletions connection/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package connection

type SendInLoopFunc func(interface{})

type Options struct {
sendInLoopFinish SendInLoopFunc
}

type Option func(*Options)

func SendInLoop(f SendInLoopFunc) Option {
return func(o *Options) {
o.sendInLoopFinish = f
}
}
63 changes: 63 additions & 0 deletions example/sendAsync/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package main

import (
"flag"
"net/http"
_ "net/http/pprof"
"strconv"

"github.com/Allenxuxu/gev"
"github.com/Allenxuxu/gev/connection"
"github.com/Allenxuxu/gev/log"
"github.com/gobwas/pool/pbytes"
)

type example struct{}

func (s *example) OnConnect(c *connection.Connection) {
log.Info(" OnConnect : ", c.PeerAddr())
}
func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out interface{}) {
log.Info("OnMessage ", string(data))

b := pbytes.Get(0, 10)
b = append(b, []byte("1234\n")...)

_ = c.Send(b, connection.SendInLoop(func(i interface{}) {
log.Info("put []byte ")
pbytes.Put(i.([]byte))
}))
return
}

func (s *example) OnClose(c *connection.Connection) {
log.Info("OnClose")
}

func main() {
go func() {
if err := http.ListenAndServe(":6060", nil); err != nil {
panic(err)
}
}()

handler := new(example)
var port int
var loops int

flag.IntVar(&port, "port", 1833, "server port")
flag.IntVar(&loops, "loops", -1, "num loops")
flag.Parse()

s, err := gev.NewServer(handler,
gev.Network("tcp"),
gev.Address(":"+strconv.Itoa(port)),
gev.NumLoops(loops),
gev.MetricsServer("", ":9091"),
)
if err != nil {
panic(err)
}

s.Start()
}

0 comments on commit 0d5fcc0

Please sign in to comment.