From b68c01ee0a63ec88148136447e312ad577fcced4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E8=BF=8E=E6=9D=BE?= Date: Fri, 8 Jun 2018 16:49:21 +0800 Subject: [PATCH] controller: optimize code structure && add more logs --- common/types/alarm.go | 48 +++++------ common/types/timeseries.go | 5 +- conf/controller.conf | 1 + controller/callback.go | 76 ++++++++++++++++ controller/config.go | 4 + controller/controller.go | 38 ++++---- controller/handle.go | 55 ------------ controller/health.go | 4 +- controller/notify.go | 33 +++---- controller/pool.go | 172 ++++++++++++++++++++++++++----------- controller/result.go | 144 +++++++++++++++++++------------ controller/strategy.go | 47 +++++----- inspector/callback.go | 49 +++++++++++ inspector/handle.go | 41 --------- inspector/inspector.go | 122 +++++++++++++++++--------- 15 files changed, 513 insertions(+), 326 deletions(-) create mode 100644 controller/callback.go delete mode 100644 controller/handle.go create mode 100644 inspector/callback.go delete mode 100644 inspector/handle.go diff --git a/common/types/alarm.go b/common/types/alarm.go index 7898189..93fa281 100644 --- a/common/types/alarm.go +++ b/common/types/alarm.go @@ -1,31 +1,29 @@ package types import ( - "bytes" - "encoding/binary" "encoding/json" "fmt" "sort" "strings" "time" -) -type AlarmMessageType byte + "github.com/wuyingsong/tcp" +) //报警服务消息类型定义 const ( - ALAR_MESS_INSPECTOR_HEARTBEAT AlarmMessageType = iota + 1 + ALAR_MESS_INSPECTOR_HEARTBEAT tcp.PacketType = iota + 1 ALAR_MESS_INSPECTOR_TASK_REQUEST ALAR_MESS_INSPECTOR_TASKS - ALAR_MESS_INSPECTOR_RESULTS + ALAR_MESS_INSPECTOR_RESULT ) //报警服务消息类型可读映射 -var AlarmMessageTypeText map[AlarmMessageType]string = map[AlarmMessageType]string{ +var AlarmMessageTypeText map[tcp.PacketType]string = map[tcp.PacketType]string{ ALAR_MESS_INSPECTOR_HEARTBEAT: "inspector heartbeat", ALAR_MESS_INSPECTOR_TASK_REQUEST: "inspector task request", ALAR_MESS_INSPECTOR_TASKS: "inspector tasks", - ALAR_MESS_INSPECTOR_RESULTS: "inspector results", + ALAR_MESS_INSPECTOR_RESULT: "inspector result", } //报警服务消息接口 @@ -33,15 +31,15 @@ type AlarmMessage interface { Encode() []byte } -func AlarmPack(t AlarmMessageType, m AlarmMessage) []byte { - var buf bytes.Buffer - binary.Write(&buf, binary.BigEndian, t) - if m != nil { - binary.Write(&buf, binary.BigEndian, m.Encode()) - } - binary.Write(&buf, binary.BigEndian, make([]byte, 0)) - return buf.Bytes() -} +// func AlarmPack(t MessageType, m AlarmMessage) []byte { +// var buf bytes.Buffer +// binary.Write(&buf, binary.BigEndian, t) +// if m != nil { +// binary.Write(&buf, binary.BigEndian, m.Encode()) +// } +// binary.Write(&buf, binary.BigEndian, make([]byte, 0)) +// return buf.Bytes() +// } type AlarmTask struct { ID string @@ -122,15 +120,15 @@ type AlarmResults struct { } type StrategyResult struct { - TaskID string - Priority int - TriggerResultSets map[string]*TriggerResultSet - ErrorMessage string - Triggered bool - CreateTime time.Time + TaskID string `json:"taskid"` + Priority int `json:"priority"` + TriggerResultSets map[string]*TriggerResultSet `json:"trigger_resultset"` + ErrorMessage string `json:"error_message"` + Triggered bool `json:"triggered"` + CreateTime time.Time `json:"create_time"` } -func (this *AlarmResults) Encode() []byte { +func (this *StrategyResult) Encode() []byte { data, err := json.Marshal(this) if err != nil { fmt.Println(err.Error()) @@ -138,7 +136,7 @@ func (this *AlarmResults) Encode() []byte { return data } -func (this *AlarmResults) Decode(data []byte) error { +func (this *StrategyResult) Decode(data []byte) error { return json.Unmarshal(data, &this) } diff --git a/common/types/timeseries.go b/common/types/timeseries.go index 128f81c..39bd973 100644 --- a/common/types/timeseries.go +++ b/common/types/timeseries.go @@ -15,7 +15,7 @@ type TimeSeriesData struct { DataType string `json:"data_type"` //COUNTER,GAUGE,DERIVE Value float64 `json:"value"` //99.00 Timestamp int64 `json:"timestamp"` //unix timestamp - Cycle int `json:"cycle"` + Cycle int `json:"cycle,omitempty"` Tags map[string]string `json:"tags"` //{"product":"app01", "group":"dev02"} } @@ -99,6 +99,9 @@ func ParseTags(name string) map[string]string { kv := strings.Split(name, ",") for _, v := range kv { tmp := strings.Split(v, "=") + if len(tmp) != 2 { + continue + } res[tmp[0]] = tmp[1] } return res diff --git a/conf/controller.conf b/conf/controller.conf index 3cdae9e..b01fbb4 100644 --- a/conf/controller.conf +++ b/conf/controller.conf @@ -16,6 +16,7 @@ max_packet_size=409600 load_strategies_interval=300 task_pool_size=409600 result_pool_size=409600 +event_pool_size = 1024 task_size=100 worker_count=20 diff --git a/controller/callback.go b/controller/callback.go new file mode 100644 index 0000000..cf14a67 --- /dev/null +++ b/controller/callback.go @@ -0,0 +1,76 @@ +package main + +import ( + "owl/common/types" + + "github.com/wuyingsong/tcp" +) + +type callback struct { +} + +func (cb *callback) OnConnected(conn *tcp.TCPConn) { + lg.Info("callback:%s connected", conn.GetRemoteAddr().String()) +} + +//链接断开回调 +func (cb *callback) OnDisconnected(conn *tcp.TCPConn) { + lg.Info("callback:%s disconnect ", conn.GetRemoteAddr().String()) +} + +//错误回调 +func (cb *callback) OnError(err error) { + lg.Error("callback: %s", err) +} + +func (cb *callback) OnMessage(conn *tcp.TCPConn, p tcp.Packet) { + defer func() { + if r := recover(); r != nil { + lg.Error("Recovered in OnMessage", r) + } + }() + pkt := p.(*tcp.DefaultPacket) + switch pkt.Type { + case types.ALAR_MESS_INSPECTOR_HEARTBEAT: + lg.Debug("receive %v %v", types.AlarmMessageTypeText[pkt.Type], string(pkt.Body)) + heartbeat := &types.HeartBeat{} + if err := heartbeat.Decode(pkt.Body); err != nil { + lg.Error(err.Error()) + return + } + controller.receiveHearbeat(heartbeat) + //TODO: optimized task allocate algorithm + case types.ALAR_MESS_INSPECTOR_TASK_REQUEST: + lg.Debug("receive get task request inspector %s", conn.GetRemoteAddr()) + tasks := &types.AlarmTasks{ + Tasks: controller.taskPool.getTasks(GlobalConfig.TASK_SIZE), + } + if len(tasks.Tasks) == 0 { + lg.Debug("task pool has no tasks can be sent to inspector %s", conn.GetRemoteAddr()) + return + } + lg.Info("sent %d task to inspector %s", len(tasks.Tasks), conn.GetRemoteAddr()) + conn.AsyncWritePacket( + tcp.NewDefaultPacket( + types.ALAR_MESS_INSPECTOR_TASKS, + tasks.Encode(), + ), + ) + // sess.Send(types.AlarmPack(types.ALAR_MESS_INSPECTOR_TASKS, tasks)) + case types.ALAR_MESS_INSPECTOR_RESULT: + lg.Info("receive %v %v", types.AlarmMessageTypeText[pkt.Type], string(pkt.Body)) + var ( + result types.StrategyResult + err error + ) + if err = result.Decode(pkt.Body); err != nil { + lg.Error(err.Error()) + return + } + if err = controller.resultPool.putResult(&result); err != nil { + lg.Error("put task result to pool failed error:%s result:%s", err, result.Encode()) + } + default: + lg.Error("unknown option: %v", pkt.Type) + } +} diff --git a/controller/config.go b/controller/config.go index 2bdab81..6f8e609 100644 --- a/controller/config.go +++ b/controller/config.go @@ -20,6 +20,7 @@ const ( DEFAULT_LOAD_STRATEGIES_INTERVAL = 300 //seconds DEFAULT_TASK_POOL_SIZE = 409600 DEFAULT_RESULT_POOL_SIZE = 409600 + DEFAULT_EVENT_POOL_SIZE = 1024 DEFAULT_HTTP_SERVER = ":10051" DEFAULT_TASK_SIZE = 100 DEFAULT_WORKER_COUNT = 20 @@ -53,6 +54,8 @@ type Config struct { RESULT_POOL_SIZE int //结果池的缓冲大小 + EVENT_POOL_SIZE int + HTTP_SERVER string //Http服务的地址 TASK_SIZE int //单次获取任务数 @@ -87,6 +90,7 @@ func InitGlobalConfig() error { LOAD_STRATEGIES_INTERVAL: cfg.MustInt(goconfig.DEFAULT_SECTION, "load_strategies_interval", DEFAULT_LOAD_STRATEGIES_INTERVAL), TASK_POOL_SIZE: cfg.MustInt(goconfig.DEFAULT_SECTION, "task_pool_size", DEFAULT_TASK_POOL_SIZE), RESULT_POOL_SIZE: cfg.MustInt(goconfig.DEFAULT_SECTION, "result_pool_size", DEFAULT_RESULT_POOL_SIZE), + EVENT_POOL_SIZE: cfg.MustInt(goconfig.DEFAULT_SECTION, "event_pool_size", DEFAULT_EVENT_POOL_SIZE), HTTP_SERVER: cfg.MustValue(goconfig.DEFAULT_SECTION, "http_server", DEFAULT_HTTP_SERVER), TASK_SIZE: cfg.MustInt(goconfig.DEFAULT_SECTION, "task_size", DEFAULT_TASK_SIZE), WORKER_COUNT: cfg.MustInt(goconfig.DEFAULT_SECTION, "worker_count", DEFAULT_WORKER_COUNT), diff --git a/controller/controller.go b/controller/controller.go index aaae630..121cc96 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -4,37 +4,39 @@ import ( "sync" "time" - "owl/common/tcp" "owl/common/types" "owl/controller/cache" + + "github.com/wuyingsong/tcp" ) var controller *Controller type Controller struct { - *tcp.Server + *tcp.AsyncTCPServer taskPool *TaskPool resultPool *ResultPool nodePool *NodePool - eventQueues map[int]*Queue + eventQueues map[int]*EventPool eventQueuesMutex *sync.RWMutex statusCache *cache.Cache taskCache *cache.Cache } func InitController() error { - controllerServer := tcp.NewServer(GlobalConfig.TCP_BIND, &ControllerHandle{}) - controllerServer.SetMaxPacketSize(uint32(GlobalConfig.MAX_PACKET_SIZE)) - if err := controllerServer.ListenAndServe(); err != nil { + protocol := &tcp.DefaultProtocol{} + protocol.SetMaxPacketSize(uint32(GlobalConfig.MAX_PACKET_SIZE)) + server := tcp.NewAsyncTCPServer(GlobalConfig.TCP_BIND, &callback{}, protocol) + if err := server.ListenAndServe(); err != nil { return err } lg.Info("Start listen: %v", GlobalConfig.TCP_BIND) - controller = &Controller{controllerServer, + controller = &Controller{server, NewTaskPool(GlobalConfig.TASK_POOL_SIZE), NewResultPool(GlobalConfig.RESULT_POOL_SIZE), NewNodePool(), - generateQueues(), + make(map[int]*EventPool), new(sync.RWMutex), cache.New(time.Duration(GlobalConfig.LOAD_STRATEGIES_INTERVAL)*time.Second, 30*time.Second), cache.New(10*time.Minute, 10*time.Minute)} @@ -48,21 +50,12 @@ func InitController() error { return nil } -// generateQueues 生成产品线报警队列 -func generateQueues() map[int]*Queue { - pq := make(map[int]*Queue) - for _, product := range mydb.GetProducts() { - pq[product.ID] = NewQueue(0) - } - return pq -} - //checkNodesForever 持续运行检查节点函数,并维护节点数组 func (c *Controller) checkNodesForever() { for { - time_now := time.Now() + now := time.Now() for ip, node := range c.nodePool.Nodes { - if time_now.Sub(node.Update) > time.Duration(time.Second*10) { + if now.Sub(node.Update) > time.Duration(time.Second*10) { delete(c.nodePool.Nodes, ip) lg.Warn("Inspector %v, %v lost from controller", ip, node.Hostname) } @@ -96,12 +89,17 @@ func (c *Controller) refreshQueue(products []*types.Product) { now := time.Now() for _, product := range products { if _, ok := c.eventQueues[product.ID]; !ok { - c.eventQueues[product.ID] = NewQueue(0) + lg.Info("create event queue %s", product.Name) + eventPool := NewEventPool(product.Name, GlobalConfig.EVENT_POOL_SIZE) + c.eventQueues[product.ID] = eventPool + go processSingleQueue(eventPool) } + lg.Info("refresh event queue %s", product.Name) c.eventQueues[product.ID].update_time = now } for p, q := range c.eventQueues { if now.Sub(q.update_time).Minutes() > 10 { + lg.Warn("product id %s event queue update time is expires, delete", p) delete(c.eventQueues, p) } } diff --git a/controller/handle.go b/controller/handle.go deleted file mode 100644 index 6426ec9..0000000 --- a/controller/handle.go +++ /dev/null @@ -1,55 +0,0 @@ -package main - -import ( - "owl/common/tcp" - "owl/common/types" -) - -type ControllerHandle struct { -} - -func (this *ControllerHandle) MakeSession(sess *tcp.Session) { - lg.Info("%s new connection ", sess.RemoteAddr()) -} - -func (this *ControllerHandle) LostSession(sess *tcp.Session) { - lg.Info("%s disconnect ", sess.RemoteAddr()) -} - -func (this *ControllerHandle) Handle(sess *tcp.Session, data []byte) { - defer func() { - if err := recover(); err != nil { - lg.Error("Recovered in HandleMessage", err) - } - }() - mt := types.AlarmMessageType(data[0]) - switch mt { - case types.ALAR_MESS_INSPECTOR_HEARTBEAT: - lg.Info("Receive %v %v", types.AlarmMessageTypeText[mt], string(data[1:])) - heartbeat := &types.HeartBeat{} - if err := heartbeat.Decode(data[1:]); err != nil { - lg.Error(err.Error()) - return - } - controller.receiveHearbeat(heartbeat) - case types.ALAR_MESS_INSPECTOR_TASK_REQUEST: - tasks := GetAlarmTasks() - sess.Send(types.AlarmPack(types.ALAR_MESS_INSPECTOR_TASKS, tasks)) - case types.ALAR_MESS_INSPECTOR_RESULTS: - lg.Info("Receive %v %v", types.AlarmMessageTypeText[mt], string(data[1:])) - results := &types.AlarmResults{} - if err := results.Decode(data[1:]); err != nil { - lg.Error(err.Error()) - return - } - controller.resultPool.PutResults(results) - default: - lg.Error("Unknown option: %v", mt) - } -} - -func GetAlarmTasks() *types.AlarmTasks { - tasks := controller.taskPool.GetTasks(GlobalConfig.TASK_SIZE) - tasks_resp := &types.AlarmTasks{tasks} - return tasks_resp -} diff --git a/controller/health.go b/controller/health.go index 4e46a45..5575657 100644 --- a/controller/health.go +++ b/controller/health.go @@ -49,7 +49,7 @@ func queueStatus(c *gin.Context) { defer controller.eventQueuesMutex.RUnlock() for _, product := range mydb.GetProducts() { if queue, ok := controller.eventQueues[product.ID]; ok { - qs = append(qs, gin.H{"id": product.ID, "name": product.Name, "count": queue.Size(), "mute": queue.mute}) + qs = append(qs, gin.H{"id": product.ID, "name": product.Name, "count": queue.len(), "mute": queue.mute}) } } c.JSON(http.StatusOK, gin.H{"queues": qs}) @@ -59,7 +59,7 @@ func queueClean(c *gin.Context) { id := c.Param("id") i, _ := strconv.Atoi(id) if queue, ok := controller.eventQueues[i]; ok { - queue.Clear() + queue.clean() c.JSON(http.StatusOK, gin.H{"message": "cleaned!"}) return } diff --git a/controller/notify.go b/controller/notify.go index 3ef3dd7..1eb5a19 100644 --- a/controller/notify.go +++ b/controller/notify.go @@ -43,25 +43,27 @@ func (c *Controller) processStrategyEventForever() { c.eventQueuesMutex.RLock() defer c.eventQueuesMutex.RUnlock() for _, queue := range c.eventQueues { + lg.Info("process queue event %s", queue.name) go processSingleQueue(queue) } } -func processSingleQueue(queue *Queue) { +//TODO: fix when product delete, goroutine leak +func processSingleQueue(queue *EventPool) { + duration := time.Millisecond * 100 for { - duration := time.Millisecond * 100 - if !queue.mute { - event, err := queue.Get(0) - if err != nil { - lg.Error(err.Error()) - continue + if queue.len() > GlobalConfig.SEND_MAX { + duration = time.Microsecond * time.Duration(queue.len()) + if duration.Seconds() > float64(GlobalConfig.MAX_INTERVAL_WAIT_TIME) { + duration = time.Second * time.Duration(GlobalConfig.MAX_INTERVAL_WAIT_TIME) } - go processSingleEvent(event.(*QueueEvent)) } - if queue.Size() > GlobalConfig.SEND_MAX { - duration = time.Millisecond * time.Duration(queue.Size()) - if duration > time.Second*time.Duration(GlobalConfig.MAX_INTERVAL_WAIT_TIME) { - duration = time.Second * time.Duration(GlobalConfig.MAX_INTERVAL_WAIT_TIME) + if !queue.mute { + event := queue.getQueueEvent() + if queue.mute { + queue.putQueueEvent(event) + } else { + go processSingleEvent(event) } } time.Sleep(duration) @@ -71,8 +73,8 @@ func processSingleQueue(queue *Queue) { func processSingleEvent(event *QueueEvent) { switch event.status { case NEW_ALARM: - last_id, _ := mydb.CreateStrategyEvent(event.strategy_event, event.trigger_events) - event.strategy_event.ID = last_id + lastID, _ := mydb.CreateStrategyEvent(event.strategy_event, event.trigger_events) + event.strategy_event.ID = lastID go doAlarmAction(event.strategy_event, event.trigger_events) case OLD_ALARM: mydb.UpdateStrategyEvent(event.strategy_event, event.trigger_events) @@ -81,7 +83,7 @@ func processSingleEvent(event *QueueEvent) { mydb.UpdateStrategyEvent(event.strategy_event, event.trigger_events) mydb.CreateStrategyEventProcess(event.strategy_event.ID, event.strategy_event.Status, "系统", "报警恢复", time.Now().Format("2006-01-02 15:04:05")) go doRestoreAction(event.strategy_event, event.trigger_events) - event.strategy_event.Count += 1 + event.strategy_event.Count++ default: lg.Error(fmt.Sprintf("unknow event type %d from event queue.", event.status)) } @@ -93,6 +95,7 @@ func doAlarmAction(strategy_event *types.StrategyEvent, trigger_events map[strin strategy_event.Status = types.EVENT_NEW actions := mydb.GetAllActions(strategy_event.StrategyID) for _, action := range actionFilter(actions, strategy_event) { + lg.Info("do action:%v", action) switch action.Kind { case types.ACTION_NOTIFY: subject := action.AlarmSubject diff --git a/controller/pool.go b/controller/pool.go index 42bd531..187cb3e 100644 --- a/controller/pool.go +++ b/controller/pool.go @@ -1,13 +1,19 @@ package main import ( - "owl/controller/cache" + "errors" "sync" "time" "owl/common/types" ) +var ( + ErrTaskPoolFull = errors.New("task pool is full") + ErrResultPoolFull = errors.New("result pool is full") + ErrEventPoolFull = errors.New("event pool is full") +) + type TaskPool struct { tasks chan *types.AlarmTask } @@ -16,53 +22,70 @@ func NewTaskPool(size int) *TaskPool { return &TaskPool{make(chan *types.AlarmTask, size)} } -func (this *TaskPool) PutTasks(items map[string]cache.Item) error { - for { - if len(this.tasks) != 0 { - lg.Warn("task pool still have task: %v, wait it to be finished.", len(this.tasks)) - time.Sleep(time.Second * time.Duration(1)) - continue - } else { - break - } - } - for _, item := range items { - task, _ := item.Object.(*types.AlarmTask) - LOOP: - for { - select { - case this.tasks <- task: - lg.Debug("load task %v into task pool", task.ID) - break LOOP - default: - lg.Warn("task pool is full") - time.Sleep(time.Second * time.Duration(1)) - } - } +// func (this *TaskPool) PutTasks(items map[string]cache.Item) error { +// if len(this.tasks) != 0 { +// this.clean() +// } +// for _, item := range items { +// task, _ := item.Object.(*types.AlarmTask) + +// for { +// err := this.putTask(task) +// if err == nil { +// break +// } +// if err == ErrTaskPoolFull { +// expireTask := <-this.tasks +// lg.Warn("task pool is full, drop %s hostid:%s hostname:%s ip:%s", +// expireTask.Strategy.Name, expireTask.Host.ID, expireTask.Host.Hostname, expireTask.Host.IP) +// } +// } +// } +// return nil +// } + +// func (this *TaskPool) clean() { +// for { +// select { +// case task := <-this.tasks: +// lg.Warn("drop expires task %s hostid:%s hostname:%s ip:%s", +// task.Strategy.Name, task.Host.ID, task.Host.Hostname, task.Host.IP) +// default: +// return +// } +// } +// } + +func (tp *TaskPool) putTask(task *types.AlarmTask) error { + select { + case tp.tasks <- task: + lg.Info("put new task into task pool, taskid:%s strategy:%s hostname:%s ip:%s", + task.ID, task.Strategy.Name, task.Host.Hostname, task.Host.IP) + return nil + default: + return ErrTaskPoolFull } - return nil } -func (this *TaskPool) GetTasks(task_count int) []*types.AlarmTask { +func (tp *TaskPool) getTasks(batchSize int) []*types.AlarmTask { tasks := make([]*types.AlarmTask, 0) -OUTTER_LOOP: - for task_count > 0 { - INNER_LOOP: - for { - select { - case task := <-this.tasks: - tasks = append(tasks, task) - task_count -= 1 - lg.Debug("get task %v from task pool", task.ID) - break INNER_LOOP - default: - break OUTTER_LOOP - } + for ; batchSize > 0; batchSize-- { + if task := tp.getTask(); task != nil { + tasks = append(tasks, task) } } return tasks } +func (tp *TaskPool) getTask() *types.AlarmTask { + var task *types.AlarmTask + select { + case task = <-tp.tasks: + default: + } + return task +} + type ResultPool struct { results chan *types.StrategyResult } @@ -73,20 +96,26 @@ func NewResultPool(size int) *ResultPool { func (this *ResultPool) PutResults(ar *types.AlarmResults) { for _, result := range ar.Results { - LOOP: - for { - select { - case this.results <- result: - lg.Debug("load result %v into result pool", result.TaskID) - break LOOP - default: - lg.Warn("result pool is full") - time.Sleep(time.Second * time.Duration(1)) - } + select { + case this.results <- result: + lg.Debug("put task result into result pool, taskid:%s", result.TaskID) + default: + lg.Warn("result pool is full") + time.Sleep(time.Second * time.Duration(1)) } } } +func (this *ResultPool) putResult(result *types.StrategyResult) error { + select { + case this.results <- result: + lg.Debug("put task result into result pool, taskid:%s", result.TaskID) + return nil + default: + return ErrResultPoolFull + } +} + type NodePool struct { Nodes map[string]*types.Node Lock *sync.Mutex @@ -95,3 +124,48 @@ type NodePool struct { func NewNodePool() *NodePool { return &NodePool{make(map[string]*types.Node), &sync.Mutex{}} } + +type EventPool struct { + name string + events chan *QueueEvent + update_time time.Time + mute bool +} + +func NewEventPool(name string, size int) *EventPool { + return &EventPool{name, make(chan *QueueEvent, size), time.Now(), false} +} + +func (ep *EventPool) putQueueEvent(event *QueueEvent) error { + select { + case ep.events <- event: + return nil + default: + return ErrEventPoolFull + } +} + +func (ep *EventPool) getQueueEvent() *QueueEvent { + select { + case qEvent := <-ep.events: + return qEvent + } +} + +func (ep *EventPool) cap() int { + return cap(ep.events) +} + +func (ep *EventPool) len() int { + return len(ep.events) +} + +func (ep *EventPool) clean() { + for { + select { + case <-ep.events: + default: + return + } + } +} diff --git a/controller/result.go b/controller/result.go index 524698d..3193da3 100644 --- a/controller/result.go +++ b/controller/result.go @@ -20,119 +20,149 @@ type QueueEvent struct { //持续处理报警结果 func (c *Controller) processStrategyResultForever() { - worker_count := GlobalConfig.WORKER_COUNT - for worker_count > 0 { + for i := 0; i < GlobalConfig.WORKER_COUNT; i++ { go func() { for { select { case result := <-c.resultPool.results: - lg.Debug("get result %v from result pool", result.TaskID) + lg.Debug("get task result from result pool, taskid:%s", result.TaskID) c.processResult(result) default: time.Sleep(time.Millisecond * 100) } } }() - worker_count -= 1 } } //处理来自Inspector计算后的结果对象 -func (c *Controller) processResult(strategy_result *types.StrategyResult) { - item, ok := c.taskCache.Get(strategy_result.TaskID) +func (c *Controller) processResult(strategyResult *types.StrategyResult) { + item, ok := c.taskCache.Get(strategyResult.TaskID) if !ok { - lg.Error(fmt.Sprintf("task %v not in cached task pool", strategy_result.TaskID)) + lg.Error(fmt.Sprintf("task %v not in cached task pool", strategyResult.TaskID)) return } task, _ := item.(*types.AlarmTask) - if strategy_result.ErrorMessage != "" { - event_status := 0 - if strategy_result.ErrorMessage == "no data" { - event_status = types.EVENT_NODATA - } else { - event_status = types.EVENT_UNKNOW + if strategyResult.ErrorMessage != "" { + eventStatus := types.EVENT_UNKNOW + if strategyResult.ErrorMessage == "no data" { + eventStatus = types.EVENT_NODATA } - mydb.CreateStrategyEventFailed(task.Strategy.ID, task.Host.ID, event_status, strategy_result.ErrorMessage) - lg.Error(fmt.Sprintf("task %v has problem %v", strategy_result.TaskID, strategy_result.ErrorMessage)) + lg.Warn(fmt.Sprintf("task %v %v strategy:%s hostname:%s ip:%s", + strategyResult.TaskID, strategyResult.ErrorMessage, task.Strategy.Name, task.Host.Hostname, task.Host.IP)) + mydb.CreateStrategyEventFailed(task.Strategy.ID, task.Host.ID, eventStatus, strategyResult.ErrorMessage) return - } else { - go syncStrategyEventFailed(task.Strategy.ID, task.Host.ID) } - var aware_strategy_event, old_strategy_event *types.StrategyEvent + go syncStrategyEventFailed(task.Strategy.ID, task.Host.ID) + + var awareStrategyEvent, oldStrategyEvent *types.StrategyEvent - if aware_strategy_event = mydb.GetStrategyEvent(task.Strategy.ID, types.EVENT_AWARE, task.Host.ID); aware_strategy_event != nil { - strategy_event, trigger_events := generateEvent(aware_strategy_event, strategy_result, task) - if strategy_result.Triggered == false { - triggered_trigger_events := mydb.GetTriggeredTriggerEvents(strategy_event.ID) - trigger_events = tagChangedTrigger(triggered_trigger_events, trigger_events) - strategy_event.Status = types.EVENT_CLOSED + //获取知悉 + if awareStrategyEvent = mydb.GetStrategyEvent(task.Strategy.ID, types.EVENT_AWARE, task.Host.ID); awareStrategyEvent != nil { + strategyEvent, triggerEvents := generateEvent(awareStrategyEvent, strategyResult, task) + // 告警恢复 + if strategyResult.Triggered == false { + triggeredTriggerEvents := mydb.GetTriggeredTriggerEvents(strategyEvent.ID) + triggerEvents = tagChangedTrigger(triggeredTriggerEvents, triggerEvents) + strategyEvent.Status = types.EVENT_CLOSED + event := &QueueEvent{RESTORE_ALARM, strategyEvent, triggerEvents} c.eventQueuesMutex.RLock() - c.eventQueues[strategy_event.ProductID].PutNoWait(&QueueEvent{RESTORE_ALARM, strategy_event, trigger_events}) + if err := c.eventQueues[strategyEvent.ProductID].putQueueEvent(event); err != nil { + lg.Error("put strategy event to event queue failed %s", err) + } else { + lg.Info("put strategy event to queue %d strategy:%s hostname:%s ip:%s", + strategyEvent.ProductID, strategyEvent.StrategyName, strategyEvent.HostName, strategyEvent.IP) + } c.eventQueuesMutex.RUnlock() - lg.Debug(fmt.Sprintf("put restore event by strategy %s into event queue.", strategy_event.StrategyName)) + lg.Debug(fmt.Sprintf("put restore event by strategy %s into event queue.", strategyEvent.StrategyName)) return } - if time.Now().Sub(strategy_event.AwareEndTime) > 0 && strategy_event.AwareEndTime.Sub(types.DEFAULT_TIME) != 0 { - strategy_event.Status = types.EVENT_NEW - strategy_event.AwareEndTime = types.DEFAULT_TIME - if strategy_event.Count < task.Strategy.AlarmCount || task.Strategy.AlarmCount == 0 { - strategy_event.Count += 1 + // 知悉过期 + if time.Now().Sub(strategyEvent.AwareEndTime) > 0 && strategyEvent.AwareEndTime.Sub(types.DEFAULT_TIME) != 0 { + strategyEvent.Status = types.EVENT_NEW + strategyEvent.AwareEndTime = types.DEFAULT_TIME + // 未达到最大告警次数 + if strategyEvent.Count < task.Strategy.AlarmCount || task.Strategy.AlarmCount == 0 { + strategyEvent.Count++ + event := &QueueEvent{OLD_ALARM, strategyEvent, triggerEvents} c.eventQueuesMutex.RLock() - c.eventQueues[strategy_event.ProductID].PutNoWait(&QueueEvent{OLD_ALARM, strategy_event, trigger_events}) + if err := c.eventQueues[strategyEvent.ProductID].putQueueEvent(event); err != nil { + lg.Error("put strategy event to event queue failed %s", err) + } else { + lg.Info("put strategy event to queue %d strategy:%s hostname:%s ip:%s", + strategyEvent.ProductID, strategyEvent.StrategyName, strategyEvent.HostName, strategyEvent.IP) + } c.eventQueuesMutex.RUnlock() - lg.Debug(fmt.Sprintf("put old alarm event by strategy %s into event queue.", strategy_event.StrategyName)) return } } - mydb.UpdateStrategyEvent(strategy_event, trigger_events) + mydb.UpdateStrategyEvent(strategyEvent, triggerEvents) return } - if old_strategy_event = mydb.GetStrategyEvent(task.Strategy.ID, types.EVENT_NEW, task.Host.ID); old_strategy_event != nil { - strategy_event, trigger_events := generateEvent(old_strategy_event, strategy_result, task) - if strategy_result.Triggered == false { - triggered_trigger_events := mydb.GetTriggeredTriggerEvents(strategy_event.ID) - trigger_events = tagChangedTrigger(triggered_trigger_events, trigger_events) - strategy_event.Status = types.EVENT_CLOSED + //获取已产生报警 + if oldStrategyEvent = mydb.GetStrategyEvent(task.Strategy.ID, types.EVENT_NEW, task.Host.ID); oldStrategyEvent != nil { + strategyEvent, triggerEvents := generateEvent(oldStrategyEvent, strategyResult, task) + //告警恢复 + if strategyResult.Triggered == false { + triggeredTriggerEvents := mydb.GetTriggeredTriggerEvents(strategyEvent.ID) + triggerEvents = tagChangedTrigger(triggeredTriggerEvents, triggerEvents) + strategyEvent.Status = types.EVENT_CLOSED + event := &QueueEvent{RESTORE_ALARM, strategyEvent, triggerEvents} c.eventQueuesMutex.RLock() - c.eventQueues[strategy_event.ProductID].PutNoWait(&QueueEvent{RESTORE_ALARM, strategy_event, trigger_events}) + if err := c.eventQueues[strategyEvent.ProductID].putQueueEvent(event); err != nil { + lg.Error("put strategy event to event queue failed %s", err) + } else { + lg.Info("put strategy event to queue %d strategy:%s hostname:%s ip:%s", + strategyEvent.ProductID, strategyEvent.StrategyName, strategyEvent.HostName, strategyEvent.IP) + } c.eventQueuesMutex.RUnlock() - lg.Debug(fmt.Sprintf("put restore event by strategy %s into event queue.", strategy_event.StrategyName)) return } - if strategy_event.Count >= task.Strategy.AlarmCount && task.Strategy.AlarmCount != 0 { - mydb.UpdateStrategyEvent(strategy_event, trigger_events) + if strategyEvent.Count >= task.Strategy.AlarmCount && task.Strategy.AlarmCount != 0 { + mydb.UpdateStrategyEvent(strategyEvent, triggerEvents) return } - strategy_event.Count += 1 + strategyEvent.Count++ + event := &QueueEvent{OLD_ALARM, strategyEvent, triggerEvents} c.eventQueuesMutex.RLock() - c.eventQueues[strategy_event.ProductID].PutNoWait(&QueueEvent{OLD_ALARM, strategy_event, trigger_events}) + if err := c.eventQueues[strategyEvent.ProductID].putQueueEvent(event); err != nil { + lg.Error("put strategy event to event queue failed %s", err) + } else { + lg.Info("put strategy event to queue %d strategy:%s hostname:%s ip:%s", + strategyEvent.ProductID, strategyEvent.StrategyName, strategyEvent.HostName, strategyEvent.IP) + } c.eventQueuesMutex.RUnlock() - lg.Debug(fmt.Sprintf("put old alarm event by strategy %s into event queue.", strategy_event.StrategyName)) return } - if old_strategy_event == nil && aware_strategy_event == nil { - strategy_event, trigger_events := generateEvent(nil, strategy_result, task) - if strategy_result.Triggered == true { - strategy_event.Status = types.EVENT_NEW + // 新产生的告警 + if oldStrategyEvent == nil && awareStrategyEvent == nil { + strategyEvent, triggerEvents := generateEvent(nil, strategyResult, task) + if strategyResult.Triggered == true { + strategyEvent.Status = types.EVENT_NEW + event := &QueueEvent{NEW_ALARM, strategyEvent, triggerEvents} c.eventQueuesMutex.RLock() - c.eventQueues[strategy_event.ProductID].PutNoWait(&QueueEvent{NEW_ALARM, strategy_event, trigger_events}) + if err := c.eventQueues[strategyEvent.ProductID].putQueueEvent(event); err != nil { + lg.Error("put strategy event to event queue failed %s", err) + } else { + lg.Info("put strategy event to queue %d strategy:%s hostname:%s ip:%s", + strategyEvent.ProductID, strategyEvent.StrategyName, strategyEvent.HostName, strategyEvent.IP) + } c.eventQueuesMutex.RUnlock() - lg.Debug(fmt.Sprintf("put new alarm event by strategy %s into event queue.", strategy_event.StrategyName)) } } } -func tagChangedTrigger(triggered_trigger_events []*types.TriggerEvent, trigger_events map[string]*types.TriggerEvent) map[string]*types.TriggerEvent { - for _, e := range triggered_trigger_events { - if value, ok := trigger_events[e.Index+e.Metric+e.Tags]; ok { +func tagChangedTrigger(triggeredTriggerEvents []*types.TriggerEvent, triggerEvents map[string]*types.TriggerEvent) map[string]*types.TriggerEvent { + for _, e := range triggeredTriggerEvents { + if value, ok := triggerEvents[e.Index+e.Metric+e.Tags]; ok { value.TriggerChanged = true } } - return trigger_events + return triggerEvents } func syncStrategyEventFailed(strategy_id int, host_id string) error { diff --git a/controller/strategy.go b/controller/strategy.go index 4e194a4..01119c6 100644 --- a/controller/strategy.go +++ b/controller/strategy.go @@ -6,42 +6,38 @@ import ( "time" ) -func (c *Controller) Add(task *types.AlarmTask) { - if len(task.Triggers) == 0 { - lg.Warn("task %v has no triggers, skipped it.", task.ID) - return - } - if task.Host.Status == "2" { - lg.Debug("host %v is forbidden, skipped it.", task.Host.ID) - return - } - c.taskCache.Set(task.ID, task, 10*time.Minute) -} - //持续定时加载报警策略 func (c *Controller) loadStrategiesForever() { + var wg sync.WaitGroup for { - var wait_group sync.WaitGroup + // 获取所有产品线 products := mydb.GetProducts() + // 更新产品线告警队列 c.refreshQueue(products) for _, product := range products { + // 根据产品线 id 获取策略 strategies := mydb.GetStrategies(product.ID) - wait_group.Add(1) + wg.Add(1) go func(strategies []*types.Strategy) { - defer wait_group.Done() + defer wg.Done() for _, strategy := range strategies { if strategy.Enable == false { - lg.Info("strategy %v is not enabled, skipped it.", strategy.Name) + lg.Info("strategy %s is not enabled, skipped it.", strategy.Name) continue } + // 根据策略 id 获取 trigger triggers := mydb.GetTriggersByStrategyID(strategy.ID) + //如果没有 trigger 则忽略 + if len(triggers) == 0 { + lg.Warn("strategy %s has no trigger, skipped it.", strategy.Name) + continue + } + // 生成 AlarmTask c.processSingleStrategy(strategy, triggers) } }(strategies) } - wait_group.Wait() - c.taskPool.PutTasks(c.taskCache.GetItems()) - lg.Info("loaded tasks %v for all products", c.taskCache.ItemCount()) + wg.Wait() time.Sleep(time.Second * time.Duration(GlobalConfig.LOAD_STRATEGIES_INTERVAL)) } } @@ -54,8 +50,17 @@ func (c *Controller) processSingleStrategy(strategy *types.Strategy, triggers ma } exHosts := mydb.GetHostsExByStrategyID(strategy.ID) for _, host := range globalHosts { - if _, ok := exHosts[host.ID]; !ok { - c.Add(types.NewAlarmTask(host, strategy, triggers)) + // 过滤排除主机 + if _, ok := exHosts[host.ID]; ok { + lg.Debug("strategy %d:%v exclude host %v:%v:%v", strategy.ID, strategy.Name, host.ID, host.IP, host.Hostname) + continue + } + // 向 taskCache 添加任务 + task := types.NewAlarmTask(host, strategy, triggers) + if err := c.taskPool.putTask(task); err != nil { + lg.Error("put new task into task pool failed %v, maybe you need to increase the task_pool_size", err) + continue } + c.taskCache.Set(task.ID, task, 10*time.Minute) } } diff --git a/inspector/callback.go b/inspector/callback.go new file mode 100644 index 0000000..0535b11 --- /dev/null +++ b/inspector/callback.go @@ -0,0 +1,49 @@ +package main + +import ( + "owl/common/types" + + "github.com/wuyingsong/tcp" +) + +type callback struct { +} + +func (cb *callback) OnConnected(conn *tcp.TCPConn) { + lg.Info("callback:%s connected", conn.GetRemoteAddr().String()) +} + +//链接断开回调 +func (cb *callback) OnDisconnected(conn *tcp.TCPConn) { + lg.Info("callback:%s disconnect ", conn.GetRemoteAddr().String()) +} + +//错误回调 +func (cb *callback) OnError(err error) { + lg.Error("callback: %s", err) +} + +//消息处理回调 +func (cb *callback) OnMessage(conn *tcp.TCPConn, p tcp.Packet) { + defer func() { + if r := recover(); r != nil { + lg.Error("Recovered in OnMessage", r) + } + }() + pkt := p.(*tcp.DefaultPacket) + switch pkt.Type { + case types.ALAR_MESS_INSPECTOR_TASKS: + at := &types.AlarmTasks{} + if err := at.Decode(pkt.Body); err != nil { + lg.Error(err.Error()) + return + } + if len(at.Tasks) == 0 { + return + } + lg.Info("receive %v %v", types.AlarmMessageTypeText[pkt.Type], string(pkt.Body)) + inspector.taskPool.PutTasks(at.Tasks) + default: + lg.Error("unknown option: %v", pkt.Type) + } +} diff --git a/inspector/handle.go b/inspector/handle.go deleted file mode 100644 index e4d8d1a..0000000 --- a/inspector/handle.go +++ /dev/null @@ -1,41 +0,0 @@ -package main - -import ( - "owl/common/tcp" - "owl/common/types" -) - -type InspectorHandle struct { -} - -func (this *InspectorHandle) MakeSession(sess *tcp.Session) { - lg.Info("%s new connection ", sess.RemoteAddr()) -} - -func (this *InspectorHandle) LostSession(sess *tcp.Session) { - lg.Info("%s disconnect ", sess.RemoteAddr()) -} - -func (this *InspectorHandle) Handle(sess *tcp.Session, data []byte) { - defer func() { - if err := recover(); err != nil { - lg.Error("recovered in HandleMessage", err) - } - }() - mt := types.AlarmMessageType(data[0]) - switch mt { - case types.ALAR_MESS_INSPECTOR_TASKS: - at := &types.AlarmTasks{} - if err := at.Decode(data[1:]); err != nil { - lg.Error(err.Error()) - return - } - if len(at.Tasks) == 0 { - return - } - lg.Info("receive %v %v", types.AlarmMessageTypeText[mt], string(data[1:])) - inspector.taskPool.PutTasks(at.Tasks) - default: - lg.Error("unknown option: %v", mt) - } -} diff --git a/inspector/inspector.go b/inspector/inspector.go index 43139cb..22c578b 100644 --- a/inspector/inspector.go +++ b/inspector/inspector.go @@ -3,17 +3,18 @@ package main import ( "errors" "fmt" + "net" "os" "strings" "time" - "owl/common/tcp" "owl/common/types" + + "github.com/wuyingsong/tcp" ) type Inspector struct { - *tcp.Server - session *tcp.Session + controller *tcp.TCPConn taskPool *TaskPool resultPool *ResultPool } @@ -29,9 +30,11 @@ func GetHostName() string { } func InitInspector() error { - server := tcp.NewServer("", &InspectorHandle{}) - server.SetMaxPacketSize(uint32(GlobalConfig.MAX_PACKET_SIZE)) - inspector = &Inspector{server, nil, NewTaskPool(GlobalConfig.MAX_TASK_BUFFER), NewResultPool(GlobalConfig.MAX_RESULT_BUFFER)} + + inspector = &Inspector{nil, NewTaskPool(GlobalConfig.MAX_TASK_BUFFER), NewResultPool(GlobalConfig.MAX_RESULT_BUFFER)} + if err := inspector.Dial(); err != nil { + return err + } go inspector.DialForever() go inspector.HeartBeatForever() go inspector.GetInspectorTasksForever() @@ -40,22 +43,29 @@ func InitInspector() error { return nil } -func (this *Inspector) Dial() { -retry: - session, err := this.Connect(GlobalConfig.CONTROLLER_ADDR, nil) - if err != nil || session.IsClosed() { - lg.Error("can not connect to controller %v, error: %v", GlobalConfig.CONTROLLER_ADDR, err) - time.Sleep(time.Second * time.Duration(3)) - goto retry +func (this *Inspector) Dial() error { + protocol := &tcp.DefaultProtocol{} + protocol.SetMaxPacketSize(uint32(GlobalConfig.MAX_PACKET_SIZE)) + tcpAddr, err := net.ResolveTCPAddr("tcp", GlobalConfig.CONTROLLER_ADDR) + if err != nil { + return err + } + tcpConn, err := net.DialTCP("tcp", nil, tcpAddr) + if err != nil { + return err } - this.session = session - this.session.Send(types.AlarmPack(types.ALAR_MESS_INSPECTOR_HEARTBEAT, types.NewHeartBeat(this.session.LocalAddr(), GetHostName()))) + this.controller = tcp.NewTCPConn(tcpConn, &callback{}, protocol) + this.controller.AsyncWritePacket( + tcp.NewDefaultPacket(types.ALAR_MESS_INSPECTOR_HEARTBEAT, types.NewHeartBeat(this.controller.GetLocalAddr().String(), GetHostName()).Encode()), + ) + // this.session.Send(types.AlarmPack(types.ALAR_MESS_INSPECTOR_HEARTBEAT, types.NewHeartBeat(this.session.LocalAddr(), GetHostName()))) lg.Info("inspector connected to controller: %v", GlobalConfig.CONTROLLER_ADDR) + return this.controller.Serve() } func (this *Inspector) DialForever() { for { - if this.session == nil || this.session.IsClosed() { + if this.controller == nil || this.controller.IsClosed() { this.Dial() } time.Sleep(time.Second * time.Duration(3)) @@ -64,8 +74,17 @@ func (this *Inspector) DialForever() { func (this *Inspector) HeartBeatForever() { for { - if this.session != nil { - this.session.Send(types.AlarmPack(types.ALAR_MESS_INSPECTOR_HEARTBEAT, types.NewHeartBeat(this.session.LocalAddr(), GetHostName()))) + if this.controller != nil { + heartBeat := types.NewHeartBeat( + this.controller.GetLocalAddr().String(), + GetHostName(), + ) + this.controller.AsyncWritePacket( + tcp.NewDefaultPacket( + types.ALAR_MESS_INSPECTOR_HEARTBEAT, + heartBeat.Encode(), + ), + ) } time.Sleep(time.Second * 5) } @@ -73,40 +92,63 @@ func (this *Inspector) HeartBeatForever() { func (this *Inspector) GetInspectorTasksForever() { for { - if len(this.taskPool.tasks) == 0 && this.session != nil { - this.session.Send(types.AlarmPack(types.ALAR_MESS_INSPECTOR_TASK_REQUEST, nil)) + if len(this.taskPool.tasks) == 0 && this.controller != nil { + this.controller.AsyncWritePacket( + tcp.NewDefaultPacket( + types.ALAR_MESS_INSPECTOR_TASK_REQUEST, + []byte{}, + ), + ) } - time.Sleep(time.Millisecond * 100) + time.Sleep(time.Second * 1) } } func (this *Inspector) SendResultForever() { - result_buffer := &types.AlarmResults{} - for { - select { - case result := <-this.resultPool.results: - result_buffer.Results = append(result_buffer.Results, result) - if len(result_buffer.Results) == GlobalConfig.RESULT_BUFFER { - this.session.Send(types.AlarmPack(types.ALAR_MESS_INSPECTOR_RESULTS, result_buffer)) - lg.Info("Send %d Results to controller", len(result_buffer.Results)) - result_buffer.Results = result_buffer.Results[:0] - } - default: - if len(result_buffer.Results) > 0 { - this.session.Send(types.AlarmPack(types.ALAR_MESS_INSPECTOR_RESULTS, result_buffer)) - lg.Info("Send %d Results to controller", len(result_buffer.Results)) - result_buffer.Results = result_buffer.Results[:0] + var err error + for result := range this.resultPool.results { + for { + if err = this.controller.AsyncWritePacket( + tcp.NewDefaultPacket(types.ALAR_MESS_INSPECTOR_RESULT, result.Encode()), + ); err == nil { + break } - time.Sleep(time.Millisecond * 100) + lg.Warn("send task result to controller failed error %s", err) + time.Sleep(time.Second) } + lg.Info("send task result to controller %s result:%s", result.Encode(), GlobalConfig.CONTROLLER_ADDR) } } +// for { +// select { +// case result := <-this.resultPool.results: +// result_buffer.Results = append(result_buffer.Results, result) +// if len(result_buffer.Results) == GlobalConfig.RESULT_BUFFER { +// this.controller.AsyncWritePacket( +// tcp.NewDefaultPacket(types.ALAR_MESS_INSPECTOR_RESULT, result_buffer.Encode()), +// ) +// lg.Info("Send %d Results to controller", len(result_buffer.Results)) +// result_buffer.Results = result_buffer.Results[:0] +// } +// default: +// if len(result_buffer.Results) > 0 { +// this.controller.AsyncWritePacket( +// tcp.NewDefaultPacket(types.ALAR_MESS_INSPECTOR_RESULTS, result_buffer.Encode()), +// ) +// // this.session.Send(types.AlarmPack(types.ALAR_MESS_INSPECTOR_RESULTS, result_buffer)) +// lg.Info("Send %d Results to controller", len(result_buffer.Results)) +// result_buffer.Results = result_buffer.Results[:0] +// } +// time.Sleep(time.Millisecond * 100) +// } +// } +// } + func (this *Inspector) ProcessInspectorTasksForever() { - worker_count := GlobalConfig.WORKER_COUNT - for worker_count > 0 { + + for wc := GlobalConfig.WORKER_COUNT; wc > 0; wc-- { go this.taskWorker() - worker_count -= 1 } }