diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9ae360f..c104fb1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -52,8 +52,8 @@ jobs: runs-on: ubuntu-latest timeout-minutes: 30 steps: - - uses: actions/checkout@v3 - - uses: dtolnay/rust-toolchain@clippy + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable - uses: Swatinem/rust-cache@v2 with: cache-on-failure: true diff --git a/Cargo.lock b/Cargo.lock index eb92281..f3a1cd8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4237,6 +4237,9 @@ checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" name = "oracle" version = "0.1.0" dependencies = [ + "alloy-rlp", + "alloy-signer", + "alloy-signer-local", "clap", "discv5 0.7.0", "enr", @@ -4246,6 +4249,7 @@ dependencies = [ "reth-discv5", "reth-eth-wire", "reth-exex", + "reth-exex-test-utils", "reth-network", "reth-network-api", "reth-network-peers", @@ -4253,9 +4257,15 @@ dependencies = [ "reth-node-ethereum", "reth-primitives", "reth-rpc-types", + "reth-testing-utils", "reth-tracing", + "serde", + "serde_json", + "thiserror", "tokio", "tokio-stream", + "tokio-tungstenite", + "uuid", ] [[package]] @@ -6217,6 +6227,7 @@ dependencies = [ "reth-network-peers", "reth-network-types", "reth-primitives", + "reth-provider", "reth-storage-api", "reth-tasks", "reth-tokio-util", @@ -6226,6 +6237,7 @@ dependencies = [ "secp256k1", "serde", "smallvec", + "tempfile", "thiserror", "tokio", "tokio-stream", diff --git a/Cargo.toml b/Cargo.toml index 0d65d3e..7ea50ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ reth-execution-types = { git = "https://github.com/paradigmxyz/reth" } reth-exex = { git = "https://github.com/paradigmxyz/reth", features = ["serde"] } reth-eth-wire = { git = "https://github.com/paradigmxyz/reth" } reth-evm = { git = "https://github.com/paradigmxyz/reth" } -reth-network = { git = "https://github.com/paradigmxyz/reth" } +reth-network = { git = "https://github.com/paradigmxyz/reth", features = ["test-utils"] } reth-network-api = { git = "https://github.com/paradigmxyz/reth" } reth-network-peers = { git = "https://github.com/paradigmxyz/reth" } reth-node-api = { git = "https://github.com/paradigmxyz/reth" } @@ -43,6 +43,9 @@ reth-tracing = { git = "https://github.com/paradigmxyz/reth" } # alloy alloy-sol-types = { version = "0.8", features = ["json"] } +alloy-signer-local = "0.3" +alloy-signer = "0.3" +alloy-rlp = "0.3" # async futures = "0.3" @@ -50,6 +53,10 @@ futures-util = "0.3" tokio = { version = "1.0", features = ["full"] } tokio-stream = "0.1" +# serde +serde = "1" +serde_json = "1" + # misc eyre = "0.6" diff --git a/oracle/Cargo.toml b/oracle/Cargo.toml index 419bfc4..d510ec0 100644 --- a/oracle/Cargo.toml +++ b/oracle/Cargo.toml @@ -22,6 +22,10 @@ reth-rpc-types.workspace = true reth-tracing.workspace = true reth.workspace = true +alloy-signer-local.workspace = true +alloy-signer.workspace = true +alloy-rlp.workspace = true + # networking discv5 = "0.7" enr = "0.12" @@ -30,7 +34,17 @@ enr = "0.12" futures.workspace = true tokio.workspace = true tokio-stream.workspace = true +tokio-tungstenite = "0.23" + + +thiserror = "1" # misc clap = "4" eyre.workspace = true +serde.workspace = true +serde_json.workspace = true +uuid = "1.10.0" + +reth-exex-test-utils = { git = "https://github.com/paradigmxyz/reth" } +reth-testing-utils = { git = "https://github.com/paradigmxyz/reth" } diff --git a/oracle/src/cli_ext.rs b/oracle/src/cli_ext.rs index 86a212c..ac85ff3 100644 --- a/oracle/src/cli_ext.rs +++ b/oracle/src/cli_ext.rs @@ -2,6 +2,7 @@ use clap::Args; pub const DEFAULT_DISCOVERY_PORT: u16 = 30304; pub const DEFAULT_RLPX_PORT: u16 = 30303; +pub const DEFAULT_BINANCE_SYMBOLS: &str = "btcusdc,ethusdc"; #[derive(Debug, Clone, Args)] pub(crate) struct OracleExt { @@ -12,4 +13,37 @@ pub(crate) struct OracleExt { /// UDP port used for discovery #[clap(long = "disc.udp-port", default_value_t = DEFAULT_DISCOVERY_PORT)] pub udp_port: u16, + + /// The list of symbols to configure the stream of prices from binance. + #[clap(long = "data.symbols", use_value_delimiter = true, default_value = DEFAULT_BINANCE_SYMBOLS)] + pub binance_symbols: Vec, +} + +#[cfg(test)] +mod tests { + use super::*; + use clap::{Args, Parser}; + + /// A helper type to parse Args more easily + #[derive(Parser)] + struct CommandParser { + #[clap(flatten)] + args: T, + } + + #[test] + fn test_oracle_ext() { + let cli = CommandParser::::parse_from([ + "test", + "--disc.tcp-port", + "30304", + "--disc.udp-port", + "30303", + "--data.symbols", + "btcusdt,ethusdt", + ]); + assert_eq!(cli.args.tcp_port, 30304); + assert_eq!(cli.args.udp_port, 30303); + assert_eq!(cli.args.binance_symbols, vec!["btcusdt", "ethusdt"]); + } } diff --git a/oracle/src/main.rs b/oracle/src/main.rs index e2683dc..2748b77 100644 --- a/oracle/src/main.rs +++ b/oracle/src/main.rs @@ -1,7 +1,8 @@ use clap::Parser; use cli_ext::OracleExt; use exex::ExEx; -use network::{proto::OracleProtoHandler, Network}; +use network::{proto::OracleProtoHandler, OracleNetwork}; +use offchain_data::DataFeederStream; use oracle::Oracle; use reth::args::utils::DefaultChainSpecParser; use reth_network::{protocol::IntoRlpxSubProtocol, NetworkProtocols}; @@ -10,6 +11,7 @@ use reth_node_ethereum::EthereumNode; mod cli_ext; mod exex; mod network; +mod offchain_data; mod oracle; const ORACLE_EXEX_ID: &str = "exex-oracle"; @@ -22,12 +24,25 @@ fn main() -> eyre::Result<()> { let handle = builder .node(EthereumNode::default()) .install_exex(ORACLE_EXEX_ID, move |ctx| async move { - let subproto = OracleProtoHandler::new(); + // define the oracle subprotocol + let (subproto, proto_events, to_peers) = OracleProtoHandler::new(); + // add it to the network as a subprotocol ctx.network().add_rlpx_sub_protocol(subproto.into_rlpx_sub_protocol()); + // the instance of the execution extension that will handle chain events let exex = ExEx::new(ctx); - let network = Network::new(tcp_port, udp_port).await?; - let oracle = Oracle::new(exex, network).await?; + + // the instance of the oracle network that will handle discovery and + // gossiping of data + let network = OracleNetwork::new(proto_events, tcp_port, udp_port).await?; + // the off-chain data feed stream + let data_feed = DataFeederStream::new(args.binance_symbols).await?; + + // the oracle instance that will orchestrate the network, the execution extensions, + // the offchain data stream and the gossiping + // the oracle will always sign and broadcast data via the channel until a peer is + // subcribed to it + let oracle = Oracle::new(exex, network, data_feed, to_peers); Ok(oracle) }) .launch() diff --git a/oracle/src/network/discovery.rs b/oracle/src/network/discovery.rs index 8113759..6e328cc 100644 --- a/oracle/src/network/discovery.rs +++ b/oracle/src/network/discovery.rs @@ -67,11 +67,11 @@ impl Future for Discovery { match ready!(this.events.poll_recv(cx)) { Some(evt) => match evt { Event::Discovered(enr) => { - info!(?enr, "Discovered a new peer."); + info!(?enr, "Discovered a new node."); this.add_node(enr)?; } Event::SessionEstablished(enr, socket_addr) => { - info!(?enr, ?socket_addr, "Session established with a new peer."); + info!(?enr, ?socket_addr, "Session established with a new node."); } evt => { info!(?evt, "New discovery event."); diff --git a/oracle/src/network/mod.rs b/oracle/src/network/mod.rs index 1a0f42d..6880c6b 100644 --- a/oracle/src/network/mod.rs +++ b/oracle/src/network/mod.rs @@ -1,5 +1,6 @@ use discovery::Discovery; -use futures::FutureExt; +use futures::{ready, FutureExt}; +use proto::ProtocolEvent; use reth_tracing::tracing::{error, info}; use std::{ future::Future, @@ -11,23 +12,29 @@ use std::{ mod discovery; pub(crate) mod proto; -/// The Network struct is a long running task that orchestrates discovery of new peers and network -/// gossiping via the RLPx subprotocol. -pub(crate) struct Network { +/// The OracleNetwork struct is a long running task that orchestrates discovery of new peers and +/// network gossiping via the RLPx subprotocol. +pub(crate) struct OracleNetwork { /// The discovery task for this node. discovery: Discovery, + /// The protocol events channel. + proto_events: proto::ProtoEvents, } -impl Network { - pub(crate) async fn new(tcp_port: u16, udp_port: u16) -> eyre::Result { +impl OracleNetwork { + pub(crate) async fn new( + proto_events: proto::ProtoEvents, + tcp_port: u16, + udp_port: u16, + ) -> eyre::Result { let disc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), udp_port); let rlpx_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), tcp_port); let discovery = Discovery::new(disc_addr, rlpx_addr).await?; - Ok(Self { discovery }) + Ok(Self { discovery, proto_events }) } } -impl Future for Network { +impl Future for OracleNetwork { type Output = eyre::Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -42,7 +49,20 @@ impl Future for Network { error!(?e, "Discovery task encountered an error"); return Poll::Ready(Err(e)); } - Poll::Pending => {} + Poll::Pending => break, + } + } + loop { + match ready!(this.proto_events.poll_recv(cx)) { + Some(ProtocolEvent::Established { direction, peer_id, to_connection }) => { + info!( + ?direction, + ?peer_id, + ?to_connection, + "Established connection, will start gossiping" + ); + } + None => return Poll::Ready(Ok(())), } } } diff --git a/oracle/src/network/proto/connection.rs b/oracle/src/network/proto/connection.rs index 46a3b2f..0a1d9a5 100644 --- a/oracle/src/network/proto/connection.rs +++ b/oracle/src/network/proto/connection.rs @@ -1,35 +1,43 @@ -use super::{OracleProtoMessage, OracleProtoMessageKind, ProtocolEvent, ProtocolState}; +use super::{ + data::SignedTicker, OracleProtoMessage, OracleProtoMessageKind, ProtocolEvent, ProtocolState, +}; +use alloy_rlp::Encodable; use futures::{Stream, StreamExt}; use reth_eth_wire::{ capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol, }; use reth_network::protocol::{ConnectionHandler, OnNotSupported}; use reth_network_api::Direction; -use reth_primitives::BytesMut; +use reth_primitives::{Address, BytesMut, B256}; use reth_rpc_types::PeerId; use std::{ + collections::HashMap, pin::Pin, task::{ready, Context, Poll}, }; use tokio::sync::{mpsc, oneshot}; -use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio_stream::wrappers::{BroadcastStream, UnboundedReceiverStream}; /// The commands supported by the OracleConnection. pub(crate) enum OracleCommand { - /// Sends a message to the peer - Message { - msg: String, - /// The response will be sent to this channel. - response: oneshot::Sender, - }, + /// A peer can request the attestations for a specific tick data. + Attestation(B256, oneshot::Sender>), } /// This struct defines the connection object for the Oracle subprotocol. pub(crate) struct OracleConnection { + /// The connection channel receiving RLP bytes from the network. conn: ProtocolConnection, + /// The channel to receive commands from the Oracle network. commands: UnboundedReceiverStream, - pending_pong: Option>, + /// The channel to receive signed ticks from the Oracle network. + signed_ticks: BroadcastStream, + /// The initial ping message to send to the peer. initial_ping: Option, + /// The attestations received from the peer. + attestations: HashMap>, + /// The pending attestation channel to send back attestations to who requested them. + pending_att: Option>>, } impl Stream for OracleConnection { @@ -44,12 +52,16 @@ impl Stream for OracleConnection { loop { if let Poll::Ready(Some(cmd)) = this.commands.poll_next_unpin(cx) { - return match cmd { - OracleCommand::Message { msg, response } => { - this.pending_pong = Some(response); - Poll::Ready(Some(OracleProtoMessage::ping_message(msg).encoded())) - } - }; + let OracleCommand::Attestation(id, pending_att) = cmd; + let attestations = this.attestations.get(&id).cloned().unwrap_or_default(); + this.pending_att = Some(pending_att); + return Poll::Ready(Some(OracleProtoMessage::attestations(attestations).encoded())); + } + + if let Poll::Ready(Some(Ok(tick))) = this.signed_ticks.poll_next_unpin(cx) { + return Poll::Ready(Some( + OracleProtoMessage::signed_ticker(Box::new(tick)).encoded(), + )); } let Some(msg) = ready!(this.conn.poll_next_unpin(cx)) else { return Poll::Ready(None) }; @@ -63,18 +75,37 @@ impl Stream for OracleConnection { return Poll::Ready(Some(OracleProtoMessage::pong().encoded())) } OracleProtoMessageKind::Pong => {} - OracleProtoMessageKind::PingMessage(msg) => { - return Poll::Ready(Some(OracleProtoMessage::pong_message(msg).encoded())) + OracleProtoMessageKind::Attestations(attestations) => { + if let Some(sender) = this.pending_att.take() { + sender.send(attestations).ok(); + } } - OracleProtoMessageKind::PongMessage(msg) => { - if let Some(sender) = this.pending_pong.take() { - sender.send(msg).ok(); + OracleProtoMessageKind::SignedTicker(signed_data) => { + let signer = signed_data.signer; + let sig = signed_data.signature; + + let mut buffer = BytesMut::new(); + signed_data.ticker.encode(&mut buffer); + + let addr = match sig.recover_address_from_msg(buffer.clone()) { + Ok(addr) => addr, + Err(_) => return Poll::Ready(None), + }; + + if addr == signer { + this.attestations + .entry(signed_data.id) + .and_modify(|vec| vec.push(addr)) + .or_insert_with(|| vec![addr]); } - continue; + + let attestations = + this.attestations.get(&signed_data.id).cloned().unwrap_or_default(); + return Poll::Ready(Some( + OracleProtoMessage::attestations(attestations).encoded(), + )); } } - - return Poll::Pending; } } } @@ -115,7 +146,9 @@ impl ConnectionHandler for OracleConnHandler { conn, initial_ping: direction.is_outgoing().then(OracleProtoMessage::ping), commands: UnboundedReceiverStream::new(rx), - pending_pong: None, + signed_ticks: BroadcastStream::new(self.state.to_peers.subscribe()), + attestations: HashMap::new(), + pending_att: None, } } } diff --git a/oracle/src/network/proto/data.rs b/oracle/src/network/proto/data.rs new file mode 100644 index 0000000..195485d --- /dev/null +++ b/oracle/src/network/proto/data.rs @@ -0,0 +1,21 @@ +use crate::offchain_data::binance::ticker::Ticker; +use alloy_rlp::{BytesMut, Encodable, RlpDecodable, RlpEncodable}; +use alloy_signer::Signature; +use reth_primitives::{keccak256, Address, B256}; + +#[derive(Clone, Debug, RlpEncodable, RlpDecodable, PartialEq)] +pub struct SignedTicker { + pub(crate) ticker: Ticker, + pub(crate) signature: Signature, + pub(crate) signer: Address, + pub(crate) id: B256, +} + +impl SignedTicker { + pub fn new(ticker: Ticker, signature: Signature, signer: Address) -> Self { + let mut buffer = BytesMut::new(); + ticker.encode(&mut buffer); + let id = keccak256(&buffer); + Self { ticker, signature, signer, id } + } +} diff --git a/oracle/src/network/proto/mod.rs b/oracle/src/network/proto/mod.rs index 91f617d..4d0a367 100644 --- a/oracle/src/network/proto/mod.rs +++ b/oracle/src/network/proto/mod.rs @@ -1,33 +1,36 @@ #![allow(dead_code)] +use alloy_rlp::{Decodable, Encodable}; use connection::{OracleCommand, OracleConnHandler}; +use data::SignedTicker; use reth_eth_wire::{protocol::Protocol, Capability}; use reth_network::{protocol::ProtocolHandler, Direction}; use reth_network_api::PeerId; -use reth_primitives::{Buf, BufMut, BytesMut}; +use reth_primitives::{Address, Buf, BufMut, BytesMut}; use std::net::SocketAddr; use tokio::sync::mpsc; pub(crate) mod connection; +pub(crate) mod data; #[repr(u8)] #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub(crate) enum OracleProtoMessageId { Ping = 0x00, Pong = 0x01, - PingMessage = 0x02, - PongMessage = 0x03, + TickData = 0x02, + Attestations = 0x03, } -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq)] pub(crate) enum OracleProtoMessageKind { Ping, Pong, - PingMessage(String), - PongMessage(String), + SignedTicker(Box), + Attestations(Vec
), } -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug)] pub(crate) struct OracleProtoMessage { pub(crate) message_type: OracleProtoMessageId, pub(crate) message: OracleProtoMessageKind, @@ -44,18 +47,19 @@ impl OracleProtoMessage { Protocol::new(Self::capability(), 4) } - /// Creates a ping message - pub(crate) fn ping_message(msg: impl Into) -> Self { + /// Creates an attestations message + pub(crate) fn attestations(attestations: Vec
) -> Self { Self { - message_type: OracleProtoMessageId::PingMessage, - message: OracleProtoMessageKind::PingMessage(msg.into()), + message_type: OracleProtoMessageId::Attestations, + message: OracleProtoMessageKind::Attestations(attestations), } } - /// Creates a ping message - pub(crate) fn pong_message(msg: impl Into) -> Self { + + /// Creates a signed ticker message + pub(crate) fn signed_ticker(data: Box) -> Self { Self { - message_type: OracleProtoMessageId::PongMessage, - message: OracleProtoMessageKind::PongMessage(msg.into()), + message_type: OracleProtoMessageId::TickData, + message: OracleProtoMessageKind::SignedTicker(data), } } @@ -75,8 +79,11 @@ impl OracleProtoMessage { buf.put_u8(self.message_type as u8); match &self.message { OracleProtoMessageKind::Ping | OracleProtoMessageKind::Pong => {} - OracleProtoMessageKind::PingMessage(msg) | OracleProtoMessageKind::PongMessage(msg) => { - buf.put(msg.as_bytes()); + OracleProtoMessageKind::Attestations(data) => { + data.encode(&mut buf); + } + OracleProtoMessageKind::SignedTicker(data) => { + data.encode(&mut buf); } } buf @@ -92,18 +99,20 @@ impl OracleProtoMessage { let message_type = match id { 0x00 => OracleProtoMessageId::Ping, 0x01 => OracleProtoMessageId::Pong, - 0x02 => OracleProtoMessageId::PingMessage, - 0x03 => OracleProtoMessageId::PongMessage, + 0x02 => OracleProtoMessageId::TickData, + 0x03 => OracleProtoMessageId::Attestations, _ => return None, }; let message = match message_type { OracleProtoMessageId::Ping => OracleProtoMessageKind::Ping, OracleProtoMessageId::Pong => OracleProtoMessageKind::Pong, - OracleProtoMessageId::PingMessage => { - OracleProtoMessageKind::PingMessage(String::from_utf8_lossy(&buf[..]).into_owned()) + OracleProtoMessageId::Attestations => { + let data = Vec::
::decode(buf).ok()?; + OracleProtoMessageKind::Attestations(data) } - OracleProtoMessageId::PongMessage => { - OracleProtoMessageKind::PongMessage(String::from_utf8_lossy(&buf[..]).into_owned()) + OracleProtoMessageId::TickData => { + let data = SignedTicker::decode(buf).ok()?; + OracleProtoMessageKind::SignedTicker(Box::new(data)) } }; @@ -111,17 +120,35 @@ impl OracleProtoMessage { } } +pub(crate) type ProtoEvents = mpsc::UnboundedReceiver; +pub(crate) type ToPeers = tokio::sync::broadcast::Sender; + /// This struct is responsible of incoming and outgoing connections. #[derive(Debug)] pub(crate) struct OracleProtoHandler { - state: ProtocolState, + pub(crate) state: ProtocolState, } +/// The size of the broadcast channel. +/// +/// This value is based on the estimated message rate and the tolerance for lag. +/// - We assume an average of 10-20 updates per second per symbol. +/// - For 2 symbols (e.g., ETHUSDC and BTCUSDC), this gives approximately 20-40 messages per second. +/// - To allow subscribers to catch up if they fall behind, we provide a lag tolerance of 5 seconds. +/// +/// Thus, the buffer size is calculated as: +/// +/// `Buffer Size = Message Rate per Second * Lag Tolerance` +/// +/// For 2 symbols, we calculate: `40 * 5 = 200`. +const BROADCAST_CHANNEL_SIZE: usize = 200; + impl OracleProtoHandler { /// Creates a new `OracleProtoHandler` with the given protocol state. - pub(crate) fn new() -> Self { - let (tx, _) = mpsc::unbounded_channel(); - Self { state: ProtocolState { events: tx } } + pub(crate) fn new() -> (Self, ProtoEvents, ToPeers) { + let (tx, rx) = mpsc::unbounded_channel(); + let (to_peers, _) = tokio::sync::broadcast::channel(BROADCAST_CHANNEL_SIZE); + (Self { state: ProtocolState { events: tx, to_peers: to_peers.clone() } }, rx, to_peers) } } @@ -145,10 +172,11 @@ impl ProtocolHandler for OracleProtoHandler { #[derive(Clone, Debug)] pub(crate) struct ProtocolState { pub(crate) events: mpsc::UnboundedSender, + pub(crate) to_peers: tokio::sync::broadcast::Sender, } /// The events that can be emitted by our custom protocol. -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) enum ProtocolEvent { Established { direction: Direction, @@ -156,3 +184,173 @@ pub(crate) enum ProtocolEvent { to_connection: mpsc::UnboundedSender, }, } +#[cfg(test)] +mod tests { + use super::*; + use crate::offchain_data::binance::ticker::Ticker; + use alloy_signer::SignerSync; + use alloy_signer_local::PrivateKeySigner; + use mpsc::{UnboundedReceiver, UnboundedSender}; + use reth::providers::test_utils::MockEthProvider; + use reth_network::test_utils::Testnet; + use reth_primitives::B256; + use tokio::sync::oneshot; + + #[tokio::test(flavor = "multi_thread")] + async fn can_attest_multiple_times() { + reth_tracing::init_test_tracing(); + let provider = MockEthProvider::default(); + let mut net = Testnet::create_with(2, provider.clone()).await; + + let (events, mut from_peer0) = mpsc::unbounded_channel(); + let (to_peer0, mut broadcast_from_peer0) = + tokio::sync::broadcast::channel(BROADCAST_CHANNEL_SIZE); + net.peers_mut()[0].add_rlpx_sub_protocol(OracleProtoHandler { + state: ProtocolState { events, to_peers: to_peer0.clone() }, + }); + + let (events, mut from_peer1) = mpsc::unbounded_channel(); + let (to_peer1, mut broadcast_from_peer1) = + tokio::sync::broadcast::channel(BROADCAST_CHANNEL_SIZE); + net.peers_mut()[1].add_rlpx_sub_protocol(OracleProtoHandler { + state: ProtocolState { events, to_peers: to_peer1.clone() }, + }); + + let handle = net.spawn(); + + handle.connect_peers().await; + + let peer0_conn = established(&mut from_peer0, *handle.peers()[1].peer_id()).await; + let peer1_conn = established(&mut from_peer1, *handle.peers()[0].peer_id()).await; + + let (peer0_att, _, broadcast_from_peer0) = + sent_att(&to_peer0, &mut broadcast_from_peer0).await; + got_att(peer0_conn.clone(), peer0_att.id, peer0_att.signer, 1).await; + + let (peer1_att, _, _) = sent_att(&to_peer1, &mut broadcast_from_peer1).await; + got_att(peer1_conn.clone(), peer1_att.id, peer1_att.signer, 1).await; + + let (att, _to_peer0, _broad_cast_from_peer0) = + sent_att(&to_peer0, broadcast_from_peer0).await; + got_att(peer0_conn, att.id, att.signer, 2).await; + } + + async fn established( + from_peer0: &mut UnboundedReceiver, + wanted: PeerId, + ) -> UnboundedSender { + let peer0_to_peer1 = from_peer0.recv().await.unwrap(); + match peer0_to_peer1 { + ProtocolEvent::Established { direction: _, peer_id, to_connection } => { + assert_eq!(wanted, peer_id); + to_connection + } + } + } + async fn got_att( + connection: UnboundedSender, + attestation_id: B256, + expected_signer: Address, + expcted_att_len: usize, + ) { + let (tx, rx) = oneshot::channel(); + + connection.send(OracleCommand::Attestation(attestation_id, tx)).unwrap(); + + let response = rx.await.unwrap(); + assert_eq!(response.len(), expcted_att_len); + assert_eq!(response[expcted_att_len - 1], expected_signer); + } + + async fn sent_att<'a>( + to_peer: &'a tokio::sync::broadcast::Sender, + broadcast_from_peer: &'a mut tokio::sync::broadcast::Receiver, + ) -> ( + SignedTicker, + &'a tokio::sync::broadcast::Sender, + &'a mut tokio::sync::broadcast::Receiver, + ) { + // Create a new signer and sign the ticker data + let signer = PrivateKeySigner::random(); + let signer_address = signer.address(); + let ticker_data = mock_ticker(); + let mut buffer = BytesMut::new(); + ticker_data.encode(&mut buffer); + let signature = signer.sign_message_sync(&buffer).unwrap(); + + // Create the signed ticker + let signed_ticker = SignedTicker::new(ticker_data, signature, signer_address); + + // Send the signed ticker to the peer + to_peer.send(signed_ticker.clone()).unwrap(); + + // Expect it in the broadcast receiver + let received = broadcast_from_peer.recv().await.unwrap(); + assert_eq!(received, signed_ticker); + + (signed_ticker, to_peer, broadcast_from_peer) + } + #[test] + fn can_decode_msg() { + test_msg(OracleProtoMessage::ping()); + test_msg(OracleProtoMessage::pong()); + + // generate signer + let signer = PrivateKeySigner::random(); + let signer_address = signer.address(); + + // sign ticker data + let ticker_data = mock_ticker(); + let mut buffer = BytesMut::new(); + ticker_data.encode(&mut buffer); + let signature = signer.sign_message_sync(&buffer).unwrap(); + + // recover signer address and verify it was signed properly + let recovered_addr = signature.recover_address_from_msg(&buffer).unwrap(); + assert_eq!(recovered_addr, signer_address); + + // create signed ticker + let signed_ticker = SignedTicker::new(ticker_data, signature, signer_address); + + // test the RLP encoding/decoding + test_msg(OracleProtoMessage { + message_type: OracleProtoMessageId::TickData, + message: OracleProtoMessageKind::SignedTicker(Box::new(signed_ticker)), + }); + } + + fn test_msg(message: OracleProtoMessage) { + let encoded = message.encoded(); + let decoded = OracleProtoMessage::decode_message(&mut &encoded[..]).unwrap(); + assert_eq!(message.message_type, decoded.message_type); + assert_eq!(message.message, decoded.message); + } + + fn mock_ticker() -> Ticker { + Ticker { + event_type: "24hrTicker".to_string(), + event_time: 1622548800000, + symbol: "BTCUSDT".to_string(), + price_change: "100.0".to_string(), + price_change_percent: "1.0".to_string(), + weighted_avg_price: "50000.0".to_string(), + prev_close_price: "49000.0".to_string(), + last_price: "50000.0".to_string(), + last_quantity: "0.1".to_string(), + best_bid_price: "49950.0".to_string(), + best_bid_quantity: "0.2".to_string(), + best_ask_price: "50050.0".to_string(), + best_ask_quantity: "0.3".to_string(), + open_price: "49000.0".to_string(), + high_price: "51000.0".to_string(), + low_price: "48000.0".to_string(), + volume: "1000.0".to_string(), + quote_volume: "50000000.0".to_string(), + open_time: 1622462400000, + close_time: 1622548800000, + first_trade_id: 100000, + last_trade_id: 100100, + num_trades: 100, + } + } +} diff --git a/oracle/src/offchain_data/binance/feeder.rs b/oracle/src/offchain_data/binance/feeder.rs new file mode 100644 index 0000000..1f5d1e9 --- /dev/null +++ b/oracle/src/offchain_data/binance/feeder.rs @@ -0,0 +1,124 @@ +use futures::{ready, Stream, StreamExt}; +use reth_tracing::tracing::error; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; +use thiserror::Error; +use tokio::net::TcpStream; +use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; + +use super::ticker::{BinanceResponse, Ticker}; + +#[derive(Error, Debug)] +pub(crate) enum BinanceDataFeederError { + /// Error connecting to the WebSocket. + #[error("error connecting to WebSocket")] + Connection(#[from] tokio_tungstenite::tungstenite::Error), + /// Error decoding the message. + #[error("error decoding message")] + Decode(#[from] serde_json::Error), + /// Error reconnecting after max retries. + #[error("reached max number of reconnection attempts")] + MaxRetriesExceeded, +} + +/// This structure controls the interaction with the Binance WebSocket API. +pub(crate) struct BinanceDataFeeder { + /// The WebSocket stream. + pub(crate) inner: WebSocketStream>, + pub(crate) symbols: Vec, +} + +impl BinanceDataFeeder { + /// Creates a new BinanceDataFeeder instance. + pub(crate) async fn new(symbols: Vec) -> Result { + let query = symbols + .iter() + .map(|symbol| format!("{}@ticker", symbol)) + .collect::>() + .join("/"); + + let url = format!("wss://stream.binance.com/stream?streams={}", query); + let client = Self::connect_with_retries(url.to_string()).await?; + + Ok(Self { inner: client, symbols }) + } + + /// Function to connect with retries and an optional delay between retries + pub(crate) async fn connect_with_retries( + url: String, + ) -> Result>, BinanceDataFeederError> { + let mut attempts = 0; + let max_retries = 10; + + while attempts < max_retries { + let conn = connect_async(url.clone()).await; + + match conn { + Ok((connection, _)) => return Ok(connection), + Err(e) => { + error!(?e, attempts, max_retries, "Connection attempt failed, retrying..."); + attempts += 1; + if attempts < max_retries { + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + } + } + } + } + + Err(BinanceDataFeederError::MaxRetriesExceeded) + } +} + +/// We implement the Stream trait for the BinanceDataFeeder struct +/// in order to encode the messages received from the WebSocket into our Ticker struct. +impl Stream for BinanceDataFeeder { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + match ready!(this.inner.poll_next_unpin(cx)) { + Some(Ok(msg)) => { + let msg = match msg.into_text() { + Ok(text) => text, + Err(e) => { + error!(?e, "Failed to convert message to text, skipping"); + return Poll::Pending; + } + }; + + let msg = match serde_json::from_str::(&msg) { + Ok(msg) => msg, + Err(e) => { + error!(?e, ?msg, "Failed to decode message, skipping"); + return Poll::Pending; + } + }; + + Poll::Ready(Some(Ok(msg.data))) + } + Some(Err(e)) => { + error!(?e, "Binance ws disconnected, reconnecting"); + Poll::Ready(Some(Err(BinanceDataFeederError::Connection(e)))) + } + None => Poll::Ready(None), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::StreamExt; + + #[ignore] + #[tokio::test] + async fn can_connect() { + let symbols = vec!["btcusdt".to_string(), "ethusdt".to_string()]; + let mut feeder = BinanceDataFeeder::new(symbols).await.unwrap(); + let msg = feeder.next().await.unwrap().unwrap(); + assert!(msg.symbol == "BTCUSDT" || msg.symbol == "ETHUSDT"); + } +} diff --git a/oracle/src/offchain_data/binance/mod.rs b/oracle/src/offchain_data/binance/mod.rs new file mode 100644 index 0000000..93ceaff --- /dev/null +++ b/oracle/src/offchain_data/binance/mod.rs @@ -0,0 +1,94 @@ +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use feeder::{BinanceDataFeeder, BinanceDataFeederError}; +use futures::{FutureExt, Stream, StreamExt}; +use reth_tracing::tracing::{error, info}; +use ticker::Ticker; + +pub(crate) mod feeder; +pub(crate) mod ticker; + +/// The reconnection future for the Binance WebSocket connection. +type ReconnectionFuture = + Pin> + Send>>; + +/// A struct that manages a Binance WebSocket connection and handles reconnection logic +/// by proxying the underlying `BinanceDataFeeder`. +/// +/// The connection is automatically re-established if it encounters a `Connection` error. +pub(crate) struct BinanceConnection { + pub(crate) inner: BinanceDataFeeder, + reconnection: Option, +} + +impl BinanceConnection { + /// Creates a new `BinanceConnection` with an initial WebSocket connection + /// to the Binance API using the provided list of symbols. + pub(crate) async fn new(symbols: Vec) -> Result { + let inner = BinanceDataFeeder::new(symbols).await?; + Ok(Self { inner, reconnection: None }) + } +} + +impl Stream for BinanceConnection { + type Item = Result; + + /// Polls the next message from the Binance WebSocket connection. + /// + /// If the connection is active, it will poll the next message. In case of a connection error, + /// it will attempt to reconnect by storing a future in `reconnect_future` and polling it until + /// reconnection is successful. + /// + /// # Returns + /// - `Poll::Ready(Some(Ok(Ticker)))`: If a new ticker message is successfully received. + /// - `Poll::Ready(Some(Err(BinanceDataFeederError)))`: If there is an error in the WebSocket + /// connection. + /// - `Poll::Ready(None)`: If the WebSocket stream has ended. + /// - `Poll::Pending`: If the WebSocket stream is still active or a reconnection is in progress. + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + let reconn = this.reconnection.take(); + + if let Some(mut fut) = reconn { + match fut.as_mut().poll(cx) { + Poll::Ready(Ok(conn)) => { + this.inner = conn; + this.reconnection = None; + info!("Reconnected to Binance WebSocket successfully."); + } + Poll::Ready(Err(e)) => { + info!(?e, "Failed to reconnect to Binance WebSocket."); + this.reconnection = None; + return Poll::Ready(Some(Err(e))); + } + Poll::Pending => { + this.reconnection = Some(fut); + return Poll::Pending; + } + } + } + + match this.inner.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(msg))) => Poll::Ready(Some(Ok(msg))), + Poll::Ready(Some(Err(e))) => { + error!(?e, "Binance WebSocket disconnected. Attempting to reconnect..."); + let fut = BinanceDataFeeder::new(this.inner.symbols.clone()).boxed(); + this.reconnection = Some(fut); + // make sure we are awaken to poll the reconnection future + cx.waker().wake_by_ref(); + Poll::Pending + } + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => { + // wake up the task to poll the stream again + cx.waker().wake_by_ref(); + Poll::Pending + } + } + } +} diff --git a/oracle/src/offchain_data/binance/ticker.rs b/oracle/src/offchain_data/binance/ticker.rs new file mode 100644 index 0000000..73698ad --- /dev/null +++ b/oracle/src/offchain_data/binance/ticker.rs @@ -0,0 +1,87 @@ +use alloy_rlp::{RlpDecodable, RlpEncodable}; +use serde::{Deserialize, Serialize}; + +/// Struct representing the full JSON response from Binance +#[derive(Serialize, Deserialize, Debug)] +pub(crate) struct BinanceResponse { + /// Stream name (e.g., "ethusdt@ticker") + stream: String, + /// The ticker data nested inside the `data` field + pub data: Ticker, +} + +/// Binance ticker data +#[derive( + Serialize, Deserialize, Debug, Clone, PartialEq, Eq, RlpEncodable, RlpDecodable, Default, +)] +pub(crate) struct Ticker { + /// Event type (e.g., "24hrTicker") + #[serde(rename = "e")] + pub(crate) event_type: String, + /// Event time (timestamp) + #[serde(rename = "E")] + pub(crate) event_time: u64, + /// Trading pair symbol + #[serde(rename = "s")] + pub(crate) symbol: String, + /// Price change over the last 24 hours + #[serde(rename = "p")] + pub(crate) price_change: String, + /// Price change percent + #[serde(rename = "P")] + pub(crate) price_change_percent: String, + /// Weighted average price + #[serde(rename = "w")] + pub(crate) weighted_avg_price: String, + /// Previous day's close price + #[serde(rename = "x")] + pub(crate) prev_close_price: String, + /// Current price (last trade price) + #[serde(rename = "c")] + pub(crate) last_price: String, + /// Last quantity traded + #[serde(rename = "Q")] + pub(crate) last_quantity: String, + /// Best bid price + #[serde(rename = "b")] + pub(crate) best_bid_price: String, + /// Best bid quantity + #[serde(rename = "B")] + pub(crate) best_bid_quantity: String, + /// Best ask price + #[serde(rename = "a")] + pub(crate) best_ask_price: String, + /// Best ask quantity + #[serde(rename = "A")] + pub(crate) best_ask_quantity: String, + /// Open price for the 24-hour period + #[serde(rename = "o")] + pub(crate) open_price: String, + /// High price of the 24-hour period + #[serde(rename = "h")] + pub(crate) high_price: String, + /// Low price of the 24-hour period + #[serde(rename = "l")] + pub(crate) low_price: String, + /// Total traded volume of the base asset + #[serde(rename = "v")] + pub(crate) volume: String, + /// Total traded volume of the quote asset + #[serde(rename = "q")] + pub(crate) quote_volume: String, + /// Open time (timestamp) + #[serde(rename = "O")] + pub(crate) open_time: u64, + /// Close time (timestamp) + #[serde(rename = "C")] + pub(crate) close_time: u64, + /// First trade ID + #[serde(rename = "F")] + pub(crate) first_trade_id: u64, + /// Last trade ID + #[serde(rename = "L")] + pub(crate) last_trade_id: u64, + /// Total number of trades + #[serde(rename = "n")] + pub(crate) num_trades: u64, +} diff --git a/oracle/src/offchain_data/mod.rs b/oracle/src/offchain_data/mod.rs new file mode 100644 index 0000000..0b1cdf8 --- /dev/null +++ b/oracle/src/offchain_data/mod.rs @@ -0,0 +1,53 @@ +use binance::{feeder::BinanceDataFeederError, ticker::Ticker, BinanceConnection}; +use futures::{stream::Stream, StreamExt}; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; +use thiserror::Error; + +pub(crate) mod binance; + +/// The enum that represents the various types of data feeds, e.g., Binance. +pub(crate) enum DataFeeds { + Binance(Ticker), +} + +/// The error enum that wraps errors from all data feeders. +#[derive(Error, Debug)] +pub(crate) enum DataFeederError { + #[error(transparent)] + Binance(#[from] BinanceDataFeederError), +} + +/// The struct that implements the Stream trait for polling multiple data feeds. +pub(crate) struct DataFeederStream { + binance: BinanceConnection, + // Add other feeder fields if needed. +} + +impl DataFeederStream { + /// Creates a new instance of the DataFeederStream with initialized Binance feeder. + pub(crate) async fn new(binance_symbols: Vec) -> Result { + let binance = BinanceConnection::new(binance_symbols).await?; + Ok(Self { binance }) + } +} + +/// Implementing the Stream trait for DataFeederStream to wrap the individual feeders. +impl Stream for DataFeederStream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + match this.binance.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(ticker))) => Poll::Ready(Some(Ok(DataFeeds::Binance(ticker)))), + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(DataFeederError::Binance(e)))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + + // Add other data-feeds here. + } +} diff --git a/oracle/src/oracle.rs b/oracle/src/oracle.rs index 9903fad..284e215 100644 --- a/oracle/src/oracle.rs +++ b/oracle/src/oracle.rs @@ -1,4 +1,12 @@ -use futures::FutureExt; +use crate::{ + exex::ExEx, + network::{proto::data::SignedTicker, OracleNetwork}, + offchain_data::{DataFeederStream, DataFeeds}, +}; +use alloy_rlp::{BytesMut, Encodable}; +use alloy_signer::SignerSync; +use alloy_signer_local::PrivateKeySigner; +use futures::{FutureExt, StreamExt}; use reth_node_api::FullNodeComponents; use reth_tracing::tracing::{error, info}; use std::{ @@ -7,21 +15,30 @@ use std::{ task::{Context, Poll}, }; -use crate::{exex::ExEx, network::Network}; - /// The Oracle struct is a long running task that orchestrates discovery of new peers, /// decoding data from chain events (ExEx) and gossiping it to peers. pub(crate) struct Oracle { /// The network task for this node. /// It is composed by a discovery task and a sub protocol RLPx task. - network: Network, + network: OracleNetwork, /// The execution extension task for this node. exex: ExEx, + /// The offchain data feed stream. + data_feed: DataFeederStream, + /// The signer to sign the data feed. + signer: PrivateKeySigner, + /// Half of the broadcast channel to send data to connected peers. + to_peers: tokio::sync::broadcast::Sender, } impl Oracle { - pub(crate) async fn new(exex: ExEx, network: Network) -> eyre::Result { - Ok(Self { exex, network }) + pub(crate) fn new( + exex: ExEx, + network: OracleNetwork, + data_feed: DataFeederStream, + to_peers: tokio::sync::broadcast::Sender, + ) -> Self { + Self { exex, network, data_feed, signer: PrivateKeySigner::random(), to_peers } } } @@ -47,6 +64,28 @@ impl Future for Oracle { } } + // Poll the data feed future until it's drained + while let Poll::Ready(item) = this.data_feed.poll_next_unpin(cx) { + match item { + Some(Ok(ticker_data)) => { + let DataFeeds::Binance(ticker) = ticker_data; + let mut buffer = BytesMut::new(); + ticker.encode(&mut buffer); + let signature = this.signer.sign_message_sync(&buffer)?; + let signed_ticker = SignedTicker::new(ticker, signature, this.signer.address()); + + if let Err(err) = this.to_peers.send(signed_ticker.clone()) { + error!(?err, "Failed to send ticker to gossip, no peers connected"); + } + } + Some(Err(e)) => { + error!(?e, "Data feed task encountered an error"); + return Poll::Ready(Err(e.into())); + } + None => break, + } + } + // Poll the exex future until its drained loop { match this.exex.poll_unpin(cx)? {