Skip to content

Commit

Permalink
http: added basic circuit breaker functionality and provide http clie…
Browse files Browse the repository at this point in the history
…nts;

This replaces `NewHttpClient` with `ProvideHttpClient` (requiring now a
context and name additionally to the config and logger). Http clients
are now named and use the same fallback logic as we use for AWS services
(look for the named client, then the "default" client, then use
hardcoded defaults). This allows you to use HTTP clients in your code
with different settings from the config.

Additionally, a new section `circuit_breaker` was added to the config.
The circuit breaker is disabled by default. When enabled, it will stop
requests to a target to be sent for a configurable time after a
configurable threshold of error responses was reached. From time to time
a single request is retried to automatically resume sending requests
again once the remote side is healthy again.

This is a breaking change (code and config wise). The following needs to
be changed in the config. Old:

```yml
http_client:
  follow_redirects: true
  request_timeout: 30s
  retry_count: 5
  retry_max_wait_time: 2s
  retry_reset_readers: true
  retry_wait_time: 100ms
```

New:

```yml
http_client:
  default:
    follow_redirects: true
    request_timeout: 30s
    retry_count: 5
    retry_max_wait_time: 2s
    retry_reset_readers: true
    retry_wait_time: 100ms
    circuit_breaker:
      enabled: false
      max_failures: 10
      retry_delay: 1m
      expected_statuses: []
```
  • Loading branch information
ajscholl committed May 10, 2023
1 parent 8c88016 commit 667a0af
Show file tree
Hide file tree
Showing 7 changed files with 424 additions and 81 deletions.
5 changes: 4 additions & 1 deletion pkg/currency/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ func NewUpdater(ctx context.Context, config cfg.Config, logger log.Logger) (Upda
return nil, fmt.Errorf("can not create kvStore: %w", err)
}

httpClient := http.NewHttpClient(config, logger)
httpClient, err := http.ProvideHttpClient(ctx, config, logger, "currencyUpdater")
if err != nil {
return nil, fmt.Errorf("can not create http client: %w", err)
}

return NewUpdaterWithInterfaces(logger, store, httpClient, clock.Provider), nil
}
Expand Down
132 changes: 132 additions & 0 deletions pkg/http/circuit_breaker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package http

import (
"context"
"sync/atomic"
"time"

"github.com/justtrackio/gosoline/pkg/clock"
"github.com/justtrackio/gosoline/pkg/exec"
"github.com/justtrackio/gosoline/pkg/funk"
"github.com/justtrackio/gosoline/pkg/log"
)

type CircuitBreakerSettings struct {
Enabled bool `cfg:"enabled" default:"false"`
MaxFailures int64 `cfg:"max_failures" default:"10"`
RetryDelay time.Duration `cfg:"retry_delay" default:"1m"`
ExpectedStatuses []int `cfg:"expected_statuses"`
}

type CircuitIsOpenError struct {
}

func (c CircuitIsOpenError) Error() string {
return "request rejected, circuit breaker is open"
}

type circuitBreakerClient struct {
Client
logger log.Logger
clock clock.Clock
name string
settings CircuitBreakerSettings

// state updated with atomics
recentFailures int64
nextRetryAt int64
}

func NewCircuitBreakerClientWithInterfaces(baseClient Client, logger log.Logger, clock clock.Clock, name string, settings CircuitBreakerSettings) Client {
return &circuitBreakerClient{
Client: baseClient,
logger: logger.WithChannel("circuit-breaker-client-" + name),
clock: clock,
name: name,
settings: settings,
recentFailures: 0,
nextRetryAt: 0,
}
}

func (c *circuitBreakerClient) Delete(ctx context.Context, request *Request) (*Response, error) {
return c.doRequest(ctx, request, c.Client.Delete)
}

func (c *circuitBreakerClient) Get(ctx context.Context, request *Request) (*Response, error) {
return c.doRequest(ctx, request, c.Client.Get)
}

