feat: add elastic search ingester module #82
base: master
@@ -0,0 +1,82 @@
# Elasticsearch ingester

|                |                         |
|----------------|-------------------------|
| `moduleName`   | `elasticSearchIngester` |
| Module type    | `producer`              |
| Output event   | `raw`                   |

This module lets you follow all new documents matching an Elasticsearch query in real time and compute SLOs based on them.

The most common use case: you run in Kubernetes and already collect logs using the ELK stack. You can simply hook into that data and compute SLOs based on your application logs.

### Elasticsearch versions and support

Currently, only v7 is supported.

### How does it work

The module periodically (the interval is configurable) queries the Elasticsearch index (which must be specified),
optionally filtered by a custom Lucene query, and creates a new event from every matching document. All
documents need to have a field with a timestamp (field name and format are configurable) so the module can sort them
and store the timestamp of the last queried document. In the next iteration it uses this timestamp as the lower limit
of the range query, so it does not miss any entries. Each query is limited by a maximum batch size (configurable) so
the requests are not huge.

In case you do not use structured logging and your logs are not indexed, you can specify the name of the field with the
raw log entry and a regular expression with named groups which, if matched, will be propagated to the event metadata.

### moduleConfig

```yaml
# OPTIONAL Debug logging
debug: false
# REVIEW COMMENT: Once I checked what this option actually does, I would
# prefer it to be refactored to sth like an ES client log level.
# REQUIRED Version of the Elasticsearch API to be used, possible values: "v7"
apiVersion: "v7"
# REQUIRED List of addresses pointing to the Elasticsearch API endpoint to query
addresses:
  - "https://foo.bar:4433"
# OPTIONAL Skip verification of the server certificate
insecureSkipVerify: false
# OPTIONAL Timeout for the Elasticsearch API call
timeout: "5s"
# OPTIONAL Enable/disable sniffing, autodiscovery of other nodes in the Elasticsearch cluster
sniffing: true
# OPTIONAL Enable/disable health checking of the Elasticsearch nodes
healthchecks: true

# OPTIONAL Username to use for authentication
username: "foo"
# OPTIONAL Password to use for authentication
password: "bar"
# OPTIONAL Client certificate to be used for authentication
clientCertFile: "./client.pem"
# OPTIONAL Client certificate key to be used for authentication
clientKeyFile: "./client-key.pem"
# OPTIONAL Custom CA certificate to verify the server certificate
clientCaCertFile: "./ca.cert"

# OPTIONAL Interval how often to check for new documents from the Elasticsearch API.
# If the module falls behind on the amount of documents in Elasticsearch, it will
# query more often.
interval: 5s
# REQUIRED Name of the index to be queried
index: "my-index"
# OPTIONAL Additional Lucene query to filter the results
query: "app_name: nginx AND namespace: test"
# OPTIONAL Maximum number of documents to be read in one batch
maxBatchSize: 100

# REQUIRED Document field name containing a timestamp of the event
timestampField: "@timestamp"
# REQUIRED Golang time parse format to parse the timestamp from the timestampField
timestampFormat: "2006-01-02T15:04:05Z07:00" # See https://www.geeksforgeeks.org/time-formatting-in-golang/ for common examples
# REVIEW COMMENT: Let's reference something more official here :)
# OPTIONAL Name of the field in the document containing the raw log message you want to parse
rawLogField: "log"
# OPTIONAL Regular expression used to parse the raw log message;
# each matched named group will be propagated to the new event metadata
rawLogParseRegexp: '^(?P<ip>[A-Fa-f0-9.:]{4,50}) \S+ \S+ \[(?P<time>.*?)\] "(?P<httpMethod>[^\s]+)?\s+(?P<httpPath>[^\?\s]+).*'
# OPTIONAL If the content of a named group matches this regexp, it will be considered an empty value.
rawLogEmptyGroupRegexp: '^-$'
```

@@ -0,0 +1,52 @@
```go
package elasticsearch_client

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
)

var (
	// ElasticApiCall tracks the duration of Elasticsearch API calls.
	ElasticApiCall = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "elasticsearch_request_seconds",
		Help:    "Duration histogram of elasticsearch api calls",
		Buckets: prometheus.ExponentialBuckets(0.1, 2, 5),
	}, []string{"api_version", "endpoint", "error"})
)

type Config struct {
	Addresses          []string
	Username           string
	Password           string
	Timeout            time.Duration
	Healtchecks        bool
	Sniffing           bool
	InsecureSkipVerify bool
	ClientCertFile     string
	ClientKeyFile      string
	CaCertFile         string
	Debug              bool
}

type Client interface {
	RangeSearch(ctx context.Context, index, timestampField string, since time.Time, size int, query string, timeout time.Duration) ([]json.RawMessage, int, error)
}

var clientFactory = map[string]func(config Config, logger logrus.FieldLogger) (Client, error){
	"v7": NewV7Client,
}

func NewClient(version string, config Config, logger logrus.FieldLogger) (Client, error) {
	factoryFn, ok := clientFactory[version]
	if !ok {
		var supportedValues []string
		for k := range clientFactory {
			supportedValues = append(supportedValues, k)
		}
		return nil, fmt.Errorf("unsupported Elasticsearch API version %s, only supported values are: %s", version, supportedValues)
	}
	return factoryFn(config, logger)
}
```

