Skip to content

Commit

Permalink
feat: add elastic search ingester module
Browse files Browse the repository at this point in the history
Signed-off-by: Martin Chodur <[email protected]>
  • Loading branch information
FUSAKLA committed Jan 28, 2022
1 parent 38ce5db commit 7c3c671
Show file tree
Hide file tree
Showing 12 changed files with 698 additions and 6 deletions.
3 changes: 3 additions & 0 deletions cmd/slo_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"github.com/seznam/slo-exporter/pkg/elasticsearch_ingester"
"runtime"

"github.com/gorilla/mux"
Expand Down Expand Up @@ -71,6 +72,8 @@ func moduleFactory(moduleName string, logger logrus.FieldLogger, conf *viper.Vip
return prometheus_ingester.NewFromViper(conf, logger)
case "kafkaIngester":
return kafka_ingester.NewFromViper(conf, logger)
case "elasticSearchIngester":
return elasticsearch_ingester.NewFromViper(conf, logger)
case "envoyAccessLogServer":
return envoy_access_log_server.NewFromViper(conf, logger)
case "eventMetadataRenamer":
Expand Down
1 change: 1 addition & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ Only produces new events from the specified data source.
- [`prometheusIngester`](modules/prometheus_ingester.md)
- [`envoyAccessLogServer`](modules/envoy_access_log_server.md)
- [`kafkaIngester`](modules/kafka_ingester.md)
- [`elasticSearchIngester`](modules/elasticsearch_ingester.md)
##### Processors:
Reads input events, does some processing based on the module type and produces a modified event.
Expand Down
82 changes: 82 additions & 0 deletions docs/modules/elasticsearch_ingester.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Elasticsearch ingester

| | |
|----------------|-------------------------|
| `moduleName` | `elasticSearchIngester` |
| Module type | `producer` |
| Output event | `raw` |

This module lets you follow all new documents in real time using an Elasticsearch query and compute SLOs based on them.

The most common use case is when you run in Kubernetes, for example, and already collect logs using the ELK stack. You
can simply hook into that data and compute SLOs based on those application logs.

### Elastic search versions and support

Currently, only v7 is supported.

### How does it work

The module periodically (the interval is configurable) queries (you can pass in a custom Lucene query)
the Elasticsearch index (which needs to be specified) and creates a new event from the document for every hit. All the
documents need to have a field with a timestamp (field name and format are configurable), so the module can sort them and
store the timestamp of the last queried document. In the next iteration it uses this timestamp as the lower limit for the range
query, so it does not miss any entries. Each query is limited by a maximum batch size (configurable) so the requests are
not huge.

In case you do not use structured logging and your logs are not indexed, you can specify the name of the field with the raw
log entry and a regular expression with named groups which, if matched, will be propagated to the event metadata.

### moduleConfig

```yaml
# OPTIONAL Debug logging
debug: false
# REQUIRED Version of the Elasticsearch API to be used, possible values: v7
apiVersion: "v7"
# REQUIRED List of addresses pointing to the Elasticsearch API endpoint to query
addresses:
- "https://foo.bar:4433"
# OPTIONAL Skip verification of the server certificate
insecureSkipVerify: false
# OPTIONAL Timeout for the Elasticsearch API call
timeout: "5s"
# Enable/disable sniffing, autodiscovery of other nodes in Elasticsearch cluster
sniffing: true
# Enable/disable healthchecking of the Elasticsearch nodes
healthchecks: true

# OPTIONAL username to use for authentication
username: "foo"
# OPTIONAL password to use for authentication
password: "bar"
# OPTIONAL Client certificate to be used for authentication
clientCertFile: "./client.pem"
# OPTIONAL Client certificate key to be used for authentication
clientKeyFile: "./client-key.pem"
# OPTIONAL Custom CA certificate to verify the server certificate
clientCaCertFile: "./ca.cert"

# OPTIONAL Interval how often to check for new documents from the Elasticsearch API.
# If the module is falling behind the amount of documents in Elasticsearch, it will
# query it more often.
interval: 5s
# REQUIRED Name of the index to be queried
index: "my-index"
# OPTIONAL Additional Lucene query to filter the results
query: "app_name: nginx AND namespace: test"
# OPTIONAL Maximum number of documents to be read in one batch
maxBatchSize: 100

# REQUIRED Document field name containing a timestamp of the event
timestampField: "@timestamp"
# REQUIRED Golang time parse format to parse the timestamp from the timestampField
timestampFormat: "2006-01-02T15:04:05Z07:00" # See # https://www.geeksforgeeks.org/time-formatting-in-golang/ for common examples
# OPTIONAL Name of the field in document containing the raw log message you want to parse
rawLogField: "log"
# OPTIONAL Regular expression to be used to parse the raw log message,
# each matched named group will be propagated to the new event metadata
rawLogParseRegexp: '^(?P<ip>[A-Fa-f0-9.:]{4,50}) \S+ \S+ \[(?P<time>.*?)\] "(?P<httpMethod>[^\s]+)?\s+(?P<httpPath>[^\?\s]+).*'
# OPTIONAL If content of the named group match this regexp, it will be considered as an empty value.
rawLogEmptyGroupRegexp: '^-$'
```
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/hpcloud/tail v1.0.1-0.20180514194441-a1dbeea552b7
github.com/iancoleman/strcase v0.0.0-20191112232945-16388991a334
github.com/klauspost/compress v1.14.1 // indirect
github.com/olivere/elastic/v7 v7.0.31
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.31.1
Expand Down
13 changes: 12 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ github.com/aws/aws-sdk-go v1.38.35/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2z
github.com/aws/aws-sdk-go v1.40.11/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q=
github.com/aws/aws-sdk-go v1.40.37/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q=
github.com/aws/aws-sdk-go v1.40.45/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q=
github.com/aws/aws-sdk-go v1.42.23/go.mod h1:gyRszuZ/icHmHAVE4gc/r+cfCmhA1AD+vqfWbgI+eHs=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
github.com/aws/aws-sdk-go-v2 v1.9.1/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4=
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.8.1/go.mod h1:CM+19rL1+4dFWnOQKwDc7H1KwXTz+h61oUSHyhV0b3o=
Expand Down Expand Up @@ -608,6 +609,7 @@ github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSw
github.com/fluent/fluent-bit-go v0.0.0-20190925192703-ea13c021720c/go.mod h1:WQX+afhrekY9rGK+WT4xvKSlzmia9gDoLYu4GGYGASQ=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/foxcpp/go-mockdns v0.0.0-20201212160233-ede2f9158d15/go.mod h1:tPg4cp4nseejPd+UKxtCVQ2hUxNTZ7qQZJa7CLriIeo=
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
Expand Down Expand Up @@ -1179,6 +1181,7 @@ github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqx
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=
github.com/joncrlsn/dque v2.2.1-0.20200515025108-956d14155fa2+incompatible/go.mod h1:hDZb8oMj3Kp8MxtbNLg9vrtAUDHjgI1yZvqivT4O8Iw=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
Expand Down Expand Up @@ -1285,6 +1288,8 @@ github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN
github.com/mailru/easyjson v0.7.0/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs=
github.com/mailru/easyjson v0.7.1/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs=
github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/markbates/oncer v0.0.0-20181203154359-bf2de49a0be2/go.mod h1:Ld9puTsIW75CHf65OeIOkyKbteujpZVXDpWK6YGZbxE=
github.com/markbates/safe v1.0.1/go.mod h1:nAqgmRi7cY2nqMc92/bSEeQA+R4OheNU2T1kNSCBdG0=
github.com/marstr/guid v1.1.0/go.mod h1:74gB1z2wpxxInTG6yaqA7KrtM0NZ+RbrcqDvYHefzho=
Expand Down Expand Up @@ -1433,6 +1438,8 @@ github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/olekukonko/tablewriter v0.0.1/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/olekukonko/tablewriter v0.0.2/go.mod h1:rSAaSIOAGT9odnlyGlUfAJaoc5w2fSBUmeGDbRWPxyQ=
github.com/olivere/elastic/v7 v7.0.31 h1:VJu9/zIsbeiulwlRCfGQf6Tzsr++uo+FeUgj5oj+xKk=
github.com/olivere/elastic/v7 v7.0.31/go.mod h1:idEQxe7Es+Wr4XAuNnJdKeMZufkA9vQprOIFck061vg=
github.com/onsi/ginkgo v0.0.0-20151202141238-7f8ab55aaf3b/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
Expand Down Expand Up @@ -1715,11 +1722,14 @@ github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/assertions v1.0.1 h1:voD4ITNjPL5jjBfgR/r8fPIIBrliWrWHeiJApdr3r4w=
github.com/smartystreets/assertions v1.0.1/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM=
github.com/smartystreets/assertions v1.1.1 h1:T/YLemO5Yp7KPzS+lVtu+WsHn8yoSwTfItdAd1r3cck=
github.com/smartystreets/assertions v1.1.1/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h1:SnhjPscd9TpLiy1LpzGSKh3bXCfxxXuqd9xmQJy3slM=
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/smartystreets/gunit v1.4.2/go.mod h1:ZjM1ozSIMJlAz/ay4SG8PeKF00ckUp+zMHZXV9/bvak=
github.com/snowflakedb/gosnowflake v1.3.4/go.mod h1:NsRq2QeiMUuoNUJhp5Q6xGC4uBrsS9g6LwZVEkTWgsE=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/soheilhy/cmux v0.1.5-0.20210205191134-5ec6847320e5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0=
Expand Down Expand Up @@ -2144,6 +2154,7 @@ golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20210903162142-ad29c8ab022f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210917221730-978cfadd31cf/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211101193420-4a448f8816b3/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211209124913-491a49abca63/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220114011407-0dd24b26b47d h1:1n1fc535VhN8SYtD4cDUyNlfpAF2ROMM9+11equK3hs=
golang.org/x/net v0.0.0-20220114011407-0dd24b26b47d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
Expand Down
52 changes: 52 additions & 0 deletions pkg/elasticsearch_client/elastic_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package elasticsearch_client

import (
	"context"
	"encoding/json"
	"fmt"
	"sort"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
)

var (
// ElasticApiCall is a histogram of the duration, in seconds, of calls made to
// the Elasticsearch API. Observations are labeled with the API version, the
// logical endpoint name and an "error" label; the "error" label is the empty
// string for successful calls.
//
// Buckets are exponential: 5 buckets starting at 0.1s with a factor of 2.
ElasticApiCall = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "elasticsearch_request_seconds",
Help: "Duration histogram of elasticsearch api calls",
Buckets: prometheus.ExponentialBuckets(0.1, 2, 5),
}, []string{"api_version", "endpoint", "error"})
)

