diff --git a/go.mod b/go.mod index 513dbfe..2e813dd 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/gin-gonic/gin v1.7.7 // indirect github.com/golang/protobuf v1.5.2 github.com/gorilla/websocket v1.4.2 - github.com/mongofs/api v1.0.1 + github.com/mongofs/api v1.0.5 github.com/mongofs/log v0.0.0-20211119020135-e51080dd6d44 github.com/sirupsen/logrus v1.8.1 github.com/spf13/pflag v1.0.5 // indirect diff --git a/go.sum b/go.sum index e578899..96e2cd1 100644 --- a/go.sum +++ b/go.sum @@ -93,6 +93,14 @@ github.com/mongofs/api v1.0.0 h1:uMm8oWQPR0kHJu/NwewURZwwOpClHMG7ApHjAsWe2wQ= github.com/mongofs/api v1.0.0/go.mod h1:9QnZSLGrc6N9FNaGR0wrZ2+MdAsWPFhH9mnFcKxrsEw= github.com/mongofs/api v1.0.1 h1:r8vUeHTn7p3TqQQa8RDuL4lw+f0ZAnXuDGJ/hSwtIe0= github.com/mongofs/api v1.0.1/go.mod h1:9QnZSLGrc6N9FNaGR0wrZ2+MdAsWPFhH9mnFcKxrsEw= +github.com/mongofs/api v1.0.2 h1:7+TP0+gxhrHuM0HY+DmAfkR/Z59O4R8TuaTrqo3sCh4= +github.com/mongofs/api v1.0.2/go.mod h1:9QnZSLGrc6N9FNaGR0wrZ2+MdAsWPFhH9mnFcKxrsEw= +github.com/mongofs/api v1.0.3 h1:fRJXW+duFOHCwwxwElS1mg8YYEzaE05I9f9Q0N7ENuI= +github.com/mongofs/api v1.0.3/go.mod h1:9QnZSLGrc6N9FNaGR0wrZ2+MdAsWPFhH9mnFcKxrsEw= +github.com/mongofs/api v1.0.4 h1:r64CS/NOUOLelgftEYW0UuXw7FQibonBAQGYIe711Vo= +github.com/mongofs/api v1.0.4/go.mod h1:9QnZSLGrc6N9FNaGR0wrZ2+MdAsWPFhH9mnFcKxrsEw= +github.com/mongofs/api v1.0.5 h1:iqO+At5mxYjfuBR+97l4n5pxcefy5CwYtfTIJqFXOeI= +github.com/mongofs/api v1.0.5/go.mod h1:9QnZSLGrc6N9FNaGR0wrZ2+MdAsWPFhH9mnFcKxrsEw= github.com/mongofs/log v0.0.0-20211119020135-e51080dd6d44 h1:+LdeRyjL/EoKpNAIipVCABxXoiYm/urYpcKMq0auNOg= github.com/mongofs/log v0.0.0-20211119020135-e51080dd6d44/go.mod h1:Wv4aMo8tAI+jNC9+SpfA+CH5CmLl9yEXEXY/O/P+Pyg= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= diff --git a/option.go b/option.go index a525141..7ae330c 100644 --- a/option.go +++ b/option.go @@ -2,30 +2,35 @@ package im import ( "github.com/mongofs/im/client" - "github.com/mongofs/im/validate" "github.com/mongofs/im/log" + "github.com/mongofs/im/plugins/wti" + "github.com/mongofs/im/validate" "github.com/mongofs/im/validate/example" ) const ( + // 对客户端进行默认参数设置 DefaultClientHeartBeatInterval = 120 DefaultClientReaderBufferSize = 1024 DefaultClientWriteBufferSize = 1024 DefaultClientBufferSize = 8 DefaultClientMessageType = 1 DefaultClientProtocol = 1 - DefaultBucketSize = 1 << 8 // 256 + // 对分片进行基础设置 + DefaultBucketSize = 1 << 8 // 256 + + // 默认基础的server配置 DefaultServerBucketNumber = 1 << 6 // 64 DefaultServerRpcPort = ":8081" DefaultServerHttpPort = ":8080" - DefaultServerBuffer = 200 + // 设置对广播能力的参数支持 DefaultBroadCastHandler = 10 DefaultBroadCastBuffer = 200 - - PluginWTISupport = false // 是否支持plugin 的wti ,默认为关 + // plugins 的参数支持 + PluginWTISupport = false // 是否支持WTI 进行扩展 ) var DefaultValidate validate.Validater = &example.DefaultValidate{} @@ -33,6 +38,7 @@ var DefaultReceive client.Receiver = &client.Example{} var DefaultLogger log.Logger = &log.DefaultLog{} type Option struct { + // client ClientHeartBeatInterval int // 用户心跳间隔 ClientReaderBufferSize int // 用户连接读取buffer ClientWriteBufferSize int // 用户连接写入buffer @@ -40,7 +46,10 @@ type Option struct { ClientMessageType int // 用户发送的数据类型 ClientProtocol int // 压缩协议 + // bucket BucketSize int // bucket用户 + + // server ServerBucketNumber int // 所有 ServerRpcPort string ServerHttpPort string @@ -52,6 +61,7 @@ type Option struct { BroadCastBuffer int BroadCastHandler int + //plugins SupportPluginWTI bool // 是否支持wti插件 } @@ -74,6 +84,9 @@ func DefaultOption() *Option { BroadCastBuffer: DefaultBroadCastBuffer, BroadCastHandler: DefaultBroadCastHandler, + + // 插件支持 + SupportPluginWTI: PluginWTISupport, } } @@ -105,13 +118,12 @@ func WithServerValidate(ServerValidate validate.Validater) OptionFunc { } } -func WithServerLogger(ServerLogger log.Logger ) OptionFunc { +func WithServerLogger(ServerLogger log.Logger) OptionFunc { return func(b *Option) { b.ServerLogger = ServerLogger } } - func WithServerBucketNumber(ServerBucketNumber int) OptionFunc { return func(b *Option) { b.ServerBucketNumber = ServerBucketNumber @@ -177,3 +189,13 @@ func WithBroadCastHandler(BroadCastHandler int) OptionFunc { b.BroadCastHandler = BroadCastHandler } } + +//设置plugin内容 +func WithPluginsWTI(SupportPluginWTI bool) OptionFunc { + return func(b *Option) { + if SupportPluginWTI { + wti.SetSupport() + } + b.SupportPluginWTI = SupportPluginWTI + } +} diff --git a/parallel.go b/parallel.go index 649f835..220be9f 100644 --- a/parallel.go +++ b/parallel.go @@ -8,7 +8,7 @@ import ( "github.com/mongofs/im/plugins/wti" ) - +// 并行的启动,使用goroutine 来进行管理 func ParallelRun (parallels ... func()error)error{ wg := errgroup.Group{} for _,v:= range parallels { @@ -43,7 +43,6 @@ func (s *ImSrever) monitorWTI() error { } } return nil - } // 监控rpc 服务 diff --git a/plugins/wti/interface.go b/plugins/wti/interface.go index e26b89d..0c7ae4c 100644 --- a/plugins/wti/interface.go +++ b/plugins/wti/interface.go @@ -27,16 +27,26 @@ type WTI interface { // 获取到TAG 的创建时间,系统会判断这个tag创建时间和当前人数来确认是否需要删除这个tag GetTAGCreateTime(tag string) int64 - // 获取到TAG 的在线人数,系统会判断这个tag如果没有在线人数为0 且创建时间大于MAX-wti-create-time ,这个tag就会被回收 - GetTAGClients(tag string) int64 + // 获取到所有tag的用户分布 + Distribute(tags ...string)map[string]*DistributeParam // 调用方法的回收房间的策略 FlushWTI() } +type DistributeParam struct { + TagName string + Onlines int64 + CreateTime int64 +} + + + // 其他地方将调用这个变量,如果自己公司实现tag需要注入在程序中进行注入 -var factory WTI = newwti() -var isSupportWTI = atomic.NewBool(false) +var ( + factory WTI = newwti() + isSupportWTI = atomic.NewBool(false) +) func Inject(wti WTI) { factory = wti @@ -47,7 +57,7 @@ func SetSupport (){ } var ( - ERRNotSupportWTI = errors.New("im/plugins/wti: not set the wti support params") + ERRNotSupportWTI = errors.New("im/plugins/wti: you should call the SetSupport func") ) func SetTAG(cli *client.Cli, tag ...string) error { @@ -98,14 +108,16 @@ func GetTAGCreateTime(tag string) (int64, error) { return res, nil } -func GetTAGClients(tag string) (int64, error) { + +func Distribute() (map[string]*DistributeParam, error) { if isSupportWTI.Load() == false { - return 0, ERRNotSupportWTI + return nil, ERRNotSupportWTI } - res := factory.GetTAGClients(tag) + res := factory.Distribute() return res, nil } + func FlushWTI() error { if isSupportWTI.Load() == false { return ERRNotSupportWTI diff --git a/plugins/wti/wti.go b/plugins/wti/wti.go index 55cf617..5d85a9c 100644 --- a/plugins/wti/wti.go +++ b/plugins/wti/wti.go @@ -1,5 +1,3 @@ -// 构建tag 标签的这个功能,主要目标希望将用户分类起来,1000000 * 8 /1024 / 1024 = 7M的内存,额外使用tag包进行封装的话, -// 预计使用一百万用户的内存需要大概7~8M,这个范围是可以接受的。 package wti import ( @@ -23,7 +21,7 @@ func newwti() WTI { // 给用户设置标签 -func (t *tg) SetTAG(cli *client.Cli, tags ...string) { +func (t *tg) SetTAG(cli *client.Cli, tags ...string) { if len(tags)== 0 { return } @@ -99,15 +97,6 @@ func (t *tg) GetTAGCreateTime(tag string) int64{ return 0 } -// 获取到tag的总人数 -func (t *tg) GetTAGClients(tag string) int64 { - t.rw.RLock() - defer t.rw.RUnlock() - if v,ok:=t.mp[tag];ok{ - return v.Counter() - } - return 0 -} // 删除tag ,这个调用一个大锁将全局锁住清空过去的内容 func (t *tg) FlushWTI() { @@ -118,4 +107,37 @@ func (t *tg) FlushWTI() { delete(t.mp,k) } } -} \ No newline at end of file +} + + +// 获取到tagOnlines 在线用户人数 +func (t *tg) Distribute(tags... string) map[string]*DistributeParam { + var res = map[string]*DistributeParam{} + if len(tags) == 0 { + t.rw.RLock() + for k,v:= range t.mp { + tem := &DistributeParam{ + TagName: k, + Onlines: v.Counter(), + CreateTime: v.createTime, + } + res[k]=tem + } + t.rw.RUnlock() + return res + } + t.rw.RLock() + for _,tag := range tags { + // get the tag + if v,ok := t.mp[tag];ok { + tem := &DistributeParam{ + TagName: tag, + Onlines: v.Counter(), + CreateTime: v.createTime, + } + res[tag]= tem + } + } + t.rw.RUnlock() + return res +} diff --git a/plugins/wti/wti_test.go b/plugins/wti/wti_test.go index b4b6e86..a4e3f49 100644 --- a/plugins/wti/wti_test.go +++ b/plugins/wti/wti_test.go @@ -2,9 +2,61 @@ package wti import ( "fmt" + "github.com/mongofs/im/client" + "net/http" "testing" ) +type MockClient struct {} +func NewClient()client.Clienter { + return &MockClient{} +} + +func (m MockClient) Send(bytes []byte, i ...int64) error { + fmt.Printf("Send Called : %v \n\r",string(bytes)) + return nil +} +func (m MockClient) Offline() { + panic("implement me") +} +func (m MockClient) ResetHeartBeatTime() { + panic("implement me") +} +func (m MockClient) LastHeartBeat() int64 { + panic("implement me") +} +func (m MockClient) Token() string { + panic("implement me") +} +func (m MockClient) Request() *http.Request { + panic("implement me") +} + + + + +func TestTg_SetTAG(t *testing.T) { + tests := []struct{ + tag string + number int + }{ + { + tag: "v1", + number: 100, + }, + { + tag: "v2", + number: 200, + }, + } + + for _,v := range tests{ + for i :=0 ;i< v.number;i++ { + // SetTAG(NewClient(),v.tag) ,todo 待优化 + } + } +} + // 广播,针对tag进行广播,这也是wti的核心接口,分类广播也是基于这个接口 func TestTg_BroadCast(t *testing.T) { tests := []struct { diff --git a/rpc.go b/rpc.go index 7d3b275..c6d733c 100644 --- a/rpc.go +++ b/rpc.go @@ -68,7 +68,9 @@ func (s *ImSrever) Broadcast(ctx context.Context, req *im.BroadcastReq) (*im.Bro // 比如 v1 => string ,v2 => []byte,那么v1,v2 就是不相同的两个版本内容。在client上面可以设置用户的连接版本Version,建议在 // 使用用户 -func (s *ImSrever) BroadcastByWTI(ctx context.Context, req *im.BroadcastByWTIReq) (*im.BroadcastReply, error) { + +// 进行广播 +func (s *ImSrever) WTIBroadcast(ctx context.Context, req *im.BroadcastByWTIReq) (*im.BroadcastReply, error) { var err error start := time.Now() err = wti.BroadCastByTarget(req.Data) @@ -79,3 +81,24 @@ func (s *ImSrever) BroadcastByWTI(ctx context.Context, req *im.BroadcastByWTIReq },err } + +// 获取每个版本多少人 +func (s *ImSrever) WTIDistribute(ctx context.Context, req *im.Empty) (*im.WTIDistributeReply,error){ + distribute,err := wti.Distribute() + if err != nil { + return nil,err + } + + var result = map[string]*im.WTIDistribute{} + for k,v := range distribute{ + data := &im.WTIDistribute{ + Tag: v.TagName, + Number: v.Onlines, + CreateTime: v.CreateTime, + } + result[k] =data + } + return &im.WTIDistributeReply{ + Data: result, + },nil +} \ No newline at end of file diff --git a/test/wti/conn_test.go b/test/wti/conn_test.go index 6d27586..932a31e 100644 --- a/test/wti/conn_test.go +++ b/test/wti/conn_test.go @@ -4,39 +4,64 @@ import ( "fmt" "github.com/gorilla/websocket" "testing" + "math/rand" "time" ) -// 模拟创建3个链接: 分别是订阅 -// conn1 : v1 -// conn2 : v2 -// conn3 : v1,v2 +var r = rand.New(rand.NewSource(time.Now().Unix())) -func Test_Conn(t *testing.T){ - go CreateClient("v1") - go CreateClient("v2") - go CreateClient("v3") - //CreateClient("v1&v2") - time.Sleep(1000 *time.Second) +func RandString(len int) string { + bytes := make([]byte, len) + for i := 0; i < len; i++ { + b := r.Intn(26) + 65 + bytes[i] = byte(b) + } + return string(bytes) } -func CreateClient (token string){ + + + +func Test_Conn(t *testing.T) { + tests := []struct{ + tag string + number int + }{ + { + tag: "v1", + number: 50, + }, + { + tag: "v2", + number: 60, + }, + } + + for _,v := range tests{ + for i :=0 ;i< v.number;i++ { + go CreateClient(v.tag) + } + } + time.Sleep(1000 * time.Second) +} + + +// http://www.baidu.com/conn?token=1080&version=v.10 +func CreateClient(version string) { + token := RandString(20) dialer := websocket.Dialer{} - conn, _, err := dialer.Dial(fmt.Sprintf(Address+"?token=%s&ver=v1",token), nil) + conn, _, err := dialer.Dial(fmt.Sprintf(Address+"?token=%s&version=%s", token,version), nil) if nil != err { fmt.Println(err) return } defer conn.Close() - - counter :=0 - - go func() { + counter := 0 + /*go func() { time.Sleep(50*time.Second) conn.WriteMessage(websocket.TextMessage,[]byte(fmt.Sprintf(" heartbeat %s",token))) - }() - + }()*/ for { messageType, messageData, err := conn.ReadMessage() if nil != err { @@ -44,14 +69,14 @@ func CreateClient (token string){ break } switch messageType { - case websocket.TextMessage://文本数据 + case websocket.TextMessage: //文本数据 counter++ - fmt.Println(token,string(messageData),counter) - case websocket.BinaryMessage://二进制数据 - case websocket.CloseMessage://关闭 - case websocket.PingMessage://Ping - case websocket.PongMessage://Pong + fmt.Println(token, string(messageData), counter) + case websocket.BinaryMessage: //二进制数据 + case websocket.CloseMessage: //关闭 + case websocket.PingMessage: //Ping + case websocket.PongMessage: //Pong default: } } -} \ No newline at end of file +} diff --git a/test/wti/doc.go b/test/wti/doc.go new file mode 100644 index 0000000..801f12a --- /dev/null +++ b/test/wti/doc.go @@ -0,0 +1,4 @@ +// 关于wti的测试 ,首先使用wti是需要建立一个wti的实例,然后将实例注册到wti下面,后面就能进行wti进行用户标签分类了 +// 当然用户也可以选择我们提供的WTI的实例,具体实现可以参考wti的doc进行观察 +package wti + diff --git a/test/wti/im_test.go b/test/wti/im_test.go index a002fd9..ae8d12b 100644 --- a/test/wti/im_test.go +++ b/test/wti/im_test.go @@ -8,6 +8,8 @@ import ( "testing" ) +// http://www.baidu.com/conn?token=1080&version=v.10 + // 创建IM测试服务器,模拟测试im服务器的具体内容。 func Test_IMServer(t *testing.T) { wti.SetSupport() @@ -36,7 +38,7 @@ func (f *Fake)ValidateFailed(err error,cli client.Clienter){ func (f *Fake)ValidateSuccess(cli client.Clienter){ // 可以通过header 或者 get query 方式来传参,或者从数据库获取当前用户的tag req := cli.Request() - res := req.Form["ver"] + res := req.Form["version"] tags := []string{res[0]} // 调用wti if err := wti.SetTAG(cli.(*client.Cli),tags...);err != nil { @@ -50,6 +52,7 @@ func NewFakeImServer() *Fake { options := []im.OptionFunc{ im.WithBroadCastBuffer(10), im.WithServerValidate(res), + im.WithPluginsWTI(true), } // 设置Option option := im.NewOption(options...) diff --git a/test/wti/rpc_test.go b/test/wti/rpc_test.go index f554bb3..c974c5b 100644 --- a/test/wti/rpc_test.go +++ b/test/wti/rpc_test.go @@ -6,6 +6,7 @@ import ( im "github.com/mongofs/api/im/v1" "google.golang.org/grpc" "testing" + "time" ) @@ -14,10 +15,18 @@ const ( DefaultRpcAddress = "127.0.0.1:8081" ) +var ( + cli = Client() + ctx = context.Background() +) + +func Client ()im.BasicClient{ + return im.NewBasicClient(conn) +} + +var conn,_ = grpc.Dial(DefaultRpcAddress,grpc.WithInsecure()) func TestSendMessage(t *testing.T) { - cli := Client() - ctx := context.Background() tests:= []*im.BroadcastByWTIReq{ { Data: map[string][]byte{ @@ -28,7 +37,7 @@ func TestSendMessage(t *testing.T) { }, } for _,v := range tests { - fmt.Println(cli.BroadcastByWTI(ctx,v,)) + fmt.Println(cli.WTIBroadcast(ctx,v,)) } // output // nil @@ -37,9 +46,14 @@ func TestSendMessage(t *testing.T) { // nil } -var conn,_ = grpc.Dial(DefaultRpcAddress,grpc.WithInsecure()) - - -func Client ()im.BasicClient{ - return im.NewBasicClient(conn) +func TestDistribute(t *testing.T) { + for { + distribute,err :=cli.WTIDistribute(ctx,&im.Empty{}) + if err !=nil { + t.Fatal(err) + } + fmt.Printf("当前用户分布: %+v\n\r ",distribute) + time.Sleep(5*time.Second) + } } +