Skip to content

Commit

Permalink
implement binance parrse l2 snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
AdoreWisdom committed Jul 7, 2024
1 parent 4ee4356 commit d785f48
Show file tree
Hide file tree
Showing 5 changed files with 440 additions and 4 deletions.
120 changes: 120 additions & 0 deletions crypto-msg-parser/src/exchanges/binance/binance_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,32 @@ struct WebsocketMsg<T: Sized> {
data: T,
}

#[derive(Serialize, Deserialize)]
#[allow(non_snake_case)]
struct RawL2SnapshotInverseMsg {
lastUpdateId: u64, // Last update ID
E: i64,
T: i64,
symbol: String,
pair: String,
bids: Vec<RawOrder>,
asks: Vec<RawOrder>,
#[serde(flatten)]
extra: HashMap<String, Value>,
}

#[derive(Serialize, Deserialize)]
#[allow(non_snake_case)]
struct RawL2SnapshotLinearMsg {
lastUpdateId: u64, // Last update ID
E: i64,
T: i64,
bids: Vec<RawOrder>,
asks: Vec<RawOrder>,
#[serde(flatten)]
extra: HashMap<String, Value>,
}

pub(super) fn parse_trade(
market_type: MarketType,
msg: &str,
Expand Down Expand Up @@ -216,6 +242,100 @@ pub(super) fn parse_l2_topk(
}
}

///
pub(super) fn parse_l2_snapshot(
market_type: MarketType,
msg: &str,
symbol: Option<&str>,
) -> Result<Vec<OrderBookMsg>, SimpleError> {
match market_type {
MarketType::InverseFuture | MarketType::InverseSwap => {
parse_l2_snapshot_inverse(market_type, msg)
}

MarketType::LinearFuture | MarketType::LinearSwap => {
parse_l2_snapshot_linear(market_type, msg, symbol)
}
_ => Err(SimpleError::new("Not implemented")),
}
}

pub(super) fn parse_l2_snapshot_inverse(
market_type: MarketType,
msg: &str,
) -> Result<Vec<OrderBookMsg>, SimpleError> {
let ws_msg = serde_json::from_str::<RawL2SnapshotInverseMsg>(msg).map_err(SimpleError::from)?;
let pair = crypto_pair::normalize_pair(&ws_msg.symbol, EXCHANGE_NAME).ok_or_else(|| {
SimpleError::new(format!("Failed to normalize {} from {}", &ws_msg.symbol, msg))
})?;

let parse_order = |raw_order: &RawOrder| -> Order {
let price = raw_order[0].parse::<f64>().unwrap();
let (quantity_base, quantity_quote, quantity_contract) = calc_quantity_and_volume(
EXCHANGE_NAME,
market_type,
&pair,
price,
raw_order[1].parse::<f64>().unwrap(),
);
Order { price, quantity_base, quantity_quote, quantity_contract }
};

let orderbook = OrderBookMsg {
exchange: EXCHANGE_NAME.to_string(),
market_type,
symbol: ws_msg.symbol.clone(),
pair: pair.clone(),
msg_type: MessageType::L2Snapshot,
timestamp: ws_msg.E,
seq_id: Some(ws_msg.lastUpdateId),
prev_seq_id: None,
asks: ws_msg.asks.iter().map(parse_order).collect::<Vec<Order>>(),
bids: ws_msg.bids.iter().map(parse_order).collect::<Vec<Order>>(),
snapshot: true,
json: msg.to_string(),
};
Ok(vec![orderbook])
}

pub(super) fn parse_l2_snapshot_linear(
market_type: MarketType,
msg: &str,
symbol: Option<&str>,
) -> Result<Vec<OrderBookMsg>, SimpleError> {
let ws_msg = serde_json::from_str::<RawL2SnapshotLinearMsg>(msg).map_err(SimpleError::from)?;
let pair = crypto_pair::normalize_pair(symbol.unwrap(), EXCHANGE_NAME).ok_or_else(|| {
SimpleError::new(format!("Failed to normalize {} from {}", symbol.unwrap(), msg))
})?;

let parse_order = |raw_order: &RawOrder| -> Order {
let price = raw_order[0].parse::<f64>().unwrap();
let (quantity_base, quantity_quote, quantity_contract) = calc_quantity_and_volume(
EXCHANGE_NAME,
market_type,
&pair,
price,
raw_order[1].parse::<f64>().unwrap(),
);
Order { price, quantity_base, quantity_quote, quantity_contract }
};

let orderbook = OrderBookMsg {
exchange: EXCHANGE_NAME.to_string(),
market_type,
symbol: symbol.unwrap().to_string(),
pair: pair.clone(),
msg_type: MessageType::L2Snapshot,
timestamp: ws_msg.E,
seq_id: Some(ws_msg.lastUpdateId),
prev_seq_id: None,
asks: ws_msg.asks.iter().map(parse_order).collect::<Vec<Order>>(),
bids: ws_msg.bids.iter().map(parse_order).collect::<Vec<Order>>(),
snapshot: true,
json: msg.to_string(),
};
Ok(vec![orderbook])
}
/// docs:
/// * https://binance-docs.github.io/apidocs/spot/en/#all-book-tickers-stream
/// * https://binance-docs.github.io/apidocs/futures/en/#all-book-tickers-stream
Expand Down
54 changes: 54 additions & 0 deletions crypto-msg-parser/src/exchanges/binance/binance_spot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,17 @@ struct WebsocketMsg<T: Sized> {
data: T,
}