// Config holds the connection settings used to build an Elasticsearch Client.
type Config struct {
// Addresses is the list of Elasticsearch API URLs handed to the client.
Addresses []string
// Username and Password enable HTTP basic auth when at least one is non-empty.
Username string
Password string
// Timeout is applied to dialing, to waiting for response headers and to the
// HTTP client as a whole.
Timeout time.Duration
// Healtchecks toggles the client's periodic node health checks.
// NOTE(review): the name is a typo of "Healthchecks", but it is an exported
// field, so renaming it would break callers.
Healtchecks bool
// Sniffing toggles the client's node sniffing.
Sniffing bool
// InsecureSkipVerify disables verification of the server certificate.
InsecureSkipVerify bool
// ClientCertFile and ClientKeyFile are paths to the client certificate and
// key; they are used only when both are set.
ClientCertFile string
ClientKeyFile string
// CaCertFile is a path to a PEM file whose certificates are loaded into the
// certificate pool of the client's TLS configuration.
CaCertFile string
// Debug additionally enables the client's trace and info logs.
Debug bool
}

// Client is the version-independent interface to the Elasticsearch API used by
// the rest of the exporter.
type Client interface {
// RangeSearch queries index for documents whose timestampField is bounded
// from below by since, optionally narrowed by query, requesting at most size
// documents. It returns the raw JSON source of every hit.
RangeSearch(ctx context.Context, index, timestampField string, since time.Time, size int, query string, timeout time.Duration) ([]json.RawMessage, error)
}

