Skip to content

Commit

Permalink
Introduce typed pipeline (#2896)
Browse files Browse the repository at this point in the history
* introduce more typing for pipelines

* rock and a hard place

* resolve tests

* Fix tests

* chlog

* fix docs

* thing

* alphabet

* remove chlog entry
  • Loading branch information
jaronoff97 authored Apr 30, 2024
1 parent 5ca66dd commit 2b7d4b3
Show file tree
Hide file tree
Showing 19 changed files with 324 additions and 32 deletions.
45 changes: 44 additions & 1 deletion apis/v1beta1/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@ import (
"gopkg.in/yaml.v3"
)

type ComponentType int

const (
ComponentTypeReceiver ComponentType = iota
ComponentTypeExporter
ComponentTypeProcessor
)

func (c ComponentType) String() string {
return [...]string{"receiver", "exporter", "processor"}[c]
}

// AnyConfig represent parts of the config.
type AnyConfig struct {
Object map[string]interface{} `json:"-" yaml:",inline"`
Expand Down Expand Up @@ -75,6 +87,37 @@ func (c *AnyConfig) MarshalJSON() ([]byte, error) {
return json.Marshal(c.Object)
}

// Pipeline is a struct of component type to a list of component IDs.
type Pipeline struct {
Exporters []string `json:"exporters" yaml:"exporters"`
Processors []string `json:"processors" yaml:"processors"`
Receivers []string `json:"receivers" yaml:"receivers"`
}

// GetEnabledComponents constructs a list of enabled components by component type.
func (c *Config) GetEnabledComponents() map[ComponentType]map[string]interface{} {
toReturn := map[ComponentType]map[string]interface{}{
ComponentTypeReceiver: {},
ComponentTypeProcessor: {},
ComponentTypeExporter: {},
}
for _, pipeline := range c.Service.Pipelines {
if pipeline == nil {
continue
}
for _, componentId := range pipeline.Receivers {
toReturn[ComponentTypeReceiver][componentId] = struct{}{}
}
for _, componentId := range pipeline.Exporters {
toReturn[ComponentTypeExporter][componentId] = struct{}{}
}
for _, componentId := range pipeline.Processors {
toReturn[ComponentTypeProcessor][componentId] = struct{}{}
}
}
return toReturn
}

// Config encapsulates collector config.
type Config struct {
// +kubebuilder:pruning:PreserveUnknownFields
Expand Down Expand Up @@ -135,7 +178,7 @@ type Service struct {
// +kubebuilder:pruning:PreserveUnknownFields
Telemetry *AnyConfig `json:"telemetry,omitempty" yaml:"telemetry,omitempty"`
// +kubebuilder:pruning:PreserveUnknownFields
Pipelines AnyConfig `json:"pipelines" yaml:"pipelines"`
Pipelines map[string]*Pipeline `json:"pipelines" yaml:"pipelines"`
}

// MetricsPort gets the port number for the metrics endpoint from the collector config if it has been set.
Expand Down
131 changes: 119 additions & 12 deletions apis/v1beta1/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,11 @@ func TestConfigYaml(t *testing.T) {
"insights": "yeah!",
},
},
Pipelines: AnyConfig{
Object: map[string]interface{}{
"receivers": []string{"otlp"},
"processors": []string{"modify_2000"},
"exporters": []string{"otlp/exporter", "con"},
Pipelines: map[string]*Pipeline{
"traces": {
Receivers: []string{"otlp"},
Processors: []string{"modify_2000"},
Exporters: []string{"otlp/exporter", "con"},
},
},
},
Expand All @@ -173,13 +173,14 @@ service:
telemetry:
insights: yeah!
pipelines:
exporters:
- otlp/exporter
- con
processors:
- modify_2000
receivers:
- otlp
traces:
exporters:
- otlp/exporter
- con
processors:
- modify_2000
receivers:
- otlp
`

assert.Equal(t, expected, yamlCollector)
Expand Down Expand Up @@ -278,3 +279,109 @@ func TestConfigToMetricsPort(t *testing.T) {
})
}
}

func TestConfig_GetEnabledComponents(t *testing.T) {
tests := []struct {
name string
file string
want map[ComponentType]map[string]interface{}
}{

{
name: "connectors",
file: "testdata/otelcol-connectors.yaml",
want: map[ComponentType]map[string]interface{}{
ComponentTypeReceiver: {
"foo": struct{}{},
"count": struct{}{},
},
ComponentTypeProcessor: {},
ComponentTypeExporter: {
"bar": struct{}{},
"count": struct{}{},
},
},
},
{
name: "couchbase",
file: "testdata/otelcol-couchbase.yaml",
want: map[ComponentType]map[string]interface{}{
ComponentTypeReceiver: {
"prometheus/couchbase": struct{}{},
},
ComponentTypeProcessor: {
"filter/couchbase": struct{}{},
"metricstransform/couchbase": struct{}{},
"transform/couchbase": struct{}{},
},
ComponentTypeExporter: {
"prometheus": struct{}{},
},
},
},
{
name: "demo",
file: "testdata/otelcol-demo.yaml",
want: map[ComponentType]map[string]interface{}{
ComponentTypeReceiver: {
"otlp": struct{}{},
},
ComponentTypeProcessor: {
"batch": struct{}{},
},
ComponentTypeExporter: {
"debug": struct{}{},
"zipkin": struct{}{},
"otlp": struct{}{},
"prometheus": struct{}{},
},
},
},
{
name: "extensions",
file: "testdata/otelcol-extensions.yaml",
want: map[ComponentType]map[string]interface{}{
ComponentTypeReceiver: {
"otlp": struct{}{},
},
ComponentTypeProcessor: {},
ComponentTypeExporter: {
"otlp/auth": struct{}{},
},
},
},
{
name: "filelog",
file: "testdata/otelcol-filelog.yaml",
want: map[ComponentType]map[string]interface{}{
ComponentTypeReceiver: {
"filelog": struct{}{},
},
ComponentTypeProcessor: {},
ComponentTypeExporter: {
"debug": struct{}{},
},
},
},
{
name: "null",
file: "testdata/otelcol-null-values.yaml",
want: map[ComponentType]map[string]interface{}{
ComponentTypeReceiver: {},
ComponentTypeProcessor: {},
ComponentTypeExporter: {},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
collectorYaml, err := os.ReadFile(tt.file)
require.NoError(t, err)

c := &Config{}
err = go_yaml.Unmarshal(collectorYaml, c)
require.NoError(t, err)
assert.Equalf(t, tt.want, c.GetEnabledComponents(), "GetEnabledComponents()")
})
}
}
1 change: 1 addition & 0 deletions apis/v1beta1/opentelemetrycollector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type OpenTelemetryCollector struct {
Status OpenTelemetryCollectorStatus `json:"status,omitempty"`
}

// Hub exists to allow for conversion.
func (*OpenTelemetryCollector) Hub() {}

//+kubebuilder:object:root=true
Expand Down
2 changes: 2 additions & 0 deletions apis/v1beta1/testdata/otelcol-connectors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ service:
pipelines:
traces:
receivers: [foo]
processors: []
exporters: [count]
metrics:
receivers: [count]
processors: []
exporters: [bar]
1 change: 1 addition & 0 deletions apis/v1beta1/testdata/otelcol-filelog.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,5 @@ service:
pipelines:
logs:
receivers: [filelog]
processors: []
exporters: [debug]
47 changes: 46 additions & 1 deletion apis/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5522,6 +5522,25 @@ spec:
type: string
type: array
pipelines:
additionalProperties:
properties:
exporters:
items:
type: string
type: array
processors:
items:
type: string
type: array
receivers:
items:
type: string
type: array
required:
- exporters
- processors
- receivers
type: object
type: object
x-kubernetes-preserve-unknown-fields: true
telemetry:
Expand Down
19 changes: 19 additions & 0 deletions config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5508,6 +5508,25 @@ spec:
type: string
type: array
pipelines:
additionalProperties:
properties:
exporters:
items:
type: string
type: array
processors:
items:
type: string
type: array
receivers:
items:
type: string
type: array
required:
- exporters
- processors
- receivers
type: object
type: object
x-kubernetes-preserve-unknown-fields: true
telemetry:
Expand Down
Loading

0 comments on commit 2b7d4b3

Please sign in to comment.