Skip to content

Commit

Permalink
feat(wti)
Browse files Browse the repository at this point in the history
  • Loading branch information
steven committed Feb 19, 2022
1 parent 88e8e69 commit d6de490
Show file tree
Hide file tree
Showing 12 changed files with 250 additions and 66 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/gin-gonic/gin v1.7.7 // indirect
github.com/golang/protobuf v1.5.2
github.com/gorilla/websocket v1.4.2
github.com/mongofs/api v1.0.1
github.com/mongofs/api v1.0.5
github.com/mongofs/log v0.0.0-20211119020135-e51080dd6d44
github.com/sirupsen/logrus v1.8.1
github.com/spf13/pflag v1.0.5 // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ github.com/mongofs/api v1.0.0 h1:uMm8oWQPR0kHJu/NwewURZwwOpClHMG7ApHjAsWe2wQ=
github.com/mongofs/api v1.0.0/go.mod h1:9QnZSLGrc6N9FNaGR0wrZ2+MdAsWPFhH9mnFcKxrsEw=
github.com/mongofs/api v1.0.1 h1:r8vUeHTn7p3TqQQa8RDuL4lw+f0ZAnXuDGJ/hSwtIe0=
github.com/mongofs/api v1.0.1/go.mod h1:9QnZSLGrc6N9FNaGR0wrZ2+MdAsWPFhH9mnFcKxrsEw=
github.com/mongofs/api v1.0.2 h1:7+TP0+gxhrHuM0HY+DmAfkR/Z59O4R8TuaTrqo3sCh4=
github.com/mongofs/api v1.0.2/go.mod h1:9QnZSLGrc6N9FNaGR0wrZ2+MdAsWPFhH9mnFcKxrsEw=
github.com/mongofs/api v1.0.3 h1:fRJXW+duFOHCwwxwElS1mg8YYEzaE05I9f9Q0N7ENuI=
github.com/mongofs/api v1.0.3/go.mod h1:9QnZSLGrc6N9FNaGR0wrZ2+MdAsWPFhH9mnFcKxrsEw=
github.com/mongofs/api v1.0.4 h1:r64CS/NOUOLelgftEYW0UuXw7FQibonBAQGYIe711Vo=
github.com/mongofs/api v1.0.4/go.mod h1:9QnZSLGrc6N9FNaGR0wrZ2+MdAsWPFhH9mnFcKxrsEw=
github.com/mongofs/api v1.0.5 h1:iqO+At5mxYjfuBR+97l4n5pxcefy5CwYtfTIJqFXOeI=
github.com/mongofs/api v1.0.5/go.mod h1:9QnZSLGrc6N9FNaGR0wrZ2+MdAsWPFhH9mnFcKxrsEw=
github.com/mongofs/log v0.0.0-20211119020135-e51080dd6d44 h1:+LdeRyjL/EoKpNAIipVCABxXoiYm/urYpcKMq0auNOg=
github.com/mongofs/log v0.0.0-20211119020135-e51080dd6d44/go.mod h1:Wv4aMo8tAI+jNC9+SpfA+CH5CmLl9yEXEXY/O/P+Pyg=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
Expand Down
36 changes: 29 additions & 7 deletions option.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,54 @@ package im

import (
"github.com/mongofs/im/client"
"github.com/mongofs/im/validate"
"github.com/mongofs/im/log"
"github.com/mongofs/im/plugins/wti"
"github.com/mongofs/im/validate"
"github.com/mongofs/im/validate/example"
)

const (
// 对客户端进行默认参数设置
DefaultClientHeartBeatInterval = 120
DefaultClientReaderBufferSize = 1024
DefaultClientWriteBufferSize = 1024
DefaultClientBufferSize = 8
DefaultClientMessageType = 1
DefaultClientProtocol = 1
DefaultBucketSize = 1 << 8 // 256

// 对分片进行基础设置
DefaultBucketSize = 1 << 8 // 256

// 默认基础的server配置
DefaultServerBucketNumber = 1 << 6 // 64
DefaultServerRpcPort = ":8081"
DefaultServerHttpPort = ":8080"
DefaultServerBuffer = 200

// 设置对广播能力的参数支持
DefaultBroadCastHandler = 10
DefaultBroadCastBuffer = 200


PluginWTISupport = false // 是否支持plugin 的wti ,默认为关
// plugins 的参数支持
PluginWTISupport = false // 是否支持WTI 进行扩展
)

var DefaultValidate validate.Validater = &example.DefaultValidate{}
var DefaultReceive client.Receiver = &client.Example{}
var DefaultLogger log.Logger = &log.DefaultLog{}

