Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exex oracle first iteration #22

Merged
merged 14 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions oracle/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ reth-rpc-types.workspace = true
reth-tracing.workspace = true
reth.workspace = true

alloy-signer-local = "0.3"
alloy-signer = "0.3"
alloy-rlp = "0.3"
loocapro marked this conversation as resolved.
Show resolved Hide resolved

# networking
discv5 = "0.7"
enr = "0.12"
Expand All @@ -30,7 +34,13 @@ enr = "0.12"
futures.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
tokio-tungstenite = "0.23"


thiserror = "1"

# misc
clap = "4"
eyre.workspace = true
serde = "1"
serde_json = "1"
loocapro marked this conversation as resolved.
Show resolved Hide resolved
34 changes: 34 additions & 0 deletions oracle/src/cli_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use clap::Args;

pub const DEFAULT_DISCOVERY_PORT: u16 = 30304;
pub const DEFAULT_RLPX_PORT: u16 = 30303;
pub const DEFAULT_BINANCE_SYMBOLS: &str = "btcusdc,ethusdc";

#[derive(Debug, Clone, Args)]
pub(crate) struct OracleExt {
Expand All @@ -12,4 +13,37 @@ pub(crate) struct OracleExt {
/// UDP port used for discovery
#[clap(long = "disc.udp-port", default_value_t = DEFAULT_DISCOVERY_PORT)]
pub udp_port: u16,

/// The list of symbols to configure the stream of prices from binance.
#[clap(long = "data.symbols", use_value_delimiter = true, default_value = DEFAULT_BINANCE_SYMBOLS)]
pub binance_symbols: Vec<String>,
}

#[cfg(test)]
mod tests {
use super::*;
use clap::{Args, Parser};

/// A helper type to parse Args more easily
#[derive(Parser)]
struct CommandParser<T: Args> {
#[clap(flatten)]
args: T,
}

#[test]
fn test_oracle_ext() {
let cli = CommandParser::<OracleExt>::parse_from([
"test",
"--disc.tcp-port",
"30304",
"--disc.udp-port",
"30303",
"--data.symbols",
"btcusdt,ethusdt",
]);
assert_eq!(cli.args.tcp_port, 30304);
assert_eq!(cli.args.udp_port, 30303);
assert_eq!(cli.args.binance_symbols, vec!["btcusdt", "ethusdt"]);
}
}
9 changes: 6 additions & 3 deletions oracle/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use clap::Parser;
use cli_ext::OracleExt;
use exex::ExEx;
use network::{proto::OracleProtoHandler, Network};
use offchain_data::DataFeederStream;
use oracle::Oracle;
use reth::args::utils::DefaultChainSpecParser;
use reth_network::{protocol::IntoRlpxSubProtocol, NetworkProtocols};
Expand All @@ -10,6 +11,7 @@ use reth_node_ethereum::EthereumNode;
mod cli_ext;
mod exex;
mod network;
mod offchain_data;
mod oracle;

const ORACLE_EXEX_ID: &str = "exex-oracle";
Expand All @@ -22,12 +24,13 @@ fn main() -> eyre::Result<()> {
let handle = builder
.node(EthereumNode::default())
.install_exex(ORACLE_EXEX_ID, move |ctx| async move {
let subproto = OracleProtoHandler::new();
let (subproto, proto_events) = OracleProtoHandler::new();
ctx.network().add_rlpx_sub_protocol(subproto.into_rlpx_sub_protocol());

let exex = ExEx::new(ctx);
let network = Network::new(tcp_port, udp_port).await?;
let oracle = Oracle::new(exex, network).await?;
let (network, to_gossip) = Network::new(proto_events, tcp_port, udp_port).await?;
let data_feed = DataFeederStream::new(args.binance_symbols).await?;
let oracle = Oracle::new(exex, network, data_feed, to_gossip);
loocapro marked this conversation as resolved.
Show resolved Hide resolved
Ok(oracle)
})
.launch()
Expand Down
4 changes: 2 additions & 2 deletions oracle/src/network/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ impl Future for Discovery {
match ready!(this.events.poll_recv(cx)) {
Some(evt) => match evt {
Event::Discovered(enr) => {
info!(?enr, "Discovered a new peer.");
info!(?enr, "Discovered a new node.");
this.add_node(enr)?;
}
Event::SessionEstablished(enr, socket_addr) => {
info!(?enr, ?socket_addr, "Session established with a new peer.");
info!(?enr, ?socket_addr, "Session established with a new node.");
}
evt => {
info!(?evt, "New discovery event.");
Expand Down
59 changes: 59 additions & 0 deletions oracle/src/network/gossip.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use reth_tracing::tracing::error;
use tokio::sync::{
broadcast::{error::RecvError, Sender},
mpsc::UnboundedSender,
};

use super::proto::{connection::OracleCommand, data::SignedTicker};

/// The size of the broadcast channel.
///
/// This value is based on the estimated message rate and the tolerance for lag.
/// - We assume an average of 10-20 updates per second per symbol.
/// - For 2 symbols (e.g., ETHUSDC and BTCUSDC), this gives approximately 20-40 messages per second.
/// - To allow subscribers to catch up if they fall behind, we provide a lag tolerance of 5 seconds.
///
/// Thus, the buffer size is calculated as:
///
/// `Buffer Size = Message Rate per Second * Lag Tolerance`
///
/// For 2 symbols, we calculate: `40 * 5 = 200`.
const BROADCAST_CHANNEL_SIZE: usize = 200;

pub(crate) struct Gossip {
pub(crate) inner: Sender<SignedTicker>,
}

impl Gossip {
/// Creates a new Broadcast instance.
pub(crate) fn new() -> (Self, tokio::sync::broadcast::Sender<SignedTicker>) {
let (sender, _receiver) = tokio::sync::broadcast::channel(BROADCAST_CHANNEL_SIZE);
(Self { inner: sender.clone() }, sender)
}

/// Starts to gossip data to the connected peers from the broadcast channel.
pub(crate) fn start(
&self,
to_connection: UnboundedSender<OracleCommand>,
) -> Result<(), RecvError> {
let mut receiver = self.inner.subscribe();

tokio::task::spawn(async move {
loop {
match receiver.recv().await {
Ok(signed_data) => {
if let Err(e) = to_connection.send(OracleCommand::Tick(signed_data)) {
error!(?e, "Failed to broadcast message to peer");
}
loocapro marked this conversation as resolved.
Show resolved Hide resolved
}
Err(e) => {
error!(?e, "Data feed task encountered an error");
return Err::<(), RecvError>(e);
}
}
}
});

Ok(())
}
}
40 changes: 38 additions & 2 deletions oracle/src/network/mod.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,42 @@
use discovery::Discovery;
use futures::FutureExt;
use gossip::Gossip;
use proto::{data::SignedTicker, ProtocolEvent};
use reth_tracing::tracing::{error, info};
use std::{
future::Future,
net::{IpAddr, Ipv4Addr, SocketAddr},
pin::Pin,
task::{Context, Poll},
};
use tokio::sync::broadcast::Sender;

mod discovery;
mod gossip;
pub(crate) mod proto;

/// The Network struct is a long running task that orchestrates discovery of new peers and network
/// gossiping via the RLPx subprotocol.
pub(crate) struct Network {
loocapro marked this conversation as resolved.
Show resolved Hide resolved
/// The discovery task for this node.
discovery: Discovery,
/// The protocol events channel.
proto_events: proto::ProtoEvents,
/// Helper struct to manage gossiping data to connected peers.
gossip: Gossip,
}

impl Network {
pub(crate) async fn new(tcp_port: u16, udp_port: u16) -> eyre::Result<Self> {
pub(crate) async fn new(
proto_events: proto::ProtoEvents,
tcp_port: u16,
udp_port: u16,
) -> eyre::Result<(Self, Sender<SignedTicker>)> {
let disc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), udp_port);
let rlpx_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), tcp_port);
let discovery = Discovery::new(disc_addr, rlpx_addr).await?;
Ok(Self { discovery })
let (gossip, to_gossip) = Gossip::new();
Ok((Self { discovery, proto_events, gossip }, to_gossip))
}
}

Expand All @@ -42,6 +55,29 @@ impl Future for Network {
error!(?e, "Discovery task encountered an error");
return Poll::Ready(Err(e));
}
Poll::Pending => break,
}
}

loop {
match this.proto_events.poll_recv(cx) {
loocapro marked this conversation as resolved.
Show resolved Hide resolved
Poll::Ready(Some(ProtocolEvent::Established {
direction,
peer_id,
to_connection,
})) => {
info!(
?direction,
?peer_id,
?to_connection,
"Established connection, will start gossiping"
);
this.gossip.start(to_connection)?;
}
Poll::Ready(None) => {
return Poll::Ready(Ok(()));
}

Poll::Pending => {}
loocapro marked this conversation as resolved.
Show resolved Hide resolved
}
}
Expand Down
46 changes: 23 additions & 23 deletions oracle/src/network/proto/connection.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,34 @@
use super::{OracleProtoMessage, OracleProtoMessageKind, ProtocolEvent, ProtocolState};
use super::{
data::SignedTicker, OracleProtoMessage, OracleProtoMessageKind, ProtocolEvent, ProtocolState,
};
use alloy_rlp::Encodable;
use futures::{Stream, StreamExt};
use reth_eth_wire::{
capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol,
};
use reth_network::protocol::{ConnectionHandler, OnNotSupported};
use reth_network_api::Direction;
use reth_primitives::BytesMut;
use reth_primitives::{Address, BytesMut};
use reth_rpc_types::PeerId;
use std::{
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::{mpsc, oneshot};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;

/// The commands supported by the OracleConnection.
pub(crate) enum OracleCommand {
/// Sends a message to the peer
loocapro marked this conversation as resolved.
Show resolved Hide resolved
Message {
msg: String,
/// The response will be sent to this channel.
response: oneshot::Sender<String>,
},
/// Sends a signed tick to a peer
Tick(SignedTicker),
}

/// This struct defines the connection object for the Oracle subprotocol.
pub(crate) struct OracleConnection {
conn: ProtocolConnection,
commands: UnboundedReceiverStream<OracleCommand>,
pending_pong: Option<oneshot::Sender<String>>,
initial_ping: Option<OracleProtoMessage>,
attestations: Vec<Address>,
}

impl Stream for OracleConnection {
Expand All @@ -45,9 +44,8 @@ impl Stream for OracleConnection {
loop {
if let Poll::Ready(Some(cmd)) = this.commands.poll_next_unpin(cx) {
return match cmd {
OracleCommand::Message { msg, response } => {
this.pending_pong = Some(response);
Poll::Ready(Some(OracleProtoMessage::ping_message(msg).encoded()))
OracleCommand::Tick(tick) => {
Poll::Ready(Some(OracleProtoMessage::signed_ticker(tick).encoded()))
}
};
}
Expand All @@ -63,18 +61,20 @@ impl Stream for OracleConnection {
return Poll::Ready(Some(OracleProtoMessage::pong().encoded()))
}
OracleProtoMessageKind::Pong => {}
OracleProtoMessageKind::PingMessage(msg) => {
return Poll::Ready(Some(OracleProtoMessage::pong_message(msg).encoded()))
}
OracleProtoMessageKind::PongMessage(msg) => {
if let Some(sender) = this.pending_pong.take() {
sender.send(msg).ok();
OracleProtoMessageKind::SignedTicker(signed_data) => {
let signer = signed_data.signer;
let sig = signed_data.signature;

let mut buffer = BytesMut::new();
signed_data.ticker.encode(&mut buffer);

let addr = sig.recover_address_from_msg(buffer).ok().unwrap();

if addr == signer && !this.attestations.contains(&addr) {
this.attestations.push(addr);
}
continue;
}
}

return Poll::Pending;
}
}
}
Expand Down Expand Up @@ -115,7 +115,7 @@ impl ConnectionHandler for OracleConnHandler {
conn,
initial_ping: direction.is_outgoing().then(OracleProtoMessage::ping),
commands: UnboundedReceiverStream::new(rx),
pending_pong: None,
attestations: Vec::new(),
}
}
}
17 changes: 17 additions & 0 deletions oracle/src/network/proto/data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use crate::offchain_data::binance::ticker::Ticker;
use alloy_rlp::{RlpDecodable, RlpEncodable};
use alloy_signer::Signature;
use reth_primitives::Address;

#[derive(Clone, Debug, RlpEncodable, RlpDecodable, PartialEq)]
pub struct SignedTicker {
pub(crate) ticker: Ticker,
pub(crate) signature: Signature,
pub(crate) signer: Address,
}

impl SignedTicker {
pub fn new(ticker: Ticker, signature: Signature, signer: Address) -> Self {
Self { ticker, signature, signer }
}
}
Loading
Loading