Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add intervalprocessor component #1484

Merged
merged 4 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ Main (unreleased)
### Features

- Added Datadog Exporter community component, enabling exporting of otel-formatted Metrics and traces to Datadog. (@polyrain)
- (_Experimental_) Add an `otelcol.processor.interval` component to aggregate metrics and periodically
forward the latest values to the next component in the pipeline.

### Enhancements

- Clustering peer resolution through `--cluster.join-addresses` flag has been
Expand Down
2 changes: 2 additions & 0 deletions docs/sources/reference/compatibility/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ The following components, grouped by namespace, _export_ OpenTelemetry `otelcol.
- [otelcol.processor.discovery](../components/otelcol/otelcol.processor.discovery)
- [otelcol.processor.filter](../components/otelcol/otelcol.processor.filter)
- [otelcol.processor.groupbyattrs](../components/otelcol/otelcol.processor.groupbyattrs)
- [otelcol.processor.interval](../components/otelcol/otelcol.processor.interval)
- [otelcol.processor.k8sattributes](../components/otelcol/otelcol.processor.k8sattributes)
- [otelcol.processor.memory_limiter](../components/otelcol/otelcol.processor.memory_limiter)
- [otelcol.processor.probabilistic_sampler](../components/otelcol/otelcol.processor.probabilistic_sampler)
Expand Down Expand Up @@ -339,6 +340,7 @@ The following components, grouped by namespace, _consume_ OpenTelemetry `otelcol
- [otelcol.processor.discovery](../components/otelcol/otelcol.processor.discovery)
- [otelcol.processor.filter](../components/otelcol/otelcol.processor.filter)
- [otelcol.processor.groupbyattrs](../components/otelcol/otelcol.processor.groupbyattrs)
- [otelcol.processor.interval](../components/otelcol/otelcol.processor.interval)
- [otelcol.processor.k8sattributes](../components/otelcol/otelcol.processor.k8sattributes)
- [otelcol.processor.memory_limiter](../components/otelcol/otelcol.processor.memory_limiter)
- [otelcol.processor.probabilistic_sampler](../components/otelcol/otelcol.processor.probabilistic_sampler)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
---
canonical: https://grafana.com/docs/alloy/latest/reference/components/otelcol/otelcol.processor.interval/
description: Learn about otelcol.processor.interval
title: otelcol.processor.interval
---

<span class="badge docs-labels__stage docs-labels__item">Experimental</span>

# otelcol.processor.interval

{{< docs/shared lookup="stability/experimental.md" source="alloy" version="<ALLOY_VERSION>" >}}

`otelcol.processor.interval` aggregates metrics and periodically forwards the latest values to the next component in the pipeline.
The processor supports aggregating the following metric types:

* Monotonically increasing, cumulative sums
* Monotonically increasing, cumulative histograms
* Monotonically increasing, cumulative exponential histograms

The following metric types will *not* be aggregated and will instead be passed, unchanged, to the next component in the pipeline:

* All delta metrics
* Non-monotonically increasing sums
* Gauges
* Summaries

{{< admonition type="warning" >}}
After exporting, any internal state is cleared. If no new metrics come in, the next interval will export nothing.
{{< /admonition >}}

{{< admonition type="note" >}}
`otelcol.processor.interval` is a wrapper over the upstream OpenTelemetry Collector `interval` processor.
Bug reports or feature requests will be redirected to the upstream repository, if necessary.
{{< /admonition >}}

## Usage

```alloy
otelcol.processor.interval "LABEL" {
output {
metrics = [...]
}
}
```

## Arguments

`otelcol.processor.interval` supports the following arguments:

Name | Type | Description | Default | Required
------------- | ---------- | ------------------------------------------------------------------- | ------- | --------
`interval` | `duration` | The interval in the processor should export the aggregated metrics. | `"60s"` | no

## Blocks

The following blocks are supported inside the definition of `otelcol.processor.interval`:

Hierarchy | Block | Description | Required
------------- | ----------------- | -------------------------------------------------------------------------- | --------
output | [output][] | Configures where to send received telemetry data. | yes
debug_metrics | [debug_metrics][] | Configures the metrics that this component generates to monitor its state. | no

[output]: #output-block
[debug_metrics]: #debug_metrics-block

### output block

{{< docs/shared lookup="reference/components/output-block.md" source="alloy" version="<ALLOY_VERSION>" >}}

### debug_metrics block

{{< docs/shared lookup="reference/components/otelcol-debug-metrics-block.md" source="alloy" version="<ALLOY_VERSION>" >}}

## Exported fields

