Skip to content

Commit

Permalink
Merge branch 'main' into otlploghttp-transform
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed Apr 12, 2024
2 parents 2ef33c8 + 7da00d9 commit 663db5c
Show file tree
Hide file tree
Showing 2 changed files with 272 additions and 24 deletions.
169 changes: 145 additions & 24 deletions exporters/otlp/otlplog/otlploghttp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,61 @@ import (
"time"

"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal/retry"
"go.opentelemetry.io/otel/internal/global"
)

// Default values.
var (
defaultEndpoint = "localhost:4318"
defaultPath = "/v1/logs"
defaultTimeout = 10 * time.Second
defaultProxy HTTPTransportProxyFunc = http.ProxyFromEnvironment
defaultRetryCfg = RetryConfig(retry.DefaultConfig)
)

// Option applies an option to the Exporter.
type Option interface {
applyHTTPOption(config) config
}

type fnOpt func(config) config

func (f fnOpt) applyHTTPOption(c config) config { return f(c) }

type config struct {
// TODO: implement.
endpoint setting[string]
path setting[string]
insecure setting[bool]
tlsCfg setting[*tls.Config]
headers setting[map[string]string]
compression setting[Compression]
timeout setting[time.Duration]
proxy setting[HTTPTransportProxyFunc]
retryCfg setting[RetryConfig]
}

func newConfig(options []Option) config {
var c config
for _, opt := range options {
c = opt.applyHTTPOption(c)
}

c.endpoint = c.endpoint.Resolve(
fallback[string](defaultEndpoint),
)
c.path = c.path.Resolve(
fallback[string](defaultPath),
)
c.timeout = c.timeout.Resolve(
fallback[time.Duration](defaultTimeout),
)
c.proxy = c.proxy.Resolve(
fallback[HTTPTransportProxyFunc](defaultProxy),
)
c.retryCfg = c.retryCfg.Resolve(
fallback[RetryConfig](defaultRetryCfg),
)

return c
}

