Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ln/default-options-rosetta-index #55

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deso/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestUtxoOpsProblem(t *testing.T) {
rosettaIndexOpts.ValueDir = rosettaIndexDir
rosettaIndex, err := badger.Open(rosettaIndexOpts)
require.NoError(err)
node.Index = NewIndex(rosettaIndex)
node.Index = NewIndex(rosettaIndex, node.chainDB)

// Listen to transaction and block events so we can fill RosettaIndex with relevant data
node.EventManager = lib.NewEventManager()
Expand Down
257 changes: 126 additions & 131 deletions deso/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,63 +36,63 @@ func (node *Node) handleSnapshotCompleted() {
// Iterate through every single public key and put a balance snapshot down
// for it for this block. We don't need to worry about ancestral records here
// because we haven't generated any yet.

err := node.Index.db.Update(func(indexTxn *badger.Txn) error {
return node.GetBlockchain().DB().View(func(chainTxn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
nodeIterator := chainTxn.NewIterator(opts)
defer nodeIterator.Close()
prefix := lib.Prefixes.PrefixPublicKeyToDeSoBalanceNanos

// Partition the balances across the blocks before the snapshot block height.
totalCount := uint64(0)
for nodeIterator.Seek(prefix); nodeIterator.ValidForPrefix(prefix); nodeIterator.Next() {
totalCount++
err := node.GetBlockchain().DB().View(func(chainTxn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
nodeIterator := chainTxn.NewIterator(opts)
defer nodeIterator.Close()
prefix := lib.Prefixes.PrefixPublicKeyToDeSoBalanceNanos

// Partition the balances across the blocks before the snapshot block height.
totalCount := uint64(0)
for nodeIterator.Seek(prefix); nodeIterator.ValidForPrefix(prefix); nodeIterator.Next() {
totalCount++
}
currentBlockHeight := uint64(1)
// We'll force a ceiling on this because otherwise the last block could amass O(snapshotBlockHeight) balances
balancesPerBlock := totalCount / snapshotBlockHeight
balancesMap := make(map[lib.PublicKey]uint64)
if totalCount < snapshotBlockHeight {
balancesPerBlock = 1
}
currentCounter := uint64(0)

for nodeIterator.Seek(prefix); nodeIterator.ValidForPrefix(prefix); nodeIterator.Next() {
key := nodeIterator.Item().Key()
keyCopy := make([]byte, len(key))
copy(keyCopy[:], key[:])

valCopy, err := nodeIterator.Item().ValueCopy(nil)
if err != nil {
return errors.Wrapf(err, "Problem iterating over chain database, "+
"on key (%v) and value (%v)", keyCopy, valCopy)
}
currentBlockHeight := uint64(1)
// We'll force a ceiling on this because otherwise the last block could amass O(snapshotBlockHeight) balances
balancesPerBlock := totalCount / snapshotBlockHeight
balancesMap := make(map[lib.PublicKey]uint64)
if totalCount < snapshotBlockHeight {
balancesPerBlock = 1
}
currentCounter := uint64(0)

for nodeIterator.Seek(prefix); nodeIterator.ValidForPrefix(prefix); nodeIterator.Next() {
key := nodeIterator.Item().Key()
keyCopy := make([]byte, len(key))
copy(keyCopy[:], key[:])

valCopy, err := nodeIterator.Item().ValueCopy(nil)
if err != nil {
return errors.Wrapf(err, "Problem iterating over chain database, "+
"on key (%v) and value (%v)", keyCopy, valCopy)
}

balance := lib.DecodeUint64(valCopy)
pubKey := lib.NewPublicKey(key[1:])
balancesMap[*pubKey] = balance

if err := node.Index.PutSingleBalanceSnapshotWithTxn(
indexTxn, currentBlockHeight, false, *pubKey, balance); err != nil {
return errors.Wrapf(err, "Problem updating balance snapshot in index, "+
"on key (%v), value (%v), and height (%v)", keyCopy, valCopy,
snapshot.CurrentEpochSnapshotMetadata.FirstSnapshotBlockHeight)
}
balance := lib.DecodeUint64(valCopy)
pubKey := lib.NewPublicKey(key[1:])
balancesMap[*pubKey] = balance
err = node.Index.PutSingleBalanceSnapshot(currentBlockHeight, false, *pubKey, balance)
if err != nil {
return errors.Wrapf(err, "Problem updating balance snapshot in index,"+
"on key (%v), value (%v), and height (%v)", keyCopy, valCopy,
snapshot.CurrentEpochSnapshotMetadata.FirstSnapshotBlockHeight)
}

currentCounter += 1
if currentCounter >= balancesPerBlock && currentBlockHeight < snapshotBlockHeight {
node.Index.PutHypersyncBlockBalances(currentBlockHeight, false, balancesMap)
balancesMap = make(map[lib.PublicKey]uint64)
currentBlockHeight++
currentCounter = 0
currentCounter += 1
if currentCounter >= balancesPerBlock && currentBlockHeight < snapshotBlockHeight {
if err = node.Index.PutHypersyncBlockBalances(currentBlockHeight, false, balancesMap); err != nil {
return errors.Wrapf(err, "Problem putting hypserync block balances at height %v", currentBlockHeight)
}
balancesMap = make(map[lib.PublicKey]uint64)
currentBlockHeight++
currentCounter = 0
}
if currentCounter > 0 {
node.Index.PutHypersyncBlockBalances(currentBlockHeight, false, balancesMap)
}
if currentCounter > 0 {
if err := node.Index.PutHypersyncBlockBalances(currentBlockHeight, false, balancesMap); err != nil {
return errors.Wrapf(err, "Problem putting hypserync block balances at height %v", currentBlockHeight)
}
return nil
})
}
return nil
})
if err != nil {
glog.Errorf(lib.CLog(lib.Red, fmt.Sprintf("handleSnapshotCompleted: error: (%v)", err)))
Expand All @@ -109,87 +109,89 @@ func (node *Node) handleSnapshotCompleted() {
//
// TODO: Do we need to do anything special for SwapIdentity? See below for
// some tricky logic there.
// This is pretty much the same as lib.DBGetAllProfilesByCoinValue but we don't load all entries into memory.
err := node.GetBlockchain().DB().View(func(chainTxn *badger.Txn) error {
dbPrefixx := append([]byte{}, lib.Prefixes.PrefixCreatorDeSoLockedNanosCreatorPKID...)
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = false
// Go in reverse order since a larger count is better.
opts.Reverse = true

it := chainTxn.NewIterator(opts)
defer it.Close()

totalCount := uint64(0)
for it.Seek(dbPrefixx); it.ValidForPrefix(dbPrefixx); it.Next() {
totalCount++
}
currentBlockHeight := uint64(1)
balancesPerBlock := totalCount / snapshotBlockHeight
balancesMap := make(map[lib.PublicKey]uint64)
if totalCount < snapshotBlockHeight {
balancesPerBlock = 1
}
currentCounter := uint64(0)

// Since we iterate backwards, the prefix must be bigger than all possible
// counts that could actually exist. We use eight bytes since the count is
// encoded as a 64-bit big-endian byte slice, which will be eight bytes long.
maxBigEndianUint64Bytes := []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}
prefix := append(dbPrefixx, maxBigEndianUint64Bytes...)
for it.Seek(prefix); it.ValidForPrefix(dbPrefixx); it.Next() {
rawKey := it.Item().Key()

// Strip the prefix off the key and check its length. If it contains
// a big-endian uint64 then it should be at least eight bytes.
lockedDeSoPubKeyConcatKey := rawKey[1:]
uint64BytesLen := len(maxBigEndianUint64Bytes)
expectedLength := uint64BytesLen + btcec.PubKeyBytesLenCompressed
if len(lockedDeSoPubKeyConcatKey) != expectedLength {
return fmt.Errorf("Invalid key length %d should be at least %d",
len(lockedDeSoPubKeyConcatKey), expectedLength)
}

err := node.Index.db.Update(func(indexTxn *badger.Txn) error {
// This is pretty much the same as lib.DBGetAllProfilesByCoinValue but we don't load all entries into memory.
return node.GetBlockchain().DB().View(func(chainTxn *badger.Txn) error {
dbPrefixx := append([]byte{}, lib.Prefixes.PrefixCreatorDeSoLockedNanosCreatorPKID...)
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = false
// Go in reverse order since a larger count is better.
opts.Reverse = true
lockedDeSoNanos := lib.DecodeUint64(lockedDeSoPubKeyConcatKey[:uint64BytesLen])

it := chainTxn.NewIterator(opts)
defer it.Close()
// Appended to the stake should be the profile pub key so extract it here.
profilePKIDbytes := make([]byte, btcec.PubKeyBytesLenCompressed)
copy(profilePKIDbytes[:], lockedDeSoPubKeyConcatKey[uint64BytesLen:])
profilePKID := lib.PublicKeyToPKID(profilePKIDbytes)

totalCount := uint64(0)
for it.Seek(dbPrefixx); it.ValidForPrefix(dbPrefixx); it.Next() {
totalCount++
pkBytes := lib.DBGetPublicKeyForPKIDWithTxn(chainTxn, nil, profilePKID)
if pkBytes == nil {
return fmt.Errorf("DBGetPublicKeyForPKIDWithTxn: Nil pkBytes for pkid %v",
lib.PkToStringMainnet(profilePKID[:]))
}
currentBlockHeight := uint64(1)
balancesPerBlock := totalCount / snapshotBlockHeight
balancesMap := make(map[lib.PublicKey]uint64)
if totalCount < snapshotBlockHeight {
balancesPerBlock = 1
pubKey := *lib.NewPublicKey(pkBytes)
balancesMap[pubKey] = lockedDeSoNanos

// We have to also put the balances in the other index. Not doing this would cause
// balances to return zero when we're PAST the first snapshot block height.
if err := node.Index.PutSingleBalanceSnapshot(
currentBlockHeight, true, pubKey, lockedDeSoNanos); err != nil {
return errors.Wrapf(err, "PutSingleBalanceSnapshot: problem with "+
"pubkey (%v), lockedDeSoNanos (%v) and firstSnapshotHeight (%v)",
pubKey, lockedDeSoNanos, snapshot.CurrentEpochSnapshotMetadata.FirstSnapshotBlockHeight)
}
currentCounter := uint64(0)

// Since we iterate backwards, the prefix must be bigger than all possible
// counts that could actually exist. We use eight bytes since the count is
// encoded as a 64-bit big-endian byte slice, which will be eight bytes long.
maxBigEndianUint64Bytes := []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}
prefix := append(dbPrefixx, maxBigEndianUint64Bytes...)
for it.Seek(prefix); it.ValidForPrefix(dbPrefixx); it.Next() {
rawKey := it.Item().Key()

// Strip the prefix off the key and check its length. If it contains
// a big-endian uint64 then it should be at least eight bytes.
lockedDeSoPubKeyConcatKey := rawKey[1:]
uint64BytesLen := len(maxBigEndianUint64Bytes)
expectedLength := uint64BytesLen + btcec.PubKeyBytesLenCompressed
if len(lockedDeSoPubKeyConcatKey) != expectedLength {
return fmt.Errorf("Invalid key length %d should be at least %d",
len(lockedDeSoPubKeyConcatKey), expectedLength)
}

lockedDeSoNanos := lib.DecodeUint64(lockedDeSoPubKeyConcatKey[:uint64BytesLen])

// Appended to the stake should be the profile pub key so extract it here.
profilePKIDbytes := make([]byte, btcec.PubKeyBytesLenCompressed)
copy(profilePKIDbytes[:], lockedDeSoPubKeyConcatKey[uint64BytesLen:])
profilePKID := lib.PublicKeyToPKID(profilePKIDbytes)

pkBytes := lib.DBGetPublicKeyForPKIDWithTxn(chainTxn, nil, profilePKID)
if pkBytes == nil {
return fmt.Errorf("DBGetPublicKeyForPKIDWithTxn: Nil pkBytes for pkid %v",
lib.PkToStringMainnet(profilePKID[:]))
}
pubKey := *lib.NewPublicKey(pkBytes)
balancesMap[pubKey] = lockedDeSoNanos

// We have to also put the balances in the other index. Not doing this would cause
// balances to return zero when we're PAST the first snapshot block height.
if err := node.Index.PutSingleBalanceSnapshotWithTxn(
indexTxn, currentBlockHeight, true, pubKey, lockedDeSoNanos); err != nil {

return errors.Wrapf(err, "PutSingleBalanceSnapshotWithTxn: problem with "+
"pubkey (%v), lockedDeSoNanos (%v) and firstSnapshotHeight (%v)",
pubKey, lockedDeSoNanos, snapshot.CurrentEpochSnapshotMetadata.FirstSnapshotBlockHeight)
}

currentCounter += 1
if currentCounter >= balancesPerBlock && currentBlockHeight < snapshotBlockHeight {
node.Index.PutHypersyncBlockBalances(currentBlockHeight, true, balancesMap)
balancesMap = make(map[lib.PublicKey]uint64)
currentBlockHeight++
currentCounter = 0
currentCounter += 1
if currentCounter >= balancesPerBlock && currentBlockHeight < snapshotBlockHeight {
err := node.Index.PutHypersyncBlockBalances(currentBlockHeight, true, balancesMap)
if err != nil {
return errors.Wrapf(err, "Problem putting hypserync block balances at height %v", currentBlockHeight)
}
balancesMap = make(map[lib.PublicKey]uint64)
currentBlockHeight++
currentCounter = 0
}
if currentCounter > 0 {
node.Index.PutHypersyncBlockBalances(currentBlockHeight, true, balancesMap)
}
if currentCounter > 0 {
err := node.Index.PutHypersyncBlockBalances(currentBlockHeight, true, balancesMap)
if err != nil {
return errors.Wrapf(err, "Problem putting hypserync block balances at height %v", currentBlockHeight)
}
return nil
})
}
return nil
})
if err != nil {
glog.Errorf(lib.CLog(lib.Red, fmt.Sprintf("handleSnapshotCompleted: Problem iterating locked "+
Expand Down Expand Up @@ -223,16 +225,9 @@ func (node *Node) handleBlockConnected(event *lib.BlockEvent) {
// don't have a snapshot. We output extra metadata for this block to ensure
// Rosetta connects it appropriately.

// Save the UTXOOps. These are used to compute all of the meta information
// that Rosetta needs.
err := node.Index.PutUtxoOps(event.Block, event.UtxoOps)
if err != nil {
glog.Errorf("PutSpentUtxos: %v", err)
}

// Save a balance snapshot
balances := event.UtxoView.PublicKeyToDeSoBalanceNanos
err = node.Index.PutBalanceSnapshot(event.Block.Header.Height, false, balances)
err := node.Index.PutBalanceSnapshot(event.Block.Header.Height, false, balances)
if err != nil {
glog.Errorf("PutBalanceSnapshot: %v", err)
}
Expand Down
Loading