Skip to content

Commit

Permalink
Matched the log DB state loading expectations to state machine.
Browse files Browse the repository at this point in the history
SQLiteLogDB was not exactly returning / following the entry counts for
what was expected out of ReadState from a restarting state machine.
  • Loading branch information
maxpert committed Sep 20, 2022
1 parent 995248d commit f521b14
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 31 deletions.
9 changes: 3 additions & 6 deletions lib/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"math/rand"
"os"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -78,11 +77,9 @@ func (r *RaftServer) Init() error {
RaftEventListener: r,
}

if strings.ToLower(os.Getenv("SQLITE_LOG_STORE")) == "true" {
factory := NewSQLiteLogDBFactory(r.metaPath, r.nodeID)
hostConfig.Expert = config.ExpertConfig{
LogDBFactory: factory,
}
factory := NewSQLiteLogDBFactory(r.metaPath, r.nodeID)
hostConfig.Expert = config.ExpertConfig{
LogDBFactory: factory,
}

nodeHost, err := dragonboat.NewNodeHost(hostConfig)
Expand Down
59 changes: 34 additions & 25 deletions lib/sqlite_log_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (s *SQLiteLogDB) IterateEntries(
Uint64("entries", uint64(len(entries))).
Logger()

min, count, err := s.getEntryRange(nodeID, clusterID)
min, count, err := s.getEntryRange(nodeID, clusterID, nil)
if err == raftio.ErrNoSavedLog {
logger.Warn().Msg("No entries...")
return entries, size, nil
Expand Down Expand Up @@ -312,7 +312,7 @@ func (s *SQLiteLogDB) ReadRaftState(clusterID uint64, nodeID uint64, snapshotInd
ret := raftio.RaftState{}
log.Trace().Msg(fmt.Sprintf("ReadRaftState %d %d %d", clusterID, nodeID, snapshotIndex))

firstIndex, entriesCount, err := s.getEntryRange(nodeID, clusterID)
minIndex, entriesCount, err := s.getEntryRange(nodeID, clusterID, &snapshotIndex)
if err != nil {
return ret, err
}
Expand All @@ -337,14 +337,9 @@ func (s *SQLiteLogDB) ReadRaftState(clusterID uint64, nodeID uint64, snapshotInd
return ret, err
}

ret.FirstIndex = firstIndex
ret.FirstIndex = minIndex
ret.EntryCount = entriesCount

// Have to investigate but existing code in dragonboat suggests with 1 entry it returns 0
if snapshotIndex == (firstIndex + entriesCount - 1) {
ret.EntryCount = 0
}

return ret, nil
}

Expand Down Expand Up @@ -528,11 +523,10 @@ func (s *SQLiteLogDB) ImportSnapshot(snp raftpb.Snapshot, nodeID uint64) error {
})
}

func (s *SQLiteLogDB) getEntryRange(nodeID, clusterID uint64) (uint64, uint64, error) {
func (s *SQLiteLogDB) getEntryRange(nodeID, clusterID uint64, index *uint64) (uint64, uint64, error) {
count := uint64(0)
ok, err := s.db.From(raftInfoTable).Select(
goqu.COUNT("entry_index"),
).
ok, err := s.db.From(raftInfoTable).
Select(goqu.COUNT("entry_index")).
Where(goqu.Ex{"node_id": nodeID, "cluster_id": clusterID, "entry_type": Entry}).
Prepared(true).
Executor().
Expand All @@ -542,32 +536,47 @@ func (s *SQLiteLogDB) getEntryRange(nodeID, clusterID uint64) (uint64, uint64, e
return 0, 0, err
}

if !ok {
if !ok || count == 0 {
return 0, 0, raftio.ErrNoSavedLog
}

if count <= 0 {
return 0, 0, nil
if index == nil {
return 0, count, nil
}

min := uint64(0)
ok, err = s.db.From(raftInfoTable).Select(
goqu.MIN("entry_index"),
).
max := uint64(0)
_, err = s.db.From(raftInfoTable).
Select(goqu.MAX("entry_index")).
Where(goqu.Ex{"node_id": nodeID, "cluster_id": clusterID, "entry_type": Entry}).
Prepared(true).
Executor().
ScanVal(&min)
ScanVal(&max)

if err != nil {
return 0, 0, err
}

if !ok {
return 0, 0, raftio.ErrNoSavedLog
if max == *index {
return max, 0, nil
}

min := uint64(0)
_, err = s.db.From(raftInfoTable).
Select(goqu.MIN("entry_index")).
Where(
goqu.Ex{"node_id": nodeID, "cluster_id": clusterID, "entry_type": Entry},
goqu.C("entry_index").Gte(*index),
goqu.C("entry_index").Lte(max),
).
Prepared(true).
Executor().
ScanVal(&min)

if err != nil {
return 0, 0, err
}

return min, count, nil
return min, max - min + 1, nil
}

func deleteInfoTuple(
Expand Down Expand Up @@ -596,7 +605,7 @@ func deleteInfoTuple(

exps = append(exps, additionalExpressions...)

logger.Trace().Msg("Deleted rows")
logger.Trace().Msg("deleteInfoTuple")
_, err := db.Delete(raftInfoTable).
Where(exps...).
Prepared(true).
Expand Down Expand Up @@ -662,6 +671,6 @@ func saveInfoTuple(
"payload": data,
}).Prepared(true).Executor().Exec()

logger.Trace().Err(err).Msg("Saved")
logger.Trace().Err(err).Msg("saveTupleInfo")
return err
}

0 comments on commit f521b14

Please sign in to comment.