Skip to content

Commit

Permalink
Multiple fixes
Browse files Browse the repository at this point in the history
 - Timer efficiency improvements
 - Timer based change log cleanup
 - Improvements to documentation
 - Removing redundant values from replication event
 - Improving replicator to only create streams if they are missing; this
will let admin operations persist on JetStreams in case of restart
  • Loading branch information
maxpert committed Sep 25, 2022
1 parent 851d6a1 commit 0a360fa
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 36 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ as a side car letting you build replication cluster without making any changes t
application code, and allows you to keep using your SQLite database file.

## Dependencies
Starting 0.4+ Marmot depends on [nats-server](https://nats.io/download/).
Starting 0.4+ Marmot depends on [nats-server](https://nats.io/download/) with JetStream support.

## Production status

Expand Down
18 changes: 14 additions & 4 deletions db/change_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,15 @@ func (conn *SqliteStreamDB) DeleteChangeLog(event *ChangeLogEvent) (bool, error)
return count > 0, nil
}

func (conn *SqliteStreamDB) CleanupChangeLogs() (int64, error) {
func (conn *SqliteStreamDB) CleanupChangeLogs(beforeTime time.Time) (int64, error) {
total := int64(0)
for name := range conn.watchTablesSchema {
metaTableName := conn.metaTable(name, changeLogName)
rs, err := conn.Delete(metaTableName).
Where(goqu.Ex{"state": Published}).
Where(
goqu.C("state").Eq(Published),
goqu.C("created_at").Lte(beforeTime.UnixMilli()),
).
Prepared(true).
Executor().
Exec()
Expand Down Expand Up @@ -199,6 +202,8 @@ func (conn *SqliteStreamDB) watchChanges(path string) {

errShm := watcher.Add(shmPath)
errWal := watcher.Add(walPath)
changeLogTicker := time.NewTicker(time.Millisecond * 100)
updateTime := time.Now()

for {
select {
Expand All @@ -210,8 +215,11 @@ func (conn *SqliteStreamDB) watchChanges(path string) {
if ev.Op != fsnotify.Chmod {
conn.publishChangeLog()
}
case <-time.After(time.Millisecond * 500):
conn.publishChangeLog()
case t := <-changeLogTicker.C:
if t.After(updateTime) {
conn.publishChangeLog()
}

if errShm != nil {
errShm = watcher.Add(shmPath)
}
Expand All @@ -220,6 +228,8 @@ func (conn *SqliteStreamDB) watchChanges(path string) {
errWal = watcher.Add(walPath)
}
}

updateTime = time.Now()
}
}

Expand Down
2 changes: 1 addition & 1 deletion db/change_log_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (e *ChangeLogEvent) getSortedPKColumns() []string {

tablePKColumnsLock.Lock()
defer tablePKColumnsLock.Unlock()
tablePKColumnsCache[e.TableName] = pkColumns

tablePKColumnsCache[e.TableName] = pkColumns
return pkColumns
}
3 changes: 0 additions & 3 deletions docs/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ Yes it will require additional storage to old/new values from triggers. But righ
Ask marmot to remove hooks and log tables by:
`marmot -db-path /path/to/your/db.db -cleanup`

Cleanup the Raft logs and snapshot by:
`rm -rf /tmp/raft`

### How many shards should I have?

It depends on your use case and what problem you are solving for. In a typical setting you should not need more than a couple dozen shards. While read scaling won't be a problem, your write throughput will depend on your network and
Expand Down
5 changes: 2 additions & 3 deletions lib/replication_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ package lib
import "github.com/fxamacker/cbor/v2"

type ReplicationEvent[T any] struct {
FromNodeId uint64
ChangeRowId int64
Payload *T
FromNodeId uint64
Payload *T
}

func (e *ReplicationEvent[T]) Marshal() ([]byte, error) {
Expand Down
33 changes: 24 additions & 9 deletions lib/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import (
"github.com/rs/zerolog/log"
)

const maxReplcateRetries = 7
const DefaultUrl = nats.DefaultURL
const NodeNamePrefix = "marmot-node"

var MaxLogEntries = int64(1024)
var EntryReplicas = int(0)
var EntryReplicas = 0
var StreamNamePrefix = "marmot-changes"
var SubjectPrefix = "marmot-change-log"

Expand Down Expand Up @@ -40,11 +41,10 @@ func NewReplicator(nodeID uint64, natsServer string, shards uint64) (*Replicator
}

streamCfg := makeShardConfig(shard, shards)
_, err = js.StreamInfo(streamCfg.Name)
if err != nil {
_, err = js.AddStream(streamCfg)
} else {
_, err = js.UpdateStream(streamCfg)
info, err := js.StreamInfo(streamCfg.Name)
if err == nats.ErrStreamNotFound {
log.Debug().Uint64("shard", shard).Msg("Creating stream")
info, err = js.AddStream(streamCfg)
}

if err != nil {
Expand All @@ -53,7 +53,14 @@ func NewReplicator(nodeID uint64, natsServer string, shards uint64) (*Replicator

log.Debug().
Uint64("shard", shard).
Msg("Created stream")
Str("name", info.Config.Name).
Int("replicas", info.Config.Replicas).
Str("leader", info.Cluster.Leader).
Msg("Stream ready...")

if err != nil {
return nil, err
}

streamMap[shard] = js
}
Expand Down Expand Up @@ -89,14 +96,14 @@ func (r *Replicator) Listen(shardID uint64, callback func(payload []byte) error)
sub, err := js.SubscribeSync(
subjectName(shardID),
nats.Durable(nodeName(r.nodeID)),
nats.ManualAck(),
)

if err != nil {
return err
}
defer sub.Unsubscribe()

replRetry := 0
for sub.IsValid() {
msg, err := sub.NextMsg(1 * time.Second)

Expand All @@ -111,9 +118,17 @@ func (r *Replicator) Listen(shardID uint64, callback func(payload []byte) error)
log.Debug().Str("sub", msg.Subject).Uint64("shard", shardID).Send()
err = callback(msg.Data)
if err != nil {
return err
if replRetry > maxReplcateRetries {
return err
}

log.Error().Err(err).Msg("Unable to process message retrying")
msg.Nak()
replRetry++
continue
}

replRetry = 0
err = msg.Ack()
if err != nil {
return err
Expand Down
42 changes: 27 additions & 15 deletions marmot.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"flag"
"math/rand"
"time"

"github.com/maxpert/marmot/db"
"github.com/maxpert/marmot/lib"
Expand Down Expand Up @@ -67,20 +68,35 @@ func main() {

errChan := make(chan error)
for i := uint64(0); i < *shards; i++ {
go changeListener(streamDB, rep, i+1)
go changeListener(streamDB, rep, i+1, errChan)
}

err = <-errChan
if err != nil {
log.Panic().Err(err).Msg("Terminated listener")
cleanupInterval := 5 * time.Second
cleanupTicker := time.NewTicker(cleanupInterval)
defer cleanupTicker.Stop()

for {
select {
case err = <-errChan:
if err != nil {
log.Panic().Err(err).Msg("Terminated listener")
}
case t := <-cleanupTicker.C:
cnt, err := streamDB.CleanupChangeLogs(t.Add(-cleanupInterval))
if err != nil {
log.Warn().Err(err).Msg("Unable to cleanup change logs")
} else if cnt > 0 {
log.Debug().Int64("count", cnt).Msg("Cleaned up DB change logs")
}
}
}
}

func changeListener(streamDB *db.SqliteStreamDB, rep *lib.Replicator, shard uint64) {
func changeListener(streamDB *db.SqliteStreamDB, rep *lib.Replicator, shard uint64, errChan chan error) {
log.Debug().Uint64("shard", shard).Msg("Listening stream")
err := rep.Listen(shard, onChangeEvent(streamDB))
if err != nil {
log.Panic().Err(err).Msg("Listener error")
errChan <- err
}
}

Expand All @@ -89,24 +105,20 @@ func onChangeEvent(streamDB *db.SqliteStreamDB) func(data []byte) error {
ev := &lib.ReplicationEvent[db.ChangeLogEvent]{}
err := ev.Unmarshal(data)
if err != nil {
log.Error().Err(err).Send()
return err
}

ok, _ := streamDB.DeleteChangeLog(ev.Payload)
if ok {
return nil
}

_, _ = streamDB.DeleteChangeLog(ev.Payload)
return streamDB.Replicate(ev.Payload)
}
}

func onTableChanged(r *lib.Replicator, nodeID uint64, shards uint64) func(event *db.ChangeLogEvent) error {
return func(event *db.ChangeLogEvent) error {
ev := &lib.ReplicationEvent[db.ChangeLogEvent]{
FromNodeId: nodeID,
ChangeRowId: event.Id,
Payload: event,
FromNodeId: nodeID,
Payload: event,
}

data, err := ev.Marshal()
Expand All @@ -119,7 +131,7 @@ func onTableChanged(r *lib.Replicator, nodeID uint64, shards uint64) func(event
return err
}

err = r.Publish(hash%shards, data)
err = r.Publish(hash, data)
if err != nil {
return err
}
Expand Down

0 comments on commit 0a360fa

Please sign in to comment.