diff --git a/cmd/tradelogs/main.go b/cmd/tradelogs/main.go index 5dac484..6d25643 100644 --- a/cmd/tradelogs/main.go +++ b/cmd/tradelogs/main.go @@ -8,12 +8,14 @@ import ( "os" "time" + "github.com/KyberNetwork/go-binance/v2" "github.com/KyberNetwork/tradelogs/pkg/dune" "github.com/KyberNetwork/tradelogs/pkg/parser" "github.com/KyberNetwork/tradelogs/pkg/parser/bebop" "github.com/KyberNetwork/tradelogs/pkg/parser/oneinch" "github.com/KyberNetwork/tradelogs/pkg/parser/oneinchv6" "github.com/KyberNetwork/tradelogs/pkg/parser/uniswapx" + "github.com/KyberNetwork/tradelogs/pkg/pricefiller" "github.com/KyberNetwork/tradelogs/pkg/rpcnode" "github.com/KyberNetwork/tradelogs/pkg/tracecall" @@ -50,6 +52,7 @@ func main() { app.Flags = append(app.Flags, libapp.HTTPServerFlags()...) app.Flags = append(app.Flags, libapp.BigqueryFlags()...) app.Flags = append(app.Flags, libapp.RPCNodeFlags()...) + app.Flags = append(app.Flags, pricefiller.PriceFillerFlags()...) if err := app.Run(os.Args); err != nil { log.Panic(err) @@ -111,8 +114,15 @@ func run(c *cli.Context) error { bebop.MustNewParser(traceCalls), } + binanceClient := binance.NewClient(c.String(pricefiller.BinanceAPIKeyFlag.Name), c.String(pricefiller.BinanceSecretKeyFlag.Name)) + priceFiller, err := pricefiller.NewPriceFiller(binanceClient, s) + if err != nil { + l.Errorw("Error while init price filler") + return err + } + tradeLogChan := make(chan storage.TradeLog, 1000) - w, err := worker.New(l, s, listener, tradeLogChan, parsers...) + w, err := worker.New(l, s, listener, priceFiller, tradeLogChan, parsers...) if err != nil { l.Errorw("Error while init worker") return err diff --git a/cmd/tradelogs/migrations/00008_add_price_usd_column.up.sql b/cmd/tradelogs/migrations/00008_add_price_usd_column.up.sql new file mode 100644 index 0000000..396706d --- /dev/null +++ b/cmd/tradelogs/migrations/00008_add_price_usd_column.up.sql @@ -0,0 +1,10 @@ +CREATE TYPE tradelog_states AS ENUM ('new', 'processed'); + +ALTER TABLE tradelogs + ADD COLUMN maker_token_price FLOAT NOT NULL DEFAULT 0, + ADD COLUMN taker_token_price FLOAT NOT NULL DEFAULT 0, + ADD COLUMN maker_usd_amount FLOAT NOT NULL DEFAULT 0, + ADD COLUMN taker_usd_amount FLOAT NOT NULL DEFAULT 0, + ADD COLUMN state tradelog_states NOT NULL DEFAULT 'new'; + +CREATE INDEX tradelogs_state_idx ON tradelogs (state); diff --git a/go.mod b/go.mod index 1fa4175..fd582fd 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,8 @@ toolchain go1.22.2 require ( cloud.google.com/go/bigquery v1.56.0 github.com/KyberNetwork/cclog v1.1.0 + github.com/KyberNetwork/go-binance/v2 v2.0.3 + github.com/KyberNetwork/tradinglib v0.4.36 github.com/TheZeroSlave/zapsentry v1.20.2 github.com/ethereum/go-ethereum v1.13.14 github.com/getsentry/sentry-go v0.26.0 @@ -33,13 +35,13 @@ require ( cloud.google.com/go/compute/metadata v0.2.3 // indirect cloud.google.com/go/iam v1.1.5 // indirect github.com/DataDog/zstd v1.5.2 // indirect - github.com/KyberNetwork/tradinglib v0.4.36 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect + github.com/adshao/go-binance/v2 v2.5.1 // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/apache/arrow/go/v12 v12.0.0 // indirect github.com/apache/thrift v0.16.0 // indirect + github.com/bitly/go-simplejson v0.5.0 // indirect github.com/bits-and-blooms/bitset v1.13.0 // indirect - github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2 // indirect github.com/bytedance/sonic v1.10.2 // indirect github.com/cespare/cp v1.1.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect diff --git a/go.sum b/go.sum index e0eeb13..b08bdcf 100644 --- a/go.sum +++ b/go.sum @@ -35,6 +35,8 @@ github.com/KyberNetwork/cclog v1.1.0 h1:3gqKpSayABuTjS4J7H8qcQHrlfyggpjmHUnCYQT0 github.com/KyberNetwork/cclog v1.1.0/go.mod h1:vf9+yocFGEyqqObn4Gr9rtj4ffnuKVpKWiSCtfsxIHg= github.com/KyberNetwork/evmlistener v0.4.7 h1:SlJwzqngj2N2ot/1M391GbwE/A6wyKfV9z1RIlxZpyI= github.com/KyberNetwork/evmlistener v0.4.7/go.mod h1:7ylrHTrF9bRJRcVdw02f4P2k3ohc1/gKGQ3HgaYtfE8= +github.com/KyberNetwork/go-binance/v2 v2.0.3 h1:VsBBHXAJTxryBsdO6U9MyK+oHEJ9qZtQpZvmEzpHmGA= +github.com/KyberNetwork/go-binance/v2 v2.0.3/go.mod h1:ihNzjOXDgxZ4cSENV2sQfXnO30wobXQPpqTuKxxUVjw= github.com/KyberNetwork/tradinglib v0.4.36 h1:h8+pKMPiqnR/3Q+wGq1JWYxP3qFJ3qXqsPJQETuJcFw= github.com/KyberNetwork/tradinglib v0.4.36/go.mod h1:3Ciie58Qmd0mFSnUWAg6phbalao6f2GGcAZ6nxw/lcQ= github.com/Masterminds/squirrel v1.5.4 h1:uUcX/aBc8O7Fg9kaISIUsHXdKuqehiXAMQTYX8afzqM= @@ -47,6 +49,8 @@ github.com/TheZeroSlave/zapsentry v1.20.2 h1:llgC91ZJdoU/OzGxYpUlEhKinf65mw9hJ2K github.com/TheZeroSlave/zapsentry v1.20.2/go.mod h1:D1YMfSuu6xnkhwFXxrronesmsiyDhIqo+86I3Ok+r64= github.com/VictoriaMetrics/fastcache v1.12.1 h1:i0mICQuojGDL3KblA7wUNlY5lOK6a4bwt3uRKnkZU40= github.com/VictoriaMetrics/fastcache v1.12.1/go.mod h1:tX04vaqcNoQeGLD+ra5pU5sWkuxnzWhEzLwhP9w653o= +github.com/adshao/go-binance/v2 v2.5.1 h1:3CV9iyzEpHFqZFVzjnxI3GtZdIWH9Z/ewR66+q+mCk0= +github.com/adshao/go-binance/v2 v2.5.1/go.mod h1:41Up2dG4NfMXpCldrDPETEtiOq+pHoGsFZ73xGgaumo= github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= @@ -58,8 +62,12 @@ github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5 github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bitly/go-simplejson v0.5.0 h1:6IH+V8/tVMab511d5bn4M7EwGXZf9Hj6i2xSwkNEM+Y= +github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA= github.com/bits-and-blooms/bitset v1.13.0 h1:bAQ9OPNFYbGHV6Nez0tmNI0RiEu7/hxlYJRUA0wFAVE= github.com/bits-and-blooms/bitset v1.13.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/btcsuite/btcd/btcec/v2 v2.3.2 h1:5n0X6hX0Zk+6omWcihdYvdAlGf2DfasC0GMf7DClJ3U= github.com/btcsuite/btcd/btcec/v2 v2.3.2/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04= github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2 h1:KdUfX2zKommPRa+PD0sWZUyXe9w277ABlgELO7H04IM= @@ -528,6 +536,7 @@ github.com/status-im/keycard-go v0.2.0 h1:QDLFswOQu1r5jsycloeQh3bVU8n/NatHHaZobt github.com/status-im/keycard-go v0.2.0/go.mod h1:wlp8ZLbsmrF6g6WjugPAx+IzoLrkdf9+mHxBEeo3Hbg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= diff --git a/internal/worker/worker.go b/internal/worker/worker.go index ecbbc8e..6fb153f 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -9,6 +9,7 @@ import ( "github.com/KyberNetwork/tradelogs/pkg/convert" "github.com/KyberNetwork/tradelogs/pkg/evmlistenerclient" "github.com/KyberNetwork/tradelogs/pkg/parser" + "github.com/KyberNetwork/tradelogs/pkg/pricefiller" "github.com/KyberNetwork/tradelogs/pkg/storage" "go.uber.org/zap" ) @@ -18,10 +19,12 @@ type Worker struct { l *zap.SugaredLogger s *storage.Storage p map[string]parser.Parser + priceFiller *pricefiller.PriceFiller tradeLogChan chan storage.TradeLog } -func New(l *zap.SugaredLogger, s *storage.Storage, listener *evmlistenerclient.Client, tradeLogChan chan storage.TradeLog, +func New(l *zap.SugaredLogger, s *storage.Storage, listener *evmlistenerclient.Client, + priceFiller *pricefiller.PriceFiller, tradeLogChan chan storage.TradeLog, parsers ...parser.Parser) (*Worker, error) { p := make(map[string]parser.Parser) for _, ps := range parsers { @@ -34,6 +37,7 @@ func New(l *zap.SugaredLogger, s *storage.Storage, listener *evmlistenerclient.C l: l, s: s, p: p, + priceFiller: priceFiller, tradeLogChan: tradeLogChan, }, nil } @@ -115,6 +119,7 @@ func (w *Worker) processMessages(m []evmlistenerclient.Message) error { if err := w.s.Delete(deleteBlocks); err != nil { return err } + w.priceFiller.FullFillTradeLogs(insertOrders) if err := w.s.Insert(insertOrders); err != nil { return err } @@ -164,6 +169,7 @@ func (w *Worker) retryParseLog() error { insertOrders = append(insertOrders, order) } + w.priceFiller.FullFillTradeLogs(insertOrders) if err := w.s.Insert(insertOrders); err != nil { return err } diff --git a/pkg/pricefiller/flags.go b/pkg/pricefiller/flags.go new file mode 100644 index 0000000..a9aa897 --- /dev/null +++ b/pkg/pricefiller/flags.go @@ -0,0 +1,22 @@ +package pricefiller + +import ( + "github.com/urfave/cli" +) + +var BinanceAPIKeyFlag = cli.StringFlag{ + Name: "binance-api-key", + EnvVar: "BINANCE_API_KEY", +} + +var BinanceSecretKeyFlag = cli.StringFlag{ + Name: "binance-secret-key", + EnvVar: "BINANCE_SECRET_KEY", +} + +func PriceFillerFlags() []cli.Flag { + return []cli.Flag{ + BinanceAPIKeyFlag, + BinanceSecretKeyFlag, + } +} diff --git a/pkg/pricefiller/ks_client.go b/pkg/pricefiller/ks_client.go new file mode 100644 index 0000000..02dff50 --- /dev/null +++ b/pkg/pricefiller/ks_client.go @@ -0,0 +1,105 @@ +package pricefiller + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" +) + +const ksSettingUrl = "https://ks-setting.kyberswap.com/api/v1" + +type KsClient struct { + client *http.Client + baseURL string +} + +func NewKsClient() *KsClient { + return &KsClient{ + client: &http.Client{}, + baseURL: ksSettingUrl, + } +} + +func (c *KsClient) DoRequest(ctx context.Context, method, path string, jsonData interface{}, out interface{}) error { + req, err := createRequest(ctx, method, path, jsonData) + if err != nil { + return err + } + + resp, err := c.client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + bb, err := readResponse(resp.Body, out) + if err != nil { + return fmt.Errorf("readResponse error: %w, data: %s", err, string(bb)) + } + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("server return %d - %v", resp.StatusCode, string(bb)) + } + + return nil +} + +func createRequest(ctx context.Context, method, url string, jsonData interface{}) (*http.Request, error) { + var buf io.Reader + if jsonData != nil { + body, err := json.Marshal(jsonData) + if err != nil { + return nil, err + } + buf = bytes.NewBuffer(body) + } + req, err := http.NewRequestWithContext(ctx, method, url, buf) + if err != nil { + return nil, err + } + if jsonData != nil { + req.Header.Set("Content-Type", "application/json") + } + return req, nil +} + +func readResponse(data io.Reader, dataField interface{}) ([]byte, error) { + if dataField == nil { + return nil, fmt.Errorf("nil data") + } + bb, err := io.ReadAll(data) + if err != nil { + return nil, err + } + return bb, json.Unmarshal(bb, dataField) +} + +type TokenCatalogResp struct { + Code int64 `json:"code"` + Message string `json:"message"` + Data struct { + Tokens []TokenCatalog `json:"tokens"` + } +} + +type TokenCatalog struct { + Decimals int64 `json:"decimals"` +} + +func (c *KsClient) GetTokenCatalog(address string) (TokenCatalogResp, error) { + var resp TokenCatalogResp + err := c.DoRequest(context.Background(), http.MethodGet, + fmt.Sprintf("%s/tokens?chainIds=%d&query=%s", c.baseURL, NetworkETHChanID, address), + nil, &resp) + if err != nil { + return TokenCatalogResp{}, err + } + + if resp.Code != 0 { + return TokenCatalogResp{}, fmt.Errorf("invalid response code: %d", resp.Code) + } + + return resp, nil +} diff --git a/pkg/pricefiller/price_fillter.go b/pkg/pricefiller/price_fillter.go new file mode 100644 index 0000000..cae0db4 --- /dev/null +++ b/pkg/pricefiller/price_fillter.go @@ -0,0 +1,222 @@ +package pricefiller + +import ( + "context" + "errors" + "strconv" + "strings" + "sync" + "time" + + "github.com/KyberNetwork/go-binance/v2" + "github.com/KyberNetwork/tradelogs/pkg/storage" + "go.uber.org/zap" +) + +const ( + NetworkETHChanID = 1 + NetworkETH = "ETH" + updateAllCoinInfoInterval = time.Hour + backfillTradeLogsPriceInterval = time.Hour +) + +var ( + ErrNoPrice = errors.New(("no price from binance")) +) + +type CoinInfo struct { + Coin string + Network string + ContractAddress string + Decimals int64 +} + +type PriceFiller struct { + l *zap.SugaredLogger + s *storage.Storage + mu sync.Mutex + ksClient *KsClient + binanceClient *binance.Client + mappedCoinInfo map[string]CoinInfo // address - coinInfo +} + +func NewPriceFiller(binanceClient *binance.Client, s *storage.Storage) (*PriceFiller, error) { + p := &PriceFiller{ + l: zap.S(), + s: s, + ksClient: NewKsClient(), + binanceClient: binanceClient, + } + + if err := p.updateAllCoinInfo(); err != nil { + return nil, err + } + + go p.runUpdateAllCoinInfoRoutine() + go p.runBackFillTradelogPriceRoutine() + + return p, nil +} + +func (p *PriceFiller) getPrice(token string, timestamp int64) (float64, error) { + candles, err := p.binanceClient.NewKlinesService().Symbol(withAlias(token) + "USDT"). + Interval("1s").StartTime(timestamp).EndTime(timestamp).Do(context.Background()) + if err != nil { + return 0, err + } + if len(candles) == 0 { + return 0, ErrNoPrice + } + low, err := strconv.ParseFloat(candles[0].Low, 64) + if err != nil { + return 0, err + } + high, err := strconv.ParseFloat(candles[0].High, 64) + if err != nil { + return 0, err + } + return (low + high) / 2, nil +} + +func (p *PriceFiller) updateAllCoinInfo() error { + resp, err := p.binanceClient.NewAllCoinService().Do(context.Background()) + if err != nil { + p.l.Errorw("Failed to get all coins info", "err", err) + return err + } + + newMappedCoinInfo := make(map[string]CoinInfo) + for _, coinInfo := range resp { + for _, network := range coinInfo.NetworkList { + if network.Network == NetworkETH { + address := strings.ToLower(network.ContractAddress) + newMappedCoinInfo[address] = CoinInfo{ + Coin: network.Coin, + Network: network.Network, + ContractAddress: address, + } + break + } + } + } + + p.l.Infow("New mapped coin info", "data", newMappedCoinInfo) + p.mappedCoinInfo = newMappedCoinInfo + return nil +} + +func (p *PriceFiller) runUpdateAllCoinInfoRoutine() { + ticker := time.NewTicker(updateAllCoinInfoInterval) + defer ticker.Stop() + + for range ticker.C { + if err := p.updateAllCoinInfo(); err != nil { + p.l.Errorw("Failed to updateAllCoinInfo", "err", err) + } + } +} + +func (p *PriceFiller) runBackFillTradelogPriceRoutine() { + ticker := time.NewTicker(backfillTradeLogsPriceInterval) + defer ticker.Stop() + + for range ticker.C { + tradeLogs, err := p.s.Get(storage.TradeLogsQuery{ + State: string(storage.TradeLogStateNew), + Limit: 100, + }) + if err != nil { + p.l.Errorw("Failed to get tradeLogs", "err", err) + continue + } + + p.FullFillTradeLogs(tradeLogs) + if err := p.s.Insert(tradeLogs); err != nil { + p.l.Errorw("Failed to insert tradeLogs", "err", err) + continue + } + + p.l.Infow("backfill tradelog price successfully", "trades", tradeLogs) + } +} + +func (p *PriceFiller) fullFillTradeLog(tradeLog storage.TradeLog) (storage.TradeLog, error) { + makerPrice, makerUsdAmount, err := p.getPriceAndAmountUsd(tradeLog.MakerToken, tradeLog.MakerTokenAmount, int64(tradeLog.Timestamp)) + if err != nil { + p.l.Errorw("Failed to getPriceAndAmountUsd for maker", "err", err) + return tradeLog, err + } + + tradeLog.MakerTokenPrice = makerPrice + tradeLog.MakerUsdAmount = makerUsdAmount + + takerPrice, takerUsdAmount, err := p.getPriceAndAmountUsd(tradeLog.TakerToken, tradeLog.TakerTokenAmount, int64(tradeLog.Timestamp)) + if err != nil { + p.l.Errorw("Failed to getPriceAndAmountUsd for taker", "err", err) + return tradeLog, err + } + + tradeLog.TakerTokenPrice = takerPrice + tradeLog.TakerUsdAmount = takerUsdAmount + tradeLog.State = storage.TradeLogStateProcessed + + return tradeLog, nil +} + +func (p *PriceFiller) getPriceAndAmountUsd(address, rawAmt string, at int64) (float64, float64, error) { + p.mu.Lock() + coin, ok := p.mappedCoinInfo[address] + p.mu.Unlock() + if ok { + if coin.Decimals == 0 { + d, err := p.getDecimals(address) + if err != nil { + p.l.Errorw("Failed to getDecimals", "err", err, "address", address) + return 0, 0, err + } + coin.Decimals = d + p.mu.Lock() + p.mappedCoinInfo[address] = coin + p.mu.Unlock() + } + + price, err := p.getPrice(coin.Coin, int64(at)) + if err != nil { + if !errors.Is(err, ErrNoPrice) { + p.l.Errorw("Failed to getPrice", "err", err, "coin", coin.Coin, "at", at) + return 0, 0, err + } + } + + return price, calculateAmountUsd(rawAmt, coin.Decimals, price), nil + } + return 0, 0, nil +} + +func (p *PriceFiller) FullFillTradeLogs(tradeLogs []storage.TradeLog) { + for idx, tradeLog := range tradeLogs { + // for the safety, sleep a bit to avoid Binance rate limit + time.Sleep(10 * time.Millisecond) + filledTradeLog, err := p.fullFillTradeLog(tradeLog) + if err != nil { + p.l.Errorw("Failed to fullFillTradeLog", "err", err, "tradeLog", tradeLog) + continue + } + tradeLogs[idx] = filledTradeLog + } +} + +func (p *PriceFiller) getDecimals(address string) (int64, error) { + resp, err := p.ksClient.GetTokenCatalog(address) + if err != nil { + p.l.Errorw("Failed to GetTokenCatalog", "err", err) + return 0, err + } + + if len(resp.Data.Tokens) != 1 { + p.l.Errorw("Weird token catalog response", "resp", resp) + return 0, errors.New("weird token catalog response") + } + + return resp.Data.Tokens[0].Decimals, nil +} diff --git a/pkg/pricefiller/price_fillter_test.go b/pkg/pricefiller/price_fillter_test.go new file mode 100644 index 0000000..59016a2 --- /dev/null +++ b/pkg/pricefiller/price_fillter_test.go @@ -0,0 +1,64 @@ +package pricefiller + +import ( + "testing" + + "github.com/KyberNetwork/go-binance/v2" + "github.com/KyberNetwork/tradelogs/pkg/storage" + "github.com/test-go/testify/require" +) + +// go test -v -timeout 30s -run ^TestFillPrice$ github.com/KyberNetwork/tradelogs/pkg/pricefiller +func TestFillPrice(t *testing.T) { + t.Skip("Need to add Binance credentials") + bClient := binance.NewClient("", "") + filler, err := NewPriceFiller(bClient, nil) + if err != nil { + require.NoError(t, err) + } + + tradeLogs := []storage.TradeLog{ + { + Taker: "0x807cf9a772d5a3f9cefbc1192e939d62f0d9bd38", + MakerToken: "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2", + TakerToken: "0x320623b8e4ff03373931769a31fc52a4e78b5d70", + MakerTokenAmount: "503568522079108960", + TakerTokenAmount: "336970435721651800000000", + ContractAddress: "0x6131b5fae19ea4f9d964eac0408e4408b66337b5", + BlockNumber: 20230391, + TxHash: "0x0aad2e38b90390d6d060a5886ddd20d312c6beb3a305b2360bd6d25593ddf058", + LogIndex: 57, + Timestamp: 1720062815000, + EventHash: "0xd6d4f5681c246c9f42c203e287975af1601f8df8035a9251f79aab5c8f09e2f8", + }, + { + Taker: "0x807cf9a772d5a3f9cefbc1192e939d62f0d9bd38", + MakerToken: "0x7fc66500c84a76ad7e9c93437bfc5ac33e2ddae9", + TakerToken: "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2", + MakerTokenAmount: "179003122862979007021", + TakerTokenAmount: "4688506567482331000", + ContractAddress: "0x6131b5fae19ea4f9d964eac0408e4408b66337b5", + BlockNumber: 20230305, + TxHash: "0x21b8f97e43ff6debbe2ab323f079f0a936c14c16bdb6693587b48a5d66dcc37c", + LogIndex: 136, + Timestamp: 1720061783000, + EventHash: "0xd6d4f5681c246c9f42c203e287975af1601f8df8035a9251f79aab5c8f09e2f8", + }, + { + Taker: "0x807cf9a772d5a3f9cefbc1192e939d62f0d9bd38", + MakerToken: "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2", + TakerToken: "0x514910771af9ca656af840dff83e8264ecf986ca", + MakerTokenAmount: "783414511682884466", + TakerTokenAmount: "190359937738916760000", + ContractAddress: "0x6131b5fae19ea4f9d964eac0408e4408b66337b5", + BlockNumber: 20230291, + TxHash: "0x2ff00bcfa69c85fdbd0c7fc9b193717009751a32cf661c31815d9745deb68552", + LogIndex: 136, + Timestamp: 1720061615000, + EventHash: "0xd6d4f5681c246c9f42c203e287975af1601f8df8035a9251f79aab5c8f09e2f8", + }, + } + filler.FullFillTradeLogs(tradeLogs) + + t.Log(tradeLogs) +} diff --git a/pkg/pricefiller/utils.go b/pkg/pricefiller/utils.go new file mode 100644 index 0000000..a2311c4 --- /dev/null +++ b/pkg/pricefiller/utils.go @@ -0,0 +1,29 @@ +package pricefiller + +import ( + "math/big" + + "github.com/KyberNetwork/tradelogs/pkg/convert" +) + +var aliasCoinMap = map[string]string{ + "WETH": "ETH", + "STETH": "ETH", +} + +func withAlias(coin string) string { + if s, ok := aliasCoinMap[coin]; ok { + return s + } + return coin +} + +// calculateAmountUsd returns raw / (10**decimals) * price +func calculateAmountUsd(raw string, decimals int64, price float64) float64 { + rawAmt, ok := new(big.Int).SetString(raw, 10) + if !ok { + return 0 + } + + return convert.WeiToFloat(rawAmt, decimals) * price +} diff --git a/pkg/pricefiller/utils_test.go b/pkg/pricefiller/utils_test.go new file mode 100644 index 0000000..a25e768 --- /dev/null +++ b/pkg/pricefiller/utils_test.go @@ -0,0 +1,22 @@ +package pricefiller + +import ( + "math" + "testing" + + "github.com/stretchr/testify/assert" +) + +func floatEqual(f1, f2 float64) bool { + return math.Abs(f1-f2) < 1e-9 +} + +func TestCalculateAmountUsd(t *testing.T) { + // RSR + usdAmountRSR := calculateAmountUsd("336970435721651800000000", 18, 0.003863) + assert.True(t, floatEqual(usdAmountRSR, 1301.716793192741), usdAmountRSR) + + // WETH + usdtAmountWETH := calculateAmountUsd("503568522079108960", 18, 2600) + assert.True(t, floatEqual(usdtAmountWETH, 1309.2781574056833), usdtAmountWETH) +} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 904aad4..ef1cdf3 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -67,7 +67,12 @@ func (s *Storage) Insert(orders []TradeLog) error { timestamp=excluded.timestamp, event_hash=excluded.event_hash, maker_traits=excluded.maker_traits, - expiration_date=excluded.expiration_date + expiration_date=excluded.expiration_date, + maker_token_price=excluded.maker_token_price, + taker_token_price=excluded.taker_token_price, + maker_usd_amount=excluded.maker_usd_amount, + taker_usd_amount=excluded.taker_usd_amount, + state=excluded.state `).ToSql() if err != nil { s.l.Errorw("Error build insert", "error", err) @@ -94,7 +99,7 @@ func (s *Storage) Get(query TradeLogsQuery) ([]TradeLog, error) { types := v.Type() for i := 0; i < v.NumField(); i++ { tag := string(types.Field(i).Tag.Get("form")) - if tag == "from_time" || tag == "to_time" { + if tag == "from_time" || tag == "to_time" || tag == "limit" { continue } if v.Field(i).IsZero() { @@ -102,6 +107,9 @@ func (s *Storage) Get(query TradeLogsQuery) ([]TradeLog, error) { } builder = builder.Where(squirrel.Eq{tag: strings.ToLower(v.Field(i).String())}) } + if query.Limit != 0 { + builder = builder.Limit(query.Limit) + } q, p, err := builder.OrderBy("timestamp DESC").ToSql() if err != nil { return nil, err @@ -148,6 +156,11 @@ func tradelogsColumns() []string { "event_hash", "maker_traits", "expiration_date", + "maker_token_price", + "taker_token_price", + "maker_usd_amount", + "taker_usd_amount", + "state", } } diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go index e7a41f1..0f25b30 100644 --- a/pkg/storage/storage_test.go +++ b/pkg/storage/storage_test.go @@ -56,6 +56,7 @@ func TestSimple(t *testing.T) { LogIndex: 198, Timestamp: 1671614015000, Expiry: 1719507125, + State: "new", }, }, Success: true, @@ -80,6 +81,7 @@ func TestSimple(t *testing.T) { LogIndex: 202, Timestamp: 1671614111000, Expiry: 1719507125, + State: "new", }, }, Success: true, diff --git a/pkg/storage/types.go b/pkg/storage/types.go index 001d347..647034d 100644 --- a/pkg/storage/types.go +++ b/pkg/storage/types.go @@ -7,23 +7,35 @@ import ( ) type TradeLog struct { - OrderHash string `db:"order_hash" json:"order_hash,omitempty"` - Maker string `db:"maker" json:"maker,omitempty"` - Taker string `db:"taker" json:"taker,omitempty"` - MakerToken string `db:"maker_token" json:"maker_token,omitempty"` - TakerToken string `db:"taker_token" json:"taker_token,omitempty"` - MakerTokenAmount string `db:"maker_token_amount" json:"maker_token_amount,omitempty"` - TakerTokenAmount string `db:"taker_token_amount" json:"taker_token_amount,omitempty"` - ContractAddress string `db:"contract_address" json:"contract_address,omitempty"` - BlockNumber uint64 `db:"block_number" json:"block_number,omitempty"` - TxHash string `db:"tx_hash" json:"tx_hash,omitempty"` - LogIndex uint64 `db:"log_index" json:"log_index,omitempty"` - Timestamp uint64 `db:"timestamp" json:"timestamp,omitempty"` - EventHash string `db:"event_hash" json:"event_hash,omitempty"` - MakerTraits string `db:"maker_traits" json:"maker_traits,omitempty"` - Expiry uint64 `json:"expiration_date" db:"expiration_date"` + OrderHash string `db:"order_hash" json:"order_hash,omitempty"` + Maker string `db:"maker" json:"maker,omitempty"` + Taker string `db:"taker" json:"taker,omitempty"` + MakerToken string `db:"maker_token" json:"maker_token,omitempty"` + TakerToken string `db:"taker_token" json:"taker_token,omitempty"` + MakerTokenAmount string `db:"maker_token_amount" json:"maker_token_amount,omitempty"` + TakerTokenAmount string `db:"taker_token_amount" json:"taker_token_amount,omitempty"` + ContractAddress string `db:"contract_address" json:"contract_address,omitempty"` + BlockNumber uint64 `db:"block_number" json:"block_number,omitempty"` + TxHash string `db:"tx_hash" json:"tx_hash,omitempty"` + LogIndex uint64 `db:"log_index" json:"log_index,omitempty"` + Timestamp uint64 `db:"timestamp" json:"timestamp,omitempty"` + EventHash string `db:"event_hash" json:"event_hash,omitempty"` + MakerTraits string `db:"maker_traits" json:"maker_traits,omitempty"` + Expiry uint64 `db:"expiration_date" json:"expiration_date"` + MakerTokenPrice float64 `db:"maker_token_price" json:"maker_token_price"` + TakerTokenPrice float64 `db:"taker_token_price" json:"taker_token_price"` + MakerUsdAmount float64 `db:"maker_usd_amount" json:"maker_usd_amount"` + TakerUsdAmount float64 `db:"taker_usd_amount" json:"taker_usd_amount"` + State TradeLogState `db:"state" json:"state"` } +type TradeLogState string + +const ( + TradeLogStateNew TradeLogState = "new" + TradeLogStateProcessed TradeLogState = "processed" +) + type TradeLogsQuery struct { FromTime uint64 `form:"from_time" json:"from_time,omitempty" binding:"required"` ToTime uint64 `form:"to_time" json:"to_time,omitempty" binding:"required"` @@ -34,9 +46,15 @@ type TradeLogsQuery struct { TakerToken string `form:"taker_token" json:"taker_token,omitempty"` OrderHash string `form:"order_hash" json:"order_hash,omitempty"` EventHash string `form:"event_hash" json:"event_hash,omitempty"` + State string `form:"state" json:"state,omitempty"` + Limit uint64 `form:"limit" json:"limit,omitempty"` } func (o *TradeLog) Serialize() []interface{} { + // set default state is new + if o.State == "" { + o.State = TradeLogStateNew + } return []interface{}{ o.OrderHash, strings.ToLower(o.Maker), @@ -53,6 +71,11 @@ func (o *TradeLog) Serialize() []interface{} { o.EventHash, o.MakerTraits, o.Expiry, + o.MakerTokenPrice, + o.TakerTokenPrice, + o.MakerUsdAmount, + o.TakerUsdAmount, + o.State, } }