From 7932d5e259b8d9725a04f02404dc7ada58090382 Mon Sep 17 00:00:00 2001 From: andersh Date: Sat, 2 Dec 2023 08:55:39 +0100 Subject: [PATCH] fix: cleanup log format and messages, spell check, use apic json api for auth --- README.md | 23 +++++--- aci-connection.go | 142 ++++++++++++++++++++++++++++------------------ streamer.go | 84 ++++++++++++++++++++------- 3 files changed, 167 insertions(+), 82 deletions(-) diff --git a/README.md b/README.md index 3a1d408..ea0a1bf 100644 --- a/README.md +++ b/README.md @@ -5,19 +5,27 @@ aci-streamer - A Cisco ACI log streamer using ACI REST API subscription mechanis # Overview -The aci-streamer use the Cisco ACI API subscription to provide log streaming on events on ACI classes. +The aci-streamer uses the Cisco ACI API subscription to provide streaming on events on ACI classes. +According to the Cisco [documentation](https://www.cisco.com/c/en/us/td/docs/dcn/aci/apic/all/apic-rest-api-configuration-guide/cisco-apic-rest-api-configuration-guide-42x-and-later/m_using_the_rest_api.html), +subscription works as follows: + +*"The REST API supports the subscription to one or more MOs during your active API session. +When any MO is created, changed, or deleted because of a user- or system-initiated action, an event is +generated. If the event changes the data on any of the active subscribed queries, +the APIC will send out a notification to the API client that created the subscription."* + The most basic example is subscribing to events on the ACI class `faultInst` to get continues stream of fault events. -The streamed events are by default written to stdout and format is json, so it's easy to consume by any log systems -like [Loki](https://github.com/grafana/loki) and [Elastic](https://www.elastic.co/). +The streamed events are by default written to stdout and the format is json, so they are easy to consume by any +system like [Loki](https://github.com/grafana/loki) and [Elastic](https://www.elastic.co/). 
![Dashboard example](images/streamer_example.png) In the above screenshot we have combined Prometheus and Loki data sources in the same dashboard. In the middle row we have the Loki logs based on a “stream” from the aci-streamer called faults. -In the left graph we have the log panel for Loki logs. The query is based on the upper left Grafana variable filters -that are applied for the whole dashboard. +In the left graph we have the log panel for Loki logs. The query is based on the upper left Grafana variable +filters that are applied for the whole dashboard. {fabric=~"$Aci",stream="faults",podid=~"$Podid",nodeid=~"$Nodeid",severity=~"$Severity"} @@ -25,7 +33,8 @@ On the right graph we create a simple fault rate metric based on the log data. sum by (nodeid,severity) (rate({stream="faults",fabric=~"$Aci",podid=~"$Podid",nodeid=~"$Nodeid",severity=~"$Severity"}[5m])) -The rest of the graphs are based on the aci-exporter Prometheus data using the same label filters, so we can drill down both on pod, node and severity. +The rest of the graphs are based on the [aci-exporter](https://github.com/opsdis/aci-exporter) +Prometheus data using the same label filters, so we can drill down both on pod, node and severity. The events streams are configured by a definition of `streams`. The below example create a stream of ACI created sessions. 
@@ -78,7 +87,7 @@ Original message from ACI looks like: }] } ``` -After processed by the streams configuration: +After processed by the above streams configuration: ```json { "annotation": "", diff --git a/aci-connection.go b/aci-connection.go index c231402..6594d08 100644 --- a/aci-connection.go +++ b/aci-connection.go @@ -75,7 +75,7 @@ type Socket struct { } -func newAciConnction(ctx context.Context, fabricConfig Fabric, streams Streams, output string) *AciConnection { +func newAcidConnection(ctx context.Context, fabricConfig Fabric, streams Streams, output string) *AciConnection { // Empty cookie jar jar, _ := cookiejar.New(nil) @@ -92,9 +92,9 @@ func newAciConnction(ctx context.Context, fabricConfig Fabric, streams Streams, urlMap := make(map[string]string) - urlMap["login"] = "/api/aaaLogin.xml" - urlMap["refresh"] = "/api/aaaRefresh.xml" - urlMap["logout"] = "/api/aaaLogout.xml" + urlMap["login"] = "/api/aaaLogin.json" + urlMap["refresh"] = "/api/aaaRefresh.json" + urlMap["logout"] = "/api/aaaLogout.json" urlMap["fabric_name"] = "/api/mo/topology/pod-1/node-1/av.json" // Create websocket definitions from fabricConfig @@ -139,18 +139,22 @@ func (c AciConnection) login() error { func (c AciConnection) authenticate(method string) error { for i, controller := range c.fabricConfig.Apic { - _, status, err, jars := c.doPostXML(fmt.Sprintf("%s%s", controller, c.URLMap[method]), - []byte(fmt.Sprintf("", c.fabricConfig.Username, c.fabricConfig.Password))) + _, status, err, jars := c.doPostJSON(fmt.Sprintf("%s%s", controller, c.URLMap[method]), + []byte(fmt.Sprintf("{\"aaaUser\":{\"attributes\":{\"name\":\"%s\",\"pwd\":\"%s\"}}}", c.fabricConfig.Username, c.fabricConfig.Password))) if err != nil || status != 200 { err = fmt.Errorf("failed to %s to %s, try next apic", method, controller) - log.Error(err) + log.WithFields(log.Fields{ + "requestid": c.ctx.Value("requestid"), + "method": method, + "controller": controller, + }).Error("authentication failed") } else { 
*c.activeController = i log.WithFields(log.Fields{ "requestid": c.ctx.Value("requestid"), - }).Info(fmt.Sprintf("Using apic %s", controller)) + }).Debug(fmt.Sprintf("Using apic %s", controller)) c.printCookie(jars) *c.cookieValue = jars[0].Value return nil @@ -167,8 +171,8 @@ func (c AciConnection) printCookie(jars []*http.Cookie) { var curCk = jars[i] //log.Printf("curCk.Raw=%s", curCk.Raw) log.Debug(fmt.Sprintf("Cookie [%d]", i)) - log.Info(fmt.Sprintf("Name=%s", curCk.Name)) - log.Info(fmt.Sprintf("Value\t=%s", curCk.Value)) + log.Debug(fmt.Sprintf("Name=%s", curCk.Name)) + log.Debug(fmt.Sprintf("Value\t=%s", curCk.Value)) log.Debug(fmt.Sprintf("Path\t=%s", curCk.Path)) log.Debug(fmt.Sprintf("Domain\t=%s", curCk.Domain)) log.Debug(fmt.Sprintf("Expires\t=%s", curCk.Expires)) @@ -192,12 +196,13 @@ func (c AciConnection) subscriptionRefresh(subscriptionId string) error { } func (c AciConnection) logout() bool { - _, status, err, _ := c.doPostXML(fmt.Sprintf("%s%s", c.fabricConfig.Apic[*c.activeController], c.URLMap["logout"]), - []byte(fmt.Sprintf("", c.fabricConfig.Username))) + _, status, err, _ := c.doPostJSON(fmt.Sprintf("%s%s", c.fabricConfig.Apic[*c.activeController], c.URLMap["logout"]), + []byte(fmt.Sprintf("{\"aaaUser\":{\"attributes\":{\"name\":\"%s\"}}}", c.fabricConfig.Username))) if err != nil || status != 200 { log.WithFields(log.Fields{ "requestid": c.ctx.Value("requestid"), - }).Error(err) + "error": err, + }).Error("logout") return false } return true @@ -213,7 +218,11 @@ func (c AciConnection) subscribe(class string, query string) (string, error) { } if err != nil { - log.Error(fmt.Sprintf("Class request %s failed - %s.", class, err)) + log.WithFields(log.Fields{ + "class": class, + "query": query, + "error": err, + }).Error("subscribe failed") return "", err } subscriptionId := gjson.Get(string(data), "subscriptionId").Str @@ -233,7 +242,7 @@ func (c AciConnection) startWebSocket(fabricACIName string, ch chan string) { schema := 
c.websocketConfig.websocket[*c.activeController].schema u := url.URL{Scheme: schema, Host: host, Path: "/socket" + *c.cookieValue} - log.Info(fmt.Sprintf("WS connecting to %s", u.String())) + wsHeaders := http.Header{ "Origin": {u.Host}, "Sec-WebSocket-Extensions": {"permessage-deflate; client_max_window_bits, x-webkit-deflate-frame"}, @@ -242,13 +251,20 @@ func (c AciConnection) startWebSocket(fabricACIName string, ch chan string) { d := websocket.Dialer{TLSClientConfig: &config, HandshakeTimeout: 45 * time.Second} start := time.Now() wc, _, err := d.Dial(u.String(), wsHeaders) - log.Info(fmt.Sprintf("WS connection time %d", time.Since(start).Milliseconds())) if err != nil { - log.Error("dial:", err) + log.WithFields(log.Fields{ + "fabric": fabricACIName, + "error": err, + }).Error("websocket connection") ch <- "failed" return } + log.WithFields(log.Fields{ + "fabric": fabricACIName, + "exec_time": time.Since(start).Milliseconds(), + }).Info(fmt.Sprintf("websocket connection")) + loggo := make(log4go.Logger) if c.outputName == "" { flw := log4go.NewConsoleLogWriter() @@ -265,12 +281,14 @@ func (c AciConnection) startWebSocket(fabricACIName string, ch chan string) { ch <- "started" - defer wc.Close() + //defer wc.Close() for { _, mesg, err := wc.ReadMessage() if err != nil { - log.Error("WS read:", err) + log.WithFields(log.Fields{ + "error": err, + }).Error("websocket read message failed") // send 0 for reconnect ch <- "failed" wc.Close() @@ -279,30 +297,30 @@ func (c AciConnection) startWebSocket(fabricACIName string, ch chan string) { } wsCounter.With(prometheus.Labels{"fabric": c.fabricConfig.Name, "aci": fabricACIName}).Add(1) - c.output(c.reciver(fabricACIName, mesg), loggo) - + c.output(c.receiver(fabricACIName, mesg), loggo) } + if breakout == "breakout" { - log.Info("WS breakout") + wc.Close() return } } } -func (c AciConnection) activeSubscribtions(ids map[string]string) { +func (c AciConnection) activeSubscriptions(ids map[string]string) { for k, v := 
range ids { c.activeSubscribtionIds[k] = v } } -func (c AciConnection) reciver(fabricACIName string, mesg []byte) string { - subscribtionName := c.getSubscribersName(mesg) - if subscribtionName == "" { +func (c AciConnection) receiver(fabricACIName string, mesg []byte) string { + subscriptionName := c.getSubscribersName(mesg) + if subscriptionName == "" { // Not my subscribtion return "" } - stream := c.streams[subscribtionName] + stream := c.streams[subscriptionName] labels := make(map[string]string) json := gjson.Get(string(mesg), stream.Root) @@ -319,7 +337,6 @@ func (c AciConnection) reciver(fabricACIName string, mesg []byte) string { } messageProperties := make([]interface{}, len(stream.Message.Properties)) - //messageProperties := make(map[string]interface{}) for k, v := range stream.Message.Properties { messageProperties[k] = gjson.Get(json.Raw, v).Str val, ok := labels[v] @@ -328,30 +345,30 @@ func (c AciConnection) reciver(fabricACIName string, mesg []byte) string { } } - modjson := json.Raw + modJSON := json.Raw if len(labels) > 0 { for k, v := range labels { - modjson, _ = sjson.Set(modjson, k, v) + modJSON, _ = sjson.Set(modJSON, k, v) } } if stream.Message.Name != "" { - modjson, _ = sjson.Set(modjson, stream.Message.Name, fmt.Sprintf(stream.Message.Format, messageProperties...)) + modJSON, _ = sjson.Set(modJSON, stream.Message.Name, fmt.Sprintf(stream.Message.Format, messageProperties...)) } - modjson, _ = sjson.Set(modjson, "aci", fabricACIName) - modjson, _ = sjson.Set(modjson, "fabric", c.fabricConfig.Name) + modJSON, _ = sjson.Set(modJSON, "aci", fabricACIName) + modJSON, _ = sjson.Set(modJSON, "fabric", c.fabricConfig.Name) if stream.Timestamp.PropertyName != "" { - modjson, _ = sjson.Set(modjson, "timestamp", strings.Split(gjson.Get(json.Raw, stream.Timestamp.PropertyName).Str, "+")[0]+"000000Z") + modJSON, _ = sjson.Set(modJSON, "timestamp", strings.Split(gjson.Get(json.Raw, stream.Timestamp.PropertyName).Str, "+")[0]+"000000Z") } - modjson, _ 
= sjson.Set(modjson, "stream", subscribtionName) + modJSON, _ = sjson.Set(modJSON, "stream", subscriptionName) // drop for _, v := range stream.Drops { - modjson, _ = sjson.Delete(modjson, v.PropertyName) + modJSON, _ = sjson.Delete(modJSON, v.PropertyName) } - return modjson + return modJSON } // output write the data to the selected stream - default stdout @@ -377,12 +394,12 @@ func (c AciConnection) getSubscribersName(mesg []byte) string { } func (c AciConnection) getFabricACIName() (string, error) { - data, err := c.getByQuery("fabric_name") + data, err := c.getByClassQuery("infraCont", "?query-target=self") + if err != nil { return "", err } - - return gjson.Get(data, "imdata.0.infraCont.attributes.fbDmNm").Str, nil + return gjson.Get(data, "imdata.#.infraCont.attributes.fbDmNm").Array()[0].Str, nil } func (c AciConnection) getByQuery(table string) (string, error) { @@ -399,7 +416,10 @@ func (c AciConnection) getByClassQuery(class string, query string) (string, erro if err != nil { log.WithFields(log.Fields{ "requestid": c.ctx.Value("requestid"), - }).Error(fmt.Sprintf("Class request %s failed - %s.", class, err)) + "class": class, + "query": query, + "error": err, + }).Error("Query failed") return "", err } return string(data), nil @@ -416,8 +436,8 @@ func (c AciConnection) get(url string) ([]byte, error) { "length": len(body), "requestid": c.ctx.Value("requestid"), "exec_time": time.Since(start).Microseconds(), - "system": "monitor", - }).Info("api call monitor system") + "system": "fabric", + }).Info("api call") return body, err } @@ -427,7 +447,9 @@ func (c AciConnection) doGet(url string) ([]byte, int, error) { if err != nil { log.WithFields(log.Fields{ "requestid": c.ctx.Value("requestid"), - }).Error(err) + "url": url, + "error": err, + }).Error("create GET request") return nil, 0, err } for k, v := range c.Headers { @@ -436,7 +458,11 @@ func (c AciConnection) doGet(url string) ([]byte, int, error) { resp, err := c.Client.GetClient().Do(req) if err != 
nil { - log.Error(err) + log.WithFields(log.Fields{ + "requestid": c.ctx.Value("requestid"), + "url": url, + "error": err, + }).Error("GET request") return nil, 0, err } @@ -447,7 +473,8 @@ func (c AciConnection) doGet(url string) ([]byte, int, error) { if err != nil { log.WithFields(log.Fields{ "requestid": c.ctx.Value("requestid"), - }).Error(err) + "status": resp.StatusCode, + }).Error("GET status") return nil, resp.StatusCode, err } @@ -456,27 +483,29 @@ func (c AciConnection) doGet(url string) ([]byte, int, error) { return nil, resp.StatusCode, fmt.Errorf("ACI api returned %d", resp.StatusCode) } -func (c AciConnection) doPostXML(url string, requestBody []byte) ([]byte, int, error, []*http.Cookie) { +func (c AciConnection) doPostJSON(url string, requestBody []byte) ([]byte, int, error, []*http.Cookie) { req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBody)) if err != nil { log.WithFields(log.Fields{ "requestid": c.ctx.Value("requestid"), - }).Error(err) + "error": err, + }).Error("create POST request") return nil, 0, err, nil } for k, v := range c.Headers { req.Header.Set(k, v) } - req.Header.Set("Content-Type", "application/xml") + req.Header.Set("Content-Type", "application/json") start := time.Now() resp, err := c.Client.GetClient().Do(req) if err != nil { log.WithFields(log.Fields{ "requestid": c.ctx.Value("requestid"), - }).Error(err) + "error": err, + }).Error("POST request") return nil, 0, err, nil } var status = resp.StatusCode @@ -486,8 +515,8 @@ func (c AciConnection) doPostXML(url string, requestBody []byte) ([]byte, int, e "status": status, "requestid": c.ctx.Value("requestid"), "exec_time": time.Since(start).Microseconds(), - "system": "monitor", - }).Info("api call monitor system") + "system": "fabric", + }).Info("api call") defer resp.Body.Close() @@ -496,11 +525,16 @@ func (c AciConnection) doPostXML(url string, requestBody []byte) ([]byte, int, e if err != nil { log.WithFields(log.Fields{ "requestid": c.ctx.Value("requestid"), 
- }).Error(err) + "error": err, + }).Error("read body") return nil, resp.StatusCode, err, nil } - return bodyBytes, resp.StatusCode, nil, c.Client.GetJar().Cookies(req.URL) + } else { + log.WithFields(log.Fields{ + "requestid": c.ctx.Value("requestid"), + "status": resp.StatusCode, + }).Error("POST status") } return nil, resp.StatusCode, fmt.Errorf("ACI api returned %d", resp.StatusCode), nil diff --git a/streamer.go b/streamer.go index f2aa2ba..df5f1ce 100644 --- a/streamer.go +++ b/streamer.go @@ -82,7 +82,7 @@ func main() { if *writeConfig { err := viper.WriteConfigAs("./aci_streamer_default_config.yaml") if err != nil { - log.Error("Can not write default config file - ", err) + fmt.Printf("Can not write default config file - %s\n", err) } os.Exit(0) } @@ -90,14 +90,14 @@ func main() { // Find and read the config file err := viper.ReadInConfig() if err != nil { - log.Error("Configuration file not valid ", err) + fmt.Printf("Configuration file not valid - %s\n", err) os.Exit(1) } var streams = Streams{} err = viper.UnmarshalKey("streams", &streams) if err != nil { - log.Error("Unable to decode streams into struct - ", err) + fmt.Printf("Unable to decode streams into struct - %s\n", err) os.Exit(1) } @@ -107,7 +107,7 @@ func main() { fabricConfig := Fabric{Name: *fabric, Username: username, Password: password, Apic: apicControllers} ctx := context.TODO() - connection := newAciConnction(ctx, fabricConfig, streams, *output) + connection := newAcidConnection(ctx, fabricConfig, streams, *output) // Create a Prometheus histogram for response time of the exporter responseTime := promauto.NewHistogramVec(prometheus.HistogramOpts{ @@ -128,7 +128,10 @@ func main() { }, )) - log.Info(fmt.Sprintf("%s starting on port %d", ExporterName, viper.GetInt("port"))) + log.WithFields(log.Fields{ + "port": viper.GetInt("port"), + "name": ExporterName, + }).Info("starting") s := &http.Server{ ReadTimeout: 10 * time.Second, WriteTimeout: 10 * time.Second, @@ -150,19 +153,25 @@ func 
startStreamer(connection *AciConnection, streams Streams) { subIds := make(map[string]string) - // TODO add handling to restart everything if any of below fail for { select { case fromWS := <-ch: if fromWS == "failed" { // The websocket has for some reason failed and must be restarted - log.Info(fmt.Sprintf("WS Restart ===========================================")) + log.WithFields(log.Fields{ + "requestid": connection.ctx.Value("requestid"), + "fabric": fabricACIName, + }).Info("websocket restarted on failed") + go connection.startWebSocket(fabricACIName, ch) } if fromWS == "started" { // The websocket has been started and is ready - log.Info(fmt.Sprintf("Re-subscribe =======================")) + log.WithFields(log.Fields{ + "requestid": connection.ctx.Value("requestid"), + "fabric": fabricACIName, + }).Info(fmt.Sprintf("websocket ready for subscribtion")) subIds = make(map[string]string) for k, v := range streams { @@ -170,36 +179,70 @@ func startStreamer(connection *AciConnection, streams Streams) { subIds[subscriptionId] = k } - connection.activeSubscribtions(subIds) + connection.activeSubscriptions(subIds) } case <-time.After(time.Second * 30): - log.Info(fmt.Sprintf("Start refresh")) + log.WithFields(log.Fields{ + "requestid": connection.ctx.Value("requestid"), + "fabric": fabricACIName, + "interval": 30, + }).Info(fmt.Sprintf("start subscribtion refresh on interval")) } - - log.Info(fmt.Sprintf("Refresh session")) + /* + log.WithFields(log.Fields{ + "requestid": connection.ctx.Value("requestid"), + "fabric": fabricACIName, + }).Info(fmt.Sprintf("Refresh subscribtion")) + */ + // this is login again err = connection.sessionRefresh() if err != nil { - log.Error("Session refresh failed - ", err) + log.WithFields(log.Fields{ + "requestid": connection.ctx.Value("requestid"), + "fabric": fabricACIName, + "error": err, + }).Error("session refresh failed") err = connection.login() if err != nil { - log.Error("Session login failed - ", err) + 
log.WithFields(log.Fields{ + "requestid": connection.ctx.Value("requestid"), + "fabric": fabricACIName, + "error": err, + }).Error("session login failed") } } for k, v := range subIds { - log.Info(fmt.Sprintf("Refresh id %s for %s", k, v)) if v == "" { // Must subscribe again since lost id - log.Warn(fmt.Sprintf("Found empty subscribtion id, re subscribe for %s", v)) + log.WithFields(log.Fields{ + "requestid": connection.ctx.Value("requestid"), + "fabric": fabricACIName, + "subscriptionid": k, + }).Warn(fmt.Sprintf("empty stream name")) delete(subIds, k) subscriptionId, _ := connection.subscribe(streams[v].ClassName, streams[v].QueryParameter) subIds[subscriptionId] = v - } + + // This websocket refresh err = connection.subscriptionRefresh(k) if err != nil { - log.Error("Subscription refresh failed, will be reconnected next iteration - ", err) + log.WithFields(log.Fields{ + "requestid": connection.ctx.Value("requestid"), + "fabric": fabricACIName, + "subscriptionid": k, + "stream": v, + "error": err, + }).Error("subscription refresh failed, will be reconnected next iteration") + } else { + log.WithFields(log.Fields{ + "requestid": connection.ctx.Value("requestid"), + "fabric": fabricACIName, + "subscriptionid": k, + "stream": v, + }).Info(fmt.Sprintf("refresh subscribtion")) } } } @@ -235,9 +278,8 @@ func logcall(next http.Handler) http.Handler { w.Header().Set("Content-Length", strconv.Itoa(lrw.length)) log.WithFields(log.Fields{ - "method": r.Method, - "uri": r.RequestURI, - //"endpoint": endpoint, + "method": r.Method, + "uri": r.RequestURI, "status": lrw.statusCode, "length": lrw.length, "requestid": requestid,