From 33290073f635b2f56bfd41179e8c5877da33ae31 Mon Sep 17 00:00:00 2001 From: Mencius Fang Date: Sun, 31 Dec 2023 12:01:09 +0800 Subject: [PATCH] implement parse_candlestick for zb and fix bug for zbg_spot.rs (#9) * implement parse_candlestick for zb and fix bug for zbg_spot.rs * Polished code style * Sorted by alphabetical order --------- Co-authored-by: soulmachine --- crypto-msg-parser/src/exchanges/zb/mod.rs | 12 +++ crypto-msg-parser/src/exchanges/zb/zb_spot.rs | 88 ++++++++++++++++++- crypto-msg-parser/src/exchanges/zb/zb_swap.rs | 83 ++++++++++++++++- .../src/exchanges/zbg/zbg_spot.rs | 6 +- .../src/exchanges/zbg/zbg_swap.rs | 3 +- crypto-msg-parser/src/lib.rs | 5 +- crypto-msg-parser/tests/zb.rs | 34 ++++++- 7 files changed, 220 insertions(+), 11 deletions(-) diff --git a/crypto-msg-parser/src/exchanges/zb/mod.rs b/crypto-msg-parser/src/exchanges/zb/mod.rs index 3a72fe4..ff782b7 100644 --- a/crypto-msg-parser/src/exchanges/zb/mod.rs +++ b/crypto-msg-parser/src/exchanges/zb/mod.rs @@ -4,6 +4,7 @@ mod zb_swap; use std::collections::HashMap; use crypto_market_type::MarketType; +use crypto_message::CandlestickMsg; use serde_json::Value; use crate::{OrderBookMsg, TradeMsg}; @@ -86,3 +87,14 @@ pub(crate) fn parse_l2_topk( zb_swap::parse_l2(market_type, msg) } } + +pub(crate) fn parse_candlestick( + market_type: MarketType, + msg: &str, +) -> Result, SimpleError> { + if market_type == MarketType::Spot { + zb_spot::parse_candlestick(msg) + } else { + zb_swap::parse_candlestick(market_type, msg) + } +} diff --git a/crypto-msg-parser/src/exchanges/zb/zb_spot.rs b/crypto-msg-parser/src/exchanges/zb/zb_spot.rs index 8b39f97..0744277 100644 --- a/crypto-msg-parser/src/exchanges/zb/zb_spot.rs +++ b/crypto-msg-parser/src/exchanges/zb/zb_spot.rs @@ -1,10 +1,12 @@ use crypto_market_type::MarketType; -use crypto_message::{Order, OrderBookMsg, TradeMsg, TradeSide}; +use crypto_message::{CandlestickMsg, Order, OrderBookMsg, TradeMsg, TradeSide}; use crypto_msg_type::MessageType; use serde_json::Value; use simple_error::SimpleError; use std::collections::HashMap; +use crate::exchanges::utils::calc_quantity_and_volume; + use super::EXCHANGE_NAME; use serde::{Deserialize, Serialize}; @@ -88,6 +90,20 @@ struct L2TopKMsg { extra: HashMap, } +#[derive(Serialize, Deserialize)] +#[allow(non_snake_case)] +struct RawCandlestickMsg { + datas: Data, + channel: String, + isSuc: Value, +} + +#[derive(Serialize, Deserialize)] +#[allow(non_snake_case)] +struct Data { + data: Vec<[Value; 6]>, +} + pub(super) fn parse_trade(msg: &str) -> Result, SimpleError> { let ws_msg = serde_json::from_str::>(msg).map_err(|_e| { SimpleError::new(format!("Failed to deserialize {msg} to WebsocketMsg")) @@ -165,3 +181,73 @@ pub(super) fn parse_l2_topk(msg: &str) -> Result, SimpleError> }; Ok(vec![orderbook]) } + +// * https://www.zb.com/en/api #Market GetKline +pub(super) fn parse_candlestick(msg: &str) -> Result, SimpleError> { + let ws_msg = serde_json::from_str::(msg).map_err(SimpleError::from)?; + + let (symbol, period) = { + let mut arr = ws_msg.channel.split('_'); + (arr.next().unwrap(), arr.last().unwrap()) + }; + let pair = crypto_pair::normalize_pair(symbol, EXCHANGE_NAME).unwrap(); + + let mut m_seconds = 0; + if period.ends_with("min") { + m_seconds = period.strip_suffix("min").unwrap().parse::().unwrap() * 60 * 1000; + } else if period.ends_with("hour") { + m_seconds = period.strip_suffix("hour").unwrap().parse::().unwrap() * 60 * 60 * 1000; + } else if period.ends_with("day") { + m_seconds = + period.strip_suffix("day").unwrap().parse::().unwrap() * 60 * 60 * 24 * 1000; + } else if period.ends_with("week") { + m_seconds = + period.strip_suffix("week").unwrap().parse::().unwrap() * 60 * 60 * 24 * 7 * 1000; + } + + let arr = ws_msg.datas.data; + let mut candlestick_msgs: Vec = arr + .into_iter() + .map(|candlestick_msg| { + let timestamp = candlestick_msg[0].as_i64().unwrap(); + let begin_time = timestamp - m_seconds; + + let open = candlestick_msg[1].as_f64().unwrap(); + let high = candlestick_msg[2].as_f64().unwrap(); + let low = candlestick_msg[3].as_f64().unwrap(); + let close = candlestick_msg[4].as_f64().unwrap(); + let price = (open + high + low + close) / 4.0; + let quantity = candlestick_msg[5].as_f64().unwrap(); + let (volume, quote_volume, _none) = calc_quantity_and_volume( + EXCHANGE_NAME, + MarketType::Spot, + pair.as_str(), + price, + quantity, + ); + + CandlestickMsg { + exchange: super::EXCHANGE_NAME.to_string(), + market_type: MarketType::Spot, + symbol: symbol.to_string(), + pair: pair.clone(), + msg_type: MessageType::Candlestick, + timestamp, + begin_time, + open, + high, + low, + close, + volume, + period: period.to_string(), + quote_volume: Some(crate::round(quote_volume)), + json: msg.to_string(), + } + }) + .collect(); + + if candlestick_msgs.len() == 1 { + candlestick_msgs[0].json = msg.to_string(); + } + Ok(candlestick_msgs) +} diff --git a/crypto-msg-parser/src/exchanges/zb/zb_swap.rs b/crypto-msg-parser/src/exchanges/zb/zb_swap.rs index 0cee866..520871e 100644 --- a/crypto-msg-parser/src/exchanges/zb/zb_swap.rs +++ b/crypto-msg-parser/src/exchanges/zb/zb_swap.rs @@ -1,5 +1,5 @@ use crypto_market_type::MarketType; -use crypto_message::{Order, OrderBookMsg, TradeMsg, TradeSide}; +use crypto_message::{CandlestickMsg, Order, OrderBookMsg, TradeMsg, TradeSide}; use crypto_msg_type::MessageType; use super::EXCHANGE_NAME; @@ -138,6 +138,15 @@ struct Level2Msg { extra: HashMap, } +#[derive(Serialize, Deserialize)] +#[allow(non_snake_case)] +struct RawCandlestickMsg { + channel: String, + data: Vec<[Value; 6]>, + #[serde(rename = "type")] + type_: Option, // Whole +} + /// Docs: /// * https://github.com/ZBFuture/docs/blob/main/API%20V2%20_en.md#84-increment-depth /// * https://github.com/ZBFuture/docs/blob/main/API%20V2%20_en.md#84-increment-depth @@ -192,3 +201,75 @@ pub(super) fn parse_l2( }; Ok(vec![orderbook]) } + +/// Docs: +/// * https://github.com/ZBFuture/docs/blob/main/API%20V2%20_en.md#85-candlestick +pub(super) fn parse_candlestick( + market_type: MarketType, + msg: &str, +) -> Result, SimpleError> { + let ws_msg = serde_json::from_str::(msg).map_err(|_e| { + SimpleError::new(format!("Failed to deserialize {msg} to WebsocketMsg")) + })?; + + let (symbol, period) = { + let mut arr = ws_msg.channel.split('.'); + (arr.next().unwrap(), arr.last().unwrap().split('_').last().unwrap()) + }; + let pair = crypto_pair::normalize_pair(symbol, EXCHANGE_NAME).unwrap(); + + let mut m_seconds = 0; + if period.ends_with('M') { + m_seconds = period.strip_suffix('M').unwrap().parse::().unwrap() * 60 * 1000; + } else if period.ends_with('H') { + m_seconds = period.strip_suffix('H').unwrap().parse::().unwrap() * 60 * 60 * 1000; + } else if period.ends_with('D') { + m_seconds = period.strip_suffix('D').unwrap().parse::().unwrap() * 60 * 60 * 24 * 1000; + } + + let arr = ws_msg.data; + let mut candlestick_msgs: Vec = arr + .into_iter() + .map(|candlestick_msg| { + let timestamp = candlestick_msg[5].as_i64().unwrap() * 1000; + let begin_time = timestamp - m_seconds; + let open = candlestick_msg[0].as_f64().unwrap(); + let high = candlestick_msg[1].as_f64().unwrap(); + let low = candlestick_msg[2].as_f64().unwrap(); + let close = candlestick_msg[3].as_f64().unwrap(); + let price = (open + high + low + close) / 4.0; + let quantity = candlestick_msg[4].as_f64().unwrap(); + + let (volume, quote_volume, _none) = calc_quantity_and_volume( + EXCHANGE_NAME, + market_type, + pair.as_str(), + price, + quantity, + ); + + CandlestickMsg { + exchange: super::EXCHANGE_NAME.to_string(), + market_type, + symbol: symbol.to_string(), + pair: pair.clone(), + msg_type: MessageType::Candlestick, + timestamp, + begin_time, + open, + high, + low, + close, + volume, + period: period.to_string(), + quote_volume: Some(crate::round(quote_volume)), + json: msg.to_string(), + } + }) + .collect(); + + if candlestick_msgs.len() == 1 { + candlestick_msgs[0].json = msg.to_string(); + } + Ok(candlestick_msgs) +} diff --git a/crypto-msg-parser/src/exchanges/zbg/zbg_spot.rs b/crypto-msg-parser/src/exchanges/zbg/zbg_spot.rs index 20a1a1c..d77b570 100644 --- a/crypto-msg-parser/src/exchanges/zbg/zbg_spot.rs +++ b/crypto-msg-parser/src/exchanges/zbg/zbg_spot.rs @@ -529,9 +529,7 @@ pub(crate) fn parse_l2(msg: &str) -> Result, SimpleError> { // dollar-rate, period, conversion, amount. pub(crate) fn parse_candlestick(msg: &str) -> Result, SimpleError> { let arr = if msg.starts_with(r#"[["K","#) { - serde_json::from_str::>>(msg).map_err(|_e| { - SimpleError::new(format!("Failed to deserialize {msg} to Vec>")) - })? + serde_json::from_str::>>(msg).map_err(SimpleError::from)? } else if msg.starts_with(r#"["K","#) { let tmp = serde_json::from_str::>(msg).map_err(|_e| { SimpleError::new(format!("Failed to deserialize {msg} to Vec")) @@ -556,7 +554,7 @@ pub(crate) fn parse_candlestick(msg: &str) -> Result, Simple * 60 } 'H' => { - period.to_string().strip_suffix('M').unwrap().parse::().unwrap() + period.to_string().strip_suffix('H').unwrap().parse::().unwrap() * 1000 * 60 * 60 diff --git a/crypto-msg-parser/src/exchanges/zbg/zbg_swap.rs b/crypto-msg-parser/src/exchanges/zbg/zbg_swap.rs index d53f706..138c6d6 100644 --- a/crypto-msg-parser/src/exchanges/zbg/zbg_swap.rs +++ b/crypto-msg-parser/src/exchanges/zbg/zbg_swap.rs @@ -320,8 +320,7 @@ pub(crate) fn parse_candlestick( market_type: MarketType, msg: &str, ) -> Result, SimpleError> { - let ws_msg = serde_json::from_str::>(msg) - .map_err(|_e| SimpleError::new(format!("Failed to deserialize {msg} to Vec")))?; + let ws_msg = serde_json::from_str::>(msg).map_err(SimpleError::from)?; assert_eq!(ws_msg[0].as_str().unwrap(), "future_kline"); let raw_candlestick_msg: RawCandlestickMsg = serde_json::from_value(ws_msg[1].clone()) .map_err(|_e| { diff --git a/crypto-msg-parser/src/lib.rs b/crypto-msg-parser/src/lib.rs index 2800eaa..aad4b43 100644 --- a/crypto-msg-parser/src/lib.rs +++ b/crypto-msg-parser/src/lib.rs @@ -262,20 +262,21 @@ pub fn parse_candlestick( match exchange { "binance" => exchanges::binance::parse_candlestick(market_type, msg), "bitfinex" => exchanges::bitfinex::parse_candlestick(market_type, msg), - "bitmex" => exchanges::bitmex::parse_candlestick(market_type, msg), "bitget" => exchanges::bitget::parse_candlestick(market_type, msg), + "bitmex" => exchanges::bitmex::parse_candlestick(market_type, msg), "bybit" => exchanges::bybit::parse_candlestick(market_type, msg), + "deribit" => exchanges::deribit::parse_candlestick(market_type, msg), "gate" => exchanges::gate::parse_candlestick(market_type, msg), "huobi" => exchanges::huobi::parse_candlestick(market_type, msg), "kraken" => exchanges::kraken::parse_candlestick(market_type, msg), "kucoin" => exchanges::kucoin::parse_candlestick(market_type, msg), - "deribit" => exchanges::deribit::parse_candlestick(market_type, msg), "mexc" => exchanges::mexc::parse_candlestick(market_type, msg, received_at), "okx" => exchanges::okx::parse_candlestick( market_type, msg, received_at.expect("OKX candlestick messages don't have timestamp"), ), + "zb" => exchanges::zb::parse_candlestick(market_type, msg), "zbg" => exchanges::zbg::parse_candlestick(market_type, msg), _ => Err(SimpleError::new(format!("Unknown exchange {exchange}"))), } diff --git a/crypto-msg-parser/tests/zb.rs b/crypto-msg-parser/tests/zb.rs index 102058a..d93c85f 100644 --- a/crypto-msg-parser/tests/zb.rs +++ b/crypto-msg-parser/tests/zb.rs @@ -365,7 +365,7 @@ mod ticker { mod candlestick { use super::EXCHANGE_NAME; use crypto_market_type::MarketType; - use crypto_msg_parser::{extract_symbol, extract_timestamp}; + use crypto_msg_parser::{extract_symbol, extract_timestamp, parse_candlestick}; #[test] fn spot() { @@ -377,6 +377,22 @@ mod candlestick { 1653782160000, extract_timestamp(EXCHANGE_NAME, MarketType::Spot, raw_msg).unwrap().unwrap() ); + let arr = parse_candlestick(EXCHANGE_NAME, MarketType::Spot, raw_msg, None).unwrap(); + assert_eq!(2, arr.len()); + let candlestick_msg = &arr[0]; + //[1653782100000,29055.22,29055.22,29030.81,29032.9,19.3130] + assert_eq!("btcusdt", candlestick_msg.symbol); + assert_eq!("BTC/USDT", candlestick_msg.pair); + assert_eq!(1653782100000, candlestick_msg.timestamp); + assert_eq!("1min", candlestick_msg.period); + assert_eq!(1653782040000, candlestick_msg.begin_time); + + assert_eq!(29055.22, candlestick_msg.open); + assert_eq!(29055.22, candlestick_msg.high); + assert_eq!(29030.81, candlestick_msg.low); + assert_eq!(29032.9, candlestick_msg.close); + assert_eq!(19.3130, candlestick_msg.volume); + assert_eq!(Some(560917.8397375), candlestick_msg.quote_volume); } #[test] @@ -392,6 +408,22 @@ mod candlestick { 1653783840000, extract_timestamp(EXCHANGE_NAME, MarketType::LinearSwap, raw_msg).unwrap().unwrap() ); + let arr = parse_candlestick(EXCHANGE_NAME, MarketType::LinearSwap, raw_msg, None).unwrap(); + assert_eq!(1, arr.len()); + let candlestick_msg = &arr[0]; + //[28993.54,28996.39,28992.58,28994.78,0.921,1653783840] + assert_eq!("BTC_USDT", candlestick_msg.symbol); + assert_eq!("BTC/USDT", candlestick_msg.pair); + assert_eq!(1653783840000, candlestick_msg.timestamp); + assert_eq!("1M", candlestick_msg.period); + assert_eq!(1653783780000, candlestick_msg.begin_time); + + assert_eq!(28993.54, candlestick_msg.open); + assert_eq!(28996.39, candlestick_msg.high); + assert_eq!(28992.58, candlestick_msg.low); + assert_eq!(28994.78, candlestick_msg.close); + assert_eq!(0.921, candlestick_msg.volume); + assert_eq!(Some(26703.7710225), candlestick_msg.quote_volume); } }