Skip to content

Commit

Permalink
Merge pull request #1 from mongofs/feature/groupby
Browse files Browse the repository at this point in the history
Feature/groupby
  • Loading branch information
mongofs committed Feb 21, 2022
2 parents 1fffebe + d6de490 commit f65c413
Show file tree
Hide file tree
Showing 28 changed files with 982 additions and 332 deletions.
13 changes: 10 additions & 3 deletions benchmark/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,13 @@ import (

func CreateClient (token string){
dialer := websocket.Dialer{}
conn, _, err := dialer.Dial(fmt.Sprintf(Address+"?token=%s",token), nil)
conn, _, err := dialer.Dial(fmt.Sprintf(Address+"?token=%s&ver=%s",token,getVersion()), nil)
if nil != err {
log.Println(err)
return
}
defer conn.Close()

counter :=0

go func() {
time.Sleep(10*time.Second)
conn.WriteMessage(websocket.TextMessage,[]byte(fmt.Sprintf(" heartbeat %s",token)))
Expand Down Expand Up @@ -49,6 +47,15 @@ func CreateClient (token string){
}
}

// 获取版本
func getVersion()string{
if time.Now().Unix() %2 == 0 {
return "v1"
}else{
return "v2"
}
}


// 用户断线重连
func CreateClientAndTickerReConn (token string){
Expand Down
20 changes: 20 additions & 0 deletions benchmark/rpc_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func BenchmarkBroadCast(t *testing.B) {


// BenchmarkBroadCast_paraml-6 2751 391783 ns/op
// BenchmarkBroadCast_paraml-6 3105 494931 ns/op
func BenchmarkBroadCast_paraml(t *testing.B) {
cli := Client()
ctx := context.Background()
Expand All @@ -70,3 +71,22 @@ func BenchmarkBroadCast_paraml(t *testing.B) {
cli.Broadcast(ctx,data)
}
}


// 压测分类广播,大致情况如下
// BenchmarkBroadCastByWTI-6 2905 411497 ns/op
// BenchmarkBroadCastByWTI-6 3099 421722 ns/op
// BenchmarkBroadCastByWTI-6 2709 385287 ns/op
func BenchmarkBroadCastByWTI(t *testing.B){
cli := Client()
ctx := context.Background()
tests:= &im.BroadcastByWTIReq{Data: map[string][]byte{
"v1":[]byte("fucker you v2"),
"v2":[]byte("lover you v1"),
"v3":[]byte("what are you v1"),
}}

for i:= 0;i<t.N ;i++ {
cli.BroadcastByWTI(ctx,tests,)
}
}
6 changes: 6 additions & 0 deletions benchmark/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,9 @@ func TestTickerBroadCast(t *testing.T){
}



func TestBroadCastByWTI(t *testing.T){

}


2 changes: 1 addition & 1 deletion bucket/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ type bucket struct {
// log
log log.Logger


opts * Option
}

Expand Down Expand Up @@ -138,6 +137,7 @@ func (h *bucket) OffLine(token string) {
}
}

// 将用户注册到bucket中
func (h *bucket) Register(cli client.Clienter,token string) error {
if cli == nil {
return ErrCliISNil
Expand Down
4 changes: 4 additions & 0 deletions bucket/observer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package bucket



9 changes: 8 additions & 1 deletion bucket/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bucket

import (
"github.com/mongofs/im/client"
"github.com/mongofs/im/plugins/wti"
"go.uber.org/atomic"
"time"
)
Expand Down Expand Up @@ -88,15 +89,21 @@ func (b *bucket)keepAlive (){




// 删除用户
func (h *bucket)delUser(token string) {
h.rw.Lock()
delete(h.clis, token)
h.rw.Unlock()
h.np.Add(-1)
// todo 这里需要用个观察者模式
wti.Update(token)
if h.opts.callback != nil {
h.opts.callback()
}
}


// 通知到wti



6 changes: 3 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/mongofs/im/log"
"net/http"
"sync"
"time"

"github.com/golang/protobuf/proto"
"github.com/gorilla/websocket"
im "github.com/mongofs/api/im/v1"
"github.com/mongofs/im/log"
)


Expand All @@ -28,8 +28,8 @@ const (

type Cli struct {
lastHeartBeatT int64
conn *websocket.Conn
reader *http.Request
conn *websocket.Conn
reader *http.Request
token string
closeFunc sync.Once
done chan struct{}
Expand Down
6 changes: 6 additions & 0 deletions client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,22 @@ package client
import "net/http"

type Clienter interface {

// 调用此方法可以给当前用户发送消息
Send([]byte, ...int64) error

// 用户下线
Offline()

// 重置用户的心跳
ResetHeartBeatTime()

// 获取用户的最后一次心跳
LastHeartBeat() int64

// 获取用户的token
Token() string

// 获取到用户的请求的链接
Request()*http.Request
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/gin-gonic/gin v1.7.7 // indirect
github.com/golang/protobuf v1.5.2
github.com/gorilla/websocket v1.4.2
github.com/mongofs/api v0.0.0-20211224030416-43aee6abd907
github.com/mongofs/api v1.0.5
github.com/mongofs/log v0.0.0-20211119020135-e51080dd6d44
github.com/sirupsen/logrus v1.8.1
github.com/spf13/pflag v1.0.5 // indirect
Expand Down
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,18 @@ github.com/mongofs/api v0.0.0-20211213065031-e14e8c59ae69 h1:OopIMdLpGdo5Xss3P40
github.com/mongofs/api v0.0.0-20211213065031-e14e8c59ae69/go.mod h1:9QnZSLGrc6N9FNaGR0wrZ2+MdAsWPFhH9mnFcKxrsEw=
github.com/mongofs/api v0.0.0-20211224030416-43aee6abd907 h1:wEE3/u3rwpnuviTFz50KeB0hX/xXJpAr/sPTiMFIshY=
github.com/mongofs/api v0.0.0-20211224030416-43aee6abd907/go.mod h1:9QnZSLGrc6N9FNaGR0wrZ2+MdAsWPFhH9mnFcKxrsEw=
github.com/mongofs/api v1.0.0 h1:uMm8oWQPR0kHJu/NwewURZwwOpClHMG7ApHjAsWe2wQ=
github.com/mongofs/api v1.0.0/go.mod h1:9QnZSLGrc6N9FNaGR0wrZ2+MdAsWPFhH9mnFcKxrsEw=
github.com/mongofs/api v1.0.1 h1:r8vUeHTn7p3TqQQa8RDuL4lw+f0ZAnXuDGJ/hSwtIe0=
github.com/mongofs/api v1.0.1/go.mod h1:9QnZSLGrc6N9FNaGR0wrZ2+MdAsWPFhH9mnFcKxrsEw=
github.com/mongofs/api v1.0.2 h1:7+TP0+gxhrHuM0HY+DmAfkR/Z59O4R8TuaTrqo3sCh4=
github.com/mongofs/api v1.0.2/go.mod h1:9QnZSLGrc6N9FNaGR0wrZ2+MdAsWPFhH9mnFcKxrsEw=
github.com/mongofs/api v1.0.3 h1:fRJXW+duFOHCwwxwElS1mg8YYEzaE05I9f9Q0N7ENuI=
github.com/mongofs/api v1.0.3/go.mod h1:9QnZSLGrc6N9FNaGR0wrZ2+MdAsWPFhH9mnFcKxrsEw=
github.com/mongofs/api v1.0.4 h1:r64CS/NOUOLelgftEYW0UuXw7FQibonBAQGYIe711Vo=
github.com/mongofs/api v1.0.4/go.mod h1:9QnZSLGrc6N9FNaGR0wrZ2+MdAsWPFhH9mnFcKxrsEw=
github.com/mongofs/api v1.0.5 h1:iqO+At5mxYjfuBR+97l4n5pxcefy5CwYtfTIJqFXOeI=
github.com/mongofs/api v1.0.5/go.mod h1:9QnZSLGrc6N9FNaGR0wrZ2+MdAsWPFhH9mnFcKxrsEw=
github.com/mongofs/log v0.0.0-20211119020135-e51080dd6d44 h1:+LdeRyjL/EoKpNAIipVCABxXoiYm/urYpcKMq0auNOg=
github.com/mongofs/log v0.0.0-20211119020135-e51080dd6d44/go.mod h1:Wv4aMo8tAI+jNC9+SpfA+CH5CmLl9yEXEXY/O/P+Pyg=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
Expand Down
18 changes: 18 additions & 0 deletions im.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package im

import (
"context"
im "github.com/mongofs/api/im/v1"
grpc2 "github.com/mongofs/api/im/v1"
"github.com/mongofs/im/bucket"
"go.uber.org/atomic"
Expand All @@ -10,6 +11,19 @@ import (
)


type ImSrever struct {
http *http.ServeMux
rpc *grpc.Server
bs []bucket.Bucketer
ps atomic.Int64

buffer chan *im.BroadcastReq // 全局广播队列
cancel func()

opt *Option
}


func New(opts *Option) *ImSrever {
b := &ImSrever{
ps: atomic.Int64{},
Expand Down Expand Up @@ -54,3 +68,7 @@ func (b *ImSrever) prepareHttpServer() {
}


func (s *ImSrever) bucket(token string) bucket.Bucketer {
idx := Index(token,uint32(s.opt.ServerBucketNumber))
return s.bs[idx]
}
37 changes: 32 additions & 5 deletions option.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,54 @@ package im

import (
"github.com/mongofs/im/client"
"github.com/mongofs/im/validate"
"github.com/mongofs/im/log"
"github.com/mongofs/im/plugins/wti"
"github.com/mongofs/im/validate"
"github.com/mongofs/im/validate/example"
)

const (
// 对客户端进行默认参数设置
DefaultClientHeartBeatInterval = 120
DefaultClientReaderBufferSize = 1024
DefaultClientWriteBufferSize = 1024
DefaultClientBufferSize = 8
DefaultClientMessageType = 1
DefaultClientProtocol = 1
DefaultBucketSize = 1 << 8 // 256

// 对分片进行基础设置
DefaultBucketSize = 1 << 8 // 256

// 默认基础的server配置
DefaultServerBucketNumber = 1 << 6 // 64
DefaultServerRpcPort = ":8081"
DefaultServerHttpPort = ":8080"
DefaultServerBuffer = 200

// 设置对广播能力的参数支持
DefaultBroadCastHandler = 10
DefaultBroadCastBuffer = 200

// plugins 的参数支持
PluginWTISupport = false // 是否支持WTI 进行扩展
)

var DefaultValidate validate.Validater = &example.DefaultValidate{}
var DefaultReceive client.Receiver = &client.Example{}
var DefaultLogger log.Logger = &log.DefaultLog{}

type Option struct {
// client
ClientHeartBeatInterval int // 用户心跳间隔
ClientReaderBufferSize int // 用户连接读取buffer
ClientWriteBufferSize int // 用户连接写入buffer
ClientBufferSize int // 用户应用层buffer
ClientMessageType int // 用户发送的数据类型
ClientProtocol int // 压缩协议

// bucket
BucketSize int // bucket用户

// server
ServerBucketNumber int // 所有
ServerRpcPort string
ServerHttpPort string
Expand All @@ -48,6 +60,9 @@ type Option struct {
//broadcast
BroadCastBuffer int
BroadCastHandler int

//plugins
SupportPluginWTI bool // 是否支持wti插件
}

func DefaultOption() *Option {
Expand All @@ -69,6 +84,9 @@ func DefaultOption() *Option {

BroadCastBuffer: DefaultBroadCastBuffer,
BroadCastHandler: DefaultBroadCastHandler,

// 插件支持
SupportPluginWTI: PluginWTISupport,
}
}

Expand Down Expand Up @@ -100,13 +118,12 @@ func WithServerValidate(ServerValidate validate.Validater) OptionFunc {
}
}

func WithServerLogger(ServerLogger log.Logger ) OptionFunc {
func WithServerLogger(ServerLogger log.Logger) OptionFunc {
return func(b *Option) {
b.ServerLogger = ServerLogger
}
}


func WithServerBucketNumber(ServerBucketNumber int) OptionFunc {
return func(b *Option) {
b.ServerBucketNumber = ServerBucketNumber
Expand Down Expand Up @@ -172,3 +189,13 @@ func WithBroadCastHandler(BroadCastHandler int) OptionFunc {
b.BroadCastHandler = BroadCastHandler
}
}

//设置plugin内容
func WithPluginsWTI(SupportPluginWTI bool) OptionFunc {
return func(b *Option) {
if SupportPluginWTI {
wti.SetSupport()
}
b.SupportPluginWTI = SupportPluginWTI
}
}
Loading

0 comments on commit f65c413

Please sign in to comment.