Skip to content

Commit

Permalink
Fix/reset height pending txs (#24)
Browse files Browse the repository at this point in the history
* delete pending txs & processed msgs

* send tx immediately even if node is syncing

* l1startheight starts from the given height

* search tx from the exact l1sequence
  • Loading branch information
sh-cha authored Sep 29, 2024
1 parent 1fc79c1 commit 4e8ac2c
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 11 deletions.
12 changes: 12 additions & 0 deletions executor/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ func ResetHeights(db types.DB) error {
if err := node.DeleteSyncInfo(db); err != nil {
return err
}
if err := node.DeletePendingTxs(db); err != nil {
return err
}
if err := node.DeleteProcessedMsgs(db); err != nil {
return err
}
fmt.Printf("reset height to 0 for node %s\n", string(db.GetPrefix()))
}
return nil
Expand All @@ -35,6 +41,12 @@ func ResetHeight(db types.DB, nodeName string) error {
if err != nil {
return err
}
if err := node.DeletePendingTxs(db); err != nil {
return err
}
if err := node.DeleteProcessedMsgs(db); err != nil {
return err
}
fmt.Printf("reset height to 0 for node %s\n", string(nodeDB.GetPrefix()))
return nil
}
18 changes: 10 additions & 8 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,14 +236,13 @@ func (ex *Executor) getStartHeights(ctx context.Context, bridgeId uint64) (l1Sta
if err != nil {
return 0, 0, 0, 0, err
}
if l1Sequence > 1 {
depositTxHeight, err := ex.host.QueryDepositTxHeight(ctx, bridgeId, l1Sequence-1)
if err != nil {
return 0, 0, 0, 0, err
}
if l1StartHeight > depositTxHeight {
l1StartHeight = depositTxHeight
}

depositTxHeight, err := ex.host.QueryDepositTxHeight(ctx, bridgeId, l1Sequence)
if err != nil {
return 0, 0, 0, 0, err
}
if l1StartHeight > depositTxHeight {
l1StartHeight = depositTxHeight
}

if l2StartHeight == 0 {
Expand All @@ -252,5 +251,8 @@ func (ex *Executor) getStartHeights(ctx context.Context, bridgeId uint64) (l1Sta
if ex.cfg.BatchStartHeight > 0 {
batchStartHeight = ex.cfg.BatchStartHeight - 1
}
if l1StartHeight > 0 {
l1StartHeight--
}
return l1StartHeight, l2StartHeight, startOutputIndex, batchStartHeight, err
}
3 changes: 0 additions & 3 deletions executor/host/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ func (h *Host) endBlockHandler(_ context.Context, args nodetypes.EndBlockArgs) e
// collect more msgs if block height is not latest
blockHeight := args.Block.Header.Height
msgQueue := h.GetMsgQueue()
if blockHeight != args.LatestHeight && len(msgQueue) > 0 && len(msgQueue) <= 10 {
return nil
}

batchKVs := []types.RawKV{
h.Node().SyncInfoToRawKV(blockHeight),
Expand Down
21 changes: 21 additions & 0 deletions node/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package node

import (
dbtypes "github.com/initia-labs/opinit-bots/db/types"
btypes "github.com/initia-labs/opinit-bots/node/broadcaster/types"
nodetypes "github.com/initia-labs/opinit-bots/node/types"
"github.com/initia-labs/opinit-bots/types"
"go.uber.org/zap"
Expand Down Expand Up @@ -54,3 +55,23 @@ func (n Node) DeleteSyncInfo() error {
func DeleteSyncInfo(db types.DB) error {
return db.Delete(nodetypes.LastProcessedBlockHeightKey)
}

func DeleteProcessedMsgs(db types.DB) error {
return db.PrefixedIterate(btypes.ProcessedMsgsKey, func(key, _ []byte) (stop bool, err error) {
err = db.Delete(key)
if err != nil {
return stop, err
}
return false, nil
})
}

func DeletePendingTxs(db types.DB) error {
return db.PrefixedIterate(btypes.PendingTxsKey, func(key, _ []byte) (stop bool, err error) {
err = db.Delete(key)
if err != nil {
return stop, err
}
return false, nil
})
}

0 comments on commit 4e8ac2c

Please sign in to comment.