Skip to content

Commit

Permalink
TRD-637 Tradelogs v2: Backfill Flow (#78)
Browse files Browse the repository at this point in the history
* TRD-637 tradelogs v2: backfill flow

* TRD-637 update unit test

* TRD-637 update unit test

* update release flow

* TRD-637 refactor backfill flow

* TRD-637 remove price filler

* TRD-637 update backfill table
  • Loading branch information
linhnt3400 authored Oct 9, 2024
1 parent 7b0a323 commit 7627ccf
Show file tree
Hide file tree
Showing 21 changed files with 671 additions and 58 deletions.
14 changes: 14 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ jobs:
image_name: "asia.gcr.io/kyber-operation/foundation/trading/${{ env.SERVICE }}"
image_tag: ${{ steps.get_tag.outputs.image_tag }}
branch_tag: ${{ steps.get_tag.outputs.branch_tag }}
dockerfile_path: ${{ steps.determine_dockerfile.outputs.dockerfile }}
steps:
- name: Checkout
uses: actions/checkout@v3
Expand Down Expand Up @@ -66,6 +67,18 @@ jobs:
branch_tag="$(echo "$CURRENT_BRANCH" | sed 's/[^a-zA-Z0-9]/-/g' | sed 's/--*/-/g' | sed 's/-$//g')"
echo "::set-output name=image_tag::$branch_tag-$short_sha"
echo "::set-output name=branch_tag::$branch_tag-$short_sha"
- name: Determine Dockerfile
id: determine_dockerfile
run: |
if [[ "${{ steps.current_branch.outputs.value }}" == v1* ]]; then
echo "::set-output name=dockerfile::Dockerfile"
elif [[ "${{ steps.current_branch.outputs.value }}" == v2* ]]; then
echo "::set-output name=dockerfile::Dockerfile-v2"
else
echo "::set-output name=dockerfile::Dockerfile" # Default to Dockerfile if not v1 or v2
fi
lint:
name: Run golangci-lint
runs-on: [ubuntu-22.04]
Expand Down Expand Up @@ -182,6 +195,7 @@ jobs:
uses: docker/build-push-action@v3
with:
context: .
file: ${{ needs.prepare.outputs.dockerfile_path }} # Use the corresponding Dockerfile version
push: true
labels: |
commit=${{ github.sha }}
Expand Down
13 changes: 13 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ jobs:
outputs:
version_tag: ${{ steps.version_tag.outputs.value }}
build_date: ${{ steps.build_date.outputs.value }}
dockerfile_path: ${{ steps.determine_dockerfile.outputs.dockerfile }}
steps:
- name: Format version tag
shell: bash
Expand All @@ -30,6 +31,17 @@ jobs:
id: build_date
run: echo "::set-output name=value::$(date +%FT%T%z)"

- name: Determine Dockerfile
id: determine_dockerfile
run: |
if [[ "${{ github.event.inputs.version }}" == v1* ]]; then
echo "::set-output name=dockerfile::Dockerfile"
elif [[ "${{ github.event.inputs.version }}" == v2* ]]; then
echo "::set-output name=dockerfile::Dockerfile-v2"
else
echo "::set-output name=dockerfile::Dockerfile" # Default to Dockerfile if not v1 or v2
fi
docker:
needs:
- prepare
Expand Down Expand Up @@ -66,6 +78,7 @@ jobs:
uses: docker/build-push-action@v3
with:
context: .
file: ${{ needs.prepare.outputs.dockerfile_path }} # Use the corresponding Dockerfile version
push: true
build-args: |
VERSION=${{ env.VERSION_TAG }}
Expand Down
9 changes: 0 additions & 9 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,17 @@ COPY . .

RUN go build -o app ./cmd/tradelogs

RUN go build -o parse_log ./v2/cmd/parse_log


## DEPLOY
FROM debian:bullseye

RUN apt-get update && \
apt install -y ca-certificates && \
rm -rf /var/lib/apt/lists/*

### tradelogs v1
WORKDIR /cmd

COPY --from=builder /src/app /cmd/app

COPY cmd/tradelogs/migrations migrations

### tradelogs v2
WORKDIR /parse_log
COPY --from=builder /src/parse_log /parse_log/parse_log
COPY v2/cmd/parse_log/migrations /parse_log/migrations

CMD /cmd/app
27 changes: 27 additions & 0 deletions Dockerfile-v2
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
## BUILDER
FROM golang:1.22-bullseye as builder

WORKDIR /src

COPY . .

RUN go build -o parse_log ./v2/cmd/parse_log
RUN go build -o backfill ./v2/cmd/backfill


## DEPLOY
FROM debian:bullseye

RUN apt-get update && \
apt install -y ca-certificates && \
rm -rf /var/lib/apt/lists/*


WORKDIR /v2

COPY --from=builder /src/parse_log /v2/parse_log
COPY --from=builder /src/backfill /v2/backfill

COPY v2/cmd/migrations /v2/migrations

CMD /v2/parse_log
150 changes: 150 additions & 0 deletions v2/cmd/backfill/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package main

import (
"fmt"
"log"
"os"

"github.com/KyberNetwork/tradelogs/v2/internal/server"
"github.com/KyberNetwork/tradelogs/v2/internal/worker"
libapp "github.com/KyberNetwork/tradelogs/v2/pkg/app"
"github.com/KyberNetwork/tradelogs/v2/pkg/handler"
"github.com/KyberNetwork/tradelogs/v2/pkg/kafka"
"github.com/KyberNetwork/tradelogs/v2/pkg/parser"
"github.com/KyberNetwork/tradelogs/v2/pkg/parser/zxotc"
"github.com/KyberNetwork/tradelogs/v2/pkg/rpcnode"
"github.com/KyberNetwork/tradelogs/v2/pkg/storage/backfill"
"github.com/KyberNetwork/tradelogs/v2/pkg/storage/state"
"github.com/KyberNetwork/tradelogs/v2/pkg/storage/tradelogs"
storageTypes "github.com/KyberNetwork/tradelogs/v2/pkg/storage/tradelogs/types"
zxotcStorage "github.com/KyberNetwork/tradelogs/v2/pkg/storage/tradelogs/zxotc"
"github.com/KyberNetwork/tradinglib/pkg/dbutil"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/jmoiron/sqlx"
"github.com/urfave/cli"
"go.uber.org/zap"
)

func main() {
app := libapp.NewApp()
app.Name = "trade log backfill service"
app.Action = run

app.Flags = append(app.Flags, libapp.RPCNodeFlags()...)
app.Flags = append(app.Flags, libapp.PostgresSQLFlags("tradelogs_v2")...)
app.Flags = append(app.Flags, libapp.KafkaFlag()...)
app.Flags = append(app.Flags, libapp.HTTPServerFlags()...)

if err := app.Run(os.Args); err != nil {
log.Panic(err)
}
}

func run(c *cli.Context) error {
logger, _, flush, err := libapp.NewLogger(c)
if err != nil {
return fmt.Errorf("new logger: %w", err)
}

defer flush()

zap.ReplaceGlobals(logger)
l := logger.Sugar()
l.Infow("Starting backfill service")

db, err := initDB(c)
if err != nil {
return fmt.Errorf("cannot init DB: %w", err)
}

// trade log manager
storages := []storageTypes.Storage{
zxotcStorage.New(l, db),
}
manager := tradelogs.NewManager(l, storages)

// backfill storage
backfillStorage := backfill.New(l, db)

// state storage
stateStorage := state.New(l, db)

// rpc node to query trace call
rpcURL := c.StringSlice(libapp.RPCUrlFlagName)
if len(rpcURL) == 0 {
return fmt.Errorf("rpc url is empty")
}

ethClients := make([]*ethclient.Client, len(rpcURL))
for i, url := range rpcURL {
client, err := ethclient.Dial(url)
if err != nil {
return fmt.Errorf("cannot dial eth client: %w", err)
}
ethClients[i] = client
}
rpcNode := rpcnode.NewClient(l, ethClients...)

parsers := []parser.Parser{
//kyberswap.MustNewParser(),
zxotc.MustNewParser(),
//paraswap.MustNewParser(),
//kyberswaprfq.MustNewParser(),
//hashflowv3.MustNewParser(),
//oneinchv6.MustNewParser(traceCalls),
//uniswapxv1.MustNewParser(traceCalls),
//uniswapx.MustNewParser(traceCalls),
//bebop.MustNewParser(traceCalls),
//zxrfqv3.MustNewParserWithDeployer(traceCalls, ethClient, common.HexToAddress(parser.Deployer0xV3)),
}

// kafka broadcast topic
broadcastTopic := c.String(libapp.KafkaBroadcastTopic.Name)
err = kafka.ValidateTopicName(broadcastTopic)
if err != nil {
return fmt.Errorf("invalid kafka topic: %w", err)
}

// kafka publisher for broadcasting trade logs
kafkaPublisher, err := kafka.NewPublisher(libapp.KafkaConfigFromFlags(c))
if err != nil {
return fmt.Errorf("cannot create kafka publisher: %w", err)
}

// trade log handler
tradeLogHandler := handler.NewTradeLogHandler(l, rpcNode, manager, parsers, broadcastTopic, kafkaPublisher)

// parse log worker
w := worker.NewBackFiller(tradeLogHandler, backfillStorage, stateStorage, l, rpcNode, parsers)

go func() {
if err = w.Run(); err != nil {
panic(err)
}
}()

s := server.NewBackfill(l, c.String(libapp.HTTPBackfillServerFlag.Name), w)

return s.Run()
}

func initDB(c *cli.Context) (*sqlx.DB, error) {
db, err := libapp.NewDB(map[string]interface{}{
"host": c.String(libapp.PostgresHost.Name),
"port": c.Int(libapp.PostgresPort.Name),
"user": c.String(libapp.PostgresUser.Name),
"password": c.String(libapp.PostgresPassword.Name),
"dbname": c.String(libapp.PostgresDatabase.Name),
"sslmode": "disable",
})
if err != nil {
return nil, err
}

_, err = dbutil.RunMigrationUp(db.DB, c.String(libapp.PostgresMigrationPath.Name),
c.String(libapp.PostgresDatabase.Name))
if err != nil {
return nil, err
}
return db, nil
}
File renamed without changes.
8 changes: 8 additions & 0 deletions v2/cmd/migrations/00002_add_backfill.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
create table backfill (
exchange text not null primary key,
deploy_block bigint not null,
backfilled_block bigint not null default 0
);

insert into backfill (exchange, deploy_block)
values ('zerox', 20926953)
5 changes: 2 additions & 3 deletions v2/cmd/parse_log/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ func main() {
app.Flags = append(app.Flags, libapp.EvmListenerFlags()...)
app.Flags = append(app.Flags, libapp.RPCNodeFlags()...)
app.Flags = append(app.Flags, libapp.KafkaFlag()...)
//app.Flags = append(app.Flags, pricefiller.PriceFillerFlags()...)

if err := app.Run(os.Args); err != nil {
log.Panic(err)
Expand Down Expand Up @@ -109,10 +108,10 @@ func run(c *cli.Context) error {
}

// trade log handler
tradeLogHandler := handler.NewTradeLogHandler(l, rpcNode, manager, parsers, broadcastTopic, kafkaPublisher, s)
tradeLogHandler := handler.NewTradeLogHandler(l, rpcNode, manager, parsers, broadcastTopic, kafkaPublisher)

// parse log worker
w := worker.NewParseLog(tradeLogHandler)
w := worker.NewParseLog(tradeLogHandler, s, l)

mostRecentBlock, err := getMostRecentBlock(l, s, rpcNode)
if err != nil {
Expand Down
Loading

0 comments on commit 7627ccf

Please sign in to comment.