Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Chengxuan Xing <[email protected]>
  • Loading branch information
Chengxuan committed Jun 6, 2024
1 parent dd3be97 commit b96a714
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 20 deletions.
14 changes: 10 additions & 4 deletions pkg/ffresty/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,14 @@ const (
HTTPIdleTimeout = "idleTimeout"
// HTTPMaxIdleConns the max number of idle connections to hold pooled
HTTPMaxIdleConns = "maxIdleConns"
// HTTPRequestsPerSecond the max number of request to submit per second
// HTTPThrottleRequestsPerSecond The average rate at which requests are allowed to pass through over time. Default to RPS
// requests over the limit will be blocked using a buffered channel
// the blocked time period is not counted in request timeout
HTTPRequestsPerSecond = "requestsPerSecond"
HTTPThrottleRequestsPerSecond = "throttle.requestsPerSecond"

// HTTPThrottleBurst The maximum number of requests that can be made in a short period of time before the RPS throttling kicks in.
HTTPThrottleBurst = "throttle.burst"

// HTTPMaxConnsPerHost the max number of concurrent connections
HTTPMaxConnsPerHost = "maxConnsPerHost"
// HTTPConnectionTimeout the connection timeout for new connections
Expand Down Expand Up @@ -98,7 +102,8 @@ func InitConfig(conf config.Section) {
conf.AddKnownKey(HTTPConfigRetryMaxDelay, defaultRetryMaxWaitTime)
conf.AddKnownKey(HTTPConfigRetryErrorStatusCodeRegex)
conf.AddKnownKey(HTTPConfigRequestTimeout, defaultRequestTimeout)
conf.AddKnownKey(HTTPRequestsPerSecond)
conf.AddKnownKey(HTTPThrottleRequestsPerSecond)
conf.AddKnownKey(HTTPThrottleBurst)
conf.AddKnownKey(HTTPIdleTimeout, defaultHTTPIdleTimeout)
conf.AddKnownKey(HTTPMaxIdleConns, defaultHTTPMaxIdleConns)
conf.AddKnownKey(HTTPMaxConnsPerHost, defaultHTTPMaxConnsPerHost)
Expand All @@ -125,7 +130,8 @@ func GenerateConfig(ctx context.Context, conf config.Section) (*Config, error) {
RetryInitialDelay: fftypes.FFDuration(conf.GetDuration(HTTPConfigRetryInitDelay)),
RetryMaximumDelay: fftypes.FFDuration(conf.GetDuration(HTTPConfigRetryMaxDelay)),
RetryErrorStatusCodeRegex: conf.GetString(HTTPConfigRetryErrorStatusCodeRegex),
RequestPerSecond: conf.GetInt(HTTPRequestsPerSecond),
ThrottleRequestsPerSecond: conf.GetInt(HTTPThrottleRequestsPerSecond),
ThrottleBurst: conf.GetInt(HTTPThrottleBurst),
HTTPRequestTimeout: fftypes.FFDuration(conf.GetDuration(HTTPConfigRequestTimeout)),
HTTPIdleConnTimeout: fftypes.FFDuration(conf.GetDuration(HTTPIdleTimeout)),
HTTPMaxIdleConns: conf.GetInt(HTTPMaxIdleConns),
Expand Down
21 changes: 17 additions & 4 deletions pkg/ffresty/ffresty.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ type HTTPConfig struct {
HTTPExpectContinueTimeout fftypes.FFDuration `ffstruct:"RESTConfig" json:"expectContinueTimeout,omitempty"`
AuthUsername string `ffstruct:"RESTConfig" json:"authUsername,omitempty"`
AuthPassword string `ffstruct:"RESTConfig" json:"authPassword,omitempty"`
RequestPerSecond int `ffstruct:"RESTConfig" json:"requestsPerSecond,omitempty"`
ThrottleRequestsPerSecond int `ffstruct:"RESTConfig" json:"requestsPerSecond,omitempty"`
ThrottleBurst int `ffstruct:"RESTConfig" json:"burst,omitempty"`
Retry bool `ffstruct:"RESTConfig" json:"retry,omitempty"`
RetryCount int `ffstruct:"RESTConfig" json:"retryCount,omitempty"`
RetryInitialDelay fftypes.FFDuration `ffstruct:"RESTConfig" json:"retryInitialDelay,omitempty"`
Expand Down Expand Up @@ -175,6 +176,20 @@ func New(ctx context.Context, staticConfig config.Section) (client *resty.Client
return NewWithConfig(ctx, *ffrestyConfig), nil
}

func getRateLimiter(rps, burst int) *rate.Limiter {
if rps != 0 || burst != 0 { // if neither was set, no need for a rate limiter
rpsLimiter := rate.Limit(rps)
if rps == 0 { // only want to control max concurrent requests
rpsLimiter = rate.Inf
}
if rps != 0 && burst == 0 {
burst = rps
}
return rate.NewLimiter(rpsLimiter, burst)
}
return nil
}

// New creates a new Resty client, using static configuration (from the config file)
// from a given section in the static configuration
//
Expand Down Expand Up @@ -213,9 +228,7 @@ func NewWithConfig(ctx context.Context, ffrestyConfig Config) (client *resty.Cli
client = resty.NewWithClient(httpClient)
}

if ffrestyConfig.RequestPerSecond != 0 { // NOTE: 0 is treated as RPS control disabled
rateLimiter = rate.NewLimiter(rate.Limit(ffrestyConfig.RequestPerSecond), ffrestyConfig.RequestPerSecond)
}
rateLimiter = getRateLimiter(ffrestyConfig.ThrottleRequestsPerSecond, ffrestyConfig.ThrottleBurst)

url := strings.TrimSuffix(ffrestyConfig.URL, "/")
if url != "" {
Expand Down
73 changes: 63 additions & 10 deletions pkg/ffresty/ffresty_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ import (
"github.com/hyperledger/firefly-common/pkg/fftls"
"github.com/hyperledger/firefly-common/pkg/i18n"
"github.com/hyperledger/firefly-common/pkg/metric"
"golang.org/x/time/rate"

"github.com/jarcoal/httpmock"
"github.com/stretchr/testify/assert"
"golang.org/x/time/rate"
)

const configDir = "../../test/data/config"
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestRequestWithRateLimiter(t *testing.T) {
})
utConf.Set(HTTPConfigAuthUsername, "user")
utConf.Set(HTTPConfigAuthPassword, "pass")
utConf.Set(HTTPRequestsPerSecond, rps)
utConf.Set(HTTPThrottleRequestsPerSecond, rps)
utConf.Set(HTTPConfigRetryEnabled, true)
utConf.Set(HTTPCustomClient, customClient)

Expand Down Expand Up @@ -146,6 +146,60 @@ func TestRequestWithRateLimiter(t *testing.T) {
assert.Equal(t, expectedNumberOfRequest, httpmock.GetTotalCallCount())
}

func TestRequestWithRateLimiterHighBurst(t *testing.T) {
expectedNumberOfRequest := 20 // should take longer than 3 seconds less than 4 seconds

customClient := &http.Client{}

resetConf()
utConf.Set(HTTPConfigURL, "http://localhost:12345")
utConf.Set(HTTPConfigHeaders, map[string]interface{}{
"someheader": "headervalue",
})
utConf.Set(HTTPConfigAuthUsername, "user")
utConf.Set(HTTPConfigAuthPassword, "pass")
utConf.Set(HTTPThrottleRequestsPerSecond, 0)
utConf.Set(HTTPThrottleBurst, expectedNumberOfRequest)
utConf.Set(HTTPConfigRetryEnabled, true)
utConf.Set(HTTPCustomClient, customClient)

c, err := New(context.Background(), utConf)
assert.Nil(t, err)
httpmock.ActivateNonDefault(customClient)
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("GET", "http://localhost:12345/test",
func(req *http.Request) (*http.Response, error) {
assert.Equal(t, "headervalue", req.Header.Get("someheader"))
assert.Equal(t, "Basic dXNlcjpwYXNz", req.Header.Get("Authorization"))
return httpmock.NewStringResponder(200, `{"some": "data"}`)(req)
})
requestChan := make(chan bool, expectedNumberOfRequest)
startTime := time.Now()
for i := 0; i < expectedNumberOfRequest; i++ {
go func() {
resp, err := c.R().Get("/test")
assert.NoError(t, err)
assert.Equal(t, 200, resp.StatusCode())
assert.Equal(t, `{"some": "data"}`, resp.String())
requestChan <- true
}()
}
count := 0
for {
<-requestChan
count++
if count == expectedNumberOfRequest {
break

}
}

duration := time.Since(startTime)
assert.Less(t, duration, 1*time.Second)
assert.Equal(t, expectedNumberOfRequest, httpmock.GetTotalCallCount())
}

func TestRateLimiterFailure(t *testing.T) {
customClient := &http.Client{}

Expand All @@ -157,6 +211,7 @@ func TestRateLimiterFailure(t *testing.T) {
utConf.Set(HTTPConfigAuthUsername, "user")
utConf.Set(HTTPConfigAuthPassword, "pass")
utConf.Set(HTTPConfigRetryEnabled, true)

utConf.Set(HTTPCustomClient, customClient)

c, err := New(context.Background(), utConf)
Expand All @@ -170,7 +225,7 @@ func TestRateLimiterFailure(t *testing.T) {
assert.Equal(t, "Basic dXNlcjpwYXNz", req.Header.Get("Authorization"))
return httpmock.NewStringResponder(200, `{"some": "data"}`)(req)
})
rateLimiter = rate.NewLimiter(rate.Limit(0), 0) // fake limiter with error
rateLimiter = rate.NewLimiter(rate.Limit(1), 0) // artificially create an broken rate limiter, this is not possible with our config default
resp, err := c.R().Get("/test")
assert.Error(t, err)
assert.Regexp(t, "exceeds", err)
Expand Down Expand Up @@ -617,17 +672,15 @@ func TestEnableClientMetrics(t *testing.T) {
ctx := context.Background()
mr := metric.NewPrometheusMetricsRegistry("test")

err := EnableClientMetrics(ctx, mr)
err := EnableClientMetrics(ctx, mr)
assert.NoError(t, err)

}



func TestEnableClientMetricsIdempotent(t *testing.T) {
ctx := context.Background()
mr := metric.NewPrometheusMetricsRegistry("test")
_ = EnableClientMetrics(ctx, mr)
_ = EnableClientMetrics(ctx, mr)
err := EnableClientMetrics(ctx, mr)
assert.NoError(t, err)
}
Expand All @@ -637,17 +690,17 @@ func TestHooks(t *testing.T) {
ctx := context.Background()
mr := metric.NewPrometheusMetricsRegistry("test")

err := EnableClientMetrics(ctx, mr)
err := EnableClientMetrics(ctx, mr)
assert.NoError(t, err)

onErrorCount := 0
onSuccessCount := 0

customOnError := func(req *resty.Request, err error){
customOnError := func(req *resty.Request, err error) {
onErrorCount++
}

customOnSuccess := func(c *resty.Client, resp *resty.Response){
customOnSuccess := func(c *resty.Client, resp *resty.Response) {
onSuccessCount++
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/i18n/en_base_config_descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ var (
ConfigGlobalExpectContinueTimeout = ffc("config.global.expectContinueTimeout", "See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)", TimeDurationType)
ConfigGlobalHeaders = ffc("config.global.headers", "Adds custom headers to HTTP requests", MapStringStringType)
ConfigGlobalIdleTimeout = ffc("config.global.idleTimeout", "The max duration to hold a HTTP keepalive connection between calls", TimeDurationType)
ConfigGlobalRPS = ffc("config.global.requestsPerSecond", "The max number of requests to submit per second, requests over the limit will wait.", IntType)
ConfigGlobalThrottleRPS = ffc("config.global.throttle.requestsPerSecond", "The average rate at which requests are allowed to pass through over time.", IntType)
ConfigGlobalThrottleBurst = ffc("config.global.throttle.burst", "The maximum number of requests that can be made in a short period of time before the throttling kicks in.", IntType)
ConfigGlobalMaxIdleConns = ffc("config.global.maxIdleConns", "The max number of idle connections to hold pooled", IntType)
ConfigGlobalMaxConnsPerHost = ffc("config.global.maxConnsPerHost", "The max number of connections, per unique hostname. Zero means no limit", IntType)
ConfigGlobalMethod = ffc("config.global.method", "The HTTP method to use when making requests to the Address Resolver", StringType)
Expand Down
3 changes: 2 additions & 1 deletion pkg/i18n/en_base_field_descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ var (
RESTConfigExpectHeaders = ffm("RESTConfig.headers", "Headers to add to the HTTP call")
RESTConfigHTTPPassthroughHeadersEnabled = ffm("RESTConfig.httpPassthroughHeadersEnabled", "Proxy request ID or other configured headers from an upstream microservice connection")
RESTConfigIdleTimeout = ffm("RESTConfig.idleTimeout", "Time to leave idle connections in the connection pool")
RESTConfigRequestsPerSecond = ffm("RESTConfig.requestsPerSecond", "Maximum requests to submit per second")
RESTConfigThrottleRequestsPerSecond = ffm("RESTConfig.requestsPerSecond", "Requests per second")
RESTConfigThrottleBurst = ffm("RESTConfig.burst", "Burst")
RESTConfigMaxConnsPerHost = ffm("RESTConfig.maxConnsPerHost", "Maximum connections per host")
RESTConfigMaxIdleConns = ffm("RESTConfig.maxIdleConns", "Maximum idle connections to leave in the connection pool")
RESTConfigMaxIdleTimeout = ffm("RESTConfig.maxIdleTimeout", "Maximum time to leave idle connections in the connection pool")
Expand Down

0 comments on commit b96a714

Please sign in to comment.