diff --git a/pkg/currency/updater.go b/pkg/currency/updater.go index 94fb75627..f1d96bdc9 100644 --- a/pkg/currency/updater.go +++ b/pkg/currency/updater.go @@ -44,7 +44,10 @@ func NewUpdater(ctx context.Context, config cfg.Config, logger log.Logger) (Upda return nil, fmt.Errorf("can not create kvStore: %w", err) } - httpClient := http.NewHttpClient(config, logger) + httpClient, err := http.ProvideHttpClient(ctx, config, logger, "currencyUpdater") + if err != nil { + return nil, fmt.Errorf("can not create http client: %w", err) + } return NewUpdaterWithInterfaces(logger, store, httpClient, clock.Provider), nil } diff --git a/pkg/http/circuit_breaker.go b/pkg/http/circuit_breaker.go new file mode 100644 index 000000000..725f9bd81 --- /dev/null +++ b/pkg/http/circuit_breaker.go @@ -0,0 +1,132 @@ +package http + +import ( + "context" + "sync/atomic" + "time" + + "github.com/justtrackio/gosoline/pkg/clock" + "github.com/justtrackio/gosoline/pkg/exec" + "github.com/justtrackio/gosoline/pkg/funk" + "github.com/justtrackio/gosoline/pkg/log" +) + +type CircuitBreakerSettings struct { + Enabled bool `cfg:"enabled" default:"false"` + MaxFailures int64 `cfg:"max_failures" default:"10"` + RetryDelay time.Duration `cfg:"retry_delay" default:"1m"` + ExpectedStatuses []int `cfg:"expected_statuses"` +} + +type CircuitIsOpenError struct { +} + +func (c CircuitIsOpenError) Error() string { + return "request rejected, circuit breaker is open" +} + +type circuitBreakerClient struct { + Client + logger log.Logger + clock clock.Clock + name string + settings CircuitBreakerSettings + + // state updated with atomics + recentFailures int64 + nextRetryAt int64 +} + +func NewCircuitBreakerClientWithInterfaces(baseClient Client, logger log.Logger, clock clock.Clock, name string, settings CircuitBreakerSettings) Client { + return &circuitBreakerClient{ + Client: baseClient, + logger: logger.WithChannel("circuit-breaker-client-" + name), + clock: clock, + name: name, + settings: settings, + recentFailures: 0, + nextRetryAt: 0, + } +} + +func (c *circuitBreakerClient) Delete(ctx context.Context, request *Request) (*Response, error) { + return c.doRequest(ctx, request, c.Client.Delete) +} + +func (c *circuitBreakerClient) Get(ctx context.Context, request *Request) (*Response, error) { + return c.doRequest(ctx, request, c.Client.Get) +} + +func (c *circuitBreakerClient) Patch(ctx context.Context, request *Request) (*Response, error) { + return c.doRequest(ctx, request, c.Client.Patch) +} + +func (c *circuitBreakerClient) Post(ctx context.Context, request *Request) (*Response, error) { + return c.doRequest(ctx, request, c.Client.Post) +} + +func (c *circuitBreakerClient) Put(ctx context.Context, request *Request) (*Response, error) { + return c.doRequest(ctx, request, c.Client.Put) +} + +func (c *circuitBreakerClient) doRequest(ctx context.Context, request *Request, performRequest func(ctx context.Context, request *Request) (*Response, error)) (*Response, error) { + // check if we have too many recent failures + recentFailures := atomic.LoadInt64(&c.recentFailures) + if recentFailures >= c.settings.MaxFailures { + // we have too many failures. check if we can retry anyway + nextRetryAt := atomic.LoadInt64(&c.nextRetryAt) + now := c.clock.Now().UnixMilli() + canRetry := false + if nextRetryAt <= now { + // we can retry. swap the retry value with the next time we can retry. if we lose the race with + // another thread, we will not swap and must not retry + canRetry = atomic.CompareAndSwapInt64(&c.nextRetryAt, nextRetryAt, now+c.settings.RetryDelay.Milliseconds()) + + if canRetry { + c.logger.WithContext(ctx).Info("trying to close circuit breaker again by trying single request") + } + } + + if !canRetry { + return nil, CircuitIsOpenError{} + } + } + + // perform the request and reset the failure counter should we succeed + response, err := performRequest(ctx, request) + if !c.isRemoteFailure(response, err) { + // only reset the counter if the request was successful (ignore e.g. context canceled errors, they are not successful) + if !exec.IsRequestCanceled(err) { + oldFailures := atomic.SwapInt64(&c.recentFailures, 0) + if oldFailures > 0 { + c.logger.WithContext(ctx).Info("reset failure counter of circuit breaker again") + } + } + + return response, err + } + + // ensure we at most retry after the retry delay + newNextRetryAt := c.clock.Now().Add(c.settings.RetryDelay).UnixMilli() + atomic.StoreInt64(&c.nextRetryAt, newNextRetryAt) + + // only now count up the recent failures - so nextRetryAt is set up already when we check it + newFailures := atomic.AddInt64(&c.recentFailures, 1) + if newFailures == c.settings.MaxFailures { + c.logger.WithContext(ctx).Warn("circuit breaker triggered, stopping requests for %v", c.settings.RetryDelay) + } + + return response, err +} + +func (c *circuitBreakerClient) isRemoteFailure(response *Response, err error) bool { + if err != nil { + return !exec.IsRequestCanceled(err) + } + + if len(c.settings.ExpectedStatuses) == 0 { + return false + } + + return !funk.Contains(c.settings.ExpectedStatuses, response.StatusCode) +} diff --git a/pkg/http/circuit_breaker_test.go b/pkg/http/circuit_breaker_test.go new file mode 100644 index 000000000..1cb2c1086 --- /dev/null +++ b/pkg/http/circuit_breaker_test.go @@ -0,0 +1,171 @@ +package http_test + +import ( + "context" + "fmt" + netHttp "net/http" + "testing" + "time" + + "github.com/justtrackio/gosoline/pkg/clock" + "github.com/justtrackio/gosoline/pkg/http" + httpMocks "github.com/justtrackio/gosoline/pkg/http/mocks" + "github.com/justtrackio/gosoline/pkg/log" + loggerMocks "github.com/justtrackio/gosoline/pkg/log/mocks" + "github.com/justtrackio/gosoline/pkg/uuid" + "github.com/stretchr/testify/suite" +) + +type circuitBreakerTestSuite struct { + suite.Suite + method string + + ctx context.Context + logger *loggerMocks.Logger + clock clock.FakeClock + baseClient *httpMocks.Client + + client http.Client +} + +type outcome int + +const ( + outcomeSuccess outcome = iota + outcomeBadResponse + outcomeConnectionError + outcomeCanceled + outcomeLimited +) + +func TestCircuitBreakerClient(t *testing.T) { + for _, method := range []string{"Get", "Post", "Put", "Patch", "Delete"} { + suite.Run(t, &circuitBreakerTestSuite{ + method: method, + }) + } +} + +func (s *circuitBreakerTestSuite) SetupTest() { + s.ctx = context.Background() + s.logger = loggerMocks.NewLoggerMockedUntilLevel(log.PriorityWarn) + s.clock = clock.NewFakeClock() + s.baseClient = new(httpMocks.Client) + s.client = http.NewCircuitBreakerClientWithInterfaces(s.baseClient, s.logger, s.clock, "test", http.CircuitBreakerSettings{ + MaxFailures: 2, + RetryDelay: time.Minute, + ExpectedStatuses: []int{netHttp.StatusOK}, + }) +} + +func (s *circuitBreakerTestSuite) TearDownTest() { + s.logger.AssertExpectations(s.T()) + s.baseClient.AssertExpectations(s.T()) +} + +func (s *circuitBreakerTestSuite) TestSuccess() { + s.performRequest(outcomeSuccess) +} + +func (s *circuitBreakerTestSuite) TestSingleBadResponse() { + s.performRequest(outcomeBadResponse) +} + +func (s *circuitBreakerTestSuite) TestManyCanceledContext() { + for i := 0; i < 10; i++ { + s.performRequest(outcomeCanceled) + } +} + +func (s *circuitBreakerTestSuite) TestTriggerCircuitBreaker() { + // perform some failing requests + s.performRequest(outcomeBadResponse) + s.performRequest(outcomeConnectionError) + + // breaker triggered + for i := 0; i < 10; i++ { + s.performRequest(outcomeLimited) + } + + // advance time to try again + s.clock.Advance(time.Minute) + + // still failing, but one request comes through + s.performRequest(outcomeBadResponse) + + // all other requests are still rejected + for i := 0; i < 10; i++ { + s.performRequest(outcomeLimited) + } + + // advance time again + s.clock.Advance(time.Minute) + + // the first request opens the circuit again, now all requests succeed again + for i := 0; i < 10; i++ { + s.performRequest(outcomeSuccess) + } +} + +func (s *circuitBreakerTestSuite) performRequest(outcome outcome) { + req := http.NewRequest(nil).WithUrl("http://localhost/" + uuid.New().NewV4()) + + if outcome != outcomeLimited { + var response *http.Response + var err error + + switch outcome { + case outcomeSuccess: + response = &http.Response{ + StatusCode: netHttp.StatusOK, + } + case outcomeBadResponse: + response = &http.Response{ + StatusCode: netHttp.StatusBadRequest, + } + case outcomeConnectionError: + err = fmt.Errorf("conn error") + case outcomeCanceled: + err = context.Canceled + } + + s.baseClient.On(s.method, s.ctx, req).Return(response, err).Once() + } + + var res *http.Response + var err error + switch s.method { + case "Get": + res, err = s.client.Get(s.ctx, req) + case "Post": + res, err = s.client.Post(s.ctx, req) + case "Put": + res, err = s.client.Put(s.ctx, req) + case "Patch": + res, err = s.client.Patch(s.ctx, req) + case "Delete": + res, err = s.client.Delete(s.ctx, req) + default: + s.Fail("unknown method") + } + + switch outcome { + case outcomeSuccess: + s.NoError(err) + s.Equal(netHttp.StatusOK, res.StatusCode) + case outcomeBadResponse: + s.NoError(err) + s.Equal(netHttp.StatusBadRequest, res.StatusCode) + case outcomeConnectionError: + s.EqualError(err, "conn error") + s.Nil(res) + case outcomeCanceled: + s.EqualError(err, context.Canceled.Error()) + s.Nil(res) + case outcomeLimited: + s.EqualError(err, http.CircuitIsOpenError{}.Error()) + s.Nil(res) + default: + s.Fail("unknown outcome") + } +} diff --git a/pkg/http/client.go b/pkg/http/client.go index 96767cc44..fbd42ec28 100644 --- a/pkg/http/client.go +++ b/pkg/http/client.go @@ -10,6 +10,7 @@ import ( httpHeaders "github.com/go-http-utils/headers" "github.com/go-resty/resty/v2" + "github.com/justtrackio/gosoline/pkg/appctx" "github.com/justtrackio/gosoline/pkg/cfg" "github.com/justtrackio/gosoline/pkg/clock" "github.com/justtrackio/gosoline/pkg/log" @@ -66,35 +67,30 @@ type client struct { clock clock.Clock defaultHeaders headers http restyClient - mo metric.Writer + metricWriter metric.Writer } type Settings struct { - FollowRedirects bool `cfg:"follow_redirects" default:"true"` - RequestTimeout time.Duration `cfg:"request_timeout" default:"30s"` - RetryCount int `cfg:"retry_count" default:"5"` - RetryMaxWaitTime time.Duration `cfg:"retry_max_wait_time" default:"2000ms"` - RetryResetReaders bool `cfg:"retry_reset_readers" default:"true"` - RetryWaitTime time.Duration `cfg:"retry_wait_time" default:"100ms"` + FollowRedirects bool `cfg:"follow_redirects" default:"true"` + RequestTimeout time.Duration `cfg:"request_timeout" default:"30s"` + RetryCount int `cfg:"retry_count" default:"5"` + RetryMaxWaitTime time.Duration `cfg:"retry_max_wait_time" default:"2000ms"` + RetryResetReaders bool `cfg:"retry_reset_readers" default:"true"` + RetryWaitTime time.Duration `cfg:"retry_wait_time" default:"100ms"` + CircuitBreakerSettings CircuitBreakerSettings `cfg:"circuit_breaker"` } -func NewHttpClient(config cfg.Config, logger log.Logger) Client { - c := clock.NewRealClock() +func ProvideHttpClient(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Client, error) { + type httpClientName string - mo := metric.NewWriter() - - settings := &Settings{} - config.UnmarshalKey("http_client", settings) + return appctx.Provide(ctx, httpClientName(name), func() (Client, error) { + return newHttpClient(config, logger, name), nil + }) +} - // allow the old settings to be used for now... we will eventually remove this though - if config.IsSet("http_client_retry_count") { - settings.RetryCount = config.GetInt("http_client_retry_count") - logger.Warn("http_client_retry_count is deprecated, use http_client.retry_count instead") - } - if config.IsSet("http_client_request_timeout") { - settings.RequestTimeout = config.GetDuration("http_client_request_timeout") - logger.Warn("http_client_request_timeout is deprecated, use http_client.request_timeout instead") - } +func newHttpClient(config cfg.Config, logger log.Logger, name string) Client { + metricWriter := metric.NewWriter() + settings := UnmarshalClientSettings(config, name) httpClient := resty.New() if settings.FollowRedirects { @@ -110,16 +106,22 @@ func NewHttpClient(config cfg.Config, logger log.Logger) Client { httpClient.SetRetryMaxWaitTime(settings.RetryMaxWaitTime) httpClient.SetRetryResetReaders(settings.RetryResetReaders) - return NewHttpClientWithInterfaces(logger, c, mo, httpClient) + client := NewHttpClientWithInterfaces(logger, clock.Provider, metricWriter, httpClient) + + if settings.CircuitBreakerSettings.Enabled { + client = NewCircuitBreakerClientWithInterfaces(client, logger, clock.Provider, name, settings.CircuitBreakerSettings) + } + + return client } -func NewHttpClientWithInterfaces(logger log.Logger, c clock.Clock, mo metric.Writer, httpClient restyClient) Client { +func NewHttpClientWithInterfaces(logger log.Logger, clock clock.Clock, metricWriter metric.Writer, httpClient restyClient) Client { return &client{ logger: logger, - clock: c, + clock: clock, defaultHeaders: make(headers), http: httpClient, - mo: mo, + metricWriter: metricWriter, } } @@ -246,7 +248,7 @@ func (c *client) do(ctx context.Context, method string, request *Request) (*Resp response := buildResponse(resp, &totalDuration) // Only log the duration if we did not get an error. - // If we get an error, we might not actually have send anything, + // If we get an error, we might not actually have sent anything, // so the duration will be very low. If we get back an error (e.g., status 500), // we log the duration as this is just a valid http response. requestDurationMs := float64(resp.Time() / time.Millisecond) @@ -256,7 +258,7 @@ func (c *client) do(ctx context.Context, method string, request *Request) (*Resp } func (c *client) writeMetric(metricName string, method string, unit metric.StandardUnit, value float64) { - c.mo.WriteOne(&metric.Datum{ + c.metricWriter.WriteOne(&metric.Datum{ Priority: metric.PriorityHigh, Timestamp: time.Now(), MetricName: metricName, @@ -282,3 +284,47 @@ func buildResponse(resp *resty.Response, totalDuration *time.Duration) *Response TotalDuration: totalDuration, } } + +func GetClientConfigKey(name string) string { + return fmt.Sprintf("http_client.%s", name) +} + +func UnmarshalClientSettings(config cfg.Config, name string) Settings { + // notify about wrong old settings being used + if config.IsSet("http_client_retry_count") { + panic("'http_client_retry_count' was removed, use 'http_client.default.retry_count' instead") + } + if config.IsSet("http_client_request_timeout") { + panic("'http_client_request_timeout' was removed, use 'http_client.default.request_timeout' instead") + } + if config.IsSet("http_client.follow_redirects") { + panic("'http_client.follow_redirects' was removed, use 'http_client.default.follow_redirects' instead") + } + if config.IsSet("http_client.request_timeout") { + panic("'http_client.request_timeout' was removed, use 'http_client.default.request_timeout' instead") + } + if config.IsSet("http_client.retry_count") { + panic("'http_client.retry_count' was removed, use 'http_client.default.retry_count' instead") + } + if config.IsSet("http_client.retry_max_wait_time") { + panic("'http_client.retry_max_wait_time' was removed, use 'http_client.default.retry_max_wait_time' instead") + } + if config.IsSet("http_client.retry_reset_readers") { + panic("'http_client.retry_reset_readers' was removed, use 'http_client.default.retry_reset_readers' instead") + } + if config.IsSet("http_client.retry_wait_time") { + panic("'http_client.retry_wait_time' was removed, use 'http_client.default.retry_wait_time' instead") + } + + if name == "" { + name = "default" + } + + clientsKey := GetClientConfigKey(name) + defaultClientKey := GetClientConfigKey("default") + + var settings Settings + config.UnmarshalKey(clientsKey, &settings, cfg.UnmarshalWithDefaultsFromKey(defaultClientKey, ".")) + + return settings +} diff --git a/pkg/http/client_test.go b/pkg/http/client_test.go index 5dc004165..da1767513 100644 --- a/pkg/http/client_test.go +++ b/pkg/http/client_test.go @@ -9,11 +9,11 @@ import ( "testing" "time" - cfgMocks "github.com/justtrackio/gosoline/pkg/cfg/mocks" + "github.com/justtrackio/gosoline/pkg/appctx" + "github.com/justtrackio/gosoline/pkg/cfg" "github.com/justtrackio/gosoline/pkg/http" logMocks "github.com/justtrackio/gosoline/pkg/log/mocks" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" ) // A copy of context.emptyCtx @@ -31,7 +31,7 @@ func (m *myContext) Err() error { return nil } -func (m *myContext) Value(key interface{}) interface{} { +func (m *myContext) Value(_ interface{}) interface{} { return nil } @@ -48,30 +48,37 @@ func runTestServer(t *testing.T, method string, status int, delay time.Duration, test(testServer.Listener.Addr().String()) } -func getConfig(retries int, timeout time.Duration) *cfgMocks.Config { - config := new(cfgMocks.Config) - config.On("UnmarshalKey", "http_client", &http.Settings{}).Run(func(args mock.Arguments) { - config := args.Get(1).(*http.Settings) - *config = http.Settings{ - RequestTimeout: timeout, - RetryCount: retries, - RetryWaitTime: 100 * time.Millisecond, - RetryMaxWaitTime: 200 * time.Millisecond, - FollowRedirects: true, - } - }) - config.On("IsSet", "http_client_retry_count").Return(false) - config.On("IsSet", "http_client_request_timeout").Return(false) +func getConfig(t *testing.T, retries int, timeout time.Duration) cfg.Config { + config := cfg.New() + err := config.Option(cfg.WithConfigMap(map[string]interface{}{ + "http_client": map[string]interface{}{ + "default": map[string]interface{}{ + "request_timeout": timeout, + "retry_count": retries, + "retry_max_wait_time": "200ms", + }, + }, + })) + assert.NoError(t, err) return config } -func TestClient_Delete(t *testing.T) { - config := getConfig(1, time.Second) +func getClient(t *testing.T, retries int, timeout time.Duration) http.Client { + ctx := appctx.WithContainer(context.Background()) + config := getConfig(t, retries, timeout) + logger := logMocks.NewLoggerMockedAll() + client, err := http.ProvideHttpClient(ctx, config, logger, "default") + assert.NoError(t, err) + + return client +} + +func TestClient_Delete(t *testing.T) { runTestServer(t, "DELETE", 200, 0, func(host string) { - client := http.NewHttpClient(config, logger) + client := getClient(t, 1, time.Second) request := client.NewRequest(). WithUrl(fmt.Sprintf("http://%s", host)) response, err := client.Delete(context.TODO(), request) @@ -79,16 +86,11 @@ func TestClient_Delete(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 200, response.StatusCode) }) - - config.AssertExpectations(t) } func TestClient_Get(t *testing.T) { - config := getConfig(1, time.Second) - logger := logMocks.NewLoggerMockedAll() - runTestServer(t, "GET", 200, 0, func(host string) { - client := http.NewHttpClient(config, logger) + client := getClient(t, 1, time.Second) request := client.NewRequest(). WithUrl(fmt.Sprintf("http://%s", host)) response, err := client.Get(context.TODO(), request) @@ -96,16 +98,11 @@ func TestClient_Get(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 200, response.StatusCode) }) - - config.AssertExpectations(t) } func TestClient_GetTimeout(t *testing.T) { - config := getConfig(0, time.Second) - logger := logMocks.NewLoggerMockedAll() - runTestServer(t, "GET", 200, 1100*time.Millisecond, func(host string) { - client := http.NewHttpClient(config, logger) + client := getClient(t, 0, time.Second) request := client.NewRequest(). WithUrl(fmt.Sprintf("http://%s", host)) response, err := client.Get(context.TODO(), request) @@ -113,19 +110,14 @@ func TestClient_GetTimeout(t *testing.T) { assert.Error(t, err) assert.Nil(t, response) }) - - config.AssertExpectations(t) } func TestClient_GetCanceled(t *testing.T) { - config := getConfig(1, time.Second) - logger := logMocks.NewLoggerMockedAll() - baseCtx := myContext(0) ctx, cancel := context.WithCancel(&baseCtx) runTestServer(t, "GET", 200, 200*time.Millisecond, func(host string) { - client := http.NewHttpClient(config, logger) + client := getClient(t, 1, time.Second) request := client.NewRequest(). WithUrl(fmt.Sprintf("http://%s", host)) go func() { @@ -138,16 +130,11 @@ func TestClient_GetCanceled(t *testing.T) { assert.True(t, errors.Is(err, context.Canceled)) assert.Nil(t, response) }) - - config.AssertExpectations(t) } func TestClient_Post(t *testing.T) { - config := getConfig(1, time.Second) - logger := logMocks.NewLoggerMockedAll() - runTestServer(t, "POST", 200, 0, func(host string) { - client := http.NewHttpClient(config, logger) + client := getClient(t, 1, time.Second) request := client.NewRequest(). WithUrl(fmt.Sprintf("http://%s", host)) response, err := client.Post(context.TODO(), request) @@ -155,6 +142,4 @@ func TestClient_Post(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 200, response.StatusCode) }) - - config.AssertExpectations(t) } diff --git a/pkg/ipinfo/client.go b/pkg/ipinfo/client.go index a3b6e4384..7b0966846 100644 --- a/pkg/ipinfo/client.go +++ b/pkg/ipinfo/client.go @@ -49,12 +49,15 @@ func ProvideIpInfo(ctx context.Context, config cfg.Config, logger log.Logger) (* func ProvideClient(ctx context.Context, config cfg.Config, logger log.Logger) (*Client, error) { return appctx.Provide(ctx, ipInfoClientCtxKey("default"), func() (*Client, error) { - return NewClient(config, logger) + return NewClient(ctx, config, logger) }) } -func NewClient(config cfg.Config, logger log.Logger) (*Client, error) { - httpClient := http.NewHttpClient(config, logger) +func NewClient(ctx context.Context, config cfg.Config, logger log.Logger) (*Client, error) { + httpClient, err := http.ProvideHttpClient(ctx, config, logger, "ipInfo") + if err != nil { + return nil, fmt.Errorf("can not create http client: %w", err) + } return &Client{logger: logger, http: httpClient}, nil } diff --git a/pkg/oauth2/google.go b/pkg/oauth2/google.go index aaf581e28..a0201bc68 100644 --- a/pkg/oauth2/google.go +++ b/pkg/oauth2/google.go @@ -47,10 +47,13 @@ type GoogleService struct { httpClient http.Client } -func NewGoogleService(config cfg.Config, logger log.Logger) Service { - httpClient := http.NewHttpClient(config, logger) +func NewGoogleService(ctx context.Context, config cfg.Config, logger log.Logger) (Service, error) { + httpClient, err := http.ProvideHttpClient(ctx, config, logger, "oauthGoogleService") + if err != nil { + return nil, fmt.Errorf("can not create http client: %w", err) + } - return NewGoogleServiceWithInterfaces(httpClient) + return NewGoogleServiceWithInterfaces(httpClient), nil } func NewGoogleServiceWithInterfaces(httpClient http.Client) Service {