@@ -0,0 +1,25 @@
```go
package elasticsearch_client

import (
	"context"
	"encoding/json"
	"time"
)

// NewClientMock returns a Client implementation that always returns the
// given canned data, remaining-document count, and error.
func NewClientMock(data []json.RawMessage, documentsLeft int, err error) Client {
	return &clientMock{
		data:          data,
		documentsLeft: documentsLeft,
		error:         err,
	}
}

type clientMock struct {
	data          []json.RawMessage
	documentsLeft int
	error         error
}

func (c *clientMock) RangeSearch(ctx context.Context, index, timestampField string, since time.Time, size int, query string, timeout time.Duration) ([]json.RawMessage, int, error) {
	return c.data, c.documentsLeft, c.error
}
```

@@ -0,0 +1,97 @@
```go
package elasticsearch_client

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net"
	"net/http"
	"time"

	elasticV7 "github.com/olivere/elastic/v7"
	"github.com/sirupsen/logrus"
)

func NewV7Client(config Config, logger logrus.FieldLogger) (Client, error) {
	var clientCertFn func(*tls.CertificateRequestInfo) (*tls.Certificate, error)
	if config.ClientKeyFile != "" && config.ClientCertFile != "" {
		clientCertFn = func(_ *tls.CertificateRequestInfo) (*tls.Certificate, error) {
			cert, err := tls.LoadX509KeyPair(config.ClientCertFile, config.ClientKeyFile)
			if err != nil {
				return nil, fmt.Errorf("failed to read client certs %s, %s: %w", config.ClientCertFile, config.ClientKeyFile, err)
			}
			return &cert, nil
		}
	}

	var clientCaCertPool *x509.CertPool
	if config.CaCertFile != "" {
		// REVIEW COMMENT: I suggest to refactor this to one of the two:
		cert, err := ioutil.ReadFile(config.CaCertFile)
		if err != nil {
			return nil, fmt.Errorf("failed to read clientCaCertFile %s: %w", config.CaCertFile, err)
		}
		clientCaCertPool = x509.NewCertPool()
		clientCaCertPool.AppendCertsFromPEM(cert)
	}
	httpClient := http.Client{
		Transport: &http.Transport{
			ResponseHeaderTimeout: config.Timeout,
			DialContext:           (&net.Dialer{Timeout: config.Timeout}).DialContext,
			TLSClientConfig: &tls.Config{
				InsecureSkipVerify:   config.InsecureSkipVerify,
				GetClientCertificate: clientCertFn,
				// RootCAs (not ClientCAs) is the pool a client uses to
				// verify the server certificate, which is what the
				// clientCaCertFile option documents.
				RootCAs: clientCaCertPool,
			},
		},
		Timeout: config.Timeout,
	}
	opts := []elasticV7.ClientOptionFunc{
		elasticV7.SetHttpClient(&httpClient),
		elasticV7.SetErrorLog(logger),
		elasticV7.SetURL(config.Addresses...),
		elasticV7.SetScheme("https"),
		elasticV7.SetSniff(config.Sniffing),
		elasticV7.SetHealthcheck(config.Healtchecks),
	}
	if config.Debug {
		opts = append(opts, elasticV7.SetTraceLog(logger), elasticV7.SetInfoLog(logger))
	}
	if config.Username != "" || config.Password != "" {
		// REVIEW COMMENT: Should not this be mutually exclusive with TLS auth?
		opts = append(opts, elasticV7.SetBasicAuth(config.Username, config.Password))
	}
	cli, err := elasticV7.NewClient(opts...)
	if err != nil {
		return nil, err
	}
	return &v7Client{client: cli, logger: logger}, nil
}

type v7Client struct {
	logger logrus.FieldLogger
	client *elasticV7.Client
}

// REVIEW COMMENT: I know we are not really consistent with this across the
// slo-exporter code, but please add documentation for the public methods.

// RangeSearch queries the given index for documents whose timestampField is
// newer than since, sorted ascending, returning at most size documents and
// the number of matching documents left unread.
func (v *v7Client) RangeSearch(ctx context.Context, index, timestampField string, since time.Time, size int, query string, timeout time.Duration) ([]json.RawMessage, int, error) {
	filters := []elasticV7.Query{
		elasticV7.NewRangeQuery(timestampField).From(since),
	}
	if query != "" {
		filters = append(filters, elasticV7.NewQueryStringQuery(query))
	}
	q := elasticV7.NewBoolQuery().Filter(filters...)
	start := time.Now()
	result, err := v.client.Search().Index(index).TimeoutInMillis(int(timeout.Milliseconds())).Size(size).Sort(timestampField, true).Query(q).Do(ctx)
	if err != nil {
		ElasticApiCall.WithLabelValues("v7", "rangeSearch", err.Error()).Observe(time.Since(start).Seconds())
		return nil, 0, err
	}
	ElasticApiCall.WithLabelValues("v7", "rangeSearch", "").Observe(time.Since(start).Seconds())
	v.logger.WithFields(logrus.Fields{"index": index, "hits": len(result.Hits.Hits), "duration_ms": result.TookInMillis, "query": query, "since": since}).Debug("elastic search range search call")
	msgs := make([]json.RawMessage, len(result.Hits.Hits))
	for i, h := range result.Hits.Hits {
		msgs[i] = h.Source
	}
	return msgs, int(result.TotalHits()) - len(result.Hits.Hits), err
}
```