Skip to content

Commit

Permalink
Adding graceful cluster publishing failure
Browse files Browse the repository at this point in the history
 - Cleaner way for change log to fail on publishing changes (tansport
not ready state)
 - CLI parameter to select max log size before snapshotting
  • Loading branch information
maxpert committed Sep 19, 2022
1 parent 85f9eb5 commit 75423a8
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 12 deletions.
19 changes: 15 additions & 4 deletions db/changelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ import (
"github.com/samber/lo"
)

// ScanLimit is number of change log rows processed at a time, to limit memory usage
const ScanLimit = uint(100)

var ErrNoTableMapping = errors.New("no table mapping found")
var ErrLogNotReadyToPublish = errors.New("not ready to publish changes")

//go:embed table_change_log_script.tmpl
var tableChangeLogScriptTemplate string
Expand Down Expand Up @@ -198,7 +202,6 @@ func (conn *SqliteStreamDB) watchChanges(path string) {

func (conn *SqliteStreamDB) publishChangeLog() {
conn.publishLock.Lock()
scanLimit := uint(100)
processed := uint64(0)

// TODO: Move cleanup logic to time based cleanup
Expand Down Expand Up @@ -227,7 +230,7 @@ func (conn *SqliteStreamDB) publishChangeLog() {
return tx.Select("id", "type", "state").
From(conn.metaTable(tableName, changeLogName)).
Where(goqu.Ex{"state": Pending}).
Limit(scanLimit).
Limit(ScanLimit).
Prepared(true).
ScanStructs(&changes)
})
Expand All @@ -243,11 +246,15 @@ func (conn *SqliteStreamDB) publishChangeLog() {

err = conn.consumeChangeLogs(tableName, changes)
if err != nil {
if err == ErrLogNotReadyToPublish {
break
}

log.Error().Err(err).Msg("Unable to consume changes")
}

processed += uint64(len(changes))
if uint(len(changes)) <= scanLimit {
if uint(len(changes)) <= ScanLimit {
break
}
}
Expand Down Expand Up @@ -300,7 +307,11 @@ func (conn *SqliteStreamDB) consumeChangeLogs(tableName string, changes []*chang
})

if err != nil {
logger.Error().Err(err).Msg("Failed to acquire consensus")
if err == ErrLogNotReadyToPublish {
return err
}

logger.Error().Err(err).Msg("Failed to publish for table " + tableName)
return err
}
}
Expand Down
5 changes: 0 additions & 5 deletions lib/sqlite_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,11 +344,6 @@ func (ssm *SQLiteStateMachine) saveIndex() error {
return err
}

log.Debug().
Uint64("node_id", ssm.NodeID).
Uint64("cluster_id", ssm.ClusterID).
Uint64("index", ssm.applied.Index).
Msg("Saved index")
return nil
}

Expand Down
14 changes: 11 additions & 3 deletions marmot.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func main() {
nodeID := flag.Uint64("node-id", rand.Uint64(), "Node ID")
followFlag := flag.Bool("follow", false, "Start Raft in follower mode")
shards := flag.Uint64("shards", 16, "Total number of shards for this instance")
maxLogEntries := flag.Uint64("max-log-entries", 1024, "Total number of log entries per shard before snapshotting")
bindAddress := flag.String("bind", "0.0.0.0:8160", "Bind address for Raft server")
bindPane := flag.String("bind-pane", "localhost:6010", "Bind address for control pane server")
initialAddrs := flag.String("bootstrap", "", "<CLUSTER_ID>@IP:PORT list of initial nodes separated by comma (,)")
Expand Down Expand Up @@ -52,6 +53,9 @@ func main() {
return
}

// Set max log entries from command line
lib.MaxLogEntries = *maxLogEntries

*metaPath = fmt.Sprintf("%s/node-%d", *metaPath, *nodeID)
raft := lib.NewRaftServer(*bindAddress, *nodeID, *metaPath, srcDb)
err = raft.Init()
Expand All @@ -72,7 +76,7 @@ func main() {
}
}

srcDb.OnChange = onTableChanged(nodeID, raft)
srcDb.OnChange = onTableChanged(raft, *nodeID, *shards)
log.Info().Msg("Starting change data capture pipeline...")
if err := srcDb.InstallCDC(); err != nil {
log.Error().Err(err).Msg("Unable to install change data capture pipeline")
Expand All @@ -85,10 +89,14 @@ func main() {
}
}

func onTableChanged(nodeID *uint64, raft *lib.RaftServer) func(event *db.ChangeLogEvent) error {
func onTableChanged(raft *lib.RaftServer, nodeID uint64, shards uint64) func(event *db.ChangeLogEvent) error {
return func(event *db.ChangeLogEvent) error {
if uint64(len(raft.GetActiveClusters())) != shards {
return db.ErrLogNotReadyToPublish
}

ev := &lib.ReplicationEvent[db.ChangeLogEvent]{
FromNodeId: *nodeID,
FromNodeId: nodeID,
Payload: event,
}

Expand Down

0 comments on commit 75423a8

Please sign in to comment.