Skip to content

Commit

Permalink
added test plus code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
KalmanMeth committed Oct 24, 2023
1 parent b863035 commit 3cc44f0
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 68 deletions.
11 changes: 6 additions & 5 deletions pkg/api/encode_otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
package api

type OtlpEncode struct {
GrpcAddress string `yaml:"grpcaddress" json:"grpcaddress" doc:"address of Otlp Collector"`
HttpAddress string `yaml:"httpaddress" json:"httpaddress" doc:"address of Otlp Collector; grpcaddress and httpaddress are mututally exclusive"`
//Path string `yaml:"path" json:"path" doc:"path component of URI"`
TLS *ClientTLS `yaml:"tls" json:"tls" doc:"TLS client configuration (optional)"`
Headers map[string]string `yaml:"headers" json:"headers" doc:"headers to add to messages (optional)"`
// grpcaddress and httpaddress are mutually exclusive
// TODO: Should we change these 2 fields to 3 fields: protocol (grpc/http), address, port?
GrpcAddress string `yaml:"grpcaddress" json:"grpcaddress" doc:"address of Otlp Collector (including port) using grpc protocol"`
HttpAddress string `yaml:"httpaddress" json:"httpaddress" doc:"address of Otlp Collector (including port) using http protocol"`
TLS *ClientTLS `yaml:"tls" json:"tls" doc:"TLS client configuration (optional)"`
Headers map[string]string `yaml:"headers" json:"headers" doc:"headers to add to messages (optional)"`
}
117 changes: 117 additions & 0 deletions pkg/pipeline/encode/opentelemetry/encode_otlp_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright (C) 2021 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package opentelemetry

import (
"encoding/json"
"testing"

otel "github.com/agoda-com/opentelemetry-logs-go"
"github.com/agoda-com/opentelemetry-logs-go/logs"
"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode"
"github.com/netobserv/flowlogs-pipeline/pkg/test"
"github.com/stretchr/testify/require"
)

const testOtlpConfig = `---
log-level: debug
pipeline:
- name: encode1
parameters:
- name: encode1
encode:
type: otlp
otlp:
grpcaddress: localhost:4317
`

type fakeOltpLoggerProvider struct {
}
type fakeOltpLogger struct {
}

var receivedData []logs.LogRecord

func (f *fakeOltpLogger) Emit(msg logs.LogRecord) {
receivedData = append(receivedData, msg)
}

func (f *fakeOltpLoggerProvider) Logger(name string, options ...logs.LoggerOption) logs.Logger {
return &fakeOltpLogger{}
}

func initNewEncodeOltp(t *testing.T) encode.Encoder {
receivedData = []logs.LogRecord{}
v, cfg := test.InitConfig(t, testOtlpConfig)
require.NotNil(t, v)

newEncode, err := NewEncodeOtlpLogs(operational.NewMetrics(&config.MetricsSettings{}), cfg.Parameters[0])
require.NoError(t, err)

// intercept the loggerProvider function
loggerProvider := fakeOltpLoggerProvider{}
otel.SetLoggerProvider(&loggerProvider)
return newEncode
}

func Test_EncodeOtlp(t *testing.T) {
newEncode := initNewEncodeOltp(t)
encodeOtlp := newEncode.(*EncodeOtlpLogs)
require.Equal(t, "localhost:4317", encodeOtlp.cfg.GrpcAddress)
require.Nil(t, encodeOtlp.cfg.TLS)
require.Empty(t, encodeOtlp.cfg.HttpAddress)
require.Empty(t, encodeOtlp.cfg.Headers)

entry1 := test.GetIngestMockEntry(true)
entry2 := test.GetIngestMockEntry(false)
newEncode.Encode(entry1)
newEncode.Encode(entry2)
// verify contents of the output
require.Len(t, receivedData, 2)
expected1, _ := json.Marshal(entry1)
expected2, _ := json.Marshal(entry2)
require.Equal(t, string(expected1), *receivedData[0].Body())
require.Equal(t, string(expected2), *receivedData[1].Body())
}

