Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exex oracle first iteration #22

Merged
merged 14 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 30
steps:
- uses: actions/checkout@v3
- uses: dtolnay/rust-toolchain@clippy
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true
Expand Down
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ reth-execution-types = { git = "https://github.com/paradigmxyz/reth" }
reth-exex = { git = "https://github.com/paradigmxyz/reth", features = ["serde"] }
reth-eth-wire = { git = "https://github.com/paradigmxyz/reth" }
reth-evm = { git = "https://github.com/paradigmxyz/reth" }
reth-network = { git = "https://github.com/paradigmxyz/reth" }
reth-network = { git = "https://github.com/paradigmxyz/reth", features = ["test-utils"] }
reth-network-api = { git = "https://github.com/paradigmxyz/reth" }
reth-network-peers = { git = "https://github.com/paradigmxyz/reth" }
reth-node-api = { git = "https://github.com/paradigmxyz/reth" }
Expand All @@ -43,13 +43,20 @@ reth-tracing = { git = "https://github.com/paradigmxyz/reth" }

# alloy
alloy-sol-types = { version = "0.8", features = ["json"] }
alloy-signer-local = "0.3"
alloy-signer = "0.3"
alloy-rlp = "0.3"

# async
futures = "0.3"
futures-util = "0.3"
tokio = { version = "1.0", features = ["full"] }
tokio-stream = "0.1"

# serde
serde = "1"
serde_json = "1"

# misc
eyre = "0.6"

Expand Down
14 changes: 14 additions & 0 deletions oracle/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ reth-rpc-types.workspace = true
reth-tracing.workspace = true
reth.workspace = true

alloy-signer-local.workspace = true
alloy-signer.workspace = true
alloy-rlp.workspace = true

# networking
discv5 = "0.7"
enr = "0.12"
Expand All @@ -30,7 +34,17 @@ enr = "0.12"
futures.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
tokio-tungstenite = "0.23"


thiserror = "1"

# misc
clap = "4"
eyre.workspace = true
serde.workspace = true
serde_json.workspace = true
uuid = "1.10.0"

reth-exex-test-utils = { git = "https://github.com/paradigmxyz/reth" }
reth-testing-utils = { git = "https://github.com/paradigmxyz/reth" }
34 changes: 34 additions & 0 deletions oracle/src/cli_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use clap::Args;

pub const DEFAULT_DISCOVERY_PORT: u16 = 30304;
pub const DEFAULT_RLPX_PORT: u16 = 30303;
pub const DEFAULT_BINANCE_SYMBOLS: &str = "btcusdc,ethusdc";

