Skip to content

Commit

Permalink
controller: optimize code structure && add more logs
Browse files Browse the repository at this point in the history
  • Loading branch information
吴迎松 committed Jun 8, 2018
1 parent c5de53b commit b68c01e
Show file tree
Hide file tree
Showing 15 changed files with 513 additions and 326 deletions.
48 changes: 23 additions & 25 deletions common/types/alarm.go
Original file line number Diff line number Diff line change
@@ -1,47 +1,45 @@
package types

import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"sort"
"strings"
"time"
)

type AlarmMessageType byte
"github.com/wuyingsong/tcp"
)

//报警服务消息类型定义
const (
ALAR_MESS_INSPECTOR_HEARTBEAT AlarmMessageType = iota + 1
ALAR_MESS_INSPECTOR_HEARTBEAT tcp.PacketType = iota + 1
ALAR_MESS_INSPECTOR_TASK_REQUEST
ALAR_MESS_INSPECTOR_TASKS
ALAR_MESS_INSPECTOR_RESULTS
ALAR_MESS_INSPECTOR_RESULT
)

//报警服务消息类型可读映射
var AlarmMessageTypeText map[AlarmMessageType]string = map[AlarmMessageType]string{
var AlarmMessageTypeText map[tcp.PacketType]string = map[tcp.PacketType]string{
ALAR_MESS_INSPECTOR_HEARTBEAT: "inspector heartbeat",
ALAR_MESS_INSPECTOR_TASK_REQUEST: "inspector task request",
ALAR_MESS_INSPECTOR_TASKS: "inspector tasks",
ALAR_MESS_INSPECTOR_RESULTS: "inspector results",
ALAR_MESS_INSPECTOR_RESULT: "inspector result",
}

//报警服务消息接口
type AlarmMessage interface {
Encode() []byte
}

func AlarmPack(t AlarmMessageType, m AlarmMessage) []byte {
var buf bytes.Buffer
binary.Write(&buf, binary.BigEndian, t)
if m != nil {
binary.Write(&buf, binary.BigEndian, m.Encode())
}
binary.Write(&buf, binary.BigEndian, make([]byte, 0))
return buf.Bytes()
}
// func AlarmPack(t MessageType, m AlarmMessage) []byte {
// var buf bytes.Buffer
// binary.Write(&buf, binary.BigEndian, t)
// if m != nil {
// binary.Write(&buf, binary.BigEndian, m.Encode())
// }
// binary.Write(&buf, binary.BigEndian, make([]byte, 0))
// return buf.Bytes()
// }

type AlarmTask struct {
ID string
Expand Down Expand Up @@ -122,23 +120,23 @@ type AlarmResults struct {
}

type StrategyResult struct {
TaskID string
Priority int
TriggerResultSets map[string]*TriggerResultSet
ErrorMessage string
Triggered bool
CreateTime time.Time
TaskID string `json:"taskid"`
Priority int `json:"priority"`
TriggerResultSets map[string]*TriggerResultSet `json:"trigger_resultset"`
ErrorMessage string `json:"error_message"`
Triggered bool `json:"triggered"`
CreateTime time.Time `json:"create_time"`
}

func (this *AlarmResults) Encode() []byte {
func (this *StrategyResult) Encode() []byte {
data, err := json.Marshal(this)
if err != nil {
fmt.Println(err.Error())
}
return data
}

func (this *AlarmResults) Decode(data []byte) error {
func (this *StrategyResult) Decode(data []byte) error {
return json.Unmarshal(data, &this)
}

Expand Down
5 changes: 4 additions & 1 deletion common/types/timeseries.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type TimeSeriesData struct {
DataType string `json:"data_type"` //COUNTER,GAUGE,DERIVE
Value float64 `json:"value"` //99.00
Timestamp int64 `json:"timestamp"` //unix timestamp
Cycle int `json:"cycle"`
Cycle int `json:"cycle,omitempty"`
Tags map[string]string `json:"tags"` //{"product":"app01", "group":"dev02"}
}

Expand Down Expand Up @@ -99,6 +99,9 @@ func ParseTags(name string) map[string]string {
kv := strings.Split(name, ",")
for _, v := range kv {
tmp := strings.Split(v, "=")
if len(tmp) != 2 {
continue
}
res[tmp[0]] = tmp[1]
}
return res
Expand Down
1 change: 1 addition & 0 deletions conf/controller.conf
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ max_packet_size=409600
load_strategies_interval=300
task_pool_size=409600
result_pool_size=409600
event_pool_size = 1024
task_size=100
worker_count=20

Expand Down
76 changes: 76 additions & 0 deletions controller/callback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package main

import (
"owl/common/types"

"github.com/wuyingsong/tcp"
)

type callback struct {
}

func (cb *callback) OnConnected(conn *tcp.TCPConn) {
lg.Info("callback:%s connected", conn.GetRemoteAddr().String())
}

//链接断开回调
func (cb *callback) OnDisconnected(conn *tcp.TCPConn) {
lg.Info("callback:%s disconnect ", conn.GetRemoteAddr().String())
}

//错误回调
func (cb *callback) OnError(err error) {
lg.Error("callback: %s", err)
}

func (cb *callback) OnMessage(conn *tcp.TCPConn, p tcp.Packet) {
defer func() {
if r := recover(); r != nil {
lg.Error("Recovered in OnMessage", r)
}
}()
pkt := p.(*tcp.DefaultPacket)
switch pkt.Type {
case types.ALAR_MESS_INSPECTOR_HEARTBEAT:
lg.Debug("receive %v %v", types.AlarmMessageTypeText[pkt.Type], string(pkt.Body))
heartbeat := &types.HeartBeat{}
if err := heartbeat.Decode(pkt.Body); err != nil {
lg.Error(err.Error())
return
}
controller.receiveHearbeat(heartbeat)
//TODO: optimized task allocate algorithm
case types.ALAR_MESS_INSPECTOR_TASK_REQUEST:
lg.Debug("receive get task request inspector %s", conn.GetRemoteAddr())
tasks := &types.AlarmTasks{
Tasks: controller.taskPool.getTasks(GlobalConfig.TASK_SIZE),
}
if len(tasks.Tasks) == 0 {
lg.Debug("task pool has no tasks can be sent to inspector %s", conn.GetRemoteAddr())
return
}
lg.Info("sent %d task to inspector %s", len(tasks.Tasks), conn.GetRemoteAddr())
conn.AsyncWritePacket(
tcp.NewDefaultPacket(
types.ALAR_MESS_INSPECTOR_TASKS,
tasks.Encode(),
),
)
// sess.Send(types.AlarmPack(types.ALAR_MESS_INSPECTOR_TASKS, tasks))
case types.ALAR_MESS_INSPECTOR_RESULT:
lg.Info("receive %v %v", types.AlarmMessageTypeText[pkt.Type], string(pkt.Body))
var (
result types.StrategyResult
err error
)
if err = result.Decode(pkt.Body); err != nil {
lg.Error(err.Error())
return
}
if err = controller.resultPool.putResult(&result); err != nil {
lg.Error("put task result to pool failed error:%s result:%s", err, result.Encode())
}
default:
lg.Error("unknown option: %v", pkt.Type)
}
}
4 changes: 4 additions & 0 deletions controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
DEFAULT_LOAD_STRATEGIES_INTERVAL = 300 //seconds
DEFAULT_TASK_POOL_SIZE = 409600
DEFAULT_RESULT_POOL_SIZE = 409600
DEFAULT_EVENT_POOL_SIZE = 1024
DEFAULT_HTTP_SERVER = ":10051"
DEFAULT_TASK_SIZE = 100
DEFAULT_WORKER_COUNT = 20
Expand Down Expand Up @@ -53,6 +54,8 @@ type Config struct {

RESULT_POOL_SIZE int //结果池的缓冲大小

EVENT_POOL_SIZE int

HTTP_SERVER string //Http服务的地址

TASK_SIZE int //单次获取任务数
Expand Down Expand Up @@ -87,6 +90,7 @@ func InitGlobalConfig() error {
LOAD_STRATEGIES_INTERVAL: cfg.MustInt(goconfig.DEFAULT_SECTION, "load_strategies_interval", DEFAULT_LOAD_STRATEGIES_INTERVAL),
TASK_POOL_SIZE: cfg.MustInt(goconfig.DEFAULT_SECTION, "task_pool_size", DEFAULT_TASK_POOL_SIZE),
RESULT_POOL_SIZE: cfg.MustInt(goconfig.DEFAULT_SECTION, "result_pool_size", DEFAULT_RESULT_POOL_SIZE),
EVENT_POOL_SIZE: cfg.MustInt(goconfig.DEFAULT_SECTION, "event_pool_size", DEFAULT_EVENT_POOL_SIZE),
HTTP_SERVER: cfg.MustValue(goconfig.DEFAULT_SECTION, "http_server", DEFAULT_HTTP_SERVER),
TASK_SIZE: cfg.MustInt(goconfig.DEFAULT_SECTION, "task_size", DEFAULT_TASK_SIZE),
WORKER_COUNT: cfg.MustInt(goconfig.DEFAULT_SECTION, "worker_count", DEFAULT_WORKER_COUNT),
Expand Down
38 changes: 18 additions & 20 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,39 @@ import (
"sync"
"time"

"owl/common/tcp"
"owl/common/types"
"owl/controller/cache"

"github.com/wuyingsong/tcp"
)

var controller *Controller

type Controller struct {
*tcp.Server
*tcp.AsyncTCPServer
taskPool *TaskPool
resultPool *ResultPool
nodePool *NodePool
eventQueues map[int]*Queue
eventQueues map[int]*EventPool
eventQueuesMutex *sync.RWMutex
statusCache *cache.Cache
taskCache *cache.Cache
}

func InitController() error {
controllerServer := tcp.NewServer(GlobalConfig.TCP_BIND, &ControllerHandle{})
controllerServer.SetMaxPacketSize(uint32(GlobalConfig.MAX_PACKET_SIZE))
if err := controllerServer.ListenAndServe(); err != nil {
protocol := &tcp.DefaultProtocol{}
protocol.SetMaxPacketSize(uint32(GlobalConfig.MAX_PACKET_SIZE))
server := tcp.NewAsyncTCPServer(GlobalConfig.TCP_BIND, &callback{}, protocol)
if err := server.ListenAndServe(); err != nil {
return err
}
lg.Info("Start listen: %v", GlobalConfig.TCP_BIND)

controller = &Controller{controllerServer,
controller = &Controller{server,
NewTaskPool(GlobalConfig.TASK_POOL_SIZE),
NewResultPool(GlobalConfig.RESULT_POOL_SIZE),
NewNodePool(),
generateQueues(),
make(map[int]*EventPool),
new(sync.RWMutex),
cache.New(time.Duration(GlobalConfig.LOAD_STRATEGIES_INTERVAL)*time.Second, 30*time.Second),
cache.New(10*time.Minute, 10*time.Minute)}
Expand All @@ -48,21 +50,12 @@ func InitController() error {
return nil
}

// generateQueues 生成产品线报警队列
func generateQueues() map[int]*Queue {
pq := make(map[int]*Queue)
for _, product := range mydb.GetProducts() {
pq[product.ID] = NewQueue(0)
}
return pq
}

//checkNodesForever 持续运行检查节点函数,并维护节点数组
func (c *Controller) checkNodesForever() {
for {
time_now := time.Now()
now := time.Now()
for ip, node := range c.nodePool.Nodes {
if time_now.Sub(node.Update) > time.Duration(time.Second*10) {
if now.Sub(node.Update) > time.Duration(time.Second*10) {
delete(c.nodePool.Nodes, ip)
lg.Warn("Inspector %v, %v lost from controller", ip, node.Hostname)
}
Expand Down Expand Up @@ -96,12 +89,17 @@ func (c *Controller) refreshQueue(products []*types.Product) {
now := time.Now()
for _, product := range products {
if _, ok := c.eventQueues[product.ID]; !ok {
c.eventQueues[product.ID] = NewQueue(0)
lg.Info("create event queue %s", product.Name)
eventPool := NewEventPool(product.Name, GlobalConfig.EVENT_POOL_SIZE)
c.eventQueues[product.ID] = eventPool
go processSingleQueue(eventPool)
}
lg.Info("refresh event queue %s", product.Name)
c.eventQueues[product.ID].update_time = now
}
for p, q := range c.eventQueues {
if now.Sub(q.update_time).Minutes() > 10 {
lg.Warn("product id %s event queue update time is expires, delete", p)
delete(c.eventQueues, p)
}
}
Expand Down
55 changes: 0 additions & 55 deletions controller/handle.go

This file was deleted.

4 changes: 2 additions & 2 deletions controller/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func queueStatus(c *gin.Context) {
defer controller.eventQueuesMutex.RUnlock()
for _, product := range mydb.GetProducts() {
if queue, ok := controller.eventQueues[product.ID]; ok {
qs = append(qs, gin.H{"id": product.ID, "name": product.Name, "count": queue.Size(), "mute": queue.mute})
qs = append(qs, gin.H{"id": product.ID, "name": product.Name, "count": queue.len(), "mute": queue.mute})
}
}
c.JSON(http.StatusOK, gin.H{"queues": qs})
Expand All @@ -59,7 +59,7 @@ func queueClean(c *gin.Context) {
id := c.Param("id")
i, _ := strconv.Atoi(id)
if queue, ok := controller.eventQueues[i]; ok {
queue.Clear()
queue.clean()
c.JSON(http.StatusOK, gin.H{"message": "cleaned!"})
return
}
Expand Down
Loading

0 comments on commit b68c01e

Please sign in to comment.