Skip to content

Commit

Permalink
feat: support for handling rollbacks
Browse files Browse the repository at this point in the history
This works by adding additional "added" and "deleted" keys for each UTxO
key to track the slot that each UTxO was added or deleted. On a rollback,
the slot numbers in these keys will be compared to the rollback slot,
and UTxOs will be added/removed accordingly.

Fixes #215
Fixes #167
  • Loading branch information
agaffney committed Jul 12, 2024
1 parent 0a5d847 commit a429389
Show file tree
Hide file tree
Showing 5 changed files with 340 additions and 32 deletions.
75 changes: 62 additions & 13 deletions internal/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (

const (
syncStatusLogInterval = 30 * time.Second
rollbackSlots = 50 * 20 // 50 blocks with a 20s average between
)

type Indexer struct {
Expand Down Expand Up @@ -146,7 +147,7 @@ func (i *Indexer) Start() error {
// Configure pipeline filters
// We only care about transaction events
filterEvent := filter_event.New(
filter_event.WithTypes([]string{"chainsync.transaction"}),
filter_event.WithTypes([]string{"chainsync.transaction", "chainsync.rollback"}),
)
i.pipeline.AddFilter(filterEvent)
// We only care about transactions on a certain address
Expand Down Expand Up @@ -178,30 +179,69 @@ func (i *Indexer) Start() error {
}

func (i *Indexer) handleEvent(evt event.Event) error {
switch evt.Payload.(type) {
case input_chainsync.RollbackEvent:
return i.handleEventRollback(evt)
case input_chainsync.TransactionEvent:
return i.handleEventTransaction(evt)
default:
return fmt.Errorf("unknown event payload type: %T", evt.Payload)
}
}

func (i *Indexer) handleEventRollback(evt event.Event) error {
logger := logging.GetLogger()
store := storage.GetStorage()
eventRollback := evt.Payload.(input_chainsync.RollbackEvent)
store.Lock()
defer store.Unlock()
if err := store.Rollback(eventRollback.SlotNumber); err != nil {
return err
}
logger.Info(
fmt.Sprintf("rolled back to %d.%s", eventRollback.SlotNumber, eventRollback.BlockHash),
)
// Purge older deleted UTxOs
if err := store.PurgeDeletedUtxos(eventRollback.SlotNumber - rollbackSlots); err != nil {
logger.Warn(
fmt.Sprintf("failed to purge deleted UTxOs: %s", err),
)
}
return nil
}

func (i *Indexer) handleEventTransaction(evt event.Event) error {
cfg := config.GetConfig()
profileCfg := config.GetProfile()
logger := logging.GetLogger()
bursa := wallet.GetWallet()
store := storage.GetStorage()
eventTx := evt.Payload.(input_chainsync.TransactionEvent)
eventCtx := evt.Context.(input_chainsync.TransactionContext)
store.Lock()
defer store.Unlock()
// Delete used UTXOs
for _, txInput := range eventTx.Transaction.Consumed() {
// We don't have a ledger DB to know where the TX inputs came from, so we just try deleting them for our known addresses
for _, tmpAddress := range []string{cfg.Indexer.ScriptAddress, wallet.GetWallet().PaymentAddress} {
if err := store.RemoveUtxo(tmpAddress, txInput.Id().String(), txInput.Index()); err != nil {
for _, tmpAddress := range []string{cfg.Indexer.ScriptAddress, bursa.PaymentAddress} {
if err := store.RemoveUtxo(tmpAddress, txInput.Id().String(), txInput.Index(), eventCtx.SlotNumber); err != nil {
return err
}
}
}
for idx, txOutput := range eventTx.Transaction.Produced() {
// Write UTXO to storage
if err := store.AddUtxo(
txOutput.Address().String(),
eventCtx.TransactionHash,
uint32(idx),
txOutput.Cbor(),
); err != nil {
return err
if txOutput.Address().String() == cfg.Indexer.ScriptAddress ||
txOutput.Address().String() == bursa.PaymentAddress {
// Write UTXO to storage
if err := store.AddUtxo(
txOutput.Address().String(),
eventCtx.TransactionHash,
uint32(idx),
txOutput.Cbor(),
eventCtx.SlotNumber,
); err != nil {
return err
}
}
// Handle datum for script address
if txOutput.Address().String() == cfg.Indexer.ScriptAddress {
Expand Down Expand Up @@ -258,7 +298,7 @@ func (i *Indexer) handleEvent(evt event.Event) error {
trie := store.Trie()
trie.Lock()
trieKey := trie.HashKey(blockData.CurrentHash)
if err := trie.Update(trieKey, blockData.CurrentHash); err != nil {
if err := trie.Update(trieKey, blockData.CurrentHash, eventCtx.SlotNumber); err != nil {
trie.Unlock()
return err
}
Expand All @@ -279,14 +319,23 @@ func (i *Indexer) handleEvent(evt event.Event) error {
return err
}

// Restart miners for new datum
if i.tipReached {
// TODO: defer starting miner until after processing all TX outputs
// Restart miners for new datum
miner.GetManager().Stop()
miner.GetManager().Start(i.lastBlockData)
}
}
}
}
// Purge older deleted UTxOs
if i.tipReached {
if err := store.PurgeDeletedUtxos(eventCtx.SlotNumber - rollbackSlots); err != nil {
logger.Warn(
fmt.Sprintf("failed to purge delted UTxOs: %s", err),
)
}
}
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func (m *Miner) Start() {
trie := storage.GetStorage().Trie()
trie.Lock()
tmpHashKey := storage.HashValue(targetHash).Bytes()
if err := trie.Update(tmpHashKey, targetHash); err != nil {
if err := trie.Update(tmpHashKey, targetHash, 0); err != nil {
panic(fmt.Sprintf("failed to update storage for trie: %s", err))
}
postDatum = models.TunaV2State{
Expand Down
204 changes: 194 additions & 10 deletions internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"strconv"
"strings"
"sync"

"github.com/blinklabs-io/bluefin/internal/config"
"github.com/blinklabs-io/bluefin/internal/logging"
Expand All @@ -34,6 +35,7 @@ const (
)

type Storage struct {
sync.Mutex
db *badger.DB
trie *Trie
}
Expand Down Expand Up @@ -190,8 +192,10 @@ func (s *Storage) AddUtxo(
txId string,
txOutIdx uint32,
txOutBytes []byte,
slot uint64,
) error {
key := fmt.Sprintf("utxo_%s_%s.%d", address, txId, txOutIdx)
keyUtxo := fmt.Sprintf("utxo_%s_%s.%d", address, txId, txOutIdx)
keyAdded := keyUtxo + `_added`
err := s.db.Update(func(txn *badger.Txn) error {
// Wrap TX output in UTxO structure to make it easier to consume later
txIdBytes, err := hex.DecodeString(txId)
Expand All @@ -213,7 +217,17 @@ func (s *Storage) AddUtxo(
if err != nil {
return err
}
if err := txn.Set([]byte(key), cborBytes); err != nil {
if err := txn.Set([]byte(keyUtxo), cborBytes); err != nil {
return err
}
// Set "added" key to provided slot number
if err := txn.Set(
[]byte(keyAdded),
[]byte(
// Convert slot to string for storage
strconv.Itoa(int(slot)),
),
); err != nil {
return err
}
return nil
Expand All @@ -225,10 +239,23 @@ func (s *Storage) RemoveUtxo(
address string,
txId string,
utxoIdx uint32,
slot uint64,
) error {
key := fmt.Sprintf("utxo_%s_%s.%d", address, txId, utxoIdx)
keyUtxo := fmt.Sprintf("utxo_%s_%s.%d", address, txId, utxoIdx)
keyDeleted := keyUtxo + `_deleted`
err := s.db.Update(func(txn *badger.Txn) error {
if err := txn.Delete([]byte(key)); err != nil {
// Check if UTxO exists at all
if _, err := txn.Get([]byte(keyUtxo)); err != nil {
return err
}
// Set "deleted" key to provided slot number
if err := txn.Set(
[]byte(keyDeleted),
[]byte(
// Convert slot to string for storage
strconv.Itoa(int(slot)),
),
); err != nil {
return err
}
return nil
Expand All @@ -250,15 +277,21 @@ func (s *Storage) GetUtxos(address string) ([][]byte, error) {
defer it.Close()
for it.Seek(keyPrefix); it.ValidForPrefix(keyPrefix); it.Next() {
item := it.Item()
err := item.Value(func(v []byte) error {
// Create copy of value for use outside of transaction
valCopy := append([]byte{}, v...)
ret = append(ret, valCopy)
return nil
})
key := item.Key()
// Ignore "added" and "deleted" metadata keys when iterating
if strings.HasSuffix(string(key), `_deleted`) || strings.HasSuffix(string(key), `_added`) {
continue
}
// Ignore "deleted" UTxOs
keyDeleted := string(key) + `_deleted`
if _, err := txn.Get([]byte(keyDeleted)); err != badger.ErrKeyNotFound {
continue
}
val, err := item.ValueCopy(nil)
if err != nil {
return err
}
ret = append(ret, val)
}
return nil
})
Expand All @@ -271,6 +304,157 @@ func (s *Storage) GetUtxos(address string) ([][]byte, error) {
return ret, nil
}

func (s *Storage) Rollback(slot uint64) error {
logger := logging.GetLogger()
keyPrefix := []byte(`utxo_`)
var deleteKeys [][]byte
err := s.db.Update(func(txn *badger.Txn) error {
it := txn.NewIterator(badger.DefaultIteratorOptions)
defer it.Close()
for it.Seek(keyPrefix); it.ValidForPrefix(keyPrefix); it.Next() {
item := it.Item()
key := item.KeyCopy(nil)
// Ignore "added" and "deleted" metadata keys when iterating
if strings.HasSuffix(string(key), `_deleted`) || strings.HasSuffix(string(key), `_added`) {
continue
}
// Restore UTxOs deleted after rollback slot
keyDeleted := string(key) + `_deleted`
delItem, err := txn.Get([]byte(keyDeleted))
if err != nil && err != badger.ErrKeyNotFound {
return err
}
if err != badger.ErrKeyNotFound {
delVal, err := delItem.ValueCopy(nil)
if err != nil {
return err
}
delSlot, err := strconv.ParseUint(string(delVal), 10, 64)
if err != nil {
return err
}
if delSlot > slot {
logger.Debug(
fmt.Sprintf(
"deleting key %s ('deleted' slot %d) to restore deleted UTxO",
keyDeleted,
delSlot,
),
)
deleteKeys = append(deleteKeys, []byte(keyDeleted))
}
}
// Remove UTxOs added after rollback slot
keyAdded := string(key) + `_added`
addItem, err := txn.Get([]byte(keyAdded))
if err != nil && err != badger.ErrKeyNotFound {
return err
}
if err != badger.ErrKeyNotFound {
addVal, err := addItem.ValueCopy(nil)
if err != nil {
return err
}
addSlot, err := strconv.ParseUint(string(addVal), 10, 64)
if err != nil {
return err
}
if addSlot > slot {
logger.Debug(
fmt.Sprintf(
"deleting keys %s ('added' slot %d) and %s to remove rolled-back UTxO",
key,
addSlot,
keyAdded,
),
)
deleteKeys = append(
deleteKeys,
key,
[]byte(keyAdded),
)
}
}
}
// We delete the keys outside of the iterator, because apparently you can't delete
// the current key when iterating
for _, key := range deleteKeys {
if err := txn.Delete([]byte(key)); err != nil {
return err
}
}
return nil
})
// Remove rolled-back hashes from trie
if err := s.trie.Rollback(slot); err != nil {
return err
}
return err
}

func (s *Storage) PurgeDeletedUtxos(beforeSlot uint64) error {
logger := logging.GetLogger()
keyPrefix := []byte(`utxo_`)
var deleteKeys [][]byte
err := s.db.Update(func(txn *badger.Txn) error {
it := txn.NewIterator(badger.DefaultIteratorOptions)
defer it.Close()
for it.Seek(keyPrefix); it.ValidForPrefix(keyPrefix); it.Next() {
item := it.Item()
key := item.KeyCopy(nil)
// Ignore "added" and "deleted" metadata keys when iterating
if strings.HasSuffix(string(key), `_deleted`) || strings.HasSuffix(string(key), `_added`) {
continue
}
// Check for "deleted" key
keyDeleted := string(key) + `_deleted`
delItem, err := txn.Get([]byte(keyDeleted))
if err != nil {
if err == badger.ErrKeyNotFound {
continue
}
return err
}
delVal, err := delItem.ValueCopy(nil)
if err != nil {
return err
}
delSlot, err := strconv.ParseUint(string(delVal), 10, 64)
if err != nil {
return err
}
if delSlot < beforeSlot {
deleteKeys = append(
deleteKeys,
// UTxO key
key,
// UTxO "added" key
[]byte(string(key)+`_added`),
// UTxO "deleted" key
[]byte(string(key)+`_deleted`),
)
}
}
// We delete the keys outside of the iterator, because apparently you can't delete
// the current key when iterating
for _, key := range deleteKeys {
if err := txn.Delete([]byte(key)); err != nil {
// Leave the rest for the next run if we hit the max transaction size
if err == badger.ErrTxnTooBig {
logger.Debug("purge deleted UTxOs: badger transaction too large, leaving remainder until next run")
break
}
return err
}
logger.Debug(
fmt.Sprintf("purged deleted UTxO key: %s", key),
)
}
return nil
})
return err
}

func GetStorage() *Storage {
return globalStorage
}
Expand Down
Loading

0 comments on commit a429389

Please sign in to comment.