Skip to content

Commit

Permalink
implement parse_candlestick for zb and fix bug for zbg_spot.rs (#9)
Browse files Browse the repository at this point in the history
* implement parse_candlestick for zb and fix bug for zbg_spot.rs

* Polished code style

* Sorted by alphabetical order

---------

Co-authored-by: soulmachine <[email protected]>
  • Loading branch information
AdoreWisdom and soulmachine authored Dec 31, 2023
1 parent 8d718c4 commit 3329007
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 11 deletions.
12 changes: 12 additions & 0 deletions crypto-msg-parser/src/exchanges/zb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod zb_swap;
use std::collections::HashMap;

use crypto_market_type::MarketType;
use crypto_message::CandlestickMsg;
use serde_json::Value;

use crate::{OrderBookMsg, TradeMsg};
Expand Down Expand Up @@ -86,3 +87,14 @@ pub(crate) fn parse_l2_topk(
zb_swap::parse_l2(market_type, msg)
}
}

pub(crate) fn parse_candlestick(
market_type: MarketType,
msg: &str,
) -> Result<Vec<CandlestickMsg>, SimpleError> {
if market_type == MarketType::Spot {
zb_spot::parse_candlestick(msg)
} else {
zb_swap::parse_candlestick(market_type, msg)
}
}
88 changes: 87 additions & 1 deletion crypto-msg-parser/src/exchanges/zb/zb_spot.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use crypto_market_type::MarketType;
use crypto_message::{Order, OrderBookMsg, TradeMsg, TradeSide};
use crypto_message::{CandlestickMsg, Order, OrderBookMsg, TradeMsg, TradeSide};
use crypto_msg_type::MessageType;
use serde_json::Value;
use simple_error::SimpleError;
use std::collections::HashMap;

use crate::exchanges::utils::calc_quantity_and_volume;

use super::EXCHANGE_NAME;
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -88,6 +90,20 @@ struct L2TopKMsg {
extra: HashMap<String, Value>,
}

#[derive(Serialize, Deserialize)]
#[allow(non_snake_case)]
struct RawCandlestickMsg {
datas: Data,
channel: String,
isSuc: Value,
}

#[derive(Serialize, Deserialize)]
#[allow(non_snake_case)]
struct Data {
data: Vec<[Value; 6]>,
}

