main.go · 108 lines (92 loc) · 3.27 KB
package main

import (
    "net/http"
    "os"
    "strings"

    "github.com/Shopify/sarama"
    log "github.com/Sirupsen/logrus"
    "github.com/gorilla/mux"
    . "github.com/larskluge/babl-qa/bablrequest"
    . "github.com/larskluge/babl-qa/httpserver"
    "github.com/larskluge/babl-server/kafka"
    cache "github.com/muesli/cache2go"
)

type server struct {
    kafkaClient   *sarama.Client
    kafkaProducer *sarama.SyncProducer
}

const Version = "1.1.1"
const clientID = "babl-qa"

var debug bool

func main() {
    log.SetOutput(os.Stderr)
    log.SetFormatter(&log.JSONFormatter{})
    log.SetLevel(log.WarnLevel)
    log.Warn("App START")

    app := configureCli()
    app.Run(os.Args)
}
func run(listen, kafkaBrokers string, dbg bool) {
    debug = dbg
    if debug {
        log.SetLevel(log.DebugLevel)
    }
    s := server{}

    // websockets
    wsHub := NewHub()

    const kafkaTopicQA = "logs.qa"
    const kafkaTopicHistory = "logs.history"
    const kafkaTopicDetails = "logs.details"

    brokers := strings.Split(kafkaBrokers, ",")
    s.kafkaClient = kafka.NewClient(brokers, kafkaTopicQA, debug)
    defer (*s.kafkaClient).Close()
    s.kafkaProducer = kafka.NewProducer(brokers, clientID+".producer")
    defer (*s.kafkaProducer).Close()

    chQAData := make(chan *QAJsonData)
    chHistory := make(chan *RequestHistory)
    chWSHistory := make(chan *[]byte)
    chDetails := make(chan *[]RequestDetails)
    cacheDetails := cache.Cache("cacheDetails")

    // cache details
    log.Warn("App Load Cache ...")
    ReadRequestDetailsToCache(s.kafkaClient, kafkaTopicDetails, cacheDetails)
    log.Warn("App Completed Load Cache")

    // websockets
    log.Warn("App Run Websockets Hub")
    go wsHub.Run()

    log.Warn("App Run ListenToLogsQA")
    go ListenToLogsQA(s.kafkaClient, kafkaTopicQA, chQAData)

    // other higher-level goroutines go here
    log.Warn("App Save/Broadcast Data")
    go MonitorRequest(chQAData, chHistory, chWSHistory, chDetails, cacheDetails)
    go SaveRequestHistory(s.kafkaProducer, kafkaTopicHistory, chHistory)
    go WSBroadcastRequestHistory(wsHub, chWSHistory)
    go SaveRequestDetails(s.kafkaProducer, kafkaTopicDetails, chDetails, cacheDetails)
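
    // Note on the wiring above: ListenToLogsQA consumes the QA log topic into
    // chQAData; MonitorRequest appears to fan that data out to chHistory,
    // chWSHistory and chDetails, which are drained by the Kafka writers and
    // the websocket broadcaster started here.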

    // HTTP handler for Request History
    //   $ http 127.0.0.1:8888/api/request/history
    //   $ http 127.0.0.1:8888/api/request/history?blocksize=20
    HandlerRequestHistory := func(w http.ResponseWriter, r *http.Request) {
        lastn := GetVarsBlockSize(r, 10)
        rhJson := ReadRequestHistory(s.kafkaClient, kafkaTopicHistory, lastn)
        w.Header().Set("Content-Type", "application/json")
        w.Write(rhJson)
    }

    // HTTP handler for Request Details
    //   $ http 127.0.0.1:8888/api/request/details/12345
    HandlerRequestDetails := func(w http.ResponseWriter, r *http.Request) {
        vars := mux.Vars(r)
        rhJson := ReadRequestDetailsFromCache(vars["requestid"], cacheDetails)
        w.Header().Set("Content-Type", "application/json")
        w.Write(rhJson)
    }

    // HTTP handler for Request Payload
    //   $ http 127.0.0.1:8888/api/request/payload/babl.babl.Events.IO/4/3460
    HandlerRequestPayload := func(w http.ResponseWriter, r *http.Request) {
        vars := mux.Vars(r)
        rhJson := ReadRequestPayload(s.kafkaClient, vars["topic"], vars["partition"], vars["offset"])
        w.Header().Set("Content-Type", "application/octet-stream")
        w.Write(rhJson)
    }

    log.Warn("App Start WebServer")
    StartHttpServer(listen, wsHub, HandlerRequestHistory, HandlerRequestDetails, HandlerRequestPayload)
}