Skip to content

Commit

Permalink
feat(data_feeder): fetching binance stream
Browse files Browse the repository at this point in the history
  • Loading branch information
loocapro committed Sep 9, 2024
1 parent 1241414 commit 2cc806d
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 128 deletions.
95 changes: 3 additions & 92 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion oracle/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ enr = "0.12"
futures.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
tokio-tungstenite = "0.23"

tungstenite = { version = "0.21", features = ["native-tls"] }

thiserror = "1"

# misc
clap = "4"
Expand Down
14 changes: 0 additions & 14 deletions oracle/src/data_feeder/mod.rs

This file was deleted.

2 changes: 1 addition & 1 deletion oracle/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use reth_network::{protocol::IntoRlpxSubProtocol, NetworkProtocols};
use reth_node_ethereum::EthereumNode;

mod cli_ext;
mod data_feeder;
mod exex;
mod network;
mod offchain_data;
mod oracle;

const ORACLE_EXEX_ID: &str = "exex-oracle";
Expand Down
96 changes: 96 additions & 0 deletions oracle/src/offchain_data/binance/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use futures::{Stream, StreamExt};
use std::{
pin::Pin,
task::{Context, Poll},
};
use thiserror::Error;
use ticker::{BinanceResponse, Ticker};
use tokio::net::TcpStream;
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};

use super::DataFeeder;

mod ticker;

#[derive(Error, Debug)]
pub(crate) enum BinanceDataFeederError {
/// Error connecting to the WebSocket.
#[error("error connecting to WebSocket")]
Connection(#[from] tokio_tungstenite::tungstenite::Error),
/// Error decoding the message.
#[error("error decoding message")]
Decode(#[from] serde_json::Error),
}

/// This structure controls the interaction with the Binance WebSocket API.
pub(crate) struct BinanceDataFeeder {
/// The URL of the Binance WebSocket API.
url: String,
/// The WebSocket stream.
inner: WebSocketStream<MaybeTlsStream<TcpStream>>,
}

impl BinanceDataFeeder {
/// Creates a new BinanceDataFeeder instance.
pub(crate) async fn new(symbols: Vec<String>) -> Result<Self, BinanceDataFeederError> {
let query = symbols
.iter()
.map(|symbol| format!("{}@ticker", symbol))
.collect::<Vec<String>>()
.join("/");

let url = format!("wss://stream.binance.com/stream?streams={}", query);
let (client, _) = connect_async(url.to_string()).await?;

Ok(Self { url, inner: client })
}
}

/// We implement the Stream trait for the BinanceDataFeeder struct
/// in order to encode the messages received from the WebSocket into our Ticker struct.
impl Stream for BinanceDataFeeder {
type Item = Result<Ticker, BinanceDataFeederError>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();

match this.inner.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(msg))) => {
let msg = msg.into_text()?;
let resp: BinanceResponse = serde_json::from_str(&msg)?;
Poll::Ready(Some(Ok(resp.data)))
}
Poll::Ready(Some(Err(e))) => {
// we should handle reconnections here
Poll::Ready(Some(Err(BinanceDataFeederError::Connection(e))))
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}

impl DataFeeder for BinanceDataFeeder {
type Item = Ticker;

/// Converts the Binance feeder into a stream of `Ticker` items.
fn into_stream(
self,
) -> Pin<Box<dyn Stream<Item = Result<Self::Item, BinanceDataFeederError>> + Send>> {
Box::pin(self)
}
}

#[cfg(test)]
mod tests {
use super::*;
use futures::StreamExt;

#[tokio::test]
async fn can_connect() {
let symbols = vec!["btcusdt".to_string(), "ethusdt".to_string()];
let mut feeder = BinanceDataFeeder::new(symbols).await.unwrap();
let msg = feeder.next().await.unwrap().unwrap();
assert!(msg.symbol == "BTCUSDT" || msg.symbol == "ETHUSDT");
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
use serde::{Deserialize, Serialize};

/// Struct representing the full JSON response from Binance
#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct BinanceResponse {
/// Stream name (e.g., "ethusdt@ticker")
stream: String,
/// The ticker data nested inside the `data` field
pub data: Ticker,
}

/// Binance ticker data
#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct Ticker {
Expand All @@ -11,7 +20,7 @@ pub(crate) struct Ticker {
event_time: u64,
/// Trading pair symbol
#[serde(rename = "s")]
symbol: String,
pub(crate) symbol: String,
/// Price change over the last 24 hours
#[serde(rename = "p")]
price_change: String,
Expand Down Expand Up @@ -73,22 +82,3 @@ pub(crate) struct Ticker {
#[serde(rename = "n")]
num_trades: u64,
}

pub(crate) struct BinanceDataFeeder {
symbols: Vec<String>,
}

impl BinanceDataFeeder {
pub(crate) fn new(symbols: Vec<String>) -> Self {
Self { symbols }
}
async fn connect(
&self,
) -> tokio_tungstenite::tungstenite::Result<
tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
> {
let url = format!("wss://stream.binance.com:9443/ws/{}@ticker", self.symbol.to_lowercase());
let (ws_stream, _) = connect_async(url).await?;
Ok(ws_stream)
}
}
19 changes: 19 additions & 0 deletions oracle/src/offchain_data/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use std::pin::Pin;

use binance::BinanceDataFeederError;
use futures::stream::Stream;

pub(crate) mod binance;

/// This trait must be implemented by any data feeder that wants to be used by the Oracle.
///
/// It takes care of feeding off-chain data to the Oracle.
pub(crate) trait DataFeeder {
/// The Item type that the stream will return.
type Item;

/// Converts the data feeder into a stream of items.
fn into_stream(
self,
) -> Pin<Box<dyn Stream<Item = Result<Self::Item, BinanceDataFeederError>> + Send>>;
}

0 comments on commit 2cc806d

Please sign in to comment.