The following fields are exported and can be referenced by other components:

Name | Type | Description
--------|--------------------|-----------------------------------------------------------------
`input` | `otelcol.Consumer` | A value that other components can use to send telemetry data to.

`input` accepts `otelcol.Consumer` data for metrics.

## Component health

`otelcol.processor.interval` is only reported as unhealthy if given an invalid configuration.

## Debug information

`otelcol.processor.interval` does not expose any component-specific debug information.

## Example

This example receives OTLP metrics and aggregates them for 30s before sending to the next exporter.

```alloy
otelcol.receiver.otlp "default" {
grpc { ... }
http { ... }

output {
metrics = [otelcol.processor.interval.default.input]
}
}

otelcol.processor.interval "default" {
ArthurSens marked this conversation as resolved.
Show resolved Hide resolved
interval = "30s"
output {
metrics = [otelcol.exporter.otlphttp.grafana_cloud.input]
}
}

otelcol.exporter.otlphttp "grafana_cloud" {
client {
endpoint = "https://otlp-gateway-prod-gb-south-0.grafana.net/otlp"
auth = otelcol.auth.basic.grafana_cloud.handler
}
}

otelcol.auth.basic "grafana_cloud" {
username = env("GRAFANA_CLOUD_USERNAME")
password = env("GRAFANA_CLOUD_API_KEY")
}
```

| Timestamp | Metric Name | Aggregation Temporarility | Attributes | Value |
| --------- | ------------ | ------------------------- | ----------------- | ----: |
| 0 | test_metric | Cumulative | labelA: foo | 4.0 |
| 2 | test_metric | Cumulative | labelA: bar | 3.1 |
| 4 | other_metric | Delta | fruitType: orange | 77.4 |
| 6 | test_metric | Cumulative | labelA: foo | 8.2 |
| 8 | test_metric | Cumulative | labelA: foo | 12.8 |
| 10 | test_metric | Cumulative | labelA: bar | 6.4 |

The processor immediately passes the following metric to the next processor in the chain because it is a Delta metric.

| Timestamp | Metric Name | Aggregation Temporarility | Attributes | Value |
| --------- | ------------ | ------------------------- | ----------------- | ----: |
| 4 | other_metric | Delta | fruitType: orange | 77.4 |


At the next `interval` (15s by default), the processor passed the following metrics to the next processor in the chain.

| Timestamp | Metric Name | Aggregation Temporarility | Attributes | Value |
| --------- | ----------- | ------------------------- | ----------- | ----: |
| 8 | test_metric | Cumulative | labelA: foo | 12.8 |
| 10 | test_metric | Cumulative | labelA: bar | 6.4 |

<!-- START GENERATED COMPATIBLE COMPONENTS -->

## Compatible components

`otelcol.processor.interval` can accept arguments from the following components:

