Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exex oracle first iteration #22

Merged
merged 14 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions oracle/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,13 @@ enr = "0.12"
futures.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
tokio-tungstenite = "0.23"


thiserror = "1"

# misc
clap = "4"
eyre.workspace = true
serde = "1"
serde_json = "1"
loocapro marked this conversation as resolved.
Show resolved Hide resolved
34 changes: 34 additions & 0 deletions oracle/src/cli_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use clap::Args;

pub const DEFAULT_DISCOVERY_PORT: u16 = 30304;
pub const DEFAULT_RLPX_PORT: u16 = 30303;
pub const DEFAULT_BINANCE_SYMBOLS: &str = "btcusdt,ethusdt";

#[derive(Debug, Clone, Args)]
pub(crate) struct OracleExt {
Expand All @@ -12,4 +13,37 @@ pub(crate) struct OracleExt {
/// UDP port used for discovery
#[clap(long = "disc.udp-port", default_value_t = DEFAULT_DISCOVERY_PORT)]
pub udp_port: u16,

/// The list of symbols to configure the stream of prices from binance.
#[clap(long = "data.symbols", use_value_delimiter = true, default_value = DEFAULT_BINANCE_SYMBOLS)]
pub binance_symbols: Vec<String>,
}

#[cfg(test)]
mod tests {
use super::*;
use clap::{Args, Parser};

/// A helper type to parse Args more easily
#[derive(Parser)]
struct CommandParser<T: Args> {
#[clap(flatten)]
args: T,
}

#[test]
fn test_oracle_ext() {
let cli = CommandParser::<OracleExt>::parse_from(&[
"test",
"--disc.tcp-port",
"30304",
"--disc.udp-port",
"30303",
"--data.symbols",
"btcusdt,ethusdt",
]);
assert_eq!(cli.args.tcp_port, 30304);
assert_eq!(cli.args.udp_port, 30303);
assert_eq!(cli.args.binance_symbols, vec!["btcusdt", "ethusdt"]);
}
}
5 changes: 4 additions & 1 deletion oracle/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use clap::Parser;
use cli_ext::OracleExt;
use exex::ExEx;
use network::{proto::OracleProtoHandler, Network};
use offchain_data::DataFeederStream;
use oracle::Oracle;
use reth::args::utils::DefaultChainSpecParser;
use reth_network::{protocol::IntoRlpxSubProtocol, NetworkProtocols};
Expand All @@ -10,6 +11,7 @@ use reth_node_ethereum::EthereumNode;
mod cli_ext;
mod exex;
mod network;
mod offchain_data;
mod oracle;

const ORACLE_EXEX_ID: &str = "exex-oracle";
Expand All @@ -27,7 +29,8 @@ fn main() -> eyre::Result<()> {

let exex = ExEx::new(ctx);
let network = Network::new(tcp_port, udp_port).await?;
let oracle = Oracle::new(exex, network).await?;
let data_feed = DataFeederStream::new(args.binance_symbols).await?;
let oracle = Oracle::new(exex, network, data_feed);
Ok(oracle)
})
.launch()
Expand Down
133 changes: 133 additions & 0 deletions oracle/src/offchain_data/binance/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
use futures::{Stream, StreamExt};
use reth_tracing::tracing::error;
use std::{
pin::Pin,
task::{Context, Poll},
};
use thiserror::Error;
use ticker::{BinanceResponse, Ticker};
use tokio::net::TcpStream;
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};

macro_rules! block_on {
($expr:expr) => {
tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on($expr))
};
}

pub(crate) mod ticker;

