From 35f2025d0465e0896c9795ee6b99cd7ac850db66 Mon Sep 17 00:00:00 2001 From: nick Date: Thu, 31 Oct 2024 23:36:26 +0900 Subject: [PATCH] feat: support all providers --- node/pkg/common/types/types.go | 4 ++ .../providers/bithumb/utils.go | 17 ++++---- .../providers/coinbase/coinbase.go | 6 ++- .../providers/coinbase/utils.go | 32 ++++++++------- .../providers/coinex/utils.go | 17 ++++---- .../providers/coinone/coinone.go | 7 +++- .../providers/coinone/utils.go | 29 +++++++------ .../providers/crypto/utils.go | 18 ++++---- .../providers/gateio/gateio.go | 8 +++- .../providers/gateio/utils.go | 28 ++++++++----- .../providers/gemini/utils.go | 41 +++++++++++-------- .../websocketfetcher/providers/gopax/gopax.go | 7 +++- .../websocketfetcher/providers/gopax/utils.go | 35 ++++++++-------- .../websocketfetcher/providers/huobi/huobi.go | 6 ++- .../websocketfetcher/providers/huobi/utils.go | 27 +++++++----- .../providers/korbit/korbit.go | 7 +++- .../providers/korbit/utils.go | 32 +++++++++------ .../providers/kraken/utils.go | 18 ++++---- .../providers/kucoin/kucoin.go | 10 ++++- .../providers/kucoin/utils.go | 25 +++++++---- .../websocketfetcher/providers/lbank/lbank.go | 6 ++- .../websocketfetcher/providers/lbank/utils.go | 25 ++++++----- .../websocketfetcher/providers/mexc/utils.go | 20 +++++---- .../websocketfetcher/providers/okx/utils.go | 20 +++++---- .../providers/orangex/orangex.go | 28 +++++++------ .../websocketfetcher/providers/upbit/upbit.go | 8 +++- .../websocketfetcher/providers/upbit/utils.go | 25 +++++++---- .../websocketfetcher/providers/xt/utils.go | 24 +++++++---- node/pkg/websocketfetcher/providers/xt/xt.go | 7 +++- 29 files changed, 328 insertions(+), 209 deletions(-) diff --git a/node/pkg/common/types/types.go b/node/pkg/common/types/types.go index 35a797d8a..2852fd35b 100644 --- a/node/pkg/common/types/types.go +++ b/node/pkg/common/types/types.go @@ -86,6 +86,10 @@ func (m *LatestFeedDataMap) SetLatestFeedData(feedData []*FeedData) error { m.Mu.Lock() defer m.Mu.Unlock() for _, data := range feedData { + if data == nil { + continue + } + prev, ok := m.FeedDataMap[data.FeedID] if ok && prev.Timestamp.After(*data.Timestamp) { continue diff --git a/node/pkg/websocketfetcher/providers/bithumb/utils.go b/node/pkg/websocketfetcher/providers/bithumb/utils.go index e01e5f2e1..19dfd9f31 100644 --- a/node/pkg/websocketfetcher/providers/bithumb/utils.go +++ b/node/pkg/websocketfetcher/providers/bithumb/utils.go @@ -14,7 +14,7 @@ const dateLayout = "20060102" const timeLayout = "150405" // currently not referenced since Transaction api does not support volume data -func TransactionResponseToFeedDataList(data TransactionResponse, feedMap map[string]int32) ([]*common.FeedData, error) { +func TransactionResponseToFeedDataList(data TransactionResponse, feedMap map[string][]int32) ([]*common.FeedData, error) { feedData := []*common.FeedData{} loc, err := time.LoadLocation("Asia/Seoul") if err != nil { @@ -41,18 +41,21 @@ func TransactionResponseToFeedDataList(data TransactionResponse, feedMap map[str splitted := strings.Split(transaction.Symbol, "_") symbol := splitted[0] + "-" + splitted[1] - id, exists := feedMap[symbol] + ids, exists := feedMap[symbol] if !exists { log.Warn().Str("Player", "bithumb").Str("symbol", symbol).Msg("feed not found") continue } - feedData = append(feedData, &common.FeedData{ - FeedID: id, - Value: price, - Timestamp: ×tamp, - }) + for _, id := range ids { + feedData = append(feedData, &common.FeedData{ + FeedID: id, + Value: price, + Timestamp: ×tamp, + }) + } } + return feedData, nil } diff --git a/node/pkg/websocketfetcher/providers/coinbase/coinbase.go b/node/pkg/websocketfetcher/providers/coinbase/coinbase.go index ee6e4bed6..48d29747f 100644 --- a/node/pkg/websocketfetcher/providers/coinbase/coinbase.go +++ b/node/pkg/websocketfetcher/providers/coinbase/coinbase.go @@ -53,12 +53,14 @@ func (c *CoinbaseFetcher) handleMessage(ctx context.Context, message map[string] return nil } - feedData, err := TickerToFeedData(ticker, c.FeedMap) + feedDataList, err := TickerToFeedData(ticker, c.FeedMap) if err != nil { return err } - c.FeedDataBuffer <- feedData + for _, feedData := range feedDataList { + c.FeedDataBuffer <- feedData + } return nil } diff --git a/node/pkg/websocketfetcher/providers/coinbase/utils.go b/node/pkg/websocketfetcher/providers/coinbase/utils.go index 55f157607..21dcce506 100644 --- a/node/pkg/websocketfetcher/providers/coinbase/utils.go +++ b/node/pkg/websocketfetcher/providers/coinbase/utils.go @@ -9,8 +9,11 @@ import ( "github.com/rs/zerolog/log" ) -func TickerToFeedData(ticker Ticker, feedMap map[string]int32) (*common.FeedData, error) { - feedData := new(common.FeedData) +func TickerToFeedData(ticker Ticker, feedMap map[string][]int32) ([]*common.FeedData, error) { + ids, exists := feedMap[strings.ToUpper(ticker.ProductID)] + if !exists { + return nil, fmt.Errorf("feed not found") + } timestamp, err := time.Parse(time.RFC3339Nano, ticker.Time) if err != nil { @@ -18,26 +21,25 @@ func TickerToFeedData(ticker Ticker, feedMap map[string]int32) (*common.FeedData timestamp = time.Now() } - id, exists := feedMap[strings.ToUpper(ticker.ProductID)] - if !exists { - return feedData, fmt.Errorf("feed not found") - } - value, err := common.PriceStringToFloat64(ticker.Price) if err != nil { - return feedData, err + return nil, err } volume, err := common.VolumeStringToFloat64(ticker.Volume24h) if err != nil { - return feedData, err + return nil, err } - feedData.FeedID = id - feedData.Value = value - feedData.Timestamp = ×tamp - feedData.Volume = volume - - return feedData, nil + result := []*common.FeedData{} + for _, id := range ids { + feedData := new(common.FeedData) + feedData.FeedID = id + feedData.Value = value + feedData.Timestamp = ×tamp + feedData.Volume = volume + result = append(result, feedData) + } + return result, nil } diff --git a/node/pkg/websocketfetcher/providers/coinex/utils.go b/node/pkg/websocketfetcher/providers/coinex/utils.go index d5ec13e17..6abb084c1 100644 --- a/node/pkg/websocketfetcher/providers/coinex/utils.go +++ b/node/pkg/websocketfetcher/providers/coinex/utils.go @@ -7,12 +7,11 @@ import ( "github.com/rs/zerolog/log" ) -func ResponseToFeedDataList(data Response, feedMap map[string]int32) ([]*common.FeedData, error) { +func ResponseToFeedDataList(data Response, feedMap map[string][]int32) ([]*common.FeedData, error) { feedDataList := []*common.FeedData{} for _, item := range data.Params { for key, value := range item { - feedData := new(common.FeedData) id, exists := feedMap[key] if !exists { log.Warn().Str("Player", "Coinex").Str("key", key).Msg("feed not found") @@ -29,11 +28,15 @@ func ResponseToFeedDataList(data Response, feedMap map[string]int32) ([]*common. continue } timestamp := time.Now() - feedData.FeedID = id - feedData.Value = price - feedData.Timestamp = ×tamp - feedData.Volume = volume - feedDataList = append(feedDataList, feedData) + + for _, id := range id { + feedData := new(common.FeedData) + feedData.FeedID = id + feedData.Value = price + feedData.Timestamp = ×tamp + feedData.Volume = volume + feedDataList = append(feedDataList, feedData) + } } } diff --git a/node/pkg/websocketfetcher/providers/coinone/coinone.go b/node/pkg/websocketfetcher/providers/coinone/coinone.go index 0b54a395f..2ba87f76b 100644 --- a/node/pkg/websocketfetcher/providers/coinone/coinone.go +++ b/node/pkg/websocketfetcher/providers/coinone/coinone.go @@ -66,13 +66,16 @@ func (c *CoinoneFetcher) handleMessage(ctx context.Context, message map[string]a if raw.ResponseType != "DATA" { return nil } - feedData, err := DataToFeedData(raw.Data, c.FeedMap) + feedDataList, err := DataToFeedData(raw.Data, c.FeedMap) if err != nil { log.Error().Str("Player", "Coinone").Err(err).Msg("error in DataToFeedData") return err } - c.FeedDataBuffer <- feedData + for _, feedData := range feedDataList { + c.FeedDataBuffer <- feedData + } + return nil } diff --git a/node/pkg/websocketfetcher/providers/coinone/utils.go b/node/pkg/websocketfetcher/providers/coinone/utils.go index 5c725a74a..fcf3ee273 100644 --- a/node/pkg/websocketfetcher/providers/coinone/utils.go +++ b/node/pkg/websocketfetcher/providers/coinone/utils.go @@ -8,27 +8,32 @@ import ( "bisonai.com/miko/node/pkg/websocketfetcher/common" ) -func DataToFeedData(data Data, feedMap map[string]int32) (*common.FeedData, error) { - feedData := new(common.FeedData) +func DataToFeedData(data Data, feedMap map[string][]int32) ([]*common.FeedData, error) { + ids, exists := feedMap[strings.ToUpper(data.TargetCurrency)+"-"+strings.ToUpper(data.QuoteCurrency)] + if !exists { + return nil, fmt.Errorf("feed not found") + } timestamp := time.UnixMilli(data.Timestamp) value, err := common.PriceStringToFloat64(data.Last) if err != nil { - return feedData, err + return nil, err } volume, err := common.VolumeStringToFloat64(data.TargetVolume) if err != nil { - return feedData, err + return nil, err } - id, exists := feedMap[strings.ToUpper(data.TargetCurrency)+"-"+strings.ToUpper(data.QuoteCurrency)] - if !exists { - return feedData, fmt.Errorf("feed not found") + result := []*common.FeedData{} + for _, id := range ids { + feedData := new(common.FeedData) + feedData.FeedID = id + feedData.Value = value + feedData.Timestamp = ×tamp + feedData.Volume = volume + result = append(result, feedData) } - feedData.FeedID = id - feedData.Value = value - feedData.Timestamp = ×tamp - feedData.Volume = volume - return feedData, nil + + return result, nil } diff --git a/node/pkg/websocketfetcher/providers/crypto/utils.go b/node/pkg/websocketfetcher/providers/crypto/utils.go index 1d9f878b0..18ac70cca 100644 --- a/node/pkg/websocketfetcher/providers/crypto/utils.go +++ b/node/pkg/websocketfetcher/providers/crypto/utils.go @@ -8,7 +8,7 @@ import ( "github.com/rs/zerolog/log" ) -func ResponseToFeedDataList(data Response, feedMap map[string]int32) ([]*common.FeedData, error) { +func ResponseToFeedDataList(data Response, feedMap map[string][]int32) ([]*common.FeedData, error) { feedData := []*common.FeedData{} for _, tick := range data.Result.Data { @@ -36,18 +36,20 @@ func ResponseToFeedDataList(data Response, feedMap map[string]int32) ([]*common. base := rawSymbol[0] quote := rawSymbol[1] - id, exists := feedMap[base+"-"+quote] + ids, exists := feedMap[base+"-"+quote] if !exists { log.Warn().Str("Player", "cryptodotcom").Str("symbol", base+"-"+quote).Msg("feed not found") continue } - feedData = append(feedData, &common.FeedData{ - FeedID: id, - Value: value, - Timestamp: ×tamp, - Volume: volume, - }) + for _, id := range ids { + feedData = append(feedData, &common.FeedData{ + FeedID: id, + Value: value, + Timestamp: ×tamp, + Volume: volume, + }) + } } return feedData, nil diff --git a/node/pkg/websocketfetcher/providers/gateio/gateio.go b/node/pkg/websocketfetcher/providers/gateio/gateio.go index 290b84df9..e28906d67 100644 --- a/node/pkg/websocketfetcher/providers/gateio/gateio.go +++ b/node/pkg/websocketfetcher/providers/gateio/gateio.go @@ -59,12 +59,16 @@ func (f *GateioFetcher) handleMessage(ctx context.Context, message map[string]an return nil } - feedData, err := ResponseToFeedData(response, f.FeedMap) + feedDataList, err := ResponseToFeedData(response, f.FeedMap) if err != nil { log.Error().Str("Player", "Gateio").Err(err).Msg("error in ResponseToFeedData") return err } - f.FeedDataBuffer <- feedData + + for _, feedData := range feedDataList { + f.FeedDataBuffer <- feedData + } + return nil } diff --git a/node/pkg/websocketfetcher/providers/gateio/utils.go b/node/pkg/websocketfetcher/providers/gateio/utils.go index 5e621fff4..79d2e22d5 100644 --- a/node/pkg/websocketfetcher/providers/gateio/utils.go +++ b/node/pkg/websocketfetcher/providers/gateio/utils.go @@ -8,29 +8,35 @@ import ( "bisonai.com/miko/node/pkg/websocketfetcher/common" ) -func ResponseToFeedData(data Response, feedMap map[string]int32) (*common.FeedData, error) { - feedData := new(common.FeedData) +func ResponseToFeedData(data Response, feedMap map[string][]int32) ([]*common.FeedData, error) { timestamp := time.Unix(data.Time, 0) price, err := common.PriceStringToFloat64(data.Result.Last) if err != nil { - return feedData, err + return nil, err } volume, err := common.VolumeStringToFloat64(data.Result.BaseVolume) if err != nil { - return feedData, err + return nil, err } key := strings.Replace(data.Result.CurrencyPair, "_", "-", 1) - id, exists := feedMap[key] + ids, exists := feedMap[key] if !exists { - return feedData, fmt.Errorf("feed not found") + return nil, fmt.Errorf("feed not found") } - feedData.FeedID = id - feedData.Value = price - feedData.Timestamp = ×tamp - feedData.Volume = volume - return feedData, nil + result := []*common.FeedData{} + for _, id := range ids { + feedData := new(common.FeedData) + feedData.FeedID = id + feedData.Value = price + feedData.Timestamp = ×tamp + feedData.Volume = volume + + result = append(result, feedData) + } + + return result, nil } diff --git a/node/pkg/websocketfetcher/providers/gemini/utils.go b/node/pkg/websocketfetcher/providers/gemini/utils.go index 48aa769d1..5fe678274 100644 --- a/node/pkg/websocketfetcher/providers/gemini/utils.go +++ b/node/pkg/websocketfetcher/providers/gemini/utils.go @@ -9,13 +9,13 @@ import ( "github.com/rs/zerolog/log" ) -func TradeResponseToFeedDataList(data Response, feedMap map[string]int32, volumeCacheMap *common.VolumeCacheMap) ([]*common.FeedData, error) { +func TradeResponseToFeedDataList(data Response, feedMap map[string][]int32, volumeCacheMap *common.VolumeCacheMap) ([]*common.FeedData, error) { feedDataList := []*common.FeedData{} timestamp := time.UnixMilli(*data.TimestampMs) for _, event := range data.Events { - feedData := new(common.FeedData) - id, exists := feedMap[event.Symbol] + + ids, exists := feedMap[event.Symbol] if !exists { log.Warn().Str("Player", "Gemini").Str("key", event.Symbol).Msg("feed not found") continue @@ -26,24 +26,27 @@ func TradeResponseToFeedDataList(data Response, feedMap map[string]int32, volume log.Warn().Str("Player", "Gemini").Err(err).Msg("error in PriceStringToFloat64") continue } - feedData.FeedID = id - feedData.Value = price - feedData.Timestamp = ×tamp - volumeData, exists := volumeCacheMap.Map[id] - if !exists || volumeData.UpdatedAt.Before(time.Now().Add(-common.VolumeCacheLifespan)) { - feedData.Volume = 0 - } else { - feedData.Volume = volumeData.Volume - } - feedDataList = append(feedDataList, feedData) + for _, id := range ids { + feedData := new(common.FeedData) + feedData.FeedID = id + feedData.Value = price + feedData.Timestamp = ×tamp + volumeData, exists := volumeCacheMap.Map[id] + if !exists || volumeData.UpdatedAt.Before(time.Now().Add(-common.VolumeCacheLifespan)) { + feedData.Volume = 0 + } else { + feedData.Volume = volumeData.Volume + } + feedDataList = append(feedDataList, feedData) + } } return feedDataList, nil } -func FetchVolumes(feedMap map[string]int32, volumeCacheMap *common.VolumeCacheMap) { - for symbol, id := range feedMap { +func FetchVolumes(feedMap map[string][]int32, volumeCacheMap *common.VolumeCacheMap) { + for symbol, ids := range feedMap { endpoint := TICKER_ENDPOINT + strings.ToLower(symbol) result, err := request.Request[HttpTickerResponse](request.WithEndpoint(endpoint), request.WithTimeout(common.VolumeFetchTimeout)) if err != nil { @@ -72,9 +75,11 @@ func FetchVolumes(feedMap map[string]int32, volumeCacheMap *common.VolumeCacheMa } volumeCacheMap.Mutex.Lock() - volumeCacheMap.Map[id] = common.VolumeCache{ - UpdatedAt: timestamp, - Volume: volume, + for _, id := range ids { + volumeCacheMap.Map[id] = common.VolumeCache{ + UpdatedAt: timestamp, + Volume: volume, + } } volumeCacheMap.Mutex.Unlock() } diff --git a/node/pkg/websocketfetcher/providers/gopax/gopax.go b/node/pkg/websocketfetcher/providers/gopax/gopax.go index a3db88ec8..041437f16 100644 --- a/node/pkg/websocketfetcher/providers/gopax/gopax.go +++ b/node/pkg/websocketfetcher/providers/gopax/gopax.go @@ -74,7 +74,7 @@ func (f *GopaxFetcher) handleMessage(ctx context.Context, message map[string]any return fmt.Errorf("failed to parse response: %v", response) } for _, ticker := range tickers { - feedData, err := TickerToFeedData(ticker, f.FeedMap) + feedDataList, err := TickerToFeedData(ticker, f.FeedMap) if err != nil { if errors.Is(err, errorSentinel.ErrFetcherFeedNotFound) { continue @@ -82,7 +82,10 @@ func (f *GopaxFetcher) handleMessage(ctx context.Context, message map[string]any log.Error().Err(err).Str("Player", "Gopax").Msg("error in gopax.handleMessage, failed to convert ticker to feed data") continue } - f.FeedDataBuffer <- feedData + + for _, feedData := range feedDataList { + f.FeedDataBuffer <- feedData + } } return nil diff --git a/node/pkg/websocketfetcher/providers/gopax/utils.go b/node/pkg/websocketfetcher/providers/gopax/utils.go index 7e1db9021..a2909cb8b 100644 --- a/node/pkg/websocketfetcher/providers/gopax/utils.go +++ b/node/pkg/websocketfetcher/providers/gopax/utils.go @@ -10,10 +10,10 @@ import ( "github.com/rs/zerolog/log" ) -func InitialResponseToFeedData(initialData InitialResponse, feedMap map[string]int32) []*common.FeedData { - feedDataList := []*common.FeedData{} +func InitialResponseToFeedData(initialData InitialResponse, feedMap map[string][]int32) []*common.FeedData { + result := []*common.FeedData{} for _, data := range initialData.Data { - feedData, err := TickerToFeedData(data, feedMap) + feedDataList, err := TickerToFeedData(data, feedMap) if err != nil { if !errors.Is(err, errorSentinel.ErrFetcherFeedNotFound) { log.Warn().Str("Player", "Gopax").Err(err).Msg("error in TickerToFeedData") @@ -21,26 +21,29 @@ func InitialResponseToFeedData(initialData InitialResponse, feedMap map[string]i continue } - feedDataList = append(feedDataList, feedData) + result = append(result, feedDataList...) } - return feedDataList + return result } -func TickerToFeedData(ticker Ticker, feedMap map[string]int32) (*common.FeedData, error) { - feedData := new(common.FeedData) - - id, exists := feedMap[ticker.Name] +func TickerToFeedData(ticker Ticker, feedMap map[string][]int32) ([]*common.FeedData, error) { + ids, exists := feedMap[ticker.Name] if !exists { - return feedData, errorSentinel.ErrFetcherFeedNotFound + return nil, errorSentinel.ErrFetcherFeedNotFound } - feedData.FeedID = id timestamp := time.UnixMilli(ticker.Timestamp) - feedData.Timestamp = ×tamp - value := common.FormatFloat64Price(ticker.Price) - feedData.Value = value - feedData.Volume = ticker.Volume - return feedData, nil + result := []*common.FeedData{} + for _, id := range ids { + feedData := new(common.FeedData) + feedData.FeedID = id + feedData.Timestamp = ×tamp + feedData.Value = value + feedData.Volume = ticker.Volume + result = append(result, feedData) + } + + return result, nil } diff --git a/node/pkg/websocketfetcher/providers/huobi/huobi.go b/node/pkg/websocketfetcher/providers/huobi/huobi.go index a1186a408..27d25455d 100644 --- a/node/pkg/websocketfetcher/providers/huobi/huobi.go +++ b/node/pkg/websocketfetcher/providers/huobi/huobi.go @@ -72,13 +72,15 @@ func (f *HuobiFetcher) handleMessage(ctx context.Context, message map[string]any log.Error().Str("Player", "Huobi").Err(err).Msg("error in huobi.handleMessage, failed to parse response") return err } - feedData, err := ResponseToFeedData(response, f.FeedMap) + feedDataList, err := ResponseToFeedData(response, f.FeedMap) if err != nil { log.Error().Str("Player", "Huobi").Err(err).Msg("error in huobi.handleMessage, failed to convert response to feed data") return err } - f.FeedDataBuffer <- feedData + for _, feedData := range feedDataList { + f.FeedDataBuffer <- feedData + } } return nil diff --git a/node/pkg/websocketfetcher/providers/huobi/utils.go b/node/pkg/websocketfetcher/providers/huobi/utils.go index c398c9079..1313d5bae 100644 --- a/node/pkg/websocketfetcher/providers/huobi/utils.go +++ b/node/pkg/websocketfetcher/providers/huobi/utils.go @@ -9,8 +9,7 @@ import ( "github.com/rs/zerolog/log" ) -func ResponseToFeedData(response Response, feedMap map[string]int32) (*common.FeedData, error) { - feedData := new(common.FeedData) +func ResponseToFeedData(response Response, feedMap map[string][]int32) ([]*common.FeedData, error) { timestamp := time.UnixMilli(response.Ts) price := common.FormatFloat64Price(response.Tick.LastPrice) @@ -18,19 +17,27 @@ func ResponseToFeedData(response Response, feedMap map[string]int32) (*common.Fe splitted := strings.Split(response.Ch, ".") if len(splitted) < 3 || splitted[2] != "ticker" { log.Error().Str("Ch", response.Ch).Msg("invalid response") - return feedData, fmt.Errorf("invalid response") + return nil, fmt.Errorf("invalid response") } rawSymbol := splitted[1] symbol := strings.ToUpper(rawSymbol) - id, exists := feedMap[symbol] + ids, exists := feedMap[symbol] if !exists { - return feedData, fmt.Errorf("feed not found") + return nil, fmt.Errorf("feed not found") } - feedData.FeedID = id - feedData.Value = price - feedData.Timestamp = ×tamp - feedData.Volume = response.Tick.Amount - return feedData, nil + + result := []*common.FeedData{} + for _, id := range ids { + feedData := new(common.FeedData) + feedData.FeedID = id + feedData.Value = price + feedData.Timestamp = ×tamp + feedData.Volume = response.Tick.Amount + + result = append(result, feedData) + } + + return result, nil } diff --git a/node/pkg/websocketfetcher/providers/korbit/korbit.go b/node/pkg/websocketfetcher/providers/korbit/korbit.go index 5e98ea1b7..664d7f3b9 100644 --- a/node/pkg/websocketfetcher/providers/korbit/korbit.go +++ b/node/pkg/websocketfetcher/providers/korbit/korbit.go @@ -62,13 +62,16 @@ func (k *KorbitFetcher) handleMessage(ctx context.Context, message map[string]an return nil } - feedData, err := DataToFeedData(raw.Data, k.FeedMap) + feedDataList, err := DataToFeedData(raw.Data, k.FeedMap) if err != nil { log.Error().Str("Player", "Korbit").Err(err).Msg("error in DataToFeedData") return err } - k.FeedDataBuffer <- feedData + for _, feedData := range feedDataList { + k.FeedDataBuffer <- feedData + } + return nil } diff --git a/node/pkg/websocketfetcher/providers/korbit/utils.go b/node/pkg/websocketfetcher/providers/korbit/utils.go index 3649ca301..cd786ef89 100644 --- a/node/pkg/websocketfetcher/providers/korbit/utils.go +++ b/node/pkg/websocketfetcher/providers/korbit/utils.go @@ -8,34 +8,40 @@ import ( "bisonai.com/miko/node/pkg/websocketfetcher/common" ) -func DataToFeedData(data Ticker, feedMap map[string]int32) (*common.FeedData, error) { - feedData := new(common.FeedData) - +func DataToFeedData(data Ticker, feedMap map[string][]int32) ([]*common.FeedData, error) { timestamp := time.UnixMilli(data.Timestamp) value, err := common.PriceStringToFloat64(data.Last) if err != nil { - return feedData, err + return nil, err } volume, err := common.VolumeStringToFloat64(data.Volume) if err != nil { - return feedData, err + return nil, err } rawPair := strings.Split(data.CurrencyPair, "_") if len(rawPair) < 2 { - return feedData, fmt.Errorf("invalid feed name") + return nil, fmt.Errorf("invalid feed name") } target := rawPair[0] quote := rawPair[1] - id, exists := feedMap[strings.ToUpper(target)+"-"+strings.ToUpper(quote)] + ids, exists := feedMap[strings.ToUpper(target)+"-"+strings.ToUpper(quote)] if !exists { - return feedData, fmt.Errorf("feed not found") + return nil, fmt.Errorf("feed not found") } - feedData.FeedID = id - feedData.Value = value - feedData.Timestamp = ×tamp - feedData.Volume = volume - return feedData, nil + + result := []*common.FeedData{} + for _, id := range ids { + feedData := new(common.FeedData) + feedData.FeedID = id + feedData.Value = value + feedData.Timestamp = ×tamp + feedData.Volume = volume + + result = append(result, feedData) + } + + return result, nil } diff --git a/node/pkg/websocketfetcher/providers/kraken/utils.go b/node/pkg/websocketfetcher/providers/kraken/utils.go index 574698fb1..919cc6e51 100644 --- a/node/pkg/websocketfetcher/providers/kraken/utils.go +++ b/node/pkg/websocketfetcher/providers/kraken/utils.go @@ -7,25 +7,27 @@ import ( "bisonai.com/miko/node/pkg/websocketfetcher/common" ) -func ResponseToFeedData(response Response, feedMap map[string]int32) []*common.FeedData { +func ResponseToFeedData(response Response, feedMap map[string][]int32) []*common.FeedData { feedDataList := []*common.FeedData{} for _, data := range response.Data { symbol := strings.ReplaceAll(data.Symbol, "/", "-") - id, exists := feedMap[symbol] + ids, exists := feedMap[symbol] if !exists { continue } - feedData := new(common.FeedData) value := common.FormatFloat64Price(data.Price) timestamp := time.Now() volume := data.Volume - feedData.FeedID = id - feedData.Value = value - feedData.Timestamp = ×tamp - feedData.Volume = volume - feedDataList = append(feedDataList, feedData) + for _, id := range ids { + feedData := new(common.FeedData) + feedData.FeedID = id + feedData.Value = value + feedData.Timestamp = ×tamp + feedData.Volume = volume + feedDataList = append(feedDataList, feedData) + } } return feedDataList } diff --git a/node/pkg/websocketfetcher/providers/kucoin/kucoin.go b/node/pkg/websocketfetcher/providers/kucoin/kucoin.go index 512a867e2..4dd5e7afb 100644 --- a/node/pkg/websocketfetcher/providers/kucoin/kucoin.go +++ b/node/pkg/websocketfetcher/providers/kucoin/kucoin.go @@ -65,9 +65,15 @@ func (f *KucoinFetcher) handleMessage(ctx context.Context, message map[string]an return nil } - feedData := RawDataToFeedData(raw, f.FeedMap) + feedDataList, err := RawDataToFeedData(raw, f.FeedMap) + if err != nil { + log.Error().Str("Player", "Kucoin").Err(err).Msg("error in kucoin.handleMessage") + return err + } - f.FeedDataBuffer <- feedData + for _, feedData := range feedDataList { + f.FeedDataBuffer <- feedData + } return nil } diff --git a/node/pkg/websocketfetcher/providers/kucoin/utils.go b/node/pkg/websocketfetcher/providers/kucoin/utils.go index 08e5922b6..6764edd0d 100644 --- a/node/pkg/websocketfetcher/providers/kucoin/utils.go +++ b/node/pkg/websocketfetcher/providers/kucoin/utils.go @@ -1,24 +1,35 @@ package kucoin import ( + "fmt" "time" "bisonai.com/miko/node/pkg/websocketfetcher/common" + "github.com/rs/zerolog/log" ) -func RawDataToFeedData(raw SymbolSnapshotRaw, feedMap map[string]int32) *common.FeedData { +func RawDataToFeedData(raw SymbolSnapshotRaw, feedMap map[string][]int32) ([]*common.FeedData, error) { snapshot := raw.Data.Data symbol := snapshot.Symbol - id := feedMap[symbol] + ids, ok := feedMap[symbol] + if !ok { + log.Warn().Str("Player", "Kucoin").Str("symbol", symbol).Msg("feed not found") + return nil, fmt.Errorf("feed not found") + } timestamp := time.UnixMilli(snapshot.Time) value := common.FormatFloat64Price(snapshot.Price) volume := snapshot.Volume - return &common.FeedData{ - FeedID: id, - Value: value, - Timestamp: ×tamp, - Volume: volume, + result := []*common.FeedData{} + for _, id := range ids { + result = append(result, &common.FeedData{ + FeedID: id, + Value: value, + Timestamp: ×tamp, + Volume: volume, + }) } + + return result, nil } diff --git a/node/pkg/websocketfetcher/providers/lbank/lbank.go b/node/pkg/websocketfetcher/providers/lbank/lbank.go index eeafba163..08d186ff0 100644 --- a/node/pkg/websocketfetcher/providers/lbank/lbank.go +++ b/node/pkg/websocketfetcher/providers/lbank/lbank.go @@ -65,13 +65,15 @@ func (f *LbankFetcher) handleMessage(ctx context.Context, message map[string]any if response.Type != "tick" { return nil } - feedData, err := ResponseToFeedData(response, f.FeedMap) + feedDataList, err := ResponseToFeedData(response, f.FeedMap) if err != nil { log.Error().Str("Player", "Lbank").Err(err).Msg("error in ResponseToFeedData") return err } - f.FeedDataBuffer <- feedData + for _, feedData := range feedDataList { + f.FeedDataBuffer <- feedData + } return nil } diff --git a/node/pkg/websocketfetcher/providers/lbank/utils.go b/node/pkg/websocketfetcher/providers/lbank/utils.go index 11bd89007..fba048a59 100644 --- a/node/pkg/websocketfetcher/providers/lbank/utils.go +++ b/node/pkg/websocketfetcher/providers/lbank/utils.go @@ -10,27 +10,32 @@ import ( const layout = "2006-01-02T15:04:05.000" -func ResponseToFeedData(data Response, feedMap map[string]int32) (*common.FeedData, error) { +func ResponseToFeedData(data Response, feedMap map[string][]int32) ([]*common.FeedData, error) { loc, _ := time.LoadLocation("Asia/Shanghai") - feedData := new(common.FeedData) timestampRaw, err := time.ParseInLocation(layout, data.TS, loc) if err != nil { - return feedData, err + return nil, err } timestamp := timestampRaw.UTC() value := common.FormatFloat64Price(data.Tick.Latest) symbol := strings.ToUpper(strings.ReplaceAll(data.Pair, "_", "-")) volume := data.Tick.Vol - id, exists := feedMap[symbol] + ids, exists := feedMap[symbol] if !exists { - return feedData, fmt.Errorf("feed not found") + return nil, fmt.Errorf("feed not found") } - feedData.FeedID = id - feedData.Value = value - feedData.Timestamp = ×tamp - feedData.Volume = volume - return feedData, nil + result := []*common.FeedData{} + for _, id := range ids { + feedData := new(common.FeedData) + feedData.FeedID = id + feedData.Value = value + feedData.Timestamp = ×tamp + feedData.Volume = volume + result = append(result, feedData) + } + + return result, nil } diff --git a/node/pkg/websocketfetcher/providers/mexc/utils.go b/node/pkg/websocketfetcher/providers/mexc/utils.go index 57a4f1af3..12bd1a8d5 100644 --- a/node/pkg/websocketfetcher/providers/mexc/utils.go +++ b/node/pkg/websocketfetcher/providers/mexc/utils.go @@ -6,19 +6,17 @@ import ( "bisonai.com/miko/node/pkg/websocketfetcher/common" ) -func ResponseToFeedDataList(response BatchResponse, feedMap map[string]int32) ([]*common.FeedData, error) { +func ResponseToFeedDataList(response BatchResponse, feedMap map[string][]int32) ([]*common.FeedData, error) { feedDataList := []*common.FeedData{} timestamp := time.UnixMilli(int64(response.Time)) for _, item := range response.Data { - id, exists := feedMap[item.Symbol] + ids, exists := feedMap[item.Symbol] if !exists { continue } - feedData := new(common.FeedData) - value, err := common.PriceStringToFloat64(item.Price) if err != nil { return feedDataList, err @@ -30,11 +28,15 @@ func ResponseToFeedDataList(response BatchResponse, feedMap map[string]int32) ([ return feedDataList, err } - feedData.FeedID = id - feedData.Value = value - feedData.Timestamp = ×tamp - feedData.Volume = volume - feedDataList = append(feedDataList, feedData) + for _, id := range ids { + feedData := new(common.FeedData) + feedData.FeedID = id + feedData.Value = value + feedData.Timestamp = ×tamp + feedData.Volume = volume + feedDataList = append(feedDataList, feedData) + } } + return feedDataList, nil } diff --git a/node/pkg/websocketfetcher/providers/okx/utils.go b/node/pkg/websocketfetcher/providers/okx/utils.go index 60413147f..6b9f3f7e6 100644 --- a/node/pkg/websocketfetcher/providers/okx/utils.go +++ b/node/pkg/websocketfetcher/providers/okx/utils.go @@ -8,15 +8,14 @@ import ( "github.com/rs/zerolog/log" ) -func ResponseToFeedData(response Response, feedMap map[string]int32) []*common.FeedData { +func ResponseToFeedData(response Response, feedMap map[string][]int32) []*common.FeedData { feedDataList := []*common.FeedData{} for _, data := range response.Data { - id, exists := feedMap[data.InstId] + ids, exists := feedMap[data.InstId] if !exists { continue } - feedData := new(common.FeedData) value, err := common.PriceStringToFloat64(data.Price) if err != nil { log.Error().Err(err).Str("Player", "OKX").Msg("error in PriceStringToFloat64") @@ -34,11 +33,16 @@ func ResponseToFeedData(response Response, feedMap map[string]int32) []*common.F continue } - feedData.FeedID = id - feedData.Value = value - feedData.Timestamp = ×tamp - feedData.Volume = volume - feedDataList = append(feedDataList, feedData) + for _, id := range ids { + feedData := new(common.FeedData) + + feedData.FeedID = id + feedData.Value = value + feedData.Timestamp = ×tamp + feedData.Volume = volume + + feedDataList = append(feedDataList, feedData) + } } return feedDataList } diff --git a/node/pkg/websocketfetcher/providers/orangex/orangex.go b/node/pkg/websocketfetcher/providers/orangex/orangex.go index 00cf1664d..c62cb06f9 100644 --- a/node/pkg/websocketfetcher/providers/orangex/orangex.go +++ b/node/pkg/websocketfetcher/providers/orangex/orangex.go @@ -108,17 +108,16 @@ func (f *OrangeXFetcher) handleMessage(ctx context.Context, message map[string]a return nil } - feedData, err := TickerResponseToFeedData(raw, f.FeedMap) + feedDataList, err := TickerResponseToFeedData(raw, f.FeedMap) if err != nil { log.Error().Str("Player", "OrangeX").Err(err).Msg("error in orangex.handleMessage") return err } - if feedData == nil { - return nil + for _, feedData := range feedDataList { + f.FeedDataBuffer <- feedData } - f.FeedDataBuffer <- feedData return nil } @@ -126,14 +125,13 @@ func (f *OrangeXFetcher) Run(ctx context.Context) { f.Ws.Run(ctx, f.handleMessage) } -func TickerResponseToFeedData(data TickerResponse, feedMap map[string]int32) (*common.FeedData, error) { - id, exists := feedMap[data.Params.Data.InstrumentName] +func TickerResponseToFeedData(data TickerResponse, feedMap map[string][]int32) ([]*common.FeedData, error) { + ids, exists := feedMap[data.Params.Data.InstrumentName] if !exists { log.Warn().Str("Player", "OrangeX").Any("data", data).Str("key", data.Params.Data.InstrumentName).Msg("feed not found") return nil, nil } - feedData := new(common.FeedData) value, err := common.PriceStringToFloat64(data.Params.Data.LastPrice) if err != nil { log.Error().Str("Player", "OrangeX").Err(err).Msg("error in PriceStringToFloat64") @@ -152,9 +150,15 @@ func TickerResponseToFeedData(data TickerResponse, feedMap map[string]int32) (*c return nil, err } - feedData.FeedID = id - feedData.Value = value - feedData.Timestamp = ×tamp - feedData.Volume = volume - return feedData, nil + result := []*common.FeedData{} + for _, id := range ids { + feedData := new(common.FeedData) + feedData.FeedID = id + feedData.Value = value + feedData.Timestamp = ×tamp + feedData.Volume = volume + result = append(result, feedData) + } + + return result, nil } diff --git a/node/pkg/websocketfetcher/providers/upbit/upbit.go b/node/pkg/websocketfetcher/providers/upbit/upbit.go index b390b6d95..82e50dd7d 100644 --- a/node/pkg/websocketfetcher/providers/upbit/upbit.go +++ b/node/pkg/websocketfetcher/providers/upbit/upbit.go @@ -55,12 +55,16 @@ func (f *UpbitFetcher) handleMessage(ctx context.Context, message map[string]int log.Error().Str("Player", "Upbit").Err(err).Msg("error in upbit.handleMessage") return err } - feedData, err := ResponseToFeedData(response, f.FeedMap) + feedDataList, err := ResponseToFeedData(response, f.FeedMap) if err != nil { log.Error().Str("Player", "Upbit").Err(err).Msg("error in upbit.handleMessage") return err } - f.FeedDataBuffer <- feedData + + for _, feedData := range feedDataList { + f.FeedDataBuffer <- feedData + } + return nil } diff --git a/node/pkg/websocketfetcher/providers/upbit/utils.go b/node/pkg/websocketfetcher/providers/upbit/utils.go index d10ddffdc..d7eaa2ff4 100644 --- a/node/pkg/websocketfetcher/providers/upbit/utils.go +++ b/node/pkg/websocketfetcher/providers/upbit/utils.go @@ -8,8 +8,7 @@ import ( "bisonai.com/miko/node/pkg/websocketfetcher/common" ) -func ResponseToFeedData(data Response, feedMap map[string]int32) (*common.FeedData, error) { - feedData := new(common.FeedData) +func ResponseToFeedData(data Response, feedMap map[string][]int32) ([]*common.FeedData, error) { timestamp := time.UnixMilli(data.TradeTimestamp) price := common.FormatFloat64Price(data.TradePrice) @@ -20,13 +19,21 @@ func ResponseToFeedData(data Response, feedMap map[string]int32) (*common.FeedDa base := splitted[1] quote := splitted[0] - id, exists := feedMap[strings.ToUpper(base)+"-"+strings.ToUpper(quote)] + ids, exists := feedMap[strings.ToUpper(base)+"-"+strings.ToUpper(quote)] if !exists { - return feedData, fmt.Errorf("feed not found") + return nil, fmt.Errorf("feed not found") } - feedData.FeedID = id - feedData.Value = price - feedData.Timestamp = ×tamp - feedData.Volume = *volume - return feedData, nil + + result := []*common.FeedData{} + for _, id := range ids { + feedData := new(common.FeedData) + feedData.FeedID = id + feedData.Value = price + feedData.Timestamp = ×tamp + feedData.Volume = *volume + + result = append(result, feedData) + } + + return result, nil } diff --git a/node/pkg/websocketfetcher/providers/xt/utils.go b/node/pkg/websocketfetcher/providers/xt/utils.go index 0dc70057a..bc20c662d 100644 --- a/node/pkg/websocketfetcher/providers/xt/utils.go +++ b/node/pkg/websocketfetcher/providers/xt/utils.go @@ -8,11 +8,9 @@ import ( "bisonai.com/miko/node/pkg/websocketfetcher/common" ) -func ResponseToFeedData(response Response, feedMap map[string]int32) (*common.FeedData, error) { - feedData := new(common.FeedData) - +func ResponseToFeedData(response Response, feedMap map[string][]int32) ([]*common.FeedData, error) { symbol := strings.ToUpper(strings.ReplaceAll(response.Data.Symbol, "_", "-")) - id, exists := feedMap[symbol] + ids, exists := feedMap[symbol] if !exists { return nil, fmt.Errorf("feed not found") } @@ -26,9 +24,17 @@ func ResponseToFeedData(response Response, feedMap map[string]int32) (*common.Fe return nil, err } - feedData.FeedID = id - feedData.Value = value - feedData.Timestamp = ×tamp - feedData.Volume = volume - return feedData, nil + result := []*common.FeedData{} + + for _, id := range ids { + feedData := new(common.FeedData) + feedData.FeedID = id + feedData.Value = value + feedData.Timestamp = ×tamp + feedData.Volume = volume + + result = append(result, feedData) + } + + return result, nil } diff --git a/node/pkg/websocketfetcher/providers/xt/xt.go b/node/pkg/websocketfetcher/providers/xt/xt.go index 5dcb6fdab..6106b556e 100644 --- a/node/pkg/websocketfetcher/providers/xt/xt.go +++ b/node/pkg/websocketfetcher/providers/xt/xt.go @@ -68,13 +68,16 @@ func (f *XtFetcher) handleMessage(ctx context.Context, message map[string]any) e return nil } - feedData, err := ResponseToFeedData(raw, f.FeedMap) + feedDataList, err := ResponseToFeedData(raw, f.FeedMap) if err != nil { log.Error().Str("Player", "Xt").Err(err).Msg("error in xt.handleMessage, failed to convert response to feed data") return err } - f.FeedDataBuffer <- feedData + for _, feedData := range feedDataList { + f.FeedDataBuffer <- feedData + } + return nil }