pub(super) fn parse_trade(msg: &str) -> Result<Vec<TradeMsg>, SimpleError> {
let ws_msg = serde_json::from_str::<WebsocketMsg<RawTradeMsg>>(msg).map_err(|_e| {
SimpleError::new(format!("Failed to deserialize {msg} to WebsocketMsg<RawTradeMsg>"))
Expand Down Expand Up @@ -165,3 +181,73 @@ pub(super) fn parse_l2_topk(msg: &str) -> Result<Vec<OrderBookMsg>, SimpleError>
};
Ok(vec![orderbook])
}

// * https://www.zb.com/en/api #Market GetKline
pub(super) fn parse_candlestick(msg: &str) -> Result<Vec<CandlestickMsg>, SimpleError> {
let ws_msg = serde_json::from_str::<RawCandlestickMsg>(msg).map_err(SimpleError::from)?;

let (symbol, period) = {
let mut arr = ws_msg.channel.split('_');
(arr.next().unwrap(), arr.last().unwrap())
};
let pair = crypto_pair::normalize_pair(symbol, EXCHANGE_NAME).unwrap();

let mut m_seconds = 0;
if period.ends_with("min") {
m_seconds = period.strip_suffix("min").unwrap().parse::<i64>().unwrap() * 60 * 1000;
} else if period.ends_with("hour") {
m_seconds = period.strip_suffix("hour").unwrap().parse::<i64>().unwrap() * 60 * 60 * 1000;
} else if period.ends_with("day") {
m_seconds =
period.strip_suffix("day").unwrap().parse::<i64>().unwrap() * 60 * 60 * 24 * 1000;
} else if period.ends_with("week") {
m_seconds =
period.strip_suffix("week").unwrap().parse::<i64>().unwrap() * 60 * 60 * 24 * 7 * 1000;
}

let arr = ws_msg.datas.data;
let mut candlestick_msgs: Vec<CandlestickMsg> = arr
.into_iter()
.map(|candlestick_msg| {
let timestamp = candlestick_msg[0].as_i64().unwrap();
let begin_time = timestamp - m_seconds;

let open = candlestick_msg[1].as_f64().unwrap();
let high = candlestick_msg[2].as_f64().unwrap();
let low = candlestick_msg[3].as_f64().unwrap();
let close = candlestick_msg[4].as_f64().unwrap();
let price = (open + high + low + close) / 4.0;
let quantity = candlestick_msg[5].as_f64().unwrap();
let (volume, quote_volume, _none) = calc_quantity_and_volume(
EXCHANGE_NAME,
MarketType::Spot,
pair.as_str(),
price,
quantity,
);

CandlestickMsg {
exchange: super::EXCHANGE_NAME.to_string(),
market_type: MarketType::Spot,
symbol: symbol.to_string(),
pair: pair.clone(),
msg_type: MessageType::Candlestick,
timestamp,
begin_time,
open,
high,
low,
close,
volume,
period: period.to_string(),
quote_volume: Some(crate::round(quote_volume)),
json: msg.to_string(),
}
})
.collect();

if candlestick_msgs.len() == 1 {
candlestick_msgs[0].json = msg.to_string();
}
Ok(candlestick_msgs)
}
83 changes: 82 additions & 1 deletion crypto-msg-parser/src/exchanges/zb/zb_swap.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crypto_market_type::MarketType;
use crypto_message::{Order, OrderBookMsg, TradeMsg, TradeSide};
use crypto_message::{CandlestickMsg, Order, OrderBookMsg, TradeMsg, TradeSide};
use crypto_msg_type::MessageType;

use super::EXCHANGE_NAME;
Expand Down Expand Up @@ -138,6 +138,15 @@ struct Level2Msg {
extra: HashMap<String, Value>,
}

#[derive(Serialize, Deserialize)]
#[allow(non_snake_case)]
struct RawCandlestickMsg {
channel: String,
data: Vec<[Value; 6]>,
#[serde(rename = "type")]
type_: Option<String>, // Whole
}

/// Docs:
/// * https://github.com/ZBFuture/docs/blob/main/API%20V2%20_en.md#84-increment-depth
/// * https://github.com/ZBFuture/docs/blob/main/API%20V2%20_en.md#84-increment-depth
Expand Down Expand Up @@ -192,3 +201,75 @@ pub(super) fn parse_l2(
};
Ok(vec![orderbook])
}

