Skip to content

Commit

Permalink
feat: add elastic search ingester module
Browse files Browse the repository at this point in the history
Signed-off-by: Martin Chodur <[email protected]>
  • Loading branch information
FUSAKLA committed Jan 28, 2022
1 parent 38ce5db commit 33d8209
Show file tree
Hide file tree
Showing 11 changed files with 529 additions and 6 deletions.
3 changes: 3 additions & 0 deletions cmd/slo_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"github.com/seznam/slo-exporter/pkg/elasticsearch_ingester"
"runtime"

"github.com/gorilla/mux"
Expand Down Expand Up @@ -71,6 +72,8 @@ func moduleFactory(moduleName string, logger logrus.FieldLogger, conf *viper.Vip
return prometheus_ingester.NewFromViper(conf, logger)
case "kafkaIngester":
return kafka_ingester.NewFromViper(conf, logger)
case "elasticSearchIngester":
return elasticsearch_ingester.NewFromViper(conf, logger)
case "envoyAccessLogServer":
return envoy_access_log_server.NewFromViper(conf, logger)
case "eventMetadataRenamer":
Expand Down
1 change: 1 addition & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ Only produces new events from the specified data source.
- [`prometheusIngester`](modules/prometheus_ingester.md)
- [`envoyAccessLogServer`](modules/envoy_access_log_server.md)
- [`kafkaIngester`](modules/kafka_ingester.md)
- [`elasticSearchIngester`](modules/elasticsearch_ingester.md)
##### Processors:
Reads input events, does some processing based on the module type and produces a modified event.
Expand Down
33 changes: 33 additions & 0 deletions docs/modules/elasticsearch_ingester.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Elasticsearch ingester

| | |
|----------------|-------------------------|
| `moduleName` | `elasticSearchIngester` |
| Module type | `producer` |
| Output event | `raw` |

This module allows you to read events as documents from Elasticsearch (assuming the ELK stack).

### Elastic search versions and support
Currently, only v7 is supported.

### moduleConfig
```yaml
addresses:
- "https://foo.bar:4433"
index: "*:sklik-production-search"
clientCertFile: "./client.pem"
clientKeyFile: "./client-key.pem"
clientCaCertFile: "./ca.cert"
debug: true
insecureSkipVerify: false
maxBatchSize: 100
interval: 5s
timeout: 5s
timestampField: "@timestamp"
timestampFormat: "2006-01-02T15:04:05Z07:00" # See https://www.geeksforgeeks.org/time-formatting-in-golang/ for common examples
query: "app_name: nginx AND namespace: test"
rawLogField: "log"
rawLogParseRegexp: '^(?P<ip>[A-Fa-f0-9.:]{4,50}) \S+ \S+ \[(?P<time>.*?)\] "(?P<httpMethod>[^\s]+)?\s+(?P<httpPath>[^\?\s]+)(?P<httpQuery>[^\s]+)?\s+(?P<protocolVersion>[^\s]+)\s*" (?P<statusCode>\d+) \d+ "(?P<referer>.*?)" uag="(?P<userAgent>[^"]+)" "[^"]+" ua="[^"]+" rt="(?P<requestDuration>\d+(\.\d+)??)".*? cc="(?P<contentClass>[^"]*)".*? ignore-slo="(?P<ignoreSloHeader>[^"]*)" slo-domain="(?P<sloDomain>[^"]*)" slo-app="(?P<sloApp>[^"]*)" slo-class="(?P<sloClass>[^"]*)" slo-endpoint="(?P<sloEndpoint>[^"]*)" slo-result="(?P<sloResult>[^"]*)"'
rawLogEmptyGroupRegexp: '^-$'
```
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/hpcloud/tail v1.0.1-0.20180514194441-a1dbeea552b7
github.com/iancoleman/strcase v0.0.0-20191112232945-16388991a334
github.com/klauspost/compress v1.14.1 // indirect
github.com/olivere/elastic/v7 v7.0.31
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.31.1
Expand Down
13 changes: 12 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ github.com/aws/aws-sdk-go v1.38.35/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2z
github.com/aws/aws-sdk-go v1.40.11/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q=
github.com/aws/aws-sdk-go v1.40.37/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q=
github.com/aws/aws-sdk-go v1.40.45/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q=
github.com/aws/aws-sdk-go v1.42.23/go.mod h1:gyRszuZ/icHmHAVE4gc/r+cfCmhA1AD+vqfWbgI+eHs=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
github.com/aws/aws-sdk-go-v2 v1.9.1/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4=
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.8.1/go.mod h1:CM+19rL1+4dFWnOQKwDc7H1KwXTz+h61oUSHyhV0b3o=
Expand Down Expand Up @@ -608,6 +609,7 @@ github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSw
github.com/fluent/fluent-bit-go v0.0.0-20190925192703-ea13c021720c/go.mod h1:WQX+afhrekY9rGK+WT4xvKSlzmia9gDoLYu4GGYGASQ=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/foxcpp/go-mockdns v0.0.0-20201212160233-ede2f9158d15/go.mod h1:tPg4cp4nseejPd+UKxtCVQ2hUxNTZ7qQZJa7CLriIeo=
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
Expand Down Expand Up @@ -1179,6 +1181,7 @@ github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqx
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=
github.com/joncrlsn/dque v2.2.1-0.20200515025108-956d14155fa2+incompatible/go.mod h1:hDZb8oMj3Kp8MxtbNLg9vrtAUDHjgI1yZvqivT4O8Iw=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
Expand Down Expand Up @@ -1285,6 +1288,8 @@ github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN
github.com/mailru/easyjson v0.7.0/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs=
github.com/mailru/easyjson v0.7.1/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs=
github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/markbates/oncer v0.0.0-20181203154359-bf2de49a0be2/go.mod h1:Ld9puTsIW75CHf65OeIOkyKbteujpZVXDpWK6YGZbxE=
github.com/markbates/safe v1.0.1/go.mod h1:nAqgmRi7cY2nqMc92/bSEeQA+R4OheNU2T1kNSCBdG0=
github.com/marstr/guid v1.1.0/go.mod h1:74gB1z2wpxxInTG6yaqA7KrtM0NZ+RbrcqDvYHefzho=
Expand Down Expand Up @@ -1433,6 +1438,8 @@ github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/olekukonko/tablewriter v0.0.1/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/olekukonko/tablewriter v0.0.2/go.mod h1:rSAaSIOAGT9odnlyGlUfAJaoc5w2fSBUmeGDbRWPxyQ=
github.com/olivere/elastic/v7 v7.0.31 h1:VJu9/zIsbeiulwlRCfGQf6Tzsr++uo+FeUgj5oj+xKk=
github.com/olivere/elastic/v7 v7.0.31/go.mod h1:idEQxe7Es+Wr4XAuNnJdKeMZufkA9vQprOIFck061vg=
github.com/onsi/ginkgo v0.0.0-20151202141238-7f8ab55aaf3b/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
Expand Down Expand Up @@ -1715,11 +1722,14 @@ github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/assertions v1.0.1 h1:voD4ITNjPL5jjBfgR/r8fPIIBrliWrWHeiJApdr3r4w=
github.com/smartystreets/assertions v1.0.1/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM=
github.com/smartystreets/assertions v1.1.1 h1:T/YLemO5Yp7KPzS+lVtu+WsHn8yoSwTfItdAd1r3cck=
github.com/smartystreets/assertions v1.1.1/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h1:SnhjPscd9TpLiy1LpzGSKh3bXCfxxXuqd9xmQJy3slM=
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/smartystreets/gunit v1.4.2/go.mod h1:ZjM1ozSIMJlAz/ay4SG8PeKF00ckUp+zMHZXV9/bvak=
github.com/snowflakedb/gosnowflake v1.3.4/go.mod h1:NsRq2QeiMUuoNUJhp5Q6xGC4uBrsS9g6LwZVEkTWgsE=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/soheilhy/cmux v0.1.5-0.20210205191134-5ec6847320e5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0=
Expand Down Expand Up @@ -2144,6 +2154,7 @@ golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20210903162142-ad29c8ab022f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210917221730-978cfadd31cf/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211101193420-4a448f8816b3/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211209124913-491a49abca63/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220114011407-0dd24b26b47d h1:1n1fc535VhN8SYtD4cDUyNlfpAF2ROMM9+11equK3hs=
golang.org/x/net v0.0.0-20220114011407-0dd24b26b47d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
Expand Down
25 changes: 25 additions & 0 deletions pkg/elasticsearch_ingester/elastic_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package elasticsearch_ingester

import (
"context"
"encoding/json"
"github.com/prometheus/client_golang/prometheus"
"time"
)

// elasticApiCall is a histogram of Elasticsearch API call durations, labelled
// by api_version, endpoint and error. It uses five exponential buckets that
// start at 0.1 and double each step.
var elasticApiCall = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name:    "elasticsearch_request_seconds",
		Help:    "Duration histogram of elasticsearch api calls",
		Buckets: prometheus.ExponentialBuckets(0.1, 2, 5),
	},
	[]string{"api_version", "endpoint", "error"},
)