// clientFactory maps a supported Elasticsearch API version string to the
// constructor of the matching Client implementation. NewClient looks versions
// up here.
var clientFactory = map[string]func(config Config, logger logrus.FieldLogger) (Client, error){
"v7": NewV7Client,
}

// NewClient builds a Client for the requested Elasticsearch API version.
//
// version must be one of the keys registered in clientFactory (currently only
// "v7"). For an unknown version it returns a nil Client and an error listing
// the supported versions. Otherwise it returns whatever the version-specific
// constructor returns.
func NewClient(version string, config Config, logger logrus.FieldLogger) (Client, error) {
	if factoryFn, ok := clientFactory[version]; ok {
		return factoryFn(config, logger)
	}

	supportedValues := make([]string, 0, len(clientFactory))
	for k := range clientFactory {
		supportedValues = append(supportedValues, k)
	}
	// Map iteration order is random; sort so the error message is stable.
	sort.Strings(supportedValues)
	return nil, fmt.Errorf("unsupported Elasticsearch API version %s, only supported values are: %s", version, supportedValues)
}
97 changes: 97 additions & 0 deletions pkg/elasticsearch_client/v7.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package elasticsearch_client

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
elasticV7 "github.com/olivere/elastic/v7"
"github.com/sirupsen/logrus"
"io/ioutil"
"net"
"net/http"
"time"
)

