Skip to content

Commit

Permalink
Merge branch 'main' into add-eth-fee-history-endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
sideninja authored Jun 10, 2024
2 parents 11887ae + ff54643 commit 0244943
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 33 deletions.
19 changes: 17 additions & 2 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,13 @@ import (

const maxFeeHistoryBlockCount = 1024

func SupportedAPIs(blockChainAPI *BlockChainAPI, streamAPI *StreamAPI, pullAPI *PullAPI) []rpc.API {
return []rpc.API{{
func SupportedAPIs(
blockChainAPI *BlockChainAPI,
streamAPI *StreamAPI,
pullAPI *PullAPI,
debugAPI *DebugAPI,
) []rpc.API {
apis := []rpc.API{{
Namespace: "eth",
Service: blockChainAPI,
}, {
Expand All @@ -48,6 +53,16 @@ func SupportedAPIs(blockChainAPI *BlockChainAPI, streamAPI *StreamAPI, pullAPI *
Namespace: "txpool",
Service: NewTxPoolAPI(),
}}

// optional debug api
if debugAPI != nil {
apis = append(apis, rpc.API{
Namespace: "debug",
Service: debugAPI,
})
}

return apis
}

type BlockChainAPI struct {
Expand Down
83 changes: 83 additions & 0 deletions api/debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package api

import (
"context"

"github.com/goccy/go-json"
gethCommon "github.com/onflow/go-ethereum/common"
"github.com/onflow/go-ethereum/eth/tracers"
"github.com/onflow/go-ethereum/rpc"
"github.com/rs/zerolog"

"github.com/onflow/flow-evm-gateway/storage"
)

type DebugAPI struct {
logger zerolog.Logger
tracer storage.TraceIndexer
blocks storage.BlockIndexer
}

func NewDebugAPI(tracer storage.TraceIndexer, blocks storage.BlockIndexer, logger zerolog.Logger) *DebugAPI {
return &DebugAPI{
logger: logger,
tracer: tracer,
blocks: blocks,
}
}

// TraceTransaction will return a debug execution trace of a transaction if it exists,
// currently we only support CALL traces, so the config is ignored.
func (d *DebugAPI) TraceTransaction(
ctx context.Context,
hash gethCommon.Hash,
_ *tracers.TraceConfig,
) (json.RawMessage, error) {
res, err := d.tracer.GetTransaction(hash)
if err != nil {
return handleError[json.RawMessage](d.logger, err)
}
return res, nil
}

func (d *DebugAPI) TraceBlockByNumber(
ctx context.Context,
number rpc.BlockNumber,
_ *tracers.TraceConfig,
) ([]json.RawMessage, error) {
block, err := d.blocks.GetByHeight(uint64(number.Int64()))
if err != nil {
return handleError[[]json.RawMessage](d.logger, err)
}

results := make([]json.RawMessage, len(block.TransactionHashes))
for i, h := range block.TransactionHashes {
results[i], err = d.TraceTransaction(ctx, h, nil)
if err != nil {
return nil, err
}
}

return results, nil
}

func (d *DebugAPI) TraceBlockByHash(
ctx context.Context,
hash gethCommon.Hash,
_ *tracers.TraceConfig,
) ([]json.RawMessage, error) {
block, err := d.blocks.GetByID(hash)
if err != nil {
return handleError[[]json.RawMessage](d.logger, err)
}

results := make([]json.RawMessage, len(block.TransactionHashes))
for i, h := range block.TransactionHashes {
results[i], err = d.TraceTransaction(ctx, h, nil)
if err != nil {
return nil, err
}
}

return results, nil
}
9 changes: 8 additions & 1 deletion bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func Start(ctx context.Context, cfg *config.Config) error {
transactions,
receipts,
accounts,
trace,
blocksBroadcaster,
transactionsBroadcaster,
logsBroadcaster,
Expand Down Expand Up @@ -247,6 +248,7 @@ func startServer(
transactions storage.TransactionIndexer,
receipts storage.ReceiptIndexer,
accounts storage.AccountIndexer,
trace storage.TraceIndexer,
blocksBroadcaster *broadcast.Broadcaster,
transactionsBroadcaster *broadcast.Broadcaster,
logsBroadcaster *broadcast.Broadcaster,
Expand Down Expand Up @@ -331,7 +333,12 @@ func startServer(
ratelimiter,
)

supportedAPIs := api.SupportedAPIs(blockchainAPI, streamAPI, pullAPI)
var debugAPI *api.DebugAPI
if cfg.TracesEnabled {
debugAPI = api.NewDebugAPI(trace, blocks, logger)
}

supportedAPIs := api.SupportedAPIs(blockchainAPI, streamAPI, pullAPI, debugAPI)

if err := srv.EnableRPC(supportedAPIs); err != nil {
return err
Expand Down
65 changes: 35 additions & 30 deletions tests/web3js/eth_streaming_test.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
const conf = require('./config')
const helpers = require('./helpers')
const { assert } = require('chai')
const {Web3} = require("web3");
const { Web3 } = require("web3");

const timeout = 30 // test timeout seconds

it('streaming of logs using filters', async() => {
setTimeout(() => process.exit(1), (timeout-1)*1000) // hack if the ws connection is not closed
it('streaming of blocks, transactions, logs using filters', async () => {
setTimeout(() => process.exit(1), (timeout - 1) * 1000) // hack if the ws connection is not closed

let deployed = await helpers.deployContract("storage")
let contractAddress = deployed.receipt.contractAddress
Expand All @@ -26,56 +26,63 @@ it('streaming of logs using filters', async() => {
await new Promise((res, rej) => setTimeout(() => res(), 1000))

// subscribe to new blocks being produced by bellow transaction submission
let blockCount = 0
let blockHashes = []
let blockTxHashes = []
let subBlocks = await ws.eth.subscribe('newBlockHeaders')
let doneBlocks = new Promise(async (res, rej) => {
let subBlocks = await ws.eth.subscribe('newBlockHeaders')
subBlocks.on("error", err => {
rej(err)
})

subBlocks.on('data', async block => {
blockHashes.push(block.transactions[0]) // add received tx hash
blockTxHashes.push(block.transactions[0]) // add received tx hash

blockCount++
if (blockCount === testValues.length) {
await subBlocks.unsubscribe()
if (blockTxHashes.length === testValues.length) {
subBlocks.unsubscribe()
res()
}
})
subBlocks.on("error", console.log)
})

// subscribe to all new transaction events being produced by transaction submission bellow
let txCount = 0
let txHashes = []
let subTx = await ws.eth.subscribe('pendingTransactions')
let doneTxs = new Promise(async (res, rej) => {
let subTx = await ws.eth.subscribe('pendingTransactions')
subTx.on("error", err => {
rej(err)
})

subTx.on('data', async tx => {
txHashes.push(tx) // add received tx hash
txCount++
if (txCount === testValues.length) {
await subTx.unsubscribe()

if (txHashes.length === testValues.length) {
subTx.unsubscribe()
res()
}
})
})

let logCount = 0
let logHashes = []
let logTxHashes = []
let subLog = await ws.eth.subscribe('logs', {
address: contractAddress,
})
// subscribe to events being emitted by a deployed contract and bellow transaction interactions
let doneAddressLogs = new Promise(async (res, rej) => {
let subLog = await ws.eth.subscribe('logs', {
address: contractAddress,
subLog.on("error", err => {
rej(err)
})

subLog.on('data', async (data) => {
logHashes.push(data.transactionHash)
logCount++
if (logCount === testValues.length) {
await subLog.unsubscribe()
logTxHashes.push(data.transactionHash)

if (logTxHashes.length === testValues.length) {
subLog.unsubscribe()
res()
}
})
})

// wait for subscription for a bit
await new Promise((res, rej) => setTimeout(() => res(), 300))
await new Promise((res, rej) => setTimeout(() => res(), 1000))

let sentHashes = []
// produce events by submitting transactions
Expand All @@ -96,11 +103,9 @@ it('streaming of logs using filters', async() => {

// check that transaction hashes we received when submitting transactions above
// match array of transaction hashes received from events for blocks and txs
assert.deepEqual(blockHashes, sentHashes)
assert.deepEqual(blockTxHashes, sentHashes)
assert.deepEqual(txHashes, sentHashes)
assert.deepEqual(logHashes, sentHashes)

await ws.eth.clearSubscriptions()
assert.deepEqual(logTxHashes, sentHashes)

process.exit(0)
}).timeout(timeout*1500)
}).timeout(timeout * 1500)

0 comments on commit 0244943

Please sign in to comment.