diff --git a/benchmark/conn.go b/benchmark/conn.go index c72b72a..1244637 100644 --- a/benchmark/conn.go +++ b/benchmark/conn.go @@ -12,15 +12,13 @@ import ( func CreateClient (token string){ dialer := websocket.Dialer{} - conn, _, err := dialer.Dial(fmt.Sprintf(Address+"?token=%s",token), nil) + conn, _, err := dialer.Dial(fmt.Sprintf(Address+"?token=%s&ver=%s",token,getVersion()), nil) if nil != err { log.Println(err) return } defer conn.Close() - counter :=0 - go func() { time.Sleep(10*time.Second) conn.WriteMessage(websocket.TextMessage,[]byte(fmt.Sprintf(" heartbeat %s",token))) @@ -49,6 +47,15 @@ func CreateClient (token string){ } } +// 获取版本 +func getVersion()string{ + if time.Now().Unix() %2 == 0 { + return "v1" + }else{ + return "v2" + } +} + // 用户断线重连 func CreateClientAndTickerReConn (token string){ diff --git a/benchmark/rpc_bench_test.go b/benchmark/rpc_bench_test.go index 56eedc8..a6fbf97 100644 --- a/benchmark/rpc_bench_test.go +++ b/benchmark/rpc_bench_test.go @@ -60,6 +60,7 @@ func BenchmarkBroadCast(t *testing.B) { // BenchmarkBroadCast_paraml-6 2751 391783 ns/op +// BenchmarkBroadCast_paraml-6 3105 494931 ns/op func BenchmarkBroadCast_paraml(t *testing.B) { cli := Client() ctx := context.Background() @@ -70,3 +71,22 @@ func BenchmarkBroadCast_paraml(t *testing.B) { cli.Broadcast(ctx,data) } } + + +// 压测分类广播,大致情况如下 +// BenchmarkBroadCastByWTI-6 2905 411497 ns/op +// BenchmarkBroadCastByWTI-6 3099 421722 ns/op +// BenchmarkBroadCastByWTI-6 2709 385287 ns/op +func BenchmarkBroadCastByWTI(t *testing.B){ + cli := Client() + ctx := context.Background() + tests:= &im.BroadcastByWTIReq{Data: map[string][]byte{ + "v1":[]byte("fucker you v2"), + "v2":[]byte("lover you v1"), + "v3":[]byte("what are you v1"), + }} + + for i:= 0;i []string + rw *sync.RWMutex +} + + +func newwti() WTI { + return &tg{ + mp: map[string]*Group{}, + rw: &sync.RWMutex{}, + } +} + + +// 给用户设置标签 +func (t *tg) SetTAG(cli *client.Cli, tags ...string) { + if len(tags)== 0 { + return + } + t.rw.Lock() + defer t.rw.Unlock() + for _,tag := range tags{ + if group,ok:= t.mp[tag];!ok { // wti not exist + t.mp[tag]= NewGroup() + t.mp[tag].addCli(cli) + }else { // wti exist + group.addCli(cli) + } + } +} + + +// 给某一个标签的群体进行广播 +func (t *tg) BroadCast(content []byte,tags ...string) { + if len(tags)== 0 { + return + } + t.rw.RLock() + defer t.rw.RUnlock() + + for _,tag := range tags{ + if group,ok := t.mp[tag];ok{ + group.broadCast(content) + } + } +} + + +// 通知所有组进行自查某个用户,并删除 +func (t *tg)Update(token ...string){ + t.rw.RLock() + defer t.rw.RUnlock() + for _,v := range t.mp { + v.Update(token... ) + } +} + + +// 调用这个就可以分类广播,可能出现不同的targ 需要不同的内容 +func(t *tg)BroadCastByTarget(targetAndContent map[string][]byte){ + if len(targetAndContent) == 0{ return } + for target ,content := range targetAndContent { + go t.BroadCast(content,target) + } +} + + +// 获取到用户token的所有TAG,时间复杂度是O(m) ,m是所有的房间 +func (t *tg)GetClienterTAGs(token string)[]string{ + var res []string + t.rw.RLock() + defer t.rw.RUnlock() + for k,v:= range t.mp{ + exsit:= v.isExsit(token) + if exsit { + res = append(res,k ) + } + } + return res +} + +// 如果创建时间为0 ,表示没有这个房间 +func (t *tg) GetTAGCreateTime(tag string) int64{ + t.rw.RLock() + defer t.rw.RUnlock() + if v,ok:=t.mp[tag];ok{ + return v.createTime + } + return 0 +} + + +// 删除tag ,这个调用一个大锁将全局锁住清空过去的内容 +func (t *tg) FlushWTI() { + t.rw.Lock() + defer t.rw.Unlock() + for k,v := range t.mp{ + if v.Counter() ==0 && time.Now().Unix() -v.CreateTime() >60 { + delete(t.mp,k) + } + } +} + + +// 获取到tagOnlines 在线用户人数 +func (t *tg) Distribute(tags... string) map[string]*DistributeParam { + var res = map[string]*DistributeParam{} + if len(tags) == 0 { + t.rw.RLock() + for k,v:= range t.mp { + tem := &DistributeParam{ + TagName: k, + Onlines: v.Counter(), + CreateTime: v.createTime, + } + res[k]=tem + } + t.rw.RUnlock() + return res + } + t.rw.RLock() + for _,tag := range tags { + // get the tag + if v,ok := t.mp[tag];ok { + tem := &DistributeParam{ + TagName: tag, + Onlines: v.Counter(), + CreateTime: v.createTime, + } + res[tag]= tem + } + } + t.rw.RUnlock() + return res +} diff --git a/plugins/wti/wti_test.go b/plugins/wti/wti_test.go new file mode 100644 index 0000000..a4e3f49 --- /dev/null +++ b/plugins/wti/wti_test.go @@ -0,0 +1,161 @@ +package wti + +import ( + "fmt" + "github.com/mongofs/im/client" + "net/http" + "testing" +) + +type MockClient struct {} +func NewClient()client.Clienter { + return &MockClient{} +} + +func (m MockClient) Send(bytes []byte, i ...int64) error { + fmt.Printf("Send Called : %v \n\r",string(bytes)) + return nil +} +func (m MockClient) Offline() { + panic("implement me") +} +func (m MockClient) ResetHeartBeatTime() { + panic("implement me") +} +func (m MockClient) LastHeartBeat() int64 { + panic("implement me") +} +func (m MockClient) Token() string { + panic("implement me") +} +func (m MockClient) Request() *http.Request { + panic("implement me") +} + + + + +func TestTg_SetTAG(t *testing.T) { + tests := []struct{ + tag string + number int + }{ + { + tag: "v1", + number: 100, + }, + { + tag: "v2", + number: 200, + }, + } + + for _,v := range tests{ + for i :=0 ;i< v.number;i++ { + // SetTAG(NewClient(),v.tag) ,todo 待优化 + } + } +} + +// 广播,针对tag进行广播,这也是wti的核心接口,分类广播也是基于这个接口 +func TestTg_BroadCast(t *testing.T) { + tests := []struct { + target []string + content []byte + }{ + { + target: []string{"v1"} , + content: []byte("hello content"), + }, + { + target: []string{"v1","v2"} , + content: []byte("hello content"), + }, + { + target: []string{"v1","v2","v3"} , + content: []byte("hello content"), + }, + } + // 测试 v1、v2 、v1 三个版本的不同,如果要模式真实连接,则需要执行 + // 1. 创建im 连接服务器,并开启wti 配置参数 + // 2. 要在handler方法中调用factory 进行调用SetTAG + // 3. 需要建立连接 + for _,v := range tests { + err := BroadCast(v.content,v.target...) + if err !=nil { + t.Fatal(err) + } + } + // v1 websocket output : hello content ,and v2,v3 no content output + // v1,v2 websocket output : hello content,and v3 no content output + // v1,v2,v3 websocket output : hello content + +} + + + +// 主要应对数据发送的时候版本的问题,比如某一条数据由于协议更改需要向上兼容老的版本,因为这是应用层的内容 +// 所以使用wti 接口来进行兼容处理。避免进行内容的感染 +func TestTg_BroadCastByTarget(t *testing.T) { + tests := []struct { + give map[string][]byte + }{ + { + give: map[string][]byte{ + "v1": []byte("first v1 "), + "v2": []byte("second v2 "), + "v3": []byte("third v3 "), + }, + + }, + { + give: map[string][]byte{ + "v1": []byte("hello v1 "), + "v2": []byte("hello v2 "), + "v3": []byte("hello v3 "), + }, + }, + + } + // 测试 v1、v2 、v1 三个版本的不同,如果要模式真实连接,则需要执行 + // 1. 创建im 连接服务器,并开启wti 配置参数 + // 2. 要在handler方法中调用factory 进行调用SetTAG + // 3. 需要建立连接 + for _,v := range tests { + err:= BroadCastByTarget(v.give) + if err !=nil { + t.Fatal(err) + } + } + // v1,v2,v3 websocket output :first v1 | second v2 | third v3 + // v1,v2,v3 websocket output :hello v1 | hello v2 | hello v3 +} + + + + +// 主要应对数据发送的时候版本的问题,比如某一条数据由于协议更改需要向上兼容老的版本,因为这是应用层的内容 +// 所以使用wti 接口来进行兼容处理。避免进行内容的感染 +func TestTg_UpdateAndF(t *testing.T) { + tests := []struct { + give string + }{ + { + give: "1234", + }, + } + // 这个测试条件相对比较苛刻,update接口主要作用是接收globalclosed 的信号,如果某个用户关闭连接 + // im线程就会释放连接之前就会告诉当前的update方法,所以只需要判断当前用户是否被删除就好了 + // 1. 创建im 连接服务器,并开启wti 配置参数 + // 2. 要在handler方法中调用factory 进行调用SetTAG + // 3. 需要建立连接 + for _,v := range tests { + err := Update(v.give) // + if err !=nil { + t.Fatal(err) + } + res ,_:= GetClienterTAGs(v.give) + fmt.Println(res) + } + // output : [] +} \ No newline at end of file diff --git a/rooms/interface.go b/rooms/interface.go deleted file mode 100644 index bb6beb4..0000000 --- a/rooms/interface.go +++ /dev/null @@ -1,33 +0,0 @@ -package rooms - -import "github.com/mongofs/im/client" - -type room interface { - AddUser(token string, clienter client.Clienter) // 将用户添加到某个房间 - - DelUser(token string) // 将用户从房间删除 - - PushData(data []byte) int // 推送消息给房间,并返回推送在线用户数量 - - PushDataToPointedUser(data []byte, token ...string) []string // 将消息推送给房间指定用户 - - GetRoomUserNumber() int32 //获取当前房间人数:注意特指当前服务的机器,不能代表房间所有用户,建议通过redis incr 存储 - - GetRoomCreateTime() int64 // 获取当前房间创建时间 - - GetRoomUserToken()[]string // 获取当前房间用户 - -} - -// 房间管理器 -type RoomSet interface { - - // add user to room - AddClientToRoom(token string, conn client.Clienter, RoomID string) - // del user to room - DelClientFromRoom(token string, RoomID string) - // push data to room - PushDataToRoom(data []byte, RoomID string, token ...string) - // get room usernumber - GetRoomUserNumber() int32 -} diff --git a/rooms/map.go b/rooms/map.go deleted file mode 100644 index 62710c9..0000000 --- a/rooms/map.go +++ /dev/null @@ -1,81 +0,0 @@ -package rooms - -import ( - "github.com/mongofs/im/client" - "go.uber.org/atomic" - "sync" - "time" -) - -type MapRoom struct { - rw *sync.RWMutex - ro map[string]client.Clienter - cot *atomic.Int32 //当前在线用户 - createTime int64 -} - -func (m *MapRoom) GetRoomUserToken() []string { - panic("implement me") -} - -func (m *MapRoom) GetRoomCreateTime() int64 { - return m.createTime -} - -func NewMapRoom()room{ - return &MapRoom{ - rw: &sync.RWMutex{}, - ro: make(map[string]client.Clienter,20), - cot: &atomic.Int32{}, - createTime: time.Now().Unix(), - } -} - - -func (m *MapRoom) AddUser(token string, clienter client.Clienter) { - m.rw.Lock() - m.ro[token]= clienter - m.rw.Unlock() - - m.cot.Inc() -} - - - -func (m *MapRoom) DelUser(token string) { - m.rw.Lock() - delete(m.ro,token) - m.rw.Unlock() - m.cot.Dec() -} - - - -func (m *MapRoom) PushData(data []byte) int{ - m.rw.RLock() - for _,v:= range m.ro{ - v.Send(data) // todo 需要优化 - } - m.rw.RUnlock() -} - - - -func (m *MapRoom) PushDataToPointedUser(data []byte, token ...string)[]string { - m.rw.RLock() - for _,v:= range token{ - if temCli,ok:= m.ro[v];ok{ - temCli.Send(data) - } - } - m.rw.Unlock() -} - - - -func (m *MapRoom) GetRoomUserNumber() int32 { - return m.cot.Load() -} - - - diff --git a/rooms/roomset.go b/rooms/roomset.go deleted file mode 100644 index 57f79ca..0000000 --- a/rooms/roomset.go +++ /dev/null @@ -1,98 +0,0 @@ -package rooms - -import ( - "github.com/mongofs/im/client" - "go.uber.org/atomic" - "sync" - "time" -) - -type roomSet struct { - createM func(RooID string) room - rw *sync.RWMutex - rooms map[string]room - clear time.Duration - - inRoom *atomic.Int64 -} - - - -func (r *roomSet) GetRoomUserNumber() int32 { - panic("implement me") -} - - -func (r *roomSet) monitor() { - for { - r.rw.Lock() - r.inRoom.Store(0) - for k, v := range r.rooms { - // delete the room - counter := v.GetRoomUserNumber() - if counter == 0 && time.Now().Unix()-v.GetRoomCreateTime() > 60*60*2 { - delete(r.rooms, k) - continue - } - r.inRoom.Add(int64(counter)) - } - r.rw.Unlock() - - - time.Sleep(r.clear * time.Second) - } -} - - - -func (r *roomSet) AddClientToRoom(token string, conn client.Clienter, RoomID string) { - r.rw.Lock() - tem, ok := r.rooms[RoomID] - r.rw.Unlock() - if !ok { - temRoom := r.createM(RoomID) - temRoom.AddUser(token, conn) - r.rw.Lock() - r.rooms[RoomID] = temRoom - r.rw.Unlock() - } - tem.AddUser(token, conn) -} - - - -func (r *roomSet)GetRoomOnlineList(){ - - - -} - - - - -func (r roomSet) DelClientFromRoom(token string, RoomID string) { - r.rw.Lock() - tem, ok := r.rooms[RoomID] - r.rw.Unlock() - if !ok { - return - } - tem.DelUser(token) -} - - - -func (r roomSet) PushDataToRoom(data []byte, RoomID string, token ...string) { - r.rw.Lock() - tem, ok := r.rooms[RoomID] - r.rw.Unlock() - if !ok { - return - } - - if len(token) == 0 { - tem.PushData(data) //全房间推送 - } else { - tem.PushDataToPointedUser(data, token...) // 推送给指定用户 - } -} diff --git a/rpc.go b/rpc.go index be26705..c6d733c 100644 --- a/rpc.go +++ b/rpc.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" im "github.com/mongofs/api/im/v1" + "github.com/mongofs/im/plugins/wti" "time" ) @@ -61,4 +62,43 @@ func (s *ImSrever) Broadcast(ctx context.Context, req *im.BroadcastReq) (*im.Bro },nil } +// 在开发过程中存在IM需要版本共存的需求,比如我的协议替换了,但是如果im应用在App上面如何进行切换,这就是协议定制不合理的地方,但也需要 +// IM 服务器在这个过程中做配合。 +// IM 存在给用户分组的需求,所以我们在进行Broadcast 就必须进行用户的状态区分,所以前台需要对内容进行分组,传入的内容也需要对应分组 +// 比如 v1 => string ,v2 => []byte,那么v1,v2 就是不相同的两个版本内容。在client上面可以设置用户的连接版本Version,建议在 +// 使用用户 + +// 进行广播 +func (s *ImSrever) WTIBroadcast(ctx context.Context, req *im.BroadcastByWTIReq) (*im.BroadcastReply, error) { + var err error + start := time.Now() + err = wti.BroadCastByTarget(req.Data) + escape := time.Since(start) + s.opt.ServerLogger.Infof(" im/rpc : called %v method cost time %v ","BroadcastByWTI",escape) + return &im.BroadcastReply{ + Size: int64(len(s.buffer)), + },err +} + + +// 获取每个版本多少人 +func (s *ImSrever) WTIDistribute(ctx context.Context, req *im.Empty) (*im.WTIDistributeReply,error){ + distribute,err := wti.Distribute() + if err != nil { + return nil,err + } + + var result = map[string]*im.WTIDistribute{} + for k,v := range distribute{ + data := &im.WTIDistribute{ + Tag: v.TagName, + Number: v.Onlines, + CreateTime: v.CreateTime, + } + result[k] =data + } + return &im.WTIDistributeReply{ + Data: result, + },nil +} \ No newline at end of file diff --git a/run.go b/run.go index 165b855..67bb101 100644 --- a/run.go +++ b/run.go @@ -1,47 +1,23 @@ package im -import ( - "net" - "net/http" - "golang.org/x/sync/errgroup" -) - var VERSION = "master" func (s *ImSrever)Run ()error { - wg := errgroup.Group{} - wg.Go(s.runhttpServer) - wg.Go(s.runGrpcServer) - wg.Go(s.monitor) - wg.Go(s.PushBroadCast) - return wg.Wait() -} - - -func (s *ImSrever)runGrpcServer ()error{ - listen, err := net.Listen("tcp", s.opt.ServerRpcPort) - if err !=nil { s.opt.ServerLogger.Fatal(err) } - s.opt.ServerLogger.Infof("im/run : start GRPC server at %s ", s.opt.ServerRpcPort) - if err := s.rpc.Serve(listen);err !=nil { - s.opt.ServerLogger.Fatal(err) + var prepareParallelFunc = [] func()error { + // 启用单独goroutine 进行监控 + s.monitorOnline, + s.monitorWTI, + // 启用单独goroutine 进行运行 + s.runGrpcServer, + s.runhttpServer, + s.PushBroadCast, } - - return nil -} - - -func (s *ImSrever)runhttpServer ()error{ - listen, err := net.Listen("tcp", s.opt.ServerHttpPort) - if err !=nil { s.opt.ServerLogger.Fatal(err) } - s.opt.ServerLogger.Infof("im/run : start HTTP server at %s ", s.opt.ServerHttpPort) - if err := http.Serve(listen,s.http);err !=nil { - s.opt.ServerLogger.Fatal(err) - } - return nil + return ParallelRun(prepareParallelFunc... ) } +// 服务关闭 func (s *ImSrever)Close()error{ s.rpc.GracefulStop() s.cancel() diff --git a/server.go b/server.go deleted file mode 100644 index a9c2ed6..0000000 --- a/server.go +++ /dev/null @@ -1,72 +0,0 @@ -package im - -import ( - im "github.com/mongofs/api/im/v1" - "github.com/mongofs/im/bucket" - "go.uber.org/atomic" - "golang.org/x/sync/errgroup" - "google.golang.org/grpc" - "net/http" - "time" -) - -type ImSrever struct { - http *http.ServeMux - rpc *grpc.Server - bs []bucket.Bucketer - ps atomic.Int64 - - buffer chan *im.BroadcastReq - cancel func() - - opt *Option -} - - - -// 统计用户在线人数 -// 监控buffer 长度 并进行报警 -func (s *ImSrever)monitor ()error{ - for{ - n := int64(0) - for _,bck := range s.bs{ - bck.Flush() - n += bck.Onlines() - } - s.ps.Store(n) - time.Sleep(10 *time.Second) - } - return nil -} - -// 单独处理广播业务 -func (s *ImSrever)PushBroadCast()error{ - wg:= errgroup.Group{} - for i:= 0;i