Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: return proper response for tx jsonrpc endpoint #97

Merged
merged 1 commit into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions indexer/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (

rpctypes "github.com/initia-labs/minievm/jsonrpc/types"
"github.com/initia-labs/minievm/x/evm/keeper"
"github.com/initia-labs/minievm/x/evm/types"
)

func (e *EVMIndexerImpl) ListenCommit(ctx context.Context, res abci.ResponseCommit, changeSet []*storetypes.StoreKVPair) error {
Expand Down Expand Up @@ -111,7 +110,6 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque
receipts = append(receipts, &receipt)
}

chainId := types.ConvertCosmosChainIDToEthereumChainID(sdkCtx.ChainID())
blockGasMeter := sdkCtx.BlockGasMeter()
blockHeight := sdkCtx.BlockHeight()

Expand Down Expand Up @@ -145,7 +143,7 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque
receipt := receipts[idx]

// store tx
rpcTx := rpctypes.NewRPCTransaction(ethTx, blockHash, uint64(blockHeight), uint64(receipt.TransactionIndex), chainId)
rpcTx := rpctypes.NewRPCTransaction(ethTx, blockHash, uint64(blockHeight), uint64(receipt.TransactionIndex), ethTx.ChainId())
if err := e.TxMap.Set(sdkCtx, txHash.Bytes(), *rpcTx); err != nil {
e.logger.Error("failed to store rpcTx", "err", err)
return err
Expand Down
6 changes: 3 additions & 3 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type EVMIndexer interface {

// mempool
MempoolWrapper(mempool mempool.Mempool) mempool.Mempool
TxInMempool(hash common.Hash) bool
TxInMempool(hash common.Hash) *rpctypes.RPCTransaction
}

// EVMIndexerImpl implements EVMIndexer.
Expand Down Expand Up @@ -80,7 +80,7 @@ type EVMIndexerImpl struct {
pendingChans []chan *rpctypes.RPCTransaction

// txPendingMap is a map to store tx hashes in pending state.
txPendingMap *ttlcache.Cache[common.Hash, bool]
txPendingMap *ttlcache.Cache[common.Hash, *rpctypes.RPCTransaction]
}

func NewEVMIndexer(
Expand Down Expand Up @@ -128,7 +128,7 @@ func NewEVMIndexer(
// Use ttlcache to cope with abnormal cases like tx not included in a block
txPendingMap: ttlcache.New(
// pending tx lifetime is 1 minute in indexer
ttlcache.WithTTL[common.Hash, bool](time.Minute),
ttlcache.WithTTL[common.Hash, *rpctypes.RPCTransaction](time.Minute),
),
}

Expand Down
17 changes: 9 additions & 8 deletions indexer/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

rpctypes "github.com/initia-labs/minievm/jsonrpc/types"
evmkeeper "github.com/initia-labs/minievm/x/evm/keeper"
evmtypes "github.com/initia-labs/minievm/x/evm/types"
)

var _ mempool.Mempool = (*MempoolWrapper)(nil)
Expand All @@ -27,8 +26,13 @@ func (indexer *EVMIndexerImpl) MempoolWrapper(mempool mempool.Mempool) mempool.M
}