// NewV7Client builds a Client backed by the olivere/elastic v7 library.
//
// The HTTP transport is configured from config:
//   - Timeout bounds dialing, waiting for response headers and the request as
//     a whole.
//   - ClientCertFile and ClientKeyFile, when both are set, provide the client
//     certificate for mutual TLS.
//   - CaCertFile, when set, provides the CA certificates used to verify the
//     server certificate.
//   - InsecureSkipVerify disables server certificate verification.
//
// It returns an error if the CA file cannot be read, contains no parsable PEM
// certificate, or if the underlying elastic client cannot be created.
func NewV7Client(config Config, logger logrus.FieldLogger) (Client, error) {
	// The key pair is read from disk every time the callback is invoked rather
	// than once at startup, so a load failure surfaces at request time.
	var clientCertFn func(*tls.CertificateRequestInfo) (*tls.Certificate, error)
	if config.ClientKeyFile != "" && config.ClientCertFile != "" {
		clientCertFn = func(_ *tls.CertificateRequestInfo) (*tls.Certificate, error) {
			cert, err := tls.LoadX509KeyPair(config.ClientCertFile, config.ClientKeyFile)
			if err != nil {
				return nil, fmt.Errorf("failed to read client certs %s, %s: %w", config.ClientCertFile, config.ClientKeyFile, err)
			}
			return &cert, nil
		}
	}

	// A nil pool makes crypto/tls fall back to the host's root CA set.
	var serverCaCertPool *x509.CertPool
	if config.CaCertFile != "" {
		pemData, err := ioutil.ReadFile(config.CaCertFile)
		if err != nil {
			return nil, fmt.Errorf("failed to read clientCaCertFile %s: %w", config.CaCertFile, err)
		}
		serverCaCertPool = x509.NewCertPool()
		// AppendCertsFromPEM reports false when nothing could be parsed; an
		// empty pool would reject every server certificate, so fail early.
		if !serverCaCertPool.AppendCertsFromPEM(pemData) {
			return nil, fmt.Errorf("failed to parse any certificate from clientCaCertFile %s", config.CaCertFile)
		}
	}

	httpClient := &http.Client{
		Transport: &http.Transport{
			ResponseHeaderTimeout: config.Timeout,
			DialContext:           (&net.Dialer{Timeout: config.Timeout}).DialContext,
			TLSClientConfig: &tls.Config{
				InsecureSkipVerify:   config.InsecureSkipVerify,
				GetClientCertificate: clientCertFn,
				// RootCAs is what a TLS client uses to verify the server
				// certificate; ClientCAs is only consulted by TLS servers.
				RootCAs: serverCaCertPool,
			},
		},
		Timeout: config.Timeout,
	}

	opts := []elasticV7.ClientOptionFunc{
		elasticV7.SetHttpClient(httpClient),
		elasticV7.SetErrorLog(logger),
		elasticV7.SetURL(config.Addresses...),
		elasticV7.SetScheme("https"),
		elasticV7.SetSniff(config.Sniffing),
		elasticV7.SetHealthcheck(config.Healtchecks),
	}
	if config.Debug {
		opts = append(opts, elasticV7.SetTraceLog(logger), elasticV7.SetInfoLog(logger))
	}
	if config.Username != "" || config.Password != "" {
		opts = append(opts, elasticV7.SetBasicAuth(config.Username, config.Password))
	}

	cli, err := elasticV7.NewClient(opts...)
	if err != nil {
		return nil, err
	}
	return &v7Client{client: cli, logger: logger}, nil
}