func Test_TLSConfigEmpty(t *testing.T) {
cfg := config.StageParam{Encode: &config.Encode{Otlp: &api.OtlpEncode{HttpAddress: "1.2.3.4:999", TLS: nil}}}
newEncode, err := NewEncodeOtlpLogs(operational.NewMetrics(&config.MetricsSettings{}), cfg)
require.NoError(t, err)
require.NotNil(t, newEncode)
}

func Test_TLSConfigNotEmpty(t *testing.T) {
cfg := config.StageParam{Encode: &config.Encode{Otlp: &api.OtlpEncode{HttpAddress: "1.2.3.4:999", TLS: &api.ClientTLS{InsecureSkipVerify: true}}}}
newEncode, err := NewEncodeOtlpLogs(operational.NewMetrics(&config.MetricsSettings{}), cfg)
require.NoError(t, err)
require.NotNil(t, newEncode)
}

func Test_HeadersNotEmpty(t *testing.T) {
headers := make(map[string]string)
headers["key1"] = "value1"
headers["key2"] = "value2"
cfg := config.StageParam{Encode: &config.Encode{Otlp: &api.OtlpEncode{HttpAddress: "1.2.3.4:999", Headers: headers}}}
newEncode, err := NewEncodeOtlpLogs(operational.NewMetrics(&config.MetricsSettings{}), cfg)
require.NoError(t, err)
require.NotNil(t, newEncode)
}
7 changes: 3 additions & 4 deletions pkg/pipeline/encode/opentelemetry/encode_otlplogs.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2023IBM, Inc.
* Copyright (C) 2023 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -54,12 +54,12 @@ func NewEncodeOtlpLogs(opMetrics *operational.Metrics, params config.StageParam)
if params.Encode != nil && params.Encode.Otlp != nil {
cfg = *params.Encode.Otlp
}
log.Infof("cfg = %v \n", cfg)
log.Debugf("NewEncodeOtlpLogs cfg = %v \n", cfg)

ctx := context.Background()
res := newResource()

lp, err := NewOltpLoggerProvider(ctx, params, res)
lp, err := NewOtlpLoggerProvider(ctx, params, res)
if err != nil {
log.Fatal(err)
return nil, err
Expand All @@ -82,7 +82,6 @@ func newResource() *resource.Resource {
semconv.SchemaURL,
semconv.ServiceName(flpOtlpResourceName),
semconv.ServiceVersion(flpOtlpResourceVersion),
//attribute.String("environment", "demo"),
),
)
return r
Expand Down
94 changes: 35 additions & 59 deletions pkg/pipeline/encode/opentelemetry/opentelemetry.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Copyright (C) 2023 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package opentelemetry

import (
Expand All @@ -6,70 +23,31 @@ import (
"fmt"
"time"

otel2 "github.com/agoda-com/opentelemetry-logs-go"
otel "github.com/agoda-com/opentelemetry-logs-go"
"github.com/agoda-com/opentelemetry-logs-go/exporters/otlp/otlplogs"
"github.com/agoda-com/opentelemetry-logs-go/exporters/otlp/otlplogs/otlplogsgrpc"
"github.com/agoda-com/opentelemetry-logs-go/exporters/otlp/otlplogs/otlplogshttp"
"github.com/agoda-com/opentelemetry-logs-go/logs"
sdklog "github.com/agoda-com/opentelemetry-logs-go/sdk/logs"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/sdk/metric"
"github.com/netobserv/flowlogs-pipeline/pkg/utils"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"google.golang.org/grpc/credentials"
)

// Note:
// As of the writing of this module, go.opentelemetry.io does not provide interfaces for logs.
// We therefore temporarily use agoda-com/opentelemetry-logs-go for logs.
// When go.opentelemetry.io provides interfaces for logs, the code here should be updated to use those interfaces.

const (
flpOtlpLoggerName = "flp-oltp-logger"
flpOtlpLoggerName = "flp-otlp-logger"
)

func NewOltpTraceProvider(ctx context.Context, params config.StageParam, res *resource.Resource) (*sdktrace.TracerProvider, error) {
traceExporter, err := otlptracegrpc.New(
ctx,
otlptracegrpc.WithEndpoint(params.Encode.Otlp.GrpcAddress),
// TODO: tls stuff
otlptracegrpc.WithInsecure(),
)

if err != nil {
return nil, err
}
traceProvider := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(traceExporter),
sdktrace.WithResource(res),
)
otel.SetTracerProvider(traceProvider)
return traceProvider, nil
}