// TxInMempool returns true if the transaction with the given hash is in the mempool.
func (indexer *EVMIndexerImpl) TxInMempool(hash common.Hash) bool {
return indexer.txPendingMap.Has(hash)
func (indexer *EVMIndexerImpl) TxInMempool(hash common.Hash) *rpctypes.RPCTransaction {
item := indexer.txPendingMap.Get(hash)
if item == nil {
return nil
}

return item.Value()
}

// CountTx implements mempool.Mempool.
Expand All @@ -46,14 +50,11 @@ func (m *MempoolWrapper) Insert(ctx context.Context, tx sdk.Tx) error {
}

if ethTx != nil {
sdkCtx := sdk.UnwrapSDKContext(ctx)
chainId := evmtypes.ConvertCosmosChainIDToEthereumChainID(sdkCtx.ChainID())
rpcTx := rpctypes.NewRPCTransaction(ethTx, common.Hash{}, 0, 0, chainId)

ethTxHash := ethTx.Hash()
rpcTx := rpctypes.NewRPCTransaction(ethTx, common.Hash{}, 0, 0, ethTx.ChainId())

m.indexer.logger.Debug("inserting tx into mempool", "pending len", m.indexer.txPendingMap.Len(), "ethTxHash", ethTxHash)
m.indexer.txPendingMap.Set(ethTxHash, true, ttlcache.DefaultTTL)
m.indexer.txPendingMap.Set(ethTxHash, rpcTx, ttlcache.DefaultTTL)

go func() {
// emit the transaction to all pending channels
Expand Down
65 changes: 38 additions & 27 deletions jsonrpc/backend/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (b *JSONRPCBackend) SendTx(tx *coretypes.Transaction) error {
cacheKey := fmt.Sprintf("%s-%d", senderHex, txSeq)

txHash := tx.Hash()
b.queuedTxHashes.Store(txHash, true)
b.queuedTxHashes.Store(txHash, cacheKey)
_ = b.queuedTxs.Add(cacheKey, txQueueItem{hash: txHash, bytes: txBytes, body: tx, sender: senderHex})

// check if there are queued txs which can be sent
Expand Down Expand Up @@ -133,7 +133,12 @@ func (b *JSONRPCBackend) getQueryCtxWithHeight(height uint64) (context.Context,

// GetTransactionByHash returns the transaction with the given hash.
func (b *JSONRPCBackend) GetTransactionByHash(hash common.Hash) (*rpctypes.RPCTransaction, error) {
return b.getTransaction(hash)
rpcTx, err := b.getTransaction(hash)
if rpcTx == nil {
return nil, err
}

return rpcTx, nil
}

// GetTransactionCount returns the number of transactions at the given block number.
Expand Down Expand Up @@ -181,21 +186,23 @@ func (b *JSONRPCBackend) GetTransactionCount(address common.Address, blockNrOrHa

// GetTransactionReceipt returns the transaction receipt for the given transaction hash.
func (b *JSONRPCBackend) GetTransactionReceipt(hash common.Hash) (map[string]interface{}, error) {
receipt, err := b.getReceipt(hash)
if err != nil {
return nil, err
} else if receipt == nil {
return nil, nil
rpcTx, err := b.getTransaction(hash)
if rpcTx == nil && err == nil {
return nil, nil // tx is not found
} else if rpcTx != nil && err != nil {
return nil, NewTxIndexingError() // tx is not fully indexed
} else if err != nil {
return nil, err // just error case
}

tx, err := b.getTransaction(hash)
receipt, err := b.getReceipt(hash)
if err != nil {
return nil, err
} else if tx == nil {
} else if receipt == nil {
return nil, nil
}

return marshalReceipt(receipt, tx), nil
return marshalReceipt(receipt, rpcTx), nil
}

// GetTransactionByBlockHashAndIndex returns the transaction at the given block hash and index.
Expand Down Expand Up @@ -270,13 +277,8 @@ func (b *JSONRPCBackend) GetBlockTransactionCountByNumber(blockNum rpc.BlockNumb
// GetRawTransactionByHash returns the bytes of the transaction for the given hash.
func (b *JSONRPCBackend) GetRawTransactionByHash(hash common.Hash) (hexutil.Bytes, error) {
rpcTx, err := b.getTransaction(hash)
if err != nil && errors.Is(err, collections.ErrNotFound) {
return nil, nil
} else if err != nil {
b.logger.Error("failed to get raw transaction by hash", "err", err)
return nil, NewTxIndexingError()
} else if rpcTx == nil {
return nil, nil
if rpcTx == nil {
return nil, err
}

return rpcTx.ToTransaction().MarshalBinary()
Expand All @@ -295,11 +297,6 @@ func (b *JSONRPCBackend) GetRawTransactionByBlockHashAndIndex(blockHash common.H
}

func (b *JSONRPCBackend) PendingTransactions() ([]*rpctypes.RPCTransaction, error) {
chainID, err := b.ChainID()
if err != nil {
return nil, err
}

queryCtx, err := b.getQueryCtx()
if err != nil {
return nil, err
Expand Down Expand Up @@ -330,7 +327,7 @@ func (b *JSONRPCBackend) PendingTransactions() ([]*rpctypes.RPCTransaction, erro
if ethTx != nil {
result = append(
result,
rpctypes.NewRPCTransaction(ethTx, common.Hash{}, 0, 0, chainID),
rpctypes.NewRPCTransaction(ethTx, common.Hash{}, 0, 0, ethTx.ChainId()),
)
}
}
Expand Down Expand Up @@ -369,18 +366,32 @@ func (b *JSONRPCBackend) GetBlockReceipts(ctx context.Context, blockNrOrHash rpc
return result, nil
}

// getTransaction retrieves the lookup along with the transaction itself associate
// with the given transaction hash.
//
// An error will be returned if the transaction is not found, and background
// indexing for transactions is still in progress. The error is used to indicate the
// scenario explicitly that the transaction might be reachable shortly.
//
// A null will be returned in the transaction is not found and background transaction
// indexing is already finished. The transaction is not existent from the perspective
// of node.
func (b *JSONRPCBackend) getTransaction(hash common.Hash) (*rpctypes.RPCTransaction, error) {
if tx, ok := b.txLookupCache.Get(hash); ok {
return tx, nil
}

// check if the transaction is in the queued txs
if _, ok := b.queuedTxHashes.Load(hash); ok {
return nil, NewTxIndexingError()
if cacheKey, ok := b.queuedTxHashes.Load(hash); ok {
if cacheItem, ok := b.queuedTxs.Get(cacheKey.(string)); ok {
rpcTx := rpctypes.NewRPCTransaction(cacheItem.body, common.Hash{}, 0, 0, cacheItem.body.ChainId())
return rpcTx, NewTxIndexingError()
}
}

// check if the transaction is in the pending txs
if ok := b.app.EVMIndexer().TxInMempool(hash); ok {
return nil, NewTxIndexingError()
if tx := b.app.EVMIndexer().TxInMempool(hash); tx != nil {
return tx, NewTxIndexingError()
}

queryCtx, err := b.getQueryCtx()
Expand Down
9 changes: 2 additions & 7 deletions jsonrpc/backend/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@ func (b *JSONRPCBackend) TxPoolContent() (map[string]map[string]map[string]*rpct
return nil, err
}

chainID, err := b.ChainID()
if err != nil {
return nil, err
}

txUtils := keeper.NewTxUtils(b.app.EVMKeeper)
for _, tx := range pending.Txs {
cosmosTx, err := b.app.TxDecode(tx)
Expand All @@ -54,7 +49,7 @@ func (b *JSONRPCBackend) TxPoolContent() (map[string]map[string]map[string]*rpct
content["pending"][account.Hex()] = dump
}

dump[fmt.Sprintf("%d", ethTx.Nonce())] = rpctypes.NewRPCTransaction(ethTx, common.Hash{}, 0, 0, chainID)
dump[fmt.Sprintf("%d", ethTx.Nonce())] = rpctypes.NewRPCTransaction(ethTx, common.Hash{}, 0, 0, ethTx.ChainId())
}

// load queued txs
Expand All @@ -72,7 +67,7 @@ func (b *JSONRPCBackend) TxPoolContent() (map[string]map[string]map[string]*rpct
content["queued"][sender] = dump
}

dump[fmt.Sprintf("%d", ethTx.Nonce())] = rpctypes.NewRPCTransaction(ethTx, common.Hash{}, 0, 0, chainID)
dump[fmt.Sprintf("%d", ethTx.Nonce())] = rpctypes.NewRPCTransaction(ethTx, common.Hash{}, 0, 0, ethTx.ChainId())
}

return content, nil
Expand Down
Loading