// v7Client is the Client implementation for the Elasticsearch v7 API. It wraps
// an olivere/elastic v7 client together with the logger used for debug output.
type v7Client struct {
logger logrus.FieldLogger
client *elasticV7.Client
}

// rangeSearchErrorLabel maps a failed request to a small, fixed set of values
// suitable for a Prometheus label. Raw error messages must not be used as
// label values because every distinct message creates a new time series.
func rangeSearchErrorLabel(ctx context.Context, err error) string {
	switch ctx.Err() {
	case context.Canceled:
		return "canceled"
	case context.DeadlineExceeded:
		return "deadline_exceeded"
	}
	if esErr, ok := err.(*elasticV7.Error); ok {
		return fmt.Sprintf("status_%d", esErr.Status)
	}
	return "request_failed"
}

// RangeSearch queries index for documents whose timestampField is bounded from
// below by since, sorted by timestampField in ascending order, and returns the
// raw JSON source of each hit.
//
// query is an optional query-string filter applied on top of the range filter;
// an empty string disables it. size caps the number of returned documents and
// timeout is passed to Elasticsearch as the search timeout in milliseconds.
//
// Every call is observed in the ElasticApiCall histogram. The "error" label is
// empty on success and a bounded classification of the failure otherwise. On
// error the returned slice is nil.
func (v *v7Client) RangeSearch(ctx context.Context, index, timestampField string, since time.Time, size int, query string, timeout time.Duration) ([]json.RawMessage, error) {
	filters := []elasticV7.Query{
		elasticV7.NewRangeQuery(timestampField).From(since),
	}
	if query != "" {
		filters = append(filters, elasticV7.NewQueryStringQuery(query))
	}
	q := elasticV7.NewBoolQuery().Filter(filters...)

	start := time.Now()
	result, err := v.client.Search().
		Index(index).
		TimeoutInMillis(int(timeout.Milliseconds())).
		Size(size).
		Sort(timestampField, true).
		Query(q).
		Do(ctx)
	elapsed := time.Since(start).Seconds()
	if err != nil {
		ElasticApiCall.WithLabelValues("v7", "rangeSearch", rangeSearchErrorLabel(ctx, err)).Observe(elapsed)
		return nil, err
	}
	ElasticApiCall.WithLabelValues("v7", "rangeSearch", "").Observe(elapsed)

	// Hits is a pointer in the response type; treat a missing section as an
	// empty result instead of dereferencing nil.
	var hits []*elasticV7.SearchHit
	if result.Hits != nil {
		hits = result.Hits.Hits
	}
	v.logger.WithFields(logrus.Fields{"index": index, "hits": len(hits), "duration_ms": result.TookInMillis, "query": query, "since": since}).Debug("elastic search range search call")

	msgs := make([]json.RawMessage, len(hits))
	for i, h := range hits {
		msgs[i] = h.Source
	}
	return msgs, nil
}
Loading

0 comments on commit 7c3c671

Please sign in to comment.