From 2cc806d433ff23592ebc7265697a08eb4ca3b412 Mon Sep 17 00:00:00 2001 From: Loocapro Date: Sun, 8 Sep 2024 18:35:36 +0200 Subject: [PATCH] feat(data_feeder): fetching binance stream --- Cargo.lock | 95 +----------------- oracle/Cargo.toml | 4 +- oracle/src/data_feeder/mod.rs | 14 --- oracle/src/main.rs | 2 +- oracle/src/offchain_data/binance/mod.rs | 96 +++++++++++++++++++ .../binance/ticker.rs} | 30 ++---- oracle/src/offchain_data/mod.rs | 19 ++++ 7 files changed, 132 insertions(+), 128 deletions(-) delete mode 100644 oracle/src/data_feeder/mod.rs create mode 100644 oracle/src/offchain_data/binance/mod.rs rename oracle/src/{data_feeder/binance.rs => offchain_data/binance/ticker.rs} (79%) create mode 100644 oracle/src/offchain_data/mod.rs diff --git a/Cargo.lock b/Cargo.lock index faa7490..023187a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2392,21 +2392,6 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" -[[package]] -name = "foreign-types" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" -dependencies = [ - "foreign-types-shared", -] - -[[package]] -name = "foreign-types-shared" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" - [[package]] name = "form_urlencoded" version = "1.2.1" @@ -3961,23 +3946,6 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" -[[package]] -name = "native-tls" -version = "0.2.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8614eb2c83d59d1c8cc974dd3f920198647674a0a035e1af1fa58707e317466" -dependencies = [ - "libc", - "log", - "openssl", - "openssl-probe", - "openssl-sys", - "schannel", - "security-framework", - "security-framework-sys", - "tempfile", -] - [[package]] name = "nibble_vec" version = "0.1.0" @@ -4253,50 +4221,12 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" -[[package]] -name = "openssl" -version = "0.10.66" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9529f4786b70a3e8c61e11179af17ab6188ad8d0ded78c5529441ed39d4bd9c1" -dependencies = [ - "bitflags 2.6.0", - "cfg-if", - "foreign-types", - "libc", - "once_cell", - "openssl-macros", - "openssl-sys", -] - -[[package]] -name = "openssl-macros" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.77", -] - [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" -[[package]] -name = "openssl-sys" -version = "0.9.103" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f9e8deee91df40a943c71b917e5874b951d32a802526c85721ce3b776c929d6" -dependencies = [ - "cc", - "libc", - "pkg-config", - "vcpkg", -] - [[package]] name = "option-ext" version = "0.2.0" @@ -4326,9 +4256,10 @@ dependencies = [ "reth-tracing", "serde", "serde_json", + "thiserror", "tokio", "tokio-stream", - "tungstenite 0.21.0", + "tokio-tungstenite", ] [[package]] @@ -8603,7 +8534,7 @@ dependencies = [ "rustls-pki-types", "tokio", "tokio-rustls", - "tungstenite 0.23.0", + "tungstenite", "webpki-roots", ] @@ -8945,26 +8876,6 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" -[[package]] -name = "tungstenite" -version = "0.21.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" -dependencies = [ - "byteorder", - "bytes", - "data-encoding", - "http", - "httparse", - "log", - "native-tls", - "rand", - "sha1", - "thiserror", - "url", - "utf-8", -] - [[package]] name = "tungstenite" version = "0.23.0" diff --git a/oracle/Cargo.toml b/oracle/Cargo.toml index 62a04c5..4692a65 100644 --- a/oracle/Cargo.toml +++ b/oracle/Cargo.toml @@ -30,8 +30,10 @@ enr = "0.12" futures.workspace = true tokio.workspace = true tokio-stream.workspace = true +tokio-tungstenite = "0.23" -tungstenite = { version = "0.21", features = ["native-tls"] } + +thiserror = "1" # misc clap = "4" diff --git a/oracle/src/data_feeder/mod.rs b/oracle/src/data_feeder/mod.rs deleted file mode 100644 index f2f6742..0000000 --- a/oracle/src/data_feeder/mod.rs +++ /dev/null @@ -1,14 +0,0 @@ -use futures::stream::Stream; -use serde::de::DeserializeOwned; - -pub(crate) mod binance; - -/// This trait must be implemented by any data feeder that wants to be used by the Oracle. -pub(crate) trait DataFeeder { - /// The `into_stream` method should return a stream of items that implement - /// `serde::Deserialize`. This uses `DeserializeOwned` for deserialization of the stream - /// items. - fn into_stream(&self) -> Box + Unpin> - where - T: DeserializeOwned + 'static; -} diff --git a/oracle/src/main.rs b/oracle/src/main.rs index f745c98..617cd66 100644 --- a/oracle/src/main.rs +++ b/oracle/src/main.rs @@ -8,9 +8,9 @@ use reth_network::{protocol::IntoRlpxSubProtocol, NetworkProtocols}; use reth_node_ethereum::EthereumNode; mod cli_ext; -mod data_feeder; mod exex; mod network; +mod offchain_data; mod oracle; const ORACLE_EXEX_ID: &str = "exex-oracle"; diff --git a/oracle/src/offchain_data/binance/mod.rs b/oracle/src/offchain_data/binance/mod.rs new file mode 100644 index 0000000..047dce7 --- /dev/null +++ b/oracle/src/offchain_data/binance/mod.rs @@ -0,0 +1,96 @@ +use futures::{Stream, StreamExt}; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; +use thiserror::Error; +use ticker::{BinanceResponse, Ticker}; +use tokio::net::TcpStream; +use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; + +use super::DataFeeder; + +mod ticker; + +#[derive(Error, Debug)] +pub(crate) enum BinanceDataFeederError { + /// Error connecting to the WebSocket. + #[error("error connecting to WebSocket")] + Connection(#[from] tokio_tungstenite::tungstenite::Error), + /// Error decoding the message. + #[error("error decoding message")] + Decode(#[from] serde_json::Error), +} + +/// This structure controls the interaction with the Binance WebSocket API. +pub(crate) struct BinanceDataFeeder { + /// The URL of the Binance WebSocket API. + url: String, + /// The WebSocket stream. + inner: WebSocketStream>, +} + +impl BinanceDataFeeder { + /// Creates a new BinanceDataFeeder instance. + pub(crate) async fn new(symbols: Vec) -> Result { + let query = symbols + .iter() + .map(|symbol| format!("{}@ticker", symbol)) + .collect::>() + .join("/"); + + let url = format!("wss://stream.binance.com/stream?streams={}", query); + let (client, _) = connect_async(url.to_string()).await?; + + Ok(Self { url, inner: client }) + } +} + +/// We implement the Stream trait for the BinanceDataFeeder struct +/// in order to encode the messages received from the WebSocket into our Ticker struct. +impl Stream for BinanceDataFeeder { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + match this.inner.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(msg))) => { + let msg = msg.into_text()?; + let resp: BinanceResponse = serde_json::from_str(&msg)?; + Poll::Ready(Some(Ok(resp.data))) + } + Poll::Ready(Some(Err(e))) => { + // we should handle reconnections here + Poll::Ready(Some(Err(BinanceDataFeederError::Connection(e)))) + } + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} + +impl DataFeeder for BinanceDataFeeder { + type Item = Ticker; + + /// Converts the Binance feeder into a stream of `Ticker` items. + fn into_stream( + self, + ) -> Pin> + Send>> { + Box::pin(self) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::StreamExt; + + #[tokio::test] + async fn can_connect() { + let symbols = vec!["btcusdt".to_string(), "ethusdt".to_string()]; + let mut feeder = BinanceDataFeeder::new(symbols).await.unwrap(); + let msg = feeder.next().await.unwrap().unwrap(); + assert!(msg.symbol == "BTCUSDT" || msg.symbol == "ETHUSDT"); + } +} diff --git a/oracle/src/data_feeder/binance.rs b/oracle/src/offchain_data/binance/ticker.rs similarity index 79% rename from oracle/src/data_feeder/binance.rs rename to oracle/src/offchain_data/binance/ticker.rs index 10e8fe6..427fbfc 100644 --- a/oracle/src/data_feeder/binance.rs +++ b/oracle/src/offchain_data/binance/ticker.rs @@ -1,5 +1,14 @@ use serde::{Deserialize, Serialize}; +/// Struct representing the full JSON response from Binance +#[derive(Serialize, Deserialize, Debug)] +pub(crate) struct BinanceResponse { + /// Stream name (e.g., "ethusdt@ticker") + stream: String, + /// The ticker data nested inside the `data` field + pub data: Ticker, +} + /// Binance ticker data #[derive(Serialize, Deserialize, Debug)] pub(crate) struct Ticker { @@ -11,7 +20,7 @@ pub(crate) struct Ticker { event_time: u64, /// Trading pair symbol #[serde(rename = "s")] - symbol: String, + pub(crate) symbol: String, /// Price change over the last 24 hours #[serde(rename = "p")] price_change: String, @@ -73,22 +82,3 @@ pub(crate) struct Ticker { #[serde(rename = "n")] num_trades: u64, } - -pub(crate) struct BinanceDataFeeder { - symbols: Vec, -} - -impl BinanceDataFeeder { - pub(crate) fn new(symbols: Vec) -> Self { - Self { symbols } - } - async fn connect( - &self, - ) -> tokio_tungstenite::tungstenite::Result< - tokio_tungstenite::WebSocketStream, - > { - let url = format!("wss://stream.binance.com:9443/ws/{}@ticker", self.symbol.to_lowercase()); - let (ws_stream, _) = connect_async(url).await?; - Ok(ws_stream) - } -} diff --git a/oracle/src/offchain_data/mod.rs b/oracle/src/offchain_data/mod.rs new file mode 100644 index 0000000..8a50c0b --- /dev/null +++ b/oracle/src/offchain_data/mod.rs @@ -0,0 +1,19 @@ +use std::pin::Pin; + +use binance::BinanceDataFeederError; +use futures::stream::Stream; + +pub(crate) mod binance; + +/// This trait must be implemented by any data feeder that wants to be used by the Oracle. +/// +/// It takes care of feeding off-chain data to the Oracle. +pub(crate) trait DataFeeder { + /// The Item type that the stream will return. + type Item; + + /// Converts the data feeder into a stream of items. + fn into_stream( + self, + ) -> Pin> + Send>>; +}