- Components that export [OpenTelemetry `otelcol.Consumer`](../../../compatibility/#opentelemetry-otelcolconsumer-exporters)

`otelcol.processor.interval` has exports that can be consumed by the following components:

- Components that consume [OpenTelemetry `otelcol.Consumer`](../../../compatibility/#opentelemetry-otelcolconsumer-consumers)

{{< admonition type="note" >}}
Connecting some components may not be sensible or components may require further configuration to make the connection work correctly.
Refer to the linked documentation for more details.
{{< /admonition >}}

<!-- END GENERATED COMPATIBLE COMPONENTS -->
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ require (
github.com/go-openapi/validate v0.23.0 // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/go-resty/resty/v2 v2.13.1 // indirect
github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect
github.com/go-viper/mapstructure/v2 v2.0.0 // indirect
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error is from this line, how the mapstructure reports errors has changed. You need to update the verbiage in the corrupt_config.diags to match the new error it reports.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice find, thanks!

github.com/go-zookeeper/zk v1.0.3 // indirect
github.com/gobwas/glob v0.2.3 // indirect
github.com/goccy/go-json v0.10.3 // indirect
Expand Down Expand Up @@ -656,6 +656,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.105.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus v0.105.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.105.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor v0.105.0
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/opencontainers/runc v1.1.12 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -981,8 +981,8 @@ github.com/go-test/deep v1.0.2-0.20181118220953-042da051cf31/go.mod h1:wGDj63lr6
github.com/go-test/deep v1.0.2/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
github.com/go-test/deep v1.1.0 h1:WOcxcdHcvdgThNXjw0t76K42FXTU7HpNQWHpA2HHNlg=
github.com/go-test/deep v1.1.0/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE=
github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 h1:TQcrn6Wq+sKGkpyPvppOz99zsMBaUOKXq6HSv655U1c=
github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
github.com/go-viper/mapstructure/v2 v2.0.0 h1:dhn8MZ1gZ0mzeodTG3jt5Vj/o87xZKuNAprG2mQfMfc=
github.com/go-viper/mapstructure/v2 v2.0.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
github.com/go-zookeeper/zk v1.0.3 h1:7M2kwOsc//9VeeFiPtf+uSJlVpU66x9Ba5+8XK7/TDg=
github.com/go-zookeeper/zk v1.0.3/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
github.com/goburrow/modbus v0.1.0/go.mod h1:Kx552D5rLIS8E7TyUwQ/UdHEqvX5T8tyiGBTlzMcZBg=
Expand Down Expand Up @@ -1238,8 +1238,6 @@ github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc h1:GN2Lv3MGO7AS6PrR
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc/go.mod h1:+JKpmjMGhpgPL+rXZ5nsZieVzvarn86asRlBg4uNGnk=
github.com/grafana/smimesign v0.2.1-0.20220408144937-2a5adf3481d3 h1:UPkAxuhlAcRmJT3/qd34OMTl+ZU7BLLfOO2+NXBlJpY=
github.com/grafana/smimesign v0.2.1-0.20220408144937-2a5adf3481d3/go.mod h1:iZiiwNT4HbtGRVqCQu7uJPEZCuEE5sfSSttcnePkDl4=
github.com/grafana/snowflake-prometheus-exporter v0.0.0-20240701202215-1d847d62ed15 h1:J4PmreN24XmbqMIKReAS/P1t7ND6NCAZApcbjBhedrY=
github.com/grafana/snowflake-prometheus-exporter v0.0.0-20240701202215-1d847d62ed15/go.mod h1:DANNLd5vSKqHloqNX4yeS+9ZRI89dj8ySZeEWlI5UU4=
github.com/grafana/snowflake-prometheus-exporter v0.0.0-20240813124544-9995e8354548 h1:XwoNrPHEl07N7EIMt/WXlzGj0Q4CDEfa+6nrdnQGOG4=
github.com/grafana/snowflake-prometheus-exporter v0.0.0-20240813124544-9995e8354548/go.mod h1:DANNLd5vSKqHloqNX4yeS+9ZRI89dj8ySZeEWlI5UU4=
github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0 h1:bjh0PVYSVVFxzINqPFYJmAmJNrWPgnVjuSdYJGHmtFU=
Expand Down Expand Up @@ -1968,6 +1966,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterproces
github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.105.0/go.mod h1:66cZFd4X8vQBTmvm1hPHxrSNHS474iUEsAVbYk9xQBU=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor v0.105.0 h1:OYsGaSC9G7pAVYKTd1+D0f7HTHcxuQfoEHyQy+a1NKk=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor v0.105.0/go.mod h1:WCesGEakYveZYZH4o3cUTLt3UB7JxE+yDiiphRHoJoc=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor v0.105.0 h1:4EBgaDFaVQOaV0hpgNTrFQL8zjXSOglXz15gyUL/Kds=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor v0.105.0/go.mod h1:C9PSzt0uqtTM9oPs+1H92PAzowI4yIhnzXvdpFJjX30=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor v0.105.0 h1:ScIwuYg6l79Ta+deOyZIADXrBlXSdeAZ7sp3MXhm7JY=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor v0.105.0/go.mod h1:pranRmnWRkzDsn9a16BzSqX6HJ6XjjVVFmMhyZPEzt0=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor v0.105.0 h1:mFAlBmDFELQJS8uj1M8csB/vQqjpq6W9/9k9izh9Hr4=
Expand Down
1 change: 1 addition & 0 deletions internal/component/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ import (
_ "github.com/grafana/alloy/internal/component/otelcol/processor/attributes" // Import otelcol.processor.attributes
_ "github.com/grafana/alloy/internal/component/otelcol/processor/batch" // Import otelcol.processor.batch
_ "github.com/grafana/alloy/internal/component/otelcol/processor/deltatocumulative" // Import otelcol.processor.deltatocumulative
_ "github.com/grafana/alloy/internal/component/otelcol/processor/interval" // Import otelcol.processor.interval
_ "github.com/grafana/alloy/internal/component/otelcol/processor/discovery" // Import otelcol.processor.discovery
_ "github.com/grafana/alloy/internal/component/otelcol/processor/filter" // Import otelcol.processor.filter
_ "github.com/grafana/alloy/internal/component/otelcol/processor/groupbyattrs" // Import otelcol.processor.groupbyattrs
Expand Down
88 changes: 88 additions & 0 deletions internal/component/otelcol/processor/interval/interval.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Package interval provides an otelcol.processor.interval component.
package interval

import (
"fmt"
"time"

"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/component/otelcol"
otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config"
"github.com/grafana/alloy/internal/component/otelcol/processor"
"github.com/grafana/alloy/internal/featuregate"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor"
otelcomponent "go.opentelemetry.io/collector/component"
otelextension "go.opentelemetry.io/collector/extension"
)

func init() {
component.Register(component.Registration{
Name: "otelcol.processor.interval",
Stability: featuregate.StabilityExperimental,
Args: Arguments{},
Exports: otelcol.ConsumerExports{},
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
return processor.New(opts, intervalprocessor.NewFactory(), args.(Arguments))
},
})
}

type Arguments struct {
// The interval in which the processor should export the aggregated metrics. Default: 60s.
Interval time.Duration `alloy:"interval,attr,optional"`

// Output configures where to send processed data. Required.
Output *otelcol.ConsumerArguments `alloy:"output,block"`

// DebugMetrics configures component internal metrics. Optional.
DebugMetrics otelcolCfg.DebugMetricsArguments `alloy:"debug_metrics,block,optional"`
}

var (
_ processor.Arguments = Arguments{}
)

// DefaultArguments holds default settings for Arguments.
var DefaultArguments = Arguments{
Interval: 60 * time.Second,
}

// SetToDefault implements syntax.Defaulter.
func (args *Arguments) SetToDefault() {
*args = DefaultArguments
}

// Validate implements syntax.Validator.
func (args *Arguments) Validate() error {
if args.Interval <= 0 {
return fmt.Errorf("interval must be greater than 0")
}
return nil
}

// Convert implements processor.Arguments.
func (args Arguments) Convert() (otelcomponent.Config, error) {
return &intervalprocessor.Config{
Interval: args.Interval,
}, nil
}

// Extensions implements processor.Arguments.
func (args Arguments) Extensions() map[otelcomponent.ID]otelextension.Extension {
return nil
}

// Exporters implements processor.Arguments.
func (args Arguments) Exporters() map[otelcomponent.DataType]map[otelcomponent.ID]otelcomponent.Component {
return nil
}

// NextConsumers implements processor.Arguments.
func (args Arguments) NextConsumers() *otelcol.ConsumerArguments {
return args.Output
}

// DebugMetricsConfig implements processor.Arguments.
func (args Arguments) DebugMetricsConfig() otelcolCfg.DebugMetricsArguments {
return args.DebugMetrics
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package otelcolconvert

import (
"fmt"

"github.com/grafana/alloy/internal/component/otelcol"
"github.com/grafana/alloy/internal/component/otelcol/processor/interval"
"github.com/grafana/alloy/internal/converter/diag"
"github.com/grafana/alloy/internal/converter/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor"
"go.opentelemetry.io/collector/component"
)

func init() {
converters = append(converters, intervalProcessorConverter{})
}

type intervalProcessorConverter struct{}

func (intervalProcessorConverter) Factory() component.Factory {
return intervalprocessor.NewFactory()
}

func (intervalProcessorConverter) InputComponentName() string {
return "otelcol.processor.interval"
}

func (intervalProcessorConverter) ConvertAndAppend(state *State, id component.InstanceID, cfg component.Config) diag.Diagnostics {
var diags diag.Diagnostics

label := state.AlloyComponentLabel()

args := toIntervalProcessor(state, id, cfg.(*intervalprocessor.Config))
block := common.NewBlockWithOverride([]string{"otelcol", "processor", "interval"}, label, args)

diags.Add(
diag.SeverityLevelInfo,
fmt.Sprintf("Converted %s into %s", StringifyInstanceID(id), StringifyBlock(block)),
)

state.Body().AppendBlock(block)
return diags
}

func toIntervalProcessor(state *State, id component.InstanceID, cfg *intervalprocessor.Config) *interval.Arguments {
var (
nextMetrics = state.Next(id, component.DataTypeMetrics)
nextLogs = state.Next(id, component.DataTypeLogs)
nextTraces = state.Next(id, component.DataTypeTraces)
)

return &interval.Arguments{
Interval: cfg.Interval,
Output: &otelcol.ConsumerArguments{
Metrics: ToTokenizedConsumers(nextMetrics),
Logs: ToTokenizedConsumers(nextLogs),
Traces: ToTokenizedConsumers(nextTraces),
},
DebugMetrics: common.DefaultValue[interval.Arguments]().DebugMetrics,
}
}
Loading
Loading