Skip to content

Commit

Permalink
Improving default configurations and logging
Browse files Browse the repository at this point in the history
Improves default config paths so that NATS logs are stored on same path
as SQLite DB path. Renaming variables, renaming few functions for better
readability. Improves `run-cluster.sh` example to run 3 nodes and
demonstrate point of all nodes discovering each other via gossip.
  • Loading branch information
maxpert committed Jul 5, 2023
1 parent 7cf98d4 commit 09e2e8f
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 35 deletions.
37 changes: 25 additions & 12 deletions cfg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"hash/fnv"
"os"
"path"
"path/filepath"

"github.com/BurntSushi/toml"
"github.com/denisbrodbeck/machineid"
Expand Down Expand Up @@ -82,17 +83,16 @@ type Configuration struct {
Logging LoggingConfiguration `toml:"logging"`
}

var ConfigPath = flag.String("config", "marmot.toml", "Path to configuration file")
var Cleanup = flag.Bool("cleanup", false, "Only cleanup marmot triggers and changelogs")
var SaveSnapshot = flag.Bool("save-snapshot", false, "Only take snapshot and upload")
var ClusterListenAddr = flag.String("cluster-addr", "", "Cluster listening address")
var ClusterPeers = flag.String("cluster-peers", "", "Comma separated list of clusters")

var TmpDir = os.TempDir()
var ConfigPathFlag = flag.String("config", "marmot.toml", "Path to configuration file")
var CleanupFlag = flag.Bool("cleanup", false, "Only cleanup marmot triggers and changelogs")
var SaveSnapshotFlag = flag.Bool("save-snapshot", false, "Only take snapshot and upload")
var ClusterAddrFlag = flag.String("cluster-addr", "", "Cluster listening address")
var ClusterPeersFlag = flag.String("cluster-peers", "", "Comma separated list of clusters")

