Skip to content

Commit

Permalink
Full working version of embedded NATS
Browse files Browse the repository at this point in the history
This PR introduces fixes to embedded NATS bootup process that will allow
Marmot processes to wait for streams to be ready before it can proceed
with boot process. This will prevent panic exit from the process and
let individual Marmot processes to come up forming a JS cluster.
  • Loading branch information
maxpert committed Jun 30, 2023
1 parent fb6fd13 commit ec65289
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 12 deletions.
4 changes: 2 additions & 2 deletions config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ compress=true
urls=[
"nats://localhost:4222"
]
# Embedded server config file
server_config=""
# Embedded server config file
server_config=""
# Subject prefix used when publishing log entries, it's usually suffixed by shard number
# to get the full subject name
subject_prefix="marmot-change-log"
Expand Down
54 changes: 45 additions & 9 deletions stream/embedded_nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
"path"
"strconv"
"sync"
"time"

"github.com/maxpert/marmot/cfg"
"github.com/nats-io/nats-server/v2/logger"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
"github.com/rs/zerolog/log"
)

Expand Down Expand Up @@ -45,12 +47,15 @@ func startEmbeddedServer(nodeName string) (*server.Server, error) {
JetStream: true,
JetStreamMaxMemory: 1 << 25,
JetStreamMaxStore: 1 << 30,
Routes: server.RoutesFromStr(*cfg.ClusterPeers),
Cluster: server.ClusterOpts{
Name: "e-marmot",
},
}

if *cfg.ClusterPeers != "" {
opts.Routes = server.RoutesFromStr(*cfg.ClusterPeers)
}

if *cfg.ClusterListenAddr != "" {
host, port, err := parseHostAndPort(*cfg.ClusterListenAddr)
if err != nil {
Expand All @@ -62,27 +67,58 @@ func startEmbeddedServer(nodeName string) (*server.Server, error) {
opts.Cluster.Port = port
}

if opts.StoreDir == "" {
opts.StoreDir = path.Join(cfg.TmpDir, "nats", nodeName)
}

if cfg.Config.NATS.ServerConfigFile != "" {
err := opts.ProcessConfigFile(cfg.Config.NATS.ServerConfigFile)
if err != nil {
return nil, err
}
}

if opts.StoreDir == "" {
opts.StoreDir = path.Join(cfg.TmpDir, "nats", nodeName)
}

s, err := server.NewServer(opts)
if err != nil {
return nil, err
}

s.SetLogger(logger.NewStdLogger(true, false, false, true, false), true, false)
s.SetLogger(
logger.NewStdLogger(true, opts.Debug, opts.Trace, true, false),
opts.Debug,
opts.Trace,
)
s.Start()
log.Info().
Bool("clustered", s.JetStreamIsClustered()).
Msg("Started embedded JetStream server...")

err = touchStreamReady(nodeName, s)
if err != nil {
return nil, err
}

embeddedServer = s
return s, nil
}

func touchStreamReady(nodeName string, s *server.Server) error {
for {
c, err := nats.Connect("", nats.InProcessServer(s))
if err != nil {
return err
}

j, err := c.JetStream()
if err != nil {
return err
}

st, err := j.StreamInfo(nodeName, nats.MaxWait(1*time.Second))
if err == nats.ErrStreamNotFound || st != nil {
log.Info().Msg("Streaming ready...")
return nil
}

c.Close()
log.Info().Err(err).Msg("Streams not ready, waiting for NATS streams to come up...")
time.Sleep(1 * time.Second)
}
}
2 changes: 1 addition & 1 deletion stream/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func Connect() (*nats.Conn, error) {
opts := []nats.Option{
nats.Name(cfg.Config.NodeName()),
nats.Timeout(60 * time.Second),
nats.Timeout(10 * time.Second),
}

serverUrl := strings.Join(cfg.Config.NATS.URLs, ", ")
Expand Down

0 comments on commit ec65289

Please sign in to comment.