Skip to content

Commit

Permalink
allow the receiver to operate with delayed metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
narcis.gemene committed Aug 29, 2024
1 parent eca257a commit e7ea3b0
Show file tree
Hide file tree
Showing 16 changed files with 548 additions and 164 deletions.
1 change: 1 addition & 0 deletions cmd/otelcontribcol/builder-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ receivers:
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/haproxyreceiver v0.108.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver v0.108.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/httpcheckreceiver v0.108.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/huaweicloudcesreceiver v0.108.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/influxdbreceiver v0.108.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/iisreceiver v0.108.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver v0.108.0
Expand Down
1 change: 1 addition & 0 deletions cmd/otelcontribcol/components.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion cmd/otelcontribcol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/haproxyreceiver v0.108.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver v0.108.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/httpcheckreceiver v0.108.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/huaweicloudcesreceiver v0.108.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/iisreceiver v0.108.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/influxdbreceiver v0.108.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver v0.108.0
Expand Down Expand Up @@ -545,7 +546,7 @@ require (
github.com/hashicorp/serf v0.10.1 // indirect
github.com/hectane/go-acl v0.0.0-20190604041725-da78bae5fc95 // indirect
github.com/hetznercloud/hcloud-go/v2 v2.10.2 // indirect
github.com/huaweicloud/huaweicloud-sdk-go-v3 v0.1.104 // indirect
github.com/huaweicloud/huaweicloud-sdk-go-v3 v0.1.110 // indirect
github.com/iancoleman/strcase v0.3.0 // indirect
github.com/imdario/mergo v0.3.16 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
Expand Down
7 changes: 2 additions & 5 deletions cmd/otelcontribcol/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 12 additions & 1 deletion receiver/huaweicloudcesreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,15 @@ The following settings are optional:

- `initial_delay`: The delay before the first collection of metrics begins. This is a duration field, such as 5s for 5 seconds.

- `collection_interval` (default = `60s`): This is the interval at which this receiver collects metrics. This value must be a string readable by Golang's [time.ParseDuration](https://pkg.go.dev/time#ParseDuration). Valid time units are `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`.
- `collection_interval` (default = `60s`): This is the interval at which this receiver collects metrics. This value must be a string readable by Golang's [time.ParseDuration](https://pkg.go.dev/time#ParseDuration). Valid time units are `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`. We recommend a polling interval of at least one minute.

- `retry_on_failure`: The following configurations can be used to control the retry policy of the CES client. The default values are suitable for most deployment scenarios.
- `enabled` (default true)
- `initial_interval` (default 100ms)
- `max_interval` (default 1s)
- `max_elapsed_time` (default 15s)
- `randomization_factor` (default 0.5)
- `multiplier` (default 1.5)

### Example Configuration

Expand Down Expand Up @@ -88,6 +96,9 @@ Before running the application, you need to set the environment variables `HUAWE
echo $HUAWEICLOUD_SDK_SK
```

## Error handling
If you encounter any CES errors, please refer to the [Huawei Cloud Error Codes](https://support.huaweicloud.com/intl/en-us/devg-apisign/api-sign-errorcode.html).

## Converting CES metric representation to OpenTelemetry metric representation


Expand Down
27 changes: 20 additions & 7 deletions receiver/huaweicloudcesreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
"fmt"
"slices"

"github.com/huaweicloud/huaweicloud-sdk-go-v3/services/ces/v1/model"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/receiver/scraperhelper"
"go.uber.org/multierr"
)
Expand Down Expand Up @@ -57,10 +59,12 @@ type Config struct {
// 14400: Cloud Eye aggregates data every 4 hours.
// 86400: Cloud Eye aggregates data every 24 hours.
// For details about the aggregation, see https://support.huaweicloud.com/intl/en-us/ces_faq/ces_faq_0009.html
Period int `mapstructure:"period"`
Period int32 `mapstructure:"period"`

// Data aggregation method. The supported values are max, min, average, sum, variance.
Filter string `mapstructure:"filter"`

BackOffConfig configretry.BackOffConfig `mapstructure:"retry_on_failure"`
}

type HuaweiSessionConfig struct {
Expand All @@ -78,10 +82,16 @@ type HuaweiSessionConfig struct {
var _ component.Config = (*Config)(nil)

// These valid periods are defined by CES API constraints: https://support.huaweicloud.com/intl/en-us/api-ces/ces_03_0034.html#section3
var validPeriods = []int{1, 300, 1200, 3600, 14400, 86400}
var validPeriods = []int32{1, 300, 1200, 3600, 14400, 86400}

// These valid filters are defined by CES API constraints: https://support.huaweicloud.com/intl/en-us/api-ces/ces_03_0034.html#section3
var validFilters = []string{"max", "min", "average", "sum", "variance"}
var validFilters = map[string]model.ShowMetricDataRequestFilter{
"max": model.GetShowMetricDataRequestFilterEnum().MAX,
"min": model.GetShowMetricDataRequestFilterEnum().MIN,
"average": model.GetShowMetricDataRequestFilterEnum().AVERAGE,
"sum": model.GetShowMetricDataRequestFilterEnum().SUM,
"variance": model.GetShowMetricDataRequestFilterEnum().VARIANCE,
}

// Validate config
func (config *Config) Validate() error {
Expand All @@ -96,11 +106,14 @@ func (config *Config) Validate() error {
if index := slices.Index(validPeriods, config.Period); index == -1 {
err = multierr.Append(err, fmt.Errorf("invalid period: got %d; must be one of %v", config.Period, validPeriods))
}

if index := slices.Index(validFilters, config.Filter); index == -1 {
err = multierr.Append(err, fmt.Errorf("invalid filter: got %s; must be one of %v", config.Filter, validFilters))
if _, ok := validFilters[config.Filter]; !ok {
var validFiltersSlice []string
for key := range validFilters {
validFiltersSlice = append(validFiltersSlice, key)
}
err = multierr.Append(err, fmt.Errorf("invalid filter: got %s; must be one of %v", config.Filter, validFiltersSlice))
}
if config.Period >= int(config.CollectionInterval.Seconds()) {
if config.Period >= int32(config.CollectionInterval.Seconds()) {
err = multierr.Append(err, errInvalidCollectionInterval)
}

Expand Down
11 changes: 11 additions & 0 deletions receiver/huaweicloudcesreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ package huaweicloudcesreceiver // import "github.com/open-telemetry/opentelemetr

import (
"context"
"time"

"github.com/cenkalti/backoff/v4"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"

Expand All @@ -22,6 +25,14 @@ func NewFactory() receiver.Factory {

func createDefaultConfig() component.Config {
return &Config{
BackOffConfig: configretry.BackOffConfig{
Enabled: true,
InitialInterval: 100 * time.Millisecond,
MaxInterval: time.Second,
MaxElapsedTime: 15 * time.Second,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
},
HuaweiSessionConfig: HuaweiSessionConfig{
NoVerifySSL: false,
},
Expand Down
8 changes: 5 additions & 3 deletions receiver/huaweicloudcesreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@ go 1.22.0
toolchain go1.22.3

require (
github.com/cenkalti/backoff/v4 v4.3.0
github.com/huaweicloud/huaweicloud-sdk-go-v3 v0.1.110
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.107.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.108.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.107.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.107.0
go.opentelemetry.io/collector/config/confighttp v0.107.0
go.opentelemetry.io/collector/config/configopaque v1.13.0
go.opentelemetry.io/collector/config/configretry v1.13.0
go.opentelemetry.io/collector/confmap v0.107.0
go.opentelemetry.io/collector/consumer v0.107.0
go.opentelemetry.io/collector/consumer/consumertest v0.107.0
go.opentelemetry.io/collector/pdata v1.13.1-0.20240816132030-9fd84668bb02
go.opentelemetry.io/collector/pdata v1.14.1
go.opentelemetry.io/collector/receiver v0.106.1
go.uber.org/goleak v1.3.0
go.uber.org/multierr v1.11.0
Expand Down Expand Up @@ -45,7 +47,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.107.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.108.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.19.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
Expand Down
8 changes: 6 additions & 2 deletions receiver/huaweicloudcesreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 11 additions & 22 deletions receiver/huaweicloudcesreceiver/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,35 +34,24 @@ func TestHuaweiCloudCESReceiverIntegration(t *testing.T) {
Dimensions: []model.MetricsDimension{
{
Name: "instance_id",
Value: "12345",
Value: "faea5b75-e390-4e2b-8733-9226a9026070",
},
},
Unit: "%",
},
},
}, nil)

mc.On("BatchListMetricData", mock.Anything).Return(&model.BatchListMetricDataResponse{
Metrics: &[]model.BatchMetricData{
mc.On("ShowMetricData", mock.Anything).Return(&model.ShowMetricDataResponse{
MetricName: stringPtr("cpu_util"),
Datapoints: &[]model.Datapoint{
{
Namespace: stringPtr("SYS.ECS"),
MetricName: "cpu_util",
Dimensions: &[]model.MetricsDimension{
{
Name: "instance_id",
Value: "faea5b75-e390-4e2b-8733-9226a9026070",
},
},
Datapoints: []model.DatapointForBatchMetric{
{
Average: float64Ptr(45.67),
Timestamp: 1556625610000,
},
{
Average: float64Ptr(89.01),
Timestamp: 1556625715000,
},
},
Unit: stringPtr("%"),
Average: float64Ptr(45.67),
Timestamp: 1556625610000,
},
{
Average: float64Ptr(89.01),
Timestamp: 1556625715000,
},
},
}, nil)
Expand Down
88 changes: 88 additions & 0 deletions receiver/huaweicloudcesreceiver/internal/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/huaweicloudcesreceiver/internal"

import (
"context"
"fmt"
"time"

"github.com/cenkalti/backoff/v4"
"go.uber.org/zap"
)

// MakeAPICallWithRetry invokes apiCall and, if it fails with an error that
// isThrottlingError reports as throttling, retries it with exponential backoff.
//
// Parameters:
//   - ctx: cancelling it aborts the call before the first attempt or between retries.
//   - shutdownChan: a receive on this channel (for example after it is closed)
//     aborts the call in the same way.
//   - logger: receives one warning per scheduled retry.
//   - apiCall: the operation to execute.
//   - isThrottlingError: decides whether a failure is retried. Any error it
//     rejects is returned to the caller immediately.
//   - backOffConfig: supplies InitialInterval, RandomizationFactor, Multiplier,
//     MaxInterval and MaxElapsedTime. The values are copied into a fresh
//     ExponentialBackOff, so the caller's instance is not modified. When nil, no
//     retries are performed.
//
// It returns the response of the first successful attempt. On every failure
// path the response is nil and the error is either the last error returned by
// apiCall or a cancellation/shutdown error.
func MakeAPICallWithRetry[T any](
	ctx context.Context,
	shutdownChan chan struct{},
	logger *zap.Logger,
	apiCall func() (*T, error),
	isThrottlingError func(error) bool,
	backOffConfig *backoff.ExponentialBackOff,
) (*T, error) {
	// Observe cancellation or shutdown before the first attempt. If neither has
	// happened, the call proceeds after a 50ms pause.
	// NOTE(review): the 50ms pause looks like request pacing — confirm intent.
	if err := waitOrAbort(ctx, shutdownChan, 50*time.Millisecond); err != nil {
		return nil, err
	}

	// Make the initial API call.
	resp, err := apiCall()
	if err == nil {
		return resp, nil
	}

	// Only throttling errors are worth retrying.
	if !isThrottlingError(err) {
		return nil, err
	}

	// Without a backoff policy there is nothing to drive the retries, so report
	// the throttling error instead of dereferencing a nil config.
	if backOffConfig == nil {
		return nil, err
	}

	// Build a private backoff state from the supplied settings.
	expBackoff := &backoff.ExponentialBackOff{
		InitialInterval:     backOffConfig.InitialInterval,
		RandomizationFactor: backOffConfig.RandomizationFactor,
		Multiplier:          backOffConfig.Multiplier,
		MaxInterval:         backOffConfig.MaxInterval,
		MaxElapsedTime:      backOffConfig.MaxElapsedTime,
		Stop:                backoff.Stop,
		Clock:               backoff.SystemClock,
	}
	expBackoff.Reset()

	// Retry loop for throttling errors; every exit path returns explicitly.
	for attempts := 1; ; attempts++ {
		delay := expBackoff.NextBackOff()
		if delay == backoff.Stop {
			// Retry budget exhausted: surface the last throttling error. The
			// response is nil here, like on every other failure path.
			return nil, err
		}
		logger.Warn("server busy, retrying request",
			zap.Int("attempts", attempts),
			zap.Duration("delay", delay))

		// Sleep for the backoff delay unless cancelled or shut down first.
		if waitErr := waitOrAbort(ctx, shutdownChan, delay); waitErr != nil {
			return nil, waitErr
		}

		// Retry the API call.
		resp, err = apiCall()
		if err == nil {
			return resp, nil
		}
		if !isThrottlingError(err) {
			return nil, err
		}
	}
}

// waitOrAbort blocks until d has elapsed, ctx is done, or shutdownChan yields a
// value or is closed. It returns nil when the full delay elapsed and a
// descriptive error when the wait was cut short.
func waitOrAbort(ctx context.Context, shutdownChan <-chan struct{}, d time.Duration) error {
	// An explicit timer is stopped on return, so an aborted wait does not leave
	// a pending timer behind for the remainder of d.
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return fmt.Errorf("request was cancelled or timed out")
	case <-shutdownChan:
		return fmt.Errorf("request is cancelled due to server shutdown")
	case <-timer.C:
		return nil
	}
}
Loading

0 comments on commit e7ea3b0

Please sign in to comment.