Skip to content

Commit

Permalink
added otlp encode stages
Browse files Browse the repository at this point in the history
  • Loading branch information
KalmanMeth committed Nov 15, 2023
1 parent 06ac688 commit 912c0ac
Show file tree
Hide file tree
Showing 10 changed files with 1,062 additions and 7 deletions.
16 changes: 16 additions & 0 deletions docs/operational-metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,22 @@ Each table below provides documentation for an exported flowlogs-pipeline operat
| **Labels** | stage |


### metrics_dropped
| **Name** | metrics_dropped |
|:---|:---|
| **Description** | Number of metrics dropped |
| **Type** | counter |
| **Labels** | stage |


### metrics_processed
| **Name** | metrics_processed |
|:---|:---|
| **Description** | Number of metrics processed |
| **Type** | counter |
| **Labels** | stage |


### metrics_processed
| **Name** | metrics_processed |
|:---|:---|
Expand Down
3 changes: 3 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ const (
FakeType = "fake"
KafkaType = "kafka"
S3Type = "s3"
OtlpLogsType = "otlplogs"
OtlpMetricsType = "otlpmetrics"
OtlpTracesType = "otlptraces"
StdoutType = "stdout"
LokiType = "loki"
IpfixType = "ipfix"
Expand Down
11 changes: 7 additions & 4 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,13 @@ type Extract struct {
}

type Encode struct {
Type string `yaml:"type" json:"type"`
Prom *api.PromEncode `yaml:"prom,omitempty" json:"prom,omitempty"`
Kafka *api.EncodeKafka `yaml:"kafka,omitempty" json:"kafka,omitempty"`
S3 *api.EncodeS3 `yaml:"s3,omitempty" json:"s3,omitempty"`
Type string `yaml:"type" json:"type"`
Prom *api.PromEncode `yaml:"prom,omitempty" json:"prom,omitempty"`
Kafka *api.EncodeKafka `yaml:"kafka,omitempty" json:"kafka,omitempty"`
S3 *api.EncodeS3 `yaml:"s3,omitempty" json:"s3,omitempty"`
OtlpLogs *api.EncodeOtlpLogs `yaml:"otlplogs,omitempty" json:"otlplogs,omitempty"`

Check failure on line 123 in pkg/config/config.go

View workflow job for this annotation

GitHub Actions / e2e-tests (1.20)

undefined: api.EncodeOtlpLogs

Check failure on line 123 in pkg/config/config.go

View workflow job for this annotation

GitHub Actions / e2e-tests (1.20)

undefined: api.EncodeOtlpLogs

Check failure on line 123 in pkg/config/config.go

View workflow job for this annotation

GitHub Actions / e2e-tests (1.20)

undefined: api.EncodeOtlpLogs

Check failure on line 123 in pkg/config/config.go

View workflow job for this annotation

GitHub Actions / unit-tests (1.20)

undefined: api.EncodeOtlpLogs

Check failure on line 123 in pkg/config/config.go

View workflow job for this annotation

GitHub Actions / unit-tests (1.20)

undefined: api.EncodeOtlpLogs

Check failure on line 123 in pkg/config/config.go

View workflow job for this annotation

GitHub Actions / unit-tests (1.20)

undefined: api.EncodeOtlpLogs
OtlpMetrics *api.EncodeOtlpMetrics `yaml:"otlpmetrics,omitempty" json:"otlpmetrics,omitempty"`

Check failure on line 124 in pkg/config/config.go

View workflow job for this annotation

GitHub Actions / e2e-tests (1.20)

undefined: api.EncodeOtlpMetrics

Check failure on line 124 in pkg/config/config.go

View workflow job for this annotation

GitHub Actions / e2e-tests (1.20)

undefined: api.EncodeOtlpMetrics

Check failure on line 124 in pkg/config/config.go

View workflow job for this annotation

GitHub Actions / unit-tests (1.20)

undefined: api.EncodeOtlpMetrics

Check failure on line 124 in pkg/config/config.go

View workflow job for this annotation

GitHub Actions / unit-tests (1.20)

undefined: api.EncodeOtlpMetrics
OtlpTraces *api.EncodeOtlpTraces `yaml:"otlptraces,omitempty" json:"otlptraces,omitempty"`

Check failure on line 125 in pkg/config/config.go

View workflow job for this annotation

GitHub Actions / e2e-tests (1.20)

undefined: api.EncodeOtlpTraces) (typecheck)

Check failure on line 125 in pkg/config/config.go

View workflow job for this annotation

GitHub Actions / e2e-tests (1.20)

undefined: api.EncodeOtlpTraces) (typecheck)

Check failure on line 125 in pkg/config/config.go

View workflow job for this annotation

GitHub Actions / unit-tests (1.20)

undefined: api.EncodeOtlpTraces) (typecheck)

Check failure on line 125 in pkg/config/config.go

View workflow job for this annotation

GitHub Actions / unit-tests (1.20)

undefined: api.EncodeOtlpTraces) (typecheck)
}

