Skip to content

Commit

Permalink
ozontech#631 added loki output
Browse files Browse the repository at this point in the history
  • Loading branch information
romanchechyotkin committed Jul 22, 2024
1 parent 39bc75d commit 9815d27
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 28 deletions.
1 change: 1 addition & 0 deletions cmd/file.d/file.d.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
_ "github.com/ozontech/file.d/plugin/output/file"
_ "github.com/ozontech/file.d/plugin/output/gelf"
_ "github.com/ozontech/file.d/plugin/output/kafka"
_ "github.com/ozontech/file.d/plugin/output/loki"
_ "github.com/ozontech/file.d/plugin/output/postgres"
_ "github.com/ozontech/file.d/plugin/output/s3"
_ "github.com/ozontech/file.d/plugin/output/splunk"
Expand Down
75 changes: 47 additions & 28 deletions plugin/output/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -37,21 +38,24 @@ type data struct {
type Config struct {
// > @3@4@5@6
// >
// > TCP Loki host
// > A full URI address of Loki
// >
// > Example host
// > Example address
// >
// > 127.0.0.1 or localhost
Host string `json:"host" required:"true"` // *
// > http://127.0.0.1:3100 or https://loki:3100
Address string `json:"address" required:"true"` // *

// > @3@4@5@6
// >
// > Loki port
// > Array of labels to send logs
// >
// > Example port
// > Example labels
// >
// > 3100
Port string `json:"port" required:"true"` // *
// > label=value
Labels []struct {
Label string `json:"label" required:"true"`
Value string `json:"value" required:"true"`
} `json:"labels" required:"true"`

// > @3@4@5@6
// >
Expand Down Expand Up @@ -214,6 +218,8 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
backoffOpts,
onError,
)

p.batcher.Start(context.Background())
}

func (p *Plugin) Stop() {
Expand Down Expand Up @@ -243,8 +249,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err
outBuf := data.outBuf[:0]

batch.ForEach(func(event *pipeline.Event) {
root.AddField("event").MutateToNode(event.Root.Node)
outBuf = root.Encode(outBuf)
outBuf = root.MutateToNode(event.Root.Node).Encode(outBuf)
_ = root.DecodeString("{}")
})
insaneJSON.Release(root)
Expand All @@ -255,7 +260,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err
code, err := p.send(context.Background(), outBuf)
if err != nil {
p.sendErrorMetric.Inc()
p.logger.Errorf("can't send data to Loki address=%s: %v", fmt.Sprintf("%s:%s", p.config.Host, p.config.Port), err.Error())
p.logger.Errorf("can't send data to Loki address=%s: %v", p.config.Address, err.Error())

// skip retries for bad request
if code == http.StatusBadRequest {
Expand All @@ -269,9 +274,28 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err
}

func (p *Plugin) send(ctx context.Context, data []byte) (int, error) {
output := map[string]interface{}{
"streams": []map[string]interface{}{
{
"stream": p.labels(),
"values": [][]interface{}{
{
fmt.Sprintf(`%d`, time.Now().UnixNano()),
string(data),
},
},
},
},
}

data, err := json.MarshalIndent(output, "", " ")
if err != nil {
return 0, err
}

r := bytes.NewReader(data)

url := fmt.Sprintf("http://%s:%s/loki/api/v1/push", p.config.Host, p.config.Port)
url := fmt.Sprintf("%s/loki/api/v1/push", p.config.Address)

req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, r)
if err != nil {
Expand All @@ -297,25 +321,10 @@ func (p *Plugin) send(ctx context.Context, data []byte) (int, error) {
return resp.StatusCode, fmt.Errorf("can't read response: %w", err)
}

if resp.StatusCode != http.StatusOK {
if resp.StatusCode != http.StatusNoContent {
return resp.StatusCode, fmt.Errorf("bad response: code=%s, body=%s", resp.Status, b)
}

root, err := insaneJSON.DecodeBytes(b)
defer insaneJSON.Release(root)
if err != nil {
return resp.StatusCode, fmt.Errorf("can't decode response: %w", err)
}

code := root.Dig("code")
if code == nil {
return resp.StatusCode, fmt.Errorf("invalid response format, expecting json with 'code' field, got: %s", string(b))
}

if code.AsInt() > 0 {
return resp.StatusCode, fmt.Errorf("error while sending to splunk: %s", string(b))
}

return resp.StatusCode, nil
}

Expand All @@ -339,3 +348,13 @@ func (p *Plugin) newClient(timeout time.Duration) *http.Client {

return client
}

func (p *Plugin) labels() map[string]string {
labels := make(map[string]string, len(p.config.Labels))

for _, v := range p.config.Labels {
labels[v.Label] = v.Value
}

return labels
}

0 comments on commit 9815d27

Please sign in to comment.