From 3cc44f0d47cf98715d9970678bda203af8004ad4 Mon Sep 17 00:00:00 2001 From: Kalman Meth Date: Tue, 24 Oct 2023 13:04:04 +0300 Subject: [PATCH] added test plus code cleanup --- pkg/api/encode_otlp.go | 11 +- .../encode/opentelemetry/encode_otlp_test.go | 117 ++++++++++++++++++ .../encode/opentelemetry/encode_otlplogs.go | 7 +- .../encode/opentelemetry/opentelemetry.go | 94 ++++++-------- 4 files changed, 161 insertions(+), 68 deletions(-) create mode 100644 pkg/pipeline/encode/opentelemetry/encode_otlp_test.go diff --git a/pkg/api/encode_otlp.go b/pkg/api/encode_otlp.go index 5ff1299e4..a0d6a334d 100644 --- a/pkg/api/encode_otlp.go +++ b/pkg/api/encode_otlp.go @@ -18,9 +18,10 @@ package api type OtlpEncode struct { - GrpcAddress string `yaml:"grpcaddress" json:"grpcaddress" doc:"address of Otlp Collector"` - HttpAddress string `yaml:"httpaddress" json:"httpaddress" doc:"address of Otlp Collector; grpcaddress and httpaddress are mututally exclusive"` - //Path string `yaml:"path" json:"path" doc:"path component of URI"` - TLS *ClientTLS `yaml:"tls" json:"tls" doc:"TLS client configuration (optional)"` - Headers map[string]string `yaml:"headers" json:"headers" doc:"headers to add to messages (optional)"` + // grpcaddress and httpaddress are mutually exclusive + // TODO: Should we change these 2 fields to 3 fields: protocol (grpc/http), address, port? + GrpcAddress string `yaml:"grpcaddress" json:"grpcaddress" doc:"address of Otlp Collector (including port) using grpc protocol"` + HttpAddress string `yaml:"httpaddress" json:"httpaddress" doc:"address of Otlp Collector (including port) using http protocol"` + TLS *ClientTLS `yaml:"tls" json:"tls" doc:"TLS client configuration (optional)"` + Headers map[string]string `yaml:"headers" json:"headers" doc:"headers to add to messages (optional)"` } diff --git a/pkg/pipeline/encode/opentelemetry/encode_otlp_test.go b/pkg/pipeline/encode/opentelemetry/encode_otlp_test.go new file mode 100644 index 000000000..1510dbc11 --- /dev/null +++ b/pkg/pipeline/encode/opentelemetry/encode_otlp_test.go @@ -0,0 +1,117 @@ +/* + * Copyright (C) 2021 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package opentelemetry + +import ( + "encoding/json" + "testing" + + otel "github.com/agoda-com/opentelemetry-logs-go" + "github.com/agoda-com/opentelemetry-logs-go/logs" + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/operational" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode" + "github.com/netobserv/flowlogs-pipeline/pkg/test" + "github.com/stretchr/testify/require" +) + +const testOtlpConfig = `--- +log-level: debug +pipeline: + - name: encode1 +parameters: + - name: encode1 + encode: + type: otlp + otlp: + grpcaddress: localhost:4317 +` + +type fakeOltpLoggerProvider struct { +} +type fakeOltpLogger struct { +} + +var receivedData []logs.LogRecord + +func (f *fakeOltpLogger) Emit(msg logs.LogRecord) { + receivedData = append(receivedData, msg) +} + +func (f *fakeOltpLoggerProvider) Logger(name string, options ...logs.LoggerOption) logs.Logger { + return &fakeOltpLogger{} +} + +func initNewEncodeOltp(t *testing.T) encode.Encoder { + receivedData = []logs.LogRecord{} + v, cfg := test.InitConfig(t, testOtlpConfig) + require.NotNil(t, v) + + newEncode, err := NewEncodeOtlpLogs(operational.NewMetrics(&config.MetricsSettings{}), cfg.Parameters[0]) + require.NoError(t, err) + + // intercept the loggerProvider function + loggerProvider := fakeOltpLoggerProvider{} + otel.SetLoggerProvider(&loggerProvider) + return newEncode +} + +func Test_EncodeOtlp(t *testing.T) { + newEncode := initNewEncodeOltp(t) + encodeOtlp := newEncode.(*EncodeOtlpLogs) + require.Equal(t, "localhost:4317", encodeOtlp.cfg.GrpcAddress) + require.Nil(t, encodeOtlp.cfg.TLS) + require.Empty(t, encodeOtlp.cfg.HttpAddress) + require.Empty(t, encodeOtlp.cfg.Headers) + + entry1 := test.GetIngestMockEntry(true) + entry2 := test.GetIngestMockEntry(false) + newEncode.Encode(entry1) + newEncode.Encode(entry2) + // verify contents of the output + require.Len(t, receivedData, 2) + expected1, _ := json.Marshal(entry1) + expected2, _ := json.Marshal(entry2) + require.Equal(t, string(expected1), *receivedData[0].Body()) + require.Equal(t, string(expected2), *receivedData[1].Body()) +} + +func Test_TLSConfigEmpty(t *testing.T) { + cfg := config.StageParam{Encode: &config.Encode{Otlp: &api.OtlpEncode{HttpAddress: "1.2.3.4:999", TLS: nil}}} + newEncode, err := NewEncodeOtlpLogs(operational.NewMetrics(&config.MetricsSettings{}), cfg) + require.NoError(t, err) + require.NotNil(t, newEncode) +} + +func Test_TLSConfigNotEmpty(t *testing.T) { + cfg := config.StageParam{Encode: &config.Encode{Otlp: &api.OtlpEncode{HttpAddress: "1.2.3.4:999", TLS: &api.ClientTLS{InsecureSkipVerify: true}}}} + newEncode, err := NewEncodeOtlpLogs(operational.NewMetrics(&config.MetricsSettings{}), cfg) + require.NoError(t, err) + require.NotNil(t, newEncode) +} + +func Test_HeadersNotEmpty(t *testing.T) { + headers := make(map[string]string) + headers["key1"] = "value1" + headers["key2"] = "value2" + cfg := config.StageParam{Encode: &config.Encode{Otlp: &api.OtlpEncode{HttpAddress: "1.2.3.4:999", Headers: headers}}} + newEncode, err := NewEncodeOtlpLogs(operational.NewMetrics(&config.MetricsSettings{}), cfg) + require.NoError(t, err) + require.NotNil(t, newEncode) +} diff --git a/pkg/pipeline/encode/opentelemetry/encode_otlplogs.go b/pkg/pipeline/encode/opentelemetry/encode_otlplogs.go index 2c5940fee..bc62af8c0 100644 --- a/pkg/pipeline/encode/opentelemetry/encode_otlplogs.go +++ b/pkg/pipeline/encode/opentelemetry/encode_otlplogs.go @@ -1,5 +1,5 @@ /* - * Copyright (C) 2023IBM, Inc. + * Copyright (C) 2023 IBM, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -54,12 +54,12 @@ func NewEncodeOtlpLogs(opMetrics *operational.Metrics, params config.StageParam) if params.Encode != nil && params.Encode.Otlp != nil { cfg = *params.Encode.Otlp } - log.Infof("cfg = %v \n", cfg) + log.Debugf("NewEncodeOtlpLogs cfg = %v \n", cfg) ctx := context.Background() res := newResource() - lp, err := NewOltpLoggerProvider(ctx, params, res) + lp, err := NewOtlpLoggerProvider(ctx, params, res) if err != nil { log.Fatal(err) return nil, err @@ -82,7 +82,6 @@ func newResource() *resource.Resource { semconv.SchemaURL, semconv.ServiceName(flpOtlpResourceName), semconv.ServiceVersion(flpOtlpResourceVersion), - //attribute.String("environment", "demo"), ), ) return r diff --git a/pkg/pipeline/encode/opentelemetry/opentelemetry.go b/pkg/pipeline/encode/opentelemetry/opentelemetry.go index 5dc478c32..c01888fd4 100644 --- a/pkg/pipeline/encode/opentelemetry/opentelemetry.go +++ b/pkg/pipeline/encode/opentelemetry/opentelemetry.go @@ -1,3 +1,20 @@ +/* + * Copyright (C) 2023 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + package opentelemetry import ( @@ -6,70 +23,31 @@ import ( "fmt" "time" - otel2 "github.com/agoda-com/opentelemetry-logs-go" + otel "github.com/agoda-com/opentelemetry-logs-go" "github.com/agoda-com/opentelemetry-logs-go/exporters/otlp/otlplogs" "github.com/agoda-com/opentelemetry-logs-go/exporters/otlp/otlplogs/otlplogsgrpc" "github.com/agoda-com/opentelemetry-logs-go/exporters/otlp/otlplogs/otlplogshttp" "github.com/agoda-com/opentelemetry-logs-go/logs" sdklog "github.com/agoda-com/opentelemetry-logs-go/sdk/logs" "github.com/netobserv/flowlogs-pipeline/pkg/config" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" - "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" - "go.opentelemetry.io/otel/sdk/metric" + "github.com/netobserv/flowlogs-pipeline/pkg/utils" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/resource" - sdktrace "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.21.0" "google.golang.org/grpc/credentials" ) +// Note: +// As of the writing of this module, go.opentelemetry.io does not provide interfaces for logs. +// We therefore temporarily use agoda-com/opentelemetry-logs-go for logs. +// When go.opentelemetry.io provides interfaces for logs, the code here should be updated to use those interfaces. + const ( - flpOtlpLoggerName = "flp-oltp-logger" + flpOtlpLoggerName = "flp-otlp-logger" ) -func NewOltpTraceProvider(ctx context.Context, params config.StageParam, res *resource.Resource) (*sdktrace.TracerProvider, error) { - traceExporter, err := otlptracegrpc.New( - ctx, - otlptracegrpc.WithEndpoint(params.Encode.Otlp.GrpcAddress), - // TODO: tls stuff - otlptracegrpc.WithInsecure(), - ) - - if err != nil { - return nil, err - } - traceProvider := sdktrace.NewTracerProvider( - sdktrace.WithBatcher(traceExporter), - sdktrace.WithResource(res), - ) - otel.SetTracerProvider(traceProvider) - return traceProvider, nil -} - -func NewOltpMetricsProvider(ctx context.Context, params config.StageParam, res *resource.Resource) (*metric.MeterProvider, error) { - - metricExporter, err := otlpmetricgrpc.New(ctx, - otlpmetricgrpc.WithEndpoint(params.Encode.Otlp.GrpcAddress), - // TODO: add tls support - otlpmetricgrpc.WithInsecure(), - ) - if err != nil { - return nil, err - } - - meterProvider := metric.NewMeterProvider( - metric.WithResource(res), - metric.WithReader(metric.NewPeriodicReader(metricExporter, - // Default is 1m. Set to 3s for demonstrative purposes. - metric.WithInterval(3*time.Second))), - ) - otel.SetMeterProvider(meterProvider) - return meterProvider, nil -} - -func NewOltpLoggerProvider(ctx context.Context, params config.StageParam, res *resource.Resource) (*sdklog.LoggerProvider, error) { +func NewOtlpLoggerProvider(ctx context.Context, params config.StageParam, res *resource.Resource) (*sdklog.LoggerProvider, error) { var expOption otlplogs.ExporterOption - fmt.Printf("otlp headers = %v \n", params.Encode.Otlp.Headers) if params.Encode.Otlp.GrpcAddress != "" { var tlsOption otlplogsgrpc.Option tlsOption = otlplogsgrpc.WithInsecure() @@ -115,7 +93,7 @@ func NewOltpLoggerProvider(ctx context.Context, params config.StageParam, res *r sdklog.WithBatcher(logExporter), sdklog.WithResource(newResource()), ) - otel2.SetLoggerProvider(loggerProvider) + otel.SetLoggerProvider(loggerProvider) return loggerProvider, nil } @@ -123,28 +101,27 @@ func (e *EncodeOtlpLogs) LogWrite(entry config.GenericMap) { now := time.Now() sn := logs.INFO st := "INFO" - msgbytearray, _ := json.Marshal(entry) - msg := string(msgbytearray) + msgByteArray, _ := json.Marshal(entry) + msg := string(msgByteArray) + // TODO: Decide whether the content should be delivered as Body or as Attributes lrc := logs.LogRecordConfig{ - //Timestamp: &now, // take timestamp from entry? + //Timestamp: &now, // take timestamp from entry, if present? ObservedTimestamp: now, SeverityNumber: &sn, SeverityText: &st, Resource: e.res, - //Attributes: obtainAttributes(entry), - Body: &msg, - // Body: empty_string? + Body: &msg, + Attributes: obtainAttributes(entry), } logRecord := logs.NewLogRecord(lrc) - logger := otel2.GetLoggerProvider().Logger( + logger := otel.GetLoggerProvider().Logger( flpOtlpLoggerName, logs.WithSchemaURL(semconv.SchemaURL), ) logger.Emit(logRecord) } -/* func obtainAttributes(entry config.GenericMap) *[]attribute.KeyValue { // convert the entry fields to Attributes of the message var att = make([]attribute.KeyValue, len(entry)) @@ -172,4 +149,3 @@ func obtainAttributes(entry config.GenericMap) *[]attribute.KeyValue { addjustedAtt := att[0:index] return &addjustedAtt } -*/