// document is a single record read from Elasticsearch, flattened into string
// fields together with the time it is attributed to.
type document struct {
	// timestamp is the time parsed from the document's timestamp field, or the
	// time the document was decoded when that field is missing or unparsable.
	timestamp time.Time
	// fields maps field names to their values rendered as strings.
	fields map[string]string
}

// elasticClient is the search capability the tailer needs from an
// Elasticsearch client implementation.
type elasticClient interface {
	// RangeSearch queries index and returns the matching documents as raw JSON.
	// NOTE(review): presumably it returns at most size documents whose
	// timestampField is newer than since, filtered by query and bounded by
	// timeout — confirm against the concrete implementation.
	RangeSearch(ctx context.Context, index, timestampField string, since time.Time, size int, query string, timeout time.Duration) ([]json.RawMessage, error)
}
171 changes: 171 additions & 0 deletions pkg/elasticsearch_ingester/elastic_tailer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package elasticsearch_ingester

import (
"context"
"encoding/json"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"go.uber.org/atomic"
"regexp"
"sync"
"time"

tailer_module "github.com/seznam/slo-exporter/pkg/tailer"
)

// Metrics describing the tailer's progress and the data-quality problems it
// encounters while decoding documents.
var (
	// searchedDocuments counts documents retrieved from Elasticsearch.
	searchedDocuments = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "searched_documents_total",
			Help: "How many documents were retrieved from the elastic search",
		},
	)

	// lastSearchTimestamp exposes the timestamp of the last processed
	// document as Unix seconds.
	lastSearchTimestamp = prometheus.NewGauge(
		prometheus.GaugeOpts{
			Name: "last_document_timestamp_seconds",
			Help: "Timestamp of the last processed document, next fetch will read since this timestamp",
		},
	)

	// missingRawLogField counts documents lacking the configured raw log field.
	// NOTE(review): the metric name spells "filed" instead of "field"; it is
	// kept unchanged here so the exported metric is not renamed.
	missingRawLogField = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "missing_raw_log_filed_total",
			Help: "How many times defined raw log wasn't found in the document",
		},
	)

	// invalidrawLogFormat counts raw logs that did not match the parse regexp.
	invalidrawLogFormat = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "raw_log_invalid_format_total",
			Help: "How many times the raw log had invalid format",
		},
	)

	// missingTimestampField counts documents without the timestamp field.
	missingTimestampField = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "missing_timestamp_field_total",
			Help: "How many times the timestamp field was missing",
		},
	)

	// invalidTimestampFormat counts documents whose timestamp field could not
	// be parsed with the configured format.
	invalidTimestampFormat = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "invalid_timestamp_format_total",
			Help: "How many times the timestamp field had invalid format",
		},
	)
)

