From 9345a614b6c02cdf4b4a830b1f960b35b9c3a6a6 Mon Sep 17 00:00:00 2001 From: Nguyen Thuy Linh Date: Tue, 24 Dec 2024 14:56:25 +0700 Subject: [PATCH] Add Expiration Column for RFQ Trades (#112) * add expiration column for rfq trades * handle mtm client nil pointer * update mtm client request --- .../00012_add_expiration_column.up.sql | 13 +++++ v2/pkg/handler/trade_logs.go | 54 ++++++++++-------- v2/pkg/mtm/client.go | 16 +++--- v2/pkg/parser/zxrfqv3/test/expected_rfq.json | 2 +- v2/pkg/storage/tradelogs/bebop/storage.go | 5 +- .../storage/tradelogs/hashflow_v3/storage.go | 26 +-------- v2/pkg/storage/tradelogs/kyberswap/storage.go | 24 +------- .../tradelogs/kyberswap_rfq/storage.go | 26 +-------- .../storage/tradelogs/oneinch_v6/storage.go | 5 +- .../storage/tradelogs/pancakeswap/storage.go | 24 +------- v2/pkg/storage/tradelogs/paraswap/storage.go | 26 +-------- v2/pkg/storage/tradelogs/types/trade_log.go | 57 ++++++++++++++++++- v2/pkg/storage/tradelogs/uniswapx/storage.go | 24 +------- v2/pkg/storage/tradelogs/zxotc/storage.go | 26 +-------- v2/pkg/storage/tradelogs/zxrfqv3/storage.go | 26 +-------- 15 files changed, 125 insertions(+), 229 deletions(-) create mode 100644 v2/cmd/migrations/00012_add_expiration_column.up.sql diff --git a/v2/cmd/migrations/00012_add_expiration_column.up.sql b/v2/cmd/migrations/00012_add_expiration_column.up.sql new file mode 100644 index 0000000..053474f --- /dev/null +++ b/v2/cmd/migrations/00012_add_expiration_column.up.sql @@ -0,0 +1,13 @@ +alter table tradelogs_bebop add column expiration bigint; + +alter table tradelogs_hashflow_v3 add column expiration bigint; + +alter table tradelogs_kyberswap_rfq add column expiration bigint; + +alter table tradelogs_oneinch_v6 add column expiration bigint; + +alter table tradelogs_paraswap add column expiration bigint; + +alter table tradelogs_zerox add column expiration bigint; + +alter table tradelogs_zerox_v3 add column expiration bigint; \ No newline at end of file diff --git a/v2/pkg/handler/trade_logs.go b/v2/pkg/handler/trade_logs.go index 0f4735b..c380fb5 100644 --- a/v2/pkg/handler/trade_logs.go +++ b/v2/pkg/handler/trade_logs.go @@ -89,6 +89,8 @@ func (h *TradeLogHandler) ProcessBlockWithExclusion(blockHash string, blockNumbe func (h *TradeLogHandler) processForTradelog(calls []types.TransactionCallFrame, blockHash string, blockNumber uint64, timestamp uint64, exclusions sets.Set[string]) error { logIndexStart := 0 + var result []storageTypes.TradeLog + for i, call := range calls { logIndexStart = assignLogIndexes(&call.CallFrame, logIndexStart) metadata := logMetadata{ @@ -109,33 +111,39 @@ func (h *TradeLogHandler) processForTradelog(calls []types.TransactionCallFrame, tradeLogs[j].InteractContract = call.CallFrame.To } - err := h.storage.Insert(tradeLogs) + result = append(result, tradeLogs...) + } + + if len(result) == 0 { + return nil + } + + err := h.storage.Insert(result) + if err != nil { + return fmt.Errorf("write to storage error: %w", err) + } + h.l.Infow("successfully insert trade logs", "blockNumber", blockNumber, "number", len(result)) + + passCount, failCount := 0, 0 + for _, log := range result { + msgBytes, err := json.Marshal(kafka.Message{ + Type: kafka.MessageTypeTradeLog, + Data: log, + }) if err != nil { - return fmt.Errorf("write to storage error: %w", err) + h.l.Errorw(" error when marshal trade log to json", "blockNumber", blockNumber, "log", log, "err", err) + failCount++ + continue } - h.l.Infow("successfully insert trade logs", "blockNumber", blockNumber, "number", len(tradeLogs)) - - passCount, failCount := 0, 0 - for _, log := range tradeLogs { - msgBytes, err := json.Marshal(kafka.Message{ - Type: kafka.MessageTypeTradeLog, - Data: log, - }) - if err != nil { - h.l.Errorw(" error when marshal trade log to json", "blockNumber", blockNumber, "log", log, "err", err) - failCount++ - continue - } - err = h.publisher.Publish(h.kafkaTopic, msgBytes) - if err != nil { - h.l.Errorw("error when publish trade log to kafka", "blockNumber", blockNumber, "log", log, "err", err) - failCount++ - continue - } - passCount++ + err = h.publisher.Publish(h.kafkaTopic, msgBytes) + if err != nil { + h.l.Errorw("error when publish trade log to kafka", "blockNumber", blockNumber, "log", log, "err", err) + failCount++ + continue } - h.l.Infow("successfully publish trade logs", "blockNumber", blockNumber, "success", passCount, "fail", failCount) + passCount++ } + h.l.Infow("successfully publish trade logs", "blockNumber", blockNumber, "success", passCount, "fail", failCount) return nil } diff --git a/v2/pkg/mtm/client.go b/v2/pkg/mtm/client.go index 4dbc090..e845075 100644 --- a/v2/pkg/mtm/client.go +++ b/v2/pkg/mtm/client.go @@ -43,8 +43,8 @@ func (m *MtmClient) GetListTokens(ctx context.Context) ([]Token, error) { return nil, fmt.Errorf("new request error: %w", err) } req.Header.Set("Accept", "application/json") - client := &http.Client{} - resp, err := client.Do(req) + + resp, err := m.httpClient.Do(req) if err != nil { return nil, fmt.Errorf("do request error: %w", err) } @@ -74,19 +74,19 @@ func (m *MtmClient) GetHistoricalRate( ""e=" + quote + "&chain_id=" + strconv.FormatInt(chainId, 10) + "&time=" + strconv.FormatInt(ts.Unix(), 10) - req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, m.baseURL+path+params, nil) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, m.baseURL+path+params, nil) if err != nil { return 0, fmt.Errorf("new request error: %w", err) } req.Header.Set("Accept", "application/json") - client := &http.Client{} - resp, err := client.Do(req) - if resp.StatusCode == http.StatusTooManyRequests { // 429 - return 0, ErrRateLimit - } + + resp, err := m.httpClient.Do(req) if err != nil { return 0, fmt.Errorf("do request error: %w", err) } + if resp.StatusCode == http.StatusTooManyRequests { // 429 + return 0, ErrRateLimit + } defer resp.Body.Close() var rate RateV3Response diff --git a/v2/pkg/parser/zxrfqv3/test/expected_rfq.json b/v2/pkg/parser/zxrfqv3/test/expected_rfq.json index eb56764..256c33a 100644 --- a/v2/pkg/parser/zxrfqv3/test/expected_rfq.json +++ b/v2/pkg/parser/zxrfqv3/test/expected_rfq.json @@ -12,5 +12,5 @@ "event_hash": "0x0000000000000000000000000000000000000000000000000000000000000000", "log_index": 380, "timestamp": 1000, - "expiration_date": 1720731029 + "expiration": 1720731029 } \ No newline at end of file diff --git a/v2/pkg/storage/tradelogs/bebop/storage.go b/v2/pkg/storage/tradelogs/bebop/storage.go index c11c7b4..d0d56dd 100644 --- a/v2/pkg/storage/tradelogs/bebop/storage.go +++ b/v2/pkg/storage/tradelogs/bebop/storage.go @@ -68,7 +68,8 @@ func (s *Storage) Insert(orders []storageTypes.TradeLog) error { maker_token_price=excluded.maker_token_price, taker_token_price=excluded.taker_token_price, maker_usd_amount=excluded.maker_usd_amount, - taker_usd_amount=excluded.taker_usd_amount + taker_usd_amount=excluded.taker_usd_amount, + expiration=excluded.expiration `).ToSql() if err != nil { s.l.Errorw("Error build insert", "error", err) @@ -179,6 +180,7 @@ func tradeLogSerialize(o *storageTypes.TradeLog) []interface{} { o.TakerTokenPrice, o.MakerUsdAmount, o.TakerUsdAmount, + o.Expiry, } } @@ -207,5 +209,6 @@ func tradeLogColumns() []string { "taker_token_price", "maker_usd_amount", "taker_usd_amount", + "expiration", } } diff --git a/v2/pkg/storage/tradelogs/hashflow_v3/storage.go b/v2/pkg/storage/tradelogs/hashflow_v3/storage.go index d5c8cfd..be7d076 100644 --- a/v2/pkg/storage/tradelogs/hashflow_v3/storage.go +++ b/v2/pkg/storage/tradelogs/hashflow_v3/storage.go @@ -44,31 +44,7 @@ func (s *Storage) Insert(orders []storageTypes.TradeLog) error { storageTypes.RFQTradeLogSerialize(&order)..., ) } - q, p, err := b.Suffix(`ON CONFLICT(block_number, log_index) DO UPDATE - SET - order_hash=excluded.order_hash, - maker=excluded.maker, - taker=excluded.taker, - maker_token=excluded.maker_token, - taker_token=excluded.taker_token, - maker_token_amount=excluded.maker_token_amount, - taker_token_amount=excluded.taker_token_amount, - maker_token_origin_amount=excluded.maker_token_origin_amount, - taker_token_origin_amount=excluded.taker_token_origin_amount, - contract_address=excluded.contract_address, - block_number=excluded.block_number, - tx_hash=excluded.tx_hash, - log_index=excluded.log_index, - timestamp=excluded.timestamp, - event_hash=excluded.event_hash, - tx_origin=excluded.tx_origin, - message_sender=excluded.message_sender, - interact_contract=excluded.interact_contract, - maker_token_price=excluded.maker_token_price, - taker_token_price=excluded.taker_token_price, - maker_usd_amount=excluded.maker_usd_amount, - taker_usd_amount=excluded.taker_usd_amount - `).ToSql() + q, p, err := b.Suffix(storageTypes.RFQTradeLogSuffix()).ToSql() if err != nil { s.l.Errorw("Error build insert", "error", err) return err diff --git a/v2/pkg/storage/tradelogs/kyberswap/storage.go b/v2/pkg/storage/tradelogs/kyberswap/storage.go index 18f779b..0a19b67 100644 --- a/v2/pkg/storage/tradelogs/kyberswap/storage.go +++ b/v2/pkg/storage/tradelogs/kyberswap/storage.go @@ -44,29 +44,7 @@ func (s *Storage) Insert(orders []storageTypes.TradeLog) error { storageTypes.CommonTradeLogSerialize(&order)..., ) } - q, p, err := b.Suffix(`ON CONFLICT(block_number, log_index) DO UPDATE - SET - order_hash=excluded.order_hash, - maker=excluded.maker, - taker=excluded.taker, - maker_token=excluded.maker_token, - taker_token=excluded.taker_token, - maker_token_amount=excluded.maker_token_amount, - taker_token_amount=excluded.taker_token_amount, - contract_address=excluded.contract_address, - block_number=excluded.block_number, - tx_hash=excluded.tx_hash, - log_index=excluded.log_index, - timestamp=excluded.timestamp, - event_hash=excluded.event_hash, - tx_origin=excluded.tx_origin, - message_sender=excluded.message_sender, - interact_contract=excluded.interact_contract, - maker_token_price=excluded.maker_token_price, - taker_token_price=excluded.taker_token_price, - maker_usd_amount=excluded.maker_usd_amount, - taker_usd_amount=excluded.taker_usd_amount - `).ToSql() + q, p, err := b.Suffix(storageTypes.CommonTradeLogSuffix()).ToSql() if err != nil { s.l.Errorw("Error build insert", "error", err) return err diff --git a/v2/pkg/storage/tradelogs/kyberswap_rfq/storage.go b/v2/pkg/storage/tradelogs/kyberswap_rfq/storage.go index 8e8cb58..8db8a0f 100644 --- a/v2/pkg/storage/tradelogs/kyberswap_rfq/storage.go +++ b/v2/pkg/storage/tradelogs/kyberswap_rfq/storage.go @@ -44,31 +44,7 @@ func (s *Storage) Insert(orders []storageTypes.TradeLog) error { storageTypes.RFQTradeLogSerialize(&order)..., ) } - q, p, err := b.Suffix(`ON CONFLICT(block_number, log_index) DO UPDATE - SET - order_hash=excluded.order_hash, - maker=excluded.maker, - taker=excluded.taker, - maker_token=excluded.maker_token, - taker_token=excluded.taker_token, - maker_token_amount=excluded.maker_token_amount, - taker_token_amount=excluded.taker_token_amount, - maker_token_origin_amount=excluded.maker_token_origin_amount, - taker_token_origin_amount=excluded.taker_token_origin_amount, - contract_address=excluded.contract_address, - block_number=excluded.block_number, - tx_hash=excluded.tx_hash, - log_index=excluded.log_index, - timestamp=excluded.timestamp, - event_hash=excluded.event_hash, - tx_origin=excluded.tx_origin, - message_sender=excluded.message_sender, - interact_contract=excluded.interact_contract, - maker_token_price=excluded.maker_token_price, - taker_token_price=excluded.taker_token_price, - maker_usd_amount=excluded.maker_usd_amount, - taker_usd_amount=excluded.taker_usd_amount - `).ToSql() + q, p, err := b.Suffix(storageTypes.RFQTradeLogSuffix()).ToSql() if err != nil { s.l.Errorw("Error build insert", "error", err) return err diff --git a/v2/pkg/storage/tradelogs/oneinch_v6/storage.go b/v2/pkg/storage/tradelogs/oneinch_v6/storage.go index 8fa47c5..44ab3df 100644 --- a/v2/pkg/storage/tradelogs/oneinch_v6/storage.go +++ b/v2/pkg/storage/tradelogs/oneinch_v6/storage.go @@ -69,7 +69,8 @@ func (s *Storage) Insert(orders []storageTypes.TradeLog) error { maker_usd_amount=excluded.maker_usd_amount, taker_usd_amount=excluded.taker_usd_amount, maker_traits=excluded.maker_traits, - type=excluded.type + type=excluded.type, + expiration=excluded.expiration `).ToSql() if err != nil { s.l.Errorw("Error build insert", "error", err) @@ -184,6 +185,7 @@ func tradeLogSerialize(o *storageTypes.TradeLog) []interface{} { o.TakerUsdAmount, o.MakerTraits, o.Type, + o.Expiry, } } @@ -213,5 +215,6 @@ func tradeLogColumns() []string { "taker_usd_amount", "maker_traits", "type", + "expiration", } } diff --git a/v2/pkg/storage/tradelogs/pancakeswap/storage.go b/v2/pkg/storage/tradelogs/pancakeswap/storage.go index 3a317c0..cf193ee 100644 --- a/v2/pkg/storage/tradelogs/pancakeswap/storage.go +++ b/v2/pkg/storage/tradelogs/pancakeswap/storage.go @@ -44,29 +44,7 @@ func (s *Storage) Insert(orders []storageTypes.TradeLog) error { storageTypes.CommonTradeLogSerialize(&order)..., ) } - q, p, err := b.Suffix(`ON CONFLICT(block_number, log_index) DO UPDATE - SET - order_hash=excluded.order_hash, - maker=excluded.maker, - taker=excluded.taker, - maker_token=excluded.maker_token, - taker_token=excluded.taker_token, - maker_token_amount=excluded.maker_token_amount, - taker_token_amount=excluded.taker_token_amount, - contract_address=excluded.contract_address, - block_number=excluded.block_number, - tx_hash=excluded.tx_hash, - log_index=excluded.log_index, - timestamp=excluded.timestamp, - event_hash=excluded.event_hash, - tx_origin=excluded.tx_origin, - message_sender=excluded.message_sender, - interact_contract=excluded.interact_contract, - maker_token_price=excluded.maker_token_price, - taker_token_price=excluded.taker_token_price, - maker_usd_amount=excluded.maker_usd_amount, - taker_usd_amount=excluded.taker_usd_amount - `).ToSql() + q, p, err := b.Suffix(storageTypes.CommonTradeLogSuffix()).ToSql() if err != nil { s.l.Errorw("Error build insert", "error", err) return err diff --git a/v2/pkg/storage/tradelogs/paraswap/storage.go b/v2/pkg/storage/tradelogs/paraswap/storage.go index fb5ab69..fb4cb7e 100644 --- a/v2/pkg/storage/tradelogs/paraswap/storage.go +++ b/v2/pkg/storage/tradelogs/paraswap/storage.go @@ -44,31 +44,7 @@ func (s *Storage) Insert(orders []storageTypes.TradeLog) error { storageTypes.RFQTradeLogSerialize(&order)..., ) } - q, p, err := b.Suffix(`ON CONFLICT(block_number, log_index) DO UPDATE - SET - order_hash=excluded.order_hash, - maker=excluded.maker, - taker=excluded.taker, - maker_token=excluded.maker_token, - taker_token=excluded.taker_token, - maker_token_amount=excluded.maker_token_amount, - taker_token_amount=excluded.taker_token_amount, - maker_token_origin_amount=excluded.maker_token_origin_amount, - taker_token_origin_amount=excluded.taker_token_origin_amount, - contract_address=excluded.contract_address, - block_number=excluded.block_number, - tx_hash=excluded.tx_hash, - log_index=excluded.log_index, - timestamp=excluded.timestamp, - event_hash=excluded.event_hash, - tx_origin=excluded.tx_origin, - message_sender=excluded.message_sender, - interact_contract=excluded.interact_contract, - maker_token_price=excluded.maker_token_price, - taker_token_price=excluded.taker_token_price, - maker_usd_amount=excluded.maker_usd_amount, - taker_usd_amount=excluded.taker_usd_amount - `).ToSql() + q, p, err := b.Suffix(storageTypes.RFQTradeLogSuffix()).ToSql() if err != nil { s.l.Errorw("Error build insert", "error", err) return err diff --git a/v2/pkg/storage/tradelogs/types/trade_log.go b/v2/pkg/storage/tradelogs/types/trade_log.go index 1de1296..784668e 100644 --- a/v2/pkg/storage/tradelogs/types/trade_log.go +++ b/v2/pkg/storage/tradelogs/types/trade_log.go @@ -27,7 +27,7 @@ type TradeLog struct { TxOrigin string `db:"tx_origin" json:"tx_origin,omitempty"` InteractContract string `db:"interact_contract" json:"interact_contract,omitempty"` MakerTraits json.RawMessage `db:"maker_traits" json:"maker_traits,omitempty"` - Expiry uint64 `db:"expiration_date" json:"expiration_date"` + Expiry uint64 `db:"expiration" json:"expiration,omitempty"` MakerTokenPrice *float64 `db:"maker_token_price" json:"maker_token_price"` TakerTokenPrice *float64 `db:"taker_token_price" json:"taker_token_price"` MakerUsdAmount *float64 `db:"maker_usd_amount" json:"maker_usd_amount"` @@ -88,6 +88,31 @@ func CommonTradeLogColumns() []string { } } +func CommonTradeLogSuffix() string { + return `ON CONFLICT(block_number, log_index) DO UPDATE + SET + order_hash=excluded.order_hash, + maker=excluded.maker, + taker=excluded.taker, + maker_token=excluded.maker_token, + taker_token=excluded.taker_token, + maker_token_amount=excluded.maker_token_amount, + taker_token_amount=excluded.taker_token_amount, + contract_address=excluded.contract_address, + block_number=excluded.block_number, + tx_hash=excluded.tx_hash, + log_index=excluded.log_index, + timestamp=excluded.timestamp, + event_hash=excluded.event_hash, + tx_origin=excluded.tx_origin, + message_sender=excluded.message_sender, + interact_contract=excluded.interact_contract, + maker_token_price=excluded.maker_token_price, + taker_token_price=excluded.taker_token_price, + maker_usd_amount=excluded.maker_usd_amount, + taker_usd_amount=excluded.taker_usd_amount` +} + // RFQTradeLogSerialize used for exchanges supporting RFQ trades and partial fill func RFQTradeLogSerialize(o *TradeLog) []interface{} { return []interface{}{ @@ -113,6 +138,7 @@ func RFQTradeLogSerialize(o *TradeLog) []interface{} { o.TakerTokenPrice, o.MakerUsdAmount, o.TakerUsdAmount, + o.Expiry, } } @@ -141,5 +167,34 @@ func RFQTradeLogColumns() []string { "taker_token_price", "maker_usd_amount", "taker_usd_amount", + "expiration", } } + +func RFQTradeLogSuffix() string { + return `ON CONFLICT(block_number, log_index) DO UPDATE + SET + order_hash=excluded.order_hash, + maker=excluded.maker, + taker=excluded.taker, + maker_token=excluded.maker_token, + taker_token=excluded.taker_token, + maker_token_amount=excluded.maker_token_amount, + taker_token_amount=excluded.taker_token_amount, + maker_token_origin_amount=excluded.maker_token_origin_amount, + taker_token_origin_amount=excluded.taker_token_origin_amount, + contract_address=excluded.contract_address, + block_number=excluded.block_number, + tx_hash=excluded.tx_hash, + log_index=excluded.log_index, + timestamp=excluded.timestamp, + event_hash=excluded.event_hash, + tx_origin=excluded.tx_origin, + message_sender=excluded.message_sender, + interact_contract=excluded.interact_contract, + maker_token_price=excluded.maker_token_price, + taker_token_price=excluded.taker_token_price, + maker_usd_amount=excluded.maker_usd_amount, + taker_usd_amount=excluded.taker_usd_amount, + expiration=excluded.expiration` +} diff --git a/v2/pkg/storage/tradelogs/uniswapx/storage.go b/v2/pkg/storage/tradelogs/uniswapx/storage.go index 59533ad..08274d3 100644 --- a/v2/pkg/storage/tradelogs/uniswapx/storage.go +++ b/v2/pkg/storage/tradelogs/uniswapx/storage.go @@ -44,29 +44,7 @@ func (s *Storage) Insert(orders []storageTypes.TradeLog) error { storageTypes.CommonTradeLogSerialize(&order)..., ) } - q, p, err := b.Suffix(`ON CONFLICT(block_number, log_index) DO UPDATE - SET - order_hash=excluded.order_hash, - maker=excluded.maker, - taker=excluded.taker, - maker_token=excluded.maker_token, - taker_token=excluded.taker_token, - maker_token_amount=excluded.maker_token_amount, - taker_token_amount=excluded.taker_token_amount, - contract_address=excluded.contract_address, - block_number=excluded.block_number, - tx_hash=excluded.tx_hash, - log_index=excluded.log_index, - timestamp=excluded.timestamp, - event_hash=excluded.event_hash, - tx_origin=excluded.tx_origin, - message_sender=excluded.message_sender, - interact_contract=excluded.interact_contract, - maker_token_price=excluded.maker_token_price, - taker_token_price=excluded.taker_token_price, - maker_usd_amount=excluded.maker_usd_amount, - taker_usd_amount=excluded.taker_usd_amount - `).ToSql() + q, p, err := b.Suffix(storageTypes.CommonTradeLogSuffix()).ToSql() if err != nil { s.l.Errorw("Error build insert", "error", err) return err diff --git a/v2/pkg/storage/tradelogs/zxotc/storage.go b/v2/pkg/storage/tradelogs/zxotc/storage.go index 4197ac9..df4f50b 100644 --- a/v2/pkg/storage/tradelogs/zxotc/storage.go +++ b/v2/pkg/storage/tradelogs/zxotc/storage.go @@ -44,31 +44,7 @@ func (s *Storage) Insert(orders []storageTypes.TradeLog) error { storageTypes.RFQTradeLogSerialize(&order)..., ) } - q, p, err := b.Suffix(`ON CONFLICT(block_number, log_index) DO UPDATE - SET - order_hash=excluded.order_hash, - maker=excluded.maker, - taker=excluded.taker, - maker_token=excluded.maker_token, - taker_token=excluded.taker_token, - maker_token_amount=excluded.maker_token_amount, - taker_token_amount=excluded.taker_token_amount, - maker_token_origin_amount=excluded.maker_token_origin_amount, - taker_token_origin_amount=excluded.taker_token_origin_amount, - contract_address=excluded.contract_address, - block_number=excluded.block_number, - tx_hash=excluded.tx_hash, - log_index=excluded.log_index, - timestamp=excluded.timestamp, - event_hash=excluded.event_hash, - tx_origin=excluded.tx_origin, - message_sender=excluded.message_sender, - interact_contract=excluded.interact_contract, - maker_token_price=excluded.maker_token_price, - taker_token_price=excluded.taker_token_price, - maker_usd_amount=excluded.maker_usd_amount, - taker_usd_amount=excluded.taker_usd_amount - `).ToSql() + q, p, err := b.Suffix(storageTypes.RFQTradeLogSuffix()).ToSql() if err != nil { s.l.Errorw("Error build insert", "error", err) return err diff --git a/v2/pkg/storage/tradelogs/zxrfqv3/storage.go b/v2/pkg/storage/tradelogs/zxrfqv3/storage.go index 798af1d..527e79e 100644 --- a/v2/pkg/storage/tradelogs/zxrfqv3/storage.go +++ b/v2/pkg/storage/tradelogs/zxrfqv3/storage.go @@ -44,31 +44,7 @@ func (s *Storage) Insert(orders []storageTypes.TradeLog) error { storageTypes.RFQTradeLogSerialize(&order)..., ) } - q, p, err := b.Suffix(`ON CONFLICT(block_number, log_index) DO UPDATE - SET - order_hash=excluded.order_hash, - maker=excluded.maker, - taker=excluded.taker, - maker_token=excluded.maker_token, - taker_token=excluded.taker_token, - maker_token_amount=excluded.maker_token_amount, - taker_token_amount=excluded.taker_token_amount, - maker_token_origin_amount=excluded.maker_token_origin_amount, - taker_token_origin_amount=excluded.taker_token_origin_amount, - contract_address=excluded.contract_address, - block_number=excluded.block_number, - tx_hash=excluded.tx_hash, - log_index=excluded.log_index, - timestamp=excluded.timestamp, - event_hash=excluded.event_hash, - tx_origin=excluded.tx_origin, - message_sender=excluded.message_sender, - interact_contract=excluded.interact_contract, - maker_token_price=excluded.maker_token_price, - taker_token_price=excluded.taker_token_price, - maker_usd_amount=excluded.maker_usd_amount, - taker_usd_amount=excluded.taker_usd_amount - `).ToSql() + q, p, err := b.Suffix(storageTypes.RFQTradeLogSuffix()).ToSql() if err != nil { s.l.Errorw("Error build insert", "error", err) return err