diff --git a/pkg/ffresty/config.go b/pkg/ffresty/config.go index 20bf75a..7e3e592 100644 --- a/pkg/ffresty/config.go +++ b/pkg/ffresty/config.go @@ -67,10 +67,14 @@ const ( HTTPIdleTimeout = "idleTimeout" // HTTPMaxIdleConns the max number of idle connections to hold pooled HTTPMaxIdleConns = "maxIdleConns" - // HTTPRequestsPerSecond the max number of request to submit per second + // HTTPThrottleRequestsPerSecond The average rate at which requests are allowed to pass through over time. Default to RPS // requests over the limit will be blocked using a buffered channel // the blocked time period is not counted in request timeout - HTTPRequestsPerSecond = "requestsPerSecond" + HTTPThrottleRequestsPerSecond = "throttle.requestsPerSecond" + + // HTTPThrottleBurst The maximum number of requests that can be made in a short period of time before the RPS throttling kicks in. + HTTPThrottleBurst = "throttle.burst" + // HTTPMaxConnsPerHost the max number of concurrent connections HTTPMaxConnsPerHost = "maxConnsPerHost" // HTTPConnectionTimeout the connection timeout for new connections @@ -98,7 +102,8 @@ func InitConfig(conf config.Section) { conf.AddKnownKey(HTTPConfigRetryMaxDelay, defaultRetryMaxWaitTime) conf.AddKnownKey(HTTPConfigRetryErrorStatusCodeRegex) conf.AddKnownKey(HTTPConfigRequestTimeout, defaultRequestTimeout) - conf.AddKnownKey(HTTPRequestsPerSecond) + conf.AddKnownKey(HTTPThrottleRequestsPerSecond) + conf.AddKnownKey(HTTPThrottleBurst) conf.AddKnownKey(HTTPIdleTimeout, defaultHTTPIdleTimeout) conf.AddKnownKey(HTTPMaxIdleConns, defaultHTTPMaxIdleConns) conf.AddKnownKey(HTTPMaxConnsPerHost, defaultHTTPMaxConnsPerHost) @@ -125,7 +130,8 @@ func GenerateConfig(ctx context.Context, conf config.Section) (*Config, error) { RetryInitialDelay: fftypes.FFDuration(conf.GetDuration(HTTPConfigRetryInitDelay)), RetryMaximumDelay: fftypes.FFDuration(conf.GetDuration(HTTPConfigRetryMaxDelay)), RetryErrorStatusCodeRegex: conf.GetString(HTTPConfigRetryErrorStatusCodeRegex), - RequestPerSecond: conf.GetInt(HTTPRequestsPerSecond), + ThrottleRequestsPerSecond: conf.GetInt(HTTPThrottleRequestsPerSecond), + ThrottleBurst: conf.GetInt(HTTPThrottleBurst), HTTPRequestTimeout: fftypes.FFDuration(conf.GetDuration(HTTPConfigRequestTimeout)), HTTPIdleConnTimeout: fftypes.FFDuration(conf.GetDuration(HTTPIdleTimeout)), HTTPMaxIdleConns: conf.GetInt(HTTPMaxIdleConns), diff --git a/pkg/ffresty/ffresty.go b/pkg/ffresty/ffresty.go index b6b0dcc..96bad3c 100644 --- a/pkg/ffresty/ffresty.go +++ b/pkg/ffresty/ffresty.go @@ -71,7 +71,8 @@ type HTTPConfig struct { HTTPExpectContinueTimeout fftypes.FFDuration `ffstruct:"RESTConfig" json:"expectContinueTimeout,omitempty"` AuthUsername string `ffstruct:"RESTConfig" json:"authUsername,omitempty"` AuthPassword string `ffstruct:"RESTConfig" json:"authPassword,omitempty"` - RequestPerSecond int `ffstruct:"RESTConfig" json:"requestsPerSecond,omitempty"` + ThrottleRequestsPerSecond int `ffstruct:"RESTConfig" json:"requestsPerSecond,omitempty"` + ThrottleBurst int `ffstruct:"RESTConfig" json:"burst,omitempty"` Retry bool `ffstruct:"RESTConfig" json:"retry,omitempty"` RetryCount int `ffstruct:"RESTConfig" json:"retryCount,omitempty"` RetryInitialDelay fftypes.FFDuration `ffstruct:"RESTConfig" json:"retryInitialDelay,omitempty"` @@ -175,6 +176,20 @@ func New(ctx context.Context, staticConfig config.Section) (client *resty.Client return NewWithConfig(ctx, *ffrestyConfig), nil } +func getRateLimiter(rps, burst int) *rate.Limiter { + if rps != 0 || burst != 0 { // if neither was set, no need for a rate limiter + rpsLimiter := rate.Limit(rps) + if rps == 0 { // only want to control max concurrent requests + rpsLimiter = rate.Inf + } + if rps != 0 && burst == 0 { + burst = rps + } + return rate.NewLimiter(rpsLimiter, burst) + } + return nil +} + // New creates a new Resty client, using static configuration (from the config file) // from a given section in the static configuration // @@ -213,9 +228,7 @@ func NewWithConfig(ctx context.Context, ffrestyConfig Config) (client *resty.Cli client = resty.NewWithClient(httpClient) } - if ffrestyConfig.RequestPerSecond != 0 { // NOTE: 0 is treated as RPS control disabled - rateLimiter = rate.NewLimiter(rate.Limit(ffrestyConfig.RequestPerSecond), ffrestyConfig.RequestPerSecond) - } + rateLimiter = getRateLimiter(ffrestyConfig.ThrottleRequestsPerSecond, ffrestyConfig.ThrottleBurst) url := strings.TrimSuffix(ffrestyConfig.URL, "/") if url != "" { diff --git a/pkg/ffresty/ffresty_test.go b/pkg/ffresty/ffresty_test.go index 08a5dd7..b7028fd 100644 --- a/pkg/ffresty/ffresty_test.go +++ b/pkg/ffresty/ffresty_test.go @@ -42,10 +42,10 @@ import ( "github.com/hyperledger/firefly-common/pkg/fftls" "github.com/hyperledger/firefly-common/pkg/i18n" "github.com/hyperledger/firefly-common/pkg/metric" + "golang.org/x/time/rate" "github.com/jarcoal/httpmock" "github.com/stretchr/testify/assert" - "golang.org/x/time/rate" ) const configDir = "../../test/data/config" @@ -104,7 +104,7 @@ func TestRequestWithRateLimiter(t *testing.T) { }) utConf.Set(HTTPConfigAuthUsername, "user") utConf.Set(HTTPConfigAuthPassword, "pass") - utConf.Set(HTTPRequestsPerSecond, rps) + utConf.Set(HTTPThrottleRequestsPerSecond, rps) utConf.Set(HTTPConfigRetryEnabled, true) utConf.Set(HTTPCustomClient, customClient) @@ -146,6 +146,60 @@ func TestRequestWithRateLimiter(t *testing.T) { assert.Equal(t, expectedNumberOfRequest, httpmock.GetTotalCallCount()) } +func TestRequestWithRateLimiterHighBurst(t *testing.T) { + expectedNumberOfRequest := 20 // should take longer than 3 seconds less than 4 seconds + + customClient := &http.Client{} + + resetConf() + utConf.Set(HTTPConfigURL, "http://localhost:12345") + utConf.Set(HTTPConfigHeaders, map[string]interface{}{ + "someheader": "headervalue", + }) + utConf.Set(HTTPConfigAuthUsername, "user") + utConf.Set(HTTPConfigAuthPassword, "pass") + utConf.Set(HTTPThrottleRequestsPerSecond, 0) + utConf.Set(HTTPThrottleBurst, expectedNumberOfRequest) + utConf.Set(HTTPConfigRetryEnabled, true) + utConf.Set(HTTPCustomClient, customClient) + + c, err := New(context.Background(), utConf) + assert.Nil(t, err) + httpmock.ActivateNonDefault(customClient) + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", "http://localhost:12345/test", + func(req *http.Request) (*http.Response, error) { + assert.Equal(t, "headervalue", req.Header.Get("someheader")) + assert.Equal(t, "Basic dXNlcjpwYXNz", req.Header.Get("Authorization")) + return httpmock.NewStringResponder(200, `{"some": "data"}`)(req) + }) + requestChan := make(chan bool, expectedNumberOfRequest) + startTime := time.Now() + for i := 0; i < expectedNumberOfRequest; i++ { + go func() { + resp, err := c.R().Get("/test") + assert.NoError(t, err) + assert.Equal(t, 200, resp.StatusCode()) + assert.Equal(t, `{"some": "data"}`, resp.String()) + requestChan <- true + }() + } + count := 0 + for { + <-requestChan + count++ + if count == expectedNumberOfRequest { + break + + } + } + + duration := time.Since(startTime) + assert.Less(t, duration, 1*time.Second) + assert.Equal(t, expectedNumberOfRequest, httpmock.GetTotalCallCount()) +} + func TestRateLimiterFailure(t *testing.T) { customClient := &http.Client{} @@ -157,6 +211,7 @@ func TestRateLimiterFailure(t *testing.T) { utConf.Set(HTTPConfigAuthUsername, "user") utConf.Set(HTTPConfigAuthPassword, "pass") utConf.Set(HTTPConfigRetryEnabled, true) + utConf.Set(HTTPCustomClient, customClient) c, err := New(context.Background(), utConf) @@ -170,7 +225,7 @@ func TestRateLimiterFailure(t *testing.T) { assert.Equal(t, "Basic dXNlcjpwYXNz", req.Header.Get("Authorization")) return httpmock.NewStringResponder(200, `{"some": "data"}`)(req) }) - rateLimiter = rate.NewLimiter(rate.Limit(0), 0) // fake limiter with error + rateLimiter = rate.NewLimiter(rate.Limit(1), 0) // artificially create an broken rate limiter, this is not possible with our config default resp, err := c.R().Get("/test") assert.Error(t, err) assert.Regexp(t, "exceeds", err) @@ -617,17 +672,15 @@ func TestEnableClientMetrics(t *testing.T) { ctx := context.Background() mr := metric.NewPrometheusMetricsRegistry("test") - err := EnableClientMetrics(ctx, mr) + err := EnableClientMetrics(ctx, mr) assert.NoError(t, err) } - - func TestEnableClientMetricsIdempotent(t *testing.T) { ctx := context.Background() mr := metric.NewPrometheusMetricsRegistry("test") - _ = EnableClientMetrics(ctx, mr) + _ = EnableClientMetrics(ctx, mr) err := EnableClientMetrics(ctx, mr) assert.NoError(t, err) } @@ -637,17 +690,17 @@ func TestHooks(t *testing.T) { ctx := context.Background() mr := metric.NewPrometheusMetricsRegistry("test") - err := EnableClientMetrics(ctx, mr) + err := EnableClientMetrics(ctx, mr) assert.NoError(t, err) onErrorCount := 0 onSuccessCount := 0 - customOnError := func(req *resty.Request, err error){ + customOnError := func(req *resty.Request, err error) { onErrorCount++ } - customOnSuccess := func(c *resty.Client, resp *resty.Response){ + customOnSuccess := func(c *resty.Client, resp *resty.Response) { onSuccessCount++ } diff --git a/pkg/i18n/en_base_config_descriptions.go b/pkg/i18n/en_base_config_descriptions.go index aee2e33..733395c 100644 --- a/pkg/i18n/en_base_config_descriptions.go +++ b/pkg/i18n/en_base_config_descriptions.go @@ -84,7 +84,8 @@ var ( ConfigGlobalExpectContinueTimeout = ffc("config.global.expectContinueTimeout", "See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)", TimeDurationType) ConfigGlobalHeaders = ffc("config.global.headers", "Adds custom headers to HTTP requests", MapStringStringType) ConfigGlobalIdleTimeout = ffc("config.global.idleTimeout", "The max duration to hold a HTTP keepalive connection between calls", TimeDurationType) - ConfigGlobalRPS = ffc("config.global.requestsPerSecond", "The max number of requests to submit per second, requests over the limit will wait.", IntType) + ConfigGlobalThrottleRPS = ffc("config.global.throttle.requestsPerSecond", "The average rate at which requests are allowed to pass through over time.", IntType) + ConfigGlobalThrottleBurst = ffc("config.global.throttle.burst", "The maximum number of requests that can be made in a short period of time before the throttling kicks in.", IntType) ConfigGlobalMaxIdleConns = ffc("config.global.maxIdleConns", "The max number of idle connections to hold pooled", IntType) ConfigGlobalMaxConnsPerHost = ffc("config.global.maxConnsPerHost", "The max number of connections, per unique hostname. Zero means no limit", IntType) ConfigGlobalMethod = ffc("config.global.method", "The HTTP method to use when making requests to the Address Resolver", StringType) diff --git a/pkg/i18n/en_base_field_descriptions.go b/pkg/i18n/en_base_field_descriptions.go index dd9ad21..88ccb17 100644 --- a/pkg/i18n/en_base_field_descriptions.go +++ b/pkg/i18n/en_base_field_descriptions.go @@ -88,7 +88,8 @@ var ( RESTConfigExpectHeaders = ffm("RESTConfig.headers", "Headers to add to the HTTP call") RESTConfigHTTPPassthroughHeadersEnabled = ffm("RESTConfig.httpPassthroughHeadersEnabled", "Proxy request ID or other configured headers from an upstream microservice connection") RESTConfigIdleTimeout = ffm("RESTConfig.idleTimeout", "Time to leave idle connections in the connection pool") - RESTConfigRequestsPerSecond = ffm("RESTConfig.requestsPerSecond", "Maximum requests to submit per second") + RESTConfigThrottleRequestsPerSecond = ffm("RESTConfig.requestsPerSecond", "Requests per second") + RESTConfigThrottleBurst = ffm("RESTConfig.burst", "Burst") RESTConfigMaxConnsPerHost = ffm("RESTConfig.maxConnsPerHost", "Maximum connections per host") RESTConfigMaxIdleConns = ffm("RESTConfig.maxIdleConns", "Maximum idle connections to leave in the connection pool") RESTConfigMaxIdleTimeout = ffm("RESTConfig.maxIdleTimeout", "Maximum time to leave idle connections in the connection pool")