-
Notifications
You must be signed in to change notification settings - Fork 2
/
parallel.go
90 lines (80 loc) · 1.82 KB
/
parallel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package im
import (
"golang.org/x/sync/errgroup"
"net"
"net/http"
"time"
"github.com/mongofs/im/plugins/wti"
)
// 并行的启动,使用goroutine 来进行管理
func ParallelRun (parallels ... func()error)error{
wg := errgroup.Group{}
for _,v:= range parallels {
wg.Go(v)
}
return wg.Wait()
}
// 统计用户在线人数
// 监控buffer 长度 并进行报警
func (s *ImSrever) monitorOnline() error {
for {
n := int64(0)
for _, bck := range s.bs {
bck.Flush()
n += bck.Onlines()
}
s.ps.Store(n)
time.Sleep(10 * time.Second)
}
return nil
}
// 统计用户在线人数
// 监控buffer 长度 并进行报警
func (s *ImSrever) monitorWTI() error {
if s.opt.SupportPluginWTI {
for {
wti.FlushWTI()
time.Sleep(20*time.Second)
}
}
return nil
}
// 监控rpc 服务
func (s *ImSrever)runGrpcServer ()error{
listen, err := net.Listen("tcp", s.opt.ServerRpcPort)
if err !=nil { s.opt.ServerLogger.Fatal(err) }
s.opt.ServerLogger.Infof("im/run : start GRPC server at %s ", s.opt.ServerRpcPort)
if err := s.rpc.Serve(listen);err !=nil {
s.opt.ServerLogger.Fatal(err)
}
return nil
}
// 监控http服务
func (s *ImSrever)runhttpServer ()error{
listen, err := net.Listen("tcp", s.opt.ServerHttpPort)
if err !=nil { s.opt.ServerLogger.Fatal(err) }
s.opt.ServerLogger.Infof("im/run : start HTTP server at %s ", s.opt.ServerHttpPort)
if err := http.Serve(listen,s.http);err !=nil {
s.opt.ServerLogger.Fatal(err)
}
return nil
}
// 单独处理广播业务
func (s *ImSrever) PushBroadCast() error {
wg := errgroup.Group{}
for i := 0; i < s.opt.BroadCastHandler; i++ {
wg.Go(func() error {
for {
req := <-s.buffer
for _, v := range s.bs {
err := v.BroadCast(req.Data, false)
if err != nil {
s.opt.ServerLogger.Error(err)
}
}
}
return nil
})
}
return wg.Wait()
}