func (c *circuitBreakerClient) Patch(ctx context.Context, request *Request) (*Response, error) {
return c.doRequest(ctx, request, c.Client.Patch)
}

func (c *circuitBreakerClient) Post(ctx context.Context, request *Request) (*Response, error) {
return c.doRequest(ctx, request, c.Client.Post)
}

func (c *circuitBreakerClient) Put(ctx context.Context, request *Request) (*Response, error) {
return c.doRequest(ctx, request, c.Client.Put)
}

func (c *circuitBreakerClient) doRequest(ctx context.Context, request *Request, performRequest func(ctx context.Context, request *Request) (*Response, error)) (*Response, error) {
// check if we have too many recent failures
recentFailures := atomic.LoadInt64(&c.recentFailures)
if recentFailures >= c.settings.MaxFailures {
// we have too many failures. check if we can retry anyway
nextRetryAt := atomic.LoadInt64(&c.nextRetryAt)
now := c.clock.Now().UnixMilli()
canRetry := false
if nextRetryAt <= now {
// we can retry. swap the retry value with the next time we can retry. if we lose the race with
// another thread, we will not swap and must not retry
canRetry = atomic.CompareAndSwapInt64(&c.nextRetryAt, nextRetryAt, now+c.settings.RetryDelay.Milliseconds())

if canRetry {
c.logger.WithContext(ctx).Info("trying to close circuit breaker again by trying single request")
}
}

if !canRetry {
return nil, CircuitIsOpenError{}
}
}

// perform the request and reset the failure counter should we succeed
response, err := performRequest(ctx, request)
if !c.isRemoteFailure(response, err) {
// only reset the counter if the request was successful (ignore e.g. context canceled errors, they are not successful)
if !exec.IsRequestCanceled(err) {
oldFailures := atomic.SwapInt64(&c.recentFailures, 0)
if oldFailures > 0 {
c.logger.WithContext(ctx).Info("reset failure counter of circuit breaker again")
}
}

return response, err
}

// ensure we at most retry after the retry delay
newNextRetryAt := c.clock.Now().Add(c.settings.RetryDelay).UnixMilli()
atomic.StoreInt64(&c.nextRetryAt, newNextRetryAt)

// only now count up the recent failures - so nextRetryAt is set up already when we check it
newFailures := atomic.AddInt64(&c.recentFailures, 1)
if newFailures == c.settings.MaxFailures {
c.logger.WithContext(ctx).Warn("circuit breaker triggered, stopping requests for %v", c.settings.RetryDelay)
}

return response, err
}

func (c *circuitBreakerClient) isRemoteFailure(response *Response, err error) bool {
if err != nil {
return !exec.IsRequestCanceled(err)
}

if len(c.settings.ExpectedStatuses) == 0 {
return false
}

return !funk.Contains(c.settings.ExpectedStatuses, response.StatusCode)
}
171 changes: 171 additions & 0 deletions pkg/http/circuit_breaker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package http_test

import (
"context"
"fmt"
netHttp "net/http"
"testing"
"time"

"github.com/justtrackio/gosoline/pkg/clock"
"github.com/justtrackio/gosoline/pkg/http"
httpMocks "github.com/justtrackio/gosoline/pkg/http/mocks"
"github.com/justtrackio/gosoline/pkg/log"
loggerMocks "github.com/justtrackio/gosoline/pkg/log/mocks"
"github.com/justtrackio/gosoline/pkg/uuid"
"github.com/stretchr/testify/suite"
)

type circuitBreakerTestSuite struct {
suite.Suite
method string

ctx context.Context
logger *loggerMocks.Logger
clock clock.FakeClock
baseClient *httpMocks.Client

client http.Client
}

type outcome int

const (
outcomeSuccess outcome = iota
outcomeBadResponse
outcomeConnectionError
outcomeCanceled
outcomeLimited
)

