From f2a8cf2533f8180de672cfa485210c1b78ee6559 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Mon, 16 Sep 2024 10:34:35 -0400 Subject: [PATCH] Uses the new parsing structure for RBAC parsing (#3206) * begin interface change * move over processor config * use it * fix tests * Generics * rename * builder * Change to use the builder pattern instead of the option pattern * remove unused methods * fix unit tests * rename somethings * missed a spot * fix builder --- apis/v1beta1/config.go | 42 +++- internal/components/builder.go | 122 ++++++++++ internal/components/builder_test.go | 217 ++++++++++++++++++ internal/components/component.go | 65 +----- internal/components/exporters/helpers.go | 4 +- internal/components/exporters/helpers_test.go | 2 +- internal/components/generic_parser.go | 23 +- internal/components/generic_parser_test.go | 117 +++++++++- internal/components/multi_endpoint.go | 46 +++- internal/components/multi_endpoint_test.go | 189 +++++++++------ internal/components/processors/helpers.go | 50 ++++ .../components/processors/helpers_test.go | 85 +++++++ .../components/processors/k8sattribute.go | 81 +++++++ .../processors/k8sattribute_test.go | 151 ++++++++++++ .../processors/resourcedetection.go | 52 +++++ .../processors/resourcedetection_test.go | 121 ++++++++++ internal/components/receivers/helpers.go | 157 +++++++------ internal/components/single_endpoint.go | 33 +-- internal/components/single_endpoint_test.go | 106 ++++----- internal/manifests/collector/rbac.go | 26 +-- 20 files changed, 1367 insertions(+), 322 deletions(-) create mode 100644 internal/components/builder.go create mode 100644 internal/components/builder_test.go create mode 100644 internal/components/processors/helpers.go create mode 100644 internal/components/processors/helpers_test.go create mode 100644 internal/components/processors/k8sattribute.go create mode 100644 internal/components/processors/k8sattribute_test.go create mode 100644 internal/components/processors/resourcedetection.go create mode 100644 internal/components/processors/resourcedetection_test.go diff --git a/apis/v1beta1/config.go b/apis/v1beta1/config.go index 61fecb3ad1..fce6a610f4 100644 --- a/apis/v1beta1/config.go +++ b/apis/v1beta1/config.go @@ -27,9 +27,11 @@ import ( "github.com/go-logr/logr" "gopkg.in/yaml.v3" corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" "github.com/open-telemetry/opentelemetry-operator/internal/components" "github.com/open-telemetry/opentelemetry-operator/internal/components/exporters" + "github.com/open-telemetry/opentelemetry-operator/internal/components/processors" "github.com/open-telemetry/opentelemetry-operator/internal/components/receivers" ) @@ -139,9 +141,43 @@ type Config struct { Service Service `json:"service" yaml:"service"` } +// getRbacRulesForComponentKinds gets the RBAC Rules for the given ComponentKind(s). +func (c *Config) getRbacRulesForComponentKinds(logger logr.Logger, componentKinds ...ComponentKind) ([]rbacv1.PolicyRule, error) { + var rules []rbacv1.PolicyRule + enabledComponents := c.GetEnabledComponents() + for _, componentKind := range componentKinds { + var retriever components.ParserRetriever + var cfg AnyConfig + switch componentKind { + case KindReceiver: + retriever = receivers.ReceiverFor + cfg = c.Receivers + case KindExporter: + retriever = exporters.ParserFor + cfg = c.Exporters + case KindProcessor: + retriever = processors.ProcessorFor + if c.Processors == nil { + cfg = AnyConfig{} + } else { + cfg = *c.Processors + } + } + for componentName := range enabledComponents[componentKind] { + // TODO: Clean up the naming here and make it simpler to use a retriever. + parser := retriever(componentName) + if parsedRules, err := parser.GetRBACRules(logger, cfg.Object[componentName]); err != nil { + return nil, err + } else { + rules = append(rules, parsedRules...) + } + } + } + return rules, nil +} + // getPortsForComponentKinds gets the ports for the given ComponentKind(s). func (c *Config) getPortsForComponentKinds(logger logr.Logger, componentKinds ...ComponentKind) ([]corev1.ServicePort, error) { - var ports []corev1.ServicePort enabledComponents := c.GetEnabledComponents() for _, componentKind := range componentKinds { @@ -187,6 +223,10 @@ func (c *Config) GetAllPorts(logger logr.Logger) ([]corev1.ServicePort, error) { return c.getPortsForComponentKinds(logger, KindReceiver, KindExporter) } +func (c *Config) GetAllRbacRules(logger logr.Logger) ([]rbacv1.PolicyRule, error) { + return c.getRbacRulesForComponentKinds(logger, KindReceiver, KindExporter, KindProcessor) +} + // Yaml encodes the current object and returns it as a string. func (c *Config) Yaml() (string, error) { var buf bytes.Buffer diff --git a/internal/components/builder.go b/internal/components/builder.go new file mode 100644 index 0000000000..7dc3ba186e --- /dev/null +++ b/internal/components/builder.go @@ -0,0 +1,122 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package components + +import ( + "fmt" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" + + "github.com/open-telemetry/opentelemetry-operator/internal/naming" +) + +type ParserOption[ComponentConfigType any] func(*Settings[ComponentConfigType]) + +type Settings[ComponentConfigType any] struct { + protocol corev1.Protocol + appProtocol *string + targetPort intstr.IntOrString + nodePort int32 + name string + port int32 + portParser PortParser[ComponentConfigType] + rbacGen RBACRuleGenerator[ComponentConfigType] +} + +func NewEmptySettings[ComponentConfigType any]() *Settings[ComponentConfigType] { + return &Settings[ComponentConfigType]{} +} + +func (o *Settings[ComponentConfigType]) Apply(opts ...ParserOption[ComponentConfigType]) { + for _, opt := range opts { + opt(o) + } +} + +func (o *Settings[ComponentConfigType]) GetServicePort() *corev1.ServicePort { + return &corev1.ServicePort{ + Name: naming.PortName(o.name, o.port), + Port: o.port, + Protocol: o.protocol, + AppProtocol: o.appProtocol, + TargetPort: o.targetPort, + NodePort: o.nodePort, + } +} + +type Builder[ComponentConfigType any] []ParserOption[ComponentConfigType] + +func NewBuilder[ComponentConfigType any]() Builder[ComponentConfigType] { + return []ParserOption[ComponentConfigType]{} +} + +func (b Builder[ComponentConfigType]) WithProtocol(protocol corev1.Protocol) Builder[ComponentConfigType] { + return append(b, func(o *Settings[ComponentConfigType]) { + o.protocol = protocol + }) +} +func (b Builder[ComponentConfigType]) WithAppProtocol(appProtocol *string) Builder[ComponentConfigType] { + return append(b, func(o *Settings[ComponentConfigType]) { + o.appProtocol = appProtocol + }) +} +func (b Builder[ComponentConfigType]) WithTargetPort(targetPort int32) Builder[ComponentConfigType] { + return append(b, func(o *Settings[ComponentConfigType]) { + o.targetPort = intstr.FromInt32(targetPort) + }) +} +func (b Builder[ComponentConfigType]) WithNodePort(nodePort int32) Builder[ComponentConfigType] { + return append(b, func(o *Settings[ComponentConfigType]) { + o.nodePort = nodePort + }) +} +func (b Builder[ComponentConfigType]) WithName(name string) Builder[ComponentConfigType] { + return append(b, func(o *Settings[ComponentConfigType]) { + o.name = name + }) +} +func (b Builder[ComponentConfigType]) WithPort(port int32) Builder[ComponentConfigType] { + return append(b, func(o *Settings[ComponentConfigType]) { + o.port = port + }) +} +func (b Builder[ComponentConfigType]) WithPortParser(portParser PortParser[ComponentConfigType]) Builder[ComponentConfigType] { + return append(b, func(o *Settings[ComponentConfigType]) { + o.portParser = portParser + }) +} +func (b Builder[ComponentConfigType]) WithRbacGen(rbacGen RBACRuleGenerator[ComponentConfigType]) Builder[ComponentConfigType] { + return append(b, func(o *Settings[ComponentConfigType]) { + o.rbacGen = rbacGen + }) +} + +func (b Builder[ComponentConfigType]) Build() (*GenericParser[ComponentConfigType], error) { + o := NewEmptySettings[ComponentConfigType]() + o.Apply(b...) + if len(o.name) == 0 { + return nil, fmt.Errorf("invalid settings struct, no name specified") + } + return &GenericParser[ComponentConfigType]{name: o.name, portParser: o.portParser, rbacGen: o.rbacGen, settings: o}, nil +} + +func (b Builder[ComponentConfigType]) MustBuild() *GenericParser[ComponentConfigType] { + if p, err := b.Build(); err != nil { + panic(err) + } else { + return p + } +} diff --git a/internal/components/builder_test.go b/internal/components/builder_test.go new file mode 100644 index 0000000000..d600db4a70 --- /dev/null +++ b/internal/components/builder_test.go @@ -0,0 +1,217 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package components_test + +import ( + "fmt" + "testing" + + "github.com/go-logr/logr" + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" + + "github.com/open-telemetry/opentelemetry-operator/internal/components" +) + +func TestBuilder_Build(t *testing.T) { + type sampleConfig struct { + example string + number int + m map[string]interface{} + } + type want struct { + name string + ports []corev1.ServicePort + rules []rbacv1.PolicyRule + } + type fields[T any] struct { + b components.Builder[T] + } + type params struct { + conf interface{} + } + type testCase[T any] struct { + name string + fields fields[T] + params params + want want + wantErr assert.ErrorAssertionFunc + wantRbacErr assert.ErrorAssertionFunc + } + examplePortParser := func(logger logr.Logger, name string, defaultPort *corev1.ServicePort, config sampleConfig) ([]corev1.ServicePort, error) { + if defaultPort != nil { + return []corev1.ServicePort{*defaultPort}, nil + } + return nil, nil + } + tests := []testCase[sampleConfig]{ + { + name: "basic valid configuration", + fields: fields[sampleConfig]{ + b: components.NewBuilder[sampleConfig](). + WithPortParser(examplePortParser). + WithName("test-service"). + WithPort(80). + WithNodePort(80). + WithProtocol(corev1.ProtocolTCP), + }, + params: params{ + conf: sampleConfig{}, + }, + want: want{ + name: "__test-service", + ports: []corev1.ServicePort{ + { + Name: "test-service", + Port: 80, + NodePort: 80, + Protocol: corev1.ProtocolTCP, + }, + }, + rules: nil, + }, + wantErr: assert.NoError, + wantRbacErr: assert.NoError, + }, + { + name: "missing name", + fields: fields[sampleConfig]{ + b: components.NewBuilder[sampleConfig](). + WithPort(8080). + WithProtocol(corev1.ProtocolUDP), + }, + params: params{ + conf: sampleConfig{}, + }, + want: want{}, + wantErr: assert.Error, + wantRbacErr: assert.NoError, + }, + { + name: "complete configuration with RBAC rules", + fields: fields[sampleConfig]{ + b: components.NewBuilder[sampleConfig](). + WithName("secure-service"). + WithPort(443). + WithProtocol(corev1.ProtocolTCP). + WithRbacGen(func(logger logr.Logger, config sampleConfig) ([]rbacv1.PolicyRule, error) { + rules := []rbacv1.PolicyRule{ + { + NonResourceURLs: []string{config.example}, + APIGroups: []string{""}, + Resources: []string{"pods"}, + Verbs: []string{"get", "list"}, + }, + } + if config.number > 100 { + rules = append(rules, rbacv1.PolicyRule{ + APIGroups: []string{""}, + Resources: []string{"nodes"}, + Verbs: []string{"get", "list"}, + }) + } + return rules, nil + }), + }, + params: params{ + conf: sampleConfig{ + example: "test", + number: 100, + m: map[string]interface{}{ + "key": "value", + }, + }, + }, + want: want{ + name: "__secure-service", + ports: nil, + rules: []rbacv1.PolicyRule{ + { + NonResourceURLs: []string{"test"}, + APIGroups: []string{""}, + Resources: []string{"pods"}, + Verbs: []string{"get", "list"}, + }, + }, + }, + wantErr: assert.NoError, + wantRbacErr: assert.NoError, + }, + { + name: "complete configuration with RBAC rules errors", + fields: fields[sampleConfig]{ + b: components.NewBuilder[sampleConfig](). + WithName("secure-service"). + WithPort(443). + WithProtocol(corev1.ProtocolTCP). + WithRbacGen(func(logger logr.Logger, config sampleConfig) ([]rbacv1.PolicyRule, error) { + rules := []rbacv1.PolicyRule{ + { + NonResourceURLs: []string{config.example}, + APIGroups: []string{""}, + Resources: []string{"pods"}, + Verbs: []string{"get", "list"}, + }, + } + if v, ok := config.m["key"]; ok && v == "value" { + return nil, fmt.Errorf("errors from function") + } + return rules, nil + }), + }, + params: params{ + conf: sampleConfig{ + example: "test", + number: 100, + m: map[string]interface{}{ + "key": "value", + }, + }, + }, + want: want{ + name: "__secure-service", + ports: nil, + rules: nil, + }, + wantErr: assert.NoError, + wantRbacErr: assert.Error, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := tt.fields.b.Build() + if tt.wantErr(t, err, "WantErr()") && err != nil { + return + } + assert.Equalf(t, tt.want.name, got.ParserName(), "ParserName()") + ports, err := got.Ports(logr.Discard(), got.ParserType(), tt.params.conf) + assert.NoError(t, err) + assert.Equalf(t, tt.want.ports, ports, "Ports()") + rules, rbacErr := got.GetRBACRules(logr.Discard(), tt.params.conf) + if tt.wantRbacErr(t, rbacErr, "WantRbacErr()") && rbacErr != nil { + return + } + assert.Equalf(t, tt.want.rules, rules, "GetRBACRules()") + }) + } +} + +func TestMustBuildPanics(t *testing.T) { + b := components.Builder[*components.SingleEndpointConfig]{} + assert.Panics(t, func() { + b.MustBuild() + }) +} diff --git a/internal/components/component.go b/internal/components/component.go index 7e9ea78c74..36d3744839 100644 --- a/internal/components/component.go +++ b/internal/components/component.go @@ -22,9 +22,8 @@ import ( "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/util/intstr" - - "github.com/open-telemetry/opentelemetry-operator/internal/naming" ) var ( @@ -34,66 +33,17 @@ var ( PortNotFoundErr = errors.New("port should not be empty") ) -// PortParser is a function that returns a list of servicePorts given a config of type T. -type PortParser[T any] func(logger logr.Logger, name string, o *Option, config T) ([]corev1.ServicePort, error) - type PortRetriever interface { GetPortNum() (int32, error) GetPortNumOrDefault(logr.Logger, int32) int32 } -type Option struct { - protocol corev1.Protocol - appProtocol *string - targetPort intstr.IntOrString - nodePort int32 - name string - port int32 -} - -func NewOption(name string, port int32) *Option { - return &Option{ - name: name, - port: port, - } -} - -func (o *Option) Apply(opts ...PortBuilderOption) { - for _, opt := range opts { - opt(o) - } -} - -func (o *Option) GetServicePort() *corev1.ServicePort { - return &corev1.ServicePort{ - Name: naming.PortName(o.name, o.port), - Port: o.port, - Protocol: o.protocol, - AppProtocol: o.appProtocol, - TargetPort: o.targetPort, - NodePort: o.nodePort, - } -} - -type PortBuilderOption func(*Option) +// PortParser is a function that returns a list of servicePorts given a config of type Config. +type PortParser[ComponentConfigType any] func(logger logr.Logger, name string, defaultPort *corev1.ServicePort, config ComponentConfigType) ([]corev1.ServicePort, error) -func WithTargetPort(targetPort int32) PortBuilderOption { - return func(opt *Option) { - opt.targetPort = intstr.FromInt32(targetPort) - } -} - -func WithAppProtocol(proto *string) PortBuilderOption { - return func(opt *Option) { - opt.appProtocol = proto - } -} - -func WithProtocol(proto corev1.Protocol) PortBuilderOption { - return func(opt *Option) { - opt.protocol = proto - } -} +// RBACRuleGenerator is a function that generates a list of RBAC Rules given a configuration of type Config +// It's expected that type Config is the configuration used by a parser. +type RBACRuleGenerator[ComponentConfigType any] func(logger logr.Logger, config ComponentConfigType) ([]rbacv1.PolicyRule, error) // ComponentType returns the type for a given component name. // components have a name like: @@ -137,6 +87,9 @@ type Parser interface { // of the form "name" or "type/name" Ports(logger logr.Logger, name string, config interface{}) ([]corev1.ServicePort, error) + // GetRBACRules returns the rbac rules for this component + GetRBACRules(logger logr.Logger, config interface{}) ([]rbacv1.PolicyRule, error) + // ParserType returns the type of this parser ParserType() string diff --git a/internal/components/exporters/helpers.go b/internal/components/exporters/helpers.go index 3dd9ad2054..82108c5fc0 100644 --- a/internal/components/exporters/helpers.go +++ b/internal/components/exporters/helpers.go @@ -38,12 +38,12 @@ func ParserFor(name string) components.Parser { return parser } // We want the default for exporters to fail silently. - return components.NewGenericParser[any](components.ComponentType(name), components.UnsetPort, nil) + return components.NewBuilder[any]().WithName(name).MustBuild() } var ( componentParsers = []components.Parser{ - components.NewSinglePortParser("prometheus", 8888), + components.NewSinglePortParserBuilder("prometheus", 8888).MustBuild(), } ) diff --git a/internal/components/exporters/helpers_test.go b/internal/components/exporters/helpers_test.go index 80dfed3a20..77fb67aced 100644 --- a/internal/components/exporters/helpers_test.go +++ b/internal/components/exporters/helpers_test.go @@ -39,7 +39,7 @@ func TestParserForReturns(t *testing.T) { func TestCanRegister(t *testing.T) { const testComponentName = "test" - exporters.Register(testComponentName, components.NewSinglePortParser(testComponentName, 9000)) + exporters.Register(testComponentName, components.NewSinglePortParserBuilder(testComponentName, 9000).MustBuild()) assert.True(t, exporters.IsRegistered(testComponentName)) parser := exporters.ParserFor(testComponentName) assert.Equal(t, "test", parser.ParserType()) diff --git a/internal/components/generic_parser.go b/internal/components/generic_parser.go index 27c4f44b1a..4e3c3325ed 100644 --- a/internal/components/generic_parser.go +++ b/internal/components/generic_parser.go @@ -20,6 +20,7 @@ import ( "github.com/go-logr/logr" "github.com/mitchellh/mapstructure" corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" ) var ( @@ -30,8 +31,20 @@ var ( // functionality to idempotent functions. type GenericParser[T any] struct { name string + settings *Settings[T] portParser PortParser[T] - option *Option + rbacGen RBACRuleGenerator[T] +} + +func (g *GenericParser[T]) GetRBACRules(logger logr.Logger, config interface{}) ([]rbacv1.PolicyRule, error) { + if g.rbacGen == nil { + return nil, nil + } + var parsed T + if err := mapstructure.Decode(config, &parsed); err != nil { + return nil, err + } + return g.rbacGen(logger, parsed) } func (g *GenericParser[T]) Ports(logger logr.Logger, name string, config interface{}) ([]corev1.ServicePort, error) { @@ -42,7 +55,7 @@ func (g *GenericParser[T]) Ports(logger logr.Logger, name string, config interfa if err := mapstructure.Decode(config, &parsed); err != nil { return nil, err } - return g.portParser(logger, name, g.option, parsed) + return g.portParser(logger, name, g.settings.GetServicePort(), parsed) } func (g *GenericParser[T]) ParserType() string { @@ -52,9 +65,3 @@ func (g *GenericParser[T]) ParserType() string { func (g *GenericParser[T]) ParserName() string { return fmt.Sprintf("__%s", g.name) } - -func NewGenericParser[T any](name string, port int32, parser PortParser[T], opts ...PortBuilderOption) *GenericParser[T] { - o := NewOption(name, port) - o.Apply(opts...) - return &GenericParser[T]{name: name, portParser: parser, option: o} -} diff --git a/internal/components/generic_parser_test.go b/internal/components/generic_parser_test.go index 9d6be1ade6..a271bcd2dd 100644 --- a/internal/components/generic_parser_test.go +++ b/internal/components/generic_parser_test.go @@ -21,6 +21,7 @@ import ( "github.com/go-logr/logr" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" "github.com/open-telemetry/opentelemetry-operator/internal/components" ) @@ -41,7 +42,7 @@ func TestGenericParser_GetPorts(t *testing.T) { tests := []testCase[*components.SingleEndpointConfig]{ { name: "valid config with endpoint", - g: components.NewGenericParser[*components.SingleEndpointConfig]("test", 0, components.ParseSingleEndpoint), + g: components.NewSinglePortParserBuilder("test", 0).MustBuild(), args: args{ logger: logr.Discard(), config: map[string]interface{}{ @@ -58,7 +59,7 @@ func TestGenericParser_GetPorts(t *testing.T) { }, { name: "valid config with listen_address", - g: components.NewGenericParser[*components.SingleEndpointConfig]("test", 0, components.ParseSingleEndpoint), + g: components.NewSinglePortParserBuilder("test", 0).MustBuild(), args: args{ logger: logr.Discard(), config: map[string]interface{}{ @@ -74,8 +75,8 @@ func TestGenericParser_GetPorts(t *testing.T) { wantErr: assert.NoError, }, { - name: "valid config with listen_address with option", - g: components.NewGenericParser[*components.SingleEndpointConfig]("test", 0, components.ParseSingleEndpoint, components.WithProtocol(corev1.ProtocolUDP)), + name: "valid config with listen_address with settings", + g: components.NewSinglePortParserBuilder("test", 0).WithProtocol(corev1.ProtocolUDP).MustBuild(), args: args{ logger: logr.Discard(), config: map[string]interface{}{ @@ -93,7 +94,7 @@ func TestGenericParser_GetPorts(t *testing.T) { }, { name: "invalid config with no endpoint or listen_address", - g: components.NewGenericParser[*components.SingleEndpointConfig]("test", 0, components.ParseSingleEndpoint), + g: components.NewSinglePortParserBuilder("test", 0).MustBuild(), args: args{ logger: logr.Discard(), config: map[string]interface{}{}, @@ -113,3 +114,109 @@ func TestGenericParser_GetPorts(t *testing.T) { }) } } + +func TestGenericParser_GetRBACRules(t *testing.T) { + type args struct { + logger logr.Logger + config interface{} + } + type testCase[T any] struct { + name string + g *components.GenericParser[T] + args args + want []rbacv1.PolicyRule + wantErr assert.ErrorAssertionFunc + } + + rbacGenFunc := func(logger logr.Logger, config *components.SingleEndpointConfig) ([]rbacv1.PolicyRule, error) { + if config.Endpoint == "" && config.ListenAddress == "" { + return nil, fmt.Errorf("either endpoint or listen_address must be specified") + } + return []rbacv1.PolicyRule{ + { + APIGroups: []string{""}, + Resources: []string{"pods"}, + Verbs: []string{"get", "list"}, + }, + }, nil + } + + tests := []testCase[*components.SingleEndpointConfig]{ + { + name: "valid config with endpoint", + g: components.NewSinglePortParserBuilder("test", 0).WithRbacGen(rbacGenFunc).MustBuild(), + args: args{ + logger: logr.Discard(), + config: map[string]interface{}{ + "endpoint": "http://localhost:8080", + }, + }, + want: []rbacv1.PolicyRule{ + { + APIGroups: []string{""}, + Resources: []string{"pods"}, + Verbs: []string{"get", "list"}, + }, + }, + wantErr: assert.NoError, + }, + { + name: "valid config with listen_address", + g: components.NewSinglePortParserBuilder("test", 0).WithRbacGen(rbacGenFunc).MustBuild(), + args: args{ + logger: logr.Discard(), + config: map[string]interface{}{ + "listen_address": "0.0.0.0:9090", + }, + }, + want: []rbacv1.PolicyRule{ + { + APIGroups: []string{""}, + Resources: []string{"pods"}, + Verbs: []string{"get", "list"}, + }, + }, + wantErr: assert.NoError, + }, + { + name: "invalid config with no endpoint or listen_address", + g: components.NewSinglePortParserBuilder("test", 0).WithRbacGen(rbacGenFunc).MustBuild(), + args: args{ + logger: logr.Discard(), + config: map[string]interface{}{}, + }, + want: nil, + wantErr: assert.Error, + }, + { + name: "Generic works", + g: components.NewBuilder[*components.SingleEndpointConfig]().WithName("test").MustBuild(), + args: args{ + logger: logr.Discard(), + config: map[string]interface{}{}, + }, + want: nil, + wantErr: assert.NoError, + }, + { + name: "failed to parse config", + g: components.NewSinglePortParserBuilder("test", 0).WithRbacGen(rbacGenFunc).MustBuild(), + args: args{ + logger: logr.Discard(), + config: func() {}, + }, + want: nil, + wantErr: assert.Error, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := tt.g.GetRBACRules(tt.args.logger, tt.args.config) + if !tt.wantErr(t, err, fmt.Sprintf("GetRBACRules(%v, %v)", tt.args.logger, tt.args.config)) { + return + } + assert.Equalf(t, tt.want, got, "GetRBACRules(%v, %v)", tt.args.logger, tt.args.config) + }) + } +} diff --git a/internal/components/multi_endpoint.go b/internal/components/multi_endpoint.go index d4e5948f81..7a10b90fd6 100644 --- a/internal/components/multi_endpoint.go +++ b/internal/components/multi_endpoint.go @@ -20,6 +20,7 @@ import ( "github.com/go-logr/logr" "github.com/mitchellh/mapstructure" corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" "github.com/open-telemetry/opentelemetry-operator/internal/naming" ) @@ -71,21 +72,46 @@ func (m *MultiPortReceiver) ParserName() string { return fmt.Sprintf("__%s", m.name) } -func NewMultiPortReceiver(name string, opts ...MultiPortOption) *MultiPortReceiver { +func (m *MultiPortReceiver) GetRBACRules(logr.Logger, interface{}) ([]rbacv1.PolicyRule, error) { + return nil, nil +} + +type MultiPortBuilder[ComponentConfigType any] []Builder[ComponentConfigType] + +func NewMultiPortReceiverBuilder(name string) MultiPortBuilder[*MultiProtocolEndpointConfig] { + return append(MultiPortBuilder[*MultiProtocolEndpointConfig]{}, NewBuilder[*MultiProtocolEndpointConfig]().WithName(name)) +} + +func NewProtocolBuilder(name string, port int32) Builder[*MultiProtocolEndpointConfig] { + return NewBuilder[*MultiProtocolEndpointConfig]().WithName(name).WithPort(port) +} + +func (mp MultiPortBuilder[ComponentConfigType]) AddPortMapping(builder Builder[ComponentConfigType]) MultiPortBuilder[ComponentConfigType] { + return append(mp, builder) +} + +func (mp MultiPortBuilder[ComponentConfigType]) Build() (*MultiPortReceiver, error) { + if len(mp) < 1 { + return nil, fmt.Errorf("must provide at least one port mapping") + } multiReceiver := &MultiPortReceiver{ - name: name, + name: mp[0].MustBuild().name, portMappings: map[string]*corev1.ServicePort{}, } - for _, opt := range opts { - opt(multiReceiver) + for _, bu := range mp[1:] { + built, err := bu.Build() + if err != nil { + return nil, err + } + multiReceiver.portMappings[built.name] = built.settings.GetServicePort() } - return multiReceiver + return multiReceiver, nil } -func WithPortMapping(name string, port int32, opts ...PortBuilderOption) MultiPortOption { - return func(parser *MultiPortReceiver) { - o := NewOption(name, port) - o.Apply(opts...) - parser.portMappings[name] = o.GetServicePort() +func (mp MultiPortBuilder[ComponentConfigType]) MustBuild() *MultiPortReceiver { + if p, err := mp.Build(); err != nil { + panic(err) + } else { + return p } } diff --git a/internal/components/multi_endpoint_test.go b/internal/components/multi_endpoint_test.go index 2aac06a5d1..deabf377a4 100644 --- a/internal/components/multi_endpoint_test.go +++ b/internal/components/multi_endpoint_test.go @@ -42,8 +42,7 @@ var ( func TestMultiPortReceiver_ParserName(t *testing.T) { type fields struct { - name string - opts []components.MultiPortOption + b components.MultiPortBuilder[*components.MultiProtocolEndpointConfig] } tests := []struct { name string @@ -53,34 +52,32 @@ func TestMultiPortReceiver_ParserName(t *testing.T) { { name: "no options", fields: fields{ - name: "receiver1", - opts: nil, + b: components.NewMultiPortReceiverBuilder("receiver1"), }, want: "__receiver1", }, { name: "with port mapping without builder options", fields: fields{ - name: "receiver2", - opts: []components.MultiPortOption{ - components.WithPortMapping("http", 80), - }, + b: components.NewMultiPortReceiverBuilder("receiver2").AddPortMapping( + components.NewProtocolBuilder("http", 80), + ), }, want: "__receiver2", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - m := components.NewMultiPortReceiver(tt.fields.name, tt.fields.opts...) - assert.Equalf(t, tt.want, m.ParserName(), "ParserName()") + s, err := tt.fields.b.Build() + assert.NoError(t, err) + assert.Equalf(t, tt.want, s.ParserName(), "ParserName()") }) } } func TestMultiPortReceiver_ParserType(t *testing.T) { type fields struct { - name string - opts []components.MultiPortOption + b components.MultiPortBuilder[*components.MultiProtocolEndpointConfig] } tests := []struct { name string @@ -90,26 +87,25 @@ func TestMultiPortReceiver_ParserType(t *testing.T) { { name: "no options", fields: fields{ - name: "receiver1", - opts: nil, + b: components.NewMultiPortReceiverBuilder("receiver1"), }, want: "receiver1", }, { name: "with port mapping without builder options", fields: fields{ - name: "receiver2/test", - opts: []components.MultiPortOption{ - components.WithPortMapping("http", 80), - }, + b: components.NewMultiPortReceiverBuilder("receiver2").AddPortMapping( + components.NewProtocolBuilder("http", 80), + ), }, want: "receiver2", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - m := components.NewMultiPortReceiver(tt.fields.name, tt.fields.opts...) - assert.Equalf(t, tt.want, m.ParserType(), "ParserType()") + s, err := tt.fields.b.Build() + assert.NoError(t, err) + assert.Equalf(t, tt.want, s.ParserType(), "ParserType()") }) } } @@ -117,37 +113,37 @@ func TestMultiPortReceiver_ParserType(t *testing.T) { func TestMultiPortReceiver_Ports(t *testing.T) { type fields struct { name string - opts []components.MultiPortOption + b components.MultiPortBuilder[*components.MultiProtocolEndpointConfig] } type args struct { config interface{} } tests := []struct { - name string - fields fields - args args - want []corev1.ServicePort - wantErr assert.ErrorAssertionFunc + name string + fields fields + args args + want []corev1.ServicePort + wantErr assert.ErrorAssertionFunc + wantBuildErr assert.ErrorAssertionFunc }{ { name: "no options", fields: fields{ name: "receiver1", - opts: nil, + b: components.NewMultiPortReceiverBuilder("receiver1"), }, args: args{ config: nil, }, - want: nil, - wantErr: assert.NoError, + want: nil, + wantBuildErr: assert.NoError, + wantErr: assert.NoError, }, { name: "single port mapping without builder options", fields: fields{ name: "receiver2", - opts: []components.MultiPortOption{ - components.WithPortMapping("http", 80), - }, + b: components.NewMultiPortReceiverBuilder("receiver2").AddPortMapping(components.NewProtocolBuilder("http", 80)), }, args: args{ config: httpConfig, @@ -158,15 +154,16 @@ func TestMultiPortReceiver_Ports(t *testing.T) { Port: 80, }, }, - wantErr: assert.NoError, + wantBuildErr: assert.NoError, + wantErr: assert.NoError, }, { name: "port mapping with target port", fields: fields{ name: "receiver3", - opts: []components.MultiPortOption{ - components.WithPortMapping("http", 80, components.WithTargetPort(8080)), - }, + b: components.NewMultiPortReceiverBuilder("receiver3"). + AddPortMapping(components.NewProtocolBuilder("http", 80). + WithTargetPort(8080)), }, args: args{ config: httpConfig, @@ -178,15 +175,16 @@ func TestMultiPortReceiver_Ports(t *testing.T) { TargetPort: intstr.FromInt(80), }, }, - wantErr: assert.NoError, + wantBuildErr: assert.NoError, + wantErr: assert.NoError, }, { name: "port mapping with app protocol", fields: fields{ name: "receiver4", - opts: []components.MultiPortOption{ - components.WithPortMapping("http", 80, components.WithAppProtocol(&components.HttpProtocol)), - }, + b: components.NewMultiPortReceiverBuilder("receiver4"). + AddPortMapping(components.NewProtocolBuilder("http", 80). + WithAppProtocol(&components.HttpProtocol)), }, args: args{ config: httpConfig, @@ -198,15 +196,16 @@ func TestMultiPortReceiver_Ports(t *testing.T) { AppProtocol: &components.HttpProtocol, }, }, - wantErr: assert.NoError, + wantBuildErr: assert.NoError, + wantErr: assert.NoError, }, { name: "port mapping with protocol", fields: fields{ name: "receiver5", - opts: []components.MultiPortOption{ - components.WithPortMapping("http", 80, components.WithProtocol(corev1.ProtocolTCP)), - }, + b: components.NewMultiPortReceiverBuilder("receiver2"). + AddPortMapping(components.NewProtocolBuilder("http", 80). + WithProtocol(corev1.ProtocolTCP)), }, args: args{ config: httpConfig, @@ -218,19 +217,20 @@ func TestMultiPortReceiver_Ports(t *testing.T) { Protocol: corev1.ProtocolTCP, }, }, - wantErr: assert.NoError, + wantBuildErr: assert.NoError, + wantErr: assert.NoError, }, { name: "multiple port mappings", fields: fields{ name: "receiver6", - opts: []components.MultiPortOption{ - components.WithPortMapping("http", 80), - components.WithPortMapping("grpc", 4317, - components.WithTargetPort(4317), - components.WithProtocol(corev1.ProtocolTCP), - components.WithAppProtocol(&components.GrpcProtocol)), - }, + b: components.NewMultiPortReceiverBuilder("receiver6"). + AddPortMapping(components.NewProtocolBuilder("http", 80)). + AddPortMapping(components.NewProtocolBuilder("grpc", 4317). + WithTargetPort(4317). + WithProtocol(corev1.ProtocolTCP). + WithAppProtocol(&components.GrpcProtocol), + ), }, args: args{ config: httpAndGrpcConfig, @@ -248,19 +248,20 @@ func TestMultiPortReceiver_Ports(t *testing.T) { Port: 80, }, }, - wantErr: assert.NoError, + wantBuildErr: assert.NoError, + wantErr: assert.NoError, }, { name: "multiple port mappings only one enabled", fields: fields{ name: "receiver6", - opts: []components.MultiPortOption{ - components.WithPortMapping("http", 80), - components.WithPortMapping("grpc", 4317, - components.WithTargetPort(4317), - components.WithProtocol(corev1.ProtocolTCP), - components.WithAppProtocol(&components.GrpcProtocol)), - }, + b: components.NewMultiPortReceiverBuilder("receiver6"). + AddPortMapping(components.NewProtocolBuilder("http", 80)). + AddPortMapping(components.NewProtocolBuilder("grpc", 4317). + WithTargetPort(4317). + WithProtocol(corev1.ProtocolTCP). + WithAppProtocol(&components.GrpcProtocol), + ), }, args: args{ config: httpConfig, @@ -271,39 +272,40 @@ func TestMultiPortReceiver_Ports(t *testing.T) { Port: 80, }, }, - wantErr: assert.NoError, + wantBuildErr: assert.NoError, + wantErr: assert.NoError, }, { name: "error unmarshalling configuration", fields: fields{ name: "receiver1", - opts: nil, + b: components.NewMultiPortReceiverBuilder("receiver1"), }, args: args{ config: "invalid config", // Simulate an invalid config that causes LoadMap to fail }, - want: nil, - wantErr: assert.Error, + want: nil, + wantBuildErr: assert.NoError, + wantErr: assert.Error, }, { name: "error marshaling configuration", fields: fields{ name: "receiver1", - opts: nil, + b: components.NewMultiPortReceiverBuilder("receiver1"), }, args: args{ config: func() {}, // Simulate an invalid config that causes LoadMap to fail }, - want: nil, - wantErr: assert.Error, + want: nil, + wantBuildErr: assert.NoError, + wantErr: assert.Error, }, { name: "unknown protocol", fields: fields{ name: "receiver2", - opts: []components.MultiPortOption{ - components.WithPortMapping("http", 80), - }, + b: components.NewMultiPortReceiverBuilder("receiver2").AddPortMapping(components.NewProtocolBuilder("http", 80)), }, args: args{ config: map[string]interface{}{ @@ -312,18 +314,57 @@ func TestMultiPortReceiver_Ports(t *testing.T) { }, }, }, - want: nil, - wantErr: assert.Error, + want: nil, + wantBuildErr: assert.NoError, + wantErr: assert.Error, + }, + { + name: "no name set", + fields: fields{ + name: "receiver2", + b: components.MultiPortBuilder[*components.MultiProtocolEndpointConfig]{}, + }, + args: args{ + config: map[string]interface{}{}, + }, + want: nil, + wantBuildErr: assert.Error, + }, + { + name: "bad builder", + fields: fields{ + name: "receiver2", + b: components.NewMultiPortReceiverBuilder("receiver2").AddPortMapping(components.NewBuilder[*components.MultiProtocolEndpointConfig]()), + }, + args: args{ + config: map[string]interface{}{}, + }, + want: nil, + wantErr: assert.NoError, + wantBuildErr: assert.Error, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - m := components.NewMultiPortReceiver(tt.fields.name, tt.fields.opts...) - got, err := m.Ports(logr.Discard(), tt.fields.name, tt.args.config) + s, err := tt.fields.b.Build() + if tt.wantBuildErr(t, err, fmt.Sprintf("Ports(%v)", tt.args.config)) && err != nil { + return + } + got, err := s.Ports(logr.Discard(), tt.fields.name, tt.args.config) if !tt.wantErr(t, err, fmt.Sprintf("Ports(%v)", tt.args.config)) { return } assert.ElementsMatchf(t, tt.want, got, "Ports(%v)", tt.args.config) + rbacGen, err := s.GetRBACRules(logr.Discard(), tt.args.config) + assert.NoError(t, err) + assert.Nil(t, rbacGen) }) } } + +func TestMultiMustBuildPanics(t *testing.T) { + b := components.MultiPortBuilder[*components.MultiProtocolEndpointConfig]{} + assert.Panics(t, func() { + b.MustBuild() + }) +} diff --git a/internal/components/processors/helpers.go b/internal/components/processors/helpers.go new file mode 100644 index 0000000000..ab1277b186 --- /dev/null +++ b/internal/components/processors/helpers.go @@ -0,0 +1,50 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processors + +import "github.com/open-telemetry/opentelemetry-operator/internal/components" + +// registry holds a record of all known receiver parsers. +var registry = make(map[string]components.Parser) + +// Register adds a new parser builder to the list of known builders. +func Register(name string, p components.Parser) { + registry[name] = p +} + +// IsRegistered checks whether a parser is registered with the given name. +func IsRegistered(name string) bool { + _, ok := registry[components.ComponentType(name)] + return ok +} + +// ProcessorFor returns a parser builder for the given exporter name. +func ProcessorFor(name string) components.Parser { + if parser, ok := registry[components.ComponentType(name)]; ok { + return parser + } + return components.NewBuilder[any]().WithName(name).MustBuild() +} + +var componentParsers = []components.Parser{ + components.NewBuilder[K8sAttributeConfig]().WithName("k8sattributes").WithRbacGen(GenerateK8SAttrRbacRules).MustBuild(), + components.NewBuilder[ResourceDetectionConfig]().WithName("resourcedetection").WithRbacGen(GenerateResourceDetectionRbacRules).MustBuild(), +} + +func init() { + for _, parser := range componentParsers { + Register(parser.ParserType(), parser) + } +} diff --git a/internal/components/processors/helpers_test.go b/internal/components/processors/helpers_test.go new file mode 100644 index 0000000000..5ef0b40a8f --- /dev/null +++ b/internal/components/processors/helpers_test.go @@ -0,0 +1,85 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processors_test + +import ( + "testing" + + "github.com/go-logr/logr" + "github.com/stretchr/testify/assert" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/open-telemetry/opentelemetry-operator/internal/components" + "github.com/open-telemetry/opentelemetry-operator/internal/components/processors" +) + +var logger = logf.Log.WithName("unit-tests") + +func TestParserForReturns(t *testing.T) { + const testComponentName = "test" + parser := processors.ProcessorFor(testComponentName) + assert.Equal(t, "test", parser.ParserType()) + assert.Equal(t, "__test", parser.ParserName()) + ports, err := parser.Ports(logr.Discard(), testComponentName, map[string]interface{}{ + "endpoint": "localhost:9000", + }) + assert.NoError(t, err) + assert.Len(t, ports, 0) // Should use the nop parser +} + +func TestCanRegister(t *testing.T) { + const testComponentName = "test" + processors.Register(testComponentName, components.NewSinglePortParserBuilder(testComponentName, 9000).MustBuild()) + assert.True(t, processors.IsRegistered(testComponentName)) + parser := processors.ProcessorFor(testComponentName) + assert.Equal(t, "test", parser.ParserType()) + assert.Equal(t, "__test", parser.ParserName()) + ports, err := parser.Ports(logr.Discard(), testComponentName, map[string]interface{}{}) + assert.NoError(t, err) + assert.Len(t, ports, 1) + assert.Equal(t, ports[0].Port, int32(9000)) +} + +func TestDownstreamParsers(t *testing.T) { + for _, tt := range []struct { + desc string + processorName string + parserName string + }{ + {"k8sattributes", "k8sattributes", "__k8sattributes"}, + {"resourcedetection", "resourcedetection", "__resourcedetection"}, + } { + t.Run(tt.processorName, func(t *testing.T) { + t.Run("builds successfully", func(t *testing.T) { + // test + parser := processors.ProcessorFor(tt.processorName) + + // verify + assert.Equal(t, tt.parserName, parser.ParserName()) + }) + t.Run("bad config errors", func(t *testing.T) { + // prepare + parser := processors.ProcessorFor(tt.processorName) + + // test throwing in pure junk + _, err := parser.Ports(logger, tt.processorName, func() {}) + + // verify + assert.Nil(t, err) + }) + + }) + } +} diff --git a/internal/components/processors/k8sattribute.go b/internal/components/processors/k8sattribute.go new file mode 100644 index 0000000000..f9d5266c60 --- /dev/null +++ b/internal/components/processors/k8sattribute.go @@ -0,0 +1,81 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processors + +import ( + "fmt" + "strings" + + "github.com/go-logr/logr" + rbacv1 "k8s.io/api/rbac/v1" +) + +type FieldExtractConfig struct { + TagName string `mapstructure:"tag_name"` + Key string `mapstructure:"key"` + KeyRegex string `mapstructure:"key_regex"` + Regex string `mapstructure:"regex"` + From string `mapstructure:"from"` +} + +type Extract struct { + Metadata []string `mapstructure:"metadata"` + Labels []FieldExtractConfig `mapstructure:"labels"` + Annotations []FieldExtractConfig `mapstructure:"annotations"` +} + +// K8sAttributeConfig is a minimal struct needed for parsing a valid k8sattribute processor configuration +// This only contains the fields necessary for parsing, other fields can be added in the future. +type K8sAttributeConfig struct { + Extract Extract `mapstructure:"extract"` +} + +func GenerateK8SAttrRbacRules(_ logr.Logger, config K8sAttributeConfig) ([]rbacv1.PolicyRule, error) { + // These policies need to be added always + var prs = []rbacv1.PolicyRule{ + { + APIGroups: []string{""}, + Resources: []string{"pods", "namespaces"}, + Verbs: []string{"get", "watch", "list"}, + }, + } + + replicasetPolicy := rbacv1.PolicyRule{ + APIGroups: []string{"apps"}, + Resources: []string{"replicasets"}, + Verbs: []string{"get", "watch", "list"}, + } + + if len(config.Extract.Metadata) == 0 { + prs = append(prs, replicasetPolicy) + } + addedReplicasetPolicy := false + for _, m := range config.Extract.Metadata { + metadataField := fmt.Sprint(m) + if (metadataField == "k8s.deployment.uid" || metadataField == "k8s.deployment.name") && !addedReplicasetPolicy { + prs = append(prs, replicasetPolicy) + addedReplicasetPolicy = true + } else if strings.Contains(metadataField, "k8s.node") { + prs = append(prs, + rbacv1.PolicyRule{ + APIGroups: []string{""}, + Resources: []string{"nodes"}, + Verbs: []string{"get", "watch", "list"}, + }, + ) + } + } + return prs, nil +} diff --git a/internal/components/processors/k8sattribute_test.go b/internal/components/processors/k8sattribute_test.go new file mode 100644 index 0000000000..604656f93a --- /dev/null +++ b/internal/components/processors/k8sattribute_test.go @@ -0,0 +1,151 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processors_test + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + rbacv1 "k8s.io/api/rbac/v1" + + "github.com/open-telemetry/opentelemetry-operator/internal/components/processors" +) + +func TestGenerateK8SAttrRbacRules(t *testing.T) { + type args struct { + config interface{} + } + tests := []struct { + name string + args args + want []rbacv1.PolicyRule + wantErr assert.ErrorAssertionFunc + }{ + { + name: "default config with empty metadata", + args: args{ + config: map[string]interface{}{ + "extract": map[string]interface{}{ + "metadata": []string{}, + "labels": []interface{}{}, + "annotations": []interface{}{}, + }, + }, + }, + want: []rbacv1.PolicyRule{ + { + APIGroups: []string{""}, + Resources: []string{"pods", "namespaces"}, + Verbs: []string{"get", "watch", "list"}, + }, + { + APIGroups: []string{"apps"}, + Resources: []string{"replicasets"}, + Verbs: []string{"get", "watch", "list"}, + }, + }, + wantErr: assert.NoError, + }, + { + name: "config with deployment metadata", + args: args{ + config: map[string]interface{}{ + "extract": map[string]interface{}{ + "metadata": []string{"k8s.deployment.uid", "k8s.deployment.name"}, + "labels": []interface{}{}, + "annotations": []interface{}{}, + }, + }, + }, + want: []rbacv1.PolicyRule{ + { + APIGroups: []string{""}, + Resources: []string{"pods", "namespaces"}, + Verbs: []string{"get", "watch", "list"}, + }, + { + APIGroups: []string{"apps"}, + Resources: []string{"replicasets"}, + Verbs: []string{"get", "watch", "list"}, + }, + }, + wantErr: assert.NoError, + }, + { + name: "config with node metadata", + args: args{ + config: map[string]interface{}{ + "extract": map[string]interface{}{ + "metadata": []string{"k8s.node.name"}, + "labels": []interface{}{}, + "annotations": []interface{}{}, + }, + }, + }, + want: []rbacv1.PolicyRule{ + { + APIGroups: []string{""}, + Resources: []string{"pods", "namespaces"}, + Verbs: []string{"get", "watch", "list"}, + }, + { + APIGroups: []string{""}, + Resources: []string{"nodes"}, + Verbs: []string{"get", "watch", "list"}, + }, + }, + wantErr: assert.NoError, + }, + { + name: "invalid config", + args: args{ + config: "hi", + }, + want: nil, + wantErr: assert.Error, + }, + { + name: "config with invalid metadata", + args: args{ + config: map[string]interface{}{ + "extract": map[string]interface{}{ + "metadata": []string{"invalid.metadata"}, + "labels": []interface{}{}, + "annotations": []interface{}{}, + }, + }, + }, + want: []rbacv1.PolicyRule{ + { + APIGroups: []string{""}, + Resources: []string{"pods", "namespaces"}, + Verbs: []string{"get", "watch", "list"}, + }, + }, + wantErr: assert.NoError, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + parser := processors.ProcessorFor("k8sattributes") + got, err := parser.GetRBACRules(logger, tt.args.config) + if !tt.wantErr(t, err, fmt.Sprintf("GetRBACRules(%v)", tt.args.config)) { + return + } + assert.Equalf(t, tt.want, got, "GetRBACRules(%v)", tt.args.config) + }) + } +} diff --git a/internal/components/processors/resourcedetection.go b/internal/components/processors/resourcedetection.go new file mode 100644 index 0000000000..5d5af17c11 --- /dev/null +++ b/internal/components/processors/resourcedetection.go @@ -0,0 +1,52 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processors + +import ( + "fmt" + + "github.com/go-logr/logr" + rbacv1 "k8s.io/api/rbac/v1" +) + +// ResourceDetectionConfig is a minimal struct needed for parsing a valid resourcedetection processor configuration +// This only contains the fields necessary for parsing, other fields can be added in the future. +type ResourceDetectionConfig struct { + Detectors []string `mapstructure:"detectors"` +} + +func GenerateResourceDetectionRbacRules(_ logr.Logger, config ResourceDetectionConfig) ([]rbacv1.PolicyRule, error) { + var prs []rbacv1.PolicyRule + for _, d := range config.Detectors { + detectorName := fmt.Sprint(d) + switch detectorName { + case "k8snode": + policy := rbacv1.PolicyRule{ + APIGroups: []string{""}, + Resources: []string{"nodes"}, + Verbs: []string{"get", "list"}, + } + prs = append(prs, policy) + case "openshift": + policy := rbacv1.PolicyRule{ + APIGroups: []string{"config.openshift.io"}, + Resources: []string{"infrastructures", "infrastructures/status"}, + Verbs: []string{"get", "watch", "list"}, + } + prs = append(prs, policy) + } + } + return prs, nil +} diff --git a/internal/components/processors/resourcedetection_test.go b/internal/components/processors/resourcedetection_test.go new file mode 100644 index 0000000000..4d6b448d40 --- /dev/null +++ b/internal/components/processors/resourcedetection_test.go @@ -0,0 +1,121 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processors_test + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + rbacv1 "k8s.io/api/rbac/v1" + + "github.com/open-telemetry/opentelemetry-operator/internal/components/processors" +) + +func TestGenerateResourceDetectionRbacRules(t *testing.T) { + type args struct { + config map[string]interface{} + } + tests := []struct { + name string + args args + want []rbacv1.PolicyRule + wantErr assert.ErrorAssertionFunc + }{ + { + name: "default config with no detectors", + args: args{ + config: map[string]interface{}{ + "detectors": []string{}, + }, + }, + want: nil, + wantErr: assert.NoError, + }, + { + name: "config with k8snode detector", + args: args{ + config: map[string]interface{}{ + "detectors": []string{"k8snode"}, + }, + }, + want: []rbacv1.PolicyRule{ + { + APIGroups: []string{""}, + Resources: []string{"nodes"}, + Verbs: []string{"get", "list"}, + }, + }, + wantErr: assert.NoError, + }, + { + name: "config with openshift detector", + args: args{ + config: map[string]interface{}{ + "detectors": []string{"openshift"}, + }, + }, + want: []rbacv1.PolicyRule{ + { + APIGroups: []string{"config.openshift.io"}, + Resources: []string{"infrastructures", "infrastructures/status"}, + Verbs: []string{"get", "watch", "list"}, + }, + }, + wantErr: assert.NoError, + }, + { + name: "config with multiple detectors", + args: args{ + config: map[string]interface{}{ + "detectors": []string{"k8snode", "openshift"}, + }, + }, + want: []rbacv1.PolicyRule{ + { + APIGroups: []string{""}, + Resources: []string{"nodes"}, + Verbs: []string{"get", "list"}, + }, + { + APIGroups: []string{"config.openshift.io"}, + Resources: []string{"infrastructures", "infrastructures/status"}, + Verbs: []string{"get", "watch", "list"}, + }, + }, + wantErr: assert.NoError, + }, + { + name: "config with invalid detector", + args: args{ + config: map[string]interface{}{ + "detectors": []string{"invalid"}, + }, + }, + want: nil, + wantErr: assert.NoError, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + parser := processors.ProcessorFor("resourcedetection") + got, err := parser.GetRBACRules(logger, tt.args.config) + if !tt.wantErr(t, err, fmt.Sprintf("GetRBACRules(%v)", tt.args.config)) { + return + } + assert.Equalf(t, tt.want, got, "GetRBACRules(%v)", tt.args.config) + }) + } +} diff --git a/internal/components/receivers/helpers.go b/internal/components/receivers/helpers.go index 6802f3a038..89a3cb6fe7 100644 --- a/internal/components/receivers/helpers.go +++ b/internal/components/receivers/helpers.go @@ -39,82 +39,103 @@ func ReceiverFor(name string) components.Parser { if parser, ok := registry[components.ComponentType(name)]; ok { return parser } - return components.NewSilentSinglePortParser(components.ComponentType(name), components.UnsetPort) + return components.NewSilentSinglePortParserBuilder(components.ComponentType(name), components.UnsetPort).MustBuild() } // NewScraperParser is an instance of a generic parser that returns nothing when called and never fails. func NewScraperParser(name string) *components.GenericParser[any] { - return components.NewGenericParser[any](name, components.UnsetPort, nil) + return components.NewBuilder[any]().WithName(name).WithPort(components.UnsetPort).MustBuild() } var ( componentParsers = []components.Parser{ - components.NewMultiPortReceiver("otlp", - components.WithPortMapping( - "grpc", - 4317, - components.WithAppProtocol(&components.GrpcProtocol), - components.WithTargetPort(4317), - ), components.WithPortMapping( - "http", - 4318, - components.WithAppProtocol(&components.HttpProtocol), - components.WithTargetPort(4318), - ), - ), - components.NewMultiPortReceiver("skywalking", - components.WithPortMapping(components.GrpcProtocol, 11800, - components.WithTargetPort(11800), - components.WithAppProtocol(&components.GrpcProtocol), - ), - components.WithPortMapping(components.HttpProtocol, 12800, - components.WithTargetPort(12800), - components.WithAppProtocol(&components.HttpProtocol), - )), - components.NewMultiPortReceiver("jaeger", - components.WithPortMapping(components.GrpcProtocol, 14250, - components.WithTargetPort(14250), - components.WithProtocol(corev1.ProtocolTCP), - components.WithAppProtocol(&components.GrpcProtocol), - ), - components.WithPortMapping("thrift_http", 14268, - components.WithTargetPort(14268), - components.WithProtocol(corev1.ProtocolTCP), - components.WithAppProtocol(&components.HttpProtocol), - ), - components.WithPortMapping("thrift_compact", 6831, - components.WithTargetPort(6831), - components.WithProtocol(corev1.ProtocolUDP), - ), - components.WithPortMapping("thrift_binary", 6832, - components.WithTargetPort(6832), - components.WithProtocol(corev1.ProtocolUDP), - ), - ), - components.NewMultiPortReceiver("loki", - components.WithPortMapping(components.GrpcProtocol, 9095, - components.WithTargetPort(9095), - components.WithAppProtocol(&components.GrpcProtocol), - ), - components.WithPortMapping(components.HttpProtocol, 3100, - components.WithTargetPort(3100), - components.WithAppProtocol(&components.HttpProtocol), - ), - ), - components.NewSinglePortParser("awsxray", 2000, components.WithProtocol(corev1.ProtocolUDP), components.WithTargetPort(2000)), - components.NewSinglePortParser("carbon", 2003, components.WithTargetPort(2003)), - components.NewSinglePortParser("collectd", 8081, components.WithTargetPort(8081)), - components.NewSinglePortParser("fluentforward", 8006, components.WithTargetPort(8006)), - components.NewSinglePortParser("influxdb", 8086, components.WithTargetPort(8086)), - components.NewSinglePortParser("opencensus", 55678, components.WithAppProtocol(nil), components.WithTargetPort(55678)), - components.NewSinglePortParser("sapm", 7276, components.WithTargetPort(7276)), - components.NewSinglePortParser("signalfx", 9943, components.WithTargetPort(9943)), - components.NewSinglePortParser("splunk_hec", 8088, components.WithTargetPort(8088)), - components.NewSinglePortParser("statsd", 8125, components.WithProtocol(corev1.ProtocolUDP), components.WithTargetPort(8125)), - components.NewSinglePortParser("tcplog", components.UnsetPort, components.WithProtocol(corev1.ProtocolTCP)), - components.NewSinglePortParser("udplog", components.UnsetPort, components.WithProtocol(corev1.ProtocolUDP)), - components.NewSinglePortParser("wavefront", 2003, components.WithTargetPort(2003)), - components.NewSinglePortParser("zipkin", 9411, components.WithAppProtocol(&components.HttpProtocol), components.WithProtocol(corev1.ProtocolTCP), components.WithTargetPort(3100)), + components.NewMultiPortReceiverBuilder("otlp"). + AddPortMapping(components.NewProtocolBuilder("grpc", 4317). + WithAppProtocol(&components.GrpcProtocol). + WithTargetPort(4317)). + AddPortMapping(components.NewProtocolBuilder("http", 4318). + WithAppProtocol(&components.HttpProtocol). + WithTargetPort(4318)). + MustBuild(), + components.NewMultiPortReceiverBuilder("skywalking"). + AddPortMapping(components.NewProtocolBuilder(components.GrpcProtocol, 11800). + WithTargetPort(11800). + WithAppProtocol(&components.GrpcProtocol)). + AddPortMapping(components.NewProtocolBuilder(components.HttpProtocol, 12800). + WithTargetPort(12800). + WithAppProtocol(&components.HttpProtocol)). + MustBuild(), + components.NewMultiPortReceiverBuilder("jaeger"). + AddPortMapping(components.NewProtocolBuilder(components.GrpcProtocol, 14250). + WithTargetPort(14250). + WithProtocol(corev1.ProtocolTCP). + WithAppProtocol(&components.GrpcProtocol)). + AddPortMapping(components.NewProtocolBuilder("thrift_http", 14268). + WithTargetPort(14268). + WithProtocol(corev1.ProtocolTCP). + WithAppProtocol(&components.HttpProtocol)). + AddPortMapping(components.NewProtocolBuilder("thrift_compact", 6831). + WithTargetPort(6831). + WithProtocol(corev1.ProtocolUDP)). + AddPortMapping(components.NewProtocolBuilder("thrift_binary", 6832). + WithTargetPort(6832). + WithProtocol(corev1.ProtocolUDP)). + MustBuild(), + components.NewMultiPortReceiverBuilder("loki"). + AddPortMapping(components.NewProtocolBuilder(components.GrpcProtocol, 9095). + WithTargetPort(9095). + WithAppProtocol(&components.GrpcProtocol)). + AddPortMapping(components.NewProtocolBuilder(components.HttpProtocol, 3100). + WithTargetPort(3100). + WithAppProtocol(&components.HttpProtocol)). + MustBuild(), + components.NewSinglePortParserBuilder("awsxray", 2000). + WithTargetPort(2000). + WithProtocol(corev1.ProtocolUDP). + MustBuild(), + components.NewSinglePortParserBuilder("carbon", 2003). + WithTargetPort(2003). + MustBuild(), + components.NewSinglePortParserBuilder("collectd", 8081). + WithTargetPort(8081). + MustBuild(), + components.NewSinglePortParserBuilder("fluentforward", 8006). + WithTargetPort(8006). + MustBuild(), + components.NewSinglePortParserBuilder("influxdb", 8086). + WithTargetPort(8086). + MustBuild(), + components.NewSinglePortParserBuilder("opencensus", 55678). + WithAppProtocol(nil). + WithTargetPort(55678). + MustBuild(), + components.NewSinglePortParserBuilder("sapm", 7276). + WithTargetPort(7276). + MustBuild(), + components.NewSinglePortParserBuilder("signalfx", 9943). + WithTargetPort(9943). + MustBuild(), + components.NewSinglePortParserBuilder("splunk_hec", 8088). + WithTargetPort(8088). + MustBuild(), + components.NewSinglePortParserBuilder("statsd", 8125). + WithProtocol(corev1.ProtocolUDP). + WithTargetPort(8125). + MustBuild(), + components.NewSinglePortParserBuilder("tcplog", components.UnsetPort). + WithProtocol(corev1.ProtocolTCP). + MustBuild(), + components.NewSinglePortParserBuilder("udplog", components.UnsetPort). + WithProtocol(corev1.ProtocolUDP). + MustBuild(), + components.NewSinglePortParserBuilder("wavefront", 2003). + WithTargetPort(2003). + MustBuild(), + components.NewSinglePortParserBuilder("zipkin", 9411). + WithAppProtocol(&components.HttpProtocol). + WithProtocol(corev1.ProtocolTCP). + WithTargetPort(3100). + MustBuild(), NewScraperParser("prometheus"), NewScraperParser("kubeletstats"), NewScraperParser("sshcheck"), diff --git a/internal/components/single_endpoint.go b/internal/components/single_endpoint.go index e97df3b41f..914136b568 100644 --- a/internal/components/single_endpoint.go +++ b/internal/components/single_endpoint.go @@ -21,6 +21,10 @@ import ( "github.com/open-telemetry/opentelemetry-operator/internal/naming" ) +var ( + _ Parser = &GenericParser[*SingleEndpointConfig]{} +) + // SingleEndpointConfig represents the minimal struct for a given YAML configuration input containing either // endpoint or listen_address. type SingleEndpointConfig struct { @@ -48,38 +52,37 @@ func (g *SingleEndpointConfig) GetPortNum() (int32, error) { return UnsetPort, PortNotFoundErr } -func ParseSingleEndpointSilent(logger logr.Logger, name string, o *Option, singleEndpointConfig *SingleEndpointConfig) ([]corev1.ServicePort, error) { - return internalParseSingleEndpoint(logger, name, true, o, singleEndpointConfig) +func ParseSingleEndpointSilent(logger logr.Logger, name string, defaultPort *corev1.ServicePort, singleEndpointConfig *SingleEndpointConfig) ([]corev1.ServicePort, error) { + return internalParseSingleEndpoint(logger, name, true, defaultPort, singleEndpointConfig) } -func ParseSingleEndpoint(logger logr.Logger, name string, o *Option, singleEndpointConfig *SingleEndpointConfig) ([]corev1.ServicePort, error) { - return internalParseSingleEndpoint(logger, name, false, o, singleEndpointConfig) +func ParseSingleEndpoint(logger logr.Logger, name string, defaultPort *corev1.ServicePort, singleEndpointConfig *SingleEndpointConfig) ([]corev1.ServicePort, error) { + return internalParseSingleEndpoint(logger, name, false, defaultPort, singleEndpointConfig) } -func internalParseSingleEndpoint(logger logr.Logger, name string, failSilently bool, o *Option, singleEndpointConfig *SingleEndpointConfig) ([]corev1.ServicePort, error) { +func internalParseSingleEndpoint(logger logr.Logger, name string, failSilently bool, defaultPort *corev1.ServicePort, singleEndpointConfig *SingleEndpointConfig) ([]corev1.ServicePort, error) { if singleEndpointConfig == nil { return nil, nil } - if _, err := singleEndpointConfig.GetPortNum(); err != nil && o.port == UnsetPort { + if _, err := singleEndpointConfig.GetPortNum(); err != nil && defaultPort.Port == UnsetPort { if failSilently { - logger.WithValues("receiver", o.name).V(4).Info("couldn't parse the endpoint's port and no default port set", "error", err) + logger.WithValues("receiver", defaultPort.Name).V(4).Info("couldn't parse the endpoint's port and no default port set", "error", err) err = nil } else { - logger.WithValues("receiver", o.name).Error(err, "couldn't parse the endpoint's port and no default port set") + logger.WithValues("receiver", defaultPort.Name).Error(err, "couldn't parse the endpoint's port and no default port set") } return []corev1.ServicePort{}, err } - port := singleEndpointConfig.GetPortNumOrDefault(logger, o.port) - svcPort := o.GetServicePort() + port := singleEndpointConfig.GetPortNumOrDefault(logger, defaultPort.Port) + svcPort := defaultPort svcPort.Name = naming.PortName(name, port) return []corev1.ServicePort{ConstructServicePort(svcPort, port)}, nil } -func NewSinglePortParser(name string, port int32, opts ...PortBuilderOption) *GenericParser[*SingleEndpointConfig] { - return NewGenericParser(name, port, ParseSingleEndpoint, opts...) +func NewSinglePortParserBuilder(name string, port int32) Builder[*SingleEndpointConfig] { + return NewBuilder[*SingleEndpointConfig]().WithPort(port).WithName(name).WithPortParser(ParseSingleEndpoint) } -// NewSilentSinglePortParser returns a ParseSingleEndpoint that errors silently on failure to find a port. -func NewSilentSinglePortParser(name string, port int32, opts ...PortBuilderOption) *GenericParser[*SingleEndpointConfig] { - return NewGenericParser(name, port, ParseSingleEndpointSilent, opts...) +func NewSilentSinglePortParserBuilder(name string, port int32) Builder[*SingleEndpointConfig] { + return NewBuilder[*SingleEndpointConfig]().WithPort(port).WithName(name).WithPortParser(ParseSingleEndpointSilent) } diff --git a/internal/components/single_endpoint_test.go b/internal/components/single_endpoint_test.go index 9e544d40b9..bf0757d835 100644 --- a/internal/components/single_endpoint_test.go +++ b/internal/components/single_endpoint_test.go @@ -109,9 +109,7 @@ func TestSingleEndpointConfig_GetPortNumOrDefault(t *testing.T) { func TestSingleEndpointParser_ParserName(t *testing.T) { type fields struct { - name string - port int32 - opts []components.PortBuilderOption + b components.Builder[*components.SingleEndpointConfig] } tests := []struct { name string @@ -121,25 +119,22 @@ func TestSingleEndpointParser_ParserName(t *testing.T) { { name: "no options", fields: fields{ - name: "receiver1", - opts: nil, + b: components.NewSinglePortParserBuilder("receiver1", components.UnsetPort), }, want: "__receiver1", }, { name: "with port mapping without builder options", fields: fields{ - name: "receiver2", - opts: []components.PortBuilderOption{ - components.WithTargetPort(8080), - }, + b: components.NewSinglePortParserBuilder("receiver2", components.UnsetPort).WithTargetPort(8080), }, want: "__receiver2", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s := components.NewSinglePortParser(tt.fields.name, tt.fields.port, tt.fields.opts...) + s, err := tt.fields.b.Build() + assert.NoError(t, err) assert.Equalf(t, tt.want, s.ParserName(), "ParserName()") }) } @@ -147,9 +142,7 @@ func TestSingleEndpointParser_ParserName(t *testing.T) { func TestSingleEndpointParser_ParserType(t *testing.T) { type fields struct { - name string - port int32 - opts []components.PortBuilderOption + b components.Builder[*components.SingleEndpointConfig] } tests := []struct { name string @@ -159,25 +152,22 @@ func TestSingleEndpointParser_ParserType(t *testing.T) { { name: "no options", fields: fields{ - name: "receiver1", - opts: nil, + b: components.NewSinglePortParserBuilder("receiver1", components.UnsetPort), }, want: "receiver1", }, { name: "with port mapping without builder options", fields: fields{ - name: "receiver2/test", - opts: []components.PortBuilderOption{ - components.WithTargetPort(80), - }, + b: components.NewSinglePortParserBuilder("receiver2", components.UnsetPort).WithTargetPort(8080), }, want: "receiver2", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s := components.NewSinglePortParser(tt.fields.name, tt.fields.port, tt.fields.opts...) + s, err := tt.fields.b.Build() + assert.NoError(t, err) assert.Equalf(t, tt.want, s.ParserType(), "ParserType()") }) } @@ -185,9 +175,7 @@ func TestSingleEndpointParser_ParserType(t *testing.T) { func TestSingleEndpointParser_Ports(t *testing.T) { type fields struct { - name string - port int32 - opts []components.PortBuilderOption + b components.Builder[*components.SingleEndpointConfig] } type args struct { config interface{} @@ -202,8 +190,7 @@ func TestSingleEndpointParser_Ports(t *testing.T) { { name: "ValidConfigWithPort", fields: fields{ - name: "testparser", - port: 8080, + b: components.NewSinglePortParserBuilder("testparser", 8080), }, args: args{ config: map[string]interface{}{ @@ -215,11 +202,21 @@ func TestSingleEndpointParser_Ports(t *testing.T) { }, wantErr: assert.NoError, }, + { + name: "ValidConfigWithPort nil config", + fields: fields{ + b: components.NewSinglePortParserBuilder("testparser", 8080), + }, + args: args{ + config: nil, + }, + want: nil, + wantErr: assert.NoError, + }, { name: "ValidConfigWithDefaultPort", fields: fields{ - name: "testparser", - port: 8080, + b: components.NewSinglePortParserBuilder("testparser", 8080), }, args: args{ config: map[string]interface{}{}, @@ -232,13 +229,10 @@ func TestSingleEndpointParser_Ports(t *testing.T) { { name: "ConfigWithFixins", fields: fields{ - name: "testparser", - port: 8080, - opts: []components.PortBuilderOption{ - components.WithTargetPort(4317), - components.WithProtocol(corev1.ProtocolTCP), - components.WithAppProtocol(&components.GrpcProtocol), - }, + b: components.NewSinglePortParserBuilder("testparser", 8080). + WithTargetPort(4317). + WithProtocol(corev1.ProtocolTCP). + WithAppProtocol(&components.GrpcProtocol), }, args: args{ config: map[string]interface{}{}, @@ -257,8 +251,7 @@ func TestSingleEndpointParser_Ports(t *testing.T) { { name: "InvalidConfigMissingPort", fields: fields{ - name: "testparser", - port: 0, + b: components.NewSinglePortParserBuilder("testparser", components.UnsetPort), }, args: args{ config: map[string]interface{}{ @@ -271,8 +264,7 @@ func TestSingleEndpointParser_Ports(t *testing.T) { { name: "ErrorParsingConfig", fields: fields{ - name: "testparser", - port: 8080, + b: components.NewSinglePortParserBuilder("testparser", components.UnsetPort), }, args: args{ config: "invalid config", @@ -283,8 +275,9 @@ func TestSingleEndpointParser_Ports(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s := components.NewSinglePortParser(tt.fields.name, tt.fields.port, tt.fields.opts...) - got, err := s.Ports(logr.Discard(), tt.fields.name, tt.args.config) + s, err := tt.fields.b.Build() + assert.NoError(t, err) + got, err := s.Ports(logr.Discard(), s.ParserType(), tt.args.config) if !tt.wantErr(t, err, fmt.Sprintf("Ports(%v)", tt.args.config)) { return } @@ -294,10 +287,9 @@ func TestSingleEndpointParser_Ports(t *testing.T) { } func TestNewSilentSinglePortParser_Ports(t *testing.T) { + type fields struct { - name string - port int32 - opts []components.PortBuilderOption + b components.Builder[*components.SingleEndpointConfig] } type args struct { config interface{} @@ -312,8 +304,7 @@ func TestNewSilentSinglePortParser_Ports(t *testing.T) { { name: "ValidConfigWithPort", fields: fields{ - name: "testparser", - port: 8080, + b: components.NewSilentSinglePortParserBuilder("testparser", 8080), }, args: args{ config: map[string]interface{}{ @@ -328,8 +319,7 @@ func TestNewSilentSinglePortParser_Ports(t *testing.T) { { name: "ValidConfigWithDefaultPort", fields: fields{ - name: "testparser", - port: 8080, + b: components.NewSilentSinglePortParserBuilder("testparser", 8080), }, args: args{ config: map[string]interface{}{}, @@ -342,13 +332,10 @@ func TestNewSilentSinglePortParser_Ports(t *testing.T) { { name: "ConfigWithFixins", fields: fields{ - name: "testparser", - port: 8080, - opts: []components.PortBuilderOption{ - components.WithTargetPort(4317), - components.WithProtocol(corev1.ProtocolTCP), - components.WithAppProtocol(&components.GrpcProtocol), - }, + b: components.NewSilentSinglePortParserBuilder("testparser", 8080). + WithTargetPort(4317). + WithProtocol(corev1.ProtocolTCP). + WithAppProtocol(&components.GrpcProtocol), }, args: args{ config: map[string]interface{}{}, @@ -367,8 +354,7 @@ func TestNewSilentSinglePortParser_Ports(t *testing.T) { { name: "InvalidConfigMissingPort", fields: fields{ - name: "testparser", - port: 0, + b: components.NewSilentSinglePortParserBuilder("testparser", components.UnsetPort), }, args: args{ config: map[string]interface{}{ @@ -381,8 +367,7 @@ func TestNewSilentSinglePortParser_Ports(t *testing.T) { { name: "ErrorParsingConfig", fields: fields{ - name: "testparser", - port: 8080, + b: components.NewSilentSinglePortParserBuilder("testparser", 8080), }, args: args{ config: "invalid config", @@ -393,8 +378,9 @@ func TestNewSilentSinglePortParser_Ports(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s := components.NewSilentSinglePortParser(tt.fields.name, tt.fields.port, tt.fields.opts...) - got, err := s.Ports(logr.Discard(), tt.fields.name, tt.args.config) + s, err := tt.fields.b.Build() + assert.NoError(t, err) + got, err := s.Ports(logr.Discard(), s.ParserType(), tt.args.config) if !tt.wantErr(t, err, fmt.Sprintf("Ports(%v)", tt.args.config)) { return } diff --git a/internal/manifests/collector/rbac.go b/internal/manifests/collector/rbac.go index 70bac31c75..610d948b67 100644 --- a/internal/manifests/collector/rbac.go +++ b/internal/manifests/collector/rbac.go @@ -19,25 +19,15 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/open-telemetry/opentelemetry-operator/internal/manifests" - "github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/adapters" "github.com/open-telemetry/opentelemetry-operator/internal/manifests/manifestutils" "github.com/open-telemetry/opentelemetry-operator/internal/naming" ) func ClusterRole(params manifests.Params) (*rbacv1.ClusterRole, error) { - confStr, err := params.OtelCol.Spec.Config.Yaml() + rules, err := params.OtelCol.Spec.Config.GetAllRbacRules(params.Log) if err != nil { return nil, err - } - - configFromString, err := adapters.ConfigFromString(confStr) - if err != nil { - params.Log.Error(err, "couldn't extract the configuration from the context") - return nil, nil - } - rules := adapters.ConfigToRBAC(params.Log, configFromString) - - if len(rules) == 0 { + } else if len(rules) == 0 { return nil, nil } @@ -60,18 +50,10 @@ func ClusterRole(params manifests.Params) (*rbacv1.ClusterRole, error) { } func ClusterRoleBinding(params manifests.Params) (*rbacv1.ClusterRoleBinding, error) { - confStr, err := params.OtelCol.Spec.Config.Yaml() + rules, err := params.OtelCol.Spec.Config.GetAllRbacRules(params.Log) if err != nil { return nil, err - } - configFromString, err := adapters.ConfigFromString(confStr) - if err != nil { - params.Log.Error(err, "couldn't extract the configuration from the context") - return nil, nil - } - rules := adapters.ConfigToRBAC(params.Log, configFromString) - - if len(rules) == 0 { + } else if len(rules) == 0 { return nil, nil }