#[derive(Debug, Clone, Args)]
pub(crate) struct OracleExt {
Expand All @@ -12,4 +13,37 @@ pub(crate) struct OracleExt {
/// UDP port used for discovery
#[clap(long = "disc.udp-port", default_value_t = DEFAULT_DISCOVERY_PORT)]
pub udp_port: u16,

/// The list of symbols to configure the stream of prices from binance.
#[clap(long = "data.symbols", use_value_delimiter = true, default_value = DEFAULT_BINANCE_SYMBOLS)]
pub binance_symbols: Vec<String>,
}

#[cfg(test)]
mod tests {
use super::*;
use clap::{Args, Parser};

/// A helper type to parse Args more easily
#[derive(Parser)]
struct CommandParser<T: Args> {
#[clap(flatten)]
args: T,
}

#[test]
fn test_oracle_ext() {
let cli = CommandParser::<OracleExt>::parse_from([
"test",
"--disc.tcp-port",
"30304",
"--disc.udp-port",
"30303",
"--data.symbols",
"btcusdt,ethusdt",
]);
assert_eq!(cli.args.tcp_port, 30304);
assert_eq!(cli.args.udp_port, 30303);
assert_eq!(cli.args.binance_symbols, vec!["btcusdt", "ethusdt"]);
}
}
23 changes: 19 additions & 4 deletions oracle/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use clap::Parser;
use cli_ext::OracleExt;
use exex::ExEx;
use network::{proto::OracleProtoHandler, Network};
use network::{proto::OracleProtoHandler, OracleNetwork};
use offchain_data::DataFeederStream;
use oracle::Oracle;
use reth::args::utils::DefaultChainSpecParser;
use reth_network::{protocol::IntoRlpxSubProtocol, NetworkProtocols};
Expand All @@ -10,6 +11,7 @@ use reth_node_ethereum::EthereumNode;
mod cli_ext;
mod exex;
mod network;
mod offchain_data;
mod oracle;

const ORACLE_EXEX_ID: &str = "exex-oracle";
Expand All @@ -22,12 +24,25 @@ fn main() -> eyre::Result<()> {
let handle = builder
.node(EthereumNode::default())
.install_exex(ORACLE_EXEX_ID, move |ctx| async move {
let subproto = OracleProtoHandler::new();
// define the oracle subprotocol
let (subproto, proto_events, to_peers) = OracleProtoHandler::new();
// add it to the network as a subprotocol
ctx.network().add_rlpx_sub_protocol(subproto.into_rlpx_sub_protocol());

// the instance of the execution extension that will handle chain events
let exex = ExEx::new(ctx);
let network = Network::new(tcp_port, udp_port).await?;
let oracle = Oracle::new(exex, network).await?;

// the instance of the oracle network that will handle discovery and
// gossiping of data
let network = OracleNetwork::new(proto_events, tcp_port, udp_port).await?;
// the off-chain data feed stream
let data_feed = DataFeederStream::new(args.binance_symbols).await?;

// the oracle instance that will orchestrate the network, the execution extensions,
// the offchain data stream and the gossiping
// the oracle will always sign and broadcast data via the channel until a peer is
// subcribed to it
let oracle = Oracle::new(exex, network, data_feed, to_peers);
Ok(oracle)
})
.launch()
Expand Down
4 changes: 2 additions & 2 deletions oracle/src/network/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ impl Future for Discovery {
match ready!(this.events.poll_recv(cx)) {
Some(evt) => match evt {
Event::Discovered(enr) => {
info!(?enr, "Discovered a new peer.");
info!(?enr, "Discovered a new node.");
this.add_node(enr)?;
}
Event::SessionEstablished(enr, socket_addr) => {
info!(?enr, ?socket_addr, "Session established with a new peer.");
info!(?enr, ?socket_addr, "Session established with a new node.");
}
evt => {
info!(?evt, "New discovery event.");
Expand Down
38 changes: 29 additions & 9 deletions oracle/src/network/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use discovery::Discovery;
use futures::FutureExt;
use futures::{ready, FutureExt};
use proto::ProtocolEvent;
use reth_tracing::tracing::{error, info};
use std::{
future::Future,
Expand All @@ -11,23 +12,29 @@ use std::{
mod discovery;
pub(crate) mod proto;

/// The Network struct is a long running task that orchestrates discovery of new peers and network
/// gossiping via the RLPx subprotocol.
pub(crate) struct Network {
/// The OracleNetwork struct is a long running task that orchestrates discovery of new peers and
/// network gossiping via the RLPx subprotocol.
pub(crate) struct OracleNetwork {
/// The discovery task for this node.
discovery: Discovery,
/// The protocol events channel.
proto_events: proto::ProtoEvents,
}

impl Network {
pub(crate) async fn new(tcp_port: u16, udp_port: u16) -> eyre::Result<Self> {
impl OracleNetwork {
pub(crate) async fn new(
proto_events: proto::ProtoEvents,
tcp_port: u16,
udp_port: u16,
) -> eyre::Result<Self> {
let disc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), udp_port);
let rlpx_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), tcp_port);
let discovery = Discovery::new(disc_addr, rlpx_addr).await?;
Ok(Self { discovery })
Ok(Self { discovery, proto_events })
}
}

impl Future for Network {
impl Future for OracleNetwork {
type Output = eyre::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand All @@ -42,7 +49,20 @@ impl Future for Network {
error!(?e, "Discovery task encountered an error");
return Poll::Ready(Err(e));
}
Poll::Pending => {}
Poll::Pending => break,
}
}
loop {
match ready!(this.proto_events.poll_recv(cx)) {
Some(ProtocolEvent::Established { direction, peer_id, to_connection }) => {
info!(
?direction,
?peer_id,
?to_connection,
"Established connection, will start gossiping"
);
}
None => return Poll::Ready(Ok(())),
}
}
}
Expand Down
Loading
Loading