diff --git a/crypto-msg-parser/src/exchanges/zbg/mod.rs b/crypto-msg-parser/src/exchanges/zbg/mod.rs index 3b07668..da755f5 100644 --- a/crypto-msg-parser/src/exchanges/zbg/mod.rs +++ b/crypto-msg-parser/src/exchanges/zbg/mod.rs @@ -3,7 +3,7 @@ mod zbg_swap; use crypto_market_type::MarketType; -use crate::{OrderBookMsg, TradeMsg}; +use crate::{CandlestickMsg, OrderBookMsg, TradeMsg}; use simple_error::SimpleError; @@ -49,3 +49,14 @@ pub(crate) fn parse_l2( zbg_swap::parse_l2(market_type, msg) } } + +pub(crate) fn parse_candlestick( + market_type: MarketType, + msg: &str, +) -> Result, SimpleError> { + if market_type == MarketType::Spot { + zbg_spot::parse_candlestick(msg) + } else { + zbg_swap::parse_candlestick(market_type, msg) + } +} diff --git a/crypto-msg-parser/src/exchanges/zbg/zbg_spot.rs b/crypto-msg-parser/src/exchanges/zbg/zbg_spot.rs index 6012c32..20a1a1c 100644 --- a/crypto-msg-parser/src/exchanges/zbg/zbg_spot.rs +++ b/crypto-msg-parser/src/exchanges/zbg/zbg_spot.rs @@ -4,7 +4,7 @@ use super::super::utils::{convert_timestamp, http_get}; use crypto_market_type::MarketType; use crypto_msg_type::MessageType; -use crypto_message::{Order, OrderBookMsg, TradeMsg, TradeSide}; +use crypto_message::{CandlestickMsg, Order, OrderBookMsg, TradeMsg, TradeSide}; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; @@ -524,6 +524,80 @@ pub(crate) fn parse_l2(msg: &str) -> Result, SimpleError> { Ok(orderbooks) } +// https://zbgapi.github.io/docs/spot/v1/en/#market-candlestick +// [K, symbol-id, symbol, timestamp, open, high, low, close, volume, change, +// dollar-rate, period, conversion, amount. +pub(crate) fn parse_candlestick(msg: &str) -> Result, SimpleError> { + let arr = if msg.starts_with(r#"[["K","#) { + serde_json::from_str::>>(msg).map_err(|_e| { + SimpleError::new(format!("Failed to deserialize {msg} to Vec>")) + })? + } else if msg.starts_with(r#"["K","#) { + let tmp = serde_json::from_str::>(msg).map_err(|_e| { + SimpleError::new(format!("Failed to deserialize {msg} to Vec")) + })?; + vec![tmp] + } else { + return Err(SimpleError::new(format!("Invalid trade msg {msg}"))); + }; + + let mut candlestick_msgs: Vec = arr + .into_iter() + .map(|candlestick_msg| { + assert_eq!(candlestick_msg[0], "K"); + let timestamp = candlestick_msg[3].parse::().unwrap() * 1000; + let symbol = candlestick_msg[2].as_str(); + let pair = crypto_pair::normalize_pair(symbol, EXCHANGE_NAME).unwrap(); + let period = candlestick_msg[11].as_str(); + let m_seconds = match period.to_string().pop().unwrap() { + 'M' => { + period.to_string().strip_suffix('M').unwrap().parse::().unwrap() + * 1000 + * 60 + } + 'H' => { + period.to_string().strip_suffix('M').unwrap().parse::().unwrap() + * 1000 + * 60 + * 60 + } + 'D' => { + period.to_string().strip_suffix('D').unwrap().parse::().unwrap() + * 1000 + * 24 + * 60 + * 60 + } + _ => 0, + }; + let begin_time = timestamp - m_seconds; + + CandlestickMsg { + exchange: super::EXCHANGE_NAME.to_string(), + market_type: MarketType::Spot, + symbol: symbol.to_string(), + pair, + msg_type: MessageType::Candlestick, + timestamp, + begin_time, + open: candlestick_msg[4].parse::().unwrap(), + high: candlestick_msg[5].parse::().unwrap(), + low: candlestick_msg[6].parse::().unwrap(), + close: candlestick_msg[7].parse::().unwrap(), + volume: candlestick_msg[8].parse::().unwrap(), + period: period.to_string(), + quote_volume: Some(candlestick_msg[13].parse::().unwrap()), + json: msg.to_string(), + } + }) + .collect(); + + if candlestick_msgs.len() == 1 { + candlestick_msgs[0].json = msg.to_string(); + } + Ok(candlestick_msgs) +} + #[cfg(test)] mod tests { use super::fetch_symbol_info; diff --git a/crypto-msg-parser/src/exchanges/zbg/zbg_swap.rs b/crypto-msg-parser/src/exchanges/zbg/zbg_swap.rs index 8e3f1e6..d53f706 100644 --- a/crypto-msg-parser/src/exchanges/zbg/zbg_swap.rs +++ b/crypto-msg-parser/src/exchanges/zbg/zbg_swap.rs @@ -2,7 +2,7 @@ use crypto_market_type::MarketType; use crypto_msg_type::MessageType; use super::super::utils::{convert_timestamp, http_get}; -use crypto_message::{Order, OrderBookMsg, TradeMsg, TradeSide}; +use crypto_message::{CandlestickMsg, Order, OrderBookMsg, TradeMsg, TradeSide}; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; @@ -135,6 +135,17 @@ struct RawOrderbookMsg { extra: HashMap, } +//https://zbgapi.github.io/docs/future/v1/en/#public-contract-kline +#[derive(Serialize, Deserialize)] +#[allow(non_snake_case)] +struct RawCandlestickMsg { + contractId: i64, + range: String, + lines: Vec<[Value; 6]>, + #[serde(flatten)] + extra: HashMap, +} + pub(super) fn extract_symbol(_market_type: MarketType, msg: &str) -> Result { if msg.contains("datas") && msg.contains("resMsg") { // RESTful @@ -304,6 +315,59 @@ pub(crate) fn parse_l2( Ok(vec![orderbook]) } +// https://zbgapi.github.io/docs/future/v1/en/#public-contract-kline +pub(crate) fn parse_candlestick( + market_type: MarketType, + msg: &str, +) -> Result, SimpleError> { + let ws_msg = serde_json::from_str::>(msg) + .map_err(|_e| SimpleError::new(format!("Failed to deserialize {msg} to Vec")))?; + assert_eq!(ws_msg[0].as_str().unwrap(), "future_kline"); + let raw_candlestick_msg: RawCandlestickMsg = serde_json::from_value(ws_msg[1].clone()) + .map_err(|_e| { + SimpleError::new(format!("Failed to deserialize {} to RawCandlestickMsg", ws_msg[1])) + })?; + let contract_info = SWAP_CONTRACT_MAP.get(&raw_candlestick_msg.contractId).unwrap(); + let symbol = contract_info.symbol.as_str(); + let pair = crypto_pair::normalize_pair(symbol, EXCHANGE_NAME).unwrap(); + let range = raw_candlestick_msg.range; + + let candlestick_msgs: Vec = raw_candlestick_msg + .lines + .into_iter() + .map(|line| { + //let timestamp = line[0].clone().as_i64().unwrap(); + let timestamp = line[0].as_i64().unwrap(); + let open = line[1].as_str().unwrap().parse::().unwrap(); + let high = line[2].as_str().unwrap().parse::().unwrap(); + let low = line[3].as_str().unwrap().parse::().unwrap(); + let close = line[4].as_str().unwrap().parse::().unwrap(); + let size = line[5].as_str().unwrap().parse::().unwrap(); + let (volume, quote_volume) = + calc_quantity_and_volume(market_type, contract_info.contract_id, open, size); + + CandlestickMsg { + exchange: EXCHANGE_NAME.to_string(), + market_type, + msg_type: MessageType::Candlestick, + symbol: symbol.to_string(), + pair: pair.clone(), + timestamp, + period: range.clone(), + begin_time: timestamp - range.parse::().unwrap(), + open, + high, + low, + close, + volume, + quote_volume: Some(quote_volume), + json: msg.to_string(), + } + }) + .collect(); + Ok(candlestick_msgs) +} + #[cfg(test)] mod tests { use super::fetch_swap_contracts; diff --git a/crypto-msg-parser/src/lib.rs b/crypto-msg-parser/src/lib.rs index f73e204..2800eaa 100644 --- a/crypto-msg-parser/src/lib.rs +++ b/crypto-msg-parser/src/lib.rs @@ -276,6 +276,7 @@ pub fn parse_candlestick( msg, received_at.expect("OKX candlestick messages don't have timestamp"), ), + "zbg" => exchanges::zbg::parse_candlestick(market_type, msg), _ => Err(SimpleError::new(format!("Unknown exchange {exchange}"))), } } diff --git a/crypto-msg-parser/tests/zbg.rs b/crypto-msg-parser/tests/zbg.rs index 8efbc02..89a4ce8 100644 --- a/crypto-msg-parser/tests/zbg.rs +++ b/crypto-msg-parser/tests/zbg.rs @@ -308,7 +308,7 @@ mod l2_event { mod candlestick { use super::EXCHANGE_NAME; use crypto_market_type::MarketType; - use crypto_msg_parser::{extract_symbol, extract_timestamp}; + use crypto_msg_parser::{extract_symbol, extract_timestamp, parse_candlestick}; #[test] fn spot_snapshot() { @@ -320,6 +320,22 @@ mod candlestick { 1654155660000, extract_timestamp(EXCHANGE_NAME, MarketType::Spot, raw_msg).unwrap().unwrap() ); + let arr = parse_candlestick(EXCHANGE_NAME, MarketType::Spot, raw_msg, None).unwrap(); + assert_eq!(2, arr.len()); + let candlestick_msg = &arr[0]; + + assert_eq!("btc_usdt", candlestick_msg.symbol); + assert_eq!("BTC/USDT", candlestick_msg.pair); + assert_eq!(1654155660000, candlestick_msg.timestamp); + assert_eq!("1M", candlestick_msg.period); + assert_eq!(1654155600000, candlestick_msg.begin_time); + + assert_eq!(30013.78, candlestick_msg.open); + assert_eq!(30017.31, candlestick_msg.high); + assert_eq!(30003.01, candlestick_msg.low); + assert_eq!(30014.64, candlestick_msg.close); + assert_eq!(0.0227, candlestick_msg.volume); + assert_eq!(Some(0.0), candlestick_msg.quote_volume); } #[test] @@ -332,6 +348,22 @@ mod candlestick { 1654125240000, extract_timestamp(EXCHANGE_NAME, MarketType::Spot, raw_msg).unwrap().unwrap() ); + let arr = parse_candlestick(EXCHANGE_NAME, MarketType::Spot, raw_msg, None).unwrap(); + assert_eq!(1, arr.len()); + let candlestick_msg = &arr[0]; + + assert_eq!("btc_usdt", candlestick_msg.symbol); + assert_eq!("BTC/USDT", candlestick_msg.pair); + assert_eq!(1654125240000, candlestick_msg.timestamp); + assert_eq!("1M", candlestick_msg.period); + assert_eq!(1654125180000, candlestick_msg.begin_time); + + assert_eq!(29947.03, candlestick_msg.open); + assert_eq!(29976.14, candlestick_msg.high); + assert_eq!(29937.94, candlestick_msg.low); + assert_eq!(29939.95, candlestick_msg.close); + assert_eq!(0.6417, candlestick_msg.volume); + assert_eq!(Some(0.0), candlestick_msg.quote_volume); } #[test] @@ -340,13 +372,30 @@ mod candlestick { assert_eq!( "BTC_USD-R", - extract_symbol(EXCHANGE_NAME, MarketType::LinearSwap, raw_msg).unwrap() + extract_symbol(EXCHANGE_NAME, MarketType::InverseSwap, raw_msg).unwrap() ); assert_eq!( 1652804340000, - extract_timestamp(EXCHANGE_NAME, MarketType::LinearSwap, raw_msg).unwrap().unwrap() + extract_timestamp(EXCHANGE_NAME, MarketType::InverseSwap, raw_msg).unwrap().unwrap() ); + + let arr = parse_candlestick(EXCHANGE_NAME, MarketType::InverseSwap, raw_msg, None).unwrap(); + assert_eq!(2, arr.len()); + let candlestick_msg = &arr[1]; + + assert_eq!("BTC_USD-R", candlestick_msg.symbol); + assert_eq!("BTC/USD", candlestick_msg.pair); + assert_eq!(1652804340000, candlestick_msg.timestamp); + assert_eq!("60000", candlestick_msg.period); + assert_eq!(1652804280000, candlestick_msg.begin_time); + //[1652804340000,"30005","30005.5","29975.5","29976","6186"] + assert_eq!(30005.0, candlestick_msg.open); + assert_eq!(30005.5, candlestick_msg.high); + assert_eq!(29975.5, candlestick_msg.low); + assert_eq!(29976.0, candlestick_msg.close); + assert_eq!(6186.0 / 30005.0, candlestick_msg.volume); + assert_eq!(Some(6186.0), candlestick_msg.quote_volume); } #[test] @@ -362,6 +411,22 @@ mod candlestick { 1648876680000, extract_timestamp(EXCHANGE_NAME, MarketType::LinearSwap, raw_msg).unwrap().unwrap() ); + let arr = parse_candlestick(EXCHANGE_NAME, MarketType::LinearSwap, raw_msg, None).unwrap(); + assert_eq!(2, arr.len()); + let candlestick_msg = &arr[1]; + + assert_eq!("BTC_USDT", candlestick_msg.symbol); + assert_eq!("BTC/USDT", candlestick_msg.pair); + assert_eq!(1648876680000, candlestick_msg.timestamp); + assert_eq!("180000", candlestick_msg.period); + assert_eq!(1648876500000, candlestick_msg.begin_time); + + assert_eq!(46550.0, candlestick_msg.open); + assert_eq!(46615.0, candlestick_msg.high); + assert_eq!(46542.0, candlestick_msg.low); + assert_eq!(46613.5, candlestick_msg.close); + assert_eq!(1640.0 * 0.01, candlestick_msg.volume); + assert_eq!(Some(1640.0 * 0.01 * 46550.0), candlestick_msg.quote_volume); } }