forked from open-telemetry/opentelemetry-collector-contrib
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add huaweicloudlogsreceiver skelethon
- Loading branch information
narcis.gemene
committed
Sep 11, 2024
1 parent
6926554
commit 02e295c
Showing
27 changed files
with
2,148 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
include ../../../Makefile.Common |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package huawei // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/huawei" | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"time" | ||
|
||
"github.com/cenkalti/backoff/v4" | ||
"go.uber.org/zap" | ||
) | ||
|
||
// Generic function to make an API call with exponential backoff and context cancellation handling. | ||
func MakeAPICallWithRetry[T any]( | ||
ctx context.Context, | ||
shutdownChan chan struct{}, | ||
logger *zap.Logger, | ||
apiCall func() (*T, error), | ||
isThrottlingError func(error) bool, | ||
backOffConfig *backoff.ExponentialBackOff, | ||
) (*T, error) { | ||
// Immediately check for context cancellation or server shutdown. | ||
select { | ||
case <-ctx.Done(): | ||
return nil, fmt.Errorf("request was cancelled or timed out") | ||
case <-shutdownChan: | ||
return nil, fmt.Errorf("request is cancelled due to server shutdown") | ||
case <-time.After(50 * time.Millisecond): | ||
} | ||
|
||
// Make the initial API call. | ||
resp, err := apiCall() | ||
if err == nil { | ||
return resp, nil | ||
} | ||
|
||
// If the error is not due to request throttling, return the error. | ||
if !isThrottlingError(err) { | ||
return nil, err | ||
} | ||
|
||
// Initialize the backoff mechanism for retrying the API call. | ||
expBackoff := &backoff.ExponentialBackOff{ | ||
InitialInterval: backOffConfig.InitialInterval, | ||
RandomizationFactor: backOffConfig.RandomizationFactor, | ||
Multiplier: backOffConfig.Multiplier, | ||
MaxInterval: backOffConfig.MaxInterval, | ||
MaxElapsedTime: backOffConfig.MaxElapsedTime, | ||
Stop: backoff.Stop, | ||
Clock: backoff.SystemClock, | ||
} | ||
expBackoff.Reset() | ||
attempts := 0 | ||
|
||
// Retry loop for handling throttling errors. | ||
for { | ||
attempts++ | ||
delay := expBackoff.NextBackOff() | ||
if delay == backoff.Stop { | ||
return resp, err | ||
} | ||
logger.Warn("server busy, retrying request", | ||
zap.Int("attempts", attempts), | ||
zap.Duration("delay", delay)) | ||
|
||
// Handle context cancellation or shutdown before retrying. | ||
select { | ||
case <-ctx.Done(): | ||
return nil, fmt.Errorf("request was cancelled or timed out") | ||
case <-shutdownChan: | ||
return nil, fmt.Errorf("request is cancelled due to server shutdown") | ||
case <-time.After(delay): | ||
} | ||
|
||
// Retry the API call. | ||
resp, err = apiCall() | ||
if err == nil { | ||
return resp, nil | ||
} | ||
if !isThrottlingError(err) { | ||
break | ||
} | ||
} | ||
|
||
return nil, err | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package huawei | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"testing" | ||
"time" | ||
|
||
"github.com/cenkalti/backoff/v4" | ||
"github.com/stretchr/testify/assert" | ||
"go.uber.org/zap/zaptest" | ||
) | ||
|
||
func TestMakeAPICallWithRetrySuccess(t *testing.T) { | ||
logger := zaptest.NewLogger(t) | ||
apiCall := func() (*string, error) { | ||
result := "success" | ||
return &result, nil | ||
} | ||
isThrottlingError := func(_ error) bool { | ||
return false | ||
} | ||
|
||
resp, err := MakeAPICallWithRetry(context.TODO(), make(chan struct{}), logger, apiCall, isThrottlingError, backoff.NewExponentialBackOff()) | ||
|
||
assert.NoError(t, err) | ||
assert.Equal(t, "success", *resp) | ||
} | ||
|
||
func TestMakeAPICallWithRetryImmediateFailure(t *testing.T) { | ||
logger := zaptest.NewLogger(t) | ||
apiCall := func() (*string, error) { | ||
return nil, errors.New("some error") | ||
} | ||
isThrottlingError := func(_ error) bool { | ||
return false | ||
} | ||
|
||
resp, err := MakeAPICallWithRetry(context.TODO(), make(chan struct{}), logger, apiCall, isThrottlingError, backoff.NewExponentialBackOff()) | ||
|
||
assert.Error(t, err) | ||
assert.Nil(t, resp) | ||
assert.Equal(t, "some error", err.Error()) | ||
} | ||
|
||
func TestMakeAPICallWithRetryThrottlingWithSuccess(t *testing.T) { | ||
logger := zaptest.NewLogger(t) | ||
callCount := 0 | ||
apiCall := func() (*string, error) { | ||
callCount++ | ||
if callCount == 3 { | ||
result := "success" | ||
return &result, nil | ||
} | ||
return nil, errors.New("throttling error") | ||
} | ||
isThrottlingError := func(err error) bool { | ||
return err.Error() == "throttling error" | ||
} | ||
|
||
backOffConfig := backoff.NewExponentialBackOff() | ||
backOffConfig.InitialInterval = 10 * time.Millisecond | ||
|
||
resp, err := MakeAPICallWithRetry(context.TODO(), make(chan struct{}), logger, apiCall, isThrottlingError, backOffConfig) | ||
|
||
assert.NoError(t, err) | ||
assert.Equal(t, "success", *resp) | ||
assert.Equal(t, 3, callCount) | ||
} | ||
|
||
func TestMakeAPICallWithRetryThrottlingMaxRetries(t *testing.T) { | ||
logger := zaptest.NewLogger(t) | ||
apiCall := func() (*string, error) { | ||
return nil, errors.New("throttling error") | ||
} | ||
isThrottlingError := func(err error) bool { | ||
return err.Error() == "throttling error" | ||
} | ||
|
||
backOffConfig := backoff.NewExponentialBackOff() | ||
backOffConfig.MaxElapsedTime = 50 * time.Millisecond | ||
|
||
resp, err := MakeAPICallWithRetry(context.TODO(), make(chan struct{}), logger, apiCall, isThrottlingError, backOffConfig) | ||
|
||
assert.Error(t, err) | ||
assert.Nil(t, resp) | ||
assert.Equal(t, "throttling error", err.Error()) | ||
} | ||
|
||
func TestMakeAPICallWithRetryContextCancellation(t *testing.T) { | ||
logger := zaptest.NewLogger(t) | ||
ctx, cancel := context.WithCancel(context.TODO()) | ||
time.AfterFunc(time.Second, cancel) | ||
|
||
apiCall := func() (*string, error) { | ||
return nil, errors.New("throttling error") | ||
} | ||
isThrottlingError := func(err error) bool { | ||
return err.Error() == "throttling error" | ||
} | ||
|
||
resp, err := MakeAPICallWithRetry(ctx, make(chan struct{}), logger, apiCall, isThrottlingError, backoff.NewExponentialBackOff()) | ||
|
||
assert.Error(t, err) | ||
assert.Nil(t, resp) | ||
assert.Equal(t, "request was cancelled or timed out", err.Error()) | ||
} | ||
|
||
func TestMakeAPICallWithRetryServerShutdown(t *testing.T) { | ||
logger := zaptest.NewLogger(t) | ||
shutdownChan := make(chan struct{}) | ||
time.AfterFunc(time.Second, func() { close(shutdownChan) }) | ||
|
||
apiCall := func() (*string, error) { | ||
return nil, errors.New("throttling error") | ||
} | ||
isThrottlingError := func(err error) bool { | ||
return err.Error() == "throttling error" | ||
} | ||
|
||
resp, err := MakeAPICallWithRetry(context.TODO(), shutdownChan, logger, apiCall, isThrottlingError, backoff.NewExponentialBackOff()) | ||
|
||
assert.Error(t, err) | ||
assert.Nil(t, resp) | ||
assert.Equal(t, "request is cancelled due to server shutdown", err.Error()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
module github.com/open-telemetry/opentelemetry-collector-contrib/internal/huawei | ||
|
||
go 1.22.3 | ||
|
||
require ( | ||
github.com/cenkalti/backoff/v4 v4.3.0 | ||
github.com/stretchr/testify v1.9.0 | ||
go.uber.org/zap v1.27.0 | ||
) | ||
|
||
require ( | ||
github.com/davecgh/go-spew v1.1.1 // indirect | ||
github.com/pmezard/go-difflib v1.0.0 // indirect | ||
go.uber.org/multierr v1.10.0 // indirect | ||
gopkg.in/yaml.v3 v3.0.1 // indirect | ||
) |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
include ../../Makefile.Common |
Oops, something went wrong.