Skip to content

Commit

Permalink
implement zbg parse_candlestick (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
AdoreWisdom authored Dec 24, 2023
1 parent c40a908 commit 8d718c4
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 6 deletions.
13 changes: 12 additions & 1 deletion crypto-msg-parser/src/exchanges/zbg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod zbg_swap;

use crypto_market_type::MarketType;

use crate::{OrderBookMsg, TradeMsg};
use crate::{CandlestickMsg, OrderBookMsg, TradeMsg};

use simple_error::SimpleError;

Expand Down Expand Up @@ -49,3 +49,14 @@ pub(crate) fn parse_l2(
zbg_swap::parse_l2(market_type, msg)
}
}

pub(crate) fn parse_candlestick(
market_type: MarketType,
msg: &str,
) -> Result<Vec<CandlestickMsg>, SimpleError> {
if market_type == MarketType::Spot {
zbg_spot::parse_candlestick(msg)
} else {
zbg_swap::parse_candlestick(market_type, msg)
}
}
76 changes: 75 additions & 1 deletion crypto-msg-parser/src/exchanges/zbg/zbg_spot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use super::super::utils::{convert_timestamp, http_get};
use crypto_market_type::MarketType;
use crypto_msg_type::MessageType;

use crypto_message::{Order, OrderBookMsg, TradeMsg, TradeSide};
use crypto_message::{CandlestickMsg, Order, OrderBookMsg, TradeMsg, TradeSide};

use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -524,6 +524,80 @@ pub(crate) fn parse_l2(msg: &str) -> Result<Vec<OrderBookMsg>, SimpleError> {
Ok(orderbooks)
}

// https://zbgapi.github.io/docs/spot/v1/en/#market-candlestick
// [K, symbol-id, symbol, timestamp, open, high, low, close, volume, change,
// dollar-rate, period, conversion, amount.
pub(crate) fn parse_candlestick(msg: &str) -> Result<Vec<CandlestickMsg>, SimpleError> {
let arr = if msg.starts_with(r#"[["K","#) {
serde_json::from_str::<Vec<Vec<String>>>(msg).map_err(|_e| {
SimpleError::new(format!("Failed to deserialize {msg} to Vec<Vec<String>>"))
})?
} else if msg.starts_with(r#"["K","#) {
let tmp = serde_json::from_str::<Vec<String>>(msg).map_err(|_e| {
SimpleError::new(format!("Failed to deserialize {msg} to Vec<String>"))
})?;
vec![tmp]
} else {
return Err(SimpleError::new(format!("Invalid trade msg {msg}")));
};

let mut candlestick_msgs: Vec<CandlestickMsg> = arr
.into_iter()
.map(|candlestick_msg| {
assert_eq!(candlestick_msg[0], "K");
let timestamp = candlestick_msg[3].parse::<i64>().unwrap() * 1000;
let symbol = candlestick_msg[2].as_str();
let pair = crypto_pair::normalize_pair(symbol, EXCHANGE_NAME).unwrap();
let period = candlestick_msg[11].as_str();
let m_seconds = match period.to_string().pop().unwrap() {
'M' => {
period.to_string().strip_suffix('M').unwrap().parse::<i64>().unwrap()
* 1000
* 60
}
'H' => {
period.to_string().strip_suffix('M').unwrap().parse::<i64>().unwrap()
* 1000
* 60
* 60
}
'D' => {
period.to_string().strip_suffix('D').unwrap().parse::<i64>().unwrap()
* 1000
* 24
* 60
* 60
}
_ => 0,
};
let begin_time = timestamp - m_seconds;

CandlestickMsg {
exchange: super::EXCHANGE_NAME.to_string(),
market_type: MarketType::Spot,
symbol: symbol.to_string(),
pair,
msg_type: MessageType::Candlestick,
timestamp,
begin_time,
open: candlestick_msg[4].parse::<f64>().unwrap(),
high: candlestick_msg[5].parse::<f64>().unwrap(),
low: candlestick_msg[6].parse::<f64>().unwrap(),
close: candlestick_msg[7].parse::<f64>().unwrap(),
volume: candlestick_msg[8].parse::<f64>().unwrap(),
period: period.to_string(),
quote_volume: Some(candlestick_msg[13].parse::<f64>().unwrap()),
json: msg.to_string(),
}
})
.collect();

if candlestick_msgs.len() == 1 {
candlestick_msgs[0].json = msg.to_string();
}
Ok(candlestick_msgs)
}

