diff --git a/internal/indexer/indexer.go b/internal/indexer/indexer.go index 52bb670..bfb92aa 100644 --- a/internal/indexer/indexer.go +++ b/internal/indexer/indexer.go @@ -38,6 +38,7 @@ import ( const ( syncStatusLogInterval = 30 * time.Second + rollbackSlots = 50 * 20 // 50 blocks with a 20s average between ) type Indexer struct { @@ -146,7 +147,7 @@ func (i *Indexer) Start() error { // Configure pipeline filters // We only care about transaction events filterEvent := filter_event.New( - filter_event.WithTypes([]string{"chainsync.transaction"}), + filter_event.WithTypes([]string{"chainsync.transaction", "chainsync.rollback"}), ) i.pipeline.AddFilter(filterEvent) // We only care about transactions on a certain address @@ -178,30 +179,69 @@ func (i *Indexer) Start() error { } func (i *Indexer) handleEvent(evt event.Event) error { + switch evt.Payload.(type) { + case input_chainsync.RollbackEvent: + return i.handleEventRollback(evt) + case input_chainsync.TransactionEvent: + return i.handleEventTransaction(evt) + default: + return fmt.Errorf("unknown event payload type: %T", evt.Payload) + } +} + +func (i *Indexer) handleEventRollback(evt event.Event) error { + logger := logging.GetLogger() + store := storage.GetStorage() + eventRollback := evt.Payload.(input_chainsync.RollbackEvent) + store.Lock() + defer store.Unlock() + if err := store.Rollback(eventRollback.SlotNumber); err != nil { + return err + } + logger.Info( + fmt.Sprintf("rolled back to %d.%s", eventRollback.SlotNumber, eventRollback.BlockHash), + ) + // Purge older deleted UTxOs + if err := store.PurgeDeletedUtxos(eventRollback.SlotNumber - rollbackSlots); err != nil { + logger.Warn( + fmt.Sprintf("failed to purge deleted UTxOs: %s", err), + ) + } + return nil +} + +func (i *Indexer) handleEventTransaction(evt event.Event) error { cfg := config.GetConfig() profileCfg := config.GetProfile() logger := logging.GetLogger() + bursa := wallet.GetWallet() store := storage.GetStorage() eventTx := evt.Payload.(input_chainsync.TransactionEvent) eventCtx := evt.Context.(input_chainsync.TransactionContext) + store.Lock() + defer store.Unlock() // Delete used UTXOs for _, txInput := range eventTx.Transaction.Consumed() { // We don't have a ledger DB to know where the TX inputs came from, so we just try deleting them for our known addresses - for _, tmpAddress := range []string{cfg.Indexer.ScriptAddress, wallet.GetWallet().PaymentAddress} { - if err := store.RemoveUtxo(tmpAddress, txInput.Id().String(), txInput.Index()); err != nil { + for _, tmpAddress := range []string{cfg.Indexer.ScriptAddress, bursa.PaymentAddress} { + if err := store.RemoveUtxo(tmpAddress, txInput.Id().String(), txInput.Index(), eventCtx.SlotNumber); err != nil { return err } } } for idx, txOutput := range eventTx.Transaction.Produced() { - // Write UTXO to storage - if err := store.AddUtxo( - txOutput.Address().String(), - eventCtx.TransactionHash, - uint32(idx), - txOutput.Cbor(), - ); err != nil { - return err + if txOutput.Address().String() == cfg.Indexer.ScriptAddress || + txOutput.Address().String() == bursa.PaymentAddress { + // Write UTXO to storage + if err := store.AddUtxo( + txOutput.Address().String(), + eventCtx.TransactionHash, + uint32(idx), + txOutput.Cbor(), + eventCtx.SlotNumber, + ); err != nil { + return err + } } // Handle datum for script address if txOutput.Address().String() == cfg.Indexer.ScriptAddress { @@ -258,7 +298,7 @@ func (i *Indexer) handleEvent(evt event.Event) error { trie := store.Trie() trie.Lock() trieKey := trie.HashKey(blockData.CurrentHash) - if err := trie.Update(trieKey, blockData.CurrentHash); err != nil { + if err := trie.Update(trieKey, blockData.CurrentHash, eventCtx.SlotNumber); err != nil { trie.Unlock() return err } @@ -279,14 +319,23 @@ func (i *Indexer) handleEvent(evt event.Event) error { return err } - // Restart miners for new datum if i.tipReached { + // TODO: defer starting miner until after processing all TX outputs + // Restart miners for new datum miner.GetManager().Stop() miner.GetManager().Start(i.lastBlockData) } } } } + // Purge older deleted UTxOs + if i.tipReached { + if err := store.PurgeDeletedUtxos(eventCtx.SlotNumber - rollbackSlots); err != nil { + logger.Warn( + fmt.Sprintf("failed to purge delted UTxOs: %s", err), + ) + } + } return nil } diff --git a/internal/miner/miner.go b/internal/miner/miner.go index 36d4a46..2bee29f 100644 --- a/internal/miner/miner.go +++ b/internal/miner/miner.go @@ -279,7 +279,7 @@ func (m *Miner) Start() { trie := storage.GetStorage().Trie() trie.Lock() tmpHashKey := storage.HashValue(targetHash).Bytes() - if err := trie.Update(tmpHashKey, targetHash); err != nil { + if err := trie.Update(tmpHashKey, targetHash, 0); err != nil { panic(fmt.Sprintf("failed to update storage for trie: %s", err)) } postDatum = models.TunaV2State{ diff --git a/internal/storage/storage.go b/internal/storage/storage.go index f767890..ef38419 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -19,6 +19,7 @@ import ( "fmt" "strconv" "strings" + "sync" "github.com/blinklabs-io/bluefin/internal/config" "github.com/blinklabs-io/bluefin/internal/logging" @@ -34,6 +35,7 @@ const ( ) type Storage struct { + sync.Mutex db *badger.DB trie *Trie } @@ -190,8 +192,10 @@ func (s *Storage) AddUtxo( txId string, txOutIdx uint32, txOutBytes []byte, + slot uint64, ) error { - key := fmt.Sprintf("utxo_%s_%s.%d", address, txId, txOutIdx) + keyUtxo := fmt.Sprintf("utxo_%s_%s.%d", address, txId, txOutIdx) + keyAdded := keyUtxo + `_added` err := s.db.Update(func(txn *badger.Txn) error { // Wrap TX output in UTxO structure to make it easier to consume later txIdBytes, err := hex.DecodeString(txId) @@ -213,7 +217,17 @@ func (s *Storage) AddUtxo( if err != nil { return err } - if err := txn.Set([]byte(key), cborBytes); err != nil { + if err := txn.Set([]byte(keyUtxo), cborBytes); err != nil { + return err + } + // Set "added" key to provided slot number + if err := txn.Set( + []byte(keyAdded), + []byte( + // Convert slot to string for storage + strconv.Itoa(int(slot)), + ), + ); err != nil { return err } return nil @@ -225,10 +239,23 @@ func (s *Storage) RemoveUtxo( address string, txId string, utxoIdx uint32, + slot uint64, ) error { - key := fmt.Sprintf("utxo_%s_%s.%d", address, txId, utxoIdx) + keyUtxo := fmt.Sprintf("utxo_%s_%s.%d", address, txId, utxoIdx) + keyDeleted := keyUtxo + `_deleted` err := s.db.Update(func(txn *badger.Txn) error { - if err := txn.Delete([]byte(key)); err != nil { + // Check if UTxO exists at all + if _, err := txn.Get([]byte(keyUtxo)); err != nil { + return err + } + // Set "deleted" key to provided slot number + if err := txn.Set( + []byte(keyDeleted), + []byte( + // Convert slot to string for storage + strconv.Itoa(int(slot)), + ), + ); err != nil { return err } return nil @@ -250,15 +277,21 @@ func (s *Storage) GetUtxos(address string) ([][]byte, error) { defer it.Close() for it.Seek(keyPrefix); it.ValidForPrefix(keyPrefix); it.Next() { item := it.Item() - err := item.Value(func(v []byte) error { - // Create copy of value for use outside of transaction - valCopy := append([]byte{}, v...) - ret = append(ret, valCopy) - return nil - }) + key := item.Key() + // Ignore "added" and "deleted" metadata keys when iterating + if strings.HasSuffix(string(key), `_deleted`) || strings.HasSuffix(string(key), `_added`) { + continue + } + // Ignore "deleted" UTxOs + keyDeleted := string(key) + `_deleted` + if _, err := txn.Get([]byte(keyDeleted)); err != badger.ErrKeyNotFound { + continue + } + val, err := item.ValueCopy(nil) if err != nil { return err } + ret = append(ret, val) } return nil }) @@ -271,6 +304,157 @@ func (s *Storage) GetUtxos(address string) ([][]byte, error) { return ret, nil } +func (s *Storage) Rollback(slot uint64) error { + logger := logging.GetLogger() + keyPrefix := []byte(`utxo_`) + var deleteKeys [][]byte + err := s.db.Update(func(txn *badger.Txn) error { + it := txn.NewIterator(badger.DefaultIteratorOptions) + defer it.Close() + for it.Seek(keyPrefix); it.ValidForPrefix(keyPrefix); it.Next() { + item := it.Item() + key := item.KeyCopy(nil) + // Ignore "added" and "deleted" metadata keys when iterating + if strings.HasSuffix(string(key), `_deleted`) || strings.HasSuffix(string(key), `_added`) { + continue + } + // Restore UTxOs deleted after rollback slot + keyDeleted := string(key) + `_deleted` + delItem, err := txn.Get([]byte(keyDeleted)) + if err != nil && err != badger.ErrKeyNotFound { + return err + } + if err != badger.ErrKeyNotFound { + delVal, err := delItem.ValueCopy(nil) + if err != nil { + return err + } + delSlot, err := strconv.ParseUint(string(delVal), 10, 64) + if err != nil { + return err + } + if delSlot > slot { + logger.Debug( + fmt.Sprintf( + "deleting key %s ('deleted' slot %d) to restore deleted UTxO", + keyDeleted, + delSlot, + ), + ) + deleteKeys = append(deleteKeys, []byte(keyDeleted)) + } + } + // Remove UTxOs added after rollback slot + keyAdded := string(key) + `_added` + addItem, err := txn.Get([]byte(keyAdded)) + if err != nil && err != badger.ErrKeyNotFound { + return err + } + if err != badger.ErrKeyNotFound { + addVal, err := addItem.ValueCopy(nil) + if err != nil { + return err + } + addSlot, err := strconv.ParseUint(string(addVal), 10, 64) + if err != nil { + return err + } + if addSlot > slot { + logger.Debug( + fmt.Sprintf( + "deleting keys %s ('added' slot %d) and %s to remove rolled-back UTxO", + key, + addSlot, + keyAdded, + ), + ) + deleteKeys = append( + deleteKeys, + key, + []byte(keyAdded), + ) + } + } + } + // We delete the keys outside of the iterator, because apparently you can't delete + // the current key when iterating + for _, key := range deleteKeys { + if err := txn.Delete([]byte(key)); err != nil { + return err + } + } + return nil + }) + // Remove rolled-back hashes from trie + if err := s.trie.Rollback(slot); err != nil { + return err + } + return err +} + +func (s *Storage) PurgeDeletedUtxos(beforeSlot uint64) error { + logger := logging.GetLogger() + keyPrefix := []byte(`utxo_`) + var deleteKeys [][]byte + err := s.db.Update(func(txn *badger.Txn) error { + it := txn.NewIterator(badger.DefaultIteratorOptions) + defer it.Close() + for it.Seek(keyPrefix); it.ValidForPrefix(keyPrefix); it.Next() { + item := it.Item() + key := item.KeyCopy(nil) + // Ignore "added" and "deleted" metadata keys when iterating + if strings.HasSuffix(string(key), `_deleted`) || strings.HasSuffix(string(key), `_added`) { + continue + } + // Check for "deleted" key + keyDeleted := string(key) + `_deleted` + delItem, err := txn.Get([]byte(keyDeleted)) + if err != nil { + if err == badger.ErrKeyNotFound { + continue + } + return err + } + delVal, err := delItem.ValueCopy(nil) + if err != nil { + return err + } + delSlot, err := strconv.ParseUint(string(delVal), 10, 64) + if err != nil { + return err + } + if delSlot < beforeSlot { + deleteKeys = append( + deleteKeys, + // UTxO key + key, + // UTxO "added" key + []byte(string(key)+`_added`), + // UTxO "deleted" key + []byte(string(key)+`_deleted`), + ) + } + } + // We delete the keys outside of the iterator, because apparently you can't delete + // the current key when iterating + for _, key := range deleteKeys { + if err := txn.Delete([]byte(key)); err != nil { + // Leave the rest for the next run if we hit the max transaction size + if err == badger.ErrTxnTooBig { + logger.Debug("purge deleted UTxOs: badger transaction too large, leaving remainder until next run") + break + } + return err + } + logger.Debug( + fmt.Sprintf("purged deleted UTxO key: %s", key), + ) + } + return nil + }) + return err +} + func GetStorage() *Storage { return globalStorage } diff --git a/internal/storage/trie.go b/internal/storage/trie.go index 61d812f..e498ec8 100644 --- a/internal/storage/trie.go +++ b/internal/storage/trie.go @@ -17,6 +17,7 @@ package storage import ( "encoding/hex" "fmt" + "strconv" "strings" "sync" @@ -55,7 +56,7 @@ func (t *Trie) load() error { return err } trieKey := t.HashKey(seedHashBytes) - if err := t.Update(trieKey, seedHashBytes); err != nil { + if err := t.Update(trieKey, seedHashBytes, 0); err != nil { return err } } @@ -82,13 +83,27 @@ func (t *Trie) load() error { return err } -func (t *Trie) Update(key []byte, val []byte) error { +func (t *Trie) Update(key []byte, val []byte, slot uint64) error { // Update trie t.trie.Set(key, val) // Update storage dbKey := t.dbKeyPrefix(key) err := t.db.Update(func(txn *badger.Txn) error { - return txn.Set(dbKey, val) + if err := txn.Set(dbKey, val); err != nil { + return err + } + // Set "added" key to provided slot number + keyAdded := `meta_` + string(dbKey) + `_added` + if err := txn.Set( + []byte(keyAdded), + []byte( + // Convert slot to string for storage + strconv.Itoa(int(slot)), + ), + ); err != nil { + return err + } + return nil }) return err } @@ -101,7 +116,57 @@ func (t *Trie) Delete(key []byte) error { // Update storage dbKey := t.dbKeyPrefix(key) err := t.db.Update(func(txn *badger.Txn) error { - return txn.Delete(dbKey) + if err := txn.Delete(dbKey); err != nil { + return err + } + // Delete "added" key + keyAdded := `meta_` + string(dbKey) + `_added` + if err := txn.Delete([]byte(keyAdded)); err != nil { + if err != badger.ErrKeyNotFound { + return err + } + } + return nil + }) + return err +} + +func (t *Trie) Rollback(slot uint64) error { + dbKeyPrefix := t.dbKeyPrefix(nil) + err := t.db.Update(func(txn *badger.Txn) error { + it := txn.NewIterator(badger.DefaultIteratorOptions) + defer it.Close() + for it.Seek(dbKeyPrefix); it.ValidForPrefix(dbKeyPrefix); it.Next() { + item := it.Item() + key := item.Key() + keyAdded := `meta_` + string(key) + `_added` + addItem, err := txn.Get([]byte(keyAdded)) + if err != nil { + if err == badger.ErrKeyNotFound { + continue + } + return err + } + addVal, err := addItem.ValueCopy(nil) + if err != nil { + return err + } + addSlot, err := strconv.Atoi(string(addVal)) + if err != nil { + return err + } + if addSlot > int(slot) { + // Delete rolled-back hashes from trie + tmpKey := strings.TrimPrefix( + string(item.Key()), + string(dbKeyPrefix), + ) + if err := t.Delete([]byte(tmpKey)); err != nil { + return err + } + } + } + return nil }) return err } diff --git a/internal/tx/tx.go b/internal/tx/tx.go index 25ba9b6..6daaff0 100644 --- a/internal/tx/tx.go +++ b/internal/tx/tx.go @@ -17,6 +17,7 @@ package tx import ( "bytes" "encoding/hex" + "errors" "fmt" "io" "math/big" @@ -70,6 +71,7 @@ func createTx(blockData any, nonce [16]byte) ([]byte, error) { cfg := config.GetConfig() logger := logging.GetLogger() bursa := wallet.GetWallet() + store := storage.GetStorage() profileCfg := config.GetProfile() @@ -95,10 +97,13 @@ func createTx(blockData any, nonce [16]byte) ([]byte, error) { SetWalletAsChangeAddress() // Gather input UTxOs from our wallet - utxosBytes, err := storage.GetStorage().GetUtxos(bursa.PaymentAddress) + store.Lock() + utxosBytes, err := store.GetUtxos(bursa.PaymentAddress) if err != nil { + store.Unlock() return nil, err } + store.Unlock() var utxos []UTxO.UTxO var tunaPolicyId *Policy.PolicyId if profileCfg.UseTunaV1 { @@ -132,11 +137,13 @@ func createTx(blockData any, nonce [16]byte) ([]byte, error) { } // Gather UTxO(s) for script - scriptUtxosBytes, err := storage.GetStorage(). - GetUtxos(cfg.Indexer.ScriptAddress) + store.Lock() + scriptUtxosBytes, err := store.GetUtxos(cfg.Indexer.ScriptAddress) if err != nil { + store.Unlock() return nil, err } + store.Unlock() scriptUtxos := []UTxO.UTxO{} for _, utxoBytes := range scriptUtxosBytes { var utxo UTxO.UTxO @@ -153,6 +160,9 @@ func createTx(blockData any, nonce [16]byte) ([]byte, error) { len(scriptUtxos), ) } + if len(scriptUtxos) == 0 { + return nil, errors.New("no script UTxOs found") + } validatorOutRef := scriptUtxos[0] var blockDataRealTimeNow int64 @@ -238,7 +248,7 @@ func createTx(blockData any, nonce [16]byte) ([]byte, error) { trie := storage.GetStorage().Trie() trie.Lock() tmpHashKey := storage.HashValue(blockDataHash).Bytes() - if err := trie.Update(tmpHashKey, blockDataHash); err != nil { + if err := trie.Update(tmpHashKey, blockDataHash, 0); err != nil { trie.Unlock() return nil, err }