From 7a1142b839450fa92262e65353b64dd7def552cc Mon Sep 17 00:00:00 2001 From: steven Date: Wed, 16 Feb 2022 10:45:23 +0800 Subject: [PATCH 1/3] feat(wti): wti --- bucket/bucket.go | 2 +- client/client.go | 6 +- client/interface.go | 6 ++ plugins/wti/doc.go | 5 ++ plugins/wti/group.go | 71 ++++++++++++++++++++++++ plugins/wti/interface.go | 41 ++++++++++++++ plugins/wti/wti.go | 116 +++++++++++++++++++++++++++++++++++++++ plugins/wti/wti_test.go | 99 +++++++++++++++++++++++++++++++++ rooms/interface.go | 33 ----------- rooms/map.go | 81 --------------------------- rooms/roomset.go | 98 --------------------------------- rpc.go | 18 ++++++ run.go | 8 +-- server.go | 43 ++++++--------- test/wti/im_test.go | 54 ++++++++++++++++++ 15 files changed, 435 insertions(+), 246 deletions(-) create mode 100644 plugins/wti/doc.go create mode 100644 plugins/wti/group.go create mode 100644 plugins/wti/interface.go create mode 100644 plugins/wti/wti.go create mode 100644 plugins/wti/wti_test.go delete mode 100644 rooms/interface.go delete mode 100644 rooms/map.go delete mode 100644 rooms/roomset.go create mode 100644 test/wti/im_test.go diff --git a/bucket/bucket.go b/bucket/bucket.go index 5f20343..b83d2b9 100644 --- a/bucket/bucket.go +++ b/bucket/bucket.go @@ -35,7 +35,6 @@ type bucket struct { // log log log.Logger - opts * Option } @@ -138,6 +137,7 @@ func (h *bucket) OffLine(token string) { } } +// 将用户注册到bucket中 func (h *bucket) Register(cli client.Clienter,token string) error { if cli == nil { return ErrCliISNil diff --git a/client/client.go b/client/client.go index 5b6490b..e24fd8c 100644 --- a/client/client.go +++ b/client/client.go @@ -5,7 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "github.com/mongofs/im/log" "net/http" "sync" "time" @@ -13,6 +12,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/gorilla/websocket" im "github.com/mongofs/api/im/v1" + "github.com/mongofs/im/log" ) @@ -28,8 +28,8 @@ const ( type Cli struct { lastHeartBeatT int64 - conn *websocket.Conn - reader *http.Request + conn *websocket.Conn + reader *http.Request token string closeFunc sync.Once done chan struct{} diff --git a/client/interface.go b/client/interface.go index 5f029cf..9789872 100644 --- a/client/interface.go +++ b/client/interface.go @@ -3,16 +3,22 @@ package client import "net/http" type Clienter interface { + // 调用此方法可以给当前用户发送消息 Send([]byte, ...int64) error + // 用户下线 Offline() + // 重置用户的心跳 ResetHeartBeatTime() + // 获取用户的最后一次心跳 LastHeartBeat() int64 + // 获取用户的token Token() string + // 获取到用户的请求的链接 Request()*http.Request } diff --git a/plugins/wti/doc.go b/plugins/wti/doc.go new file mode 100644 index 0000000..ec95144 --- /dev/null +++ b/plugins/wti/doc.go @@ -0,0 +1,5 @@ +// wti 是关于im 的一个扩展,默认是不开启的。如果用户需要做tag相关的内容可以选着开启tag选项。如果开启tag 在rpc调用过程中 +// 实际可以调用tag相关的接口,tag因为是额外做存储的,所以可能会耗费额外的内存,如果说您不需要给用户进行标记等相关业务,建议你 +// 可以选择不开启,如果你的业务存在聊天室等相关的业务,需要快速查询到某一批次人(即打了标签的人),那么就可以使用这个额外的标签 +// 你也可以根据你自身的业务来进行编写符合你公司业务的模型进行分类。 +package wti \ No newline at end of file diff --git a/plugins/wti/group.go b/plugins/wti/group.go new file mode 100644 index 0000000..4f0839c --- /dev/null +++ b/plugins/wti/group.go @@ -0,0 +1,71 @@ +package wti + +import ( + "github.com/mongofs/im/client" + "sync" + "time" +) + +type Group struct { + rw *sync.RWMutex + set map[string]*client.Cli + createTime int64 +} + +func NewGroup()*Group { + return &Group{ + rw: &sync.RWMutex{}, + set: map[string]*client.Cli{}, + createTime: time.Now().Unix(), + } +} + +// 给所有用户广播 +func (g *Group) broadCast(content []byte){ + g.rw.RLock() + defer g.rw.RUnlock() + for _,v := range g.set { + v.Send(content) + } +} + +// 添加cli +func (g *Group) addCli(clis ... *client.Cli){ + g.rw.Lock() + defer g.rw.Unlock() + for _,v := range clis{ + g.set[v.Token()]=v + } +} + +// 删除cli +func (g *Group) rmCli(tokens ... string){ + g.rw.Lock() + defer g.rw.Unlock() + for _,token := range tokens { + delete(g.set,token) + } +} + +// 是否存在cli +func (g *Group) isExsit(token string)bool{ + g.rw.RLock() + defer g.rw.RUnlock() + if _,ok:=g.set[token];ok{ + return true + } + return false +} + +// 是否存在cli +func (g *Group) Counter()int64{ + g.rw.RLock() + defer g.rw.RUnlock() + return int64(len(g.set)) +} + + +// 就是使用这个方法将g 注册到Observer 上面去。 +func (g *Group) Update(tokens ... string){ + g.rmCli(tokens...) +} \ No newline at end of file diff --git a/plugins/wti/interface.go b/plugins/wti/interface.go new file mode 100644 index 0000000..abb09ad --- /dev/null +++ b/plugins/wti/interface.go @@ -0,0 +1,41 @@ +// WTI 全称为 websocket Target interface ,就是长连接标记接口,目前系统提供的解决方案为 +package wti + +import "github.com/mongofs/im/client" + +// WebSocket Target Interface +type WTI interface { + // 给用户打上标签 + SetTAG(cli *client.Cli, tag ...string) + + // 如果用户下线将会通知调用这个方法 + Update(token ...string) + + // 广播到包含tag 对象 + BroadCast(content []byte, tag ...string) + + // 广播所有内容 + BroadCastByTarget(targetAndContent map[string][]byte) + + // 获取某个用户的所有的tag + GetClienterTAGs(token string)[]string + + // 获取到TAG 的创建时间,系统会判断这个tag创建时间和当前人数来确认是否需要删除这个tag + GetTAGCreateTime(tag string)int64 + + // 获取到TAG 的在线人数,系统会判断这个tag如果没有在线人数为0 且创建时间大于MAX-wti-create-time ,这个tag就会被回收 + GetTAGClients (tag string)int64 + + // 回收TAG ,im 主线程会根据GetTAGCreateTime 和 GetTAGClients 进行数据回收,回收也会调用此方法 + RecycleTAG (tag string) +} + +// 其他地方将调用这个变量,如果自己公司实现tag需要注入在程序中进行注入 +var Factory WTI = newwti() + + +func Inject (wti WTI) { + Factory = wti +} + + diff --git a/plugins/wti/wti.go b/plugins/wti/wti.go new file mode 100644 index 0000000..75de8e5 --- /dev/null +++ b/plugins/wti/wti.go @@ -0,0 +1,116 @@ +// 构建tag 标签的这个功能,主要目标希望将用户分类起来,1000000 * 8 /1024 / 1024 = 7M的内存,额外使用tag包进行封装的话, +// 预计使用一百万用户的内存需要大概7~8M,这个范围是可以接受的。 +package wti + +import ( + "github.com/mongofs/im/client" + "sync" +) + +type tg struct { + mp map[string] *Group // wti => []string + rw *sync.RWMutex +} + + +func newwti() WTI { + return &tg{ + mp: map[string]*Group{}, + rw: &sync.RWMutex{}, + } +} + + +// 给用户设置标签 +func (t *tg) SetTAG(cli *client.Cli, tags ...string) { + if len(tags)== 0 { + return + } + t.rw.Lock() + defer t.rw.Unlock() + for _,tag := range tags{ + if group,ok:= t.mp[tag];!ok { // wti not exist + t.mp[tag]= NewGroup() + t.mp[tag].addCli(cli) + }else { // wti exist + group.addCli(cli) + } + } +} + + +// 给某一个标签的群体进行广播 +func (t *tg) BroadCast(content []byte,tags ...string) { + if len(tags)== 0 { + return + } + t.rw.RLock() + defer t.rw.RUnlock() + + for _,tag := range tags{ + if group,ok := t.mp[tag];ok{ + group.broadCast(content) + } + } +} + + +// 通知所有组进行自查某个用户,并删除 +func (t *tg)Update(token ...string){ + t.rw.RLock() + defer t.rw.RUnlock() + for _,v := range t.mp { + v.Update(token... ) + } +} + + +// 调用这个就可以分类广播,可能出现不同的targ 需要不同的内容 +func(t *tg)BroadCastByTarget(targetAndContent map[string][]byte){ + if len(targetAndContent) == 0{ return } + for target ,content := range targetAndContent { + t.BroadCast(content,target) + } +} + + +// 获取到用户token的所有TAG,时间复杂度是O(m) ,m是所有的房间 +func (t *tg)GetClienterTAGs(token string)[]string{ + var res []string + t.rw.RLock() + defer t.rw.RUnlock() + for k,v:= range t.mp{ + exsit:= v.isExsit(token) + if exsit { + res = append(res,k ) + } + } + return res +} + +// 如果创建时间为0 ,表示没有这个房间 +func (t *tg) GetTAGCreateTime(tag string) int64 { + t.rw.RLock() + defer t.rw.RUnlock() + if v,ok:=t.mp[tag];ok{ + return v.createTime + } + return 0 +} + +// 获取到tag的总人数 +func (t *tg) GetTAGClients(tag string) int64 { + t.rw.RLock() + defer t.rw.RUnlock() + if v,ok:=t.mp[tag];ok{ + return v.Counter() + } + return 0 +} + +// 删除tag +func (t *tg) RecycleTAG(tag string) { + t.rw.Lock() + defer t.rw.Unlock() + delete(t.mp,tag) +} \ No newline at end of file diff --git a/plugins/wti/wti_test.go b/plugins/wti/wti_test.go new file mode 100644 index 0000000..e6067f9 --- /dev/null +++ b/plugins/wti/wti_test.go @@ -0,0 +1,99 @@ +package wti + +import ( + "fmt" + "testing" +) + +// 广播,针对tag进行广播,这也是wti的核心接口,分类广播也是基于这个接口 +func TestTg_BroadCast(t *testing.T) { + tests := []struct { + target []string + content []byte + }{ + { + target: []string{"v1"} , + content: []byte("hello content"), + }, + { + target: []string{"v1","v2"} , + content: []byte("hello content"), + }, + { + target: []string{"v1","v2","v3"} , + content: []byte("hello content"), + }, + } + // 测试 v1、v2 、v1 三个版本的不同,如果要模式真实连接,则需要执行 + // 1. 创建im 连接服务器,并开启wti 配置参数 + // 2. 要在handler方法中调用factory 进行调用SetTAG + // 3. 需要建立连接 + for _,v := range tests { + Factory.BroadCast(v.content,v.target...) + } + // v1 websocket output : hello content ,and v2,v3 no content output + // v1,v2 websocket output : hello content,and v3 no content output + // v1,v2,v3 websocket output : hello content + +} + + + +// 主要应对数据发送的时候版本的问题,比如某一条数据由于协议更改需要向上兼容老的版本,因为这是应用层的内容 +// 所以使用wti 接口来进行兼容处理。避免进行内容的感染 +func TestTg_BroadCastByTarget(t *testing.T) { + tests := []struct { + give map[string][]byte + }{ + { + give: map[string][]byte{ + "v1": []byte("first v1 "), + "v2": []byte("second v2 "), + "v3": []byte("third v3 "), + }, + + }, + { + give: map[string][]byte{ + "v1": []byte("hello v1 "), + "v2": []byte("hello v2 "), + "v3": []byte("hello v3 "), + }, + }, + + } + // 测试 v1、v2 、v1 三个版本的不同,如果要模式真实连接,则需要执行 + // 1. 创建im 连接服务器,并开启wti 配置参数 + // 2. 要在handler方法中调用factory 进行调用SetTAG + // 3. 需要建立连接 + for _,v := range tests { + Factory.BroadCastByTarget(v.give) + } + // v1,v2,v3 websocket output :first v1 | second v2 | third v3 + // v1,v2,v3 websocket output :hello v1 | hello v2 | hello v3 +} + + + + +// 主要应对数据发送的时候版本的问题,比如某一条数据由于协议更改需要向上兼容老的版本,因为这是应用层的内容 +// 所以使用wti 接口来进行兼容处理。避免进行内容的感染 +func TestTg_UpdateAndF(t *testing.T) { + tests := []struct { + give string + }{ + { + give: "1234", + }, + } + // 这个测试条件相对比较苛刻,update接口主要作用是接收globalclosed 的信号,如果某个用户关闭连接 + // im线程就会释放连接之前就会告诉当前的update方法,所以只需要判断当前用户是否被删除就好了 + // 1. 创建im 连接服务器,并开启wti 配置参数 + // 2. 要在handler方法中调用factory 进行调用SetTAG + // 3. 需要建立连接 + for _,v := range tests { + Factory.Update(v.give) // + fmt.Println(Factory.GetClienterTAGs(v.give)) + } + // output : [] +} \ No newline at end of file diff --git a/rooms/interface.go b/rooms/interface.go deleted file mode 100644 index bb6beb4..0000000 --- a/rooms/interface.go +++ /dev/null @@ -1,33 +0,0 @@ -package rooms - -import "github.com/mongofs/im/client" - -type room interface { - AddUser(token string, clienter client.Clienter) // 将用户添加到某个房间 - - DelUser(token string) // 将用户从房间删除 - - PushData(data []byte) int // 推送消息给房间,并返回推送在线用户数量 - - PushDataToPointedUser(data []byte, token ...string) []string // 将消息推送给房间指定用户 - - GetRoomUserNumber() int32 //获取当前房间人数:注意特指当前服务的机器,不能代表房间所有用户,建议通过redis incr 存储 - - GetRoomCreateTime() int64 // 获取当前房间创建时间 - - GetRoomUserToken()[]string // 获取当前房间用户 - -} - -// 房间管理器 -type RoomSet interface { - - // add user to room - AddClientToRoom(token string, conn client.Clienter, RoomID string) - // del user to room - DelClientFromRoom(token string, RoomID string) - // push data to room - PushDataToRoom(data []byte, RoomID string, token ...string) - // get room usernumber - GetRoomUserNumber() int32 -} diff --git a/rooms/map.go b/rooms/map.go deleted file mode 100644 index 62710c9..0000000 --- a/rooms/map.go +++ /dev/null @@ -1,81 +0,0 @@ -package rooms - -import ( - "github.com/mongofs/im/client" - "go.uber.org/atomic" - "sync" - "time" -) - -type MapRoom struct { - rw *sync.RWMutex - ro map[string]client.Clienter - cot *atomic.Int32 //当前在线用户 - createTime int64 -} - -func (m *MapRoom) GetRoomUserToken() []string { - panic("implement me") -} - -func (m *MapRoom) GetRoomCreateTime() int64 { - return m.createTime -} - -func NewMapRoom()room{ - return &MapRoom{ - rw: &sync.RWMutex{}, - ro: make(map[string]client.Clienter,20), - cot: &atomic.Int32{}, - createTime: time.Now().Unix(), - } -} - - -func (m *MapRoom) AddUser(token string, clienter client.Clienter) { - m.rw.Lock() - m.ro[token]= clienter - m.rw.Unlock() - - m.cot.Inc() -} - - - -func (m *MapRoom) DelUser(token string) { - m.rw.Lock() - delete(m.ro,token) - m.rw.Unlock() - m.cot.Dec() -} - - - -func (m *MapRoom) PushData(data []byte) int{ - m.rw.RLock() - for _,v:= range m.ro{ - v.Send(data) // todo 需要优化 - } - m.rw.RUnlock() -} - - - -func (m *MapRoom) PushDataToPointedUser(data []byte, token ...string)[]string { - m.rw.RLock() - for _,v:= range token{ - if temCli,ok:= m.ro[v];ok{ - temCli.Send(data) - } - } - m.rw.Unlock() -} - - - -func (m *MapRoom) GetRoomUserNumber() int32 { - return m.cot.Load() -} - - - diff --git a/rooms/roomset.go b/rooms/roomset.go deleted file mode 100644 index 57f79ca..0000000 --- a/rooms/roomset.go +++ /dev/null @@ -1,98 +0,0 @@ -package rooms - -import ( - "github.com/mongofs/im/client" - "go.uber.org/atomic" - "sync" - "time" -) - -type roomSet struct { - createM func(RooID string) room - rw *sync.RWMutex - rooms map[string]room - clear time.Duration - - inRoom *atomic.Int64 -} - - - -func (r *roomSet) GetRoomUserNumber() int32 { - panic("implement me") -} - - -func (r *roomSet) monitor() { - for { - r.rw.Lock() - r.inRoom.Store(0) - for k, v := range r.rooms { - // delete the room - counter := v.GetRoomUserNumber() - if counter == 0 && time.Now().Unix()-v.GetRoomCreateTime() > 60*60*2 { - delete(r.rooms, k) - continue - } - r.inRoom.Add(int64(counter)) - } - r.rw.Unlock() - - - time.Sleep(r.clear * time.Second) - } -} - - - -func (r *roomSet) AddClientToRoom(token string, conn client.Clienter, RoomID string) { - r.rw.Lock() - tem, ok := r.rooms[RoomID] - r.rw.Unlock() - if !ok { - temRoom := r.createM(RoomID) - temRoom.AddUser(token, conn) - r.rw.Lock() - r.rooms[RoomID] = temRoom - r.rw.Unlock() - } - tem.AddUser(token, conn) -} - - - -func (r *roomSet)GetRoomOnlineList(){ - - - -} - - - - -func (r roomSet) DelClientFromRoom(token string, RoomID string) { - r.rw.Lock() - tem, ok := r.rooms[RoomID] - r.rw.Unlock() - if !ok { - return - } - tem.DelUser(token) -} - - - -func (r roomSet) PushDataToRoom(data []byte, RoomID string, token ...string) { - r.rw.Lock() - tem, ok := r.rooms[RoomID] - r.rw.Unlock() - if !ok { - return - } - - if len(token) == 0 { - tem.PushData(data) //全房间推送 - } else { - tem.PushDataToPointedUser(data, token...) // 推送给指定用户 - } -} diff --git a/rpc.go b/rpc.go index be26705..1f808d9 100644 --- a/rpc.go +++ b/rpc.go @@ -61,4 +61,22 @@ func (s *ImSrever) Broadcast(ctx context.Context, req *im.BroadcastReq) (*im.Bro },nil } +// 在开发过程中存在IM需要版本共存的需求,比如我的协议替换了,但是如果im应用在App上面如何进行切换,这就是协议定制不合理的地方,但也需要 +// IM 服务器在这个过程中做配合。 +// IM 存在给用户分组的需求,所以我们在进行Broadcast 就必须进行用户的状态区分,所以前台需要对内容进行分组,传入的内容也需要对应分组 +// 比如 v1 => string ,v2 => []byte,那么v1,v2 就是不相同的两个版本内容。在client上面可以设置用户的连接版本Version,建议在 +// 使用用户 + +func (s *ImSrever) BroadcastByTarget(ctx context.Context, req *im.BroadcastReq) (*im.BroadcastReply, error) { + if len(s.buffer) *10 > 8* cap(s.buffer){ + return nil,errors.New(fmt.Sprintf("im/rpc: too much message ,buffer length is %v but cap is %v",len(s.buffer),cap(s.buffer))) + } + start := time.Now() + s.buffer <- req + escape := time.Since(start) + s.opt.ServerLogger.Infof(" im/rpc : called %v method cost time %v ","Broadcast",escape) + return &im.BroadcastReply{ + Size: int64(len(s.buffer)), + },nil +} diff --git a/run.go b/run.go index 165b855..f5f4bb3 100644 --- a/run.go +++ b/run.go @@ -11,10 +11,10 @@ var VERSION = "master" func (s *ImSrever)Run ()error { wg := errgroup.Group{} - wg.Go(s.runhttpServer) - wg.Go(s.runGrpcServer) - wg.Go(s.monitor) - wg.Go(s.PushBroadCast) + wg.Go(s.runhttpServer) // 监控HTTP 服务情况 + wg.Go(s.runGrpcServer) // 监控GRPC 服务情况 + wg.Go(s.monitorOnline) // 监控全局在线情况 + wg.Go(s.PushBroadCast) // 监控全局广播情况 return wg.Wait() } diff --git a/server.go b/server.go index a9c2ed6..248c340 100644 --- a/server.go +++ b/server.go @@ -11,44 +11,42 @@ import ( ) type ImSrever struct { - http *http.ServeMux - rpc *grpc.Server - bs []bucket.Bucketer - ps atomic.Int64 + http *http.ServeMux + rpc *grpc.Server + bs []bucket.Bucketer + ps atomic.Int64 - buffer chan *im.BroadcastReq - cancel func() + buffer chan *im.BroadcastReq // 全局广播队列 + cancel func() opt *Option } - - // 统计用户在线人数 // 监控buffer 长度 并进行报警 -func (s *ImSrever)monitor ()error{ - for{ +func (s *ImSrever) monitorOnline() error { + for { n := int64(0) - for _,bck := range s.bs{ + for _, bck := range s.bs { bck.Flush() n += bck.Onlines() } s.ps.Store(n) - time.Sleep(10 *time.Second) + time.Sleep(10 * time.Second) } return nil } // 单独处理广播业务 -func (s *ImSrever)PushBroadCast()error{ - wg:= errgroup.Group{} - for i:= 0;i Date: Thu, 17 Feb 2022 15:06:51 +0800 Subject: [PATCH 2/3] =?UTF-8?q?2022-02-17=E4=BF=AE=E6=94=B9=E7=9A=84?= =?UTF-8?q?=E5=86=85=E5=AE=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- benchmark/conn.go | 13 ++++-- benchmark/rpc_bench_test.go | 20 ++++++++ benchmark/rpc_test.go | 6 +++ bucket/observer.go | 4 ++ bucket/operation.go | 9 +++- go.mod | 2 +- go.sum | 4 ++ im.go | 18 ++++++++ option.go | 5 ++ parallel.go | 91 ++++++++++++++++++++++++++++++++++++ plugins/wti/group.go | 6 +++ plugins/wti/interface.go | 92 +++++++++++++++++++++++++++++++++---- plugins/wti/wti.go | 15 ++++-- plugins/wti/wti_test.go | 18 ++++++-- rpc.go | 13 +++--- run.go | 44 ++++-------------- server.go | 63 ------------------------- test/wti/conn_test.go | 57 +++++++++++++++++++++++ test/wti/im_test.go | 16 +++++-- test/wti/rpc_test.go | 45 ++++++++++++++++++ 20 files changed, 409 insertions(+), 132 deletions(-) create mode 100644 bucket/observer.go create mode 100644 parallel.go delete mode 100644 server.go create mode 100644 test/wti/conn_test.go create mode 100644 test/wti/rpc_test.go diff --git a/benchmark/conn.go b/benchmark/conn.go index c72b72a..1244637 100644 --- a/benchmark/conn.go +++ b/benchmark/conn.go @@ -12,15 +12,13 @@ import ( func CreateClient (token string){ dialer := websocket.Dialer{} - conn, _, err := dialer.Dial(fmt.Sprintf(Address+"?token=%s",token), nil) + conn, _, err := dialer.Dial(fmt.Sprintf(Address+"?token=%s&ver=%s",token,getVersion()), nil) if nil != err { log.Println(err) return } defer conn.Close() - counter :=0 - go func() { time.Sleep(10*time.Second) conn.WriteMessage(websocket.TextMessage,[]byte(fmt.Sprintf(" heartbeat %s",token))) @@ -49,6 +47,15 @@ func CreateClient (token string){ } } +// 获取版本 +func getVersion()string{ + if time.Now().Unix() %2 == 0 { + return "v1" + }else{ + return "v2" + } +} + // 用户断线重连 func CreateClientAndTickerReConn (token string){ diff --git a/benchmark/rpc_bench_test.go b/benchmark/rpc_bench_test.go index 56eedc8..a6fbf97 100644 --- a/benchmark/rpc_bench_test.go +++ b/benchmark/rpc_bench_test.go @@ -60,6 +60,7 @@ func BenchmarkBroadCast(t *testing.B) { // BenchmarkBroadCast_paraml-6 2751 391783 ns/op +// BenchmarkBroadCast_paraml-6 3105 494931 ns/op func BenchmarkBroadCast_paraml(t *testing.B) { cli := Client() ctx := context.Background() @@ -70,3 +71,22 @@ func BenchmarkBroadCast_paraml(t *testing.B) { cli.Broadcast(ctx,data) } } + + +// 压测分类广播,大致情况如下 +// BenchmarkBroadCastByWTI-6 2905 411497 ns/op +// BenchmarkBroadCastByWTI-6 3099 421722 ns/op +// BenchmarkBroadCastByWTI-6 2709 385287 ns/op +func BenchmarkBroadCastByWTI(t *testing.B){ + cli := Client() + ctx := context.Background() + tests:= &im.BroadcastByWTIReq{Data: map[string][]byte{ + "v1":[]byte("fucker you v2"), + "v2":[]byte("lover you v1"), + "v3":[]byte("what are you v1"), + }} + + for i:= 0;i60 { + delete(t.mp,k) + } + } } \ No newline at end of file diff --git a/plugins/wti/wti_test.go b/plugins/wti/wti_test.go index e6067f9..b4b6e86 100644 --- a/plugins/wti/wti_test.go +++ b/plugins/wti/wti_test.go @@ -29,7 +29,10 @@ func TestTg_BroadCast(t *testing.T) { // 2. 要在handler方法中调用factory 进行调用SetTAG // 3. 需要建立连接 for _,v := range tests { - Factory.BroadCast(v.content,v.target...) + err := BroadCast(v.content,v.target...) + if err !=nil { + t.Fatal(err) + } } // v1 websocket output : hello content ,and v2,v3 no content output // v1,v2 websocket output : hello content,and v3 no content output @@ -67,7 +70,10 @@ func TestTg_BroadCastByTarget(t *testing.T) { // 2. 要在handler方法中调用factory 进行调用SetTAG // 3. 需要建立连接 for _,v := range tests { - Factory.BroadCastByTarget(v.give) + err:= BroadCastByTarget(v.give) + if err !=nil { + t.Fatal(err) + } } // v1,v2,v3 websocket output :first v1 | second v2 | third v3 // v1,v2,v3 websocket output :hello v1 | hello v2 | hello v3 @@ -92,8 +98,12 @@ func TestTg_UpdateAndF(t *testing.T) { // 2. 要在handler方法中调用factory 进行调用SetTAG // 3. 需要建立连接 for _,v := range tests { - Factory.Update(v.give) // - fmt.Println(Factory.GetClienterTAGs(v.give)) + err := Update(v.give) // + if err !=nil { + t.Fatal(err) + } + res ,_:= GetClienterTAGs(v.give) + fmt.Println(res) } // output : [] } \ No newline at end of file diff --git a/rpc.go b/rpc.go index 1f808d9..7d3b275 100644 --- a/rpc.go +++ b/rpc.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" im "github.com/mongofs/api/im/v1" + "github.com/mongofs/im/plugins/wti" "time" ) @@ -67,16 +68,14 @@ func (s *ImSrever) Broadcast(ctx context.Context, req *im.BroadcastReq) (*im.Bro // 比如 v1 => string ,v2 => []byte,那么v1,v2 就是不相同的两个版本内容。在client上面可以设置用户的连接版本Version,建议在 // 使用用户 -func (s *ImSrever) BroadcastByTarget(ctx context.Context, req *im.BroadcastReq) (*im.BroadcastReply, error) { - if len(s.buffer) *10 > 8* cap(s.buffer){ - return nil,errors.New(fmt.Sprintf("im/rpc: too much message ,buffer length is %v but cap is %v",len(s.buffer),cap(s.buffer))) - } +func (s *ImSrever) BroadcastByWTI(ctx context.Context, req *im.BroadcastByWTIReq) (*im.BroadcastReply, error) { + var err error start := time.Now() - s.buffer <- req + err = wti.BroadCastByTarget(req.Data) escape := time.Since(start) - s.opt.ServerLogger.Infof(" im/rpc : called %v method cost time %v ","Broadcast",escape) + s.opt.ServerLogger.Infof(" im/rpc : called %v method cost time %v ","BroadcastByWTI",escape) return &im.BroadcastReply{ Size: int64(len(s.buffer)), - },nil + },err } diff --git a/run.go b/run.go index f5f4bb3..67bb101 100644 --- a/run.go +++ b/run.go @@ -1,47 +1,23 @@ package im -import ( - "net" - "net/http" - "golang.org/x/sync/errgroup" -) - var VERSION = "master" func (s *ImSrever)Run ()error { - wg := errgroup.Group{} - wg.Go(s.runhttpServer) // 监控HTTP 服务情况 - wg.Go(s.runGrpcServer) // 监控GRPC 服务情况 - wg.Go(s.monitorOnline) // 监控全局在线情况 - wg.Go(s.PushBroadCast) // 监控全局广播情况 - return wg.Wait() -} - - -func (s *ImSrever)runGrpcServer ()error{ - listen, err := net.Listen("tcp", s.opt.ServerRpcPort) - if err !=nil { s.opt.ServerLogger.Fatal(err) } - s.opt.ServerLogger.Infof("im/run : start GRPC server at %s ", s.opt.ServerRpcPort) - if err := s.rpc.Serve(listen);err !=nil { - s.opt.ServerLogger.Fatal(err) + var prepareParallelFunc = [] func()error { + // 启用单独goroutine 进行监控 + s.monitorOnline, + s.monitorWTI, + // 启用单独goroutine 进行运行 + s.runGrpcServer, + s.runhttpServer, + s.PushBroadCast, } - - return nil -} - - -func (s *ImSrever)runhttpServer ()error{ - listen, err := net.Listen("tcp", s.opt.ServerHttpPort) - if err !=nil { s.opt.ServerLogger.Fatal(err) } - s.opt.ServerLogger.Infof("im/run : start HTTP server at %s ", s.opt.ServerHttpPort) - if err := http.Serve(listen,s.http);err !=nil { - s.opt.ServerLogger.Fatal(err) - } - return nil + return ParallelRun(prepareParallelFunc... ) } +// 服务关闭 func (s *ImSrever)Close()error{ s.rpc.GracefulStop() s.cancel() diff --git a/server.go b/server.go deleted file mode 100644 index 248c340..0000000 --- a/server.go +++ /dev/null @@ -1,63 +0,0 @@ -package im - -import ( - im "github.com/mongofs/api/im/v1" - "github.com/mongofs/im/bucket" - "go.uber.org/atomic" - "golang.org/x/sync/errgroup" - "google.golang.org/grpc" - "net/http" - "time" -) - -type ImSrever struct { - http *http.ServeMux - rpc *grpc.Server - bs []bucket.Bucketer - ps atomic.Int64 - - buffer chan *im.BroadcastReq // 全局广播队列 - cancel func() - - opt *Option -} - -// 统计用户在线人数 -// 监控buffer 长度 并进行报警 -func (s *ImSrever) monitorOnline() error { - for { - n := int64(0) - for _, bck := range s.bs { - bck.Flush() - n += bck.Onlines() - } - s.ps.Store(n) - time.Sleep(10 * time.Second) - } - return nil -} - -// 单独处理广播业务 -func (s *ImSrever) PushBroadCast() error { - wg := errgroup.Group{} - for i := 0; i < s.opt.BroadCastHandler; i++ { - wg.Go(func() error { - for { - req := <-s.buffer - for _, v := range s.bs { - err := v.BroadCast(req.Data, false) - if err != nil { - s.opt.ServerLogger.Error(err) - } - } - } - return nil - }) - } - return wg.Wait() -} - -func (s *ImSrever) bucket(token string) bucket.Bucketer { - idx := Index(token,uint32(s.opt.ServerBucketNumber)) - return s.bs[idx] -} diff --git a/test/wti/conn_test.go b/test/wti/conn_test.go new file mode 100644 index 0000000..6d27586 --- /dev/null +++ b/test/wti/conn_test.go @@ -0,0 +1,57 @@ +package wti + +import ( + "fmt" + "github.com/gorilla/websocket" + "testing" + "time" +) + +// 模拟创建3个链接: 分别是订阅 +// conn1 : v1 +// conn2 : v2 +// conn3 : v1,v2 + + +func Test_Conn(t *testing.T){ + go CreateClient("v1") + go CreateClient("v2") + go CreateClient("v3") + //CreateClient("v1&v2") + time.Sleep(1000 *time.Second) +} + +func CreateClient (token string){ + dialer := websocket.Dialer{} + conn, _, err := dialer.Dial(fmt.Sprintf(Address+"?token=%s&ver=v1",token), nil) + if nil != err { + fmt.Println(err) + return + } + defer conn.Close() + + counter :=0 + + go func() { + time.Sleep(50*time.Second) + conn.WriteMessage(websocket.TextMessage,[]byte(fmt.Sprintf(" heartbeat %s",token))) + }() + + for { + messageType, messageData, err := conn.ReadMessage() + if nil != err { + fmt.Println(err) + break + } + switch messageType { + case websocket.TextMessage://文本数据 + counter++ + fmt.Println(token,string(messageData),counter) + case websocket.BinaryMessage://二进制数据 + case websocket.CloseMessage://关闭 + case websocket.PingMessage://Ping + case websocket.PongMessage://Pong + default: + } + } +} \ No newline at end of file diff --git a/test/wti/im_test.go b/test/wti/im_test.go index 8526ffb..a002fd9 100644 --- a/test/wti/im_test.go +++ b/test/wti/im_test.go @@ -8,12 +8,14 @@ import ( "testing" ) -// 创建IM测试服务器 +// 创建IM测试服务器,模拟测试im服务器的具体内容。 func Test_IMServer(t *testing.T) { - + wti.SetSupport() + serv := NewFakeImServer() + serv.Run() } -// 这里就是一个Im聊天服务器 +// this is a temp imserver type Fake struct { im *im.ImSrever } @@ -33,9 +35,13 @@ func (f *Fake)ValidateFailed(err error,cli client.Clienter){ } func (f *Fake)ValidateSuccess(cli client.Clienter){ // 可以通过header 或者 get query 方式来传参,或者从数据库获取当前用户的tag - tags := []string{"v1","v2"} + req := cli.Request() + res := req.Form["ver"] + tags := []string{res[0]} // 调用wti - wti.Factory.SetTAG(cli.(*client.Cli),tags...) + if err := wti.SetTAG(cli.(*client.Cli),tags...);err != nil { + fmt.Println(err) + } } func NewFakeImServer() *Fake { diff --git a/test/wti/rpc_test.go b/test/wti/rpc_test.go new file mode 100644 index 0000000..f554bb3 --- /dev/null +++ b/test/wti/rpc_test.go @@ -0,0 +1,45 @@ +package wti + +import ( + "context" + "fmt" + im "github.com/mongofs/api/im/v1" + "google.golang.org/grpc" + "testing" +) + + +const ( + Address = "ws://127.0.0.1:8080/conn" + DefaultRpcAddress = "127.0.0.1:8081" +) + + +func TestSendMessage(t *testing.T) { + cli := Client() + ctx := context.Background() + tests:= []*im.BroadcastByWTIReq{ + { + Data: map[string][]byte{ + "v1":[]byte("fucker you v2"), + "v2":[]byte("lover you v1"), + "v3":[]byte("what are you v1"), + }, + }, + } + for _,v := range tests { + fmt.Println(cli.BroadcastByWTI(ctx,v,)) + } + // output + // nil + // nil + // nil + // nil +} + +var conn,_ = grpc.Dial(DefaultRpcAddress,grpc.WithInsecure()) + + +func Client ()im.BasicClient{ + return im.NewBasicClient(conn) +} From d6de490f21de4eea49000745bd933f5bb7cc30b5 Mon Sep 17 00:00:00 2001 From: steven Date: Sat, 19 Feb 2022 17:42:14 +0800 Subject: [PATCH 3/3] feat(wti) --- go.mod | 2 +- go.sum | 8 +++++ option.go | 36 +++++++++++++++---- parallel.go | 3 +- plugins/wti/interface.go | 28 ++++++++++----- plugins/wti/wti.go | 48 ++++++++++++++++++------- plugins/wti/wti_test.go | 52 ++++++++++++++++++++++++++++ rpc.go | 25 +++++++++++++- test/wti/conn_test.go | 75 ++++++++++++++++++++++++++-------------- test/wti/doc.go | 4 +++ test/wti/im_test.go | 5 ++- test/wti/rpc_test.go | 30 +++++++++++----- 12 files changed, 250 insertions(+), 66 deletions(-) create mode 100644 test/wti/doc.go diff --git a/go.mod b/go.mod index 513dbfe..2e813dd 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/gin-gonic/gin v1.7.7 // indirect github.com/golang/protobuf v1.5.2 github.com/gorilla/websocket v1.4.2 - github.com/mongofs/api v1.0.1 + github.com/mongofs/api v1.0.5 github.com/mongofs/log v0.0.0-20211119020135-e51080dd6d44 github.com/sirupsen/logrus v1.8.1 github.com/spf13/pflag v1.0.5 // indirect diff --git a/go.sum b/go.sum index e578899..96e2cd1 100644 --- a/go.sum +++ b/go.sum @@ -93,6 +93,14 @@ github.com/mongofs/api v1.0.0 h1:uMm8oWQPR0kHJu/NwewURZwwOpClHMG7ApHjAsWe2wQ= github.com/mongofs/api v1.0.0/go.mod h1:9QnZSLGrc6N9FNaGR0wrZ2+MdAsWPFhH9mnFcKxrsEw= github.com/mongofs/api v1.0.1 h1:r8vUeHTn7p3TqQQa8RDuL4lw+f0ZAnXuDGJ/hSwtIe0= github.com/mongofs/api v1.0.1/go.mod h1:9QnZSLGrc6N9FNaGR0wrZ2+MdAsWPFhH9mnFcKxrsEw= +github.com/mongofs/api v1.0.2 h1:7+TP0+gxhrHuM0HY+DmAfkR/Z59O4R8TuaTrqo3sCh4= +github.com/mongofs/api v1.0.2/go.mod h1:9QnZSLGrc6N9FNaGR0wrZ2+MdAsWPFhH9mnFcKxrsEw= +github.com/mongofs/api v1.0.3 h1:fRJXW+duFOHCwwxwElS1mg8YYEzaE05I9f9Q0N7ENuI= +github.com/mongofs/api v1.0.3/go.mod h1:9QnZSLGrc6N9FNaGR0wrZ2+MdAsWPFhH9mnFcKxrsEw= +github.com/mongofs/api v1.0.4 h1:r64CS/NOUOLelgftEYW0UuXw7FQibonBAQGYIe711Vo= +github.com/mongofs/api v1.0.4/go.mod h1:9QnZSLGrc6N9FNaGR0wrZ2+MdAsWPFhH9mnFcKxrsEw= +github.com/mongofs/api v1.0.5 h1:iqO+At5mxYjfuBR+97l4n5pxcefy5CwYtfTIJqFXOeI= +github.com/mongofs/api v1.0.5/go.mod h1:9QnZSLGrc6N9FNaGR0wrZ2+MdAsWPFhH9mnFcKxrsEw= github.com/mongofs/log v0.0.0-20211119020135-e51080dd6d44 h1:+LdeRyjL/EoKpNAIipVCABxXoiYm/urYpcKMq0auNOg= github.com/mongofs/log v0.0.0-20211119020135-e51080dd6d44/go.mod h1:Wv4aMo8tAI+jNC9+SpfA+CH5CmLl9yEXEXY/O/P+Pyg= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= diff --git a/option.go b/option.go index a525141..7ae330c 100644 --- a/option.go +++ b/option.go @@ -2,30 +2,35 @@ package im import ( "github.com/mongofs/im/client" - "github.com/mongofs/im/validate" "github.com/mongofs/im/log" + "github.com/mongofs/im/plugins/wti" + "github.com/mongofs/im/validate" "github.com/mongofs/im/validate/example" ) const ( + // 对客户端进行默认参数设置 DefaultClientHeartBeatInterval = 120 DefaultClientReaderBufferSize = 1024 DefaultClientWriteBufferSize = 1024 DefaultClientBufferSize = 8 DefaultClientMessageType = 1 DefaultClientProtocol = 1 - DefaultBucketSize = 1 << 8 // 256 + // 对分片进行基础设置 + DefaultBucketSize = 1 << 8 // 256 + + // 默认基础的server配置 DefaultServerBucketNumber = 1 << 6 // 64 DefaultServerRpcPort = ":8081" DefaultServerHttpPort = ":8080" - DefaultServerBuffer = 200 + // 设置对广播能力的参数支持 DefaultBroadCastHandler = 10 DefaultBroadCastBuffer = 200 - - PluginWTISupport = false // 是否支持plugin 的wti ,默认为关 + // plugins 的参数支持 + PluginWTISupport = false // 是否支持WTI 进行扩展 ) var DefaultValidate validate.Validater = &example.DefaultValidate{} @@ -33,6 +38,7 @@ var DefaultReceive client.Receiver = &client.Example{} var DefaultLogger log.Logger = &log.DefaultLog{} type Option struct { + // client ClientHeartBeatInterval int // 用户心跳间隔 ClientReaderBufferSize int // 用户连接读取buffer ClientWriteBufferSize int // 用户连接写入buffer @@ -40,7 +46,10 @@ type Option struct { ClientMessageType int // 用户发送的数据类型 ClientProtocol int // 压缩协议 + // bucket BucketSize int // bucket用户 + + // server ServerBucketNumber int // 所有 ServerRpcPort string ServerHttpPort string @@ -52,6 +61,7 @@ type Option struct { BroadCastBuffer int BroadCastHandler int + //plugins SupportPluginWTI bool // 是否支持wti插件 } @@ -74,6 +84,9 @@ func DefaultOption() *Option { BroadCastBuffer: DefaultBroadCastBuffer, BroadCastHandler: DefaultBroadCastHandler, + + // 插件支持 + SupportPluginWTI: PluginWTISupport, } } @@ -105,13 +118,12 @@ func WithServerValidate(ServerValidate validate.Validater) OptionFunc { } } -func WithServerLogger(ServerLogger log.Logger ) OptionFunc { +func WithServerLogger(ServerLogger log.Logger) OptionFunc { return func(b *Option) { b.ServerLogger = ServerLogger } } - func WithServerBucketNumber(ServerBucketNumber int) OptionFunc { return func(b *Option) { b.ServerBucketNumber = ServerBucketNumber @@ -177,3 +189,13 @@ func WithBroadCastHandler(BroadCastHandler int) OptionFunc { b.BroadCastHandler = BroadCastHandler } } + +//设置plugin内容 +func WithPluginsWTI(SupportPluginWTI bool) OptionFunc { + return func(b *Option) { + if SupportPluginWTI { + wti.SetSupport() + } + b.SupportPluginWTI = SupportPluginWTI + } +} diff --git a/parallel.go b/parallel.go index 649f835..220be9f 100644 --- a/parallel.go +++ b/parallel.go @@ -8,7 +8,7 @@ import ( "github.com/mongofs/im/plugins/wti" ) - +// 并行的启动,使用goroutine 来进行管理 func ParallelRun (parallels ... func()error)error{ wg := errgroup.Group{} for _,v:= range parallels { @@ -43,7 +43,6 @@ func (s *ImSrever) monitorWTI() error { } } return nil - } // 监控rpc 服务 diff --git a/plugins/wti/interface.go b/plugins/wti/interface.go index e26b89d..0c7ae4c 100644 --- a/plugins/wti/interface.go +++ b/plugins/wti/interface.go @@ -27,16 +27,26 @@ type WTI interface { // 获取到TAG 的创建时间,系统会判断这个tag创建时间和当前人数来确认是否需要删除这个tag GetTAGCreateTime(tag string) int64 - // 获取到TAG 的在线人数,系统会判断这个tag如果没有在线人数为0 且创建时间大于MAX-wti-create-time ,这个tag就会被回收 - GetTAGClients(tag string) int64 + // 获取到所有tag的用户分布 + Distribute(tags ...string)map[string]*DistributeParam // 调用方法的回收房间的策略 FlushWTI() } +type DistributeParam struct { + TagName string + Onlines int64 + CreateTime int64 +} + + + // 其他地方将调用这个变量,如果自己公司实现tag需要注入在程序中进行注入 -var factory WTI = newwti() -var isSupportWTI = atomic.NewBool(false) +var ( + factory WTI = newwti() + isSupportWTI = atomic.NewBool(false) +) func Inject(wti WTI) { factory = wti @@ -47,7 +57,7 @@ func SetSupport (){ } var ( - ERRNotSupportWTI = errors.New("im/plugins/wti: not set the wti support params") + ERRNotSupportWTI = errors.New("im/plugins/wti: you should call the SetSupport func") ) func SetTAG(cli *client.Cli, tag ...string) error { @@ -98,14 +108,16 @@ func GetTAGCreateTime(tag string) (int64, error) { return res, nil } -func GetTAGClients(tag string) (int64, error) { + +func Distribute() (map[string]*DistributeParam, error) { if isSupportWTI.Load() == false { - return 0, ERRNotSupportWTI + return nil, ERRNotSupportWTI } - res := factory.GetTAGClients(tag) + res := factory.Distribute() return res, nil } + func FlushWTI() error { if isSupportWTI.Load() == false { return ERRNotSupportWTI diff --git a/plugins/wti/wti.go b/plugins/wti/wti.go index 55cf617..5d85a9c 100644 --- a/plugins/wti/wti.go +++ b/plugins/wti/wti.go @@ -1,5 +1,3 @@ -// 构建tag 标签的这个功能,主要目标希望将用户分类起来,1000000 * 8 /1024 / 1024 = 7M的内存,额外使用tag包进行封装的话, -// 预计使用一百万用户的内存需要大概7~8M,这个范围是可以接受的。 package wti import ( @@ -23,7 +21,7 @@ func newwti() WTI { // 给用户设置标签 -func (t *tg) SetTAG(cli *client.Cli, tags ...string) { +func (t *tg) SetTAG(cli *client.Cli, tags ...string) { if len(tags)== 0 { return } @@ -99,15 +97,6 @@ func (t *tg) GetTAGCreateTime(tag string) int64{ return 0 } -// 获取到tag的总人数 -func (t *tg) GetTAGClients(tag string) int64 { - t.rw.RLock() - defer t.rw.RUnlock() - if v,ok:=t.mp[tag];ok{ - return v.Counter() - } - return 0 -} // 删除tag ,这个调用一个大锁将全局锁住清空过去的内容 func (t *tg) FlushWTI() { @@ -118,4 +107,37 @@ func (t *tg) FlushWTI() { delete(t.mp,k) } } -} \ No newline at end of file +} + + +// 获取到tagOnlines 在线用户人数 +func (t *tg) Distribute(tags... string) map[string]*DistributeParam { + var res = map[string]*DistributeParam{} + if len(tags) == 0 { + t.rw.RLock() + for k,v:= range t.mp { + tem := &DistributeParam{ + TagName: k, + Onlines: v.Counter(), + CreateTime: v.createTime, + } + res[k]=tem + } + t.rw.RUnlock() + return res + } + t.rw.RLock() + for _,tag := range tags { + // get the tag + if v,ok := t.mp[tag];ok { + tem := &DistributeParam{ + TagName: tag, + Onlines: v.Counter(), + CreateTime: v.createTime, + } + res[tag]= tem + } + } + t.rw.RUnlock() + return res +} diff --git a/plugins/wti/wti_test.go b/plugins/wti/wti_test.go index b4b6e86..a4e3f49 100644 --- a/plugins/wti/wti_test.go +++ b/plugins/wti/wti_test.go @@ -2,9 +2,61 @@ package wti import ( "fmt" + "github.com/mongofs/im/client" + "net/http" "testing" ) +type MockClient struct {} +func NewClient()client.Clienter { + return &MockClient{} +} + +func (m MockClient) Send(bytes []byte, i ...int64) error { + fmt.Printf("Send Called : %v \n\r",string(bytes)) + return nil +} +func (m MockClient) Offline() { + panic("implement me") +} +func (m MockClient) ResetHeartBeatTime() { + panic("implement me") +} +func (m MockClient) LastHeartBeat() int64 { + panic("implement me") +} +func (m MockClient) Token() string { + panic("implement me") +} +func (m MockClient) Request() *http.Request { + panic("implement me") +} + + + + +func TestTg_SetTAG(t *testing.T) { + tests := []struct{ + tag string + number int + }{ + { + tag: "v1", + number: 100, + }, + { + tag: "v2", + number: 200, + }, + } + + for _,v := range tests{ + for i :=0 ;i< v.number;i++ { + // SetTAG(NewClient(),v.tag) ,todo 待优化 + } + } +} + // 广播,针对tag进行广播,这也是wti的核心接口,分类广播也是基于这个接口 func TestTg_BroadCast(t *testing.T) { tests := []struct { diff --git a/rpc.go b/rpc.go index 7d3b275..c6d733c 100644 --- a/rpc.go +++ b/rpc.go @@ -68,7 +68,9 @@ func (s *ImSrever) Broadcast(ctx context.Context, req *im.BroadcastReq) (*im.Bro // 比如 v1 => string ,v2 => []byte,那么v1,v2 就是不相同的两个版本内容。在client上面可以设置用户的连接版本Version,建议在 // 使用用户 -func (s *ImSrever) BroadcastByWTI(ctx context.Context, req *im.BroadcastByWTIReq) (*im.BroadcastReply, error) { + +// 进行广播 +func (s *ImSrever) WTIBroadcast(ctx context.Context, req *im.BroadcastByWTIReq) (*im.BroadcastReply, error) { var err error start := time.Now() err = wti.BroadCastByTarget(req.Data) @@ -79,3 +81,24 @@ func (s *ImSrever) BroadcastByWTI(ctx context.Context, req *im.BroadcastByWTIReq },err } + +// 获取每个版本多少人 +func (s *ImSrever) WTIDistribute(ctx context.Context, req *im.Empty) (*im.WTIDistributeReply,error){ + distribute,err := wti.Distribute() + if err != nil { + return nil,err + } + + var result = map[string]*im.WTIDistribute{} + for k,v := range distribute{ + data := &im.WTIDistribute{ + Tag: v.TagName, + Number: v.Onlines, + CreateTime: v.CreateTime, + } + result[k] =data + } + return &im.WTIDistributeReply{ + Data: result, + },nil +} \ No newline at end of file diff --git a/test/wti/conn_test.go b/test/wti/conn_test.go index 6d27586..932a31e 100644 --- a/test/wti/conn_test.go +++ b/test/wti/conn_test.go @@ -4,39 +4,64 @@ import ( "fmt" "github.com/gorilla/websocket" "testing" + "math/rand" "time" ) -// 模拟创建3个链接: 分别是订阅 -// conn1 : v1 -// conn2 : v2 -// conn3 : v1,v2 +var r = rand.New(rand.NewSource(time.Now().Unix())) -func Test_Conn(t *testing.T){ - go CreateClient("v1") - go CreateClient("v2") - go CreateClient("v3") - //CreateClient("v1&v2") - time.Sleep(1000 *time.Second) +func RandString(len int) string { + bytes := make([]byte, len) + for i := 0; i < len; i++ { + b := r.Intn(26) + 65 + bytes[i] = byte(b) + } + return string(bytes) } -func CreateClient (token string){ + + + +func Test_Conn(t *testing.T) { + tests := []struct{ + tag string + number int + }{ + { + tag: "v1", + number: 50, + }, + { + tag: "v2", + number: 60, + }, + } + + for _,v := range tests{ + for i :=0 ;i< v.number;i++ { + go CreateClient(v.tag) + } + } + time.Sleep(1000 * time.Second) +} + + +// http://www.baidu.com/conn?token=1080&version=v.10 +func CreateClient(version string) { + token := RandString(20) dialer := websocket.Dialer{} - conn, _, err := dialer.Dial(fmt.Sprintf(Address+"?token=%s&ver=v1",token), nil) + conn, _, err := dialer.Dial(fmt.Sprintf(Address+"?token=%s&version=%s", token,version), nil) if nil != err { fmt.Println(err) return } defer conn.Close() - - counter :=0 - - go func() { + counter := 0 + /*go func() { time.Sleep(50*time.Second) conn.WriteMessage(websocket.TextMessage,[]byte(fmt.Sprintf(" heartbeat %s",token))) - }() - + }()*/ for { messageType, messageData, err := conn.ReadMessage() if nil != err { @@ -44,14 +69,14 @@ func CreateClient (token string){ break } switch messageType { - case websocket.TextMessage://文本数据 + case websocket.TextMessage: //文本数据 counter++ - fmt.Println(token,string(messageData),counter) - case websocket.BinaryMessage://二进制数据 - case websocket.CloseMessage://关闭 - case websocket.PingMessage://Ping - case websocket.PongMessage://Pong + fmt.Println(token, string(messageData), counter) + case websocket.BinaryMessage: //二进制数据 + case websocket.CloseMessage: //关闭 + case websocket.PingMessage: //Ping + case websocket.PongMessage: //Pong default: } } -} \ No newline at end of file +} diff --git a/test/wti/doc.go b/test/wti/doc.go new file mode 100644 index 0000000..801f12a --- /dev/null +++ b/test/wti/doc.go @@ -0,0 +1,4 @@ +// 关于wti的测试 ,首先使用wti是需要建立一个wti的实例,然后将实例注册到wti下面,后面就能进行wti进行用户标签分类了 +// 当然用户也可以选择我们提供的WTI的实例,具体实现可以参考wti的doc进行观察 +package wti + diff --git a/test/wti/im_test.go b/test/wti/im_test.go index a002fd9..ae8d12b 100644 --- a/test/wti/im_test.go +++ b/test/wti/im_test.go @@ -8,6 +8,8 @@ import ( "testing" ) +// http://www.baidu.com/conn?token=1080&version=v.10 + // 创建IM测试服务器,模拟测试im服务器的具体内容。 func Test_IMServer(t *testing.T) { wti.SetSupport() @@ -36,7 +38,7 @@ func (f *Fake)ValidateFailed(err error,cli client.Clienter){ func (f *Fake)ValidateSuccess(cli client.Clienter){ // 可以通过header 或者 get query 方式来传参,或者从数据库获取当前用户的tag req := cli.Request() - res := req.Form["ver"] + res := req.Form["version"] tags := []string{res[0]} // 调用wti if err := wti.SetTAG(cli.(*client.Cli),tags...);err != nil { @@ -50,6 +52,7 @@ func NewFakeImServer() *Fake { options := []im.OptionFunc{ im.WithBroadCastBuffer(10), im.WithServerValidate(res), + im.WithPluginsWTI(true), } // 设置Option option := im.NewOption(options...) diff --git a/test/wti/rpc_test.go b/test/wti/rpc_test.go index f554bb3..c974c5b 100644 --- a/test/wti/rpc_test.go +++ b/test/wti/rpc_test.go @@ -6,6 +6,7 @@ import ( im "github.com/mongofs/api/im/v1" "google.golang.org/grpc" "testing" + "time" ) @@ -14,10 +15,18 @@ const ( DefaultRpcAddress = "127.0.0.1:8081" ) +var ( + cli = Client() + ctx = context.Background() +) + +func Client ()im.BasicClient{ + return im.NewBasicClient(conn) +} + +var conn,_ = grpc.Dial(DefaultRpcAddress,grpc.WithInsecure()) func TestSendMessage(t *testing.T) { - cli := Client() - ctx := context.Background() tests:= []*im.BroadcastByWTIReq{ { Data: map[string][]byte{ @@ -28,7 +37,7 @@ func TestSendMessage(t *testing.T) { }, } for _,v := range tests { - fmt.Println(cli.BroadcastByWTI(ctx,v,)) + fmt.Println(cli.WTIBroadcast(ctx,v,)) } // output // nil @@ -37,9 +46,14 @@ func TestSendMessage(t *testing.T) { // nil } -var conn,_ = grpc.Dial(DefaultRpcAddress,grpc.WithInsecure()) - - -func Client ()im.BasicClient{ - return im.NewBasicClient(conn) +func TestDistribute(t *testing.T) { + for { + distribute,err :=cli.WTIDistribute(ctx,&im.Empty{}) + if err !=nil { + t.Fatal(err) + } + fmt.Printf("当前用户分布: %+v\n\r ",distribute) + time.Sleep(5*time.Second) + } } +