Skip to content

Commit

Permalink
[ADDED] Exponential backoff for server reconnects
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrpio committed Feb 28, 2023
1 parent 7917595 commit 20fb507
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 35 deletions.
68 changes: 52 additions & 16 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ const (
Version = "1.24.0"
DefaultURL = "nats://127.0.0.1:4222"
DefaultPort = 4222
DefaultMaxReconnect = 60
DefaultReconnectWait = 2 * time.Second
DefaultMaxReconnect = -1
DefaultReconnectJitter = 100 * time.Millisecond
DefaultReconnectJitterTLS = time.Second
DefaultTimeout = 2 * time.Second
Expand All @@ -62,6 +61,10 @@ const (
RequestChanLen = 8
DefaultDrainTimeout = 30 * time.Second
LangString = "go"

// DEPRECATED: Client now uses [nats.DefaultReconnectBackoffHandler] to
// handle default reconnect wait time.
DefaultReconnectWait = 2 * time.Second
)

const (
Expand Down Expand Up @@ -143,17 +146,17 @@ var (
// GetDefaultOptions returns default configuration options for the client.
func GetDefaultOptions() Options {
return Options{
AllowReconnect: true,
MaxReconnect: DefaultMaxReconnect,
ReconnectWait: DefaultReconnectWait,
ReconnectJitter: DefaultReconnectJitter,
ReconnectJitterTLS: DefaultReconnectJitterTLS,
Timeout: DefaultTimeout,
PingInterval: DefaultPingInterval,
MaxPingsOut: DefaultMaxPingOut,
SubChanLen: DefaultMaxChanLen,
ReconnectBufSize: DefaultReconnectBufSize,
DrainTimeout: DefaultDrainTimeout,
AllowReconnect: true,
MaxReconnect: DefaultMaxReconnect,
ReconnectJitter: DefaultReconnectJitter,
ReconnectJitterTLS: DefaultReconnectJitterTLS,
Timeout: DefaultTimeout,
PingInterval: DefaultPingInterval,
MaxPingsOut: DefaultMaxPingOut,
SubChanLen: DefaultMaxChanLen,
ReconnectBufSize: DefaultReconnectBufSize,
DrainTimeout: DefaultDrainTimeout,
IgnoreAuthErrorAbort: true,
}
}

Expand Down Expand Up @@ -470,6 +473,7 @@ type Options struct {

// IgnoreAuthErrorAbort - if set to true, client opts out of the default connect behavior of aborting
// subsequent reconnect attempts if server returns the same auth error twice (regardless of reconnect policy).
// DEPRECATED: This option will be removed in future releases.
IgnoreAuthErrorAbort bool

// SkipHostLookup skips the DNS lookup for the server hostname.
Expand Down Expand Up @@ -1260,13 +1264,22 @@ func CustomInboxPrefix(p string) Option {

// IgnoreAuthErrorAbort opts out of the default connect behavior of aborting
// subsequent reconnect attempts if server returns the same auth error twice.
// DEPRECATED: This option is now set to 'true' by default, therefore this option will be removed in future releases.
func IgnoreAuthErrorAbort() Option {
return func(o *Options) error {
o.IgnoreAuthErrorAbort = true
return nil
}
}

// AbortOnAuthErrors causes the client to bail out after 2 subsequent auth connection errors.
func AbortOnAuthErrors() Option {
return func(o *Options) error {
o.IgnoreAuthErrorAbort = false
return nil
}
}

// SkipHostLookup is an Option to skip the host lookup when connecting to a server.
func SkipHostLookup() Option {
return func(o *Options) error {
Expand Down Expand Up @@ -2559,6 +2572,28 @@ func (nc *Conn) stopPingTimer() {
}
}

// DefaultReconnectBackoffHandler returns a default reconnect exponential backoff interval.
// Base reconnect wait is 10ms, with x2 multiplier. Max wait time is 2 minutes.
// 10ms, 20ms, 40ms, 80ms...2m
// A random jitter is added to the result.
func DefaultReconnectBackoffHandler(jitter time.Duration) ReconnectDelayHandler {
return func(attempts int) time.Duration {
// base interval is 10ms
backoff := 10 * time.Millisecond
for i := 0; i < attempts-1; i++ {
backoff *= 2
if backoff > 2*time.Minute {
backoff = 2 * time.Minute
break
}
}
if jitter > 0 {
jitter = time.Duration(rand.Int63n(int64(jitter)))
}
return backoff + jitter
}
}

// Try to reconnect using the option parameters.
// This function assumes we are allowed to reconnect.
func (nc *Conn) doReconnect(err error) {
Expand Down Expand Up @@ -2596,18 +2631,19 @@ func (nc *Conn) doReconnect(err error) {
var wlf int

var jitter time.Duration
var rw time.Duration
// If a custom reconnect delay handler is set, this takes precedence.
crd := nc.Opts.CustomReconnectDelayCB
if crd == nil {
rw = nc.Opts.ReconnectWait
rw := nc.Opts.ReconnectWait
if crd == nil && rw == 0 {
// TODO: since we sleep only after the whole list has been tried, we can't
// rely on individual *srv to know if it is a TLS or non-TLS url.
// We have to pick which type of jitter to use, for now, we use these hints:
jitter = nc.Opts.ReconnectJitter
if nc.Opts.Secure || nc.Opts.TLSConfig != nil {
jitter = nc.Opts.ReconnectJitterTLS
}

crd = DefaultReconnectBackoffHandler(jitter)
}

for i := 0; len(nc.srvPool) > 0; {
Expand Down
61 changes: 53 additions & 8 deletions nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,7 @@ func TestSelectNextServer(t *testing.T) {
opts := GetDefaultOptions()
opts.Servers = testServers
opts.NoRandomize = true
opts.MaxReconnect = 60
nc := &Conn{Opts: opts}
if err := nc.setupServerPool(); err != nil {
t.Fatalf("Problem setting up Server Pool: %v\n", err)
Expand Down Expand Up @@ -1609,14 +1610,14 @@ func TestExpiredAuthentication(t *testing.T) {
name string
expectedProto string
expectedErr error
ignoreAbort bool
withAuthAbort bool
}{
{"expired users credentials", AUTHENTICATION_EXPIRED_ERR, ErrAuthExpired, false},
{"revoked users credentials", AUTHENTICATION_REVOKED_ERR, ErrAuthRevoked, false},
{"expired account", ACCOUNT_AUTHENTICATION_EXPIRED_ERR, ErrAccountAuthExpired, false},
{"expired users credentials", AUTHENTICATION_EXPIRED_ERR, ErrAuthExpired, true},
{"revoked users credentials", AUTHENTICATION_REVOKED_ERR, ErrAuthRevoked, true},
{"expired account", ACCOUNT_AUTHENTICATION_EXPIRED_ERR, ErrAccountAuthExpired, true},
{"expired users credentials, abort connection", AUTHENTICATION_EXPIRED_ERR, ErrAuthExpired, false},
{"revoked users credentials, abort connection", AUTHENTICATION_REVOKED_ERR, ErrAuthRevoked, false},
{"expired account, abort connection", ACCOUNT_AUTHENTICATION_EXPIRED_ERR, ErrAccountAuthExpired, false},
} {
t.Run(test.name, func(t *testing.T) {
l, e := net.Listen("tcp", "127.0.0.1:0")
Expand Down Expand Up @@ -1678,16 +1679,16 @@ func TestExpiredAuthentication(t *testing.T) {
ch <- true
}),
}
if test.ignoreAbort {
opts = append(opts, IgnoreAuthErrorAbort())
if test.withAuthAbort {
opts = append(opts, AbortOnAuthErrors())
}
nc, err := Connect(url, opts...)
if err != nil {
t.Fatalf("Expected to connect, got %v", err)
}
defer nc.Close()

if test.ignoreAbort {
if !test.withAuthAbort {
// We expect more than 3 errors, as the connect attempt should not be aborted after 2 failed attempts.
for i := 0; i < 4; i++ {
select {
Expand Down Expand Up @@ -2171,7 +2172,7 @@ func BenchmarkNextMsgNoTimeout(b *testing.B) {
}
}

func TestAuthErrorOnReconnect(t *testing.T) {
func TestAuthErrorOnReconnectWithAuthErrorAbort(t *testing.T) {
// This is a bit of an artificial test, but it is to demonstrate
// that if the client is disconnected from a server (not due to an auth error),
// it will still correctly stop the reconnection logic if it gets twice an
Expand Down Expand Up @@ -2199,6 +2200,7 @@ func TestAuthErrorOnReconnect(t *testing.T) {
MaxReconnects(-1),
DontRandomize(),
ErrorHandler(func(_ *Conn, _ *Subscription, _ error) {}),
AbortOnAuthErrors(),
DisconnectErrHandler(func(_ *Conn, e error) {
dch <- true
}),
Expand Down Expand Up @@ -2948,6 +2950,49 @@ func TestInProcessConn(t *testing.T) {
}
}

func TestDefaultReconnectBackoffHandler(t *testing.T) {
tests := []struct {
name string
attempts int
jitter time.Duration
expectedRange []time.Duration
}{
{
name: "4 attempts, no jitter",
attempts: 4,
expectedRange: []time.Duration{80 * time.Millisecond},
},
{
name: "1 attempt, no jitter, return base value",
attempts: 1,
expectedRange: []time.Duration{10 * time.Millisecond},
},
{
name: "100 attempts, no jitter, return max",
attempts: 100,
expectedRange: []time.Duration{2 * time.Minute},
},
{
name: "4 attempts, with jitter",
attempts: 4,
jitter: 20 * time.Millisecond,
expectedRange: []time.Duration{80 * time.Millisecond, 99 * time.Millisecond},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
cb := DefaultReconnectBackoffHandler(test.jitter)
res := cb(test.attempts)
if test.jitter == 0 {
if res != test.expectedRange[0] {
t.Fatalf("Invalid result; want: %s; got: %s", test.expectedRange[0], res)
}
}
})
}
}

func TestServerListWithTrailingComma(t *testing.T) {
s := RunServerOnPort(-1)
defer s.Shutdown()
Expand Down
1 change: 1 addition & 0 deletions test/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2564,6 +2564,7 @@ func TestRetryOnFailedConnect(t *testing.T) {
nats.RetryOnFailedConnect(true),
nats.MaxReconnects(-1),
nats.ReconnectWait(15*time.Millisecond),
nats.AbortOnAuthErrors(),
nats.ReconnectHandler(func(_ *nats.Conn) {
ch <- true
}),
Expand Down
2 changes: 1 addition & 1 deletion test/js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8629,7 +8629,7 @@ func TestJetStreamOrderedConsumerRecreateAfterReconnect(t *testing.T) {
}
hbMissed <- struct{}{}
}
nc, js := jsClient(t, s, nats.ErrorHandler(errHandler))
nc, js := jsClient(t, s, nats.ErrorHandler(errHandler), nats.ReconnectWait(500*time.Millisecond))
defer nc.Close()

if _, err := js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}); err != nil {
Expand Down
10 changes: 0 additions & 10 deletions test/reconnect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,6 @@ func startReconnectServer(t *testing.T) *server.Server {
return RunServerOnPort(22222)
}

func TestReconnectTotalTime(t *testing.T) {
opts := nats.GetDefaultOptions()
totalReconnectTime := time.Duration(opts.MaxReconnect) * opts.ReconnectWait
if totalReconnectTime < (2 * time.Minute) {
t.Fatalf("Total reconnect time should be at least 2 mins: Currently %v\n",
totalReconnectTime)
}
}

func TestDefaultReconnectJitter(t *testing.T) {
opts := nats.GetDefaultOptions()
if opts.ReconnectJitter != nats.DefaultReconnectJitter {
Expand Down Expand Up @@ -123,7 +114,6 @@ var reconnectOpts = nats.Options{
Url: "nats://127.0.0.1:22222",
AllowReconnect: true,
MaxReconnect: 10,
ReconnectWait: 100 * time.Millisecond,
Timeout: nats.DefaultTimeout,
}

Expand Down

0 comments on commit 20fb507

Please sign in to comment.