func TestCircuitBreakerClient(t *testing.T) {
for _, method := range []string{"Get", "Post", "Put", "Patch", "Delete"} {
suite.Run(t, &circuitBreakerTestSuite{
method: method,
})
}
}

func (s *circuitBreakerTestSuite) SetupTest() {
s.ctx = context.Background()
s.logger = loggerMocks.NewLoggerMockedUntilLevel(log.PriorityWarn)
s.clock = clock.NewFakeClock()
s.baseClient = new(httpMocks.Client)
s.client = http.NewCircuitBreakerClientWithInterfaces(s.baseClient, s.logger, s.clock, "test", http.CircuitBreakerSettings{
MaxFailures: 2,
RetryDelay: time.Minute,
ExpectedStatuses: []int{netHttp.StatusOK},
})
}

func (s *circuitBreakerTestSuite) TearDownTest() {
s.logger.AssertExpectations(s.T())
s.baseClient.AssertExpectations(s.T())
}

func (s *circuitBreakerTestSuite) TestSuccess() {
s.performRequest(outcomeSuccess)
}

func (s *circuitBreakerTestSuite) TestSingleBadResponse() {
s.performRequest(outcomeBadResponse)
}

func (s *circuitBreakerTestSuite) TestManyCanceledContext() {
for i := 0; i < 10; i++ {
s.performRequest(outcomeCanceled)
}
}

func (s *circuitBreakerTestSuite) TestTriggerCircuitBreaker() {
// perform some failing requests
s.performRequest(outcomeBadResponse)
s.performRequest(outcomeConnectionError)

// breaker triggered
for i := 0; i < 10; i++ {
s.performRequest(outcomeLimited)
}

// advance time to try again
s.clock.Advance(time.Minute)

// still failing, but one request comes through
s.performRequest(outcomeBadResponse)

// all other requests are still rejected
for i := 0; i < 10; i++ {
s.performRequest(outcomeLimited)
}

// advance time again
s.clock.Advance(time.Minute)

// the first request opens the circuit again, now all requests succeed again
for i := 0; i < 10; i++ {
s.performRequest(outcomeSuccess)
}
}

func (s *circuitBreakerTestSuite) performRequest(outcome outcome) {
req := http.NewRequest(nil).WithUrl("http://localhost/" + uuid.New().NewV4())

if outcome != outcomeLimited {
var response *http.Response
var err error

switch outcome {
case outcomeSuccess:
response = &http.Response{
StatusCode: netHttp.StatusOK,
}
case outcomeBadResponse:
response = &http.Response{
StatusCode: netHttp.StatusBadRequest,
}
case outcomeConnectionError:
err = fmt.Errorf("conn error")
case outcomeCanceled:
err = context.Canceled
}

s.baseClient.On(s.method, s.ctx, req).Return(response, err).Once()
}

var res *http.Response
var err error
switch s.method {
case "Get":
res, err = s.client.Get(s.ctx, req)
case "Post":
res, err = s.client.Post(s.ctx, req)
case "Put":
res, err = s.client.Put(s.ctx, req)
case "Patch":
res, err = s.client.Patch(s.ctx, req)
case "Delete":
res, err = s.client.Delete(s.ctx, req)
default:
s.Fail("unknown method")
}

switch outcome {
case outcomeSuccess:
s.NoError(err)
s.Equal(netHttp.StatusOK, res.StatusCode)
case outcomeBadResponse:
s.NoError(err)
s.Equal(netHttp.StatusBadRequest, res.StatusCode)
case outcomeConnectionError:
s.EqualError(err, "conn error")
s.Nil(res)
case outcomeCanceled:
s.EqualError(err, context.Canceled.Error())
s.Nil(res)
case outcomeLimited:
s.EqualError(err, http.CircuitIsOpenError{}.Error())
s.Nil(res)
default:
s.Fail("unknown outcome")
}
}
Loading

0 comments on commit 667a0af

Please sign in to comment.