Skip to content

Commit

Permalink
Merge pull request #37 from maxpert/cfg-flags
Browse files Browse the repository at this point in the history
Extra configurations to enable various settings
  • Loading branch information
maxpert authored Dec 11, 2022
2 parents e5fe5d0 + ef7a975 commit 50fbbf8
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 14 deletions.
20 changes: 14 additions & 6 deletions cfg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,13 @@ type LoggingConfiguration struct {
}

type Configuration struct {
SeqMapPath string `toml:"seq_map_path"`
DBPath string `toml:"db_path"`
NodeID uint64 `toml:"node_id"`
SeqMapPath string `toml:"seq_map_path"`
DBPath string `toml:"db_path"`
NodeID uint64 `toml:"node_id"`
Publish bool `toml:"publish"`
Replicate bool `toml:"replicate"`
ScanMaxChanges uint32 `toml:"scan_max_changes"`
CleanInterval uint32 `toml:"cleanup_interval"`

Snapshot SnapshotConfiguration `toml:"snapshot"`
ReplicationLog ReplicationLogConfiguration `toml:"replication_log"`
Expand All @@ -75,9 +79,13 @@ var Cleanup = flag.Bool("cleanup", false, "Only cleanup marmot triggers and chan
var SaveSnapshot = flag.Bool("save-snapshot", false, "Only take snapshot and upload")

var Config = &Configuration{
SeqMapPath: "/tmp/seq-map.cbor",
DBPath: "/tmp/marmot.db",
NodeID: 1,
SeqMapPath: "/tmp/seq-map.cbor",
DBPath: "/tmp/marmot.db",
NodeID: 1,
Publish: true,
Replicate: true,
ScanMaxChanges: 512,
CleanInterval: 5,

Snapshot: SnapshotConfiguration{
Enable: true,
Expand Down
15 changes: 15 additions & 0 deletions config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,21 @@ db_path="/tmp/marmot.db"
# and replay all logs in order to restore database
# seq_map_path="/tmp/seq-map.cbor"

# Replication enabled/disabled (default: true)
# This will allow the process to consume incoming changes from NATS
# replicate = true

# Publishing enabled/disabled (default: true)
# This will allow the process to control publishing of local DB changes to NATS
# publish = true

# Maximum number of rows Marmot will process (scan/load in memory) per change log
# batch before publishing to NATS (default: 512)
# scan_max_changes = 512

# Cleanup interval in seconds used to clean up published rows. This reduces write
# load on the system (default: 5)
# cleanup_interval = 5

# Snapshots are used to limit log size and have a database snapshot backed up to your
# configured blob storage (NATS for now). This enables speedier recovery or cold boot
Expand Down
11 changes: 5 additions & 6 deletions db/change_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"errors"
"fmt"
"github.com/maxpert/marmot/cfg"
"regexp"
"strings"
"text/template"
Expand All @@ -18,9 +19,6 @@ import (
"github.com/samber/lo"
)

// ScanLimit is number of change log rows processed at a time, to limit memory usage
const ScanLimit = uint(128)

var ErrNoTableMapping = errors.New("no table mapping found")
var ErrLogNotReadyToPublish = errors.New("not ready to publish changes")

Expand Down Expand Up @@ -248,7 +246,7 @@ func (conn *SqliteStreamDB) watchChanges(watcher *fsnotify.Watcher, path string)
}
}

func (conn *SqliteStreamDB) getGlobalChanges(limit uint) ([]globalChangeLogEntry, error) {
func (conn *SqliteStreamDB) getGlobalChanges(limit uint32) ([]globalChangeLogEntry, error) {
sqlConn, err := conn.pool.Borrow()
if err != nil {
return nil, err
Expand All @@ -258,7 +256,8 @@ func (conn *SqliteStreamDB) getGlobalChanges(limit uint) ([]globalChangeLogEntry
var entries []globalChangeLogEntry
err = sqlConn.DB().
From(conn.globalMetaTable()).
Limit(limit).
Order(goqu.I("id").Asc()).
Limit(uint(limit)).
ScanStructs(&entries)

if err != nil {
Expand All @@ -274,7 +273,7 @@ func (conn *SqliteStreamDB) publishChangeLog() {
}
defer conn.publishLock.Unlock()

changes, err := conn.getGlobalChanges(ScanLimit)
changes, err := conn.getGlobalChanges(cfg.Config.ScanMaxChanges)
if err != nil {
log.Error().Err(err).Msg("Unable to scan global changes")
return
Expand Down
12 changes: 10 additions & 2 deletions marmot.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func main() {
return
}

if cfg.Config.Snapshot.Enable {
if cfg.Config.Snapshot.Enable && cfg.Config.Replicate {
err = rep.RestoreSnapshot()
if err != nil {
log.Panic().Err(err).Msg("Unable to restore snapshot")
Expand All @@ -101,7 +101,7 @@ func main() {
go changeListener(streamDB, rep, i+1, errChan)
}

cleanupInterval := 5 * time.Second
cleanupInterval := time.Duration(cfg.Config.CleanInterval) * time.Second
cleanupTicker := time.NewTicker(cleanupInterval)
defer cleanupTicker.Stop()

Expand Down Expand Up @@ -132,6 +132,10 @@ func changeListener(streamDB *db.SqliteStreamDB, rep *logstream.Replicator, shar

func onChangeEvent(streamDB *db.SqliteStreamDB) func(data []byte) error {
return func(data []byte) error {
if !cfg.Config.Replicate {
return nil
}

ev := &logstream.ReplicationEvent[db.ChangeLogEvent]{}
err := ev.Unmarshal(data)
if err != nil {
Expand All @@ -145,6 +149,10 @@ func onChangeEvent(streamDB *db.SqliteStreamDB) func(data []byte) error {

func onTableChanged(r *logstream.Replicator, nodeID uint64) func(event *db.ChangeLogEvent) error {
return func(event *db.ChangeLogEvent) error {
if !cfg.Config.Publish {
return nil
}

ev := &logstream.ReplicationEvent[db.ChangeLogEvent]{
FromNodeId: nodeID,
Payload: event,
Expand Down

0 comments on commit 50fbbf8

Please sign in to comment.