type Option struct {
// client
ClientHeartBeatInterval int // 用户心跳间隔
ClientReaderBufferSize int // 用户连接读取buffer
ClientWriteBufferSize int // 用户连接写入buffer
ClientBufferSize int // 用户应用层buffer
ClientMessageType int // 用户发送的数据类型
ClientProtocol int // 压缩协议

// bucket
BucketSize int // bucket用户

// server
ServerBucketNumber int // 所有
ServerRpcPort string
ServerHttpPort string
Expand All @@ -52,6 +61,7 @@ type Option struct {
BroadCastBuffer int
BroadCastHandler int

//plugins
SupportPluginWTI bool // 是否支持wti插件
}

Expand All @@ -74,6 +84,9 @@ func DefaultOption() *Option {

BroadCastBuffer: DefaultBroadCastBuffer,
BroadCastHandler: DefaultBroadCastHandler,

// 插件支持
SupportPluginWTI: PluginWTISupport,
}
}

Expand Down Expand Up @@ -105,13 +118,12 @@ func WithServerValidate(ServerValidate validate.Validater) OptionFunc {
}
}

func WithServerLogger(ServerLogger log.Logger ) OptionFunc {
func WithServerLogger(ServerLogger log.Logger) OptionFunc {
return func(b *Option) {
b.ServerLogger = ServerLogger
}
}


func WithServerBucketNumber(ServerBucketNumber int) OptionFunc {
return func(b *Option) {
b.ServerBucketNumber = ServerBucketNumber
Expand Down Expand Up @@ -177,3 +189,13 @@ func WithBroadCastHandler(BroadCastHandler int) OptionFunc {
b.BroadCastHandler = BroadCastHandler
}
}

//设置plugin内容
func WithPluginsWTI(SupportPluginWTI bool) OptionFunc {
return func(b *Option) {
if SupportPluginWTI {
wti.SetSupport()
}
b.SupportPluginWTI = SupportPluginWTI
}
}
3 changes: 1 addition & 2 deletions parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/mongofs/im/plugins/wti"
)


// 并行的启动,使用goroutine 来进行管理
func ParallelRun (parallels ... func()error)error{
wg := errgroup.Group{}
for _,v:= range parallels {
Expand Down Expand Up @@ -43,7 +43,6 @@ func (s *ImSrever) monitorWTI() error {
}
}
return nil

}

// 监控rpc 服务
Expand Down
28 changes: 20 additions & 8 deletions plugins/wti/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,26 @@ type WTI interface {
// 获取到TAG 的创建时间,系统会判断这个tag创建时间和当前人数来确认是否需要删除这个tag
GetTAGCreateTime(tag string) int64

// 获取到TAG 的在线人数,系统会判断这个tag如果没有在线人数为0 且创建时间大于MAX-wti-create-time ,这个tag就会被回收
GetTAGClients(tag string) int64
// 获取到所有tag的用户分布
Distribute(tags ...string)map[string]*DistributeParam

// 调用方法的回收房间的策略
FlushWTI()
}

type DistributeParam struct {
TagName string
Onlines int64
CreateTime int64
}



// 其他地方将调用这个变量,如果自己公司实现tag需要注入在程序中进行注入
var factory WTI = newwti()
var isSupportWTI = atomic.NewBool(false)
var (
factory WTI = newwti()
isSupportWTI = atomic.NewBool(false)
)

func Inject(wti WTI) {
factory = wti
Expand All @@ -47,7 +57,7 @@ func SetSupport (){
}

var (
ERRNotSupportWTI = errors.New("im/plugins/wti: not set the wti support params")
ERRNotSupportWTI = errors.New("im/plugins/wti: you should call the SetSupport func")
)

func SetTAG(cli *client.Cli, tag ...string) error {
Expand Down Expand Up @@ -98,14 +108,16 @@ func GetTAGCreateTime(tag string) (int64, error) {
return res, nil
}

func GetTAGClients(tag string) (int64, error) {

func Distribute() (map[string]*DistributeParam, error) {
if isSupportWTI.Load() == false {
return 0, ERRNotSupportWTI
return nil, ERRNotSupportWTI
}
res := factory.GetTAGClients(tag)
res := factory.Distribute()
return res, nil
}


func FlushWTI() error {
if isSupportWTI.Load() == false {
return ERRNotSupportWTI
Expand Down
48 changes: 35 additions & 13 deletions plugins/wti/wti.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// 构建tag 标签的这个功能,主要目标希望将用户分类起来,1000000 * 8 /1024 / 1024 = 7M的内存,额外使用tag包进行封装的话,
// 预计使用一百万用户的内存需要大概7~8M,这个范围是可以接受的。
package wti

import (
Expand All @@ -23,7 +21,7 @@ func newwti() WTI {


// 给用户设置标签
func (t *tg) SetTAG(cli *client.Cli, tags ...string) {
func (t *tg) SetTAG(cli *client.Cli, tags ...string) {
if len(tags)== 0 {
return
}
Expand Down Expand Up @@ -99,15 +97,6 @@ func (t *tg) GetTAGCreateTime(tag string) int64{
return 0
}

// 获取到tag的总人数
func (t *tg) GetTAGClients(tag string) int64 {
t.rw.RLock()
defer t.rw.RUnlock()
if v,ok:=t.mp[tag];ok{
return v.Counter()
}
return 0
}

// 删除tag ,这个调用一个大锁将全局锁住清空过去的内容
func (t *tg) FlushWTI() {
Expand All @@ -118,4 +107,37 @@ func (t *tg) FlushWTI() {
delete(t.mp,k)
}
}
}
}


// 获取到tagOnlines 在线用户人数
func (t *tg) Distribute(tags... string) map[string]*DistributeParam {
var res = map[string]*DistributeParam{}
if len(tags) == 0 {
t.rw.RLock()
for k,v:= range t.mp {
tem := &DistributeParam{
TagName: k,
Onlines: v.Counter(),
CreateTime: v.createTime,
}
res[k]=tem
}
t.rw.RUnlock()
return res
}
t.rw.RLock()
for _,tag := range tags {
// get the tag
if v,ok := t.mp[tag];ok {
tem := &DistributeParam{
TagName: tag,
Onlines: v.Counter(),
CreateTime: v.createTime,
}
res[tag]= tem
}
}
t.rw.RUnlock()
return res
}
52 changes: 52 additions & 0 deletions plugins/wti/wti_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,61 @@ package wti

import (
"fmt"
"github.com/mongofs/im/client"
"net/http"
"testing"
)

type MockClient struct {}
func NewClient()client.Clienter {
return &MockClient{}
}

func (m MockClient) Send(bytes []byte, i ...int64) error {
fmt.Printf("Send Called : %v \n\r",string(bytes))
return nil
}
func (m MockClient) Offline() {
panic("implement me")
}
func (m MockClient) ResetHeartBeatTime() {
panic("implement me")
}
func (m MockClient) LastHeartBeat() int64 {
panic("implement me")
}
func (m MockClient) Token() string {
panic("implement me")
}
func (m MockClient) Request() *http.Request {
panic("implement me")
}




func TestTg_SetTAG(t *testing.T) {
tests := []struct{
tag string
number int
}{
{
tag: "v1",
number: 100,
},
{
tag: "v2",
number: 200,
},
}

for _,v := range tests{
for i :=0 ;i< v.number;i++ {
// SetTAG(NewClient(),v.tag) ,todo 待优化
}
}
}

// 广播,针对tag进行广播,这也是wti的核心接口,分类广播也是基于这个接口
func TestTg_BroadCast(t *testing.T) {
tests := []struct {
Expand Down
25 changes: 24 additions & 1 deletion rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ func (s *ImSrever) Broadcast(ctx context.Context, req *im.BroadcastReq) (*im.Bro
// 比如 v1 => string ,v2 => []byte,那么v1,v2 就是不相同的两个版本内容。在client上面可以设置用户的连接版本Version,建议在
// 使用用户

func (s *ImSrever) BroadcastByWTI(ctx context.Context, req *im.BroadcastByWTIReq) (*im.BroadcastReply, error) {

// 进行广播
func (s *ImSrever) WTIBroadcast(ctx context.Context, req *im.BroadcastByWTIReq) (*im.BroadcastReply, error) {
var err error
start := time.Now()
err = wti.BroadCastByTarget(req.Data)
Expand All @@ -79,3 +81,24 @@ func (s *ImSrever) BroadcastByWTI(ctx context.Context, req *im.BroadcastByWTIReq
},err
}


// 获取每个版本多少人
func (s *ImSrever) WTIDistribute(ctx context.Context, req *im.Empty) (*im.WTIDistributeReply,error){
distribute,err := wti.Distribute()
if err != nil {
return nil,err
}

var result = map[string]*im.WTIDistribute{}
for k,v := range distribute{
data := &im.WTIDistribute{
Tag: v.TagName,
Number: v.Onlines,
CreateTime: v.CreateTime,
}
result[k] =data
}
return &im.WTIDistributeReply{
Data: result,
},nil
}
Loading

0 comments on commit d6de490

Please sign in to comment.