func NewOltpMetricsProvider(ctx context.Context, params config.StageParam, res *resource.Resource) (*metric.MeterProvider, error) {

metricExporter, err := otlpmetricgrpc.New(ctx,
otlpmetricgrpc.WithEndpoint(params.Encode.Otlp.GrpcAddress),
// TODO: add tls support
otlpmetricgrpc.WithInsecure(),
)
if err != nil {
return nil, err
}

meterProvider := metric.NewMeterProvider(
metric.WithResource(res),
metric.WithReader(metric.NewPeriodicReader(metricExporter,
// Default is 1m. Set to 3s for demonstrative purposes.
metric.WithInterval(3*time.Second))),
)
otel.SetMeterProvider(meterProvider)
return meterProvider, nil
}

func NewOltpLoggerProvider(ctx context.Context, params config.StageParam, res *resource.Resource) (*sdklog.LoggerProvider, error) {
func NewOtlpLoggerProvider(ctx context.Context, params config.StageParam, res *resource.Resource) (*sdklog.LoggerProvider, error) {
var expOption otlplogs.ExporterOption
fmt.Printf("otlp headers = %v \n", params.Encode.Otlp.Headers)
if params.Encode.Otlp.GrpcAddress != "" {
var tlsOption otlplogsgrpc.Option
tlsOption = otlplogsgrpc.WithInsecure()
Expand Down Expand Up @@ -115,36 +93,35 @@ func NewOltpLoggerProvider(ctx context.Context, params config.StageParam, res *r
sdklog.WithBatcher(logExporter),
sdklog.WithResource(newResource()),
)
otel2.SetLoggerProvider(loggerProvider)
otel.SetLoggerProvider(loggerProvider)
return loggerProvider, nil
}

func (e *EncodeOtlpLogs) LogWrite(entry config.GenericMap) {
now := time.Now()
sn := logs.INFO
st := "INFO"
msgbytearray, _ := json.Marshal(entry)
msg := string(msgbytearray)
msgByteArray, _ := json.Marshal(entry)
msg := string(msgByteArray)
// TODO: Decide whether the content should be delivered as Body or as Attributes
lrc := logs.LogRecordConfig{
//Timestamp: &now, // take timestamp from entry?
//Timestamp: &now, // take timestamp from entry, if present?
ObservedTimestamp: now,
SeverityNumber: &sn,
SeverityText: &st,
Resource: e.res,
//Attributes: obtainAttributes(entry),
Body: &msg,
// Body: empty_string?
Body: &msg,
Attributes: obtainAttributes(entry),
}
logRecord := logs.NewLogRecord(lrc)

logger := otel2.GetLoggerProvider().Logger(
logger := otel.GetLoggerProvider().Logger(
flpOtlpLoggerName,
logs.WithSchemaURL(semconv.SchemaURL),
)
logger.Emit(logRecord)
}

/*
func obtainAttributes(entry config.GenericMap) *[]attribute.KeyValue {
// convert the entry fields to Attributes of the message
var att = make([]attribute.KeyValue, len(entry))
Expand Down Expand Up @@ -172,4 +149,3 @@ func obtainAttributes(entry config.GenericMap) *[]attribute.KeyValue {
addjustedAtt := att[0:index]
return &addjustedAtt
}
*/

0 comments on commit 3cc44f0

Please sign in to comment.