Skip to content

Commit

Permalink
Snapshot validator entries and block signers + associated views
Browse files Browse the repository at this point in the history
  • Loading branch information
lazynina committed Apr 26, 2024
1 parent c69997a commit 7ae0a61
Show file tree
Hide file tree
Showing 6 changed files with 281 additions and 20 deletions.
76 changes: 72 additions & 4 deletions entries/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package entries
import (
"context"
"encoding/hex"
"reflect"
"time"

"github.com/deso-protocol/core/lib"
Expand Down Expand Up @@ -34,12 +35,43 @@ type PGBlockEntry struct {
BlockEntry
}

type BlockSigner struct {
BlockHash string
SignerIndex uint64
}

type PGBlockSigner struct {
bun.BaseModel `bun:"table:block_signer"`
BlockSigner
}

// Convert the UserAssociation DeSo encoder to the PG struct used by bun.
func BlockEncoderToPGStruct(block *lib.MsgDeSoBlock, keyBytes []byte, params *lib.DeSoParams) *PGBlockEntry {
func BlockEncoderToPGStruct(block *lib.MsgDeSoBlock, keyBytes []byte, params *lib.DeSoParams) (*PGBlockEntry, []*PGBlockSigner) {
blockHash, _ := block.Hash()
blockHashHex := hex.EncodeToString(blockHash[:])
qc := block.Header.GetQC()
blockSigners := []*PGBlockSigner{}
if !isInterfaceNil(qc) {
aggSig := qc.GetAggregatedSignature()
if !isInterfaceNil(aggSig) {
signersList := aggSig.GetSignersList()
for ii := 0; ii < signersList.Size(); ii++ {
// Skip signers that didn't sign.
if !signersList.Get(ii) {
continue
}
blockSigners = append(blockSigners, &PGBlockSigner{
BlockSigner: BlockSigner{
BlockHash: blockHashHex,
SignerIndex: uint64(ii),
},
})
}
}
}
return &PGBlockEntry{
BlockEntry: BlockEntry{
BlockHash: hex.EncodeToString(blockHash[:]),
BlockHash: blockHashHex,
PrevBlockHash: hex.EncodeToString(block.Header.PrevBlockHash[:]),
TxnMerkleRoot: hex.EncodeToString(block.Header.TransactionMerkleRoot[:]),
Timestamp: consumer.UnixNanoToTime(uint64(block.Header.TstampNanoSecs)),
Expand All @@ -53,7 +85,7 @@ func BlockEncoderToPGStruct(block *lib.MsgDeSoBlock, keyBytes []byte, params *li
ProposerVotePartialSignature: block.Header.ProposerVotePartialSignature.ToString(),
BadgerKey: keyBytes,
},
}
}, blockSigners
}

// PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler
Expand Down Expand Up @@ -120,11 +152,13 @@ func bulkInsertBlockEntry(entries []*lib.StateChangeEntry, db *bun.DB, operation
// Create a new array to hold the bun struct.
pgBlockEntrySlice := make([]*PGBlockEntry, 0)
pgTransactionEntrySlice := make([]*PGTransactionEntry, 0)
pgBlockSignersEntrySlice := make([]*PGBlockSigner, 0)

for _, entry := range uniqueBlocks {
block := entry.Encoder.(*lib.MsgDeSoBlock)
blockEntry := BlockEncoderToPGStruct(block, entry.KeyBytes, params)
blockEntry, blockSigners := BlockEncoderToPGStruct(block, entry.KeyBytes, params)
pgBlockEntrySlice = append(pgBlockEntrySlice, blockEntry)
pgBlockSignersEntrySlice = append(pgBlockSignersEntrySlice, blockSigners...)
for jj, transaction := range block.Txns {
indexInBlock := uint64(jj)
pgTransactionEntry, err := TransactionEncoderToPGStruct(
Expand Down Expand Up @@ -166,6 +200,19 @@ func bulkInsertBlockEntry(entries []*lib.StateChangeEntry, db *bun.DB, operation
return errors.Wrapf(err, "entries.bulkInsertBlock: Error inserting transaction entries")
}

if len(pgBlockSignersEntrySlice) > 0 {
// Execute the insert query.
query := db.NewInsert().Model(&pgBlockSignersEntrySlice)

if operationType == lib.DbOperationTypeUpsert {
query = query.On("CONFLICT (block_hash, signer_index) DO UPDATE")
}

if _, err := query.Returning("").Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkInsertBlockEntry: Error inserting block signers")
}
}

return nil
}

Expand Down Expand Up @@ -214,5 +261,26 @@ func bulkDeleteBlockEntriesFromKeysToDelete(db *bun.DB, keysToDelete [][]byte) e
Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkDeleteBlockEntry: Error deleting utxo operation entries")
}

// Delete any signers associated with the block.
if _, err := db.NewDelete().
Model(&PGBlockSigner{}).
Where("block_hash IN (?)", bun.In(blockHashHexesToDelete)).
Returning("").
Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkDeleteBlockEntry: Error deleting block signers")
}
return nil
}

// golang interface types are stored as a tuple of (type, value). A single i==nil check is not enough to
// determine if a pointer that implements an interface is nil. This function checks if the interface is nil
// by checking if the pointer itself is nil.
func isInterfaceNil(i interface{}) bool {
if i == nil {
return true
}

value := reflect.ValueOf(i)
return value.Kind() == reflect.Ptr && value.IsNil()
}
27 changes: 21 additions & 6 deletions entries/utxo_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,14 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB,
transactionUpdates := make([]*PGTransactionEntry, 0)
affectedPublicKeys := make([]*PGAffectedPublicKeyEntry, 0)
blockEntries := make([]*PGBlockEntry, 0)
pgBlockSigners := make([]*PGBlockSigner, 0)
stakeRewardEntries := make([]*PGStakeReward, 0)
jailedHistoryEntries := make([]*PGJailedHistoryEvent, 0)

// Start timer to track how long it takes to insert the entries.
start := time.Now()

fmt.Printf("entries.bulkInsertUtxoOperationsEntry: Inserting %v entries\n", len(uniqueEntries))
glog.V(2).Infof("entries.bulkInsertUtxoOperationsEntry: Inserting %v entries\n", len(uniqueEntries))
transactionCount := 0

// Whether we are inserting transactions for the first time, or just updating them.
Expand All @@ -126,8 +127,9 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB,
if entry.Block != nil {
insertTransactions = true
block := entry.Block
blockEntry := BlockEncoderToPGStruct(block, entry.KeyBytes, params)
blockEntry, blockSigners := BlockEncoderToPGStruct(block, entry.KeyBytes, params)
blockEntries = append(blockEntries, blockEntry)
pgBlockSigners = append(pgBlockSigners, blockSigners...)
for ii, txn := range block.Txns {
indexInBlock := uint64(ii)
pgTxn, err := TransactionEncoderToPGStruct(
Expand Down Expand Up @@ -277,7 +279,7 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB,
transactionCount += len(innerTransactionsUtxoOperations)
// Print how long it took to insert the entries.
}
fmt.Printf("entries.bulkInsertUtxoOperationsEntry: Processed %v txns in %v s\n", transactionCount, time.Since(start))
glog.V(2).Infof("entries.bulkInsertUtxoOperationsEntry: Processed %v txns in %v s\n", transactionCount, time.Since(start))

start = time.Now()

Expand All @@ -299,6 +301,16 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB,
return errors.Wrapf(err, "entries.bulkInsertBlock: Error inserting entries")
}

blockSignerQuery := db.NewInsert().Model(&pgBlockSigners)

if operationType == lib.DbOperationTypeUpsert {
blockSignerQuery = blockSignerQuery.On("CONFLICT (block_hash, signer_index) DO UPDATE")
}

if _, err := blockSignerQuery.Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkInsertBlockSigners: Error inserting block signer entries")
}

} else {
values := db.NewValues(&transactionUpdates)
_, err := db.NewUpdate().
Expand All @@ -317,7 +329,7 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB,
}
}

