diff --git a/config.toml b/config.toml index a0ec5fa..19a743b 100644 --- a/config.toml +++ b/config.toml @@ -101,8 +101,8 @@ compress=true urls=[ "nats://localhost:4222" ] - # Embedded server config file - server_config="" +# Embedded server config file +server_config="" # Subject prefix used when publishing log entries, it's usually suffixed by shard number # to get the full subject name subject_prefix="marmot-change-log" diff --git a/stream/embedded_nats.go b/stream/embedded_nats.go index 63870ed..1dc54b3 100644 --- a/stream/embedded_nats.go +++ b/stream/embedded_nats.go @@ -5,10 +5,12 @@ import ( "path" "strconv" "sync" + "time" "github.com/maxpert/marmot/cfg" "github.com/nats-io/nats-server/v2/logger" "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats.go" "github.com/rs/zerolog/log" ) @@ -45,12 +47,15 @@ func startEmbeddedServer(nodeName string) (*server.Server, error) { JetStream: true, JetStreamMaxMemory: 1 << 25, JetStreamMaxStore: 1 << 30, - Routes: server.RoutesFromStr(*cfg.ClusterPeers), Cluster: server.ClusterOpts{ Name: "e-marmot", }, } + if *cfg.ClusterPeers != "" { + opts.Routes = server.RoutesFromStr(*cfg.ClusterPeers) + } + if *cfg.ClusterListenAddr != "" { host, port, err := parseHostAndPort(*cfg.ClusterListenAddr) if err != nil { @@ -62,10 +67,6 @@ func startEmbeddedServer(nodeName string) (*server.Server, error) { opts.Cluster.Port = port } - if opts.StoreDir == "" { - opts.StoreDir = path.Join(cfg.TmpDir, "nats", nodeName) - } - if cfg.Config.NATS.ServerConfigFile != "" { err := opts.ProcessConfigFile(cfg.Config.NATS.ServerConfigFile) if err != nil { @@ -73,16 +74,51 @@ func startEmbeddedServer(nodeName string) (*server.Server, error) { } } + if opts.StoreDir == "" { + opts.StoreDir = path.Join(cfg.TmpDir, "nats", nodeName) + } + s, err := server.NewServer(opts) if err != nil { return nil, err } - s.SetLogger(logger.NewStdLogger(true, false, false, true, false), true, false) + s.SetLogger( + logger.NewStdLogger(true, opts.Debug, opts.Trace, true, false), + opts.Debug, + opts.Trace, + ) s.Start() - log.Info(). - Bool("clustered", s.JetStreamIsClustered()). - Msg("Started embedded JetStream server...") + + err = touchStreamReady(nodeName, s) + if err != nil { + return nil, err + } + embeddedServer = s return s, nil } + +func touchStreamReady(nodeName string, s *server.Server) error { + for { + c, err := nats.Connect("", nats.InProcessServer(s)) + if err != nil { + return err + } + + j, err := c.JetStream() + if err != nil { + return err + } + + st, err := j.StreamInfo(nodeName, nats.MaxWait(1*time.Second)) + if err == nats.ErrStreamNotFound || st != nil { + log.Info().Msg("Streaming ready...") + return nil + } + + c.Close() + log.Info().Err(err).Msg("Streams not ready, waiting for NATS streams to come up...") + time.Sleep(1 * time.Second) + } +} diff --git a/stream/nats.go b/stream/nats.go index 17876ed..3657b4c 100644 --- a/stream/nats.go +++ b/stream/nats.go @@ -11,7 +11,7 @@ import ( func Connect() (*nats.Conn, error) { opts := []nats.Option{ nats.Name(cfg.Config.NodeName()), - nats.Timeout(60 * time.Second), + nats.Timeout(10 * time.Second), } serverUrl := strings.Join(cfg.Config.NATS.URLs, ", ")