// See https://binance-docs.github.io/apidocs/spot/en/#order-book
#[derive(Serialize, Deserialize)]
#[allow(non_snake_case)]
struct RawL2SnapshotMsg {
lastUpdateId: u64, // Last update ID
bids: Vec<RawOrder>,
asks: Vec<RawOrder>,
#[serde(flatten)]
extra: HashMap<String, Value>,
}

// The @depth20 payload of spot has quite different format from contracts.
pub(super) fn parse_l2_topk(
msg: &str,
Expand Down Expand Up @@ -70,3 +81,46 @@ pub(super) fn parse_l2_topk(
};
Ok(vec![orderbook])
}

// binance l2 snapshot data is quite large
pub(super) fn parse_l2_snapshot(
msg: &str,
symbol: Option<&str>,
received_at: Option<i64>,
) -> Result<Vec<OrderBookMsg>, SimpleError> {
let ws_msg = serde_json::from_str::<RawL2SnapshotMsg>(msg).map_err(|_e| {
SimpleError::new(format!("Failed to deserialize {msg} to RawL2SnapshotMsg"))
})?;

let pair = crypto_pair::normalize_pair(symbol.unwrap(), EXCHANGE_NAME).ok_or_else(|| {
SimpleError::new(format!("Failed to normalize {} from {}", symbol.unwrap(), msg))
})?;
let timestamp = received_at.expect("Binance spot L2 Snapshot doesn't have timestamp");

let parse_order = |raw_order: &RawOrder| -> Order {
let price = raw_order[0].parse::<f64>().unwrap();
let quantity_base = raw_order[1].parse::<f64>().unwrap();
Order {
price,
quantity_base,
quantity_quote: price * quantity_base,
quantity_contract: None,
}
};

let orderbook = OrderBookMsg {
exchange: EXCHANGE_NAME.to_string(),
market_type: MarketType::Spot,
symbol: symbol.unwrap().to_string(),
pair,
msg_type: MessageType::L2Snapshot,
timestamp,
seq_id: Some(ws_msg.lastUpdateId),
prev_seq_id: None,
asks: ws_msg.asks.iter().map(parse_order).collect::<Vec<Order>>(),
bids: ws_msg.bids.iter().map(parse_order).collect::<Vec<Order>>(),
snapshot: true,
json: msg.to_string(),
};
Ok(vec![orderbook])
}
19 changes: 18 additions & 1 deletion crypto-msg-parser/src/exchanges/binance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ pub(crate) fn extract_timestamp(msg: &str) -> Result<Option<i64>, SimpleError> {

pub(crate) fn get_msg_type(msg: &str) -> MessageType {
if let Ok(obj) = serde_json::from_str::<HashMap<String, Value>>(msg) {
if let Some(stream) = obj.get("stream").unwrap().as_str() {
if obj.get("stream").is_none() {
MessageType::L2Snapshot
} else if let Some(stream) = obj.get("stream").unwrap().as_str() {
if stream.ends_with("@aggTrade") {
MessageType::Trade
} else if stream.ends_with("@depth") || stream.ends_with("@depth@100ms") {
Expand Down Expand Up @@ -168,3 +170,18 @@ pub(crate) fn parse_candlestick(
) -> Result<Vec<CandlestickMsg>, SimpleError> {
binance_all::parse_candlestick(market_type, msg)
}

pub(crate) fn parse_l2_snapshot(
market_type: MarketType,
msg: &str,
symbol: Option<&str>,
received_at: Option<i64>,
) -> Result<Vec<OrderBookMsg>, SimpleError> {
if market_type == MarketType::EuropeanOption {
Err(SimpleError::new("Not implemented"))
} else if market_type == MarketType::Spot {
binance_spot::parse_l2_snapshot(msg, symbol, received_at)
} else {
binance_all::parse_l2_snapshot(market_type, msg, symbol)
}
}
28 changes: 28 additions & 0 deletions crypto-msg-parser/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,34 @@ pub fn parse_candlestick(
}
}

/// Parse level2 snapshot orderbook messages.
pub fn parse_l2_snapshot(
exchange: &str,
market_type: MarketType,
msg: &str,
symbol: Option<&str>,
received_at: Option<i64>,
) -> Result<Vec<OrderBookMsg>, SimpleError> {
let ret = match exchange {
"binance" => exchanges::binance::parse_l2_snapshot(market_type, msg, symbol, received_at),
_ => Err(SimpleError::new(format!("Unknown exchange {exchange}"))),
};
match ret {
Ok(mut orderbooks) => {
for orderbook in orderbooks.iter_mut() {
if orderbook.snapshot {
// sorted in ascending order by price
orderbook.asks.sort_by(|a, b| a.price.partial_cmp(&b.price).unwrap());
// sorted in descending order by price
orderbook.bids.sort_by(|a, b| b.price.partial_cmp(&a.price).unwrap());
}
}
Ok(orderbooks)
}
Err(_) => ret,
}
}

/// Infer the message type from the message.
pub fn get_msg_type(exchange: &str, msg: &str) -> MessageType {
match exchange {
Expand Down
Loading

0 comments on commit d785f48

Please sign in to comment.