fmt.Printf("entries.bulkInsertUtxoOperationsEntry: Updated %v txns in %v s\n", len(transactionUpdates), time.Since(start))
glog.V(2).Infof("entries.bulkInsertUtxoOperationsEntry: Updated %v txns in %v s\n", len(transactionUpdates), time.Since(start))

start = time.Now()

Expand All @@ -329,7 +341,7 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB,
}
}

fmt.Printf("entries.bulkInsertUtxoOperationsEntry: Inserted %v affected public keys in %v s\n", len(affectedPublicKeys), time.Since(start))
glog.V(2).Infof("entries.bulkInsertUtxoOperationsEntry: Inserted %v affected public keys in %v s\n", len(affectedPublicKeys), time.Since(start))

start = time.Now()

Expand All @@ -340,7 +352,7 @@ func bulkInsertUtxoOperationsEntry(entries []*lib.StateChangeEntry, db *bun.DB,
return errors.Wrapf(err, "InsertStakeRewards: Problem inserting stake rewards")
}
}
fmt.Printf("entries.bulkInsertUtxoOperationsEntry: Inserted %v stake rewards in %v s\n", len(stakeRewardEntries), time.Since(start))
glog.V(2).Infof("entries.bulkInsertUtxoOperationsEntry: Inserted %v stake rewards in %v s\n", len(stakeRewardEntries), time.Since(start))

