-
Notifications
You must be signed in to change notification settings - Fork 2
/
rpc.go
110 lines (93 loc) · 3.38 KB
/
rpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package im
import (
"context"
"errors"
"fmt"
im "github.com/mongofs/api/im/v1"
"github.com/mongofs/im/plugins/wti"
"time"
)
// Ping logs the call and replies with an empty message.
//
// A non-nil *im.Empty is returned on purpose: this looks like a gRPC unary
// handler (TODO confirm), and a nil response paired with a nil error leaves
// the server with nothing valid to marshal on some gRPC/protobuf versions.
func (s *ImSrever) Ping(ctx context.Context, empty *im.Empty) (*im.Empty, error) {
	s.opt.ServerLogger.Infof(" im/rpc : called %v method ", "Ping")
	return &im.Empty{}, nil
}
// Onliens logs the call and returns the value of s.ps.Load() in
// OnlinesReply.Number. It never returns an error.
func (s *ImSrever) Onliens(ctx context.Context, empty *im.Empty) (*im.OnlinesReply, error) {
	s.opt.ServerLogger.Infof(" im/rpc : called %v method ", "Onliens")
	return &im.OnlinesReply{Number: s.ps.Load()}, nil
}
// SendMessage sends req.Data to the bucket selected by req.Token.
//
// The reply is always a non-nil, empty SendMessageReply; the error returned
// by the bucket's Send is passed back to the caller unchanged. A failed send
// is also written to the server log, followed by the usual timing line.
func (s *ImSrever) SendMessage(ctx context.Context, req *im.SendMessageReq) (*im.SendMessageReply, error) {
	start := time.Now()
	err := s.bucket(req.Token).Send(req.Data, req.Token, false)
	if err != nil {
		// The token is deliberately left out of the log line; it may be a credential.
		s.opt.ServerLogger.Infof(" im/rpc : %v method send failed : %v ", "SendMessage", err)
	}
	s.opt.ServerLogger.Infof(" im/rpc : called %v method cost time %v ", "SendMessage", time.Since(start))
	return &im.SendMessageReply{}, err
}
// SendMessageToMultiple sends the same payload (req.Data) to every token in
// req.Token.
//
// Delivery is best-effort: a failed send does not stop the loop, so the
// remaining tokens are still attempted. The first error encountered is
// returned, which means a failure is reported even when later sends succeed;
// nil is returned only when every send succeeded. The reply is always a
// non-nil, empty SendMessageReply.
func (s *ImSrever) SendMessageToMultiple(ctx context.Context, req *im.SendMessageToMultipleReq) (*im.SendMessageReply, error) {
	start := time.Now()

	var firstErr error
	failed := 0
	for _, token := range req.Token {
		if err := s.bucket(token).Send(req.Data, token, false); err != nil {
			failed++
			// Keep the earliest failure; later iterations must not overwrite it.
			if firstErr == nil {
				firstErr = err
			}
		}
	}
	if failed > 0 {
		s.opt.ServerLogger.Infof(" im/rpc : %v method failed for %v of %v tokens, first error : %v ",
			"SendMessageToMultiple", failed, len(req.Token), firstErr)
	}

	s.opt.ServerLogger.Infof(" im/rpc : called %v method cost time %v ", "SendMessageToMultiple", time.Since(start))
	return &im.SendMessageReply{}, firstErr
}
// Broadcast queues req on s.buffer so it can be broadcast to users.
//
// The request is rejected with an error when the buffer is already more than
// 80% full (len*10 > 8*cap). Otherwise the request is enqueued; if the
// channel cannot accept it before ctx is done, ctx.Err() is returned instead
// of blocking indefinitely. On success the reply's Size is the buffer length
// observed after the enqueue.
func (s *ImSrever) Broadcast(ctx context.Context, req *im.BroadcastReq) (*im.BroadcastReply, error) {
	if len(s.buffer)*10 > 8*cap(s.buffer) {
		return nil, errors.New(fmt.Sprintf("im/rpc: too much message ,buffer length is %v but cap is %v", len(s.buffer), cap(s.buffer)))
	}

	start := time.Now()
	// The fullness check above is not atomic with the send, so concurrent
	// callers can still fill the buffer; give up when the caller's context ends.
	select {
	case s.buffer <- req:
	case <-ctx.Done():
		return nil, ctx.Err()
	}
	escape := time.Since(start)

	s.opt.ServerLogger.Infof(" im/rpc : called %v method cost time %v ", "Broadcast", escape)
	return &im.BroadcastReply{
		Size: int64(len(s.buffer)),
	}, nil
}
// WTIBroadcast passes req.Data to wti.BroadCastByTarget and logs how long the
// call took (under the label "BroadcastByWTI"). The reply is always non-nil,
// with Size set to the current length of s.buffer, and is returned together
// with whatever error wti.BroadCastByTarget produced.
//
// Design notes from the original author (translated):
// During development there is a need for several IM protocol versions to
// coexist. For example, once the protocol has been replaced, how does an IM
// client embedded in an app switch over? That is a weakness in how the
// protocol was designed, but the IM server also has to cooperate during the
// transition. IM needs to group users, so a broadcast has to distinguish
// users by state: the caller groups the content, and the content passed in
// must correspond to those groups. For example, with v1 => string and
// v2 => []byte, v1 and v2 are two different versions of the content. The
// client can set the Version of the user's connection. (The original note
// ends mid-sentence here, recommending something about using users when
// broadcasting.)
func (s *ImSrever) WTIBroadcast(ctx context.Context, req *im.BroadcastByWTIReq) (*im.BroadcastReply, error) {
	begin := time.Now()
	err := wti.BroadCastByTarget(req.Data)
	s.opt.ServerLogger.Infof(" im/rpc : called %v method cost time %v ", "BroadcastByWTI", time.Since(begin))

	reply := &im.BroadcastReply{Size: int64(len(s.buffer))}
	return reply, err
}
// WTIDistribute reports how many users are on each version (translated from
// the original comment).
//
// It calls wti.Distribute and copies each entry's TagName, Onlines and
// CreateTime into an im.WTIDistribute stored under the same key. If
// wti.Distribute fails, its error is returned with a nil reply.
func (s *ImSrever) WTIDistribute(ctx context.Context, req *im.Empty) (*im.WTIDistributeReply, error) {
	tags, err := wti.Distribute()
	if err != nil {
		return nil, err
	}

	byKey := make(map[string]*im.WTIDistribute, len(tags))
	for key, info := range tags {
		byKey[key] = &im.WTIDistribute{
			Tag:        info.TagName,
			Number:     info.Onlines,
			CreateTime: info.CreateTime,
		}
	}
	return &im.WTIDistributeReply{Data: byKey}, nil
}