diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a9f0c9a..fa96179 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -26,6 +26,7 @@ jobs: image_name: "asia.gcr.io/kyber-operation/foundation/trading/${{ env.SERVICE }}" image_tag: ${{ steps.get_tag.outputs.image_tag }} branch_tag: ${{ steps.get_tag.outputs.branch_tag }} + dockerfile_path: ${{ steps.determine_dockerfile.outputs.dockerfile }} steps: - name: Checkout uses: actions/checkout@v3 @@ -66,6 +67,18 @@ jobs: branch_tag="$(echo "$CURRENT_BRANCH" | sed 's/[^a-zA-Z0-9]/-/g' | sed 's/--*/-/g' | sed 's/-$//g')" echo "::set-output name=image_tag::$branch_tag-$short_sha" echo "::set-output name=branch_tag::$branch_tag-$short_sha" + + - name: Determine Dockerfile + id: determine_dockerfile + run: | + if [[ "${{ steps.current_branch.outputs.value }}" == v1* ]]; then + echo "::set-output name=dockerfile::Dockerfile" + elif [[ "${{ steps.current_branch.outputs.value }}" == v2* ]]; then + echo "::set-output name=dockerfile::Dockerfile-v2" + else + echo "::set-output name=dockerfile::Dockerfile" # Default to Dockerfile if not v1 or v2 + fi + lint: name: Run golangci-lint runs-on: [ubuntu-22.04] @@ -182,6 +195,7 @@ jobs: uses: docker/build-push-action@v3 with: context: . 
+ file: ${{ needs.prepare.outputs.dockerfile_path }} # Use the corresponding Dockerfile version push: true labels: | commit=${{ github.sha }} diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 8ad41c9..ef17689 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -16,6 +16,7 @@ jobs: outputs: version_tag: ${{ steps.version_tag.outputs.value }} build_date: ${{ steps.build_date.outputs.value }} + dockerfile_path: ${{ steps.determine_dockerfile.outputs.dockerfile }} steps: - name: Format version tag shell: bash @@ -30,6 +31,17 @@ jobs: id: build_date run: echo "::set-output name=value::$(date +%FT%T%z)" + - name: Determine Dockerfile + id: determine_dockerfile + run: | + if [[ "${{ github.event.inputs.version }}" == v1* ]]; then + echo "::set-output name=dockerfile::Dockerfile" + elif [[ "${{ github.event.inputs.version }}" == v2* ]]; then + echo "::set-output name=dockerfile::Dockerfile-v2" + else + echo "::set-output name=dockerfile::Dockerfile" # Default to Dockerfile if not v1 or v2 + fi + docker: needs: - prepare @@ -66,6 +78,7 @@ jobs: uses: docker/build-push-action@v3 with: context: . + file: ${{ needs.prepare.outputs.dockerfile_path }} # Use the corresponding Dockerfile version push: true build-args: | VERSION=${{ env.VERSION_TAG }} diff --git a/Dockerfile b/Dockerfile index 76151dd..d87e651 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,9 +7,6 @@ COPY . . 
RUN go build -o app ./cmd/tradelogs -RUN go build -o parse_log ./v2/cmd/parse_log - - ## DEPLOY FROM debian:bullseye @@ -17,16 +14,10 @@ RUN apt-get update && \ apt install -y ca-certificates && \ rm -rf /var/lib/apt/lists/* -### tradelogs v1 WORKDIR /cmd COPY --from=builder /src/app /cmd/app COPY cmd/tradelogs/migrations migrations -### tradelogs v2 -WORKDIR /parse_log -COPY --from=builder /src/parse_log /parse_log/parse_log -COPY v2/cmd/parse_log/migrations /parse_log/migrations - CMD /cmd/app diff --git a/Dockerfile-v2 b/Dockerfile-v2 new file mode 100644 index 0000000..539a164 --- /dev/null +++ b/Dockerfile-v2 @@ -0,0 +1,27 @@ +## BUILDER +FROM golang:1.22-bullseye as builder + +WORKDIR /src + +COPY . . + +RUN go build -o parse_log ./v2/cmd/parse_log +RUN go build -o backfill ./v2/cmd/backfill + + +## DEPLOY +FROM debian:bullseye + +RUN apt-get update && \ + apt install -y ca-certificates && \ + rm -rf /var/lib/apt/lists/* + + +WORKDIR /v2 + +COPY --from=builder /src/parse_log /v2/parse_log +COPY --from=builder /src/backfill /v2/backfill + +COPY v2/cmd/migrations /v2/migrations + +CMD /v2/parse_log diff --git a/v2/cmd/backfill/main.go b/v2/cmd/backfill/main.go new file mode 100644 index 0000000..14e19e1 --- /dev/null +++ b/v2/cmd/backfill/main.go @@ -0,0 +1,150 @@ +package main + +import ( + "fmt" + "log" + "os" + + "github.com/KyberNetwork/tradelogs/v2/internal/server" + "github.com/KyberNetwork/tradelogs/v2/internal/worker" + libapp "github.com/KyberNetwork/tradelogs/v2/pkg/app" + "github.com/KyberNetwork/tradelogs/v2/pkg/handler" + "github.com/KyberNetwork/tradelogs/v2/pkg/kafka" + "github.com/KyberNetwork/tradelogs/v2/pkg/parser" + "github.com/KyberNetwork/tradelogs/v2/pkg/parser/zxotc" + "github.com/KyberNetwork/tradelogs/v2/pkg/rpcnode" + "github.com/KyberNetwork/tradelogs/v2/pkg/storage/backfill" + "github.com/KyberNetwork/tradelogs/v2/pkg/storage/state" + "github.com/KyberNetwork/tradelogs/v2/pkg/storage/tradelogs" + storageTypes 
"github.com/KyberNetwork/tradelogs/v2/pkg/storage/tradelogs/types" + zxotcStorage "github.com/KyberNetwork/tradelogs/v2/pkg/storage/tradelogs/zxotc" + "github.com/KyberNetwork/tradinglib/pkg/dbutil" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/jmoiron/sqlx" + "github.com/urfave/cli" + "go.uber.org/zap" +) + +func main() { + app := libapp.NewApp() + app.Name = "trade log backfill service" + app.Action = run + + app.Flags = append(app.Flags, libapp.RPCNodeFlags()...) + app.Flags = append(app.Flags, libapp.PostgresSQLFlags("tradelogs_v2")...) + app.Flags = append(app.Flags, libapp.KafkaFlag()...) + app.Flags = append(app.Flags, libapp.HTTPServerFlags()...) + + if err := app.Run(os.Args); err != nil { + log.Panic(err) + } +} + +func run(c *cli.Context) error { + logger, _, flush, err := libapp.NewLogger(c) + if err != nil { + return fmt.Errorf("new logger: %w", err) + } + + defer flush() + + zap.ReplaceGlobals(logger) + l := logger.Sugar() + l.Infow("Starting backfill service") + + db, err := initDB(c) + if err != nil { + return fmt.Errorf("cannot init DB: %w", err) + } + + // trade log manager + storages := []storageTypes.Storage{ + zxotcStorage.New(l, db), + } + manager := tradelogs.NewManager(l, storages) + + // backfill storage + backfillStorage := backfill.New(l, db) + + // state storage + stateStorage := state.New(l, db) + + // rpc node to query trace call + rpcURL := c.StringSlice(libapp.RPCUrlFlagName) + if len(rpcURL) == 0 { + return fmt.Errorf("rpc url is empty") + } + + ethClients := make([]*ethclient.Client, len(rpcURL)) + for i, url := range rpcURL { + client, err := ethclient.Dial(url) + if err != nil { + return fmt.Errorf("cannot dial eth client: %w", err) + } + ethClients[i] = client + } + rpcNode := rpcnode.NewClient(l, ethClients...) 
+ + parsers := []parser.Parser{ + //kyberswap.MustNewParser(), + zxotc.MustNewParser(), + //paraswap.MustNewParser(), + //kyberswaprfq.MustNewParser(), + //hashflowv3.MustNewParser(), + //oneinchv6.MustNewParser(traceCalls), + //uniswapxv1.MustNewParser(traceCalls), + //uniswapx.MustNewParser(traceCalls), + //bebop.MustNewParser(traceCalls), + //zxrfqv3.MustNewParserWithDeployer(traceCalls, ethClient, common.HexToAddress(parser.Deployer0xV3)), + } + + // kafka broadcast topic + broadcastTopic := c.String(libapp.KafkaBroadcastTopic.Name) + err = kafka.ValidateTopicName(broadcastTopic) + if err != nil { + return fmt.Errorf("invalid kafka topic: %w", err) + } + + // kafka publisher for broadcasting trade logs + kafkaPublisher, err := kafka.NewPublisher(libapp.KafkaConfigFromFlags(c)) + if err != nil { + return fmt.Errorf("cannot create kafka publisher: %w", err) + } + + // trade log handler + tradeLogHandler := handler.NewTradeLogHandler(l, rpcNode, manager, parsers, broadcastTopic, kafkaPublisher) + + // parse log worker + w := worker.NewBackFiller(tradeLogHandler, backfillStorage, stateStorage, l, rpcNode, parsers) + + go func() { + if err = w.Run(); err != nil { + panic(err) + } + }() + + s := server.NewBackfill(l, c.String(libapp.HTTPBackfillServerFlag.Name), w) + + return s.Run() +} + +func initDB(c *cli.Context) (*sqlx.DB, error) { + db, err := libapp.NewDB(map[string]interface{}{ + "host": c.String(libapp.PostgresHost.Name), + "port": c.Int(libapp.PostgresPort.Name), + "user": c.String(libapp.PostgresUser.Name), + "password": c.String(libapp.PostgresPassword.Name), + "dbname": c.String(libapp.PostgresDatabase.Name), + "sslmode": "disable", + }) + if err != nil { + return nil, err + } + + _, err = dbutil.RunMigrationUp(db.DB, c.String(libapp.PostgresMigrationPath.Name), + c.String(libapp.PostgresDatabase.Name)) + if err != nil { + return nil, err + } + return db, nil +} diff --git a/v2/cmd/parse_log/migrations/00001_init.up.sql 
b/v2/cmd/migrations/00001_init.up.sql similarity index 100% rename from v2/cmd/parse_log/migrations/00001_init.up.sql rename to v2/cmd/migrations/00001_init.up.sql diff --git a/v2/cmd/migrations/00002_add_backfill.up.sql b/v2/cmd/migrations/00002_add_backfill.up.sql new file mode 100644 index 0000000..4c1f19e --- /dev/null +++ b/v2/cmd/migrations/00002_add_backfill.up.sql @@ -0,0 +1,8 @@ +create table backfill ( + exchange text not null primary key, + deploy_block bigint not null, + backfilled_block bigint not null default 0 +); + +insert into backfill (exchange, deploy_block) +values ('zerox', 20926953) \ No newline at end of file diff --git a/v2/cmd/parse_log/main.go b/v2/cmd/parse_log/main.go index b3f3406..db8c969 100644 --- a/v2/cmd/parse_log/main.go +++ b/v2/cmd/parse_log/main.go @@ -35,7 +35,6 @@ func main() { app.Flags = append(app.Flags, libapp.EvmListenerFlags()...) app.Flags = append(app.Flags, libapp.RPCNodeFlags()...) app.Flags = append(app.Flags, libapp.KafkaFlag()...) - //app.Flags = append(app.Flags, pricefiller.PriceFillerFlags()...) 
if err := app.Run(os.Args); err != nil { log.Panic(err) @@ -109,10 +108,10 @@ func run(c *cli.Context) error { } // trade log handler - tradeLogHandler := handler.NewTradeLogHandler(l, rpcNode, manager, parsers, broadcastTopic, kafkaPublisher, s) + tradeLogHandler := handler.NewTradeLogHandler(l, rpcNode, manager, parsers, broadcastTopic, kafkaPublisher) // parse log worker - w := worker.NewParseLog(tradeLogHandler) + w := worker.NewParseLog(tradeLogHandler, s, l) mostRecentBlock, err := getMostRecentBlock(l, s, rpcNode) if err != nil { diff --git a/v2/internal/server/backfill.go b/v2/internal/server/backfill.go new file mode 100644 index 0000000..26aa368 --- /dev/null +++ b/v2/internal/server/backfill.go @@ -0,0 +1,94 @@ +package server + +import ( + "fmt" + "net/http" + + "github.com/KyberNetwork/tradelogs/v2/internal/worker" + "github.com/gin-contrib/pprof" + "github.com/gin-gonic/gin" + "github.com/rs/xid" + "go.uber.org/zap" +) + +// BackfillServer to serve the service. +type BackfillServer struct { + l *zap.SugaredLogger + r *gin.Engine + bindAddr string + worker *worker.BackFiller +} + +type Query struct { + FromBlock uint64 `json:"from_block" binding:"required"` + ToBlock uint64 `json:"to_block" binding:"required"` + Exchange string `json:"exchange" binding:"required"` +} + +func NewBackfill(l *zap.SugaredLogger, bindAddr string, w *worker.BackFiller) *BackfillServer { + engine := gin.New() + engine.Use(gin.Recovery()) + + server := &BackfillServer{ + l: l, + r: engine, + bindAddr: bindAddr, + worker: w, + } + + gin.SetMode(gin.ReleaseMode) + server.register() + + return server +} + +func (s *BackfillServer) Run() error { + if err := s.r.Run(s.bindAddr); err != nil { + return fmt.Errorf("run server: %w", err) + } + + return nil +} + +func (s *BackfillServer) register() { + pprof.Register(s.r, "/debug") + s.r.POST("/backfill", s.backfill) +} + +func responseErr(c *gin.Context, err error) { + c.JSON(http.StatusBadRequest, gin.H{ + "success": false, + "error": 
err.Error(), + }) +} + +func responseOK(c *gin.Context) { + c.JSON(http.StatusOK, gin.H{ + "success": true, + }) +} + +func (s *BackfillServer) backfill(c *gin.Context) { + var params Query + if err := c.BindJSON(&params); err != nil { + responseErr(c, err) + return + } + + if params.FromBlock > params.ToBlock { + responseErr(c, fmt.Errorf("from block is greater than to block")) + return + } + + l := s.l.With("reqID", xid.New().String()) + l.Infow("receive backfill params", "params", params) + + err := s.worker.BackfillByExchange(params.FromBlock, params.ToBlock, params.Exchange) + if err != nil { + l.Errorw("error when backfill", "error", err) + responseErr(c, err) + return + } + + responseOK(c) +} diff --git a/v2/internal/worker/backfiller.go b/v2/internal/worker/backfiller.go new file mode 100644 index 0000000..65c965a --- /dev/null +++ b/v2/internal/worker/backfiller.go @@ -0,0 +1,184 @@ +package worker + +import ( + "context" + "fmt" + "sort" + "strconv" + "strings" + + "github.com/KyberNetwork/tradelogs/v2/pkg/handler" + "github.com/KyberNetwork/tradelogs/v2/pkg/parser" + "github.com/KyberNetwork/tradelogs/v2/pkg/rpcnode" + "github.com/KyberNetwork/tradelogs/v2/pkg/storage/backfill" + "github.com/KyberNetwork/tradelogs/v2/pkg/storage/state" + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/util/sets" +) + +type BackFiller struct { + handler *handler.TradeLogHandler + backfillStorage backfill.IStorage + stateStorage state.Storage + l *zap.SugaredLogger + rpc *rpcnode.Client + parsers []parser.Parser +} + +func NewBackFiller(handler *handler.TradeLogHandler, backfillStorage backfill.IStorage, stateStorage state.Storage, + l *zap.SugaredLogger, rpc *rpcnode.Client, parsers []parser.Parser) *BackFiller { + return &BackFiller{ + handler: handler, + backfillStorage: backfillStorage, + stateStorage: stateStorage, + l: l, + rpc: rpc, + parsers: parsers, + } +} + +func (w *BackFiller) Run() error { + for { + first, last, err := w.getBlockRanges() + if err != nil { + return 
fmt.Errorf("cannot get block ranges: %w", err) + } + if first >= last { + return nil + } + w.l.Infow("backfill blocks", "first", first, "last", last) + err = w.processBlock(last - 1) + if err != nil { + w.l.Errorw("error when backfill block", "block", last-1, "err", err) + return fmt.Errorf("cannot process block: %w", err) + } + + err = w.backfillStorage.Update(last - 1) + if err != nil { + w.l.Errorw("error when update backfill table", "block", last-1, "err", err) + return fmt.Errorf("cannot update backfill table with %d: %w", last-1, err) + } + } +} + +// getBlockRanges return separated and non-overlapping ranges that cover all backfill ranges +func (w *BackFiller) getBlockRanges() (uint64, uint64, error) { + states, err := w.backfillStorage.Get() + if err != nil { + return 0, 0, err + } + if len(states) == 0 { + return 0, 0, nil + } + + first, last := states[0].DeployBlock, states[0].BackFilledBlock + + // get the oldest deploy block + for _, state := range states { + first = min(first, state.DeployBlock) + } + + // get the newest filled block + for _, state := range states { + // fill new exchange + if state.BackFilledBlock <= 0 { + blockNumber, err := w.getRecentBlock() + if err != nil { + return 0, 0, err + } + last = blockNumber + break + } + last = max(last, state.BackFilledBlock) + } + return first, last, nil +} + +// getRecentBlock get the newest processed block to backfill for new deployed exchange +func (w *BackFiller) getRecentBlock() (uint64, error) { + var blockNumber uint64 + block, err := w.stateStorage.GetState(state.ProcessedBlockKey) + if err == nil { + blockNumber, err = strconv.ParseUint(block, 10, 64) + if err == nil { + return blockNumber, nil + } + } + w.l.Errorw("cannot get from db", "err", err) + blockNumber, err = w.rpc.GetBlockNumber(context.Background()) + if err != nil { + return 0, fmt.Errorf("cannot get from node: %w", err) + } + return blockNumber, nil +} + +func (w *BackFiller) processBlock(blockNumber uint64) error { + block, err 
:= w.rpc.BlockByNumber(context.Background(), blockNumber) + if err != nil { + return fmt.Errorf("cannot get block %d: %w", blockNumber, err) + } + + err = w.handler.ProcessBlock(block.Hash().String(), blockNumber, block.Time()) + if err != nil { + return fmt.Errorf("cannot process block %d: %w", blockNumber, err) + } + + w.l.Infow("successfully backfill block", "block", blockNumber) + return nil +} + +func (w *BackFiller) BackfillByExchange(from, to uint64, exchange string) error { + blocks, err := w.getBlockByExchange(from, to, exchange) + if err != nil { + return fmt.Errorf("cannot get block %d: %w", from, err) + } + + w.l.Infow("start to backfill blocks", "blocks", blocks) + + // backfill from the newest blocks, if error occurs we can continue backfill from error block + for _, b := range blocks { + err = w.processBlock(b) + if err != nil { + w.l.Errorw("cannot backfill block", "block", b, "err", err) + return fmt.Errorf("error when backfill to block %d: %w", b, err) + } + } + + return nil +} + +// getBlockByExchange get the blocks having logs of specific exchange, the block number sorted descending +func (w *BackFiller) getBlockByExchange(from, to uint64, exchange string) ([]uint64, error) { + var ( + address string + topics []string + ) + // get exchange address and topics to filter logs + for _, p := range w.parsers { + if strings.EqualFold(p.Exchange(), exchange) { + address = p.Address() + topics = p.Topics() + break + } + } + + // get logs + logs, err := w.rpc.FetchLogs(context.Background(), from, to, address, topics) + if err != nil { + return nil, err + } + + // get blocks need to backfill + blocksNumber := sets.New[uint64]() + for _, l := range logs { + blocksNumber.Insert(l.BlockNumber) + } + + // sort the blocks descending + blocks := blocksNumber.UnsortedList() + sort.Slice(blocks, func(i, j int) bool { + return blocks[i] > blocks[j] + }) + + return blocks, nil +} diff --git a/v2/internal/worker/log_parser.go b/v2/internal/worker/log_parser.go index 
b03c233..27f1f87 100644 --- a/v2/internal/worker/log_parser.go +++ b/v2/internal/worker/log_parser.go @@ -3,18 +3,25 @@ package worker import ( "context" "fmt" + "strconv" "github.com/KyberNetwork/evmlistener/pkg/types" "github.com/KyberNetwork/tradelogs/v2/pkg/handler" + "github.com/KyberNetwork/tradelogs/v2/pkg/storage/state" + "go.uber.org/zap" ) type LogParser struct { handler *handler.TradeLogHandler + state state.Storage + l *zap.SugaredLogger } -func NewParseLog(handler *handler.TradeLogHandler) *LogParser { +func NewParseLog(handler *handler.TradeLogHandler, s state.Storage, l *zap.SugaredLogger) *LogParser { return &LogParser{ handler: handler, + state: s, + l: l, } } @@ -41,10 +48,21 @@ func (w *LogParser) processMessage(msg types.Message) error { } for _, block := range msg.NewBlocks { - err := w.handler.ProcessBlock(block.Hash, block.Number.Uint64(), block.Timestamp) + blockNumber := block.Number.Uint64() + + err := w.handler.ProcessBlock(block.Hash, blockNumber, block.Timestamp) if err != nil { return fmt.Errorf("failed to process new block: %w", err) } + + // persist most recent processed block + err = w.state.SetState(state.ProcessedBlockKey, strconv.FormatUint(blockNumber, 10)) + if err != nil { + w.l.Errorw("cannot persist processed block", "blockNumber", blockNumber, "err", err) + return fmt.Errorf("cannot persist processed block: %w", err) + } + + w.l.Infow("successfully persist processed block", "blockNumber", blockNumber) } return nil } diff --git a/v2/pkg/app/price_filler.go b/v2/pkg/app/price_filler.go new file mode 100644 index 0000000..9027785 --- /dev/null +++ b/v2/pkg/app/price_filler.go @@ -0,0 +1,22 @@ +package app + +import ( + "github.com/urfave/cli" +) + +var BinanceAPIKeyFlag = cli.StringFlag{ + Name: "binance-api-key", + EnvVar: "BINANCE_API_KEY", +} + +var BinanceSecretKeyFlag = cli.StringFlag{ + Name: "binance-secret-key", + EnvVar: "BINANCE_SECRET_KEY", +} + +func PriceFillerFlags() []cli.Flag { + return []cli.Flag{ + 
BinanceAPIKeyFlag, + BinanceSecretKeyFlag, + } +} diff --git a/v2/pkg/app/server.go b/v2/pkg/app/server.go new file mode 100644 index 0000000..0225d0f --- /dev/null +++ b/v2/pkg/app/server.go @@ -0,0 +1,16 @@ +package app + +import "github.com/urfave/cli" + +var HTTPBackfillServerFlag = cli.StringFlag{ + Name: "backfill-server-address", + Usage: "Run the rest for backfill server", + EnvVar: "BACKFILL_SERVER_ADDRESS", + Value: "localhost:8081", +} + +func HTTPServerFlags() []cli.Flag { + return []cli.Flag{ + HTTPBackfillServerFlag, + } +} diff --git a/v2/pkg/handler/trade_logs.go b/v2/pkg/handler/trade_logs.go index 0817d52..0b687ce 100644 --- a/v2/pkg/handler/trade_logs.go +++ b/v2/pkg/handler/trade_logs.go @@ -4,12 +4,10 @@ import ( "context" "encoding/json" "fmt" - "strconv" "github.com/KyberNetwork/tradelogs/v2/pkg/kafka" "github.com/KyberNetwork/tradelogs/v2/pkg/parser" "github.com/KyberNetwork/tradelogs/v2/pkg/rpcnode" - "github.com/KyberNetwork/tradelogs/v2/pkg/storage/state" "github.com/KyberNetwork/tradelogs/v2/pkg/storage/tradelogs" storageTypes "github.com/KyberNetwork/tradelogs/v2/pkg/storage/tradelogs/types" "github.com/KyberNetwork/tradelogs/v2/pkg/types" @@ -26,7 +24,6 @@ type TradeLogHandler struct { parsers []parser.Parser kafkaTopic string publisher kafka.Publisher - state state.Storage } type logMetadata struct { @@ -38,7 +35,7 @@ type logMetadata struct { } func NewTradeLogHandler(l *zap.SugaredLogger, rpc *rpcnode.Client, storage *tradelogs.Manager, parsers []parser.Parser, - kafkaTopic string, publisher kafka.Publisher, state state.Storage) *TradeLogHandler { + kafkaTopic string, publisher kafka.Publisher) *TradeLogHandler { return &TradeLogHandler{ l: l, rpcClient: rpc, @@ -46,15 +43,14 @@ func NewTradeLogHandler(l *zap.SugaredLogger, rpc *rpcnode.Client, storage *trad parsers: parsers, kafkaTopic: kafkaTopic, publisher: publisher, - state: state, } } func (h *TradeLogHandler) ProcessBlock(blockHash string, blockNumber uint64, timestamp 
uint64) error { // remove old trade log in db of processing block - err := h.RevertBlock([]uint64{blockNumber}) + err := h.storage.Delete([]uint64{blockNumber}) if err != nil { - return fmt.Errorf("error when revert block number %d before processing: %w", blockNumber, err) + return fmt.Errorf("delete blocks error: %w", err) } // fetch trace call @@ -112,13 +108,7 @@ func (h *TradeLogHandler) ProcessBlock(blockHash string, blockNumber uint64, tim h.l.Infow("successfully publish trade logs", "blockNumber", blockNumber, "success", passCount, "fail", failCount) } - // persist most recent processed block - err = h.state.SetState(state.ProcessedBlockKey, strconv.FormatUint(blockNumber, 10)) - if err != nil { - h.l.Errorw("cannot persist processed block", "blockNumber", blockNumber, "err", err) - return fmt.Errorf("cannot persist processed block: %w", err) - } - h.l.Infow("successfully persist processed block", "blockNumber", blockNumber) + h.l.Infow("successfully process block", "blockNumber", blockNumber) return nil } @@ -188,11 +178,13 @@ func (h *TradeLogHandler) RevertBlock(blocks []uint64) error { if err != nil { h.l.Errorw(" error when marshal revert message to json", "err", err) } + err = h.publisher.Publish(h.kafkaTopic, msgBytes) if err != nil { h.l.Errorw("error when publish revert message", "err", err) } - h.l.Infow("published revert message", "message", msgBytes) + + h.l.Infow("published revert message", "message", string(msgBytes)) return nil } diff --git a/v2/pkg/handler/trade_logs_test.go b/v2/pkg/handler/trade_logs_test.go index 88590d7..e4ef3fd 100644 --- a/v2/pkg/handler/trade_logs_test.go +++ b/v2/pkg/handler/trade_logs_test.go @@ -32,7 +32,8 @@ func TestTradeLogHandler_ProcessBlock(t *testing.T) { mockStorage := &mocks.MockStorage{} mockStorage.On("Exchange").Return("zerox"). - On("Insert", mock.Anything).Return(nil) + On("Insert", mock.Anything).Return(nil). 
+ On("Delete", mock.Anything).Return(nil) s := tradelogs.NewManager(zap.S(), []types.Storage{mockStorage}) p := zxotc2.MustNewParser() @@ -40,10 +41,7 @@ func TestTradeLogHandler_ProcessBlock(t *testing.T) { mockKafka := &mocks.MockPublisher{} mockKafka.On("Publish", mock.Anything, mock.Anything).Return(nil) - mockState := &mocks.MockState{} - mockState.On("SetState", mock.Anything, mock.Anything).Return(nil) - - h := NewTradeLogHandler(zap.S(), client, s, []parser.Parser{p}, "test", mockKafka, mockState) + h := NewTradeLogHandler(zap.S(), client, s, []parser.Parser{p}, "test", mockKafka) err = h.ProcessBlock("0x04b65fabd0eaaa00eae00782128a8add39e30098552738c305610259f14ea048", 20181990, 1725436442) if err != nil { @@ -51,7 +49,7 @@ func TestTradeLogHandler_ProcessBlock(t *testing.T) { } assert.True(t, mockStorage.AssertNumberOfCalls(t, "Insert", 1)) - assert.True(t, mockKafka.AssertNumberOfCalls(t, "Publish", 1)) + assert.True(t, mockKafka.AssertNumberOfCalls(t, "Publish", 2)) } func TestAssignLogIndexes(t *testing.T) { diff --git a/v2/pkg/parser/parser.go b/v2/pkg/parser/parser.go index 185bc24..18a38d5 100644 --- a/v2/pkg/parser/parser.go +++ b/v2/pkg/parser/parser.go @@ -20,4 +20,5 @@ type Parser interface { UseTraceCall() bool ParseWithCallFrame(callFrame types.CallFrame, log ethereumTypes.Log, blockTime uint64) ([]storageTypes.TradeLog, error) LogFromExchange(log ethereumTypes.Log) bool + Address() string } diff --git a/v2/pkg/parser/zxotc/parser.go b/v2/pkg/parser/zxotc/parser.go index 7992700..d532ab8 100644 --- a/v2/pkg/parser/zxotc/parser.go +++ b/v2/pkg/parser/zxotc/parser.go @@ -144,3 +144,7 @@ func (p *Parser) LogFromExchange(log ethereumTypes.Log) bool { len(log.Topics) > 0 && strings.EqualFold(log.Topics[0].String(), p.eventHash) } + +func (p *Parser) Address() string { + return constant.Addr0x +} diff --git a/v2/pkg/rpcnode/client.go b/v2/pkg/rpcnode/client.go index badeb6c..c83ba84 100644 --- a/v2/pkg/rpcnode/client.go +++ 
b/v2/pkg/rpcnode/client.go @@ -6,6 +6,8 @@ import ( "math/big" "github.com/KyberNetwork/tradelogs/v2/pkg/types" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" ethereumTypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" "go.uber.org/zap" @@ -45,27 +47,6 @@ func (c *Client) FetchTraceCalls(ctx context.Context, blockHash string) ([]types return result, err } -func (c *Client) FetchLogs(ctx context.Context, fromBlock, toBlock *big.Int, topics []string) ([]ethereumTypes.Log, error) { - var ( - result []ethereumTypes.Log - err error - ) - for i, client := range c.ethClient { - err = client.Client().CallContext(ctx, &result, "eth_getLogs", map[string]interface{}{ - "fromBlock": fromBlock, - "toBlock": toBlock, - "topics": topics, - }) - if err != nil { - c.l.Errorw("fetch logs failed", "error", err, "clientID", i) - continue - } - return result, nil - } - - return result, err -} - func (c *Client) FetchLogsByBlockHash(ctx context.Context, blockHash string) ([]ethereumTypes.Log, error) { var ( result []ethereumTypes.Log @@ -100,3 +81,47 @@ func (c *Client) GetBlockNumber(ctx context.Context) (uint64, error) { } return 0, fmt.Errorf("block number not found: %w", err) } + +func (c *Client) BlockByNumber(ctx context.Context, blockNumber uint64) (*ethereumTypes.Block, error) { + var ( + block *ethereumTypes.Block + err error + ) + for i, client := range c.ethClient { + block, err = client.BlockByNumber(ctx, new(big.Int).SetUint64(blockNumber)) + if err != nil { + c.l.Errorw("get block failed", "error", err, "clientID", i, "blockNumber", blockNumber) + continue + } + return block, nil + } + return nil, fmt.Errorf("block with number %d not found: %w", blockNumber, err) +} + +func (c *Client) FetchLogs(ctx context.Context, from, to uint64, address string, topics []string) ([]ethereumTypes.Log, error) { + var ( + logs []ethereumTypes.Log + err error + ) + filter := ethereum.FilterQuery{ + FromBlock: 
new(big.Int).SetUint64(from), + ToBlock: new(big.Int).SetUint64(to), + Addresses: []common.Address{common.HexToAddress(address)}, + } + newTopics := make([]common.Hash, len(topics)) + if len(topics) > 0 { + for i, topic := range topics { + newTopics[i] = common.HexToHash(topic) + } + filter.Topics = [][]common.Hash{newTopics} + } + for i, client := range c.ethClient { + logs, err = client.FilterLogs(ctx, filter) + if err != nil { + c.l.Errorw("get logs failed", "error", err, "clientID", i, "from", from, "to", to, "topics", topics) + continue + } + return logs, nil + } + return nil, fmt.Errorf("error when get logs: %w", err) +} diff --git a/v2/pkg/storage/backfill/storage.go b/v2/pkg/storage/backfill/storage.go new file mode 100644 index 0000000..b4effa8 --- /dev/null +++ b/v2/pkg/storage/backfill/storage.go @@ -0,0 +1,57 @@ +package backfill + +import ( + "fmt" + "github.com/jmoiron/sqlx" + "go.uber.org/zap" +) + +type IStorage interface { + Get() ([]State, error) + Update(backfilled uint64) error +} + +type Storage struct { + db *sqlx.DB + l *zap.SugaredLogger +} + +type State struct { + Exchange string `db:"exchange" json:"exchange"` + DeployBlock uint64 `db:"deploy_block" json:"deploy_block"` + BackFilledBlock uint64 `db:"backfilled_block" json:"backfilled_block"` +} + +func New(l *zap.SugaredLogger, db *sqlx.DB) *Storage { + return &Storage{ + db: db, + l: l, + } +} + +func (s *Storage) Get() ([]State, error) { + var infos []State + // we only need exchanges that not fully filled + err := s.db.Select(&infos, "SELECT * FROM backfill WHERE backfilled_block > deploy_block OR backfilled_block = 0") + if err != nil { + s.l.Errorw("failed to fetch all backfill", "err", err) + return nil, fmt.Errorf("failed to fetch all backfill state: %w", err) + } + return infos, nil +} + +func (s *Storage) Update(backfilled uint64) error { + res, err := s.db.Exec("UPDATE backfill SET backfilled_block=$1 WHERE backfilled_block > $2 OR backfilled_block = 0", backfilled, backfilled) + 
if err != nil { + s.l.Errorw("failed to update backfill", "err", err) + return fmt.Errorf("failed to update backfill: %w", err) + } + + rowsAffected, err := res.RowsAffected() + if err != nil { + s.l.Errorw("failed to get rows affected", "err", err) + return fmt.Errorf("failed to get rows affected: %w", err) + } + s.l.Infow("backfill updated", "rowsAffected", rowsAffected) + return nil +} diff --git a/v2/pkg/storage/state/storage_test.go b/v2/pkg/storage/state/storage_test.go index 2189ca1..33e7c41 100644 --- a/v2/pkg/storage/state/storage_test.go +++ b/v2/pkg/storage/state/storage_test.go @@ -11,7 +11,7 @@ import ( func TestState_GetState(t *testing.T) { l := zap.S() db, tearDown := testutil.MustNewDevelopmentDB( - "../../../cmd/parse_log/migrations", + "../../../cmd/migrations", testutil.DefaultDSN(), testutil.RandomString(8), ) diff --git a/v2/pkg/storage/tradelogs/zxotc/storage_test.go b/v2/pkg/storage/tradelogs/zxotc/storage_test.go index b7a8044..a75f3dd 100644 --- a/v2/pkg/storage/tradelogs/zxotc/storage_test.go +++ b/v2/pkg/storage/tradelogs/zxotc/storage_test.go @@ -22,7 +22,7 @@ type TestCase struct { func TestSimple(t *testing.T) { l := zap.S() db, tearDown := testutil.MustNewDevelopmentDB( - "../../../../cmd/parse_log/migrations", + "../../../../cmd/migrations", testutil.DefaultDSN(), testutil.RandomString(8), )