#[cfg(test)]
mod tests {
use super::fetch_symbol_info;
Expand Down
66 changes: 65 additions & 1 deletion crypto-msg-parser/src/exchanges/zbg/zbg_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crypto_market_type::MarketType;
use crypto_msg_type::MessageType;

use super::super::utils::{convert_timestamp, http_get};
use crypto_message::{Order, OrderBookMsg, TradeMsg, TradeSide};
use crypto_message::{CandlestickMsg, Order, OrderBookMsg, TradeMsg, TradeSide};

use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -135,6 +135,17 @@ struct RawOrderbookMsg {
extra: HashMap<String, Value>,
}

//https://zbgapi.github.io/docs/future/v1/en/#public-contract-kline
#[derive(Serialize, Deserialize)]
#[allow(non_snake_case)]
struct RawCandlestickMsg {
contractId: i64,
range: String,
lines: Vec<[Value; 6]>,
#[serde(flatten)]
extra: HashMap<String, Value>,
}

pub(super) fn extract_symbol(_market_type: MarketType, msg: &str) -> Result<String, SimpleError> {
if msg.contains("datas") && msg.contains("resMsg") {
// RESTful
Expand Down Expand Up @@ -304,6 +315,59 @@ pub(crate) fn parse_l2(
Ok(vec![orderbook])
}

// https://zbgapi.github.io/docs/future/v1/en/#public-contract-kline
pub(crate) fn parse_candlestick(
market_type: MarketType,
msg: &str,
) -> Result<Vec<CandlestickMsg>, SimpleError> {
let ws_msg = serde_json::from_str::<Vec<Value>>(msg)
.map_err(|_e| SimpleError::new(format!("Failed to deserialize {msg} to Vec<Value>")))?;
assert_eq!(ws_msg[0].as_str().unwrap(), "future_kline");
let raw_candlestick_msg: RawCandlestickMsg = serde_json::from_value(ws_msg[1].clone())
.map_err(|_e| {
SimpleError::new(format!("Failed to deserialize {} to RawCandlestickMsg", ws_msg[1]))
})?;
let contract_info = SWAP_CONTRACT_MAP.get(&raw_candlestick_msg.contractId).unwrap();
let symbol = contract_info.symbol.as_str();
let pair = crypto_pair::normalize_pair(symbol, EXCHANGE_NAME).unwrap();
let range = raw_candlestick_msg.range;

let candlestick_msgs: Vec<CandlestickMsg> = raw_candlestick_msg
.lines
.into_iter()
.map(|line| {
//let timestamp = line[0].clone().as_i64().unwrap();
let timestamp = line[0].as_i64().unwrap();
let open = line[1].as_str().unwrap().parse::<f64>().unwrap();
let high = line[2].as_str().unwrap().parse::<f64>().unwrap();
let low = line[3].as_str().unwrap().parse::<f64>().unwrap();
let close = line[4].as_str().unwrap().parse::<f64>().unwrap();
let size = line[5].as_str().unwrap().parse::<f64>().unwrap();
let (volume, quote_volume) =
calc_quantity_and_volume(market_type, contract_info.contract_id, open, size);

CandlestickMsg {
exchange: EXCHANGE_NAME.to_string(),
market_type,
msg_type: MessageType::Candlestick,
symbol: symbol.to_string(),
pair: pair.clone(),
timestamp,
period: range.clone(),
begin_time: timestamp - range.parse::<i64>().unwrap(),
open,
high,
low,
close,
volume,
quote_volume: Some(quote_volume),
json: msg.to_string(),
}
})
.collect();
Ok(candlestick_msgs)
}

#[cfg(test)]
mod tests {
use super::fetch_swap_contracts;
Expand Down
1 change: 1 addition & 0 deletions crypto-msg-parser/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ pub fn parse_candlestick(
msg,
received_at.expect("OKX candlestick messages don't have timestamp"),
),
"zbg" => exchanges::zbg::parse_candlestick(market_type, msg),
_ => Err(SimpleError::new(format!("Unknown exchange {exchange}"))),
}
}
Expand Down
71 changes: 68 additions & 3 deletions crypto-msg-parser/tests/zbg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ mod l2_event {
mod candlestick {
use super::EXCHANGE_NAME;
use crypto_market_type::MarketType;
use crypto_msg_parser::{extract_symbol, extract_timestamp};
use crypto_msg_parser::{extract_symbol, extract_timestamp, parse_candlestick};

#[test]
fn spot_snapshot() {
Expand All @@ -320,6 +320,22 @@ mod candlestick {
1654155660000,
extract_timestamp(EXCHANGE_NAME, MarketType::Spot, raw_msg).unwrap().unwrap()
);
let arr = parse_candlestick(EXCHANGE_NAME, MarketType::Spot, raw_msg, None).unwrap();
assert_eq!(2, arr.len());
let candlestick_msg = &arr[0];

assert_eq!("btc_usdt", candlestick_msg.symbol);
assert_eq!("BTC/USDT", candlestick_msg.pair);
assert_eq!(1654155660000, candlestick_msg.timestamp);
assert_eq!("1M", candlestick_msg.period);
assert_eq!(1654155600000, candlestick_msg.begin_time);

assert_eq!(30013.78, candlestick_msg.open);
assert_eq!(30017.31, candlestick_msg.high);
assert_eq!(30003.01, candlestick_msg.low);
assert_eq!(30014.64, candlestick_msg.close);
assert_eq!(0.0227, candlestick_msg.volume);
assert_eq!(Some(0.0), candlestick_msg.quote_volume);
}

#[test]
Expand All @@ -332,6 +348,22 @@ mod candlestick {
1654125240000,
extract_timestamp(EXCHANGE_NAME, MarketType::Spot, raw_msg).unwrap().unwrap()
);
let arr = parse_candlestick(EXCHANGE_NAME, MarketType::Spot, raw_msg, None).unwrap();
assert_eq!(1, arr.len());
let candlestick_msg = &arr[0];

assert_eq!("btc_usdt", candlestick_msg.symbol);
assert_eq!("BTC/USDT", candlestick_msg.pair);
assert_eq!(1654125240000, candlestick_msg.timestamp);
assert_eq!("1M", candlestick_msg.period);
assert_eq!(1654125180000, candlestick_msg.begin_time);

assert_eq!(29947.03, candlestick_msg.open);
assert_eq!(29976.14, candlestick_msg.high);
assert_eq!(29937.94, candlestick_msg.low);
assert_eq!(29939.95, candlestick_msg.close);
assert_eq!(0.6417, candlestick_msg.volume);
assert_eq!(Some(0.0), candlestick_msg.quote_volume);
}

#[test]
Expand All @@ -340,13 +372,30 @@ mod candlestick {

assert_eq!(
"BTC_USD-R",
extract_symbol(EXCHANGE_NAME, MarketType::LinearSwap, raw_msg).unwrap()
extract_symbol(EXCHANGE_NAME, MarketType::InverseSwap, raw_msg).unwrap()
);

assert_eq!(
1652804340000,
extract_timestamp(EXCHANGE_NAME, MarketType::LinearSwap, raw_msg).unwrap().unwrap()
extract_timestamp(EXCHANGE_NAME, MarketType::InverseSwap, raw_msg).unwrap().unwrap()
);

let arr = parse_candlestick(EXCHANGE_NAME, MarketType::InverseSwap, raw_msg, None).unwrap();
assert_eq!(2, arr.len());
let candlestick_msg = &arr[1];

assert_eq!("BTC_USD-R", candlestick_msg.symbol);
assert_eq!("BTC/USD", candlestick_msg.pair);
assert_eq!(1652804340000, candlestick_msg.timestamp);
assert_eq!("60000", candlestick_msg.period);
assert_eq!(1652804280000, candlestick_msg.begin_time);
//[1652804340000,"30005","30005.5","29975.5","29976","6186"]
assert_eq!(30005.0, candlestick_msg.open);
assert_eq!(30005.5, candlestick_msg.high);
assert_eq!(29975.5, candlestick_msg.low);
assert_eq!(29976.0, candlestick_msg.close);
assert_eq!(6186.0 / 30005.0, candlestick_msg.volume);
assert_eq!(Some(6186.0), candlestick_msg.quote_volume);
}

#[test]
Expand All @@ -362,6 +411,22 @@ mod candlestick {
1648876680000,
extract_timestamp(EXCHANGE_NAME, MarketType::LinearSwap, raw_msg).unwrap().unwrap()
);
let arr = parse_candlestick(EXCHANGE_NAME, MarketType::LinearSwap, raw_msg, None).unwrap();
assert_eq!(2, arr.len());
let candlestick_msg = &arr[1];

assert_eq!("BTC_USDT", candlestick_msg.symbol);
assert_eq!("BTC/USDT", candlestick_msg.pair);
assert_eq!(1648876680000, candlestick_msg.timestamp);
assert_eq!("180000", candlestick_msg.period);
assert_eq!(1648876500000, candlestick_msg.begin_time);

assert_eq!(46550.0, candlestick_msg.open);
assert_eq!(46615.0, candlestick_msg.high);
assert_eq!(46542.0, candlestick_msg.low);
assert_eq!(46613.5, candlestick_msg.close);
assert_eq!(1640.0 * 0.01, candlestick_msg.volume);
assert_eq!(Some(1640.0 * 0.01 * 46550.0), candlestick_msg.quote_volume);
}
}

Expand Down

0 comments on commit 8d718c4

Please sign in to comment.