Skip to content

Commit

Permalink
Merge pull request #86 from TylerGillson/feat-nats-connect-retries
Browse files Browse the repository at this point in the history
feat: configurable NATS connection retries
  • Loading branch information
maxpert authored Oct 31, 2023
2 parents be02273 + 04bbcfe commit 1b32206
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 32 deletions.
48 changes: 26 additions & 22 deletions cfg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ const NodeNamePrefix = "marmot-node"
const EmbeddedClusterName = "e-marmot"
const (
Nats SnapshotStoreType = "nats"
S3 = "s3"
WebDAV = "webdav"
SFTP = "sftp"
S3 SnapshotStoreType = "s3"
WebDAV SnapshotStoreType = "webdav"
SFTP SnapshotStoreType = "sftp"
)

type ReplicationLogConfiguration struct {
Expand Down Expand Up @@ -67,17 +67,19 @@ type SnapshotConfiguration struct {
}

type NATSConfiguration struct {
URLs []string `toml:"urls"`
SubjectPrefix string `toml:"subject_prefix"`
StreamPrefix string `toml:"stream_prefix"`
ServerConfigFile string `toml:"server_config"`
SeedFile string `toml:"seed_file"`
CredsUser string `toml:"user_name"`
CredsPassword string `toml:"user_password"`
CAFile string `toml:"ca_file"`
CertFile string `toml:"cert_file"`
KeyFile string `toml:"key_file"`
BindAddress string `toml:"bind_address"`
URLs []string `toml:"urls"`
SubjectPrefix string `toml:"subject_prefix"`
StreamPrefix string `toml:"stream_prefix"`
ServerConfigFile string `toml:"server_config"`
SeedFile string `toml:"seed_file"`
CredsUser string `toml:"user_name"`
CredsPassword string `toml:"user_password"`
CAFile string `toml:"ca_file"`
CertFile string `toml:"cert_file"`
KeyFile string `toml:"key_file"`
BindAddress string `toml:"bind_address"`
ConnectRetries int `toml:"connect_retries"`
ReconnectWaitSeconds int `toml:"reconnect_wait_seconds"`
}

type LoggingConfiguration struct {
Expand Down Expand Up @@ -142,14 +144,16 @@ var Config = &Configuration{
},

NATS: NATSConfiguration{
URLs: []string{},
SubjectPrefix: "marmot-change-log",
StreamPrefix: "marmot-changes",
ServerConfigFile: "",
SeedFile: "",
CredsPassword: "",
CredsUser: "",
BindAddress: "0.0.0.0:4222",
URLs: []string{},
SubjectPrefix: "marmot-change-log",
StreamPrefix: "marmot-changes",
ServerConfigFile: "",
SeedFile: "",
CredsPassword: "",
CredsUser: "",
BindAddress: "0.0.0.0:4222",
ConnectRetries: 5,
ReconnectWaitSeconds: 2,
},

Logging: LoggingConfiguration{
Expand Down
7 changes: 6 additions & 1 deletion config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ urls=[
]
# Embedded server bind address
bind_address="0.0.0.0:4222"
# Embedded server config file (will be only used if URLs array is empty)
# Embedded server config file (will only be used if URLs array is empty)
server_config=""
# Subject prefix used when publishing log entries, it's usually suffixed by shard number
# to get the full subject name
Expand All @@ -137,6 +137,11 @@ seed_file=""
# User credentials used for plain user password authentication
user_name=""
user_password=""
# Number of retries when establishing the NATS server connection (will only be used if URLs array is not empty)
connect_retries=5
# Wait time between NATS reconnect attempts (will only be used if URLs array is not empty)
reconnect_wait_seconds=2


# Console STDOUT configurations
[logging]
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/rs/zerolog v1.29.1
github.com/samber/lo v1.38.1
github.com/studio-b12/gowebdav v0.9.0
golang.org/x/crypto v0.11.0
)

require (
Expand All @@ -42,7 +43,6 @@ require (
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/stretchr/objx v0.1.1 // indirect
github.com/x448/float16 v0.8.4 // indirect
golang.org/x/crypto v0.11.0 // indirect
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect
golang.org/x/net v0.12.0 // indirect
golang.org/x/sys v0.10.0 // indirect
Expand Down
52 changes: 44 additions & 8 deletions stream/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@ import (

"github.com/maxpert/marmot/cfg"
"github.com/nats-io/nats.go"
"github.com/rs/zerolog/log"
)

func Connect() (*nats.Conn, error) {
opts := []nats.Option{
nats.Name(cfg.Config.NodeName()),
nats.Timeout(10 * time.Second),
}
opts := setupConnOptions()

creds, err := getNatsAuthFromConfig()
if err != nil {
Expand All @@ -35,10 +33,24 @@ func Connect() (*nats.Conn, error) {
return embedded.prepareConnection(opts...)
}

return nats.Connect(
strings.Join(cfg.Config.NATS.URLs, ", "),
opts...,
)
url := strings.Join(cfg.Config.NATS.URLs, ", ")

var conn *nats.Conn
for i := 0; i < cfg.Config.NATS.ConnectRetries; i++ {
conn, err = nats.Connect(url, opts...)
if err == nil && conn.Status() == nats.CONNECTED {
break
}

log.Warn().
Err(err).
Int("attempt", i+1).
Int("attempt_limit", cfg.Config.NATS.ConnectRetries).
Str("status", conn.Status().String()).
Msg("NATS connection failed")
}

return conn, err
}

func getNatsAuthFromConfig() ([]nats.Option, error) {
Expand Down Expand Up @@ -76,3 +88,27 @@ func getNatsTLSFromConfig() ([]nats.Option, error) {

return opts, nil
}

func setupConnOptions() []nats.Option {
return []nats.Option{
nats.Name(cfg.Config.NodeName()),
nats.RetryOnFailedConnect(true),
nats.ReconnectWait(time.Duration(cfg.Config.NATS.ReconnectWaitSeconds) * time.Second),
nats.MaxReconnects(cfg.Config.NATS.ConnectRetries),
nats.ClosedHandler(func(nc *nats.Conn) {
log.Fatal().
Err(nc.LastError()).
Msg("NATS client exiting")
}),
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
log.Error().
Err(err).
Msg("NATS client disconnected")
}),
nats.ReconnectHandler(func(nc *nats.Conn) {
log.Info().
Str("url", nc.ConnectedUrl()).
Msg("NATS client reconnected")
}),
}
}

0 comments on commit 1b32206

Please sign in to comment.