Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add otelcol.processor.groupbyattrs component #1300

Merged
merged 19 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
bccb601
Add `otelcol.processor.groupbyattrs` component
kehindesalaam Jul 15, 2024
7f0ca5f
Make changes from review
kehindesalaam Jul 23, 2024
280f1aa
Import Correct package version
kehindesalaam Jul 23, 2024
644c98f
Add groupbyattrs converter
kehindesalaam Jul 23, 2024
f70fdfd
Apply suggestions from code review
kehindesalaam Jul 31, 2024
930555b
Update documentation
kehindesalaam Jul 31, 2024
3972e11
Merge branch 'refs/heads/main' into otelcol-processor-groupbyattr
kehindesalaam Jul 31, 2024
eaf85ba
fix order of keys in the test and update go.sum
kehindesalaam Aug 1, 2024
8947ece
Apply suggestions from code review
kehindesalaam Aug 1, 2024
c7edec4
Update internal/converter/internal/otelcolconvert/testdata/groupbyatt…
kehindesalaam Aug 15, 2024
02a4045
Make changes from second review
kehindesalaam Aug 15, 2024
167b46c
Merge remote-tracking branch 'origin/otelcol-processor-groupbyattr' i…
kehindesalaam Aug 15, 2024
0cb3070
Update docs/sources/reference/components/otelcol/otelcol.processor.gr…
kehindesalaam Aug 19, 2024
685fb20
Update docs/sources/reference/components/otelcol/otelcol.processor.gr…
kehindesalaam Aug 19, 2024
fde7535
Merge branch 'grafana:main' into otelcol-processor-groupbyattr
kehindesalaam Aug 19, 2024
a2ac0bc
Add `otelcol.processor.groupbyattrs` component
kehindesalaam Jul 15, 2024
39cfc5c
remove white space
kehindesalaam Aug 19, 2024
b1b562c
Update docs/sources/reference/components/otelcol/otelcol.processor.gr…
kehindesalaam Aug 20, 2024
d7b7593
Move blocks to the Blocks section
ptodev Aug 21, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ Main (unreleased)
- `otelcol.connector.spanmetrics`: Produce delta temporality span metrics with StartTimeUnixNano and TimeUnixNano values representing an uninterrupted series.
https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/31780

- Add a `otelcol.processor.groupbyattrs` component to reassociate collected metrics that match specified attributes
from opentelemetry. (@kehindesalaam)

- Upgrade Beyla component v1.6.3 to v1.7.0
- Reporting application process metrics
- New supported protocols: SQL, Redis, Kafka
Expand Down
ptodev marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
---
canonical: https://grafana.com/docs/alloy/latest/reference/components/otelcol/otelcol.processor.groupbyattrs/
description: Learn about otelcol.processor.groupbyattrs
title: otelcol.processor.groupbyattrs
---

# otelcol.processor.groupbyattrs
kehindesalaam marked this conversation as resolved.
Show resolved Hide resolved

`otelcol.processor.groupbyattrs` accepts telemetry data from other `otelcol`
components and reassociates spans, log records, and metric datapoints to a resource that matches the specified attributes. It groups telemetry data by specified attributes.
ptodev marked this conversation as resolved.
Show resolved Hide resolved
ptodev marked this conversation as resolved.
Show resolved Hide resolved

{{% admonition type="note" %}}
`otelcol.processor.groupbyattrs` is a wrapper over the upstream OpenTelemetry
Collector `groupbyattrs` processor. If necessary, bug reports or feature requests
will be redirected to the upstream repository.
{{% /admonition %}}

We recommend you use the groupbyattrs processor together with [otelcol.processor.batch][], as a consecutive step. This will reduce the fragmentation of data by grouping records together under the matching Resource/Instrumentation Library.

You can specify multiple `otelcol.processor.groupbyattrs` components by giving them
different labels.

## Usage