type Write struct {
Expand Down
6 changes: 3 additions & 3 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (e *EncodeProm) prepareMetric(flow config.GenericMap, info *metricInfo, m *
floatVal = floatVal / info.ValueScale
}

entryLabels, key := e.extractLabelsAndKey(flow, &info.PromMetricsItem)
entryLabels, key := ExtractLabelsAndKey(flow, &info.PromMetricsItem)
// Update entry for expiry mechanism (the entry itself is its own cleanup function)
_, ok := e.mCache.UpdateCacheEntry(key, func() { m.Delete(entryLabels) })
if !ok {
Expand All @@ -196,7 +196,7 @@ func (e *EncodeProm) prepareAggHisto(flow config.GenericMap, info *metricInfo, m
return nil, nil
}

entryLabels, key := e.extractLabelsAndKey(flow, &info.PromMetricsItem)
entryLabels, key := ExtractLabelsAndKey(flow, &info.PromMetricsItem)
// Update entry for expiry mechanism (the entry itself is its own cleanup function)
_, ok = e.mCache.UpdateCacheEntry(key, func() { m.Delete(entryLabels) })
if !ok {
Expand Down Expand Up @@ -224,7 +224,7 @@ func (e *EncodeProm) extractGenericValue(flow config.GenericMap, info *metricInf
return val
}

func (e *EncodeProm) extractLabelsAndKey(flow config.GenericMap, info *api.PromMetricsItem) (map[string]string, string) {
func ExtractLabelsAndKey(flow config.GenericMap, info *api.PromMetricsItem) (map[string]string, string) {
entryLabels := make(map[string]string, len(info.Labels))
key := strings.Builder{}
key.WriteString(info.Name)
Expand Down
201 changes: 201 additions & 0 deletions pkg/pipeline/encode/opentelemetry/encode_otlp_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
* Copyright (C) 2023 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package opentelemetry

import (
"encoding/json"
"testing"

otel "github.com/agoda-com/opentelemetry-logs-go"
"github.com/agoda-com/opentelemetry-logs-go/logs"
"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode"
"github.com/netobserv/flowlogs-pipeline/pkg/test"
"github.com/stretchr/testify/require"
)

const testOtlpConfig = `---
log-level: debug
pipeline:
- name: encode1
parameters:
- name: encode1
encode:
type: otlplogs
otlplogs:
address: localhost
port: 4317
connectionType: grpc
`

type fakeOltpLoggerProvider struct {
}
type fakeOltpLogger struct {
}

var receivedData []logs.LogRecord

func (f *fakeOltpLogger) Emit(msg logs.LogRecord) {
receivedData = append(receivedData, msg)
}

func (f *fakeOltpLoggerProvider) Logger(name string, options ...logs.LoggerOption) logs.Logger {
return &fakeOltpLogger{}
}

func initNewEncodeOtlpLogs(t *testing.T) encode.Encoder {
receivedData = []logs.LogRecord{}
v, cfg := test.InitConfig(t, testOtlpConfig)
require.NotNil(t, v)

newEncode, err := NewEncodeOtlpLogs(operational.NewMetrics(&config.MetricsSettings{}), cfg.Parameters[0])
require.NoError(t, err)

// intercept the loggerProvider function
loggerProvider := fakeOltpLoggerProvider{}
otel.SetLoggerProvider(&loggerProvider)
return newEncode
}

func Test_EncodeOtlpLogs(t *testing.T) {
newEncode := initNewEncodeOtlpLogs(t)
encodeOtlp := newEncode.(*EncodeOtlpLogs)
require.Equal(t, "localhost", encodeOtlp.cfg.OtlpConnectionInfo.Address)
require.Equal(t, 4317, encodeOtlp.cfg.OtlpConnectionInfo.Port)
require.Equal(t, "grpc", encodeOtlp.cfg.ConnectionType)
require.Nil(t, encodeOtlp.cfg.TLS)
require.Empty(t, encodeOtlp.cfg.Headers)

entry1 := test.GetIngestMockEntry(true)
entry2 := test.GetIngestMockEntry(false)
newEncode.Encode(entry1)
newEncode.Encode(entry2)
// verify contents of the output
require.Len(t, receivedData, 2)
expected1, _ := json.Marshal(entry1)
expected2, _ := json.Marshal(entry2)
require.Equal(t, string(expected1), *receivedData[0].Body())
require.Equal(t, string(expected2), *receivedData[1].Body())
}

func Test_TLSConfigEmpty(t *testing.T) {
cfg := config.StageParam{
Encode: &config.Encode{
OtlpLogs: &api.EncodeOtlpLogs{OtlpConnectionInfo: &api.OtlpConnectionInfo{
Address: "1.2.3.4",
Port: 999,
TLS: nil,
ConnectionType: "grpc",
Headers: nil,
}}},
}
newEncode, err := NewEncodeOtlpLogs(operational.NewMetrics(&config.MetricsSettings{}), cfg)
require.NoError(t, err)
require.NotNil(t, newEncode)
}

func Test_TLSConfigNotEmpty(t *testing.T) {
cfg := config.StageParam{
Encode: &config.Encode{
OtlpLogs: &api.EncodeOtlpLogs{OtlpConnectionInfo: &api.OtlpConnectionInfo{
Address: "1.2.3.4",
Port: 999,
ConnectionType: "grpc",
TLS: &api.ClientTLS{InsecureSkipVerify: true},
}}},
}
newEncode, err := NewEncodeOtlpLogs(operational.NewMetrics(&config.MetricsSettings{}), cfg)
require.NoError(t, err)
require.NotNil(t, newEncode)
}

func Test_HeadersNotEmpty(t *testing.T) {
headers := make(map[string]string)
headers["key1"] = "value1"
headers["key2"] = "value2"
cfg := config.StageParam{
Encode: &config.Encode{
OtlpLogs: &api.EncodeOtlpLogs{OtlpConnectionInfo: &api.OtlpConnectionInfo{
Address: "1.2.3.4",
Port: 999,
ConnectionType: "grpc",
Headers: headers,
}}},
}
newEncode, err := NewEncodeOtlpLogs(operational.NewMetrics(&config.MetricsSettings{}), cfg)
require.NoError(t, err)
require.NotNil(t, newEncode)
}

func Test_NoConnectionType(t *testing.T) {
cfg := config.StageParam{
Encode: &config.Encode{
OtlpLogs: &api.EncodeOtlpLogs{OtlpConnectionInfo: &api.OtlpConnectionInfo{
Address: "1.2.3.4",
Port: 999,
}}},
}
newEncode, err := NewEncodeOtlpLogs(operational.NewMetrics(&config.MetricsSettings{}), cfg)
require.Error(t, err)
require.Nil(t, newEncode)
}

func Test_EncodeOtlpTraces(t *testing.T) {
cfg := config.StageParam{
Encode: &config.Encode{
OtlpTraces: &api.EncodeOtlpTraces{OtlpConnectionInfo: &api.OtlpConnectionInfo{
Address: "1.2.3.4",
Port: 999,
ConnectionType: "grpc",
Headers: nil,
}}},
}
newEncode, err := NewEncodeOtlpTraces(operational.NewMetrics(&config.MetricsSettings{}), cfg)
require.NoError(t, err)
require.NotNil(t, newEncode)
}

func Test_EncodeOtlpMetrics(t *testing.T) {
cfg := config.StageParam{
Encode: &config.Encode{
OtlpMetrics: &api.EncodeOtlpMetrics{
OtlpConnectionInfo: &api.OtlpConnectionInfo{
Address: "1.2.3.4",
Port: 999,
ConnectionType: "grpc",
Headers: nil,
},
Prefix: "flp_test",
Metrics: []api.PromMetricsItem{
{Name: "metric1", Type: "counter", Labels: []string{"label11", "label12"}},
{Name: "metric2", Type: "gauge", Labels: []string{"label21", "label22"}},
{Name: "metric3", Type: "counter", Labels: []string{"label31", "label32"}},
},
}},
}
newEncode, err := NewEncodeOtlpMetrics(operational.NewMetrics(&config.MetricsSettings{}), cfg)
require.NoError(t, err)
require.NotNil(t, newEncode)
em := newEncode.(*EncodeOtlpMetrics)
require.Equal(t, 2, len(em.counters))
require.Equal(t, 1, len(em.gauges))
require.Equal(t, "metric3", em.counters[1].info.Name)
require.Equal(t, []string{"label21", "label22"}, em.gauges[0].info.Labels)
}
68 changes: 68 additions & 0 deletions pkg/pipeline/encode/opentelemetry/encode_otlplogs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (C) 2023 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package opentelemetry

import (
"context"

sdklog "github.com/agoda-com/opentelemetry-logs-go/sdk/logs"
"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/sdk/resource"
)

type EncodeOtlpLogs struct {
cfg api.EncodeOtlpLogs
ctx context.Context
res *resource.Resource
lp *sdklog.LoggerProvider
}

// Encode encodes a log entry to be exported
func (e *EncodeOtlpLogs) Encode(entry config.GenericMap) {
log.Tracef("entering EncodeOtlpLogs. entry = %v", entry)
e.LogWrite(entry)
}

func NewEncodeOtlpLogs(opMetrics *operational.Metrics, params config.StageParam) (encode.Encoder, error) {
log.Tracef("entering NewEncodeOtlpLogs \n")
cfg := api.EncodeOtlpLogs{}
if params.Encode != nil && params.Encode.OtlpLogs != nil {
cfg = *params.Encode.OtlpLogs
}
log.Debugf("NewEncodeOtlpLogs cfg = %v \n", cfg)

ctx := context.Background()
res := newResource()

lp, err := NewOtlpLoggerProvider(ctx, params, res)
if err != nil {
return nil, err
}

w := &EncodeOtlpLogs{
cfg: cfg,
ctx: ctx,
res: res,
lp: lp,
}
return w, nil
}
Loading

0 comments on commit 912c0ac

Please sign in to comment.