diff --git a/cfg/config.go b/cfg/config.go index d1f16db..de5ec9a 100644 --- a/cfg/config.go +++ b/cfg/config.go @@ -20,9 +20,9 @@ const NodeNamePrefix = "marmot-node" const EmbeddedClusterName = "e-marmot" const ( Nats SnapshotStoreType = "nats" - S3 = "s3" - WebDAV = "webdav" - SFTP = "sftp" + S3 SnapshotStoreType = "s3" + WebDAV SnapshotStoreType = "webdav" + SFTP SnapshotStoreType = "sftp" ) type ReplicationLogConfiguration struct { @@ -67,17 +67,19 @@ type SnapshotConfiguration struct { } type NATSConfiguration struct { - URLs []string `toml:"urls"` - SubjectPrefix string `toml:"subject_prefix"` - StreamPrefix string `toml:"stream_prefix"` - ServerConfigFile string `toml:"server_config"` - SeedFile string `toml:"seed_file"` - CredsUser string `toml:"user_name"` - CredsPassword string `toml:"user_password"` - CAFile string `toml:"ca_file"` - CertFile string `toml:"cert_file"` - KeyFile string `toml:"key_file"` - BindAddress string `toml:"bind_address"` + URLs []string `toml:"urls"` + SubjectPrefix string `toml:"subject_prefix"` + StreamPrefix string `toml:"stream_prefix"` + ServerConfigFile string `toml:"server_config"` + SeedFile string `toml:"seed_file"` + CredsUser string `toml:"user_name"` + CredsPassword string `toml:"user_password"` + CAFile string `toml:"ca_file"` + CertFile string `toml:"cert_file"` + KeyFile string `toml:"key_file"` + BindAddress string `toml:"bind_address"` + ConnectRetries int `toml:"connect_retries"` + ReconnectWaitSeconds int `toml:"reconnect_wait_seconds"` } type LoggingConfiguration struct { @@ -142,14 +144,16 @@ var Config = &Configuration{ }, NATS: NATSConfiguration{ - URLs: []string{}, - SubjectPrefix: "marmot-change-log", - StreamPrefix: "marmot-changes", - ServerConfigFile: "", - SeedFile: "", - CredsPassword: "", - CredsUser: "", - BindAddress: "0.0.0.0:4222", + URLs: []string{}, + SubjectPrefix: "marmot-change-log", + StreamPrefix: "marmot-changes", + ServerConfigFile: "", + SeedFile: "", + CredsPassword: "", + CredsUser: "", + BindAddress: "0.0.0.0:4222", + ConnectRetries: 5, + ReconnectWaitSeconds: 2, }, Logging: LoggingConfiguration{ diff --git a/config.toml b/config.toml index 8cb0bd5..9169606 100644 --- a/config.toml +++ b/config.toml @@ -120,7 +120,7 @@ urls=[ ] # Embedded server bind address bind_address="0.0.0.0:4222" -# Embedded server config file (will be only used if URLs array is empty) +# Embedded server config file (will only be used if URLs array is empty) server_config="" # Subject prefix used when publishing log entries, it's usually suffixed by shard number # to get the full subject name @@ -137,6 +137,11 @@ seed_file="" # User credentials used for plain user password authentication user_name="" user_password="" +# Number of retries when establishing the NATS server connection (will only be used if URLs array is not empty) +connect_retries=5 +# Wait time between NATS reconnect attempts (will only be used if URLs array is not empty) +reconnect_wait_seconds=2 + # Console STDOUT configurations [logging] diff --git a/go.mod b/go.mod index 4735b3f..798de33 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/rs/zerolog v1.29.1 github.com/samber/lo v1.38.1 github.com/studio-b12/gowebdav v0.9.0 + golang.org/x/crypto v0.11.0 ) require ( @@ -42,7 +43,6 @@ require ( github.com/sirupsen/logrus v1.9.3 // indirect github.com/stretchr/objx v0.1.1 // indirect github.com/x448/float16 v0.8.4 // indirect - golang.org/x/crypto v0.11.0 // indirect golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect golang.org/x/net v0.12.0 // indirect golang.org/x/sys v0.10.0 // indirect diff --git a/stream/nats.go b/stream/nats.go index 66c3068..afc8734 100644 --- a/stream/nats.go +++ b/stream/nats.go @@ -6,13 +6,11 @@ import ( "github.com/maxpert/marmot/cfg" "github.com/nats-io/nats.go" + "github.com/rs/zerolog/log" ) func Connect() (*nats.Conn, error) { - opts := []nats.Option{ - nats.Name(cfg.Config.NodeName()), - nats.Timeout(10 * time.Second), - } + opts := setupConnOptions() creds, err := getNatsAuthFromConfig() if err != nil { @@ -35,10 +33,24 @@ func Connect() (*nats.Conn, error) { return embedded.prepareConnection(opts...) } - return nats.Connect( - strings.Join(cfg.Config.NATS.URLs, ", "), - opts..., - ) + url := strings.Join(cfg.Config.NATS.URLs, ", ") + + var conn *nats.Conn + for i := 0; i < cfg.Config.NATS.ConnectRetries; i++ { + conn, err = nats.Connect(url, opts...) + if err == nil && conn.Status() == nats.CONNECTED { + break + } + + log.Warn(). + Err(err). + Int("attempt", i+1). + Int("attempt_limit", cfg.Config.NATS.ConnectRetries). + Str("status", conn.Status().String()). + Msg("NATS connection failed") + } + + return conn, err } func getNatsAuthFromConfig() ([]nats.Option, error) { @@ -76,3 +88,27 @@ func getNatsTLSFromConfig() ([]nats.Option, error) { return opts, nil } + +func setupConnOptions() []nats.Option { + return []nats.Option{ + nats.Name(cfg.Config.NodeName()), + nats.RetryOnFailedConnect(true), + nats.ReconnectWait(time.Duration(cfg.Config.NATS.ReconnectWaitSeconds) * time.Second), + nats.MaxReconnects(cfg.Config.NATS.ConnectRetries), + nats.ClosedHandler(func(nc *nats.Conn) { + log.Fatal(). + Err(nc.LastError()). + Msg("NATS client exiting") + }), + nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { + log.Error(). + Err(err). + Msg("NATS client disconnected") + }), + nats.ReconnectHandler(func(nc *nats.Conn) { + log.Info(). + Str("url", nc.ConnectedUrl()). + Msg("NATS client reconnected") + }), + } +}