```river
ptodev marked this conversation as resolved.
Show resolved Hide resolved
otelcol.processor.groupbyattrs "LABEL" {
output {
metrics = [...]
logs = [...]
traces = [...]
}
}
```

## Arguments

The following arguments are supported:

| Name | Type | Description | Default | Required |
|-----------------|-------------------|---------------------------------------------------------------------------------------|---------|----------|
| `keys` | `array(string)` | Keys that will be used to group the spans, log records, or metric data points together. | | no |
ptodev marked this conversation as resolved.
Show resolved Hide resolved
| `output` | [output][] | Configures where to send received telemetry data. | yes | |
| `debug_metrics` | [debug_metrics][] | Configures the metrics that this component generates to monitor its state. | no | |

[output]: #output-block
[debug_metrics]: #debug_metrics-block

### keys
ptodev marked this conversation as resolved.
Show resolved Hide resolved
`keys` is a string array that is used for grouping the data. If it is empty, the processor performs compaction and reassociates all spans with matching Resource and InstrumentationLibrary.


### output block

{{< docs/shared lookup="reference/components/output-block.md" source="alloy" version="<ALLOY_VERSION>" >}}

### debug_metrics block

{{< docs/shared lookup="reference/components/otelcol-debug-metrics-block.md" source="alloy" version="<ALLOY_VERSION>" >}}

## Exported fields

The following fields are exported and can be referenced by other components:

| Name | Type | Description |
|---------|--------------------|---------------------------------------------------------------|
| `input` | `otelcol.Consumer` | Accepts `otelcol.Consumer` data for metrics, logs, or traces. |

`input` accepts `otelcol.Consumer` data for any telemetry signal (metrics,
logs, or traces).

## Component health

`otelcol.processor.groupbyattrs` is only reported as unhealthy if given an invalid
configuration.

## Debug information

`otelcol.processor.groupbyattrs` doesn't expose any component-specific debug
information.

## Debug metrics

`otelcol.processor.groupbyattrs` doesn't expose any component-specific debug metrics.

## Examples

### Grouping metrics by an attribute

This example reassociates the metrics based on the value of the `host.name` attribute.

```alloy
otelcol.processor.groupbyattrs "default" {
keys = [
"host.name",
]

output {
metrics = [otelcol.exporter.otlp.default.input]
logs = [otelcol.exporter.otlp.default.input]
traces = [otelcol.exporter.otlp.default.input]
}
}
```

## Notes
clayton-cornell marked this conversation as resolved.
Show resolved Hide resolved
- The data points with different data types aren't merged under the same metric. For example, a gauge and sum metric would not be merged.
- The data points without the specified keys remain under their respective resources.
- New resources inherit the attributes of the original resource and the specified attributes in the keys array.
- The grouping attributes in the keys array are removed from the output metrics.
ptodev marked this conversation as resolved.
Show resolved Hide resolved

