diff --git a/README.md b/README.md index 620138f..448b174 100644 --- a/README.md +++ b/README.md @@ -259,4 +259,44 @@ Example: generate `Storage` interface to a struct name `MockStorage` mockery --dir=v2/pkg/storage/state --name=Storage --output=v2/mocks/ --structname=MockState --filename=State.go ``` -For more information `mockery --help` \ No newline at end of file +For more information `mockery --help` + +## Price-filler +### Refetch token price + +- **URL**: `/price_filler/refetch` +- **Method**: `POST` +- **Description**: Reset taker_token_price, maker_token_price, taker_usd_amount, maker_usd_amount of trades with faulty token. +- **URL Parameters**: + - `address` (string): The token address. + - `exchange` (string): The exchange name. + - `from_ts` (int): The starting timestamp (millisecond). + - `to_ts` (int): The ending timestamp (millisecond). +- **Response**: + - **200 OK**: Success. + ```json + { + "data": { + "token": "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48", + "exchange": "bebop", + "from": 1734204504800, + "to": 1734809304800, + "number of row updated": 2000 + }, + "success": true + } + ``` + - **400 Bad Request**: If the from_ts or to_ts is invalid. + ```json + { + "error": "", + "success": false + } + ``` + - **500 Internal Server Error**: If there is an error resetting token price. + ```json + { + "error": "", + "success": false + } + ``` \ No newline at end of file diff --git a/v2/internal/server/tradelogs.go b/v2/internal/server/tradelogs.go index 26762c9..574c5b0 100644 --- a/v2/internal/server/tradelogs.go +++ b/v2/internal/server/tradelogs.go @@ -17,6 +17,19 @@ var ( maxTimeRange = uint64(24 * time.Hour.Milliseconds()) ) +const ( + maxRangeEmptyAddress = 7 * 24 * time.Hour + maxRangeNonEmptyAddress = 30 * 24 * time.Hour +) + +type resetTokenPriceParams struct { + Address string `form:"address" json:"address"` + Exchange string `form:"exchange" json:"exchange"` + From int64 `form:"from_ts" json:"from_ts"` + To int64 `form:"to_ts" json:"to_ts"` + Rows int64 `form:"rows" json:"rows"` +} + type TradeLogs struct { r *gin.Engine bindAddr string @@ -65,6 +78,7 @@ func (s *TradeLogs) register() { s.r.POST("/makers", s.addMakerName) s.r.GET("/txorigin", s.getTxOrigin) s.r.POST("/txorigin", s.addTxOrigin) + s.r.POST("/price_filler/refetch", s.resetTokenPriceToRefetch) } func (s *TradeLogs) getTradeLogs(c *gin.Context) { @@ -190,3 +204,74 @@ func (s *TradeLogs) addTxOrigin(c *gin.Context) { "data": queries, }) } + +func (s *TradeLogs) resetTokenPriceToRefetch(c *gin.Context) { + var query resetTokenPriceParams + if err := c.ShouldBindJSON(&query); err != nil { + responseErr(c, http.StatusBadRequest, err) + return + } + + query, err := validateResetTokenPriceParams(query) + if err != nil { + responseErr(c, http.StatusBadRequest, err) + return + } + + for _, storage := range s.storage { + if storage.Exchange() != query.Exchange { + continue + } + rows, err := storage.ResetTokenPriceToRefetch(query.Address, query.From, query.To) + if err != nil { + responseErr(c, http.StatusInternalServerError, err) + return + } + query.Rows = rows + c.JSON(http.StatusOK, gin.H{ + "success": true, + "data": query, + }) + return + } + responseErr(c, http.StatusBadRequest, fmt.Errorf("exchange not found")) +} + +func validateResetTokenPriceParams(query resetTokenPriceParams) (resetTokenPriceParams, error) { + now := time.Now() + if query.Address == "" && query.From == 0 && query.To == 0 { + return query, fmt.Errorf("address is empty and no valid time range provided") + } + + // Validate `From` timestamp: it cannot be in the future or negative. + if query.From > now.UnixMilli() || query.From < 0 { + return query, fmt.Errorf("invalid from_ts") + } + + // Validate `To` timestamp: it can not be negative, + // and if provided, it must be greater than or equal to `From`. + if query.To < 0 || (query.To > 0 && query.To < query.From) { + return query, fmt.Errorf("invalid to_ts") + } + timeRange := maxRangeEmptyAddress + if len(query.Address) > 0 { + timeRange = maxRangeNonEmptyAddress + } + + if query.From == 0 && query.To == 0 { + query.From = now.Add(-timeRange).UnixMilli() + query.To = now.UnixMilli() + return query, nil + } + + // If `From` is provided but `To` is not, calculate `To` as `From + max range` + if query.From > 0 && query.To == 0 { + query.To = min(time.UnixMilli(query.From).Add(timeRange).UnixMilli(), now.UnixMilli()) + return query, nil + } + + // Adjust `From` as `To - max range + query.To = min(query.To, now.UnixMilli()) + query.From = max(time.UnixMilli(query.To).Add(-timeRange).UnixMilli(), query.From) + return query, nil +} diff --git a/v2/mocks/Storage.go b/v2/mocks/Storage.go index 9d88575..dca5fbd 100644 --- a/v2/mocks/Storage.go +++ b/v2/mocks/Storage.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.46.2. DO NOT EDIT. +// Code generated by mockery v2.50.4. DO NOT EDIT. package mocks @@ -30,7 +30,7 @@ func (_m *MockStorage) Delete(blocks []uint64) error { return r0 } -// Exchange provides a mock function with given fields: +// Exchange provides a mock function with no fields func (_m *MockStorage) Exchange() string { ret := _m.Called() @@ -126,6 +126,34 @@ func (_m *MockStorage) Insert(orders []types.TradeLog) error { return r0 } +// ResetTokenPriceToRefetch provides a mock function with given fields: token, from, to +func (_m *MockStorage) ResetTokenPriceToRefetch(token string, from int64, to int64) (int64, error) { + ret := _m.Called(token, from, to) + + if len(ret) == 0 { + panic("no return value specified for ResetTokenPriceToRefetch") + } + + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func(string, int64, int64) (int64, error)); ok { + return rf(token, from, to) + } + if rf, ok := ret.Get(0).(func(string, int64, int64) int64); ok { + r0 = rf(token, from, to) + } else { + r0 = ret.Get(0).(int64) + } + + if rf, ok := ret.Get(1).(func(string, int64, int64) error); ok { + r1 = rf(token, from, to) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // NewMockStorage creates a new instance of MockStorage. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockStorage(t interface { diff --git a/v2/pkg/storage/tradelogs/bebop/storage.go b/v2/pkg/storage/tradelogs/bebop/storage.go index d0d56dd..d4f87f9 100644 --- a/v2/pkg/storage/tradelogs/bebop/storage.go +++ b/v2/pkg/storage/tradelogs/bebop/storage.go @@ -212,3 +212,36 @@ func tradeLogColumns() []string { "expiration", } } + +func (s *Storage) ResetTokenPriceToRefetch(token string, from, to int64) (int64, error) { + builder := squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar). + Update(s.tableName()). + Set("maker_token_price", nil). + Set("taker_token_price", nil). + Set("maker_usd_amount", nil). + Set("taker_usd_amount", nil) + if token != "" { + builder = builder.Where(squirrel.Or{ + squirrel.Eq{"maker_token": token}, + squirrel.Eq{"taker_token": token}, + }) + } + builder = builder.Where(squirrel.And{ + squirrel.GtOrEq{"timestamp": from}, + squirrel.LtOrEq{"timestamp": to}, + }) + q, p, err := builder.ToSql() + + if err != nil { + return 0, fmt.Errorf("build query error: %w", err) + } + result, err := s.db.Exec(q, p...) + if err != nil { + return 0, fmt.Errorf("run query error: %w", err) + } + rowsAffected, err := result.RowsAffected() + if err != nil { + return 0, fmt.Errorf("fetch rows affected error: %w", err) + } + return rowsAffected, nil +} diff --git a/v2/pkg/storage/tradelogs/hashflow_v3/storage.go b/v2/pkg/storage/tradelogs/hashflow_v3/storage.go index be7d076..073572d 100644 --- a/v2/pkg/storage/tradelogs/hashflow_v3/storage.go +++ b/v2/pkg/storage/tradelogs/hashflow_v3/storage.go @@ -128,3 +128,36 @@ func (s *Storage) Delete(blocks []uint64) error { } return nil } + +func (s *Storage) ResetTokenPriceToRefetch(token string, from, to int64) (int64, error) { + builder := squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar). + Update(s.tableName()). + Set("maker_token_price", nil). + Set("taker_token_price", nil). + Set("maker_usd_amount", nil). + Set("taker_usd_amount", nil) + if token != "" { + builder = builder.Where(squirrel.Or{ + squirrel.Eq{"maker_token": token}, + squirrel.Eq{"taker_token": token}, + }) + } + builder = builder.Where(squirrel.And{ + squirrel.GtOrEq{"timestamp": from}, + squirrel.LtOrEq{"timestamp": to}, + }) + q, p, err := builder.ToSql() + + if err != nil { + return 0, fmt.Errorf("build query error: %w", err) + } + result, err := s.db.Exec(q, p...) + if err != nil { + return 0, fmt.Errorf("run query error: %w", err) + } + rowsAffected, err := result.RowsAffected() + if err != nil { + return 0, fmt.Errorf("fetch rows affected error: %w", err) + } + return rowsAffected, nil +} diff --git a/v2/pkg/storage/tradelogs/kyberswap/storage.go b/v2/pkg/storage/tradelogs/kyberswap/storage.go index 0a19b67..0c73849 100644 --- a/v2/pkg/storage/tradelogs/kyberswap/storage.go +++ b/v2/pkg/storage/tradelogs/kyberswap/storage.go @@ -128,3 +128,36 @@ func (s *Storage) Delete(blocks []uint64) error { } return nil } + +func (s *Storage) ResetTokenPriceToRefetch(token string, from, to int64) (int64, error) { + builder := squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar). + Update(s.tableName()). + Set("maker_token_price", nil). + Set("taker_token_price", nil). + Set("maker_usd_amount", nil). + Set("taker_usd_amount", nil) + if token != "" { + builder = builder.Where(squirrel.Or{ + squirrel.Eq{"maker_token": token}, + squirrel.Eq{"taker_token": token}, + }) + } + builder = builder.Where(squirrel.And{ + squirrel.GtOrEq{"timestamp": from}, + squirrel.LtOrEq{"timestamp": to}, + }) + q, p, err := builder.ToSql() + + if err != nil { + return 0, fmt.Errorf("build query error: %w", err) + } + result, err := s.db.Exec(q, p...) + if err != nil { + return 0, fmt.Errorf("run query error: %w", err) + } + rowsAffected, err := result.RowsAffected() + if err != nil { + return 0, fmt.Errorf("fetch rows affected error: %w", err) + } + return rowsAffected, nil +} diff --git a/v2/pkg/storage/tradelogs/kyberswap_rfq/storage.go b/v2/pkg/storage/tradelogs/kyberswap_rfq/storage.go index 8db8a0f..ea430e0 100644 --- a/v2/pkg/storage/tradelogs/kyberswap_rfq/storage.go +++ b/v2/pkg/storage/tradelogs/kyberswap_rfq/storage.go @@ -128,3 +128,36 @@ func (s *Storage) Delete(blocks []uint64) error { } return nil } + +func (s *Storage) ResetTokenPriceToRefetch(token string, from, to int64) (int64, error) { + builder := squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar). + Update(s.tableName()). + Set("maker_token_price", nil). + Set("taker_token_price", nil). + Set("maker_usd_amount", nil). + Set("taker_usd_amount", nil) + if token != "" { + builder = builder.Where(squirrel.Or{ + squirrel.Eq{"maker_token": token}, + squirrel.Eq{"taker_token": token}, + }) + } + builder = builder.Where(squirrel.And{ + squirrel.GtOrEq{"timestamp": from}, + squirrel.LtOrEq{"timestamp": to}, + }) + q, p, err := builder.ToSql() + + if err != nil { + return 0, fmt.Errorf("build query error: %w", err) + } + result, err := s.db.Exec(q, p...) + if err != nil { + return 0, fmt.Errorf("run query error: %w", err) + } + rowsAffected, err := result.RowsAffected() + if err != nil { + return 0, fmt.Errorf("fetch rows affected error: %w", err) + } + return rowsAffected, nil +} diff --git a/v2/pkg/storage/tradelogs/oneinch_v6/storage.go b/v2/pkg/storage/tradelogs/oneinch_v6/storage.go index 44ab3df..6dd496d 100644 --- a/v2/pkg/storage/tradelogs/oneinch_v6/storage.go +++ b/v2/pkg/storage/tradelogs/oneinch_v6/storage.go @@ -218,3 +218,36 @@ func tradeLogColumns() []string { "expiration", } } + +func (s *Storage) ResetTokenPriceToRefetch(token string, from, to int64) (int64, error) { + builder := squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar). + Update(s.tableName()). + Set("maker_token_price", nil). + Set("taker_token_price", nil). + Set("maker_usd_amount", nil). + Set("taker_usd_amount", nil) + if token != "" { + builder = builder.Where(squirrel.Or{ + squirrel.Eq{"maker_token": token}, + squirrel.Eq{"taker_token": token}, + }) + } + builder = builder.Where(squirrel.And{ + squirrel.GtOrEq{"timestamp": from}, + squirrel.LtOrEq{"timestamp": to}, + }) + q, p, err := builder.ToSql() + + if err != nil { + return 0, fmt.Errorf("build query error: %w", err) + } + result, err := s.db.Exec(q, p...) + if err != nil { + return 0, fmt.Errorf("run query error: %w", err) + } + rowsAffected, err := result.RowsAffected() + if err != nil { + return 0, fmt.Errorf("fetch rows affected error: %w", err) + } + return rowsAffected, nil +} diff --git a/v2/pkg/storage/tradelogs/pancakeswap/storage.go b/v2/pkg/storage/tradelogs/pancakeswap/storage.go index cf193ee..4bbd427 100644 --- a/v2/pkg/storage/tradelogs/pancakeswap/storage.go +++ b/v2/pkg/storage/tradelogs/pancakeswap/storage.go @@ -128,3 +128,36 @@ func (s *Storage) Delete(blocks []uint64) error { } return nil } + +func (s *Storage) ResetTokenPriceToRefetch(token string, from, to int64) (int64, error) { + builder := squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar). + Update(s.tableName()). + Set("maker_token_price", nil). + Set("taker_token_price", nil). + Set("maker_usd_amount", nil). + Set("taker_usd_amount", nil) + if token != "" { + builder = builder.Where(squirrel.Or{ + squirrel.Eq{"maker_token": token}, + squirrel.Eq{"taker_token": token}, + }) + } + builder = builder.Where(squirrel.And{ + squirrel.GtOrEq{"timestamp": from}, + squirrel.LtOrEq{"timestamp": to}, + }) + q, p, err := builder.ToSql() + + if err != nil { + return 0, fmt.Errorf("build query error: %w", err) + } + result, err := s.db.Exec(q, p...) + if err != nil { + return 0, fmt.Errorf("run query error: %w", err) + } + rowsAffected, err := result.RowsAffected() + if err != nil { + return 0, fmt.Errorf("fetch rows affected error: %w", err) + } + return rowsAffected, nil +} diff --git a/v2/pkg/storage/tradelogs/paraswap/storage.go b/v2/pkg/storage/tradelogs/paraswap/storage.go index fb4cb7e..288c769 100644 --- a/v2/pkg/storage/tradelogs/paraswap/storage.go +++ b/v2/pkg/storage/tradelogs/paraswap/storage.go @@ -128,3 +128,36 @@ func (s *Storage) Delete(blocks []uint64) error { } return nil } + +func (s *Storage) ResetTokenPriceToRefetch(token string, from, to int64) (int64, error) { + builder := squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar). + Update(s.tableName()). + Set("maker_token_price", nil). + Set("taker_token_price", nil). + Set("maker_usd_amount", nil). + Set("taker_usd_amount", nil) + if token != "" { + builder = builder.Where(squirrel.Or{ + squirrel.Eq{"maker_token": token}, + squirrel.Eq{"taker_token": token}, + }) + } + builder = builder.Where(squirrel.And{ + squirrel.GtOrEq{"timestamp": from}, + squirrel.LtOrEq{"timestamp": to}, + }) + q, p, err := builder.ToSql() + + if err != nil { + return 0, fmt.Errorf("build query error: %w", err) + } + result, err := s.db.Exec(q, p...) + if err != nil { + return 0, fmt.Errorf("run query error: %w", err) + } + rowsAffected, err := result.RowsAffected() + if err != nil { + return 0, fmt.Errorf("fetch rows affected error: %w", err) + } + return rowsAffected, nil +} diff --git a/v2/pkg/storage/tradelogs/types/storage.go b/v2/pkg/storage/tradelogs/types/storage.go index 79043bc..892f398 100644 --- a/v2/pkg/storage/tradelogs/types/storage.go +++ b/v2/pkg/storage/tradelogs/types/storage.go @@ -6,4 +6,5 @@ type Storage interface { GetEmptyPrice(limit uint64) ([]TradeLog, error) Delete(blocks []uint64) error Exchange() string + ResetTokenPriceToRefetch(token string, from, to int64) (int64, error) } diff --git a/v2/pkg/storage/tradelogs/uniswapx/storage.go b/v2/pkg/storage/tradelogs/uniswapx/storage.go index 08274d3..d9c6938 100644 --- a/v2/pkg/storage/tradelogs/uniswapx/storage.go +++ b/v2/pkg/storage/tradelogs/uniswapx/storage.go @@ -128,3 +128,36 @@ func (s *Storage) Delete(blocks []uint64) error { } return nil } + +func (s *Storage) ResetTokenPriceToRefetch(token string, from, to int64) (int64, error) { + builder := squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar). + Update(s.tableName()). + Set("maker_token_price", nil). + Set("taker_token_price", nil). + Set("maker_usd_amount", nil). + Set("taker_usd_amount", nil) + if token != "" { + builder = builder.Where(squirrel.Or{ + squirrel.Eq{"maker_token": token}, + squirrel.Eq{"taker_token": token}, + }) + } + builder = builder.Where(squirrel.And{ + squirrel.GtOrEq{"timestamp": from}, + squirrel.LtOrEq{"timestamp": to}, + }) + q, p, err := builder.ToSql() + + if err != nil { + return 0, fmt.Errorf("build query error: %w", err) + } + result, err := s.db.Exec(q, p...) + if err != nil { + return 0, fmt.Errorf("run query error: %w", err) + } + rowsAffected, err := result.RowsAffected() + if err != nil { + return 0, fmt.Errorf("fetch rows affected error: %w", err) + } + return rowsAffected, nil +} diff --git a/v2/pkg/storage/tradelogs/zxotc/storage.go b/v2/pkg/storage/tradelogs/zxotc/storage.go index df4f50b..c67c1cc 100644 --- a/v2/pkg/storage/tradelogs/zxotc/storage.go +++ b/v2/pkg/storage/tradelogs/zxotc/storage.go @@ -128,3 +128,36 @@ func (s *Storage) Delete(blocks []uint64) error { } return nil } + +func (s *Storage) ResetTokenPriceToRefetch(token string, from, to int64) (int64, error) { + builder := squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar). + Update(s.tableName()). + Set("maker_token_price", nil). + Set("taker_token_price", nil). + Set("maker_usd_amount", nil). + Set("taker_usd_amount", nil) + if token != "" { + builder = builder.Where(squirrel.Or{ + squirrel.Eq{"maker_token": token}, + squirrel.Eq{"taker_token": token}, + }) + } + builder = builder.Where(squirrel.And{ + squirrel.GtOrEq{"timestamp": from}, + squirrel.LtOrEq{"timestamp": to}, + }) + q, p, err := builder.ToSql() + + if err != nil { + return 0, fmt.Errorf("build query error: %w", err) + } + result, err := s.db.Exec(q, p...) + if err != nil { + return 0, fmt.Errorf("run query error: %w", err) + } + rowsAffected, err := result.RowsAffected() + if err != nil { + return 0, fmt.Errorf("fetch rows affected error: %w", err) + } + return rowsAffected, nil +} diff --git a/v2/pkg/storage/tradelogs/zxrfqv3/storage.go b/v2/pkg/storage/tradelogs/zxrfqv3/storage.go index 527e79e..1f3ccf5 100644 --- a/v2/pkg/storage/tradelogs/zxrfqv3/storage.go +++ b/v2/pkg/storage/tradelogs/zxrfqv3/storage.go @@ -128,3 +128,36 @@ func (s *Storage) Delete(blocks []uint64) error { } return nil } + +func (s *Storage) ResetTokenPriceToRefetch(token string, from, to int64) (int64, error) { + builder := squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar). + Update(s.tableName()). + Set("maker_token_price", nil). + Set("taker_token_price", nil). + Set("maker_usd_amount", nil). + Set("taker_usd_amount", nil) + if token != "" { + builder = builder.Where(squirrel.Or{ + squirrel.Eq{"maker_token": token}, + squirrel.Eq{"taker_token": token}, + }) + } + builder = builder.Where(squirrel.And{ + squirrel.GtOrEq{"timestamp": from}, + squirrel.LtOrEq{"timestamp": to}, + }) + q, p, err := builder.ToSql() + + if err != nil { + return 0, fmt.Errorf("build query error: %w", err) + } + result, err := s.db.Exec(q, p...) + if err != nil { + return 0, fmt.Errorf("run query error: %w", err) + } + rowsAffected, err := result.RowsAffected() + if err != nil { + return 0, fmt.Errorf("fetch rows affected error: %w", err) + } + return rowsAffected, nil +}