Skip to content

Commit 6feb7c2

Browse files
codebotenMrAlias
andauthored
otelconf: add logger provider internal funcs (open-telemetry#8089)
Signed-off-by: alex boten <[email protected]> Co-authored-by: Tyler Yahn <[email protected]>
1 parent 62b9009 commit 6feb7c2

File tree

3 files changed

+1187
-1
lines changed

3 files changed

+1187
-1
lines changed

otelconf/config_common.go

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,32 @@
44
package otelconf // import "go.opentelemetry.io/contrib/otelconf"
55

66
import (
7+
"context"
8+
"errors"
79
"fmt"
810
"reflect"
11+
12+
"go.opentelemetry.io/otel/baggage"
13+
sdklog "go.opentelemetry.io/otel/sdk/log"
14+
)
15+
16+
const (
17+
compressionGzip = "gzip"
18+
compressionNone = "none"
919
)
1020

21+
type configOptions struct {
22+
ctx context.Context
23+
opentelemetryConfig OpenTelemetryConfiguration
24+
loggerProviderOptions []sdklog.LoggerProviderOption
25+
}
26+
27+
type shutdownFunc func(context.Context) error
28+
29+
func noopShutdown(context.Context) error {
30+
return nil
31+
}
32+
1133
type errBound struct {
1234
Field string
1335
Bound int
@@ -85,7 +107,7 @@ type errInvalid struct {
85107
}
86108

87109
func (e *errInvalid) Error() string {
88-
return "invalid " + e.Identifier
110+
return "invalid config: " + e.Identifier
89111
}
90112

91113
func (e *errInvalid) Is(target error) bool {
@@ -201,3 +223,25 @@ func validateSpanLimits(plain *SpanLimits) error {
201223
func ptr[T any](v T) *T {
202224
return &v
203225
}
226+
227+
// createHeadersConfig combines the two header config fields. Headers take precedence over headersList.
228+
func createHeadersConfig(headers []NameStringValuePair, headersList *string) (map[string]string, error) {
229+
result := make(map[string]string)
230+
if headersList != nil {
231+
// Parsing follows https://github.com/open-telemetry/opentelemetry-configuration/blob/568e5080816d40d75792eb754fc96bde09654159/schema/type_descriptions.yaml#L584.
232+
headerslist, err := baggage.Parse(*headersList)
233+
if err != nil {
234+
return nil, errors.Join(newErrInvalid("invalid headers_list"), err)
235+
}
236+
for _, kv := range headerslist.Members() {
237+
result[kv.Key()] = kv.Value()
238+
}
239+
}
240+
// Headers take precedence over HeadersList, so this has to be after HeadersList is processed.
241+
for _, kv := range headers {
242+
if kv.Value != nil {
243+
result[kv.Name] = *kv.Value
244+
}
245+
}
246+
return result, nil
247+
}

otelconf/log.go

Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package otelconf // import "go.opentelemetry.io/contrib/otelconf"
5+
6+
import (
7+
"context"
8+
"errors"
9+
"fmt"
10+
"net/url"
11+
"time"
12+
13+
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"
14+
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp"
15+
"go.opentelemetry.io/otel/exporters/stdout/stdoutlog"
16+
"go.opentelemetry.io/otel/log"
17+
"go.opentelemetry.io/otel/log/noop"
18+
sdklog "go.opentelemetry.io/otel/sdk/log"
19+
"go.opentelemetry.io/otel/sdk/resource"
20+
"google.golang.org/grpc/credentials"
21+
22+
"go.opentelemetry.io/contrib/otelconf/internal/tls"
23+
)
24+
25+
func loggerProvider(cfg configOptions, res *resource.Resource) (log.LoggerProvider, shutdownFunc, error) {
26+
if cfg.opentelemetryConfig.LoggerProvider == nil {
27+
return noop.NewLoggerProvider(), noopShutdown, nil
28+
}
29+
provider, ok := cfg.opentelemetryConfig.LoggerProvider.(*LoggerProviderJson)
30+
if !ok {
31+
return noop.NewLoggerProvider(), noopShutdown, newErrInvalid("logger_provider")
32+
}
33+
opts := append(cfg.loggerProviderOptions, sdklog.WithResource(res))
34+
35+
var errs []error
36+
for _, processor := range provider.Processors {
37+
sp, err := logProcessor(cfg.ctx, processor)
38+
if err == nil {
39+
opts = append(opts, sdklog.WithProcessor(sp))
40+
} else {
41+
errs = append(errs, err)
42+
}
43+
}
44+
45+
if len(errs) > 0 {
46+
return noop.NewLoggerProvider(), noopShutdown, errors.Join(errs...)
47+
}
48+
49+
lp := sdklog.NewLoggerProvider(opts...)
50+
return lp, lp.Shutdown, nil
51+
}
52+
53+
func logProcessor(ctx context.Context, processor LogRecordProcessor) (sdklog.Processor, error) {
54+
if processor.Batch != nil && processor.Simple != nil {
55+
return nil, newErrInvalid("must not specify multiple log processor type")
56+
}
57+
if processor.Batch != nil {
58+
exp, err := logExporter(ctx, processor.Batch.Exporter)
59+
if err != nil {
60+
return nil, err
61+
}
62+
return batchLogProcessor(processor.Batch, exp)
63+
}
64+
if processor.Simple != nil {
65+
exp, err := logExporter(ctx, processor.Simple.Exporter)
66+
if err != nil {
67+
return nil, err
68+
}
69+
return sdklog.NewSimpleProcessor(exp), nil
70+
}
71+
return nil, newErrInvalid("unsupported log processor type, must be one of simple or batch")
72+
}
73+
74+
func logExporter(ctx context.Context, exporter LogRecordExporter) (sdklog.Exporter, error) {
75+
exportersConfigured := 0
76+
var exportFunc func() (sdklog.Exporter, error)
77+
78+
if exporter.Console != nil {
79+
exportersConfigured++
80+
exportFunc = func() (sdklog.Exporter, error) {
81+
return stdoutlog.New(
82+
stdoutlog.WithPrettyPrint(),
83+
)
84+
}
85+
}
86+
87+
if exporter.OTLPHttp != nil {
88+
exportersConfigured++
89+
exportFunc = func() (sdklog.Exporter, error) {
90+
return otlpHTTPLogExporter(ctx, exporter.OTLPHttp)
91+
}
92+
}
93+
if exporter.OTLPGrpc != nil {
94+
exportersConfigured++
95+
exportFunc = func() (sdklog.Exporter, error) {
96+
return otlpGRPCLogExporter(ctx, exporter.OTLPGrpc)
97+
}
98+
}
99+
if exporter.OTLPFileDevelopment != nil {
100+
// TODO: implement file exporter https://github.com/open-telemetry/opentelemetry-go/issues/5408
101+
return nil, newErrInvalid("otlp_file/development")
102+
}
103+
104+
if exportersConfigured > 1 {
105+
return nil, newErrInvalid("must not specify multiple exporters")
106+
}
107+
108+
if exportFunc != nil {
109+
return exportFunc()
110+
}
111+
112+
return nil, newErrInvalid("no valid log exporter")
113+
}
114+
115+
func batchLogProcessor(blp *BatchLogRecordProcessor, exp sdklog.Exporter) (*sdklog.BatchProcessor, error) {
116+
var opts []sdklog.BatchProcessorOption
117+
if err := validateBatchLogRecordProcessor(blp); err != nil {
118+
return nil, err
119+
}
120+
if blp.ExportTimeout != nil {
121+
opts = append(opts, sdklog.WithExportTimeout(time.Millisecond*time.Duration(*blp.ExportTimeout)))
122+
}
123+
if blp.MaxExportBatchSize != nil {
124+
opts = append(opts, sdklog.WithExportMaxBatchSize(*blp.MaxExportBatchSize))
125+
}
126+
if blp.MaxQueueSize != nil {
127+
opts = append(opts, sdklog.WithMaxQueueSize(*blp.MaxQueueSize))
128+
}
129+
130+
if blp.ScheduleDelay != nil {
131+
opts = append(opts, sdklog.WithExportInterval(time.Millisecond*time.Duration(*blp.ScheduleDelay)))
132+
}
133+
134+
return sdklog.NewBatchProcessor(exp, opts...), nil
135+
}
136+
137+
func otlpHTTPLogExporter(ctx context.Context, otlpConfig *OTLPHttpExporter) (sdklog.Exporter, error) {
138+
var opts []otlploghttp.Option
139+
140+
if otlpConfig.Endpoint != nil {
141+
u, err := url.ParseRequestURI(*otlpConfig.Endpoint)
142+
if err != nil {
143+
return nil, errors.Join(newErrInvalid("endpoint parsing failed"), err)
144+
}
145+
opts = append(opts, otlploghttp.WithEndpoint(u.Host))
146+
147+
if u.Scheme == "http" {
148+
opts = append(opts, otlploghttp.WithInsecure())
149+
}
150+
if u.Path != "" {
151+
opts = append(opts, otlploghttp.WithURLPath(u.Path))
152+
}
153+
}
154+
if otlpConfig.Compression != nil {
155+
switch *otlpConfig.Compression {
156+
case compressionGzip:
157+
opts = append(opts, otlploghttp.WithCompression(otlploghttp.GzipCompression))
158+
case compressionNone:
159+
opts = append(opts, otlploghttp.WithCompression(otlploghttp.NoCompression))
160+
default:
161+
return nil, newErrInvalid(fmt.Sprintf("unsupported compression %q", *otlpConfig.Compression))
162+
}
163+
}
164+
if otlpConfig.Timeout != nil && *otlpConfig.Timeout > 0 {
165+
opts = append(opts, otlploghttp.WithTimeout(time.Millisecond*time.Duration(*otlpConfig.Timeout)))
166+
}
167+
headersConfig, err := createHeadersConfig(otlpConfig.Headers, otlpConfig.HeadersList)
168+
if err != nil {
169+
return nil, err
170+
}
171+
if len(headersConfig) > 0 {
172+
opts = append(opts, otlploghttp.WithHeaders(headersConfig))
173+
}
174+
175+
tlsConfig, err := tls.CreateConfig(otlpConfig.CertificateFile, otlpConfig.ClientCertificateFile, otlpConfig.ClientKeyFile)
176+
if err != nil {
177+
return nil, errors.Join(newErrInvalid("tls configuration"), err)
178+
}
179+
opts = append(opts, otlploghttp.WithTLSClientConfig(tlsConfig))
180+
181+
return otlploghttp.New(ctx, opts...)
182+
}
183+
184+
func otlpGRPCLogExporter(ctx context.Context, otlpConfig *OTLPGrpcExporter) (sdklog.Exporter, error) {
185+
var opts []otlploggrpc.Option
186+
187+
if otlpConfig.Endpoint != nil {
188+
u, err := url.ParseRequestURI(*otlpConfig.Endpoint)
189+
if err != nil {
190+
return nil, errors.Join(newErrInvalid("endpoint parsing failed"), err)
191+
}
192+
// ParseRequestURI leaves the Host field empty when no
193+
// scheme is specified (i.e. localhost:4317). This check is
194+
// here to support the case where a user may not specify a
195+
// scheme. The code does its best effort here by using
196+
// otlpConfig.Endpoint as-is in that case
197+
if u.Host != "" {
198+
opts = append(opts, otlploggrpc.WithEndpoint(u.Host))
199+
} else {
200+
opts = append(opts, otlploggrpc.WithEndpoint(*otlpConfig.Endpoint))
201+
}
202+
if u.Scheme == "http" || (u.Scheme != "https" && otlpConfig.Insecure != nil && *otlpConfig.Insecure) {
203+
opts = append(opts, otlploggrpc.WithInsecure())
204+
}
205+
}
206+
if otlpConfig.Compression != nil {
207+
switch *otlpConfig.Compression {
208+
case compressionGzip:
209+
opts = append(opts, otlploggrpc.WithCompressor(*otlpConfig.Compression))
210+
case compressionNone:
211+
// none requires no options
212+
default:
213+
return nil, newErrInvalid(fmt.Sprintf("unsupported compression %q", *otlpConfig.Compression))
214+
}
215+
}
216+
if otlpConfig.Timeout != nil && *otlpConfig.Timeout > 0 {
217+
opts = append(opts, otlploggrpc.WithTimeout(time.Millisecond*time.Duration(*otlpConfig.Timeout)))
218+
}
219+
headersConfig, err := createHeadersConfig(otlpConfig.Headers, otlpConfig.HeadersList)
220+
if err != nil {
221+
return nil, err
222+
}
223+
if len(headersConfig) > 0 {
224+
opts = append(opts, otlploggrpc.WithHeaders(headersConfig))
225+
}
226+
227+
if otlpConfig.CertificateFile != nil || otlpConfig.ClientCertificateFile != nil || otlpConfig.ClientKeyFile != nil {
228+
tlsConfig, err := tls.CreateConfig(otlpConfig.CertificateFile, otlpConfig.ClientCertificateFile, otlpConfig.ClientKeyFile)
229+
if err != nil {
230+
return nil, errors.Join(newErrInvalid("tls configuration"), err)
231+
}
232+
opts = append(opts, otlploggrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig)))
233+
}
234+
235+
return otlploggrpc.New(ctx, opts...)
236+
}

0 commit comments

Comments
 (0)