if len(jailedHistoryEntries) > 0 {
_, err := db.NewInsert().Model(&jailedHistoryEntries).On("CONFLICT (validator_pkid, jailed_at_epoch_number, unjailed_at_epoch_number) DO NOTHING").Exec(context.Background())
Expand Down Expand Up @@ -410,6 +422,9 @@ func parseUtxoOperationBundle(
}
txIndexMetadata, err := consumer.ComputeTransactionMetadata(transaction, blockHashHex, params, transaction.TxnFeeNanos, uint64(jj), utxoOps)
if err != nil {
glog.Errorf("parseUtxoOperationBundle: Problem computing transaction metadata for "+
"entry %+v at block height %v: %v", entry, entry.BlockHeight, err)
// TODO: swallow error and continue.
return nil,
nil,
nil,
Expand Down
110 changes: 100 additions & 10 deletions entries/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,28 @@ type PGValidatorEntryUtxoOps struct {
UtxoOperation
}

type SnapshotValidatorEntry struct {
ValidatorPKID string `bun:",nullzero"`
Domains []string `bun:",array"`
DisableDelegatedStake bool
DelegatedStakeCommissionBasisPoints uint64
VotingPublicKey string `bun:",nullzero"`
VotingAuthorization string `bun:",nullzero"`
// Use bunbig.Int to store the balance as a numeric in the pg database.
TotalStakeAmountNanos *bunbig.Int `pg:",use_zero"`
LastActiveAtEpochNumber uint64
JailedAtEpochNumber uint64
SnapshotAtEpochNumber uint64 `pg:",use_zero"`

ExtraData map[string]string `bun:"type:jsonb"`
BadgerKey []byte `pg:",pk,use_zero"`
}

type PGSnapshotValidatorEntry struct {
bun.BaseModel `bun:"table:snapshot_validator_entry"`
SnapshotValidatorEntry
}

// Convert the ValidatorEntry DeSo encoder to the PGValidatorEntry struct used by bun.
func ValidatorEncoderToPGStruct(validatorEntry *lib.ValidatorEntry, keyBytes []byte, params *lib.DeSoParams) ValidatorEntry {
pgValidatorEntry := ValidatorEntry{
Expand Down Expand Up @@ -96,23 +118,43 @@ func ValidatorBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params
func bulkInsertValidatorEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)
uniqueValidatorEntries := consumer.FilterEntriesByPrefix(uniqueEntries, lib.Prefixes.PrefixValidatorByPKID)
uniqueSnapshotValidatorEntries := consumer.FilterEntriesByPrefix(uniqueEntries, lib.Prefixes.PrefixSnapshotValidatorSetByPKID)
// Create a new array to hold the bun struct.
pgEntrySlice := make([]*PGValidatorEntry, len(uniqueEntries))
pgEntrySlice := make([]*PGValidatorEntry, len(uniqueValidatorEntries))
pgSnapshotEntrySlice := make([]*PGSnapshotValidatorEntry, len(uniqueSnapshotValidatorEntries))

// Loop through the entries and convert them to PGEntry.
for ii, entry := range uniqueEntries {
for ii, entry := range uniqueValidatorEntries {
pgEntrySlice[ii] = &PGValidatorEntry{ValidatorEntry: ValidatorEncoderToPGStruct(entry.Encoder.(*lib.ValidatorEntry), entry.KeyBytes, params)}
}
for ii, entry := range uniqueSnapshotValidatorEntries {
pgSnapshotEntrySlice[ii] = &PGSnapshotValidatorEntry{SnapshotValidatorEntry: SnapshotValidatorEncoderToPGStruct(entry.Encoder.(*lib.ValidatorEntry), entry.KeyBytes, params)}
}

// Execute the insert query.
query := db.NewInsert().Model(&pgEntrySlice)
if len(pgEntrySlice) > 0 {
query := db.NewInsert().Model(&pgEntrySlice)

if operationType == lib.DbOperationTypeUpsert {
query = query.On("CONFLICT (badger_key) DO UPDATE")
}

if operationType == lib.DbOperationTypeUpsert {
query = query.On("CONFLICT (badger_key) DO UPDATE")
if _, err := query.Returning("").Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkInsertValidatorEntry: Error inserting validator entries")
}
}

if _, err := query.Returning("").Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkInsertValidatorEntry: Error inserting entries")
if len(pgSnapshotEntrySlice) > 0 {
query := db.NewInsert().Model(&pgSnapshotEntrySlice)

if operationType == lib.DbOperationTypeUpsert {
query = query.On("CONFLICT (badger_key) DO UPDATE")
}

if _, err := query.Returning("").Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkInsertValidatorEntry: Error inserting snapshot validator entries")
}
}
return nil
}
Expand All @@ -123,16 +165,64 @@ func bulkDeleteValidatorEntry(entries []*lib.StateChangeEntry, db *bun.DB, opera
uniqueEntries := consumer.UniqueEntries(entries)

// Transform the entries into a list of keys to delete.
keysToDelete := consumer.KeysToDelete(uniqueEntries)
validatorEntriesToDelete := consumer.FilterEntriesByPrefix(uniqueEntries, lib.Prefixes.PrefixValidatorByPKID)

// Execute the delete query.
snapshotValidatorEntriesToDelete := consumer.FilterEntriesByPrefix(uniqueEntries, lib.Prefixes.PrefixSnapshotValidatorSetByPKID)

// Execute the delete query for validator entries.
if _, err := db.NewDelete().
Model(&PGValidatorEntry{}).
Where("badger_key IN (?)", bun.In(keysToDelete)).
Where("badger_key IN (?)", bun.In(validatorEntriesToDelete)).
Returning("").
Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkDeleteValidatorEntry: Error deleting entries")
}

// Execute the delete query.
if _, err := db.NewDelete().
Model(&PGSnapshotValidatorEntry{}).
Where("badger_key IN (?)", bun.In(snapshotValidatorEntriesToDelete)).
Returning("").
Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkDeleteSnapshotValidatorEntry: Error deleting entries")
}

return nil
}