// newTailer assembles a tailer from its collaborators and configuration.
// The tailer's starting point is the moment of construction, so the first
// search is issued with "since" set to that time.
func newTailer(logger logrus.FieldLogger, client elasticClient, index, timestampField, timestampFormat, rawLogField string, rawLogFormatRegexp, rawLogEmptyGroupRegexp *regexp.Regexp, query string, timeout time.Duration, maxBatchSize int) tailer {
	startFrom := time.Now()
	return tailer{
		// Collaborators.
		logger: logger,
		client: client,

		// What to search for.
		index:        index,
		query:        query,
		timeout:      timeout,
		maxBatchSize: maxBatchSize,

		// How to interpret each document.
		timestampField:         timestampField,
		timestampFormat:        timestampFormat,
		rawLogField:            rawLogField,
		rawLogFormatRegexp:     rawLogFormatRegexp,
		rawLogEmptyGroupRegexp: rawLogEmptyGroupRegexp,

		// Tailing position.
		lastTimestamp:    startFrom,
		lastTimestampMtx: sync.RWMutex{},
	}
}

// tailer periodically searches Elasticsearch for new documents and converts
// them into document values.
type tailer struct {
	// client performs the searches.
	client elasticClient
	// index is the index (pattern) passed to every search.
	index string
	// timestampField names the document field holding the event time; it is
	// passed to the search and read back from every document.
	timestampField string
	// timestampFormat is the time.Parse layout used for timestampField.
	timestampFormat string
	// rawLogField names the field whose value is additionally parsed with
	// rawLogFormatRegexp; an empty name disables that parsing.
	rawLogField string
	// rawLogFormatRegexp and rawLogEmptyGroupRegexp are handed to the tailer
	// module's ParseLine when parsing the raw log field.
	rawLogFormatRegexp *regexp.Regexp
	rawLogEmptyGroupRegexp *regexp.Regexp
	// query is the query string passed to every search.
	query string
	// lastTimestamp is the "since" bound of the next search. It starts at
	// construction time and is overwritten with the timestamp of every
	// document whose timestamp parses successfully.
	lastTimestamp time.Time
	// lastTimestampMtx is initialized by newTailer.
	// NOTE(review): nothing in this file locks it — confirm whether
	// lastTimestamp is accessed from anywhere else.
	lastTimestampMtx sync.RWMutex
	// maxBatchSize is the size passed to every search and the buffer size of
	// the output channel created by run.
	maxBatchSize int
	// timeout is passed to every search.
	timeout time.Duration
	logger logrus.FieldLogger

	// processing is set while a search is in progress; run skips a tick when
	// it is already set.
	processing atomic.Bool
}

