Skip to content

Commit

Permalink
waiting completed snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
klim0v committed Dec 15, 2021
1 parent 0c54acb commit 30a59fb
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 28 deletions.
4 changes: 2 additions & 2 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ api_simultaneous_requests = {{ .BaseConfig.APISimultaneousRequests }}
fast_sync = {{ .BaseConfig.FastSync }}
# State sync snapshot interval
snapshot_interval = {{ .BaseConfig.SnapshotInterval }}
snapshot_interval = {{ .BaseConfig.SnapshotInterval }}
# State sync snapshot to keep
snapshot_keep_recent = {{ .BaseConfig.SnapshotKeepRecent }}
snapshot_keep_recent = {{ .BaseConfig.SnapshotKeepRecent }}
# Database backend: leveldb | memdb
db_backend = "{{ .BaseConfig.DBBackend }}"
Expand Down
105 changes: 79 additions & 26 deletions coreV2/appdb/appdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import (
"github.com/MinterTeam/minter-go-node/tree"
"github.com/cosmos/cosmos-sdk/snapshots"
snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"
"sync"

//"github.com/cosmos/cosmos-sdk/store/iavl"
"github.com/cosmos/iavl"

"github.com/MinterTeam/minter-go-node/coreV2/appdb/types"
sdktypes "github.com/cosmos/cosmos-sdk/store/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
iavltree "github.com/cosmos/iavl"
protoio "github.com/gogo/protobuf/io"
Expand Down Expand Up @@ -40,9 +41,9 @@ const (
// AppDB is responsible for storing basic information about app state on disk
type AppDB struct {
db db.DB
mu sync.RWMutex

store tree.MTree
keysByName map[string]sdktypes.StoreKey
store tree.MTree

startHeight uint64
lastHeight uint64
Expand All @@ -63,6 +64,9 @@ func (appDB *AppDB) Close() error {

// GetLastBlockHash returns latest block hash stored on disk
func (appDB *AppDB) GetLastBlockHash() []byte {
appDB.mu.RLock()
defer appDB.mu.RUnlock()

rawHash, err := appDB.db.Get([]byte(hashPath))
if err != nil {
panic(err)
Expand All @@ -79,6 +83,9 @@ func (appDB *AppDB) GetLastBlockHash() []byte {

// SetLastBlockHash stores given block hash on disk, panics on error
func (appDB *AppDB) SetLastBlockHash(hash []byte) {
appDB.mu.Lock()
defer appDB.mu.Unlock()

if err := appDB.db.Set([]byte(hashPath), hash); err != nil {
panic(err)
}
Expand All @@ -91,6 +98,9 @@ func (appDB *AppDB) GetLastHeight() uint64 {
return val
}

appDB.mu.RLock()
defer appDB.mu.RUnlock()

result, err := appDB.db.Get([]byte(heightPath))
if err != nil {
panic(err)
Expand All @@ -108,6 +118,10 @@ func (appDB *AppDB) GetLastHeight() uint64 {
func (appDB *AppDB) SetLastHeight(height uint64) {
h := make([]byte, 8)
binary.BigEndian.PutUint64(h, height)

appDB.mu.Lock()
defer appDB.mu.Unlock()

if err := appDB.db.Set([]byte(heightPath), h); err != nil {
panic(err)
}
Expand All @@ -124,6 +138,10 @@ func (appDB *AppDB) SetStartHeight(height uint64) {
func (appDB *AppDB) SaveStartHeight() {
h := make([]byte, 8)
binary.BigEndian.PutUint64(h, atomic.LoadUint64(&appDB.startHeight))

appDB.mu.Lock()
defer appDB.mu.Unlock()

if err := appDB.db.Set([]byte(startHeightPath), h); err != nil {
panic(err)
}
Expand All @@ -135,6 +153,10 @@ func (appDB *AppDB) GetStartHeight() uint64 {
if val != 0 {
return val
}

appDB.mu.RLock()
defer appDB.mu.RUnlock()

result, err := appDB.db.Get([]byte(startHeightPath))
if err != nil {
panic(err)
Expand All @@ -154,6 +176,9 @@ func (appDB *AppDB) GetValidators() abcTypes.ValidatorUpdates {
return appDB.validators
}

appDB.mu.RLock()
defer appDB.mu.RUnlock()

result, err := appDB.db.Get([]byte(validatorsPath))
if err != nil {
panic(err)
Expand Down Expand Up @@ -188,6 +213,9 @@ func (appDB *AppDB) FlushValidators() {
panic(err)
}

appDB.mu.Lock()
defer appDB.mu.Unlock()

if err := appDB.db.Set([]byte(validatorsPath), data); err != nil {
panic(err)
}
Expand All @@ -199,6 +227,10 @@ const BlocksTimeCount = 4
// GetLastBlockTimeDelta returns delta of time between latest blocks
func (appDB *AppDB) GetLastBlockTimeDelta() (sumTimes int, count int) {
if len(appDB.lastTimeBlocks) == 0 {

appDB.mu.RLock()
defer appDB.mu.RUnlock()

result, err := appDB.db.Get([]byte(blocksTimePath))
if err != nil {
panic(err)
Expand Down Expand Up @@ -232,6 +264,10 @@ func calcBlockDelta(times []uint64) (sumTimes int, num int) {

func (appDB *AppDB) AddBlocksTime(time time.Time) {
if len(appDB.lastTimeBlocks) == 0 {

appDB.mu.RLock()
defer appDB.mu.RUnlock()

result, err := appDB.db.Get([]byte(blocksTimePath))
if err != nil {
panic(err)
Expand All @@ -257,6 +293,9 @@ func (appDB *AppDB) SaveBlocksTime() {
panic(err)
}

appDB.mu.Lock()
defer appDB.mu.Unlock()

if err := appDB.db.Set([]byte(blocksTimePath), data); err != nil {
panic(err)
}
Expand Down Expand Up @@ -291,6 +330,10 @@ func (appDB *AppDB) GetVersionHeight(name string) uint64 {

func (appDB *AppDB) GetVersions() []*Version {
if len(appDB.versions) == 0 {

appDB.mu.RLock()
defer appDB.mu.RUnlock()

result, err := appDB.db.Get([]byte(versionsPath))
if err != nil {
panic(err)
Expand All @@ -301,7 +344,6 @@ func (appDB *AppDB) GetVersions() []*Version {
panic(err)
}
}
// appDB.version = appDB.versions[len(appDB.versions)-1]
}

return appDB.versions
Expand All @@ -328,6 +370,9 @@ func (appDB *AppDB) SaveVersions() {
panic(err)
}

appDB.mu.Lock()
defer appDB.mu.Unlock()

if err := appDB.db.Set([]byte(versionsPath), data); err != nil {
panic(err)
}
Expand Down Expand Up @@ -375,10 +420,30 @@ func (appDB *AppDB) Snapshot(height uint64, format uint32) (<-chan io.ReadCloser
if height == 0 {
return nil, sdkerrors.Wrap(sdkerrors.ErrLogic, "cannot snapshot height 0")
}
if height > uint64(appDB.GetLastHeight()) {
if height > appDB.GetLastHeight() {
return nil, sdkerrors.Wrapf(sdkerrors.ErrLogic, "cannot snapshot future height %v", height)
}

var results []*types.SnapshotItem

appDB.mu.RLock()
for _, name := range []string{validatorsPath, heightPath, hashPath, versionsPath, blocksTimePath, startHeightPath} {
result, err := appDB.db.Get([]byte(name))
if err != nil {
panic(err)
}

results = append(results, &types.SnapshotItem{
Item: &types.SnapshotItem_Store{
Store: &types.SnapshotStoreItem{
Name: name,
Value: result,
},
},
})
}
appDB.mu.RUnlock()

// Spawn goroutine to generate snapshot chunks and pass their io.ReadClosers through a channel
ch := make(chan io.ReadCloser)
go func() {
Expand Down Expand Up @@ -410,6 +475,14 @@ func (appDB *AppDB) Snapshot(height uint64, format uint32) (<-chan io.ReadCloser
}()

{
for _, s := range results {
err = protoWriter.WriteMsg(s)
if err != nil {
chunkWriter.CloseWithError(err)
return
}
}

// Export each IAVL store. Stores are serialized as a stream of SnapshotItem Protobuf
// messages. The first item contains a SnapshotStore with store metadata (i.e. name),
// and the following messages contain a SnapshotNode (i.e. an ExportNode). Store changes
Expand Down Expand Up @@ -458,26 +531,6 @@ func (appDB *AppDB) Snapshot(height uint64, format uint32) (<-chan io.ReadCloser
}
exporter.Close()
}

for _, name := range []string{validatorsPath, heightPath, hashPath, versionsPath, blocksTimePath} {
result, err := appDB.db.Get([]byte(name))
if err != nil {
panic(err)
}

err = protoWriter.WriteMsg(&types.SnapshotItem{
Item: &types.SnapshotItem_Store{
Store: &types.SnapshotStoreItem{
Name: name,
Value: result,
},
},
})
if err != nil {
chunkWriter.CloseWithError(err)
return
}
}
}()

return ch, nil
Expand Down Expand Up @@ -550,7 +603,7 @@ func (appDB *AppDB) Restore(
}
defer importer.Close()

case validatorsPath, heightPath, hashPath, versionsPath, blocksTimePath:
case validatorsPath, heightPath, hashPath, versionsPath, blocksTimePath, startHeightPath:
if err := appDB.db.Set([]byte(item.Store.Name), item.Store.Value); err != nil {
panic(err)
}
Expand Down

0 comments on commit 30a59fb

Please sign in to comment.