/// Docs:
/// * https://github.com/ZBFuture/docs/blob/main/API%20V2%20_en.md#85-candlestick
pub(super) fn parse_candlestick(
market_type: MarketType,
msg: &str,
) -> Result<Vec<CandlestickMsg>, SimpleError> {
let ws_msg = serde_json::from_str::<RawCandlestickMsg>(msg).map_err(|_e| {
SimpleError::new(format!("Failed to deserialize {msg} to WebsocketMsg<RawCandlestickMsg>"))
})?;

let (symbol, period) = {
let mut arr = ws_msg.channel.split('.');
(arr.next().unwrap(), arr.last().unwrap().split('_').last().unwrap())
};
let pair = crypto_pair::normalize_pair(symbol, EXCHANGE_NAME).unwrap();

let mut m_seconds = 0;
if period.ends_with('M') {
m_seconds = period.strip_suffix('M').unwrap().parse::<i64>().unwrap() * 60 * 1000;
} else if period.ends_with('H') {
m_seconds = period.strip_suffix('H').unwrap().parse::<i64>().unwrap() * 60 * 60 * 1000;
} else if period.ends_with('D') {
m_seconds = period.strip_suffix('D').unwrap().parse::<i64>().unwrap() * 60 * 60 * 24 * 1000;
}

let arr = ws_msg.data;
let mut candlestick_msgs: Vec<CandlestickMsg> = arr
.into_iter()
.map(|candlestick_msg| {
let timestamp = candlestick_msg[5].as_i64().unwrap() * 1000;
let begin_time = timestamp - m_seconds;
let open = candlestick_msg[0].as_f64().unwrap();
let high = candlestick_msg[1].as_f64().unwrap();
let low = candlestick_msg[2].as_f64().unwrap();
let close = candlestick_msg[3].as_f64().unwrap();
let price = (open + high + low + close) / 4.0;
let quantity = candlestick_msg[4].as_f64().unwrap();

let (volume, quote_volume, _none) = calc_quantity_and_volume(
EXCHANGE_NAME,
market_type,
pair.as_str(),
price,
quantity,
);

CandlestickMsg {
exchange: super::EXCHANGE_NAME.to_string(),
market_type,
symbol: symbol.to_string(),
pair: pair.clone(),
msg_type: MessageType::Candlestick,
timestamp,
begin_time,
open,
high,
low,
close,
volume,
period: period.to_string(),
quote_volume: Some(crate::round(quote_volume)),
json: msg.to_string(),
}
})
.collect();

if candlestick_msgs.len() == 1 {
candlestick_msgs[0].json = msg.to_string();
}
Ok(candlestick_msgs)
}
6 changes: 2 additions & 4 deletions crypto-msg-parser/src/exchanges/zbg/zbg_spot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,9 +529,7 @@ pub(crate) fn parse_l2(msg: &str) -> Result<Vec<OrderBookMsg>, SimpleError> {
// dollar-rate, period, conversion, amount.
pub(crate) fn parse_candlestick(msg: &str) -> Result<Vec<CandlestickMsg>, SimpleError> {
let arr = if msg.starts_with(r#"[["K","#) {
serde_json::from_str::<Vec<Vec<String>>>(msg).map_err(|_e| {
SimpleError::new(format!("Failed to deserialize {msg} to Vec<Vec<String>>"))
})?
serde_json::from_str::<Vec<Vec<String>>>(msg).map_err(SimpleError::from)?
} else if msg.starts_with(r#"["K","#) {
let tmp = serde_json::from_str::<Vec<String>>(msg).map_err(|_e| {
SimpleError::new(format!("Failed to deserialize {msg} to Vec<String>"))
Expand All @@ -556,7 +554,7 @@ pub(crate) fn parse_candlestick(msg: &str) -> Result<Vec<CandlestickMsg>, Simple
* 60
}
'H' => {
period.to_string().strip_suffix('M').unwrap().parse::<i64>().unwrap()
period.to_string().strip_suffix('H').unwrap().parse::<i64>().unwrap()
* 1000
* 60
* 60
Expand Down
3 changes: 1 addition & 2 deletions crypto-msg-parser/src/exchanges/zbg/zbg_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,7 @@ pub(crate) fn parse_candlestick(
market_type: MarketType,
msg: &str,
) -> Result<Vec<CandlestickMsg>, SimpleError> {
let ws_msg = serde_json::from_str::<Vec<Value>>(msg)
.map_err(|_e| SimpleError::new(format!("Failed to deserialize {msg} to Vec<Value>")))?;
let ws_msg = serde_json::from_str::<Vec<Value>>(msg).map_err(SimpleError::from)?;
assert_eq!(ws_msg[0].as_str().unwrap(), "future_kline");
let raw_candlestick_msg: RawCandlestickMsg = serde_json::from_value(ws_msg[1].clone())
.map_err(|_e| {
Expand Down
5 changes: 3 additions & 2 deletions crypto-msg-parser/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,20 +262,21 @@ pub fn parse_candlestick(
match exchange {
"binance" => exchanges::binance::parse_candlestick(market_type, msg),
"bitfinex" => exchanges::bitfinex::parse_candlestick(market_type, msg),
"bitmex" => exchanges::bitmex::parse_candlestick(market_type, msg),
"bitget" => exchanges::bitget::parse_candlestick(market_type, msg),
"bitmex" => exchanges::bitmex::parse_candlestick(market_type, msg),
"bybit" => exchanges::bybit::parse_candlestick(market_type, msg),
"deribit" => exchanges::deribit::parse_candlestick(market_type, msg),
"gate" => exchanges::gate::parse_candlestick(market_type, msg),
"huobi" => exchanges::huobi::parse_candlestick(market_type, msg),
"kraken" => exchanges::kraken::parse_candlestick(market_type, msg),
"kucoin" => exchanges::kucoin::parse_candlestick(market_type, msg),
"deribit" => exchanges::deribit::parse_candlestick(market_type, msg),
"mexc" => exchanges::mexc::parse_candlestick(market_type, msg, received_at),
"okx" => exchanges::okx::parse_candlestick(
market_type,
msg,
received_at.expect("OKX candlestick messages don't have timestamp"),
),
"zb" => exchanges::zb::parse_candlestick(market_type, msg),
"zbg" => exchanges::zbg::parse_candlestick(market_type, msg),
_ => Err(SimpleError::new(format!("Unknown exchange {exchange}"))),
}
Expand Down
34 changes: 33 additions & 1 deletion crypto-msg-parser/tests/zb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ mod ticker {
mod candlestick {
use super::EXCHANGE_NAME;
use crypto_market_type::MarketType;
use crypto_msg_parser::{extract_symbol, extract_timestamp};
use crypto_msg_parser::{extract_symbol, extract_timestamp, parse_candlestick};

#[test]
fn spot() {
Expand All @@ -377,6 +377,22 @@ mod candlestick {
1653782160000,
extract_timestamp(EXCHANGE_NAME, MarketType::Spot, raw_msg).unwrap().unwrap()
);
let arr = parse_candlestick(EXCHANGE_NAME, MarketType::Spot, raw_msg, None).unwrap();
assert_eq!(2, arr.len());
let candlestick_msg = &arr[0];
//[1653782100000,29055.22,29055.22,29030.81,29032.9,19.3130]
assert_eq!("btcusdt", candlestick_msg.symbol);
assert_eq!("BTC/USDT", candlestick_msg.pair);
assert_eq!(1653782100000, candlestick_msg.timestamp);
assert_eq!("1min", candlestick_msg.period);
assert_eq!(1653782040000, candlestick_msg.begin_time);

assert_eq!(29055.22, candlestick_msg.open);
assert_eq!(29055.22, candlestick_msg.high);
assert_eq!(29030.81, candlestick_msg.low);
assert_eq!(29032.9, candlestick_msg.close);
assert_eq!(19.3130, candlestick_msg.volume);
assert_eq!(Some(560917.8397375), candlestick_msg.quote_volume);
}

#[test]
Expand All @@ -392,6 +408,22 @@ mod candlestick {
1653783840000,
extract_timestamp(EXCHANGE_NAME, MarketType::LinearSwap, raw_msg).unwrap().unwrap()
);
let arr = parse_candlestick(EXCHANGE_NAME, MarketType::LinearSwap, raw_msg, None).unwrap();
assert_eq!(1, arr.len());
let candlestick_msg = &arr[0];
//[28993.54,28996.39,28992.58,28994.78,0.921,1653783840]
assert_eq!("BTC_USDT", candlestick_msg.symbol);
assert_eq!("BTC/USDT", candlestick_msg.pair);
assert_eq!(1653783840000, candlestick_msg.timestamp);
assert_eq!("1M", candlestick_msg.period);
assert_eq!(1653783780000, candlestick_msg.begin_time);

assert_eq!(28993.54, candlestick_msg.open);
assert_eq!(28996.39, candlestick_msg.high);
assert_eq!(28992.58, candlestick_msg.low);
assert_eq!(28994.78, candlestick_msg.close);
assert_eq!(0.921, candlestick_msg.volume);
assert_eq!(Some(26703.7710225), candlestick_msg.quote_volume);
}
}

Expand Down

0 comments on commit 3329007

Please sign in to comment.