// newDocumentFromJson decodes one raw Elasticsearch document into a document.
//
// Every top-level JSON value is stringified into the document's fields. When a
// raw log field is configured, its value is parsed with the raw log regexp and
// the resulting groups are merged into the fields as well. The document's
// timestamp defaults to the current time and is replaced by the parsed value
// of the timestamp field when that field is present and valid; in that case
// the tailer's lastTimestamp and the matching gauge are updated too.
//
// An error is returned only when the JSON body cannot be unmarshalled.
func (t *tailer) newDocumentFromJson(data json.RawMessage) (document, error) {
	doc := document{
		timestamp: time.Now(),
		fields:    map[string]string{},
	}

	var decoded map[string]interface{}
	if err := json.Unmarshal(data, &decoded); err != nil {
		return doc, fmt.Errorf("unable to unmarshall document body: %w", err)
	}
	for name, value := range decoded {
		doc.fields[name] = fmt.Sprint(value)
	}

	if t.rawLogField != "" {
		if rawLog, found := doc.fields[t.rawLogField]; found {
			parsed, parseErr := tailer_module.ParseLine(t.rawLogFormatRegexp, t.rawLogEmptyGroupRegexp, rawLog)
			if parseErr != nil {
				invalidrawLogFormat.Inc()
				t.logger.WithField("document", doc.fields).Warnf("document has invalid format of the raw log field %s", t.rawLogField)
			}
			// Merge whatever groups were extracted on top of the plain fields.
			for name, value := range parsed {
				doc.fields[name] = value
			}
		} else {
			missingRawLogField.Inc()
			t.logger.WithField("document", doc.fields).Warnf("document missing the raw log field %s", t.rawLogField)
		}
	}

	rawTimestamp, found := doc.fields[t.timestampField]
	if !found {
		missingTimestampField.Inc()
		t.logger.WithField("document", doc.fields).Warnf("document missing the timestamp field %s, using now instead", t.timestampField)
		return doc, nil
	}

	parsedTimestamp, err := time.Parse(t.timestampFormat, rawTimestamp)
	if err != nil {
		invalidTimestampFormat.Inc()
		t.logger.WithField("document", doc.fields).WithField("timestamp", rawTimestamp).Warnf("document has invalid timestamp field %s, using now instead", t.timestampField)
		return doc, nil
	}

	// Only a valid timestamp advances the tailing position.
	doc.timestamp = parsedTimestamp
	t.lastTimestamp = parsedTimestamp
	lastSearchTimestamp.Set(float64(t.lastTimestamp.Unix()))
	return doc, nil
}

