Skip to content

Commit

Permalink
Merge pull request #38 from KyberNetwork/TRD-251-tradelogs-allow-regi…
Browse files Browse the repository at this point in the history
…ster-webhook-on-new-trade

Trd 251 tradelogs allow register webhook on new trade
  • Loading branch information
ngocthanh1389 authored Apr 12, 2024
2 parents dac2d5a + e4ecca5 commit a80ec8c
Show file tree
Hide file tree
Showing 12 changed files with 186 additions and 40 deletions.
8 changes: 6 additions & 2 deletions cmd/tradelogs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ func run(c *cli.Context) error {
oneinchv6.MustNewParser(traceCalls),
}

w, err := worker.New(l, s, listener, parsers...)
tradeLogChan := make(chan storage.TradeLog, 1000)
w, err := worker.New(l, s, listener, tradeLogChan, parsers...)
if err != nil {
l.Errorw("Error while init worker")
return err
Expand All @@ -126,7 +127,10 @@ func run(c *cli.Context) error {
}
}()

httpTradelogs := tradelogs.New(l, s, c.String(libapp.HTTPServerFlag.Name))
bc := tradelogs.NewBroadcaster(tradeLogChan)
go bc.BroadcastLog()
go bc.CheckDisconnect()
httpTradelogs := tradelogs.New(l, s, c.String(libapp.HTTPServerFlag.Name), bc)
go func() {
if err := httpTradelogs.Run(); err != nil {
panic(err)
Expand Down
1 change: 1 addition & 0 deletions cmd/tradelogs/migrations/00004_add_maker_trait.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE tradelogs ADD maker_traits TEXT NOT NULL DEFAULT '';
6 changes: 2 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ go 1.21.1
require (
cloud.google.com/go/bigquery v1.56.0
github.com/KyberNetwork/cclog v1.1.0
github.com/KyberNetwork/tradinglib v0.4.19
github.com/TheZeroSlave/zapsentry v1.20.2
github.com/ethereum/go-ethereum v1.13.14
github.com/gammazero/workerpool v1.1.3
github.com/getsentry/sentry-go v0.26.0
github.com/gin-contrib/pprof v1.4.0
github.com/gin-gonic/gin v1.9.1
Expand All @@ -34,7 +34,6 @@ require (
github.com/apache/arrow/go/v12 v12.0.0 // indirect
github.com/apache/thrift v0.16.0 // indirect
github.com/bits-and-blooms/bitset v1.13.0 // indirect
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2 // indirect
github.com/bytedance/sonic v1.10.2 // indirect
github.com/cespare/cp v1.1.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
Expand All @@ -48,7 +47,6 @@ require (
github.com/ethereum/c-kzg-4844 v0.4.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/gammazero/deque v0.2.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
Expand Down Expand Up @@ -107,7 +105,7 @@ require (
github.com/go-playground/validator/v10 v10.17.0 // indirect
github.com/go-redis/redis/v8 v8.11.5
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/gorilla/websocket v1.5.1
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ github.com/KyberNetwork/cclog v1.1.0 h1:3gqKpSayABuTjS4J7H8qcQHrlfyggpjmHUnCYQT0
github.com/KyberNetwork/cclog v1.1.0/go.mod h1:vf9+yocFGEyqqObn4Gr9rtj4ffnuKVpKWiSCtfsxIHg=
github.com/KyberNetwork/evmlistener v0.4.7 h1:SlJwzqngj2N2ot/1M391GbwE/A6wyKfV9z1RIlxZpyI=
github.com/KyberNetwork/evmlistener v0.4.7/go.mod h1:7ylrHTrF9bRJRcVdw02f4P2k3ohc1/gKGQ3HgaYtfE8=
github.com/KyberNetwork/tradinglib v0.4.19 h1:pVyRacHZY9xMhEiJHjVPKVhscdSQjoG2pIR291FnKSo=
github.com/KyberNetwork/tradinglib v0.4.19/go.mod h1:HwQjz6Iv2Nn/cQYnToYCZytjhzxLHs60JMXLeiZKcvc=
github.com/Masterminds/squirrel v1.5.4 h1:uUcX/aBc8O7Fg9kaISIUsHXdKuqehiXAMQTYX8afzqM=
github.com/Masterminds/squirrel v1.5.4/go.mod h1:NNaOrjSoIDfDA40n7sr2tPNZRfjzjA400rg+riTZj10=
github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow=
Expand Down Expand Up @@ -155,10 +157,6 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0=
github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk=
github.com/gammazero/deque v0.2.0 h1:SkieyNB4bg2/uZZLxvya0Pq6diUlwx7m2TeT7GAIWaA=
github.com/gammazero/deque v0.2.0/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU=
github.com/gammazero/workerpool v1.1.3 h1:WixN4xzukFoN0XSeXF6puqEqFTl2mECI9S6W44HWy9Q=
github.com/gammazero/workerpool v1.1.3/go.mod h1:wPjyBLDbyKnUn2XwwyD3EEwo9dHutia9/fwNmSHWACc=
github.com/gavv/httpexpect v2.0.0+incompatible/go.mod h1:x+9tiU1YnrOvnB725RkpoLv1M62hOWzwo5OXotisrKc=
github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff h1:tY80oXqGNY4FhTFhk+o9oFHGINQ/+vhlm8HFzi6znCI=
github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff/go.mod h1:x7DCsMOv1taUwEWCzT4cmDeAkigA5/QCwUodaVOe8Ww=
Expand Down Expand Up @@ -465,6 +463,8 @@ github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFR
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU=
Expand Down
87 changes: 87 additions & 0 deletions internal/server/tradelogs/broadcaster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package server

import (
"fmt"
"sync"
"time"

"github.com/KyberNetwork/tradelogs/pkg/storage"
"github.com/gorilla/websocket"
"github.com/rs/xid"
"go.uber.org/zap"
)

type Con struct {
id string
ws *websocket.Conn
eventHash string
maker string
}

type Broadcaster struct {
mu sync.Mutex
l *zap.SugaredLogger
clients map[string]map[string]Con
tradeLogChan chan storage.TradeLog
}

func NewBroadcaster(tradeChan chan storage.TradeLog) *Broadcaster {
return &Broadcaster{
l: zap.S(),
clients: make(map[string]map[string]Con),
tradeLogChan: tradeChan,
}
}

func (b *Broadcaster) BroadcastLog() {
for log := range b.tradeLogChan {
b.mu.Lock()
cons := b.clients[combine(log.EventHash, log.Maker)]
for _, c := range cons {
if err := c.ws.WriteJSON(log); err != nil {
b.l.Errorw("error when send msg", "err", err)
}
}
b.mu.Unlock()
}
}

func (b *Broadcaster) addConn(event, maker string, conn *websocket.Conn) {
id := xid.New().String()
b.l.Infow("connected socket", "id", id)
b.mu.Lock()
cons, ok := b.clients[combine(event, maker)]
if !ok {
cons = map[string]Con{}
}
cons[id] = Con{
id: id,
ws: conn,
maker: maker,
eventHash: event,
}
b.clients[combine(event, maker)] = cons
b.mu.Unlock()
}

func combine(event, maker string) string {
return fmt.Sprintf("%s-%s", event, maker)
}

func (b *Broadcaster) CheckDisconnect() {
for {
b.mu.Lock()
for _, cons := range b.clients {
for id, c := range cons {
if _, _, err := c.ws.ReadMessage(); err != nil {
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseNoStatusReceived) {
b.l.Infow("socket is closed", "id", id)
delete(cons, id)
}
}
}
}
b.mu.Unlock()
time.Sleep(time.Minute)
}
}
40 changes: 37 additions & 3 deletions internal/server/tradelogs/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,39 @@ import (
"github.com/KyberNetwork/tradelogs/pkg/storage"
"github.com/gin-contrib/pprof"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"go.uber.org/zap"
)

var (
maxTimeRange uint64 = uint64(7 * 24 * time.Hour.Milliseconds())
wsupgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
// Allow connections from any Origin
CheckOrigin: func(r *http.Request) bool {
return true
},
}
)

type RegisterRequest struct {
EventHash string `form:"event_hash"`
Maker string `form:"maker"`
}

// Server to serve the service.
type Server struct {
r *gin.Engine
bindAddr string
l *zap.SugaredLogger
s *storage.Storage
bc *Broadcaster
}

// New returns a new server.
func New(l *zap.SugaredLogger, s *storage.Storage, bindAddr string) *Server {
func New(l *zap.SugaredLogger, s *storage.Storage, bindAddr string,
bc *Broadcaster) *Server {
engine := gin.New()
engine.Use(gin.Recovery())

Expand All @@ -34,6 +50,7 @@ func New(l *zap.SugaredLogger, s *storage.Storage, bindAddr string) *Server {
bindAddr: bindAddr,
l: l,
s: s,
bc: bc,
}

gin.SetMode(gin.ReleaseMode)
Expand All @@ -54,14 +71,14 @@ func (s *Server) Run() error {
func (s *Server) register() {
pprof.Register(s.r, "/debug")
s.r.GET("/tradelogs", s.getTradeLogs)

s.r.GET("/eventlogws", s.registerEventLogWS)
}

func responseErr(c *gin.Context, status int, err error) {
c.JSON(http.StatusBadRequest, gin.H{
"success": false,
"error": err.Error(),
"status": status,
"status": status,
})
}

Expand All @@ -88,3 +105,20 @@ func (s *Server) getTradeLogs(c *gin.Context) {
"data": data,
})
}

func (s *Server) registerEventLogWS(c *gin.Context) {
var param RegisterRequest
if err := c.BindQuery(&param); err != nil {
responseErr(c, http.StatusBadRequest, err)
return
}

s.l.Infow("receive ws", "param", param)
conn, err := wsupgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
s.l.Errorw("Failed to set websocket upgrade", "error", err)
responseErr(c, http.StatusInternalServerError, fmt.Errorf("can't create ws"))
return
}
s.bc.addConn(param.EventHash, param.Maker, conn)
}
53 changes: 33 additions & 20 deletions internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,43 @@ type EVMLog struct {
}

type Worker struct {
listener *evmlistenerclient.Client
l *zap.SugaredLogger
s *storage.Storage
p map[string]parser.Parser
errLogs lru.BasicLRU[string, EVMLog]
listener *evmlistenerclient.Client
l *zap.SugaredLogger
s *storage.Storage
p map[string]parser.Parser
errLogs lru.BasicLRU[string, EVMLog]
tradeLogChan chan storage.TradeLog
}

func New(l *zap.SugaredLogger, s *storage.Storage, listener *evmlistenerclient.Client, parsers ...parser.Parser) (*Worker, error) {
func New(l *zap.SugaredLogger, s *storage.Storage, listener *evmlistenerclient.Client, tradeLogChan chan storage.TradeLog,
parsers ...parser.Parser) (*Worker, error) {
p := make(map[string]parser.Parser)
for _, ps := range parsers {
for _, topic := range ps.Topics() {
p[topic] = ps
}
}
return &Worker{
listener: listener,
l: l,
s: s,
p: p,
errLogs: lru.NewBasicLRU[string, EVMLog](1000),
listener: listener,
l: l,
s: s,
p: p,
errLogs: lru.NewBasicLRU[string, EVMLog](1000),
tradeLogChan: tradeLogChan,
}, nil
}

func (w *Worker) Run(ctx context.Context) error {
retryTimer := time.NewTicker(evmlistenerclient.BlockTime)
for {
select {
case <-retryTimer.C:
if err := w.retryParseLog(); err != nil {
w.l.Errorw("error when retry parse log", "err", err)
return err
}
default:
}
m, err := w.listener.GConsume(ctx)
if err != nil {
w.l.Errorw("Error while consume in group")
Expand All @@ -63,14 +74,6 @@ func (w *Worker) Run(ctx context.Context) error {
w.l.Errorw("Error when ack msg", "error", err)
return err
}
select {
case <-retryTimer.C:
if err := w.retryParseLog(); err != nil {
w.l.Errorw("error when retry parse log", "err", err)
return err
}
default:
}
}
}
func (w *Worker) processMessages(m []evmlistenerclient.Message) error {
Expand Down Expand Up @@ -119,14 +122,19 @@ func (w *Worker) processMessages(m []evmlistenerclient.Message) error {
if err := w.s.Insert(insertOrders); err != nil {
return err
}
for _, log := range insertOrders {
w.tradeLogChan <- log
}
}

return nil
}

func (w *Worker) retryParseLog() error {
insertOrders := []storage.TradeLog{}
for _, k := range w.errLogs.Keys() {
keys := w.errLogs.Keys()
w.l.Infow("start retry logs", "len", len(keys))
for _, k := range keys {
l, ok := w.errLogs.Peek(k)
if !ok {
continue
Expand All @@ -137,8 +145,10 @@ func (w *Worker) retryParseLog() error {
}
order, err := ps.Parse(convert.ToETHLog(l.log), l.ts)
if err != nil {
w.l.Errorw("error when retry log", "log", l.log, "err", err)
continue
}

w.l.Infow("retry log successfully", "key", k, "parser", ps.Exchange())
w.errLogs.Remove(k)
insertOrders = append(insertOrders, order)
Expand All @@ -147,5 +157,8 @@ func (w *Worker) retryParseLog() error {
if err := w.s.Insert(insertOrders); err != nil {
return err
}
for _, log := range insertOrders {
w.tradeLogChan <- log
}
return nil
}
Loading

0 comments on commit a80ec8c

Please sign in to comment.