#[derive(Error, Debug)]
pub(crate) enum BinanceDataFeederError {
/// Error connecting to the WebSocket.
#[error("error connecting to WebSocket")]
Connection(#[from] tokio_tungstenite::tungstenite::Error),
/// Error decoding the message.
#[error("error decoding message")]
Decode(#[from] serde_json::Error),
/// Error reconnecting after max retries.
#[error("reached max number of reconnection attempts")]
MaxRetriesExceeded,
}

/// This structure controls the interaction with the Binance WebSocket API.
pub(crate) struct BinanceDataFeeder {
/// The WebSocket stream.
inner: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>,
url: String,
}

impl BinanceDataFeeder {
/// Creates a new BinanceDataFeeder instance.
pub(crate) async fn new(symbols: Vec<String>) -> Result<Self, BinanceDataFeederError> {
let query = symbols
.iter()
.map(|symbol| format!("{}@ticker", symbol))
.collect::<Vec<String>>()
.join("/");

let url = format!("wss://stream.binance.com/stream?streams={}", query);
let client = Self::connect_with_retries(url.to_string(), 10).await?;

Ok(Self { inner: Some(client), url })
}

/// Function to connect with retries and an optional delay between retries
async fn connect_with_retries(
url: String,
max_retries: usize,
) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>, BinanceDataFeederError> {
let mut attempts = 0;

while attempts < max_retries {
let conn = connect_async(url.clone()).await;

match conn {
Ok((connection, _)) => return Ok(connection),
Err(e) => {
error!(?e, attempts, max_retries, "Connection attempt failed, retrying...");
attempts += 1;
if attempts < max_retries {
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
}
}
}
}

Err(BinanceDataFeederError::MaxRetriesExceeded)
}
}

/// We implement the Stream trait for the BinanceDataFeeder struct
/// in order to encode the messages received from the WebSocket into our Ticker struct.
impl Stream for BinanceDataFeeder {
type Item = Result<Ticker, BinanceDataFeederError>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();

let mut connection = this.inner.take().expect("inner WebSocket client is missing");

match connection.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(msg))) => {
let msg = msg.into_text()?;
let resp: BinanceResponse = serde_json::from_str(&msg)?;
this.inner = Some(connection);
Poll::Ready(Some(Ok(resp.data)))
}
Poll::Ready(Some(Err(e))) => {
// handle ws disconnections
error!(?e, "Binance ws disconnected, reconnecting");
match block_on!(Self::connect_with_retries(this.url.clone(), 10)) {
loocapro marked this conversation as resolved.
Show resolved Hide resolved
Ok(conn) => {
this.inner = Some(conn);
Poll::Pending
}
Err(reconnect_error) => {
error!(?reconnect_error, "Failed to reconnect after max retries");
Poll::Ready(Some(Err(reconnect_error)))
}
}
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => {
this.inner = Some(connection);
Poll::Pending
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use futures::StreamExt;

#[tokio::test]
async fn can_connect() {
let symbols = vec!["btcusdt".to_string(), "ethusdt".to_string()];
let mut feeder = BinanceDataFeeder::new(symbols).await.unwrap();
let msg = feeder.next().await.unwrap().unwrap();
assert!(msg.symbol == "BTCUSDT" || msg.symbol == "ETHUSDT");
}
}
84 changes: 84 additions & 0 deletions oracle/src/offchain_data/binance/ticker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use serde::{Deserialize, Serialize};

/// Struct representing the full JSON response from Binance
#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct BinanceResponse {
/// Stream name (e.g., "ethusdt@ticker")
stream: String,
/// The ticker data nested inside the `data` field
pub data: Ticker,
}

/// Binance ticker data
#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct Ticker {
/// Event type (e.g., "24hrTicker")
#[serde(rename = "e")]
event_type: String,
/// Event time (timestamp)
#[serde(rename = "E")]
event_time: u64,
/// Trading pair symbol
#[serde(rename = "s")]
pub(crate) symbol: String,
/// Price change over the last 24 hours
#[serde(rename = "p")]
price_change: String,
/// Price change percent
#[serde(rename = "P")]
price_change_percent: String,
/// Weighted average price
#[serde(rename = "w")]
weighted_avg_price: String,
/// Previous day's close price
#[serde(rename = "x")]
prev_close_price: String,
/// Current price (last trade price)
#[serde(rename = "c")]
last_price: String,
/// Last quantity traded
#[serde(rename = "Q")]
last_quantity: String,
/// Best bid price
#[serde(rename = "b")]
best_bid_price: String,
/// Best bid quantity
#[serde(rename = "B")]
best_bid_quantity: String,
/// Best ask price
#[serde(rename = "a")]
best_ask_price: String,
/// Best ask quantity
#[serde(rename = "A")]
best_ask_quantity: String,
/// Open price for the 24-hour period
#[serde(rename = "o")]
open_price: String,
/// High price of the 24-hour period
#[serde(rename = "h")]
high_price: String,
/// Low price of the 24-hour period
#[serde(rename = "l")]
low_price: String,
/// Total traded volume of the base asset
#[serde(rename = "v")]
volume: String,
/// Total traded volume of the quote asset
#[serde(rename = "q")]
quote_volume: String,
/// Open time (timestamp)
#[serde(rename = "O")]
open_time: u64,
/// Close time (timestamp)
#[serde(rename = "C")]
close_time: u64,
/// First trade ID
#[serde(rename = "F")]
first_trade_id: u64,
/// Last trade ID
#[serde(rename = "L")]
last_trade_id: u64,
/// Total number of trades
#[serde(rename = "n")]
num_trades: u64,
}
53 changes: 53 additions & 0 deletions oracle/src/offchain_data/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use binance::{ticker::Ticker, BinanceDataFeeder, BinanceDataFeederError};
use futures::{stream::Stream, StreamExt};
use std::{
pin::Pin,
task::{Context, Poll},
};
use thiserror::Error;

pub(crate) mod binance;

/// The enum that represents the various types of data feeds, e.g., Binance.
pub(crate) enum DataFeeds {
Binance(Ticker),
}

/// The error enum that wraps errors from all data feeders.
#[derive(Error, Debug)]
pub(crate) enum DataFeederError {
#[error(transparent)]
Binance(#[from] BinanceDataFeederError),
}

/// The struct that implements the Stream trait for polling multiple data feeds.
pub(crate) struct DataFeederStream {
binance: BinanceDataFeeder,
// Add other feeder fields if needed.
}

impl DataFeederStream {
/// Creates a new instance of the DataFeederStream with initialized Binance feeder.
pub(crate) async fn new(binance_symbols: Vec<String>) -> Result<Self, DataFeederError> {
let binance = BinanceDataFeeder::new(binance_symbols).await?;
Ok(Self { binance })
}
}

/// Implementing the Stream trait for DataFeederStream to wrap the individual feeders.
impl Stream for DataFeederStream {
type Item = Result<DataFeeds, DataFeederError>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();

match this.binance.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(ticker))) => Poll::Ready(Some(Ok(DataFeeds::Binance(ticker)))),
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(DataFeederError::Binance(e)))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}

// Add other data-feeds here.
}
}
Loading
Loading