kehindesalaam marked this conversation as resolved.
Show resolved Hide resolved
[otelcol.processor.batch]: https://grafana.com/docs/alloy/latest/reference/components/otelcol/otelcol.processor.batch/
kehindesalaam marked this conversation as resolved.
Show resolved Hide resolved
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.105.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.105.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.105.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor v0.105.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor v0.105.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor v0.105.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor v0.105.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1770,6 +1770,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumul
github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.105.0/go.mod h1:PKox+dLnO2bWc1qUN6WZnyHPV0MpWZ10arqGV5v69kI=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.105.0 h1:oRa+acTM4f5rjTT3+hjOVM1LYrlwrm6CSNG4o/RIqcA=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.105.0/go.mod h1:66cZFd4X8vQBTmvm1hPHxrSNHS474iUEsAVbYk9xQBU=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor v0.105.0 h1:OYsGaSC9G7pAVYKTd1+D0f7HTHcxuQfoEHyQy+a1NKk=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor v0.105.0/go.mod h1:WCesGEakYveZYZH4o3cUTLt3UB7JxE+yDiiphRHoJoc=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor v0.105.0 h1:ScIwuYg6l79Ta+deOyZIADXrBlXSdeAZ7sp3MXhm7JY=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor v0.105.0/go.mod h1:pranRmnWRkzDsn9a16BzSqX6HJ6XjjVVFmMhyZPEzt0=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor v0.105.0 h1:mFAlBmDFELQJS8uj1M8csB/vQqjpq6W9/9k9izh9Hr4=
Expand Down
1 change: 1 addition & 0 deletions internal/component/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ import (
_ "github.com/grafana/alloy/internal/component/otelcol/processor/deltatocumulative" // Import otelcol.processor.deltatocumulative
_ "github.com/grafana/alloy/internal/component/otelcol/processor/discovery" // Import otelcol.processor.discovery
_ "github.com/grafana/alloy/internal/component/otelcol/processor/filter" // Import otelcol.processor.filter
_ "github.com/grafana/alloy/internal/component/otelcol/processor/groupbyattrs" // Import otelcol.processor.groupbyattrs
_ "github.com/grafana/alloy/internal/component/otelcol/processor/k8sattributes" // Import otelcol.processor.k8sattributes
_ "github.com/grafana/alloy/internal/component/otelcol/processor/memorylimiter" // Import otelcol.processor.memory_limiter
_ "github.com/grafana/alloy/internal/component/otelcol/processor/probabilistic_sampler" // Import otelcol.processor.probabilistic_sampler
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Package attributes provides an otelcol.processor.groupbyattrs component.
ptodev marked this conversation as resolved.
Show resolved Hide resolved
package groupbyattrs

import (
"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/component/otelcol"
otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config"
"github.com/grafana/alloy/internal/component/otelcol/processor"
"github.com/grafana/alloy/internal/featuregate"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor"
otelcomponent "go.opentelemetry.io/collector/component"
otelextension "go.opentelemetry.io/collector/extension"
)

func init() {
component.Register(component.Registration{
Name: "otelcol.processor.groupbyattrs",
Stability: featuregate.StabilityGenerallyAvailable,
Exports: otelcol.ConsumerExports{},
Args: Arguments{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
fact := groupbyattrsprocessor.NewFactory()
return processor.New(opts, fact, args.(Arguments))
},
})
}

type Arguments struct {

// Keys is a list of attributes to group metrics by.
Keys []string `alloy:"keys,attr,optional"`

// Output configures where to send processed data. Required.
Output *otelcol.ConsumerArguments `alloy:"output,block"`

// DebugMetrics configures component internal metrics. Optional.
DebugMetrics otelcolCfg.DebugMetricsArguments `alloy:"debug_metrics,block,optional"`
wildum marked this conversation as resolved.
Show resolved Hide resolved
}

var (
_ processor.Arguments = Arguments{}

// DefaultArguments holds default settings for Arguments.
DefaultArguments = Arguments{
Keys: []string{},
ptodev marked this conversation as resolved.
Show resolved Hide resolved
}
)

// SetToDefault implements syntax.Defaulter.
func (args *Arguments) SetToDefault() {
*args = DefaultArguments
ptodev marked this conversation as resolved.
Show resolved Hide resolved
args.DebugMetrics.SetToDefault()
}

// Validate implements syntax.Validator.
func (args *Arguments) Validate() error {
return nil
}

// Convert implements processor.Arguments.
func (args Arguments) Convert() (otelcomponent.Config, error) {
return &groupbyattrsprocessor.Config{
GroupByKeys: args.Keys,
}, nil
}

// Extensions implements processor.Arguments.
func (args Arguments) Extensions() map[otelcomponent.ID]otelextension.Extension {
return nil
}

// Exporters implements processor.Arguments.
func (args Arguments) Exporters() map[otelcomponent.DataType]map[otelcomponent.ID]otelcomponent.Component {
return nil
}

// NextConsumers implements processor.Arguments.
func (args Arguments) NextConsumers() *otelcol.ConsumerArguments {
return args.Output
}

// DebugMetricsConfig implements processor.Arguments.
func (args Arguments) DebugMetricsConfig() otelcolCfg.DebugMetricsArguments {
return args.DebugMetrics
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package groupbyattrs_test

import (
"testing"

"github.com/grafana/alloy/internal/component/otelcol/processor/groupbyattrs"
"github.com/grafana/alloy/syntax"

"github.com/mitchellh/mapstructure"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor"
"github.com/stretchr/testify/require"
)

func TestArguments_UnmarshalAlloy(t *testing.T) {
tests := []struct {
testName string
cfg string
expected map[string]interface{}
errMsg string
}{
{
testName: "Default",
cfg: `
output {}
`,
expected: map[string]interface{}{
"keys": []string{},
},
},
{
testName: "SingleKey",
cfg: `
keys = ["key1"]
output {}
`,
expected: map[string]interface{}{
"keys": []string{
"key1",
},
},
},
{
testName: "MultipleKeys",
cfg: `
keys = ["key1", "key2"]
output {}
`,
expected: map[string]interface{}{
"keys": []string{
"key1",
"key2",
},
},
},
}

for _, tt := range tests {
t.Run(tt.testName, func(t *testing.T) {
var args groupbyattrs.Arguments
err := syntax.Unmarshal([]byte(tt.cfg), &args)
if tt.errMsg != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tt.errMsg)
return
}
require.NoError(t, err)

actualPtr, err := args.Convert()
require.NoError(t, err)

actual := actualPtr.(*groupbyattrsprocessor.Config)

var expectedCfg groupbyattrsprocessor.Config
err = mapstructure.Decode(tt.expected, &expectedCfg)
require.NoError(t, err)

require.Equal(t, expectedCfg, *actual)
})
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package otelcolconvert

import (
"fmt"

"github.com/grafana/alloy/internal/component/otelcol"
"github.com/grafana/alloy/internal/component/otelcol/processor/groupbyattrs"
"github.com/grafana/alloy/internal/converter/diag"
"github.com/grafana/alloy/internal/converter/internal/common"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor"
"go.opentelemetry.io/collector/component"
)

func init() {
converters = append(converters, groupByAttrsConverter{})
}

type groupByAttrsConverter struct{}

func (groupByAttrsConverter) Factory() component.Factory {
return groupbyattrsprocessor.NewFactory()
}

func (groupByAttrsConverter) InputComponentName() string {
return "otelcol.processor.groupbyattrs"
}

func (groupByAttrsConverter) ConvertAndAppend(state *State, id component.InstanceID, cfg component.Config) diag.Diagnostics {
var diags diag.Diagnostics

label := state.AlloyComponentLabel()

args := toGroupByAttrsProcessor(state, id, cfg.(*groupbyattrsprocessor.Config))
block := common.NewBlockWithOverride([]string{"otelcol", "processor", "groupbyattrs"}, label, args)

diags.Add(
diag.SeverityLevelInfo,
fmt.Sprintf("Converted %s into %s", StringifyInstanceID(id), StringifyBlock(block)),
)

state.Body().AppendBlock(block)
return diags
}

func toGroupByAttrsProcessor(state *State, id component.InstanceID, cfg *groupbyattrsprocessor.Config) *groupbyattrs.Arguments {
var (
nextMetrics = state.Next(id, component.DataTypeMetrics)
nextLogs = state.Next(id, component.DataTypeLogs)
nextTraces = state.Next(id, component.DataTypeTraces)
)

return &groupbyattrs.Arguments{
Keys: cfg.GroupByKeys,
Output: &otelcol.ConsumerArguments{
Metrics: ToTokenizedConsumers(nextMetrics),
Logs: ToTokenizedConsumers(nextLogs),
Traces: ToTokenizedConsumers(nextTraces),
},

DebugMetrics: common.DefaultValue[groupbyattrs.Arguments]().DebugMetrics,
}
}
Loading