// Convert the SnapshotValidatorEntry DeSo encoder to the PGSnapshotValidatorEntry struct used by bun.
func SnapshotValidatorEncoderToPGStruct(validatorEntry *lib.ValidatorEntry, keyBytes []byte, params *lib.DeSoParams) SnapshotValidatorEntry {
pgValidatorEntry := SnapshotValidatorEntry{
ExtraData: consumer.ExtraDataBytesToString(validatorEntry.ExtraData),
BadgerKey: keyBytes,
}

if validatorEntry.ValidatorPKID != nil {
pgValidatorEntry.ValidatorPKID = consumer.PublicKeyBytesToBase58Check((*validatorEntry.ValidatorPKID)[:], params)
}

if validatorEntry.Domains != nil {
pgValidatorEntry.Domains = make([]string, len(validatorEntry.Domains))
for ii, domain := range validatorEntry.Domains {
pgValidatorEntry.Domains[ii] = string(domain)
}
}

pgValidatorEntry.DisableDelegatedStake = validatorEntry.DisableDelegatedStake
pgValidatorEntry.DelegatedStakeCommissionBasisPoints = validatorEntry.DelegatedStakeCommissionBasisPoints

if validatorEntry.VotingPublicKey != nil {
pgValidatorEntry.VotingPublicKey = validatorEntry.VotingPublicKey.ToString()
}

if validatorEntry.VotingAuthorization != nil {
pgValidatorEntry.VotingAuthorization = validatorEntry.VotingAuthorization.ToString()
}

pgValidatorEntry.TotalStakeAmountNanos = bunbig.FromMathBig(validatorEntry.TotalStakeAmountNanos.ToBig())
pgValidatorEntry.LastActiveAtEpochNumber = validatorEntry.LastActiveAtEpochNumber
pgValidatorEntry.JailedAtEpochNumber = validatorEntry.JailedAtEpochNumber
keyBytesWithoutPrefix := keyBytes[1:]
pgValidatorEntry.SnapshotAtEpochNumber = lib.DecodeUint64(keyBytesWithoutPrefix[:8])
return pgValidatorEntry
}
Loading

0 comments on commit 7ae0a61

Please sign in to comment.