From 1241414a34bc93d4b778a5aff59487326e5ce471 Mon Sep 17 00:00:00 2001 From: Loocapro Date: Sun, 8 Sep 2024 16:45:22 +0200 Subject: [PATCH 01/14] moving on --- Cargo.lock | 95 ++++++++++++++++++++++++++++++- oracle/Cargo.toml | 4 ++ oracle/src/data_feeder/binance.rs | 94 ++++++++++++++++++++++++++++++ oracle/src/data_feeder/mod.rs | 14 +++++ oracle/src/main.rs | 1 + 5 files changed, 207 insertions(+), 1 deletion(-) create mode 100644 oracle/src/data_feeder/binance.rs create mode 100644 oracle/src/data_feeder/mod.rs diff --git a/Cargo.lock b/Cargo.lock index eb92281..faa7490 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2392,6 +2392,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -3946,6 +3961,23 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" +[[package]] +name = "native-tls" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8614eb2c83d59d1c8cc974dd3f920198647674a0a035e1af1fa58707e317466" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nibble_vec" version = "0.1.0" @@ -4221,12 +4253,50 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" +[[package]] +name = "openssl" +version = "0.10.66" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9529f4786b70a3e8c61e11179af17ab6188ad8d0ded78c5529441ed39d4bd9c1" +dependencies = [ + "bitflags 2.6.0", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.77", +] + [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-sys" +version = "0.9.103" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f9e8deee91df40a943c71b917e5874b951d32a802526c85721ce3b776c929d6" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -4254,8 +4324,11 @@ dependencies = [ "reth-primitives", "reth-rpc-types", "reth-tracing", + "serde", + "serde_json", "tokio", "tokio-stream", + "tungstenite 0.21.0", ] [[package]] @@ -8530,7 +8603,7 @@ dependencies = [ "rustls-pki-types", "tokio", "tokio-rustls", - "tungstenite", + "tungstenite 0.23.0", "webpki-roots", ] @@ -8872,6 +8945,26 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "native-tls", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "tungstenite" version = "0.23.0" diff --git a/oracle/Cargo.toml b/oracle/Cargo.toml index 419bfc4..62a04c5 100644 --- a/oracle/Cargo.toml +++ b/oracle/Cargo.toml @@ -31,6 +31,10 @@ futures.workspace = true tokio.workspace = true tokio-stream.workspace = true +tungstenite = { version = "0.21", features = ["native-tls"] } + # misc clap = "4" eyre.workspace = true +serde = "1" +serde_json = "1" diff --git a/oracle/src/data_feeder/binance.rs b/oracle/src/data_feeder/binance.rs new file mode 100644 index 0000000..10e8fe6 --- /dev/null +++ b/oracle/src/data_feeder/binance.rs @@ -0,0 +1,94 @@ +use serde::{Deserialize, Serialize}; + +/// Binance ticker data +#[derive(Serialize, Deserialize, Debug)] +pub(crate) struct Ticker { + /// Event type (e.g., "24hrTicker") + #[serde(rename = "e")] + event_type: String, + /// Event time (timestamp) + #[serde(rename = "E")] + event_time: u64, + /// Trading pair symbol + #[serde(rename = "s")] + symbol: String, + /// Price change over the last 24 hours + #[serde(rename = "p")] + price_change: String, + /// Price change percent + #[serde(rename = "P")] + price_change_percent: String, + /// Weighted average price + #[serde(rename = "w")] + weighted_avg_price: String, + /// Previous day's close price + #[serde(rename = "x")] + prev_close_price: String, + /// Current price (last trade price) + #[serde(rename = "c")] + last_price: String, + /// Last quantity traded + #[serde(rename = "Q")] + last_quantity: String, + /// Best bid price + #[serde(rename = "b")] + best_bid_price: String, + /// Best bid quantity + #[serde(rename = "B")] + best_bid_quantity: String, + /// Best ask price + #[serde(rename = "a")] + best_ask_price: String, + /// Best ask quantity + #[serde(rename = "A")] + best_ask_quantity: String, + /// Open price for the 24-hour period + #[serde(rename = "o")] + open_price: String, + /// High price of the 24-hour period + #[serde(rename = "h")] + high_price: String, + /// Low price of the 24-hour period + #[serde(rename = "l")] + low_price: String, + /// Total traded volume of the base asset + #[serde(rename = "v")] + volume: String, + /// Total traded volume of the quote asset + #[serde(rename = "q")] + quote_volume: String, + /// Open time (timestamp) + #[serde(rename = "O")] + open_time: u64, + /// Close time (timestamp) + #[serde(rename = "C")] + close_time: u64, + /// First trade ID + #[serde(rename = "F")] + first_trade_id: u64, + /// Last trade ID + #[serde(rename = "L")] + last_trade_id: u64, + /// Total number of trades + #[serde(rename = "n")] + num_trades: u64, +} + +pub(crate) struct BinanceDataFeeder { + symbols: Vec, +} + +impl BinanceDataFeeder { + pub(crate) fn new(symbols: Vec) -> Self { + Self { symbols } + } + async fn connect( + &self, + ) -> tokio_tungstenite::tungstenite::Result< + tokio_tungstenite::WebSocketStream, + > { + let url = format!("wss://stream.binance.com:9443/ws/{}@ticker", self.symbol.to_lowercase()); + let (ws_stream, _) = connect_async(url).await?; + Ok(ws_stream) + } +} diff --git a/oracle/src/data_feeder/mod.rs b/oracle/src/data_feeder/mod.rs new file mode 100644 index 0000000..f2f6742 --- /dev/null +++ b/oracle/src/data_feeder/mod.rs @@ -0,0 +1,14 @@ +use futures::stream::Stream; +use serde::de::DeserializeOwned; + +pub(crate) mod binance; + +/// This trait must be implemented by any data feeder that wants to be used by the Oracle. +pub(crate) trait DataFeeder { + /// The `into_stream` method should return a stream of items that implement + /// `serde::Deserialize`. This uses `DeserializeOwned` for deserialization of the stream + /// items. + fn into_stream(&self) -> Box + Unpin> + where + T: DeserializeOwned + 'static; +} diff --git a/oracle/src/main.rs b/oracle/src/main.rs index e2683dc..f745c98 100644 --- a/oracle/src/main.rs +++ b/oracle/src/main.rs @@ -8,6 +8,7 @@ use reth_network::{protocol::IntoRlpxSubProtocol, NetworkProtocols}; use reth_node_ethereum::EthereumNode; mod cli_ext; +mod data_feeder; mod exex; mod network; mod oracle; From 2cc806d433ff23592ebc7265697a08eb4ca3b412 Mon Sep 17 00:00:00 2001 From: Loocapro Date: Sun, 8 Sep 2024 18:35:36 +0200 Subject: [PATCH 02/14] feat(data_feeder): fetching binance stream --- Cargo.lock | 95 +----------------- oracle/Cargo.toml | 4 +- oracle/src/data_feeder/mod.rs | 14 --- oracle/src/main.rs | 2 +- oracle/src/offchain_data/binance/mod.rs | 96 +++++++++++++++++++ .../binance/ticker.rs} | 30 ++---- oracle/src/offchain_data/mod.rs | 19 ++++ 7 files changed, 132 insertions(+), 128 deletions(-) delete mode 100644 oracle/src/data_feeder/mod.rs create mode 100644 oracle/src/offchain_data/binance/mod.rs rename oracle/src/{data_feeder/binance.rs => offchain_data/binance/ticker.rs} (79%) create mode 100644 oracle/src/offchain_data/mod.rs diff --git a/Cargo.lock b/Cargo.lock index faa7490..023187a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2392,21 +2392,6 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" -[[package]] -name = "foreign-types" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" -dependencies = [ - "foreign-types-shared", -] - -[[package]] -name = "foreign-types-shared" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" - [[package]] name = "form_urlencoded" version = "1.2.1" @@ -3961,23 +3946,6 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" -[[package]] -name = "native-tls" -version = "0.2.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8614eb2c83d59d1c8cc974dd3f920198647674a0a035e1af1fa58707e317466" -dependencies = [ - "libc", - "log", - "openssl", - "openssl-probe", - "openssl-sys", - "schannel", - "security-framework", - "security-framework-sys", - "tempfile", -] - [[package]] name = "nibble_vec" version = "0.1.0" @@ -4253,50 +4221,12 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" -[[package]] -name = "openssl" -version = "0.10.66" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9529f4786b70a3e8c61e11179af17ab6188ad8d0ded78c5529441ed39d4bd9c1" -dependencies = [ - "bitflags 2.6.0", - "cfg-if", - "foreign-types", - "libc", - "once_cell", - "openssl-macros", - "openssl-sys", -] - -[[package]] -name = "openssl-macros" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.77", -] - [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" -[[package]] -name = "openssl-sys" -version = "0.9.103" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f9e8deee91df40a943c71b917e5874b951d32a802526c85721ce3b776c929d6" -dependencies = [ - "cc", - "libc", - "pkg-config", - "vcpkg", -] - [[package]] name = "option-ext" version = "0.2.0" @@ -4326,9 +4256,10 @@ dependencies = [ "reth-tracing", "serde", "serde_json", + "thiserror", "tokio", "tokio-stream", - "tungstenite 0.21.0", + "tokio-tungstenite", ] [[package]] @@ -8603,7 +8534,7 @@ dependencies = [ "rustls-pki-types", "tokio", "tokio-rustls", - "tungstenite 0.23.0", + "tungstenite", "webpki-roots", ] @@ -8945,26 +8876,6 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" -[[package]] -name = "tungstenite" -version = "0.21.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" -dependencies = [ - "byteorder", - "bytes", - "data-encoding", - "http", - "httparse", - "log", - "native-tls", - "rand", - "sha1", - "thiserror", - "url", - "utf-8", -] - [[package]] name = "tungstenite" version = "0.23.0" diff --git a/oracle/Cargo.toml b/oracle/Cargo.toml index 62a04c5..4692a65 100644 --- a/oracle/Cargo.toml +++ b/oracle/Cargo.toml @@ -30,8 +30,10 @@ enr = "0.12" futures.workspace = true tokio.workspace = true tokio-stream.workspace = true +tokio-tungstenite = "0.23" -tungstenite = { version = "0.21", features = ["native-tls"] } + +thiserror = "1" # misc clap = "4" diff --git a/oracle/src/data_feeder/mod.rs b/oracle/src/data_feeder/mod.rs deleted file mode 100644 index f2f6742..0000000 --- a/oracle/src/data_feeder/mod.rs +++ /dev/null @@ -1,14 +0,0 @@ -use futures::stream::Stream; -use serde::de::DeserializeOwned; - -pub(crate) mod binance; - -/// This trait must be implemented by any data feeder that wants to be used by the Oracle. -pub(crate) trait DataFeeder { - /// The `into_stream` method should return a stream of items that implement - /// `serde::Deserialize`. This uses `DeserializeOwned` for deserialization of the stream - /// items. - fn into_stream(&self) -> Box + Unpin> - where - T: DeserializeOwned + 'static; -} diff --git a/oracle/src/main.rs b/oracle/src/main.rs index f745c98..617cd66 100644 --- a/oracle/src/main.rs +++ b/oracle/src/main.rs @@ -8,9 +8,9 @@ use reth_network::{protocol::IntoRlpxSubProtocol, NetworkProtocols}; use reth_node_ethereum::EthereumNode; mod cli_ext; -mod data_feeder; mod exex; mod network; +mod offchain_data; mod oracle; const ORACLE_EXEX_ID: &str = "exex-oracle"; diff --git a/oracle/src/offchain_data/binance/mod.rs b/oracle/src/offchain_data/binance/mod.rs new file mode 100644 index 0000000..047dce7 --- /dev/null +++ b/oracle/src/offchain_data/binance/mod.rs @@ -0,0 +1,96 @@ +use futures::{Stream, StreamExt}; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; +use thiserror::Error; +use ticker::{BinanceResponse, Ticker}; +use tokio::net::TcpStream; +use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; + +use super::DataFeeder; + +mod ticker; + +#[derive(Error, Debug)] +pub(crate) enum BinanceDataFeederError { + /// Error connecting to the WebSocket. + #[error("error connecting to WebSocket")] + Connection(#[from] tokio_tungstenite::tungstenite::Error), + /// Error decoding the message. + #[error("error decoding message")] + Decode(#[from] serde_json::Error), +} + +/// This structure controls the interaction with the Binance WebSocket API. +pub(crate) struct BinanceDataFeeder { + /// The URL of the Binance WebSocket API. + url: String, + /// The WebSocket stream. + inner: WebSocketStream>, +} + +impl BinanceDataFeeder { + /// Creates a new BinanceDataFeeder instance. + pub(crate) async fn new(symbols: Vec) -> Result { + let query = symbols + .iter() + .map(|symbol| format!("{}@ticker", symbol)) + .collect::>() + .join("/"); + + let url = format!("wss://stream.binance.com/stream?streams={}", query); + let (client, _) = connect_async(url.to_string()).await?; + + Ok(Self { url, inner: client }) + } +} + +/// We implement the Stream trait for the BinanceDataFeeder struct +/// in order to encode the messages received from the WebSocket into our Ticker struct. +impl Stream for BinanceDataFeeder { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + match this.inner.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(msg))) => { + let msg = msg.into_text()?; + let resp: BinanceResponse = serde_json::from_str(&msg)?; + Poll::Ready(Some(Ok(resp.data))) + } + Poll::Ready(Some(Err(e))) => { + // we should handle reconnections here + Poll::Ready(Some(Err(BinanceDataFeederError::Connection(e)))) + } + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} + +impl DataFeeder for BinanceDataFeeder { + type Item = Ticker; + + /// Converts the Binance feeder into a stream of `Ticker` items. + fn into_stream( + self, + ) -> Pin> + Send>> { + Box::pin(self) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::StreamExt; + + #[tokio::test] + async fn can_connect() { + let symbols = vec!["btcusdt".to_string(), "ethusdt".to_string()]; + let mut feeder = BinanceDataFeeder::new(symbols).await.unwrap(); + let msg = feeder.next().await.unwrap().unwrap(); + assert!(msg.symbol == "BTCUSDT" || msg.symbol == "ETHUSDT"); + } +} diff --git a/oracle/src/data_feeder/binance.rs b/oracle/src/offchain_data/binance/ticker.rs similarity index 79% rename from oracle/src/data_feeder/binance.rs rename to oracle/src/offchain_data/binance/ticker.rs index 10e8fe6..427fbfc 100644 --- a/oracle/src/data_feeder/binance.rs +++ b/oracle/src/offchain_data/binance/ticker.rs @@ -1,5 +1,14 @@ use serde::{Deserialize, Serialize}; +/// Struct representing the full JSON response from Binance +#[derive(Serialize, Deserialize, Debug)] +pub(crate) struct BinanceResponse { + /// Stream name (e.g., "ethusdt@ticker") + stream: String, + /// The ticker data nested inside the `data` field + pub data: Ticker, +} + /// Binance ticker data #[derive(Serialize, Deserialize, Debug)] pub(crate) struct Ticker { @@ -11,7 +20,7 @@ pub(crate) struct Ticker { event_time: u64, /// Trading pair symbol #[serde(rename = "s")] - symbol: String, + pub(crate) symbol: String, /// Price change over the last 24 hours #[serde(rename = "p")] price_change: String, @@ -73,22 +82,3 @@ pub(crate) struct Ticker { #[serde(rename = "n")] num_trades: u64, } - -pub(crate) struct BinanceDataFeeder { - symbols: Vec, -} - -impl BinanceDataFeeder { - pub(crate) fn new(symbols: Vec) -> Self { - Self { symbols } - } - async fn connect( - &self, - ) -> tokio_tungstenite::tungstenite::Result< - tokio_tungstenite::WebSocketStream, - > { - let url = format!("wss://stream.binance.com:9443/ws/{}@ticker", self.symbol.to_lowercase()); - let (ws_stream, _) = connect_async(url).await?; - Ok(ws_stream) - } -} diff --git a/oracle/src/offchain_data/mod.rs b/oracle/src/offchain_data/mod.rs new file mode 100644 index 0000000..8a50c0b --- /dev/null +++ b/oracle/src/offchain_data/mod.rs @@ -0,0 +1,19 @@ +use std::pin::Pin; + +use binance::BinanceDataFeederError; +use futures::stream::Stream; + +pub(crate) mod binance; + +/// This trait must be implemented by any data feeder that wants to be used by the Oracle. +/// +/// It takes care of feeding off-chain data to the Oracle. +pub(crate) trait DataFeeder { + /// The Item type that the stream will return. + type Item; + + /// Converts the data feeder into a stream of items. + fn into_stream( + self, + ) -> Pin> + Send>>; +} From 323da5f38a7ff132513ec7782b1c80b6d01881eb Mon Sep 17 00:00:00 2001 From: Loocapro Date: Mon, 9 Sep 2024 19:41:25 +0200 Subject: [PATCH 03/14] feat(data_feeder): wrapping binance stream and polling from the oracle, also added cli args --- oracle/src/cli_ext.rs | 34 +++++++++++++ oracle/src/main.rs | 4 +- oracle/src/offchain_data/binance/mod.rs | 19 +------- oracle/src/offchain_data/mod.rs | 64 +++++++++++++++++++------ oracle/src/oracle.rs | 24 ++++++++-- 5 files changed, 108 insertions(+), 37 deletions(-) diff --git a/oracle/src/cli_ext.rs b/oracle/src/cli_ext.rs index 86a212c..40cca3b 100644 --- a/oracle/src/cli_ext.rs +++ b/oracle/src/cli_ext.rs @@ -2,6 +2,7 @@ use clap::Args; pub const DEFAULT_DISCOVERY_PORT: u16 = 30304; pub const DEFAULT_RLPX_PORT: u16 = 30303; +pub const DEFAULT_BINANCE_SYMBOLS: &str = "btcusdt,ethusdt"; #[derive(Debug, Clone, Args)] pub(crate) struct OracleExt { @@ -12,4 +13,37 @@ pub(crate) struct OracleExt { /// UDP port used for discovery #[clap(long = "disc.udp-port", default_value_t = DEFAULT_DISCOVERY_PORT)] pub udp_port: u16, + + /// The list of symbols to configure the stream of prices from binance. + #[clap(long = "data.symbols", use_value_delimiter = true, default_value = DEFAULT_BINANCE_SYMBOLS)] + pub binance_symbols: Vec, +} + +#[cfg(test)] +mod tests { + use super::*; + use clap::{Args, Parser}; + + /// A helper type to parse Args more easily + #[derive(Parser)] + struct CommandParser { + #[clap(flatten)] + args: T, + } + + #[test] + fn test_oracle_ext() { + let cli = CommandParser::::parse_from(&[ + "test", + "--disc.tcp-port", + "30304", + "--disc.udp-port", + "30303", + "--data.symbols", + "btcusdt,ethusdt", + ]); + assert_eq!(cli.args.tcp_port, 30304); + assert_eq!(cli.args.udp_port, 30303); + assert_eq!(cli.args.binance_symbols, vec!["btcusdt", "ethusdt"]); + } } diff --git a/oracle/src/main.rs b/oracle/src/main.rs index 617cd66..5e60122 100644 --- a/oracle/src/main.rs +++ b/oracle/src/main.rs @@ -2,6 +2,7 @@ use clap::Parser; use cli_ext::OracleExt; use exex::ExEx; use network::{proto::OracleProtoHandler, Network}; +use offchain_data::DataFeederStream; use oracle::Oracle; use reth::args::utils::DefaultChainSpecParser; use reth_network::{protocol::IntoRlpxSubProtocol, NetworkProtocols}; @@ -28,7 +29,8 @@ fn main() -> eyre::Result<()> { let exex = ExEx::new(ctx); let network = Network::new(tcp_port, udp_port).await?; - let oracle = Oracle::new(exex, network).await?; + let data_feed = DataFeederStream::new(args.binance_symbols).await?; + let oracle = Oracle::new(exex, network, data_feed); Ok(oracle) }) .launch() diff --git a/oracle/src/offchain_data/binance/mod.rs b/oracle/src/offchain_data/binance/mod.rs index 047dce7..2f642d2 100644 --- a/oracle/src/offchain_data/binance/mod.rs +++ b/oracle/src/offchain_data/binance/mod.rs @@ -8,9 +8,7 @@ use ticker::{BinanceResponse, Ticker}; use tokio::net::TcpStream; use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; -use super::DataFeeder; - -mod ticker; +pub(crate) mod ticker; #[derive(Error, Debug)] pub(crate) enum BinanceDataFeederError { @@ -24,8 +22,6 @@ pub(crate) enum BinanceDataFeederError { /// This structure controls the interaction with the Binance WebSocket API. pub(crate) struct BinanceDataFeeder { - /// The URL of the Binance WebSocket API. - url: String, /// The WebSocket stream. inner: WebSocketStream>, } @@ -42,7 +38,7 @@ impl BinanceDataFeeder { let url = format!("wss://stream.binance.com/stream?streams={}", query); let (client, _) = connect_async(url.to_string()).await?; - Ok(Self { url, inner: client }) + Ok(Self { inner: client }) } } @@ -70,17 +66,6 @@ impl Stream for BinanceDataFeeder { } } -impl DataFeeder for BinanceDataFeeder { - type Item = Ticker; - - /// Converts the Binance feeder into a stream of `Ticker` items. - fn into_stream( - self, - ) -> Pin> + Send>> { - Box::pin(self) - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/oracle/src/offchain_data/mod.rs b/oracle/src/offchain_data/mod.rs index 8a50c0b..67bae4f 100644 --- a/oracle/src/offchain_data/mod.rs +++ b/oracle/src/offchain_data/mod.rs @@ -1,19 +1,53 @@ -use std::pin::Pin; - -use binance::BinanceDataFeederError; -use futures::stream::Stream; +use binance::{ticker::Ticker, BinanceDataFeeder, BinanceDataFeederError}; +use futures::{stream::Stream, StreamExt}; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; +use thiserror::Error; pub(crate) mod binance; -/// This trait must be implemented by any data feeder that wants to be used by the Oracle. -/// -/// It takes care of feeding off-chain data to the Oracle. -pub(crate) trait DataFeeder { - /// The Item type that the stream will return. - type Item; - - /// Converts the data feeder into a stream of items. - fn into_stream( - self, - ) -> Pin> + Send>>; +/// The enum that represents the various types of data feeds, e.g., Binance. +pub(crate) enum DataFeeds { + Binance(Ticker), +} + +/// The error enum that wraps errors from all data feeders. +#[derive(Error, Debug)] +pub(crate) enum DataFeederError { + #[error(transparent)] + Binance(#[from] BinanceDataFeederError), +} + +/// The struct that implements the Stream trait for polling multiple data feeds. +pub(crate) struct DataFeederStream { + binance: BinanceDataFeeder, + // Add other feeder fields if needed. +} + +impl DataFeederStream { + /// Creates a new instance of the DataFeederStream with initialized Binance feeder. + pub(crate) async fn new(binance_symbols: Vec) -> Result { + let binance = BinanceDataFeeder::new(binance_symbols).await?; + Ok(Self { binance }) + } +} + +/// Implementing the Stream trait for DataFeederStream to wrap the individual feeders. +impl Stream for DataFeederStream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + match this.binance.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(ticker))) => Poll::Ready(Some(Ok(DataFeeds::Binance(ticker)))), + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(DataFeederError::Binance(e)))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + + // Add other data-feeds here. + } } diff --git a/oracle/src/oracle.rs b/oracle/src/oracle.rs index 9903fad..6f594f9 100644 --- a/oracle/src/oracle.rs +++ b/oracle/src/oracle.rs @@ -1,4 +1,4 @@ -use futures::FutureExt; +use futures::{FutureExt, StreamExt}; use reth_node_api::FullNodeComponents; use reth_tracing::tracing::{error, info}; use std::{ @@ -7,7 +7,7 @@ use std::{ task::{Context, Poll}, }; -use crate::{exex::ExEx, network::Network}; +use crate::{exex::ExEx, network::Network, offchain_data::DataFeederStream}; /// The Oracle struct is a long running task that orchestrates discovery of new peers, /// decoding data from chain events (ExEx) and gossiping it to peers. @@ -17,11 +17,13 @@ pub(crate) struct Oracle { network: Network, /// The execution extension task for this node. exex: ExEx, + /// The offchain data feed stream. + data_feed: DataFeederStream, } impl Oracle { - pub(crate) async fn new(exex: ExEx, network: Network) -> eyre::Result { - Ok(Self { exex, network }) + pub(crate) fn new(exex: ExEx, network: Network, data_feed: DataFeederStream) -> Self { + Self { exex, network, data_feed } } } @@ -47,6 +49,20 @@ impl Future for Oracle { } } + // Poll the data feed future until it's drained + while let Poll::Ready(item) = this.data_feed.poll_next_unpin(cx) { + match item { + Some(Ok(data)) => { + // Process the data feed by signing it and sending it to the network + } + Some(Err(e)) => { + error!(?e, "Data feed task encountered an error"); + return Poll::Ready(Err(e.into())); + } + None => break, + } + } + // Poll the exex future until its drained loop { match this.exex.poll_unpin(cx)? { From db4325e0408378766c55cad6fd092af1dc180ea1 Mon Sep 17 00:00:00 2001 From: Loocapro Date: Mon, 9 Sep 2024 20:05:33 +0200 Subject: [PATCH 04/14] feat(binance_data): ws reconnections --- oracle/src/offchain_data/binance/mod.rs | 66 ++++++++++++++++++++++--- 1 file changed, 59 insertions(+), 7 deletions(-) diff --git a/oracle/src/offchain_data/binance/mod.rs b/oracle/src/offchain_data/binance/mod.rs index 2f642d2..d7edd17 100644 --- a/oracle/src/offchain_data/binance/mod.rs +++ b/oracle/src/offchain_data/binance/mod.rs @@ -1,4 +1,5 @@ use futures::{Stream, StreamExt}; +use reth_tracing::tracing::error; use std::{ pin::Pin, task::{Context, Poll}, @@ -8,6 +9,12 @@ use ticker::{BinanceResponse, Ticker}; use tokio::net::TcpStream; use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; +macro_rules! block_on { + ($expr:expr) => { + tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on($expr)) + }; +} + pub(crate) mod ticker; #[derive(Error, Debug)] @@ -18,12 +25,16 @@ pub(crate) enum BinanceDataFeederError { /// Error decoding the message. #[error("error decoding message")] Decode(#[from] serde_json::Error), + /// Error reconnecting after max retries. + #[error("reached max number of reconnection attempts")] + MaxRetriesExceeded, } /// This structure controls the interaction with the Binance WebSocket API. pub(crate) struct BinanceDataFeeder { /// The WebSocket stream. - inner: WebSocketStream>, + inner: Option>>, + url: String, } impl BinanceDataFeeder { @@ -36,9 +47,34 @@ impl BinanceDataFeeder { .join("/"); let url = format!("wss://stream.binance.com/stream?streams={}", query); - let (client, _) = connect_async(url.to_string()).await?; + let client = Self::connect_with_retries(url.to_string(), 10).await?; - Ok(Self { inner: client }) + Ok(Self { inner: Some(client), url }) + } + + /// Function to connect with retries and an optional delay between retries + async fn connect_with_retries( + url: String, + max_retries: usize, + ) -> Result>, BinanceDataFeederError> { + let mut attempts = 0; + + while attempts < max_retries { + let conn = connect_async(url.clone()).await; + + match conn { + Ok((connection, _)) => return Ok(connection), + Err(e) => { + error!(?e, attempts, max_retries, "Connection attempt failed, retrying..."); + attempts += 1; + if attempts < max_retries { + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + } + } + } + } + + Err(BinanceDataFeederError::MaxRetriesExceeded) } } @@ -50,18 +86,34 @@ impl Stream for BinanceDataFeeder { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); - match this.inner.poll_next_unpin(cx) { + let mut connection = this.inner.take().expect("inner WebSocket client is missing"); + + match connection.poll_next_unpin(cx) { Poll::Ready(Some(Ok(msg))) => { let msg = msg.into_text()?; let resp: BinanceResponse = serde_json::from_str(&msg)?; + this.inner = Some(connection); Poll::Ready(Some(Ok(resp.data))) } Poll::Ready(Some(Err(e))) => { - // we should handle reconnections here - Poll::Ready(Some(Err(BinanceDataFeederError::Connection(e)))) + // handle ws disconnections + error!(?e, "Binance ws disconnected, reconnecting"); + match block_on!(Self::connect_with_retries(this.url.clone(), 10)) { + Ok(conn) => { + this.inner = Some(conn); + Poll::Pending + } + Err(reconnect_error) => { + error!(?reconnect_error, "Failed to reconnect after max retries"); + Poll::Ready(Some(Err(reconnect_error))) + } + } } Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, + Poll::Pending => { + this.inner = Some(connection); + Poll::Pending + } } } } From bbc6c264ab0138e7e95f1d52cb1b1681645f3771 Mon Sep 17 00:00:00 2001 From: Loocapro Date: Tue, 10 Sep 2024 19:37:28 +0200 Subject: [PATCH 05/14] feat: handle reconnections properly --- oracle/src/offchain_data/binance/feeder.rs | 111 ++++++++++++++ oracle/src/offchain_data/binance/mod.rs | 163 +++++++-------------- oracle/src/offchain_data/mod.rs | 6 +- 3 files changed, 171 insertions(+), 109 deletions(-) create mode 100644 oracle/src/offchain_data/binance/feeder.rs diff --git a/oracle/src/offchain_data/binance/feeder.rs b/oracle/src/offchain_data/binance/feeder.rs new file mode 100644 index 0000000..916527f --- /dev/null +++ b/oracle/src/offchain_data/binance/feeder.rs @@ -0,0 +1,111 @@ +use futures::{Stream, StreamExt}; +use reth_tracing::tracing::error; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; +use thiserror::Error; +use tokio::net::TcpStream; +use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; + +use super::ticker::{BinanceResponse, Ticker}; + +#[derive(Error, Debug)] +pub(crate) enum BinanceDataFeederError { + /// Error connecting to the WebSocket. + #[error("error connecting to WebSocket")] + Connection(#[from] tokio_tungstenite::tungstenite::Error), + /// Error decoding the message. + #[error("error decoding message")] + Decode(#[from] serde_json::Error), + /// Error reconnecting after max retries. + #[error("reached max number of reconnection attempts")] + MaxRetriesExceeded, +} + +/// This structure controls the interaction with the Binance WebSocket API. +pub(crate) struct BinanceDataFeeder { + /// The WebSocket stream. + pub(crate) inner: WebSocketStream>, + pub(crate) symbols: Vec, +} + +impl BinanceDataFeeder { + /// Creates a new BinanceDataFeeder instance. + pub(crate) async fn new(symbols: Vec) -> Result { + let query = symbols + .iter() + .map(|symbol| format!("{}@ticker", symbol)) + .collect::>() + .join("/"); + + let url = format!("wss://stream.binance.com/stream?streams={}", query); + let client = Self::connect_with_retries(url.to_string()).await?; + + Ok(Self { inner: client, symbols }) + } + + /// Function to connect with retries and an optional delay between retries + pub(crate) async fn connect_with_retries( + url: String, + ) -> Result>, BinanceDataFeederError> { + let mut attempts = 0; + let max_retries = 10; + + while attempts < max_retries { + let conn = connect_async(url.clone()).await; + + match conn { + Ok((connection, _)) => return Ok(connection), + Err(e) => { + error!(?e, attempts, max_retries, "Connection attempt failed, retrying..."); + attempts += 1; + if attempts < max_retries { + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + } + } + } + } + + Err(BinanceDataFeederError::MaxRetriesExceeded) + } +} + +/// We implement the Stream trait for the BinanceDataFeeder struct +/// in order to encode the messages received from the WebSocket into our Ticker struct. +impl Stream for BinanceDataFeeder { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + match this.inner.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(msg))) => { + let msg = msg.into_text()?; + let resp: BinanceResponse = serde_json::from_str(&msg)?; + Poll::Ready(Some(Ok(resp.data))) + } + Poll::Ready(Some(Err(e))) => { + // handle ws disconnections + error!(?e, "Binance ws disconnected, reconnecting"); + Poll::Ready(Some(Err(BinanceDataFeederError::Connection(e)))) + } + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::StreamExt; + + #[tokio::test] + async fn can_connect() { + let symbols = vec!["btcusdt".to_string(), "ethusdt".to_string()]; + let mut feeder = BinanceDataFeeder::new(symbols).await.unwrap(); + let msg = feeder.next().await.unwrap().unwrap(); + assert!(msg.symbol == "BTCUSDT" || msg.symbol == "ETHUSDT"); + } +} diff --git a/oracle/src/offchain_data/binance/mod.rs b/oracle/src/offchain_data/binance/mod.rs index d7edd17..6dd56e3 100644 --- a/oracle/src/offchain_data/binance/mod.rs +++ b/oracle/src/offchain_data/binance/mod.rs @@ -1,133 +1,84 @@ -use futures::{Stream, StreamExt}; -use reth_tracing::tracing::error; use std::{ + future::Future, pin::Pin, task::{Context, Poll}, }; -use thiserror::Error; -use ticker::{BinanceResponse, Ticker}; -use tokio::net::TcpStream; -use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; -macro_rules! block_on { - ($expr:expr) => { - tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on($expr)) - }; -} +use feeder::{BinanceDataFeeder, BinanceDataFeederError}; +use futures::{FutureExt, Stream, StreamExt}; +use reth_tracing::tracing::{error, info}; +use ticker::Ticker; +pub(crate) mod feeder; pub(crate) mod ticker; -#[derive(Error, Debug)] -pub(crate) enum BinanceDataFeederError { - /// Error connecting to the WebSocket. - #[error("error connecting to WebSocket")] - Connection(#[from] tokio_tungstenite::tungstenite::Error), - /// Error decoding the message. - #[error("error decoding message")] - Decode(#[from] serde_json::Error), - /// Error reconnecting after max retries. - #[error("reached max number of reconnection attempts")] - MaxRetriesExceeded, -} - -/// This structure controls the interaction with the Binance WebSocket API. -pub(crate) struct BinanceDataFeeder { - /// The WebSocket stream. - inner: Option>>, - url: String, +/// The reconnection future for the Binance WebSocket connection. +type ReconnectionFuture = + Pin> + Send>>; + +/// A struct that manages a Binance WebSocket connection and handles reconnection logic +/// by proxying the underlying `BinanceDataFeeder`. +/// +/// The connection is automatically re-established if it encounters a `Connection` error. +pub(crate) struct BinanceConnection { + pub(crate) inner: BinanceDataFeeder, + reconnection: Option, } -impl BinanceDataFeeder { - /// Creates a new BinanceDataFeeder instance. +impl BinanceConnection { + /// Creates a new `BinanceConnection` with an initial WebSocket connection + /// to the Binance API using the provided list of symbols. pub(crate) async fn new(symbols: Vec) -> Result { - let query = symbols - .iter() - .map(|symbol| format!("{}@ticker", symbol)) - .collect::>() - .join("/"); - - let url = format!("wss://stream.binance.com/stream?streams={}", query); - let client = Self::connect_with_retries(url.to_string(), 10).await?; - - Ok(Self { inner: Some(client), url }) - } - - /// Function to connect with retries and an optional delay between retries - async fn connect_with_retries( - url: String, - max_retries: usize, - ) -> Result>, BinanceDataFeederError> { - let mut attempts = 0; - - while attempts < max_retries { - let conn = connect_async(url.clone()).await; - - match conn { - Ok((connection, _)) => return Ok(connection), - Err(e) => { - error!(?e, attempts, max_retries, "Connection attempt failed, retrying..."); - attempts += 1; - if attempts < max_retries { - tokio::time::sleep(std::time::Duration::from_secs(2)).await; - } - } - } - } - - Err(BinanceDataFeederError::MaxRetriesExceeded) + let inner = BinanceDataFeeder::new(symbols).await?; + Ok(Self { inner, reconnection: None }) } } -/// We implement the Stream trait for the BinanceDataFeeder struct -/// in order to encode the messages received from the WebSocket into our Ticker struct. -impl Stream for BinanceDataFeeder { +impl Stream for BinanceConnection { type Item = Result; + /// Polls the next message from the Binance WebSocket connection. + /// + /// If the connection is active, it will poll the next message. In case of a connection error, + /// it will attempt to reconnect by storing a future in `reconnect_future` and polling it until + /// reconnection is successful. + /// + /// # Returns + /// - `Poll::Ready(Some(Ok(Ticker)))`: If a new ticker message is successfully received. + /// - `Poll::Ready(Some(Err(BinanceDataFeederError)))`: If there is an error in the WebSocket + /// connection. + /// - `Poll::Ready(None)`: If the WebSocket stream has ended. + /// - `Poll::Pending`: If the WebSocket stream is still active or a reconnection is in progress. fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); - let mut connection = this.inner.take().expect("inner WebSocket client is missing"); + let reconn = this.reconnection.take(); - match connection.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(msg))) => { - let msg = msg.into_text()?; - let resp: BinanceResponse = serde_json::from_str(&msg)?; - this.inner = Some(connection); - Poll::Ready(Some(Ok(resp.data))) - } - Poll::Ready(Some(Err(e))) => { - // handle ws disconnections - error!(?e, "Binance ws disconnected, reconnecting"); - match block_on!(Self::connect_with_retries(this.url.clone(), 10)) { - Ok(conn) => { - this.inner = Some(conn); - Poll::Pending - } - Err(reconnect_error) => { - error!(?reconnect_error, "Failed to reconnect after max retries"); - Poll::Ready(Some(Err(reconnect_error))) - } + if let Some(mut fut) = reconn { + match fut.as_mut().poll(cx) { + Poll::Ready(Ok(conn)) => { + this.inner = conn; + this.reconnection = None; + info!("Reconnected to Binance WebSocket successfully."); + } + Poll::Ready(Err(e)) => { + this.reconnection = None; + return Poll::Ready(Some(Err(e))); } + Poll::Pending => return Poll::Pending, } - Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => { - this.inner = Some(connection); + } + + match this.inner.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(msg))) => Poll::Ready(Some(Ok(msg))), + Poll::Ready(Some(Err(e))) => { + error!(?e, "Binance WebSocket disconnected. Attempting to reconnect..."); + let fut = BinanceDataFeeder::new(this.inner.symbols.clone()).boxed(); + this.reconnection = Some(fut); Poll::Pending } + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, } } } - -#[cfg(test)] -mod tests { - use super::*; - use futures::StreamExt; - - #[tokio::test] - async fn can_connect() { - let symbols = vec!["btcusdt".to_string(), "ethusdt".to_string()]; - let mut feeder = BinanceDataFeeder::new(symbols).await.unwrap(); - let msg = feeder.next().await.unwrap().unwrap(); - assert!(msg.symbol == "BTCUSDT" || msg.symbol == "ETHUSDT"); - } -} diff --git a/oracle/src/offchain_data/mod.rs b/oracle/src/offchain_data/mod.rs index 67bae4f..0b1cdf8 100644 --- a/oracle/src/offchain_data/mod.rs +++ b/oracle/src/offchain_data/mod.rs @@ -1,4 +1,4 @@ -use binance::{ticker::Ticker, BinanceDataFeeder, BinanceDataFeederError}; +use binance::{feeder::BinanceDataFeederError, ticker::Ticker, BinanceConnection}; use futures::{stream::Stream, StreamExt}; use std::{ pin::Pin, @@ -22,14 +22,14 @@ pub(crate) enum DataFeederError { /// The struct that implements the Stream trait for polling multiple data feeds. pub(crate) struct DataFeederStream { - binance: BinanceDataFeeder, + binance: BinanceConnection, // Add other feeder fields if needed. } impl DataFeederStream { /// Creates a new instance of the DataFeederStream with initialized Binance feeder. pub(crate) async fn new(binance_symbols: Vec) -> Result { - let binance = BinanceDataFeeder::new(binance_symbols).await?; + let binance = BinanceConnection::new(binance_symbols).await?; Ok(Self { binance }) } } From fc1974869de43cdd6f59c9da6cebfe574f394714 Mon Sep 17 00:00:00 2001 From: Loocapro Date: Fri, 13 Sep 2024 15:14:34 +0200 Subject: [PATCH 06/14] feat(rlp-proto): added signed tick message, implemented decoding and encoding and unit tests --- Cargo.lock | 3 + oracle/Cargo.toml | 4 + oracle/src/cli_ext.rs | 2 +- oracle/src/network/proto/connection.rs | 43 +++----- oracle/src/network/proto/data.rs | 17 +++ oracle/src/network/proto/mod.rs | 114 +++++++++++++++------ oracle/src/offchain_data/binance/ticker.rs | 47 ++++----- oracle/src/oracle.rs | 2 +- 8 files changed, 147 insertions(+), 85 deletions(-) create mode 100644 oracle/src/network/proto/data.rs diff --git a/Cargo.lock b/Cargo.lock index 023187a..4ef56b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4237,6 +4237,9 @@ checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" name = "oracle" version = "0.1.0" dependencies = [ + "alloy-rlp", + "alloy-signer", + "alloy-signer-local", "clap", "discv5 0.7.0", "enr", diff --git a/oracle/Cargo.toml b/oracle/Cargo.toml index 4692a65..07b140e 100644 --- a/oracle/Cargo.toml +++ b/oracle/Cargo.toml @@ -22,6 +22,10 @@ reth-rpc-types.workspace = true reth-tracing.workspace = true reth.workspace = true +alloy-signer-local = "0.3" +alloy-signer = "0.3" +alloy-rlp = "0.3" + # networking discv5 = "0.7" enr = "0.12" diff --git a/oracle/src/cli_ext.rs b/oracle/src/cli_ext.rs index 40cca3b..4116267 100644 --- a/oracle/src/cli_ext.rs +++ b/oracle/src/cli_ext.rs @@ -2,7 +2,7 @@ use clap::Args; pub const DEFAULT_DISCOVERY_PORT: u16 = 30304; pub const DEFAULT_RLPX_PORT: u16 = 30303; -pub const DEFAULT_BINANCE_SYMBOLS: &str = "btcusdt,ethusdt"; +pub const DEFAULT_BINANCE_SYMBOLS: &str = "btcusdc,ethusdc"; #[derive(Debug, Clone, Args)] pub(crate) struct OracleExt { diff --git a/oracle/src/network/proto/connection.rs b/oracle/src/network/proto/connection.rs index 46a3b2f..f58248a 100644 --- a/oracle/src/network/proto/connection.rs +++ b/oracle/src/network/proto/connection.rs @@ -42,40 +42,23 @@ impl Stream for OracleConnection { return Poll::Ready(Some(initial_ping.encoded())); } - loop { - if let Poll::Ready(Some(cmd)) = this.commands.poll_next_unpin(cx) { - return match cmd { - OracleCommand::Message { msg, response } => { - this.pending_pong = Some(response); - Poll::Ready(Some(OracleProtoMessage::ping_message(msg).encoded())) - } - }; - } - - let Some(msg) = ready!(this.conn.poll_next_unpin(cx)) else { return Poll::Ready(None) }; + let Some(msg) = ready!(this.conn.poll_next_unpin(cx)) else { return Poll::Ready(None) }; - let Some(msg) = OracleProtoMessage::decode_message(&mut &msg[..]) else { - return Poll::Ready(None); - }; + let Some(msg) = OracleProtoMessage::decode_message(&mut &msg[..]) else { + return Poll::Ready(None); + }; - match msg.message { - OracleProtoMessageKind::Ping => { - return Poll::Ready(Some(OracleProtoMessage::pong().encoded())) - } - OracleProtoMessageKind::Pong => {} - OracleProtoMessageKind::PingMessage(msg) => { - return Poll::Ready(Some(OracleProtoMessage::pong_message(msg).encoded())) - } - OracleProtoMessageKind::PongMessage(msg) => { - if let Some(sender) = this.pending_pong.take() { - sender.send(msg).ok(); - } - continue; - } + match msg.message { + OracleProtoMessageKind::Ping => { + return Poll::Ready(Some(OracleProtoMessage::pong().encoded())) + } + OracleProtoMessageKind::Pong => {} + OracleProtoMessageKind::SignedTicker(_) => { + // TODO: verify signature and keep count } - - return Poll::Pending; } + + Poll::Pending } } diff --git a/oracle/src/network/proto/data.rs b/oracle/src/network/proto/data.rs new file mode 100644 index 0000000..5af925c --- /dev/null +++ b/oracle/src/network/proto/data.rs @@ -0,0 +1,17 @@ +use crate::offchain_data::binance::ticker::Ticker; +use alloy_rlp::{RlpDecodable, RlpEncodable}; +use alloy_signer::Signature; +use reth_primitives::Address; + +#[derive(Clone, Debug, RlpEncodable, RlpDecodable, PartialEq)] +pub struct SignedTicker { + pub(crate) ticker: Ticker, + pub(crate) signature: Signature, + pub(crate) signer: Address, +} + +impl SignedTicker { + pub fn new(ticker: Ticker, signature: Signature, signer: Address) -> Self { + Self { ticker, signature, signer } + } +} diff --git a/oracle/src/network/proto/mod.rs b/oracle/src/network/proto/mod.rs index 91f617d..f3ba4e8 100644 --- a/oracle/src/network/proto/mod.rs +++ b/oracle/src/network/proto/mod.rs @@ -1,6 +1,8 @@ #![allow(dead_code)] +use alloy_rlp::{Decodable, Encodable}; use connection::{OracleCommand, OracleConnHandler}; +use data::SignedTicker; use reth_eth_wire::{protocol::Protocol, Capability}; use reth_network::{protocol::ProtocolHandler, Direction}; use reth_network_api::PeerId; @@ -9,25 +11,24 @@ use std::net::SocketAddr; use tokio::sync::mpsc; pub(crate) mod connection; +pub(crate) mod data; #[repr(u8)] #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub(crate) enum OracleProtoMessageId { Ping = 0x00, Pong = 0x01, - PingMessage = 0x02, - PongMessage = 0x03, + TickData = 0x04, } -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq)] pub(crate) enum OracleProtoMessageKind { Ping, Pong, - PingMessage(String), - PongMessage(String), + SignedTicker(Box), } -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug)] pub(crate) struct OracleProtoMessage { pub(crate) message_type: OracleProtoMessageId, pub(crate) message: OracleProtoMessageKind, @@ -44,21 +45,6 @@ impl OracleProtoMessage { Protocol::new(Self::capability(), 4) } - /// Creates a ping message - pub(crate) fn ping_message(msg: impl Into) -> Self { - Self { - message_type: OracleProtoMessageId::PingMessage, - message: OracleProtoMessageKind::PingMessage(msg.into()), - } - } - /// Creates a ping message - pub(crate) fn pong_message(msg: impl Into) -> Self { - Self { - message_type: OracleProtoMessageId::PongMessage, - message: OracleProtoMessageKind::PongMessage(msg.into()), - } - } - /// Creates a ping message pub(crate) fn ping() -> Self { Self { message_type: OracleProtoMessageId::Ping, message: OracleProtoMessageKind::Ping } @@ -75,8 +61,8 @@ impl OracleProtoMessage { buf.put_u8(self.message_type as u8); match &self.message { OracleProtoMessageKind::Ping | OracleProtoMessageKind::Pong => {} - OracleProtoMessageKind::PingMessage(msg) | OracleProtoMessageKind::PongMessage(msg) => { - buf.put(msg.as_bytes()); + OracleProtoMessageKind::SignedTicker(data) => { + data.encode(&mut buf); } } buf @@ -92,18 +78,15 @@ impl OracleProtoMessage { let message_type = match id { 0x00 => OracleProtoMessageId::Ping, 0x01 => OracleProtoMessageId::Pong, - 0x02 => OracleProtoMessageId::PingMessage, - 0x03 => OracleProtoMessageId::PongMessage, + 0x04 => OracleProtoMessageId::TickData, _ => return None, }; let message = match message_type { OracleProtoMessageId::Ping => OracleProtoMessageKind::Ping, OracleProtoMessageId::Pong => OracleProtoMessageKind::Pong, - OracleProtoMessageId::PingMessage => { - OracleProtoMessageKind::PingMessage(String::from_utf8_lossy(&buf[..]).into_owned()) - } - OracleProtoMessageId::PongMessage => { - OracleProtoMessageKind::PongMessage(String::from_utf8_lossy(&buf[..]).into_owned()) + OracleProtoMessageId::TickData => { + let data = SignedTicker::decode(buf).unwrap(); + OracleProtoMessageKind::SignedTicker(Box::new(data)) } }; @@ -156,3 +139,74 @@ pub(crate) enum ProtocolEvent { to_connection: mpsc::UnboundedSender, }, } +#[cfg(test)] +mod tests { + use super::*; + use crate::offchain_data::binance::ticker::Ticker; + use alloy_signer::SignerSync; + use alloy_signer_local::PrivateKeySigner; + + #[test] + fn can_decode_msg() { + test_msg(OracleProtoMessage::ping()); + test_msg(OracleProtoMessage::pong()); + + // generate signer + let signer = PrivateKeySigner::random(); + let signer_address = signer.address(); + + // sign ticker data + let ticker_data = mock_ticker(); + let mut buffer = BytesMut::new(); + ticker_data.encode(&mut buffer); + let signature = signer.sign_message_sync(&buffer).unwrap(); + + // recover signer address and verify it was signed properly + let recovered_addr = signature.recover_address_from_msg(&buffer).unwrap(); + assert_eq!(recovered_addr, signer_address); + + // create signed ticker + let signed_ticker = SignedTicker::new(ticker_data, signature, signer_address); + + // test the RLP encoding/decoding + test_msg(OracleProtoMessage { + message_type: OracleProtoMessageId::TickData, + message: OracleProtoMessageKind::SignedTicker(Box::new(signed_ticker)), + }); + } + + fn test_msg(message: OracleProtoMessage) { + let encoded = message.encoded(); + let decoded = OracleProtoMessage::decode_message(&mut &encoded[..]).unwrap(); + assert_eq!(message.message_type, decoded.message_type); + assert_eq!(message.message, decoded.message); + } + + fn mock_ticker() -> Ticker { + Ticker { + event_type: "24hrTicker".to_string(), + event_time: 1622548800000, + symbol: "BTCUSDT".to_string(), + price_change: "100.0".to_string(), + price_change_percent: "1.0".to_string(), + weighted_avg_price: "50000.0".to_string(), + prev_close_price: "49000.0".to_string(), + last_price: "50000.0".to_string(), + last_quantity: "0.1".to_string(), + best_bid_price: "49950.0".to_string(), + best_bid_quantity: "0.2".to_string(), + best_ask_price: "50050.0".to_string(), + best_ask_quantity: "0.3".to_string(), + open_price: "49000.0".to_string(), + high_price: "51000.0".to_string(), + low_price: "48000.0".to_string(), + volume: "1000.0".to_string(), + quote_volume: "50000000.0".to_string(), + open_time: 1622462400000, + close_time: 1622548800000, + first_trade_id: 100000, + last_trade_id: 100100, + num_trades: 100, + } + } +} diff --git a/oracle/src/offchain_data/binance/ticker.rs b/oracle/src/offchain_data/binance/ticker.rs index 427fbfc..b760047 100644 --- a/oracle/src/offchain_data/binance/ticker.rs +++ b/oracle/src/offchain_data/binance/ticker.rs @@ -1,3 +1,4 @@ +use alloy_rlp::{RlpDecodable, RlpEncodable}; use serde::{Deserialize, Serialize}; /// Struct representing the full JSON response from Binance @@ -10,75 +11,75 @@ pub(crate) struct BinanceResponse { } /// Binance ticker data -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, RlpEncodable, RlpDecodable)] pub(crate) struct Ticker { /// Event type (e.g., "24hrTicker") #[serde(rename = "e")] - event_type: String, + pub(crate) event_type: String, /// Event time (timestamp) #[serde(rename = "E")] - event_time: u64, + pub(crate) event_time: u64, /// Trading pair symbol #[serde(rename = "s")] pub(crate) symbol: String, /// Price change over the last 24 hours #[serde(rename = "p")] - price_change: String, + pub(crate) price_change: String, /// Price change percent #[serde(rename = "P")] - price_change_percent: String, + pub(crate) price_change_percent: String, /// Weighted average price #[serde(rename = "w")] - weighted_avg_price: String, + pub(crate) weighted_avg_price: String, /// Previous day's close price #[serde(rename = "x")] - prev_close_price: String, + pub(crate) prev_close_price: String, /// Current price (last trade price) #[serde(rename = "c")] - last_price: String, + pub(crate) last_price: String, /// Last quantity traded #[serde(rename = "Q")] - last_quantity: String, + pub(crate) last_quantity: String, /// Best bid price #[serde(rename = "b")] - best_bid_price: String, + pub(crate) best_bid_price: String, /// Best bid quantity #[serde(rename = "B")] - best_bid_quantity: String, + pub(crate) best_bid_quantity: String, /// Best ask price #[serde(rename = "a")] - best_ask_price: String, + pub(crate) best_ask_price: String, /// Best ask quantity #[serde(rename = "A")] - best_ask_quantity: String, + pub(crate) best_ask_quantity: String, /// Open price for the 24-hour period #[serde(rename = "o")] - open_price: String, + pub(crate) open_price: String, /// High price of the 24-hour period #[serde(rename = "h")] - high_price: String, + pub(crate) high_price: String, /// Low price of the 24-hour period #[serde(rename = "l")] - low_price: String, + pub(crate) low_price: String, /// Total traded volume of the base asset #[serde(rename = "v")] - volume: String, + pub(crate) volume: String, /// Total traded volume of the quote asset #[serde(rename = "q")] - quote_volume: String, + pub(crate) quote_volume: String, /// Open time (timestamp) #[serde(rename = "O")] - open_time: u64, + pub(crate) open_time: u64, /// Close time (timestamp) #[serde(rename = "C")] - close_time: u64, + pub(crate) close_time: u64, /// First trade ID #[serde(rename = "F")] - first_trade_id: u64, + pub(crate) first_trade_id: u64, /// Last trade ID #[serde(rename = "L")] - last_trade_id: u64, + pub(crate) last_trade_id: u64, /// Total number of trades #[serde(rename = "n")] - num_trades: u64, + pub(crate) num_trades: u64, } diff --git a/oracle/src/oracle.rs b/oracle/src/oracle.rs index 6f594f9..e7563a3 100644 --- a/oracle/src/oracle.rs +++ b/oracle/src/oracle.rs @@ -52,7 +52,7 @@ impl Future for Oracle { // Poll the data feed future until it's drained while let Poll::Ready(item) = this.data_feed.poll_next_unpin(cx) { match item { - Some(Ok(data)) => { + Some(Ok(_data)) => { // Process the data feed by signing it and sending it to the network } Some(Err(e)) => { From 4119ec7eece6b11a115d385df1961cc4430ec085 Mon Sep 17 00:00:00 2001 From: Loocapro Date: Fri, 13 Sep 2024 15:20:51 +0200 Subject: [PATCH 07/14] fix: binance test --- oracle/src/offchain_data/binance/feeder.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/oracle/src/offchain_data/binance/feeder.rs b/oracle/src/offchain_data/binance/feeder.rs index 916527f..3a20612 100644 --- a/oracle/src/offchain_data/binance/feeder.rs +++ b/oracle/src/offchain_data/binance/feeder.rs @@ -101,6 +101,7 @@ mod tests { use super::*; use futures::StreamExt; + #[ignore] #[tokio::test] async fn can_connect() { let symbols = vec!["btcusdt".to_string(), "ethusdt".to_string()]; From 55de2182eda912e548f8daec6e729619fad67eed Mon Sep 17 00:00:00 2001 From: Loocapro Date: Mon, 16 Sep 2024 14:18:47 +0200 Subject: [PATCH 08/14] feat(gossip): added a broadcast channel to subscribe to the datafeed stream on peer established event --- oracle/src/cli_ext.rs | 2 +- oracle/src/main.rs | 6 +-- oracle/src/network/discovery.rs | 4 +- oracle/src/network/gossip.rs | 59 +++++++++++++++++++++++ oracle/src/network/mod.rs | 40 +++++++++++++++- oracle/src/network/proto/connection.rs | 65 ++++++++++++++++---------- oracle/src/network/proto/mod.rs | 18 +++++-- oracle/src/oracle.rs | 32 ++++++++++--- 8 files changed, 184 insertions(+), 42 deletions(-) create mode 100644 oracle/src/network/gossip.rs diff --git a/oracle/src/cli_ext.rs b/oracle/src/cli_ext.rs index 4116267..ac85ff3 100644 --- a/oracle/src/cli_ext.rs +++ b/oracle/src/cli_ext.rs @@ -33,7 +33,7 @@ mod tests { #[test] fn test_oracle_ext() { - let cli = CommandParser::::parse_from(&[ + let cli = CommandParser::::parse_from([ "test", "--disc.tcp-port", "30304", diff --git a/oracle/src/main.rs b/oracle/src/main.rs index 5e60122..8b15d76 100644 --- a/oracle/src/main.rs +++ b/oracle/src/main.rs @@ -24,13 +24,13 @@ fn main() -> eyre::Result<()> { let handle = builder .node(EthereumNode::default()) .install_exex(ORACLE_EXEX_ID, move |ctx| async move { - let subproto = OracleProtoHandler::new(); + let (subproto, proto_events) = OracleProtoHandler::new(); ctx.network().add_rlpx_sub_protocol(subproto.into_rlpx_sub_protocol()); let exex = ExEx::new(ctx); - let network = Network::new(tcp_port, udp_port).await?; + let (network, to_gossip) = Network::new(proto_events, tcp_port, udp_port).await?; let data_feed = DataFeederStream::new(args.binance_symbols).await?; - let oracle = Oracle::new(exex, network, data_feed); + let oracle = Oracle::new(exex, network, data_feed, to_gossip); Ok(oracle) }) .launch() diff --git a/oracle/src/network/discovery.rs b/oracle/src/network/discovery.rs index 8113759..6e328cc 100644 --- a/oracle/src/network/discovery.rs +++ b/oracle/src/network/discovery.rs @@ -67,11 +67,11 @@ impl Future for Discovery { match ready!(this.events.poll_recv(cx)) { Some(evt) => match evt { Event::Discovered(enr) => { - info!(?enr, "Discovered a new peer."); + info!(?enr, "Discovered a new node."); this.add_node(enr)?; } Event::SessionEstablished(enr, socket_addr) => { - info!(?enr, ?socket_addr, "Session established with a new peer."); + info!(?enr, ?socket_addr, "Session established with a new node."); } evt => { info!(?evt, "New discovery event."); diff --git a/oracle/src/network/gossip.rs b/oracle/src/network/gossip.rs new file mode 100644 index 0000000..2bd9afc --- /dev/null +++ b/oracle/src/network/gossip.rs @@ -0,0 +1,59 @@ +use reth_tracing::tracing::error; +use tokio::sync::{ + broadcast::{error::RecvError, Sender}, + mpsc::UnboundedSender, +}; + +use super::proto::{connection::OracleCommand, data::SignedTicker}; + +/// The size of the broadcast channel. +/// +/// This value is based on the estimated message rate and the tolerance for lag. +/// - We assume an average of 10-20 updates per second per symbol. +/// - For 2 symbols (e.g., ETHUSDC and BTCUSDC), this gives approximately 20-40 messages per second. +/// - To allow subscribers to catch up if they fall behind, we provide a lag tolerance of 5 seconds. +/// +/// Thus, the buffer size is calculated as: +/// +/// `Buffer Size = Message Rate per Second * Lag Tolerance` +/// +/// For 2 symbols, we calculate: `40 * 5 = 200`. +const BROADCAST_CHANNEL_SIZE: usize = 200; + +pub(crate) struct Gossip { + pub(crate) inner: Sender, +} + +impl Gossip { + /// Creates a new Broadcast instance. + pub(crate) fn new() -> (Self, tokio::sync::broadcast::Sender) { + let (sender, _receiver) = tokio::sync::broadcast::channel(BROADCAST_CHANNEL_SIZE); + (Self { inner: sender.clone() }, sender) + } + + /// Starts to gossip data to the connected peers from the broadcast channel. + pub(crate) fn start( + &self, + to_connection: UnboundedSender, + ) -> Result<(), RecvError> { + let mut receiver = self.inner.subscribe(); + + tokio::task::spawn(async move { + loop { + match receiver.recv().await { + Ok(signed_data) => { + if let Err(e) = to_connection.send(OracleCommand::Tick(signed_data)) { + error!(?e, "Failed to broadcast message to peer"); + } + } + Err(e) => { + error!(?e, "Data feed task encountered an error"); + return Err::<(), RecvError>(e); + } + } + } + }); + + Ok(()) + } +} diff --git a/oracle/src/network/mod.rs b/oracle/src/network/mod.rs index 1a0f42d..54d43e4 100644 --- a/oracle/src/network/mod.rs +++ b/oracle/src/network/mod.rs @@ -1,5 +1,7 @@ use discovery::Discovery; use futures::FutureExt; +use gossip::Gossip; +use proto::{data::SignedTicker, ProtocolEvent}; use reth_tracing::tracing::{error, info}; use std::{ future::Future, @@ -7,8 +9,10 @@ use std::{ pin::Pin, task::{Context, Poll}, }; +use tokio::sync::broadcast::Sender; mod discovery; +mod gossip; pub(crate) mod proto; /// The Network struct is a long running task that orchestrates discovery of new peers and network @@ -16,14 +20,23 @@ pub(crate) mod proto; pub(crate) struct Network { /// The discovery task for this node. discovery: Discovery, + /// The protocol events channel. + proto_events: proto::ProtoEvents, + /// Helper struct to manage gossiping data to connected peers. + gossip: Gossip, } impl Network { - pub(crate) async fn new(tcp_port: u16, udp_port: u16) -> eyre::Result { + pub(crate) async fn new( + proto_events: proto::ProtoEvents, + tcp_port: u16, + udp_port: u16, + ) -> eyre::Result<(Self, Sender)> { let disc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), udp_port); let rlpx_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), tcp_port); let discovery = Discovery::new(disc_addr, rlpx_addr).await?; - Ok(Self { discovery }) + let (gossip, to_gossip) = Gossip::new(); + Ok((Self { discovery, proto_events, gossip }, to_gossip)) } } @@ -42,6 +55,29 @@ impl Future for Network { error!(?e, "Discovery task encountered an error"); return Poll::Ready(Err(e)); } + Poll::Pending => break, + } + } + + loop { + match this.proto_events.poll_recv(cx) { + Poll::Ready(Some(ProtocolEvent::Established { + direction, + peer_id, + to_connection, + })) => { + info!( + ?direction, + ?peer_id, + ?to_connection, + "Established connection, will start gossiping" + ); + this.gossip.start(to_connection)?; + } + Poll::Ready(None) => { + return Poll::Ready(Ok(())); + } + Poll::Pending => {} } } diff --git a/oracle/src/network/proto/connection.rs b/oracle/src/network/proto/connection.rs index f58248a..587d6d8 100644 --- a/oracle/src/network/proto/connection.rs +++ b/oracle/src/network/proto/connection.rs @@ -1,35 +1,34 @@ -use super::{OracleProtoMessage, OracleProtoMessageKind, ProtocolEvent, ProtocolState}; +use super::{ + data::SignedTicker, OracleProtoMessage, OracleProtoMessageKind, ProtocolEvent, ProtocolState, +}; +use alloy_rlp::Encodable; use futures::{Stream, StreamExt}; use reth_eth_wire::{ capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol, }; use reth_network::protocol::{ConnectionHandler, OnNotSupported}; use reth_network_api::Direction; -use reth_primitives::BytesMut; +use reth_primitives::{Address, BytesMut}; use reth_rpc_types::PeerId; use std::{ pin::Pin, task::{ready, Context, Poll}, }; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; /// The commands supported by the OracleConnection. pub(crate) enum OracleCommand { - /// Sends a message to the peer - Message { - msg: String, - /// The response will be sent to this channel. - response: oneshot::Sender, - }, + /// Sends a signed tick to a peer + Tick(SignedTicker), } /// This struct defines the connection object for the Oracle subprotocol. pub(crate) struct OracleConnection { conn: ProtocolConnection, commands: UnboundedReceiverStream, - pending_pong: Option>, initial_ping: Option, + attestations: Vec
, } impl Stream for OracleConnection { @@ -42,23 +41,41 @@ impl Stream for OracleConnection { return Poll::Ready(Some(initial_ping.encoded())); } - let Some(msg) = ready!(this.conn.poll_next_unpin(cx)) else { return Poll::Ready(None) }; + loop { + if let Poll::Ready(Some(cmd)) = this.commands.poll_next_unpin(cx) { + return match cmd { + OracleCommand::Tick(tick) => { + Poll::Ready(Some(OracleProtoMessage::signed_ticker(tick).encoded())) + } + }; + } + + let Some(msg) = ready!(this.conn.poll_next_unpin(cx)) else { return Poll::Ready(None) }; - let Some(msg) = OracleProtoMessage::decode_message(&mut &msg[..]) else { - return Poll::Ready(None); - }; + let Some(msg) = OracleProtoMessage::decode_message(&mut &msg[..]) else { + return Poll::Ready(None); + }; - match msg.message { - OracleProtoMessageKind::Ping => { - return Poll::Ready(Some(OracleProtoMessage::pong().encoded())) - } - OracleProtoMessageKind::Pong => {} - OracleProtoMessageKind::SignedTicker(_) => { - // TODO: verify signature and keep count + match msg.message { + OracleProtoMessageKind::Ping => { + return Poll::Ready(Some(OracleProtoMessage::pong().encoded())) + } + OracleProtoMessageKind::Pong => {} + OracleProtoMessageKind::SignedTicker(signed_data) => { + let signer = signed_data.signer; + let sig = signed_data.signature; + + let mut buffer = BytesMut::new(); + signed_data.ticker.encode(&mut buffer); + + let addr = sig.recover_address_from_msg(buffer).ok().unwrap(); + + if addr == signer && !this.attestations.contains(&addr) { + this.attestations.push(addr); + } + } } } - - Poll::Pending } } @@ -98,7 +115,7 @@ impl ConnectionHandler for OracleConnHandler { conn, initial_ping: direction.is_outgoing().then(OracleProtoMessage::ping), commands: UnboundedReceiverStream::new(rx), - pending_pong: None, + attestations: Vec::new(), } } } diff --git a/oracle/src/network/proto/mod.rs b/oracle/src/network/proto/mod.rs index f3ba4e8..67d4f87 100644 --- a/oracle/src/network/proto/mod.rs +++ b/oracle/src/network/proto/mod.rs @@ -18,7 +18,7 @@ pub(crate) mod data; pub(crate) enum OracleProtoMessageId { Ping = 0x00, Pong = 0x01, - TickData = 0x04, + TickData = 0x02, } #[derive(Clone, Debug, PartialEq)] @@ -45,6 +45,14 @@ impl OracleProtoMessage { Protocol::new(Self::capability(), 4) } + /// Creates a signed ticker message + pub(crate) fn signed_ticker(data: SignedTicker) -> Self { + Self { + message_type: OracleProtoMessageId::TickData, + message: OracleProtoMessageKind::SignedTicker(Box::new(data)), + } + } + /// Creates a ping message pub(crate) fn ping() -> Self { Self { message_type: OracleProtoMessageId::Ping, message: OracleProtoMessageKind::Ping } @@ -100,11 +108,13 @@ pub(crate) struct OracleProtoHandler { state: ProtocolState, } +pub(crate) type ProtoEvents = mpsc::UnboundedReceiver; + impl OracleProtoHandler { /// Creates a new `OracleProtoHandler` with the given protocol state. - pub(crate) fn new() -> Self { - let (tx, _) = mpsc::unbounded_channel(); - Self { state: ProtocolState { events: tx } } + pub(crate) fn new() -> (Self, ProtoEvents) { + let (tx, rx) = mpsc::unbounded_channel(); + (Self { state: ProtocolState { events: tx } }, rx) } } diff --git a/oracle/src/oracle.rs b/oracle/src/oracle.rs index e7563a3..9f53cf3 100644 --- a/oracle/src/oracle.rs +++ b/oracle/src/oracle.rs @@ -1,3 +1,11 @@ +use crate::{ + exex::ExEx, + network::{proto::data::SignedTicker, Network}, + offchain_data::{DataFeederStream, DataFeeds}, +}; +use alloy_rlp::{BytesMut, Encodable}; +use alloy_signer::SignerSync; +use alloy_signer_local::PrivateKeySigner; use futures::{FutureExt, StreamExt}; use reth_node_api::FullNodeComponents; use reth_tracing::tracing::{error, info}; @@ -7,8 +15,6 @@ use std::{ task::{Context, Poll}, }; -use crate::{exex::ExEx, network::Network, offchain_data::DataFeederStream}; - /// The Oracle struct is a long running task that orchestrates discovery of new peers, /// decoding data from chain events (ExEx) and gossiping it to peers. pub(crate) struct Oracle { @@ -19,11 +25,20 @@ pub(crate) struct Oracle { exex: ExEx, /// The offchain data feed stream. data_feed: DataFeederStream, + /// The signer to sign the data feed. + signer: PrivateKeySigner, + /// Half of the broadcast channel to send data to gossip. + to_gossip: tokio::sync::broadcast::Sender, } impl Oracle { - pub(crate) fn new(exex: ExEx, network: Network, data_feed: DataFeederStream) -> Self { - Self { exex, network, data_feed } + pub(crate) fn new( + exex: ExEx, + network: Network, + data_feed: DataFeederStream, + to_gossip: tokio::sync::broadcast::Sender, + ) -> Self { + Self { exex, network, data_feed, signer: PrivateKeySigner::random(), to_gossip } } } @@ -52,8 +67,13 @@ impl Future for Oracle { // Poll the data feed future until it's drained while let Poll::Ready(item) = this.data_feed.poll_next_unpin(cx) { match item { - Some(Ok(_data)) => { - // Process the data feed by signing it and sending it to the network + Some(Ok(ticker_data)) => { + let DataFeeds::Binance(ticker) = ticker_data; + let mut buffer = BytesMut::new(); + ticker.encode(&mut buffer); + let signature = this.signer.sign_message_sync(&buffer)?; + let signed_ticker = SignedTicker::new(ticker, signature, this.signer.address()); + this.to_gossip.send(signed_ticker.clone())?; } Some(Err(e)) => { error!(?e, "Data feed task encountered an error"); From 78fcdfaf9830fc1e2db6f56c775dd2930ae3c190 Mon Sep 17 00:00:00 2001 From: Loocapro Date: Mon, 16 Sep 2024 14:25:58 +0200 Subject: [PATCH 09/14] fix: decoding --- oracle/src/network/proto/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/oracle/src/network/proto/mod.rs b/oracle/src/network/proto/mod.rs index 67d4f87..a9a283d 100644 --- a/oracle/src/network/proto/mod.rs +++ b/oracle/src/network/proto/mod.rs @@ -86,7 +86,7 @@ impl OracleProtoMessage { let message_type = match id { 0x00 => OracleProtoMessageId::Ping, 0x01 => OracleProtoMessageId::Pong, - 0x04 => OracleProtoMessageId::TickData, + 0x02 => OracleProtoMessageId::TickData, _ => return None, }; let message = match message_type { From b35655499984c6d4ff8f64463a140ed20b4ec28d Mon Sep 17 00:00:00 2001 From: Loocapro Date: Wed, 18 Sep 2024 20:20:53 +0200 Subject: [PATCH 10/14] review --- Cargo.lock | 2 ++ Cargo.toml | 7 +++++ oracle/Cargo.toml | 13 +++++---- oracle/src/main.rs | 17 ++++++++++-- oracle/src/network/gossip.rs | 6 ++-- oracle/src/network/mod.rs | 16 ++++++----- oracle/src/network/proto/mod.rs | 4 +-- oracle/src/offchain_data/binance/feeder.rs | 32 +++++++++++++++------- oracle/src/offchain_data/binance/mod.rs | 14 ++++++++-- oracle/src/oracle.rs | 11 +++++--- 10 files changed, 88 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4ef56b8..09c6708 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4249,6 +4249,7 @@ dependencies = [ "reth-discv5", "reth-eth-wire", "reth-exex", + "reth-exex-test-utils", "reth-network", "reth-network-api", "reth-network-peers", @@ -4256,6 +4257,7 @@ dependencies = [ "reth-node-ethereum", "reth-primitives", "reth-rpc-types", + "reth-testing-utils", "reth-tracing", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 0d65d3e..75cb58b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,9 @@ reth-tracing = { git = "https://github.com/paradigmxyz/reth" } # alloy alloy-sol-types = { version = "0.8", features = ["json"] } +alloy-signer-local = "0.3" +alloy-signer = "0.3" +alloy-rlp = "0.3" # async futures = "0.3" @@ -50,6 +53,10 @@ futures-util = "0.3" tokio = { version = "1.0", features = ["full"] } tokio-stream = "0.1" +# serde +serde = "1" +serde_json = "1" + # misc eyre = "0.6" diff --git a/oracle/Cargo.toml b/oracle/Cargo.toml index 07b140e..1ec477f 100644 --- a/oracle/Cargo.toml +++ b/oracle/Cargo.toml @@ -22,9 +22,9 @@ reth-rpc-types.workspace = true reth-tracing.workspace = true reth.workspace = true -alloy-signer-local = "0.3" -alloy-signer = "0.3" -alloy-rlp = "0.3" +alloy-signer-local.workspace = true +alloy-signer.workspace = true +alloy-rlp.workspace = true # networking discv5 = "0.7" @@ -42,5 +42,8 @@ thiserror = "1" # misc clap = "4" eyre.workspace = true -serde = "1" -serde_json = "1" +serde.workspace = true +serde_json.workspace = true + +reth-exex-test-utils = { git = "https://github.com/paradigmxyz/reth" } +reth-testing-utils = { git = "https://github.com/paradigmxyz/reth" } diff --git a/oracle/src/main.rs b/oracle/src/main.rs index 8b15d76..cd6ca45 100644 --- a/oracle/src/main.rs +++ b/oracle/src/main.rs @@ -1,7 +1,7 @@ use clap::Parser; use cli_ext::OracleExt; use exex::ExEx; -use network::{proto::OracleProtoHandler, Network}; +use network::{proto::OracleProtoHandler, OracleNetwork}; use offchain_data::DataFeederStream; use oracle::Oracle; use reth::args::utils::DefaultChainSpecParser; @@ -24,12 +24,25 @@ fn main() -> eyre::Result<()> { let handle = builder .node(EthereumNode::default()) .install_exex(ORACLE_EXEX_ID, move |ctx| async move { + // define the oracle subprotocol let (subproto, proto_events) = OracleProtoHandler::new(); + // add it to the network as a subprotocol ctx.network().add_rlpx_sub_protocol(subproto.into_rlpx_sub_protocol()); + // the instance of the execution extension that will handle chain events let exex = ExEx::new(ctx); - let (network, to_gossip) = Network::new(proto_events, tcp_port, udp_port).await?; + + // the instance of the oracle network that will handle discovery and + // gossiping of data + let (network, to_gossip) = + OracleNetwork::new(proto_events, tcp_port, udp_port).await?; + // the off-chain data feed stream let data_feed = DataFeederStream::new(args.binance_symbols).await?; + + // the oracle instance that will orchestrate the network, the execution extensions, + // the offchain data stream and the gossiping + // the oracle will always sign and broadcast data via the channel until a peer is + // subcribed to it let oracle = Oracle::new(exex, network, data_feed, to_gossip); Ok(oracle) }) diff --git a/oracle/src/network/gossip.rs b/oracle/src/network/gossip.rs index 2bd9afc..0270940 100644 --- a/oracle/src/network/gossip.rs +++ b/oracle/src/network/gossip.rs @@ -1,4 +1,4 @@ -use reth_tracing::tracing::error; +use reth_tracing::tracing::{error, info}; use tokio::sync::{ broadcast::{error::RecvError, Sender}, mpsc::UnboundedSender, @@ -42,9 +42,11 @@ impl Gossip { loop { match receiver.recv().await { Ok(signed_data) => { - if let Err(e) = to_connection.send(OracleCommand::Tick(signed_data)) { + if let Err(e) = to_connection.send(OracleCommand::Tick(signed_data.clone())) + { error!(?e, "Failed to broadcast message to peer"); } + info!(?signed_data, "Broadcasted message to peer"); } Err(e) => { error!(?e, "Data feed task encountered an error"); diff --git a/oracle/src/network/mod.rs b/oracle/src/network/mod.rs index 54d43e4..a46c2a3 100644 --- a/oracle/src/network/mod.rs +++ b/oracle/src/network/mod.rs @@ -15,9 +15,9 @@ mod discovery; mod gossip; pub(crate) mod proto; -/// The Network struct is a long running task that orchestrates discovery of new peers and network -/// gossiping via the RLPx subprotocol. -pub(crate) struct Network { +/// The OracleNetwork struct is a long running task that orchestrates discovery of new peers and +/// network gossiping via the RLPx subprotocol. +pub(crate) struct OracleNetwork { /// The discovery task for this node. discovery: Discovery, /// The protocol events channel. @@ -26,7 +26,7 @@ pub(crate) struct Network { gossip: Gossip, } -impl Network { +impl OracleNetwork { pub(crate) async fn new( proto_events: proto::ProtoEvents, tcp_port: u16, @@ -40,7 +40,7 @@ impl Network { } } -impl Future for Network { +impl Future for OracleNetwork { type Output = eyre::Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -75,10 +75,12 @@ impl Future for Network { this.gossip.start(to_connection)?; } Poll::Ready(None) => { - return Poll::Ready(Ok(())); + return Poll::Pending; } - Poll::Pending => {} + Poll::Pending => { + return Poll::Pending; + } } } } diff --git a/oracle/src/network/proto/mod.rs b/oracle/src/network/proto/mod.rs index a9a283d..3ef0a2a 100644 --- a/oracle/src/network/proto/mod.rs +++ b/oracle/src/network/proto/mod.rs @@ -93,7 +93,7 @@ impl OracleProtoMessage { OracleProtoMessageId::Ping => OracleProtoMessageKind::Ping, OracleProtoMessageId::Pong => OracleProtoMessageKind::Pong, OracleProtoMessageId::TickData => { - let data = SignedTicker::decode(buf).unwrap(); + let data = SignedTicker::decode(buf).ok()?; OracleProtoMessageKind::SignedTicker(Box::new(data)) } }; @@ -105,7 +105,7 @@ impl OracleProtoMessage { /// This struct is responsible of incoming and outgoing connections. #[derive(Debug)] pub(crate) struct OracleProtoHandler { - state: ProtocolState, + pub(crate) state: ProtocolState, } pub(crate) type ProtoEvents = mpsc::UnboundedReceiver; diff --git a/oracle/src/offchain_data/binance/feeder.rs b/oracle/src/offchain_data/binance/feeder.rs index 3a20612..1f5d1e9 100644 --- a/oracle/src/offchain_data/binance/feeder.rs +++ b/oracle/src/offchain_data/binance/feeder.rs @@ -1,4 +1,4 @@ -use futures::{Stream, StreamExt}; +use futures::{ready, Stream, StreamExt}; use reth_tracing::tracing::error; use std::{ pin::Pin, @@ -79,19 +79,31 @@ impl Stream for BinanceDataFeeder { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); - match this.inner.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(msg))) => { - let msg = msg.into_text()?; - let resp: BinanceResponse = serde_json::from_str(&msg)?; - Poll::Ready(Some(Ok(resp.data))) + match ready!(this.inner.poll_next_unpin(cx)) { + Some(Ok(msg)) => { + let msg = match msg.into_text() { + Ok(text) => text, + Err(e) => { + error!(?e, "Failed to convert message to text, skipping"); + return Poll::Pending; + } + }; + + let msg = match serde_json::from_str::(&msg) { + Ok(msg) => msg, + Err(e) => { + error!(?e, ?msg, "Failed to decode message, skipping"); + return Poll::Pending; + } + }; + + Poll::Ready(Some(Ok(msg.data))) } - Poll::Ready(Some(Err(e))) => { - // handle ws disconnections + Some(Err(e)) => { error!(?e, "Binance ws disconnected, reconnecting"); Poll::Ready(Some(Err(BinanceDataFeederError::Connection(e)))) } - Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, + None => Poll::Ready(None), } } } diff --git a/oracle/src/offchain_data/binance/mod.rs b/oracle/src/offchain_data/binance/mod.rs index 6dd56e3..93ceaff 100644 --- a/oracle/src/offchain_data/binance/mod.rs +++ b/oracle/src/offchain_data/binance/mod.rs @@ -62,10 +62,14 @@ impl Stream for BinanceConnection { info!("Reconnected to Binance WebSocket successfully."); } Poll::Ready(Err(e)) => { + info!(?e, "Failed to reconnect to Binance WebSocket."); this.reconnection = None; return Poll::Ready(Some(Err(e))); } - Poll::Pending => return Poll::Pending, + Poll::Pending => { + this.reconnection = Some(fut); + return Poll::Pending; + } } } @@ -75,10 +79,16 @@ impl Stream for BinanceConnection { error!(?e, "Binance WebSocket disconnected. Attempting to reconnect..."); let fut = BinanceDataFeeder::new(this.inner.symbols.clone()).boxed(); this.reconnection = Some(fut); + // make sure we are awaken to poll the reconnection future + cx.waker().wake_by_ref(); Poll::Pending } Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, + Poll::Pending => { + // wake up the task to poll the stream again + cx.waker().wake_by_ref(); + Poll::Pending + } } } } diff --git a/oracle/src/oracle.rs b/oracle/src/oracle.rs index 9f53cf3..cdd6812 100644 --- a/oracle/src/oracle.rs +++ b/oracle/src/oracle.rs @@ -1,6 +1,6 @@ use crate::{ exex::ExEx, - network::{proto::data::SignedTicker, Network}, + network::{proto::data::SignedTicker, OracleNetwork}, offchain_data::{DataFeederStream, DataFeeds}, }; use alloy_rlp::{BytesMut, Encodable}; @@ -20,7 +20,7 @@ use std::{ pub(crate) struct Oracle { /// The network task for this node. /// It is composed by a discovery task and a sub protocol RLPx task. - network: Network, + network: OracleNetwork, /// The execution extension task for this node. exex: ExEx, /// The offchain data feed stream. @@ -34,7 +34,7 @@ pub(crate) struct Oracle { impl Oracle { pub(crate) fn new( exex: ExEx, - network: Network, + network: OracleNetwork, data_feed: DataFeederStream, to_gossip: tokio::sync::broadcast::Sender, ) -> Self { @@ -73,7 +73,10 @@ impl Future for Oracle { ticker.encode(&mut buffer); let signature = this.signer.sign_message_sync(&buffer)?; let signed_ticker = SignedTicker::new(ticker, signature, this.signer.address()); - this.to_gossip.send(signed_ticker.clone())?; + + if let Err(err) = this.to_gossip.send(signed_ticker.clone()) { + error!(?err, "Failed to send ticker to gossip"); + } } Some(Err(e)) => { error!(?e, "Data feed task encountered an error"); From 055af66d7ccdad0ab72d610a6962ad478ab0ffb4 Mon Sep 17 00:00:00 2001 From: Loocapro Date: Thu, 19 Sep 2024 20:01:52 +0200 Subject: [PATCH 11/14] feat: wiring the gossip directly in the subprotocol connection --- oracle/src/main.rs | 7 ++- oracle/src/network/gossip.rs | 61 -------------------------- oracle/src/network/mod.rs | 13 ++---- oracle/src/network/proto/connection.rs | 9 +++- oracle/src/network/proto/mod.rs | 25 +++++++++-- oracle/src/oracle.rs | 12 ++--- 6 files changed, 41 insertions(+), 86 deletions(-) delete mode 100644 oracle/src/network/gossip.rs diff --git a/oracle/src/main.rs b/oracle/src/main.rs index cd6ca45..2748b77 100644 --- a/oracle/src/main.rs +++ b/oracle/src/main.rs @@ -25,7 +25,7 @@ fn main() -> eyre::Result<()> { .node(EthereumNode::default()) .install_exex(ORACLE_EXEX_ID, move |ctx| async move { // define the oracle subprotocol - let (subproto, proto_events) = OracleProtoHandler::new(); + let (subproto, proto_events, to_peers) = OracleProtoHandler::new(); // add it to the network as a subprotocol ctx.network().add_rlpx_sub_protocol(subproto.into_rlpx_sub_protocol()); @@ -34,8 +34,7 @@ fn main() -> eyre::Result<()> { // the instance of the oracle network that will handle discovery and // gossiping of data - let (network, to_gossip) = - OracleNetwork::new(proto_events, tcp_port, udp_port).await?; + let network = OracleNetwork::new(proto_events, tcp_port, udp_port).await?; // the off-chain data feed stream let data_feed = DataFeederStream::new(args.binance_symbols).await?; @@ -43,7 +42,7 @@ fn main() -> eyre::Result<()> { // the offchain data stream and the gossiping // the oracle will always sign and broadcast data via the channel until a peer is // subcribed to it - let oracle = Oracle::new(exex, network, data_feed, to_gossip); + let oracle = Oracle::new(exex, network, data_feed, to_peers); Ok(oracle) }) .launch() diff --git a/oracle/src/network/gossip.rs b/oracle/src/network/gossip.rs deleted file mode 100644 index 0270940..0000000 --- a/oracle/src/network/gossip.rs +++ /dev/null @@ -1,61 +0,0 @@ -use reth_tracing::tracing::{error, info}; -use tokio::sync::{ - broadcast::{error::RecvError, Sender}, - mpsc::UnboundedSender, -}; - -use super::proto::{connection::OracleCommand, data::SignedTicker}; - -/// The size of the broadcast channel. -/// -/// This value is based on the estimated message rate and the tolerance for lag. -/// - We assume an average of 10-20 updates per second per symbol. -/// - For 2 symbols (e.g., ETHUSDC and BTCUSDC), this gives approximately 20-40 messages per second. -/// - To allow subscribers to catch up if they fall behind, we provide a lag tolerance of 5 seconds. -/// -/// Thus, the buffer size is calculated as: -/// -/// `Buffer Size = Message Rate per Second * Lag Tolerance` -/// -/// For 2 symbols, we calculate: `40 * 5 = 200`. -const BROADCAST_CHANNEL_SIZE: usize = 200; - -pub(crate) struct Gossip { - pub(crate) inner: Sender, -} - -impl Gossip { - /// Creates a new Broadcast instance. - pub(crate) fn new() -> (Self, tokio::sync::broadcast::Sender) { - let (sender, _receiver) = tokio::sync::broadcast::channel(BROADCAST_CHANNEL_SIZE); - (Self { inner: sender.clone() }, sender) - } - - /// Starts to gossip data to the connected peers from the broadcast channel. - pub(crate) fn start( - &self, - to_connection: UnboundedSender, - ) -> Result<(), RecvError> { - let mut receiver = self.inner.subscribe(); - - tokio::task::spawn(async move { - loop { - match receiver.recv().await { - Ok(signed_data) => { - if let Err(e) = to_connection.send(OracleCommand::Tick(signed_data.clone())) - { - error!(?e, "Failed to broadcast message to peer"); - } - info!(?signed_data, "Broadcasted message to peer"); - } - Err(e) => { - error!(?e, "Data feed task encountered an error"); - return Err::<(), RecvError>(e); - } - } - } - }); - - Ok(()) - } -} diff --git a/oracle/src/network/mod.rs b/oracle/src/network/mod.rs index a46c2a3..4fd0bf6 100644 --- a/oracle/src/network/mod.rs +++ b/oracle/src/network/mod.rs @@ -1,7 +1,6 @@ use discovery::Discovery; use futures::FutureExt; -use gossip::Gossip; -use proto::{data::SignedTicker, ProtocolEvent}; +use proto::ProtocolEvent; use reth_tracing::tracing::{error, info}; use std::{ future::Future, @@ -9,10 +8,8 @@ use std::{ pin::Pin, task::{Context, Poll}, }; -use tokio::sync::broadcast::Sender; mod discovery; -mod gossip; pub(crate) mod proto; /// The OracleNetwork struct is a long running task that orchestrates discovery of new peers and @@ -22,8 +19,6 @@ pub(crate) struct OracleNetwork { discovery: Discovery, /// The protocol events channel. proto_events: proto::ProtoEvents, - /// Helper struct to manage gossiping data to connected peers. - gossip: Gossip, } impl OracleNetwork { @@ -31,12 +26,11 @@ impl OracleNetwork { proto_events: proto::ProtoEvents, tcp_port: u16, udp_port: u16, - ) -> eyre::Result<(Self, Sender)> { + ) -> eyre::Result { let disc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), udp_port); let rlpx_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), tcp_port); let discovery = Discovery::new(disc_addr, rlpx_addr).await?; - let (gossip, to_gossip) = Gossip::new(); - Ok((Self { discovery, proto_events, gossip }, to_gossip)) + Ok(Self { discovery, proto_events }) } } @@ -72,7 +66,6 @@ impl Future for OracleNetwork { ?to_connection, "Established connection, will start gossiping" ); - this.gossip.start(to_connection)?; } Poll::Ready(None) => { return Poll::Pending; diff --git a/oracle/src/network/proto/connection.rs b/oracle/src/network/proto/connection.rs index 587d6d8..6229ff4 100644 --- a/oracle/src/network/proto/connection.rs +++ b/oracle/src/network/proto/connection.rs @@ -15,9 +15,10 @@ use std::{ task::{ready, Context, Poll}, }; use tokio::sync::mpsc; -use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio_stream::wrappers::{BroadcastStream, UnboundedReceiverStream}; /// The commands supported by the OracleConnection. +#[derive(Clone)] pub(crate) enum OracleCommand { /// Sends a signed tick to a peer Tick(SignedTicker), @@ -27,6 +28,7 @@ pub(crate) enum OracleCommand { pub(crate) struct OracleConnection { conn: ProtocolConnection, commands: UnboundedReceiverStream, + signed_ticks: BroadcastStream, initial_ping: Option, attestations: Vec
, } @@ -50,6 +52,10 @@ impl Stream for OracleConnection { }; } + if let Poll::Ready(Some(Ok(tick))) = this.signed_ticks.poll_next_unpin(cx) { + return Poll::Ready(Some(OracleProtoMessage::signed_ticker(tick).encoded())); + } + let Some(msg) = ready!(this.conn.poll_next_unpin(cx)) else { return Poll::Ready(None) }; let Some(msg) = OracleProtoMessage::decode_message(&mut &msg[..]) else { @@ -115,6 +121,7 @@ impl ConnectionHandler for OracleConnHandler { conn, initial_ping: direction.is_outgoing().then(OracleProtoMessage::ping), commands: UnboundedReceiverStream::new(rx), + signed_ticks: BroadcastStream::new(self.state.to_peers.subscribe()), attestations: Vec::new(), } } diff --git a/oracle/src/network/proto/mod.rs b/oracle/src/network/proto/mod.rs index 3ef0a2a..39c4ffd 100644 --- a/oracle/src/network/proto/mod.rs +++ b/oracle/src/network/proto/mod.rs @@ -102,19 +102,35 @@ impl OracleProtoMessage { } } +pub(crate) type ProtoEvents = mpsc::UnboundedReceiver; +pub(crate) type ToPeers = tokio::sync::broadcast::Sender; + /// This struct is responsible of incoming and outgoing connections. #[derive(Debug)] pub(crate) struct OracleProtoHandler { pub(crate) state: ProtocolState, } -pub(crate) type ProtoEvents = mpsc::UnboundedReceiver; +/// The size of the broadcast channel. +/// +/// This value is based on the estimated message rate and the tolerance for lag. +/// - We assume an average of 10-20 updates per second per symbol. +/// - For 2 symbols (e.g., ETHUSDC and BTCUSDC), this gives approximately 20-40 messages per second. +/// - To allow subscribers to catch up if they fall behind, we provide a lag tolerance of 5 seconds. +/// +/// Thus, the buffer size is calculated as: +/// +/// `Buffer Size = Message Rate per Second * Lag Tolerance` +/// +/// For 2 symbols, we calculate: `40 * 5 = 200`. +const BROADCAST_CHANNEL_SIZE: usize = 200; impl OracleProtoHandler { /// Creates a new `OracleProtoHandler` with the given protocol state. - pub(crate) fn new() -> (Self, ProtoEvents) { + pub(crate) fn new() -> (Self, ProtoEvents, ToPeers) { let (tx, rx) = mpsc::unbounded_channel(); - (Self { state: ProtocolState { events: tx } }, rx) + let (to_peers, _) = tokio::sync::broadcast::channel(BROADCAST_CHANNEL_SIZE); + (Self { state: ProtocolState { events: tx, to_peers: to_peers.clone() } }, rx, to_peers) } } @@ -138,10 +154,11 @@ impl ProtocolHandler for OracleProtoHandler { #[derive(Clone, Debug)] pub(crate) struct ProtocolState { pub(crate) events: mpsc::UnboundedSender, + pub(crate) to_peers: tokio::sync::broadcast::Sender, } /// The events that can be emitted by our custom protocol. -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) enum ProtocolEvent { Established { direction: Direction, diff --git a/oracle/src/oracle.rs b/oracle/src/oracle.rs index cdd6812..284e215 100644 --- a/oracle/src/oracle.rs +++ b/oracle/src/oracle.rs @@ -27,8 +27,8 @@ pub(crate) struct Oracle { data_feed: DataFeederStream, /// The signer to sign the data feed. signer: PrivateKeySigner, - /// Half of the broadcast channel to send data to gossip. - to_gossip: tokio::sync::broadcast::Sender, + /// Half of the broadcast channel to send data to connected peers. + to_peers: tokio::sync::broadcast::Sender, } impl Oracle { @@ -36,9 +36,9 @@ impl Oracle { exex: ExEx, network: OracleNetwork, data_feed: DataFeederStream, - to_gossip: tokio::sync::broadcast::Sender, + to_peers: tokio::sync::broadcast::Sender, ) -> Self { - Self { exex, network, data_feed, signer: PrivateKeySigner::random(), to_gossip } + Self { exex, network, data_feed, signer: PrivateKeySigner::random(), to_peers } } } @@ -74,8 +74,8 @@ impl Future for Oracle { let signature = this.signer.sign_message_sync(&buffer)?; let signed_ticker = SignedTicker::new(ticker, signature, this.signer.address()); - if let Err(err) = this.to_gossip.send(signed_ticker.clone()) { - error!(?err, "Failed to send ticker to gossip"); + if let Err(err) = this.to_peers.send(signed_ticker.clone()) { + error!(?err, "Failed to send ticker to gossip, no peers connected"); } } Some(Err(e)) => { From a6a7a040db6e2ee7197272784913a66a4eb1b176 Mon Sep 17 00:00:00 2001 From: Loocapro Date: Thu, 26 Sep 2024 20:57:31 +0200 Subject: [PATCH 12/14] feat: attestations, unit testing 2 connected peers sending attestations to each other --- Cargo.lock | 4 + Cargo.toml | 2 +- oracle/Cargo.toml | 2 + oracle/src/network/proto/connection.rs | 54 +++++++-- oracle/src/network/proto/data.rs | 10 +- oracle/src/network/proto/mod.rs | 123 ++++++++++++++++++++- oracle/src/offchain_data/binance/ticker.rs | 4 +- 7 files changed, 181 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 09c6708..1452950 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4241,6 +4241,7 @@ dependencies = [ "alloy-signer", "alloy-signer-local", "clap", + "dashmap", "discv5 0.7.0", "enr", "eyre", @@ -4265,6 +4266,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-tungstenite", + "uuid", ] [[package]] @@ -6226,6 +6228,7 @@ dependencies = [ "reth-network-peers", "reth-network-types", "reth-primitives", + "reth-provider", "reth-storage-api", "reth-tasks", "reth-tokio-util", @@ -6235,6 +6238,7 @@ dependencies = [ "secp256k1", "serde", "smallvec", + "tempfile", "thiserror", "tokio", "tokio-stream", diff --git a/Cargo.toml b/Cargo.toml index 75cb58b..7ea50ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ reth-execution-types = { git = "https://github.com/paradigmxyz/reth" } reth-exex = { git = "https://github.com/paradigmxyz/reth", features = ["serde"] } reth-eth-wire = { git = "https://github.com/paradigmxyz/reth" } reth-evm = { git = "https://github.com/paradigmxyz/reth" } -reth-network = { git = "https://github.com/paradigmxyz/reth" } +reth-network = { git = "https://github.com/paradigmxyz/reth", features = ["test-utils"] } reth-network-api = { git = "https://github.com/paradigmxyz/reth" } reth-network-peers = { git = "https://github.com/paradigmxyz/reth" } reth-node-api = { git = "https://github.com/paradigmxyz/reth" } diff --git a/oracle/Cargo.toml b/oracle/Cargo.toml index 1ec477f..c76a2bd 100644 --- a/oracle/Cargo.toml +++ b/oracle/Cargo.toml @@ -44,6 +44,8 @@ clap = "4" eyre.workspace = true serde.workspace = true serde_json.workspace = true +uuid = "1.10.0" +dashmap = "6.1.0" reth-exex-test-utils = { git = "https://github.com/paradigmxyz/reth" } reth-testing-utils = { git = "https://github.com/paradigmxyz/reth" } diff --git a/oracle/src/network/proto/connection.rs b/oracle/src/network/proto/connection.rs index 6229ff4..a35afcc 100644 --- a/oracle/src/network/proto/connection.rs +++ b/oracle/src/network/proto/connection.rs @@ -2,26 +2,28 @@ use super::{ data::SignedTicker, OracleProtoMessage, OracleProtoMessageKind, ProtocolEvent, ProtocolState, }; use alloy_rlp::Encodable; +use dashmap::DashMap; use futures::{Stream, StreamExt}; use reth_eth_wire::{ capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol, }; use reth_network::protocol::{ConnectionHandler, OnNotSupported}; use reth_network_api::Direction; -use reth_primitives::{Address, BytesMut}; +use reth_primitives::{Address, BytesMut, B256}; use reth_rpc_types::PeerId; use std::{ pin::Pin, task::{ready, Context, Poll}, }; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::{BroadcastStream, UnboundedReceiverStream}; /// The commands supported by the OracleConnection. -#[derive(Clone)] pub(crate) enum OracleCommand { /// Sends a signed tick to a peer - Tick(SignedTicker), + Tick(Box), + /// Requests for attestations from a given id + Attestation(B256, oneshot::Sender>), } /// This struct defines the connection object for the Oracle subprotocol. @@ -30,7 +32,8 @@ pub(crate) struct OracleConnection { commands: UnboundedReceiverStream, signed_ticks: BroadcastStream, initial_ping: Option, - attestations: Vec
, + attestations: DashMap>, + pending_response: Option>>, } impl Stream for OracleConnection { @@ -49,11 +52,22 @@ impl Stream for OracleConnection { OracleCommand::Tick(tick) => { Poll::Ready(Some(OracleProtoMessage::signed_ticker(tick).encoded())) } + OracleCommand::Attestation(id, pending_response) => { + let attestations = + this.attestations.get(&id).map(|a| a.clone()).unwrap_or_default(); + + this.pending_response = Some(pending_response); + return Poll::Ready(Some( + OracleProtoMessage::attestations(attestations).encoded(), + )); + } }; } if let Poll::Ready(Some(Ok(tick))) = this.signed_ticks.poll_next_unpin(cx) { - return Poll::Ready(Some(OracleProtoMessage::signed_ticker(tick).encoded())); + return Poll::Ready(Some( + OracleProtoMessage::signed_ticker(Box::new(tick)).encoded(), + )); } let Some(msg) = ready!(this.conn.poll_next_unpin(cx)) else { return Poll::Ready(None) }; @@ -67,6 +81,11 @@ impl Stream for OracleConnection { return Poll::Ready(Some(OracleProtoMessage::pong().encoded())) } OracleProtoMessageKind::Pong => {} + OracleProtoMessageKind::Attestations(attestations) => { + if let Some(sender) = this.pending_response.take() { + sender.send(attestations).ok(); + } + } OracleProtoMessageKind::SignedTicker(signed_data) => { let signer = signed_data.signer; let sig = signed_data.signature; @@ -74,11 +93,25 @@ impl Stream for OracleConnection { let mut buffer = BytesMut::new(); signed_data.ticker.encode(&mut buffer); - let addr = sig.recover_address_from_msg(buffer).ok().unwrap(); + let addr = match sig.recover_address_from_msg(buffer.clone()) { + Ok(addr) => addr, + Err(_) => return Poll::Ready(None), + }; - if addr == signer && !this.attestations.contains(&addr) { - this.attestations.push(addr); + if addr == signer { + this.attestations + .entry(signed_data.id) + .and_modify(|vec| vec.push(addr)) + .or_insert_with(|| vec![addr]); } + let attestations = this + .attestations + .get(&signed_data.id) + .map(|a| a.clone()) + .unwrap_or_default(); + return Poll::Ready(Some( + OracleProtoMessage::attestations(attestations).encoded(), + )); } } } @@ -122,7 +155,8 @@ impl ConnectionHandler for OracleConnHandler { initial_ping: direction.is_outgoing().then(OracleProtoMessage::ping), commands: UnboundedReceiverStream::new(rx), signed_ticks: BroadcastStream::new(self.state.to_peers.subscribe()), - attestations: Vec::new(), + attestations: DashMap::new(), + pending_response: None, } } } diff --git a/oracle/src/network/proto/data.rs b/oracle/src/network/proto/data.rs index 5af925c..195485d 100644 --- a/oracle/src/network/proto/data.rs +++ b/oracle/src/network/proto/data.rs @@ -1,17 +1,21 @@ use crate::offchain_data::binance::ticker::Ticker; -use alloy_rlp::{RlpDecodable, RlpEncodable}; +use alloy_rlp::{BytesMut, Encodable, RlpDecodable, RlpEncodable}; use alloy_signer::Signature; -use reth_primitives::Address; +use reth_primitives::{keccak256, Address, B256}; #[derive(Clone, Debug, RlpEncodable, RlpDecodable, PartialEq)] pub struct SignedTicker { pub(crate) ticker: Ticker, pub(crate) signature: Signature, pub(crate) signer: Address, + pub(crate) id: B256, } impl SignedTicker { pub fn new(ticker: Ticker, signature: Signature, signer: Address) -> Self { - Self { ticker, signature, signer } + let mut buffer = BytesMut::new(); + ticker.encode(&mut buffer); + let id = keccak256(&buffer); + Self { ticker, signature, signer, id } } } diff --git a/oracle/src/network/proto/mod.rs b/oracle/src/network/proto/mod.rs index 39c4ffd..4d0a367 100644 --- a/oracle/src/network/proto/mod.rs +++ b/oracle/src/network/proto/mod.rs @@ -6,7 +6,7 @@ use data::SignedTicker; use reth_eth_wire::{protocol::Protocol, Capability}; use reth_network::{protocol::ProtocolHandler, Direction}; use reth_network_api::PeerId; -use reth_primitives::{Buf, BufMut, BytesMut}; +use reth_primitives::{Address, Buf, BufMut, BytesMut}; use std::net::SocketAddr; use tokio::sync::mpsc; @@ -19,6 +19,7 @@ pub(crate) enum OracleProtoMessageId { Ping = 0x00, Pong = 0x01, TickData = 0x02, + Attestations = 0x03, } #[derive(Clone, Debug, PartialEq)] @@ -26,6 +27,7 @@ pub(crate) enum OracleProtoMessageKind { Ping, Pong, SignedTicker(Box), + Attestations(Vec
), } #[derive(Clone, Debug)] @@ -45,11 +47,19 @@ impl OracleProtoMessage { Protocol::new(Self::capability(), 4) } + /// Creates an attestations message + pub(crate) fn attestations(attestations: Vec
) -> Self { + Self { + message_type: OracleProtoMessageId::Attestations, + message: OracleProtoMessageKind::Attestations(attestations), + } + } + /// Creates a signed ticker message - pub(crate) fn signed_ticker(data: SignedTicker) -> Self { + pub(crate) fn signed_ticker(data: Box) -> Self { Self { message_type: OracleProtoMessageId::TickData, - message: OracleProtoMessageKind::SignedTicker(Box::new(data)), + message: OracleProtoMessageKind::SignedTicker(data), } } @@ -69,6 +79,9 @@ impl OracleProtoMessage { buf.put_u8(self.message_type as u8); match &self.message { OracleProtoMessageKind::Ping | OracleProtoMessageKind::Pong => {} + OracleProtoMessageKind::Attestations(data) => { + data.encode(&mut buf); + } OracleProtoMessageKind::SignedTicker(data) => { data.encode(&mut buf); } @@ -87,11 +100,16 @@ impl OracleProtoMessage { 0x00 => OracleProtoMessageId::Ping, 0x01 => OracleProtoMessageId::Pong, 0x02 => OracleProtoMessageId::TickData, + 0x03 => OracleProtoMessageId::Attestations, _ => return None, }; let message = match message_type { OracleProtoMessageId::Ping => OracleProtoMessageKind::Ping, OracleProtoMessageId::Pong => OracleProtoMessageKind::Pong, + OracleProtoMessageId::Attestations => { + let data = Vec::
::decode(buf).ok()?; + OracleProtoMessageKind::Attestations(data) + } OracleProtoMessageId::TickData => { let data = SignedTicker::decode(buf).ok()?; OracleProtoMessageKind::SignedTicker(Box::new(data)) @@ -172,7 +190,106 @@ mod tests { use crate::offchain_data::binance::ticker::Ticker; use alloy_signer::SignerSync; use alloy_signer_local::PrivateKeySigner; + use mpsc::{UnboundedReceiver, UnboundedSender}; + use reth::providers::test_utils::MockEthProvider; + use reth_network::test_utils::Testnet; + use reth_primitives::B256; + use tokio::sync::oneshot; + + #[tokio::test(flavor = "multi_thread")] + async fn can_attest_multiple_times() { + reth_tracing::init_test_tracing(); + let provider = MockEthProvider::default(); + let mut net = Testnet::create_with(2, provider.clone()).await; + + let (events, mut from_peer0) = mpsc::unbounded_channel(); + let (to_peer0, mut broadcast_from_peer0) = + tokio::sync::broadcast::channel(BROADCAST_CHANNEL_SIZE); + net.peers_mut()[0].add_rlpx_sub_protocol(OracleProtoHandler { + state: ProtocolState { events, to_peers: to_peer0.clone() }, + }); + + let (events, mut from_peer1) = mpsc::unbounded_channel(); + let (to_peer1, mut broadcast_from_peer1) = + tokio::sync::broadcast::channel(BROADCAST_CHANNEL_SIZE); + net.peers_mut()[1].add_rlpx_sub_protocol(OracleProtoHandler { + state: ProtocolState { events, to_peers: to_peer1.clone() }, + }); + + let handle = net.spawn(); + + handle.connect_peers().await; + + let peer0_conn = established(&mut from_peer0, *handle.peers()[1].peer_id()).await; + let peer1_conn = established(&mut from_peer1, *handle.peers()[0].peer_id()).await; + + let (peer0_att, _, broadcast_from_peer0) = + sent_att(&to_peer0, &mut broadcast_from_peer0).await; + got_att(peer0_conn.clone(), peer0_att.id, peer0_att.signer, 1).await; + + let (peer1_att, _, _) = sent_att(&to_peer1, &mut broadcast_from_peer1).await; + got_att(peer1_conn.clone(), peer1_att.id, peer1_att.signer, 1).await; + + let (att, _to_peer0, _broad_cast_from_peer0) = + sent_att(&to_peer0, broadcast_from_peer0).await; + got_att(peer0_conn, att.id, att.signer, 2).await; + } + async fn established( + from_peer0: &mut UnboundedReceiver, + wanted: PeerId, + ) -> UnboundedSender { + let peer0_to_peer1 = from_peer0.recv().await.unwrap(); + match peer0_to_peer1 { + ProtocolEvent::Established { direction: _, peer_id, to_connection } => { + assert_eq!(wanted, peer_id); + to_connection + } + } + } + async fn got_att( + connection: UnboundedSender, + attestation_id: B256, + expected_signer: Address, + expcted_att_len: usize, + ) { + let (tx, rx) = oneshot::channel(); + + connection.send(OracleCommand::Attestation(attestation_id, tx)).unwrap(); + + let response = rx.await.unwrap(); + assert_eq!(response.len(), expcted_att_len); + assert_eq!(response[expcted_att_len - 1], expected_signer); + } + + async fn sent_att<'a>( + to_peer: &'a tokio::sync::broadcast::Sender, + broadcast_from_peer: &'a mut tokio::sync::broadcast::Receiver, + ) -> ( + SignedTicker, + &'a tokio::sync::broadcast::Sender, + &'a mut tokio::sync::broadcast::Receiver, + ) { + // Create a new signer and sign the ticker data + let signer = PrivateKeySigner::random(); + let signer_address = signer.address(); + let ticker_data = mock_ticker(); + let mut buffer = BytesMut::new(); + ticker_data.encode(&mut buffer); + let signature = signer.sign_message_sync(&buffer).unwrap(); + + // Create the signed ticker + let signed_ticker = SignedTicker::new(ticker_data, signature, signer_address); + + // Send the signed ticker to the peer + to_peer.send(signed_ticker.clone()).unwrap(); + + // Expect it in the broadcast receiver + let received = broadcast_from_peer.recv().await.unwrap(); + assert_eq!(received, signed_ticker); + + (signed_ticker, to_peer, broadcast_from_peer) + } #[test] fn can_decode_msg() { test_msg(OracleProtoMessage::ping()); diff --git a/oracle/src/offchain_data/binance/ticker.rs b/oracle/src/offchain_data/binance/ticker.rs index b760047..73698ad 100644 --- a/oracle/src/offchain_data/binance/ticker.rs +++ b/oracle/src/offchain_data/binance/ticker.rs @@ -11,7 +11,9 @@ pub(crate) struct BinanceResponse { } /// Binance ticker data -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, RlpEncodable, RlpDecodable)] +#[derive( + Serialize, Deserialize, Debug, Clone, PartialEq, Eq, RlpEncodable, RlpDecodable, Default, +)] pub(crate) struct Ticker { /// Event type (e.g., "24hrTicker") #[serde(rename = "e")] From d87ff09dc3338775fc9ffbfb4bd272b9ed5ab991 Mon Sep 17 00:00:00 2001 From: Luca Provini Date: Fri, 27 Sep 2024 13:17:31 +0200 Subject: [PATCH 13/14] review: ready macro, removed dashmap, more docs, oracle command docs and removed unused --- Cargo.lock | 1 - oracle/Cargo.toml | 1 - oracle/src/network/mod.rs | 19 +++------- oracle/src/network/proto/connection.rs | 48 +++++++++++--------------- 4 files changed, 24 insertions(+), 45 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1452950..f3a1cd8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4241,7 +4241,6 @@ dependencies = [ "alloy-signer", "alloy-signer-local", "clap", - "dashmap", "discv5 0.7.0", "enr", "eyre", diff --git a/oracle/Cargo.toml b/oracle/Cargo.toml index c76a2bd..d510ec0 100644 --- a/oracle/Cargo.toml +++ b/oracle/Cargo.toml @@ -45,7 +45,6 @@ eyre.workspace = true serde.workspace = true serde_json.workspace = true uuid = "1.10.0" -dashmap = "6.1.0" reth-exex-test-utils = { git = "https://github.com/paradigmxyz/reth" } reth-testing-utils = { git = "https://github.com/paradigmxyz/reth" } diff --git a/oracle/src/network/mod.rs b/oracle/src/network/mod.rs index 4fd0bf6..6880c6b 100644 --- a/oracle/src/network/mod.rs +++ b/oracle/src/network/mod.rs @@ -1,5 +1,5 @@ use discovery::Discovery; -use futures::FutureExt; +use futures::{ready, FutureExt}; use proto::ProtocolEvent; use reth_tracing::tracing::{error, info}; use std::{ @@ -52,14 +52,9 @@ impl Future for OracleNetwork { Poll::Pending => break, } } - loop { - match this.proto_events.poll_recv(cx) { - Poll::Ready(Some(ProtocolEvent::Established { - direction, - peer_id, - to_connection, - })) => { + match ready!(this.proto_events.poll_recv(cx)) { + Some(ProtocolEvent::Established { direction, peer_id, to_connection }) => { info!( ?direction, ?peer_id, @@ -67,13 +62,7 @@ impl Future for OracleNetwork { "Established connection, will start gossiping" ); } - Poll::Ready(None) => { - return Poll::Pending; - } - - Poll::Pending => { - return Poll::Pending; - } + None => return Poll::Ready(Ok(())), } } } diff --git a/oracle/src/network/proto/connection.rs b/oracle/src/network/proto/connection.rs index a35afcc..0a1d9a5 100644 --- a/oracle/src/network/proto/connection.rs +++ b/oracle/src/network/proto/connection.rs @@ -2,7 +2,6 @@ use super::{ data::SignedTicker, OracleProtoMessage, OracleProtoMessageKind, ProtocolEvent, ProtocolState, }; use alloy_rlp::Encodable; -use dashmap::DashMap; use futures::{Stream, StreamExt}; use reth_eth_wire::{ capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol, @@ -12,6 +11,7 @@ use reth_network_api::Direction; use reth_primitives::{Address, BytesMut, B256}; use reth_rpc_types::PeerId; use std::{ + collections::HashMap, pin::Pin, task::{ready, Context, Poll}, }; @@ -20,20 +20,24 @@ use tokio_stream::wrappers::{BroadcastStream, UnboundedReceiverStream}; /// The commands supported by the OracleConnection. pub(crate) enum OracleCommand { - /// Sends a signed tick to a peer - Tick(Box), - /// Requests for attestations from a given id + /// A peer can request the attestations for a specific tick data. Attestation(B256, oneshot::Sender>), } /// This struct defines the connection object for the Oracle subprotocol. pub(crate) struct OracleConnection { + /// The connection channel receiving RLP bytes from the network. conn: ProtocolConnection, + /// The channel to receive commands from the Oracle network. commands: UnboundedReceiverStream, + /// The channel to receive signed ticks from the Oracle network. signed_ticks: BroadcastStream, + /// The initial ping message to send to the peer. initial_ping: Option, - attestations: DashMap>, - pending_response: Option>>, + /// The attestations received from the peer. + attestations: HashMap>, + /// The pending attestation channel to send back attestations to who requested them. + pending_att: Option>>, } impl Stream for OracleConnection { @@ -48,20 +52,10 @@ impl Stream for OracleConnection { loop { if let Poll::Ready(Some(cmd)) = this.commands.poll_next_unpin(cx) { - return match cmd { - OracleCommand::Tick(tick) => { - Poll::Ready(Some(OracleProtoMessage::signed_ticker(tick).encoded())) - } - OracleCommand::Attestation(id, pending_response) => { - let attestations = - this.attestations.get(&id).map(|a| a.clone()).unwrap_or_default(); - - this.pending_response = Some(pending_response); - return Poll::Ready(Some( - OracleProtoMessage::attestations(attestations).encoded(), - )); - } - }; + let OracleCommand::Attestation(id, pending_att) = cmd; + let attestations = this.attestations.get(&id).cloned().unwrap_or_default(); + this.pending_att = Some(pending_att); + return Poll::Ready(Some(OracleProtoMessage::attestations(attestations).encoded())); } if let Poll::Ready(Some(Ok(tick))) = this.signed_ticks.poll_next_unpin(cx) { @@ -82,7 +76,7 @@ impl Stream for OracleConnection { } OracleProtoMessageKind::Pong => {} OracleProtoMessageKind::Attestations(attestations) => { - if let Some(sender) = this.pending_response.take() { + if let Some(sender) = this.pending_att.take() { sender.send(attestations).ok(); } } @@ -104,11 +98,9 @@ impl Stream for OracleConnection { .and_modify(|vec| vec.push(addr)) .or_insert_with(|| vec![addr]); } - let attestations = this - .attestations - .get(&signed_data.id) - .map(|a| a.clone()) - .unwrap_or_default(); + + let attestations = + this.attestations.get(&signed_data.id).cloned().unwrap_or_default(); return Poll::Ready(Some( OracleProtoMessage::attestations(attestations).encoded(), )); @@ -155,8 +147,8 @@ impl ConnectionHandler for OracleConnHandler { initial_ping: direction.is_outgoing().then(OracleProtoMessage::ping), commands: UnboundedReceiverStream::new(rx), signed_ticks: BroadcastStream::new(self.state.to_peers.subscribe()), - attestations: DashMap::new(), - pending_response: None, + attestations: HashMap::new(), + pending_att: None, } } } From 12d0bfdb112f15e5dcc2c220baf1902d7e5074e2 Mon Sep 17 00:00:00 2001 From: Luca Provini Date: Fri, 27 Sep 2024 13:30:04 +0200 Subject: [PATCH 14/14] clippy --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9ae360f..c104fb1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -52,8 +52,8 @@ jobs: runs-on: ubuntu-latest timeout-minutes: 30 steps: - - uses: actions/checkout@v3 - - uses: dtolnay/rust-toolchain@clippy + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable - uses: Swatinem/rust-cache@v2 with: cache-on-failure: true