Skip to content

Commit

Permalink
add functions to decode and batch fetching log (#109)
Browse files Browse the repository at this point in the history
  • Loading branch information
linhnt3400 authored Dec 18, 2024
1 parent 64ffe4e commit fc13d96
Show file tree
Hide file tree
Showing 4 changed files with 1,038 additions and 196 deletions.
64 changes: 42 additions & 22 deletions v2/internal/worker/backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ import (
"github.com/KyberNetwork/tradelogs/v2/pkg/rpcnode"
"github.com/KyberNetwork/tradelogs/v2/pkg/storage/backfill"
"github.com/KyberNetwork/tradelogs/v2/pkg/storage/state"
ethereumTypes "github.com/ethereum/go-ethereum/core/types"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/sets"
)

const maxQueryBlockRange = 10000

type BackFiller struct {
mu sync.Mutex
handler *handler.TradeLogHandler
Expand Down Expand Up @@ -215,17 +218,15 @@ func (w *BackFiller) BackfillByExchange(task backfill.Task) {
return
}

blocks, exclusions, err := w.getBlockByExchange(from, to, task.Exchange)
if err != nil {
w.l.Errorw("cannot get block", "task", task, "err", err)
err = w.backfillStorage.UpdateTask(task.ID, nil, backfill.StatusTypeFailed)
if err != nil {
w.l.Errorw("cannot update task status", "task", task, "err", err)
}
return
// error can occur while filtering blocks, we handle partial result and
// mark the status as `failed` after processing received blocks.
// else if there are no error, mark the status as `done` after processing all blocks
blocks, exclusions, getBlockErr := w.getBlockByExchange(from, to, task.Exchange)
if getBlockErr != nil {
w.l.Errorw("error when get block by exchange", "task", task, "err", getBlockErr)
}

w.l.Infow("start to backfill blocks", "task_id", task.ID, "blocks", blocks)
w.l.Infow("start to backfill blocks", "task_id", task.ID, "num_block", len(blocks))

// backfill from the newest blocks, if error occurs we can continue backfill from error block
for _, b := range blocks {
Expand Down Expand Up @@ -258,6 +259,16 @@ func (w *BackFiller) BackfillByExchange(task backfill.Task) {
}
}

// mark the status = `failed`
if getBlockErr != nil {
err = w.backfillStorage.UpdateTask(task.ID, nil, backfill.StatusTypeFailed)
if err != nil {
w.l.Errorw("cannot update task status", "task", task, "err", err)
}
return
}

// else mark the status = `done`
err = w.backfillStorage.UpdateTask(task.ID, &task.FromBlock, backfill.StatusTypeDone)
if err != nil {
w.l.Errorw("cannot update task status", "task_id", task.ID, "err", err)
Expand All @@ -267,9 +278,12 @@ func (w *BackFiller) BackfillByExchange(task backfill.Task) {
// getBlockByExchange get the blocks having logs of specific exchange, the block number sorted descending
func (w *BackFiller) getBlockByExchange(from, to uint64, exchange string) ([]uint64, sets.Set[string], error) {
var (
address string
topics []string
exclusions = sets.New[string]()
address string
topics []string
exclusions = sets.New[string]()
blocksNumber = sets.New[uint64]()
logs []ethereumTypes.Log
err error
)
// get exchange address and topics to filter logs
for _, p := range w.parsers {
Expand All @@ -281,16 +295,22 @@ func (w *BackFiller) getBlockByExchange(from, to uint64, exchange string) ([]uin
exclusions.Insert(p.Exchange())
}

// get logs
logs, err := w.rpc.FetchLogs(context.Background(), from, to, address, topics)
if err != nil {
return nil, nil, err
}
for currentTo := to; currentTo >= from; {
currentFrom := max(currentTo-maxQueryBlockRange, from)

// get logs
logs, err = w.rpc.FetchLogs(context.Background(), currentFrom, currentTo, address, topics)
if err != nil {
break
}

// get blocks need to backfill
for _, l := range logs {
blocksNumber.Insert(l.BlockNumber)
}

// get blocks need to backfill
blocksNumber := sets.New[uint64]()
for _, l := range logs {
blocksNumber.Insert(l.BlockNumber)
// update new block range
currentTo = currentFrom - 1
}

// sort the blocks descending
Expand All @@ -299,5 +319,5 @@ func (w *BackFiller) getBlockByExchange(from, to uint64, exchange string) ([]uin
return blocks[i] > blocks[j]
})

return blocks, exclusions, nil
return blocks, exclusions, err
}
155 changes: 1 addition & 154 deletions v2/pkg/parser/kyberswap/abi.json
Original file line number Diff line number Diff line change
@@ -1,154 +1 @@
[
{
"anonymous": false,
"inputs": [
{
"indexed": false,
"internalType": "address",
"name": "sender",
"type": "address"
},
{
"indexed": false,
"internalType": "contract IERC20",
"name": "srcToken",
"type": "address"
},
{
"indexed": false,
"internalType": "contract IERC20",
"name": "dstToken",
"type": "address"
},
{
"indexed": false,
"internalType": "address",
"name": "dstReceiver",
"type": "address"
},
{
"indexed": false,
"internalType": "uint256",
"name": "spentAmount",
"type": "uint256"
},
{
"indexed": false,
"internalType": "uint256",
"name": "returnAmount",
"type": "uint256"
}
],
"name": "Swapped",
"type": "event"
},
{
"inputs": [
{
"components": [
{
"internalType": "address",
"name": "callTarget",
"type": "address"
},
{
"internalType": "address",
"name": "approveTarget",
"type": "address"
},
{
"internalType": "bytes",
"name": "targetData",
"type": "bytes"
},
{
"components": [
{
"internalType": "contract IERC20",
"name": "srcToken",
"type": "address"
},
{
"internalType": "contract IERC20",
"name": "dstToken",
"type": "address"
},
{
"internalType": "address[]",
"name": "srcReceivers",
"type": "address[]"
},
{
"internalType": "uint256[]",
"name": "srcAmounts",
"type": "uint256[]"
},
{
"internalType": "address[]",
"name": "feeReceivers",
"type": "address[]"
},
{
"internalType": "uint256[]",
"name": "feeAmounts",
"type": "uint256[]"
},
{
"internalType": "address",
"name": "dstReceiver",
"type": "address"
},
{
"internalType": "uint256",
"name": "amount",
"type": "uint256"
},
{
"internalType": "uint256",
"name": "minReturnAmount",
"type": "uint256"
},
{
"internalType": "uint256",
"name": "flags",
"type": "uint256"
},
{
"internalType": "bytes",
"name": "permit",
"type": "bytes"
}
],
"internalType": "struct MetaAggregationRouterV2.SwapDescriptionV2",
"name": "desc",
"type": "tuple"
},
{
"internalType": "bytes",
"name": "clientData",
"type": "bytes"
}
],
"internalType": "struct MetaAggregationRouterV2.SwapExecutionParams",
"name": "execution",
"type": "tuple"
}
],
"name": "swap",
"outputs": [
{
"internalType": "uint256",
"name": "returnAmount",
"type": "uint256"
},
{
"internalType": "uint256",
"name": "gasUsed",
"type": "uint256"
}
],
"stateMutability": "payable",
"type": "function"
}
]

[{"inputs":[{"internalType":"address","name":"_WETH","type":"address"}],"stateMutability":"nonpayable","type":"constructor"},{"anonymous":false,"inputs":[{"indexed":false,"internalType":"bytes","name":"clientData","type":"bytes"}],"name":"ClientData","type":"event"},{"anonymous":false,"inputs":[{"indexed":false,"internalType":"string","name":"reason","type":"string"}],"name":"Error","type":"event"},{"anonymous":false,"inputs":[{"indexed":false,"internalType":"address","name":"pair","type":"address"},{"indexed":false,"internalType":"uint256","name":"amountOut","type":"uint256"},{"indexed":false,"internalType":"address","name":"output","type":"address"}],"name":"Exchange","type":"event"},{"anonymous":false,"inputs":[{"indexed":false,"internalType":"address","name":"token","type":"address"},{"indexed":false,"internalType":"uint256","name":"totalAmount","type":"uint256"},{"indexed":false,"internalType":"uint256","name":"totalFee","type":"uint256"},{"indexed":false,"internalType":"address[]","name":"recipients","type":"address[]"},{"indexed":false,"internalType":"uint256[]","name":"amounts","type":"uint256[]"},{"indexed":false,"internalType":"bool","name":"isBps","type":"bool"}],"name":"Fee","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"internalType":"address","name":"previousOwner","type":"address"},{"indexed":true,"internalType":"address","name":"newOwner","type":"address"}],"name":"OwnershipTransferred","type":"event"},{"anonymous":false,"inputs":[{"indexed":false,"internalType":"address","name":"sender","type":"address"},{"indexed":false,"internalType":"contract IERC20","name":"srcToken","type":"address"},{"indexed":false,"internalType":"contract IERC20","name":"dstToken","type":"address"},{"indexed":false,"internalType":"address","name":"dstReceiver","type":"address"},{"indexed":false,"internalType":"uint256","name":"spentAmount","type":"uint256"},{"indexed":false,"internalType":"uint256","name":"returnAmount","type":"uint256"}],"name":"Swapped","type":"event"},{"inputs":[],"name":"WETH","outputs":[{"internalType":"address","name":"","type":"address"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"address","name":"","type":"address"}],"name":"isWhitelist","outputs":[{"internalType":"bool","name":"","type":"bool"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"owner","outputs":[{"internalType":"address","name":"","type":"address"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"renounceOwnership","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"address","name":"token","type":"address"},{"internalType":"uint256","name":"amount","type":"uint256"}],"name":"rescueFunds","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"components":[{"internalType":"address","name":"callTarget","type":"address"},{"internalType":"address","name":"approveTarget","type":"address"},{"internalType":"bytes","name":"targetData","type":"bytes"},{"components":[{"internalType":"contract IERC20","name":"srcToken","type":"address"},{"internalType":"contract IERC20","name":"dstToken","type":"address"},{"internalType":"address[]","name":"srcReceivers","type":"address[]"},{"internalType":"uint256[]","name":"srcAmounts","type":"uint256[]"},{"internalType":"address[]","name":"feeReceivers","type":"address[]"},{"internalType":"uint256[]","name":"feeAmounts","type":"uint256[]"},{"internalType":"address","name":"dstReceiver","type":"address"},{"internalType":"uint256","name":"amount","type":"uint256"},{"internalType":"uint256","name":"minReturnAmount","type":"uint256"},{"internalType":"uint256","name":"flags","type":"uint256"},{"internalType":"bytes","name":"permit","type":"bytes"}],"internalType":"struct MetaAggregationRouterV2.SwapDescriptionV2","name":"desc","type":"tuple"},{"internalType":"bytes","name":"clientData","type":"bytes"}],"internalType":"struct MetaAggregationRouterV2.SwapExecutionParams","name":"execution","type":"tuple"}],"name":"swap","outputs":[{"internalType":"uint256","name":"returnAmount","type":"uint256"},{"internalType":"uint256","name":"gasUsed","type":"uint256"}],"stateMutability":"payable","type":"function"},{"inputs":[{"components":[{"internalType":"address","name":"callTarget","type":"address"},{"internalType":"address","name":"approveTarget","type":"address"},{"internalType":"bytes","name":"targetData","type":"bytes"},{"components":[{"internalType":"contract IERC20","name":"srcToken","type":"address"},{"internalType":"contract IERC20","name":"dstToken","type":"address"},{"internalType":"address[]","name":"srcReceivers","type":"address[]"},{"internalType":"uint256[]","name":"srcAmounts","type":"uint256[]"},{"internalType":"address[]","name":"feeReceivers","type":"address[]"},{"internalType":"uint256[]","name":"feeAmounts","type":"uint256[]"},{"internalType":"address","name":"dstReceiver","type":"address"},{"internalType":"uint256","name":"amount","type":"uint256"},{"internalType":"uint256","name":"minReturnAmount","type":"uint256"},{"internalType":"uint256","name":"flags","type":"uint256"},{"internalType":"bytes","name":"permit","type":"bytes"}],"internalType":"struct MetaAggregationRouterV2.SwapDescriptionV2","name":"desc","type":"tuple"},{"internalType":"bytes","name":"clientData","type":"bytes"}],"internalType":"struct MetaAggregationRouterV2.SwapExecutionParams","name":"execution","type":"tuple"}],"name":"swapGeneric","outputs":[{"internalType":"uint256","name":"returnAmount","type":"uint256"},{"internalType":"uint256","name":"gasUsed","type":"uint256"}],"stateMutability":"payable","type":"function"},{"inputs":[{"internalType":"contract IAggregationExecutor","name":"caller","type":"address"},{"components":[{"internalType":"contract IERC20","name":"srcToken","type":"address"},{"internalType":"contract IERC20","name":"dstToken","type":"address"},{"internalType":"address[]","name":"srcReceivers","type":"address[]"},{"internalType":"uint256[]","name":"srcAmounts","type":"uint256[]"},{"internalType":"address[]","name":"feeReceivers","type":"address[]"},{"internalType":"uint256[]","name":"feeAmounts","type":"uint256[]"},{"internalType":"address","name":"dstReceiver","type":"address"},{"internalType":"uint256","name":"amount","type":"uint256"},{"internalType":"uint256","name":"minReturnAmount","type":"uint256"},{"internalType":"uint256","name":"flags","type":"uint256"},{"internalType":"bytes","name":"permit","type":"bytes"}],"internalType":"struct MetaAggregationRouterV2.SwapDescriptionV2","name":"desc","type":"tuple"},{"internalType":"bytes","name":"executorData","type":"bytes"},{"internalType":"bytes","name":"clientData","type":"bytes"}],"name":"swapSimpleMode","outputs":[{"internalType":"uint256","name":"returnAmount","type":"uint256"},{"internalType":"uint256","name":"gasUsed","type":"uint256"}],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"address","name":"newOwner","type":"address"}],"name":"transferOwnership","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"address[]","name":"addr","type":"address[]"},{"internalType":"bool[]","name":"value","type":"bool[]"}],"name":"updateWhitelist","outputs":[],"stateMutability":"nonpayable","type":"function"},{"stateMutability":"payable","type":"receive"}]
Loading

0 comments on commit fc13d96

Please sign in to comment.