var DataRootDir = os.TempDir()
var Config = &Configuration{
SeqMapPath: path.Join(TmpDir, "seq-map.cbor"),
DBPath: path.Join(TmpDir, "marmot.db"),
SeqMapPath: "",
DBPath: path.Join(DataRootDir, "marmot.db"),
NodeID: 0,
Publish: true,
Replicate: true,
Expand Down Expand Up @@ -149,13 +149,26 @@ func init() {
Config.NodeID = hasher.Sum64()
}

func Load(path string) error {
_, err := toml.DecodeFile(path, Config)
func Load(filePath string) error {
_, err := toml.DecodeFile(filePath, Config)
if os.IsNotExist(err) {
return nil
}

return err
if err != nil {
return err
}

DataRootDir, err = filepath.Abs(path.Dir(Config.DBPath))
if err != nil {
return err
}

if Config.SeqMapPath == "" {
Config.SeqMapPath = path.Join(DataRootDir, "seq-map.cbor")
}

return nil
}

func (c *Configuration) SnapshotStorageType() SnapshotStoreType {
Expand Down
9 changes: 5 additions & 4 deletions db/change_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import (
"database/sql"
"errors"
"fmt"
"github.com/maxpert/marmot/cfg"
"github.com/maxpert/marmot/utils"
"regexp"
"strings"
"text/template"
"time"

"github.com/maxpert/marmot/cfg"
"github.com/maxpert/marmot/utils"

_ "embed"

"github.com/doug-martin/goqu/v9"
Expand Down Expand Up @@ -316,14 +317,14 @@ func (conn *SqliteStreamDB) publishChangeLog() {
log.Error().Err(err).Msg("Unable to consume changes")
}

err = conn.cleanupChangeLog(change)
err = conn.markChangePublished(change)
if err != nil {
log.Error().Err(err).Msg("Unable to cleanup change log")
}
}
}

func (conn *SqliteStreamDB) cleanupChangeLog(change globalChangeLogEntry) error {
func (conn *SqliteStreamDB) markChangePublished(change globalChangeLogEntry) error {
sqlConn, err := conn.pool.Borrow()
if err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions examples/node-3-config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
seq_map_path="/tmp/marmot-3-sm.cbor"
db_path="/tmp/marmot-3.db"
node_id=3
23 changes: 15 additions & 8 deletions examples/run-cluster.sh
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
#!/bin/bash

cleanup() {
kill $job1 $job2
}

create_db() {
local db_file="$1"
cat <<EOF | sqlite3 "$db_file"
Expand All @@ -27,15 +22,27 @@ EOF
echo "Created $db_file"
}

rm -rf /tmp/marmot-1-* /tmp/marmot-2-*
rm -rf /tmp/marmot-1-* /tmp/marmot-2-* /tmp/marmot-3-*
create_db /tmp/marmot-1.db
create_db /tmp/marmot-2.db
trap cleanup EXIT
create_db /tmp/marmot-3.db


cleanup() {
kill "$job1" "$job2" "$job3"
}

trap cleanup EXIT
rm -rf /tmp/nats
./marmot -config examples/node-1-config.toml -cluster-addr localhost:4221 -cluster-peers 'nats://localhost:4222/' &
job1=$!

sleep 1
./marmot -config examples/node-2-config.toml -cluster-addr localhost:4222 -cluster-peers 'nats://localhost:4221/' &
job2=$!

wait $job1 $job2
sleep 1
./marmot -config examples/node-3-config.toml -cluster-addr localhost:4223 -cluster-peers 'nats://localhost:4221/,nats://localhost:4222/' &
job3=$!

wait $job1 $job2 $job3
15 changes: 10 additions & 5 deletions marmot.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package main
import (
"context"
"flag"
"github.com/maxpert/marmot/utils"
"io"
"os"
"time"

"github.com/maxpert/marmot/utils"

"github.com/maxpert/marmot/cfg"
"github.com/maxpert/marmot/db"
"github.com/maxpert/marmot/logstream"
Expand All @@ -21,7 +22,7 @@ import (
func main() {
flag.Parse()

err := cfg.Load(*cfg.ConfigPath)
err := cfg.Load(*cfg.ConfigPathFlag)
if err != nil {
panic(err)
}
Expand All @@ -30,7 +31,11 @@ func main() {
if cfg.Config.Logging.Format == "json" {
writer = os.Stdout
}
gLog := zerolog.New(writer).With().Timestamp().Logger()
gLog := zerolog.New(writer).
With().
Timestamp().
Uint64("node_id", cfg.Config.NodeID).
Logger()

if cfg.Config.Logging.Verbose {
log.Logger = gLog.Level(zerolog.DebugLevel)
Expand All @@ -45,7 +50,7 @@ func main() {
return
}

if *cfg.Cleanup {
if *cfg.CleanupFlag {
err = streamDB.RemoveCDC(true)
if err != nil {
log.Panic().Err(err).Msg("Unable to clean up...")
Expand All @@ -71,7 +76,7 @@ func main() {
log.Panic().Err(err).Msg("Unable to initialize replicators")
}

if *cfg.SaveSnapshot {
if *cfg.SaveSnapshotFlag {
rep.SaveSnapshot()
return
}
Expand Down
12 changes: 6 additions & 6 deletions stream/embedded_nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,17 @@ func startEmbeddedServer(nodeName string) (*embeddedNats, error) {
},
}

if *cfg.ClusterPeers != "" {
opts.Routes = server.RoutesFromStr(*cfg.ClusterPeers)
if *cfg.ClusterPeersFlag != "" {
opts.Routes = server.RoutesFromStr(*cfg.ClusterPeersFlag)
}

if *cfg.ClusterListenAddr != "" {
host, port, err := parseHostAndPort(*cfg.ClusterListenAddr)
if *cfg.ClusterAddrFlag != "" {
host, port, err := parseHostAndPort(*cfg.ClusterAddrFlag)
if err != nil {
return nil, err
}

opts.Cluster.ListenStr = *cfg.ClusterListenAddr
opts.Cluster.ListenStr = *cfg.ClusterAddrFlag
opts.Cluster.Host = host
opts.Cluster.Port = port
}
Expand All @@ -82,7 +82,7 @@ func startEmbeddedServer(nodeName string) (*embeddedNats, error) {
}

if opts.StoreDir == "" {
opts.StoreDir = path.Join(cfg.TmpDir, "nats", nodeName)
opts.StoreDir = path.Join(cfg.DataRootDir, "nats", nodeName)
}

s, err := server.NewServer(opts)
Expand Down

0 comments on commit 09e2e8f

Please sign in to comment.