Expand All @@ -41,8 +80,10 @@ func newConfig(options []Option) config {
// By default, if an environment variable is not set, and this option is not
// passed, "localhost:4318" will be used.
func WithEndpoint(endpoint string) Option {
// TODO: implement.
return nil
return fnOpt(func(c config) config {
c.endpoint = newSetting(endpoint)
return c
})
}

// WithEndpointURL sets the target endpoint URL the Exporter will connect to.
Expand All @@ -59,15 +100,34 @@ func WithEndpoint(endpoint string) Option {
//
// By default, if an environment variable is not set, and this option is not
// passed, "localhost:4318" will be used.
func WithEndpointURL(u string) Option {
// TODO: implement.
return nil
func WithEndpointURL(rawURL string) Option {
u, err := url.Parse(rawURL)
if err != nil {
global.Error(err, "otlpmetric: parse endpoint url", "url", rawURL)
return fnOpt(func(c config) config { return c })
}
return fnOpt(func(c config) config {
c.endpoint = newSetting(u.Host)
c.path = newSetting(u.Path)
if u.Scheme != "https" {
c.insecure = newSetting(true)
} else {
c.insecure = newSetting(false)
}
return c
})
}

// Compression describes the compression used for payloads sent to the
// collector.
// Compression describes the compression used for exported payloads.
type Compression int

const (
// NoCompression represents that no compression should be used.
NoCompression Compression = iota
// GzipCompression represents that gzip compression should be used.
GzipCompression
)

// WithCompression sets the compression strategy the Exporter will use to
// compress the HTTP body.
//
Expand All @@ -80,8 +140,10 @@ type Compression int
// By default, if an environment variable is not set, and this option is not
// passed, no compression strategy will be used.
func WithCompression(compression Compression) Option {
// TODO: implement.
return nil
return fnOpt(func(c config) config {
c.compression = newSetting(compression)
return c
})
}

// WithURLPath sets the URL path the Exporter will send requests to.
Expand All @@ -94,8 +156,10 @@ func WithCompression(compression Compression) Option {
// By default, if an environment variable is not set, and this option is not
// passed, "/v1/logs" will be used.
func WithURLPath(urlPath string) Option {
// TODO: implement.
return nil
return fnOpt(func(c config) config {
c.path = newSetting(urlPath)
return c
})
}

// WithTLSClientConfig sets the TLS configuration the Exporter will use for
Expand All @@ -110,8 +174,10 @@ func WithURLPath(urlPath string) Option {
// By default, if an environment variable is not set, and this option is not
// passed, the system default configuration is used.
func WithTLSClientConfig(tlsCfg *tls.Config) Option {
// TODO: implement.
return nil
return fnOpt(func(c config) config {
c.tlsCfg = newSetting(tlsCfg.Clone())
return c
})
}

// WithInsecure disables client transport security for the Exporter's HTTP
Expand All @@ -126,8 +192,10 @@ func WithTLSClientConfig(tlsCfg *tls.Config) Option {
// By default, if an environment variable is not set, and this option is not
// passed, client security will be used.
func WithInsecure() Option {
// TODO: implement.
return nil
return fnOpt(func(c config) config {
c.insecure = newSetting(true)
return c
})
}

// WithHeaders will send the provided headers with each HTTP requests.
Expand All @@ -142,8 +210,10 @@ func WithInsecure() Option {
// By default, if an environment variable is not set, and this option is not
// passed, no user headers will be set.
func WithHeaders(headers map[string]string) Option {
// TODO: implement.
return nil
return fnOpt(func(c config) config {
c.headers = newSetting(headers)
return c
})
}

// WithTimeout sets the max amount of time an Exporter will attempt an export.
Expand All @@ -161,8 +231,10 @@ func WithHeaders(headers map[string]string) Option {
// By default, if an environment variable is not set, and this option is not
// passed, a timeout of 10 seconds will be used.
func WithTimeout(duration time.Duration) Option {
// TODO: implement.
return nil
return fnOpt(func(c config) config {
c.timeout = newSetting(duration)
return c
})
}

// RetryConfig defines configuration for retrying the export of log data that
Expand All @@ -180,8 +252,10 @@ type RetryConfig retry.Config
// 5 seconds after receiving a retryable error and increase exponentially
// after each error for no more than a total time of 1 minute.
func WithRetry(rc RetryConfig) Option {
// TODO: implement.
return nil
return fnOpt(func(c config) config {
c.retryCfg = newSetting(rc)
return c
})
}

// HTTPTransportProxyFunc is a function that resolves which URL to use as proxy
Expand All @@ -193,6 +267,53 @@ type HTTPTransportProxyFunc func(*http.Request) (*url.URL, error)
// proxy to use for an HTTP request. If this option is not used, the client
// will use [http.ProxyFromEnvironment].
func WithProxy(pf HTTPTransportProxyFunc) Option {
// TODO: implement.
return nil
return fnOpt(func(c config) config {
c.proxy = newSetting(pf)
return c
})
}

// setting is a configuration setting value.
type setting[T any] struct {
Value T
Set bool
}

// newSetting returns a new setting with the value set.
func newSetting[T any](value T) setting[T] {
return setting[T]{Value: value, Set: true}
}

// resolver returns an updated setting after applying an resolution operation.
type resolver[T any] func(setting[T]) setting[T]

// Resolve returns a resolved version of s.
//
// It will apply all the passed fn in the order provided, chaining together the
// return setting to the next input. The setting s is used as the initial
// argument to the first fn.
//
// Each fn needs to validate if it should apply given the Set state of the
// setting. This will not perform any checks on the set state when chaining
// function.
func (s setting[T]) Resolve(fn ...resolver[T]) setting[T] {
for _, f := range fn {
s = f(s)
}
return s
}

// fallback returns a resolve that will set a setting value to val if it is not
// already set.
//
// This is usually passed at the end of a resolver chain to ensure a default is
// applied if the setting has not already been set.
func fallback[T any](val T) resolver[T] {
return func(s setting[T]) setting[T] {
if !s.Set {
s.Value = val
s.Set = true
}
return s
}
}
127 changes: 127 additions & 0 deletions exporters/otlp/otlplog/otlploghttp/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package otlploghttp

import (
"crypto/tls"
"net/http"
"net/url"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestNewConfig(t *testing.T) {
tlsCfg := &tls.Config{}
headers := map[string]string{"a": "A"}
rc := RetryConfig{}

testcases := []struct {
name string
options []Option
envars map[string]string
want config
}{
{
name: "Defaults",
want: config{
endpoint: newSetting(defaultEndpoint),
path: newSetting(defaultPath),
timeout: newSetting(defaultTimeout),
retryCfg: newSetting(defaultRetryCfg),
},
},
{
name: "Options",
options: []Option{
WithEndpoint("test"),
WithURLPath("/path"),
WithInsecure(),
WithTLSClientConfig(tlsCfg),
WithCompression(GzipCompression),
WithHeaders(headers),
WithTimeout(time.Second),
WithRetry(rc),
// Do not test WithProxy. Requires func comparison.
},
want: config{
endpoint: newSetting("test"),
path: newSetting("/path"),
insecure: newSetting(true),
tlsCfg: newSetting(tlsCfg),
headers: newSetting(headers),
compression: newSetting(GzipCompression),
timeout: newSetting(time.Second),
retryCfg: newSetting(rc),
},
},
{
name: "WithEndpointURL",
options: []Option{
WithEndpointURL("http://test:8080/path"),
},
want: config{
endpoint: newSetting("test:8080"),
path: newSetting("/path"),
insecure: newSetting(true),
timeout: newSetting(defaultTimeout),
retryCfg: newSetting(defaultRetryCfg),
},
},
{
name: "EndpointPrecidence",
options: []Option{
WithEndpointURL("https://test:8080/path"),
WithEndpoint("not-test:9090"),
WithURLPath("/alt"),
WithInsecure(),
},
want: config{
endpoint: newSetting("not-test:9090"),
path: newSetting("/alt"),
insecure: newSetting(true),
timeout: newSetting(defaultTimeout),
retryCfg: newSetting(defaultRetryCfg),
},
},
{
name: "EndpointURLPrecidence",
options: []Option{
WithEndpoint("not-test:9090"),
WithURLPath("/alt"),
WithInsecure(),
WithEndpointURL("https://test:8080/path"),
},
want: config{
endpoint: newSetting("test:8080"),
path: newSetting("/path"),
insecure: newSetting(false),
timeout: newSetting(defaultTimeout),
retryCfg: newSetting(defaultRetryCfg),
},
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
for key, value := range tc.envars {
t.Setenv(key, value)
}
c := newConfig(tc.options)
// Cannot compare funcs
c.proxy = setting[HTTPTransportProxyFunc]{}
assert.Equal(t, tc.want, c)
})
}
}

func TestWithProxy(t *testing.T) {
proxy := func(*http.Request) (*url.URL, error) { return nil, nil }
opts := []Option{WithProxy(HTTPTransportProxyFunc(proxy))}
c := newConfig(opts)

assert.True(t, c.proxy.Set)
assert.NotNil(t, c.proxy.Value)
}

0 comments on commit 663db5c

Please sign in to comment.