// run starts a background goroutine that searches Elasticsearch once per
// interval and sends every successfully decoded document to the returned
// channel.
//
// The channel is buffered to maxBatchSize and is closed when ctx is cancelled,
// which is also what stops the goroutine and its ticker.
func (t *tailer) run(ctx context.Context, interval time.Duration) chan document {
	ticker := time.NewTicker(interval)
	outChan := make(chan document, t.maxBatchSize)
	go func() {
		defer ticker.Stop()
		defer close(outChan)
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				if t.processing.Load() {
					t.logger.Warnf("skipping scheduled query")
					continue
				}
				if !t.fetchAndForward(ctx, outChan) {
					return
				}
			}
		}
	}()
	return outChan
}

// fetchAndForward performs a single search and forwards the decoded documents
// to outChan. It reports false when ctx was cancelled while forwarding, which
// tells the caller to stop; it reports true otherwise, including when the
// search itself failed.
func (t *tailer) fetchAndForward(ctx context.Context, outChan chan<- document) bool {
	t.processing.Store(true)
	// Clear the flag on every exit path. Leaving it set after a failed search
	// would make all following ticks skip the query forever.
	defer t.processing.Store(false)

	jsonDocs, err := t.client.RangeSearch(ctx, t.index, t.timestampField, t.lastTimestamp, t.maxBatchSize, t.query, t.timeout)
	if err != nil {
		t.logger.WithFields(logrus.Fields{"error": err, "since": t.lastTimestamp}).Error("failed to search data from elastic search")
		return true
	}

	for _, jd := range jsonDocs {
		if ctx.Err() != nil {
			return false
		}
		// Counts every retrieved document, whether or not it decodes.
		searchedDocuments.Inc()

		newDoc, err := t.newDocumentFromJson(jd)
		if err != nil {
			t.logger.WithFields(logrus.Fields{"error": err, "document": jd}).Errorf("failed to read document")
			// An undecodable document carries no fields; do not forward it.
			continue
		}

		// Watch ctx while sending so a stalled consumer cannot block shutdown.
		select {
		case <-ctx.Done():
			return false
		case outChan <- newDoc:
		}
	}
	return true
}
Loading

0 comments on commit 33d8209

Please sign in to comment.