Skip to content

Commit

Permalink
Add Expiration Column for RFQ Trades (#112)
Browse files Browse the repository at this point in the history
* add expiration column for rfq trades

* handle mtm client nil pointer

* update mtm client request
  • Loading branch information
linhnt3400 authored Dec 24, 2024
1 parent b8af2cd commit 9345a61
Show file tree
Hide file tree
Showing 15 changed files with 125 additions and 229 deletions.
13 changes: 13 additions & 0 deletions v2/cmd/migrations/00012_add_expiration_column.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
alter table tradelogs_bebop add column expiration bigint;

alter table tradelogs_hashflow_v3 add column expiration bigint;

alter table tradelogs_kyberswap_rfq add column expiration bigint;

alter table tradelogs_oneinch_v6 add column expiration bigint;

alter table tradelogs_paraswap add column expiration bigint;

alter table tradelogs_zerox add column expiration bigint;

alter table tradelogs_zerox_v3 add column expiration bigint;
54 changes: 31 additions & 23 deletions v2/pkg/handler/trade_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ func (h *TradeLogHandler) ProcessBlockWithExclusion(blockHash string, blockNumbe

func (h *TradeLogHandler) processForTradelog(calls []types.TransactionCallFrame, blockHash string, blockNumber uint64, timestamp uint64, exclusions sets.Set[string]) error {
logIndexStart := 0
var result []storageTypes.TradeLog

for i, call := range calls {
logIndexStart = assignLogIndexes(&call.CallFrame, logIndexStart)
metadata := logMetadata{
Expand All @@ -109,33 +111,39 @@ func (h *TradeLogHandler) processForTradelog(calls []types.TransactionCallFrame,
tradeLogs[j].InteractContract = call.CallFrame.To
}

err := h.storage.Insert(tradeLogs)
result = append(result, tradeLogs...)
}

if len(result) == 0 {
return nil
}

err := h.storage.Insert(result)
if err != nil {
return fmt.Errorf("write to storage error: %w", err)
}
h.l.Infow("successfully insert trade logs", "blockNumber", blockNumber, "number", len(result))

passCount, failCount := 0, 0
for _, log := range result {
msgBytes, err := json.Marshal(kafka.Message{
Type: kafka.MessageTypeTradeLog,
Data: log,
})
if err != nil {
return fmt.Errorf("write to storage error: %w", err)
h.l.Errorw(" error when marshal trade log to json", "blockNumber", blockNumber, "log", log, "err", err)
failCount++
continue
}
h.l.Infow("successfully insert trade logs", "blockNumber", blockNumber, "number", len(tradeLogs))

passCount, failCount := 0, 0
for _, log := range tradeLogs {
msgBytes, err := json.Marshal(kafka.Message{
Type: kafka.MessageTypeTradeLog,
Data: log,
})
if err != nil {
h.l.Errorw(" error when marshal trade log to json", "blockNumber", blockNumber, "log", log, "err", err)
failCount++
continue
}
err = h.publisher.Publish(h.kafkaTopic, msgBytes)
if err != nil {
h.l.Errorw("error when publish trade log to kafka", "blockNumber", blockNumber, "log", log, "err", err)
failCount++
continue
}
passCount++
err = h.publisher.Publish(h.kafkaTopic, msgBytes)
if err != nil {
h.l.Errorw("error when publish trade log to kafka", "blockNumber", blockNumber, "log", log, "err", err)
failCount++
continue
}
h.l.Infow("successfully publish trade logs", "blockNumber", blockNumber, "success", passCount, "fail", failCount)
passCount++
}
h.l.Infow("successfully publish trade logs", "blockNumber", blockNumber, "success", passCount, "fail", failCount)

return nil
}
Expand Down
16 changes: 8 additions & 8 deletions v2/pkg/mtm/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ func (m *MtmClient) GetListTokens(ctx context.Context) ([]Token, error) {
return nil, fmt.Errorf("new request error: %w", err)
}
req.Header.Set("Accept", "application/json")
client := &http.Client{}
resp, err := client.Do(req)

resp, err := m.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("do request error: %w", err)
}
Expand Down Expand Up @@ -74,19 +74,19 @@ func (m *MtmClient) GetHistoricalRate(
"&quote=" + quote +
"&chain_id=" + strconv.FormatInt(chainId, 10) +
"&time=" + strconv.FormatInt(ts.Unix(), 10)
req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, m.baseURL+path+params, nil)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, m.baseURL+path+params, nil)
if err != nil {
return 0, fmt.Errorf("new request error: %w", err)
}
req.Header.Set("Accept", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if resp.StatusCode == http.StatusTooManyRequests { // 429
return 0, ErrRateLimit
}

resp, err := m.httpClient.Do(req)
if err != nil {
return 0, fmt.Errorf("do request error: %w", err)
}
if resp.StatusCode == http.StatusTooManyRequests { // 429
return 0, ErrRateLimit
}
defer resp.Body.Close()

var rate RateV3Response
Expand Down
2 changes: 1 addition & 1 deletion v2/pkg/parser/zxrfqv3/test/expected_rfq.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@
"event_hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
"log_index": 380,
"timestamp": 1000,
"expiration_date": 1720731029
"expiration": 1720731029
}
5 changes: 4 additions & 1 deletion v2/pkg/storage/tradelogs/bebop/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ func (s *Storage) Insert(orders []storageTypes.TradeLog) error {
maker_token_price=excluded.maker_token_price,
taker_token_price=excluded.taker_token_price,
maker_usd_amount=excluded.maker_usd_amount,
taker_usd_amount=excluded.taker_usd_amount
taker_usd_amount=excluded.taker_usd_amount,
expiration=excluded.expiration
`).ToSql()
if err != nil {
s.l.Errorw("Error build insert", "error", err)
Expand Down Expand Up @@ -179,6 +180,7 @@ func tradeLogSerialize(o *storageTypes.TradeLog) []interface{} {
o.TakerTokenPrice,
o.MakerUsdAmount,
o.TakerUsdAmount,
o.Expiry,
}
}

Expand Down Expand Up @@ -207,5 +209,6 @@ func tradeLogColumns() []string {
"taker_token_price",
"maker_usd_amount",
"taker_usd_amount",
"expiration",
}
}
26 changes: 1 addition & 25 deletions v2/pkg/storage/tradelogs/hashflow_v3/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,31 +44,7 @@ func (s *Storage) Insert(orders []storageTypes.TradeLog) error {
storageTypes.RFQTradeLogSerialize(&order)...,
)
}
q, p, err := b.Suffix(`ON CONFLICT(block_number, log_index) DO UPDATE
SET
order_hash=excluded.order_hash,
maker=excluded.maker,
taker=excluded.taker,
maker_token=excluded.maker_token,
taker_token=excluded.taker_token,
maker_token_amount=excluded.maker_token_amount,
taker_token_amount=excluded.taker_token_amount,
maker_token_origin_amount=excluded.maker_token_origin_amount,
taker_token_origin_amount=excluded.taker_token_origin_amount,
contract_address=excluded.contract_address,
block_number=excluded.block_number,
tx_hash=excluded.tx_hash,
log_index=excluded.log_index,
timestamp=excluded.timestamp,
event_hash=excluded.event_hash,
tx_origin=excluded.tx_origin,
message_sender=excluded.message_sender,
interact_contract=excluded.interact_contract,
maker_token_price=excluded.maker_token_price,
taker_token_price=excluded.taker_token_price,
maker_usd_amount=excluded.maker_usd_amount,
taker_usd_amount=excluded.taker_usd_amount
`).ToSql()
q, p, err := b.Suffix(storageTypes.RFQTradeLogSuffix()).ToSql()
if err != nil {
s.l.Errorw("Error build insert", "error", err)
return err
Expand Down
24 changes: 1 addition & 23 deletions v2/pkg/storage/tradelogs/kyberswap/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,29 +44,7 @@ func (s *Storage) Insert(orders []storageTypes.TradeLog) error {
storageTypes.CommonTradeLogSerialize(&order)...,
)
}
q, p, err := b.Suffix(`ON CONFLICT(block_number, log_index) DO UPDATE
SET
order_hash=excluded.order_hash,
maker=excluded.maker,
taker=excluded.taker,
maker_token=excluded.maker_token,
taker_token=excluded.taker_token,
maker_token_amount=excluded.maker_token_amount,
taker_token_amount=excluded.taker_token_amount,
contract_address=excluded.contract_address,
block_number=excluded.block_number,
tx_hash=excluded.tx_hash,
log_index=excluded.log_index,
timestamp=excluded.timestamp,
event_hash=excluded.event_hash,
tx_origin=excluded.tx_origin,
message_sender=excluded.message_sender,
interact_contract=excluded.interact_contract,
maker_token_price=excluded.maker_token_price,
taker_token_price=excluded.taker_token_price,
maker_usd_amount=excluded.maker_usd_amount,
taker_usd_amount=excluded.taker_usd_amount
`).ToSql()
q, p, err := b.Suffix(storageTypes.CommonTradeLogSuffix()).ToSql()
if err != nil {
s.l.Errorw("Error build insert", "error", err)
return err
Expand Down
26 changes: 1 addition & 25 deletions v2/pkg/storage/tradelogs/kyberswap_rfq/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,31 +44,7 @@ func (s *Storage) Insert(orders []storageTypes.TradeLog) error {
storageTypes.RFQTradeLogSerialize(&order)...,
)
}
q, p, err := b.Suffix(`ON CONFLICT(block_number, log_index) DO UPDATE
SET
order_hash=excluded.order_hash,
maker=excluded.maker,
taker=excluded.taker,
maker_token=excluded.maker_token,
taker_token=excluded.taker_token,
maker_token_amount=excluded.maker_token_amount,
taker_token_amount=excluded.taker_token_amount,
maker_token_origin_amount=excluded.maker_token_origin_amount,
taker_token_origin_amount=excluded.taker_token_origin_amount,
contract_address=excluded.contract_address,
block_number=excluded.block_number,
tx_hash=excluded.tx_hash,
log_index=excluded.log_index,
timestamp=excluded.timestamp,
event_hash=excluded.event_hash,
tx_origin=excluded.tx_origin,
message_sender=excluded.message_sender,
interact_contract=excluded.interact_contract,
maker_token_price=excluded.maker_token_price,
taker_token_price=excluded.taker_token_price,
maker_usd_amount=excluded.maker_usd_amount,
taker_usd_amount=excluded.taker_usd_amount
`).ToSql()
q, p, err := b.Suffix(storageTypes.RFQTradeLogSuffix()).ToSql()
if err != nil {
s.l.Errorw("Error build insert", "error", err)
return err
Expand Down
5 changes: 4 additions & 1 deletion v2/pkg/storage/tradelogs/oneinch_v6/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ func (s *Storage) Insert(orders []storageTypes.TradeLog) error {
maker_usd_amount=excluded.maker_usd_amount,
taker_usd_amount=excluded.taker_usd_amount,
maker_traits=excluded.maker_traits,
type=excluded.type
type=excluded.type,
expiration=excluded.expiration
`).ToSql()
if err != nil {
s.l.Errorw("Error build insert", "error", err)
Expand Down Expand Up @@ -184,6 +185,7 @@ func tradeLogSerialize(o *storageTypes.TradeLog) []interface{} {
o.TakerUsdAmount,
o.MakerTraits,
o.Type,
o.Expiry,
}
}

Expand Down Expand Up @@ -213,5 +215,6 @@ func tradeLogColumns() []string {
"taker_usd_amount",
"maker_traits",
"type",
"expiration",
}
}
24 changes: 1 addition & 23 deletions v2/pkg/storage/tradelogs/pancakeswap/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,29 +44,7 @@ func (s *Storage) Insert(orders []storageTypes.TradeLog) error {
storageTypes.CommonTradeLogSerialize(&order)...,
)
}
q, p, err := b.Suffix(`ON CONFLICT(block_number, log_index) DO UPDATE
SET
order_hash=excluded.order_hash,
maker=excluded.maker,
taker=excluded.taker,
maker_token=excluded.maker_token,
taker_token=excluded.taker_token,
maker_token_amount=excluded.maker_token_amount,
taker_token_amount=excluded.taker_token_amount,
contract_address=excluded.contract_address,
block_number=excluded.block_number,
tx_hash=excluded.tx_hash,
log_index=excluded.log_index,
timestamp=excluded.timestamp,
event_hash=excluded.event_hash,
tx_origin=excluded.tx_origin,
message_sender=excluded.message_sender,
interact_contract=excluded.interact_contract,
maker_token_price=excluded.maker_token_price,
taker_token_price=excluded.taker_token_price,
maker_usd_amount=excluded.maker_usd_amount,
taker_usd_amount=excluded.taker_usd_amount
`).ToSql()
q, p, err := b.Suffix(storageTypes.CommonTradeLogSuffix()).ToSql()
if err != nil {
s.l.Errorw("Error build insert", "error", err)
return err
Expand Down
26 changes: 1 addition & 25 deletions v2/pkg/storage/tradelogs/paraswap/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,31 +44,7 @@ func (s *Storage) Insert(orders []storageTypes.TradeLog) error {
storageTypes.RFQTradeLogSerialize(&order)...,
)
}
q, p, err := b.Suffix(`ON CONFLICT(block_number, log_index) DO UPDATE
SET
order_hash=excluded.order_hash,
maker=excluded.maker,
taker=excluded.taker,
maker_token=excluded.maker_token,
taker_token=excluded.taker_token,
maker_token_amount=excluded.maker_token_amount,
taker_token_amount=excluded.taker_token_amount,
maker_token_origin_amount=excluded.maker_token_origin_amount,
taker_token_origin_amount=excluded.taker_token_origin_amount,
contract_address=excluded.contract_address,
block_number=excluded.block_number,
tx_hash=excluded.tx_hash,
log_index=excluded.log_index,
timestamp=excluded.timestamp,
event_hash=excluded.event_hash,
tx_origin=excluded.tx_origin,
message_sender=excluded.message_sender,
interact_contract=excluded.interact_contract,
maker_token_price=excluded.maker_token_price,
taker_token_price=excluded.taker_token_price,
maker_usd_amount=excluded.maker_usd_amount,
taker_usd_amount=excluded.taker_usd_amount
`).ToSql()
q, p, err := b.Suffix(storageTypes.RFQTradeLogSuffix()).ToSql()
if err != nil {
s.l.Errorw("Error build insert", "error", err)
return err
Expand Down
Loading

0 comments on commit 9345a61

Please sign in to comment.