Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

submit new l2 blocks using RPC #28

Merged
merged 7 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 42 additions & 21 deletions Node/src/node/mod.rs
Original file line number Diff line number Diff line change
@@ -1,56 +1,77 @@
use crate::taiko::Taiko;
use anyhow::{anyhow as err, Context, Error};
use anyhow::{anyhow as any_err, Error, Ok};
use tokio::sync::mpsc::{Receiver, Sender};

pub struct Node {
taiko: Taiko,
node_rx: Receiver<String>,
node_rx: Option<Receiver<String>>,
avs_p2p_tx: Sender<String>,
gas_used: u64,
}

impl Node {
pub fn new(node_rx: Receiver<String>, avs_p2p_tx: Sender<String>) -> Self {
let taiko = Taiko::new("http://127.0.0.1:1234");
let taiko = Taiko::new("http://127.0.0.1:1234", "http://127.0.0.1:1235");
Self {
taiko,
node_rx,
node_rx: Some(node_rx),
avs_p2p_tx,
gas_used: 0,
}
}

/// Consumes the Node and starts two loops:
/// one for handling incoming messages and one for the block preconfirmation
pub async fn entrypoint(mut self) -> Result<(), Error> {
tracing::info!("Starting node");
self.start_new_msg_receiver_thread();
self.preconfirmation_loop().await;
Ok(())
}

fn start_new_msg_receiver_thread(&mut self) {
if let Some(node_rx) = self.node_rx.take() {
tokio::spawn(async move {
Self::handle_incoming_messages(node_rx).await;
});
} else {
tracing::error!("node_rx has already been moved");
}
}

async fn handle_incoming_messages(mut node_rx: Receiver<String>) {
loop {
if let Err(err) = self.step().await {
tracing::error!("Node processing step failed: {}", err);
tokio::select! {
Some(message) = node_rx.recv() => {
tracing::debug!("Node received message: {}", message);
},
}
}
}

async fn step(&mut self) -> Result<(), Error> {
if let Ok(msg) = self.node_rx.try_recv() {
self.process_incoming_message(msg).await?;
} else {
self.main_block_preconfirmation_step().await?;
async fn preconfirmation_loop(&self) {
loop {
let start_time = tokio::time::Instant::now();
if let Err(err) = self.main_block_preconfirmation_step().await {
tracing::error!("Failed to execute main block preconfirmation step: {}", err);
}
let elapsed = start_time.elapsed();
let sleep_duration = std::time::Duration::from_secs(4).saturating_sub(elapsed);
tokio::time::sleep(sleep_duration).await;
}
Ok(())
}

async fn main_block_preconfirmation_step(&self) -> Result<(), Error> {
self.taiko
let pending_tx_lists = self
.taiko
.get_pending_l2_tx_lists()
.await
.context("Failed to get pending l2 tx lists")?;
.map_err(Error::from)?;
self.commit_to_the_tx_lists();
self.send_preconfirmations_to_the_avs_p2p().await?;
self.taiko.submit_new_l2_blocks();
Ok(())
}

async fn process_incoming_message(&mut self, msg: String) -> Result<(), Error> {
tracing::debug!("Node received message: {}", msg);
self.taiko
.advance_head_to_new_l2_block(pending_tx_lists, self.gas_used)
.await?;
Ok(())
}

Expand All @@ -62,6 +83,6 @@ impl Node {
self.avs_p2p_tx
.send("Hello from node!".to_string())
.await
.map_err(|e| err!("Failed to send message to avs_p2p_tx: {}", e))
.map_err(|e| any_err!("Failed to send message to avs_p2p_tx: {}", e))
}
}
2 changes: 1 addition & 1 deletion Node/src/p2p_network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl AVSp2p {
.send("Hello from avs p2p!".to_string())
.await
.unwrap();
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
}
});

Expand Down
103 changes: 85 additions & 18 deletions Node/src/taiko/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,38 @@ use anyhow::Error;
use serde_json::Value;

pub struct Taiko {
rpc_client: RpcClient,
rpc_proposer: RpcClient,
rpc_driver: RpcClient,
}

impl Taiko {
pub fn new(url: &str) -> Self {
pub fn new(proposer_url: &str, driver_url: &str) -> Self {
Self {
rpc_client: RpcClient::new(url),
rpc_proposer: RpcClient::new(proposer_url),
rpc_driver: RpcClient::new(driver_url),
}
}

pub async fn get_pending_l2_tx_lists(&self) -> Result<Value, Error> {
tracing::debug!("Getting L2 tx lists");
self.rpc_client
self.rpc_proposer
.call_method("RPC.GetL2TxLists", vec![])
.await
}

pub fn submit_new_l2_blocks(&self) {
pub async fn advance_head_to_new_l2_block(
&self,
tx_lists: Value,
gas_used: u64,
) -> Result<Value, Error> {
tracing::debug!("Submitting new L2 blocks");
let payload = serde_json::json!({
"TxLists": tx_lists["TxLists"],
"gasUsed": gas_used,
});
self.rpc_driver
.call_method("RPC.AdvanceL2ChainHeadWithNewBlocks", vec![payload])
.await
}
}

Expand All @@ -35,22 +48,76 @@ mod test {
async fn test_get_pending_l2_tx_lists() {
tracing_subscriber::fmt::init();

let (mut rpc_server, taiko) = setup_rpc_server_and_taiko(3030).await;
let json = taiko.get_pending_l2_tx_lists().await.unwrap();

assert_eq!(json["TxLists"].as_array().unwrap().len(), 1);
assert_eq!(json["TxLists"][0].as_array().unwrap().len(), 3);
assert_eq!(json["TxLists"][0][0]["type"], "0x0");
assert_eq!(
json["TxLists"][0][0]["hash"],
"0x7c76b9906579e54df54fe77ad1706c47aca706b3eb5cfd8a30ccc3c5a19e8ecd"
);
assert_eq!(json["TxLists"][0][1]["type"], "0x2");
assert_eq!(
json["TxLists"][0][1]["hash"],
"0xece2a3c6ca097cfe5d97aad4e79393240f63865210f9c763703d1136f065298b"
);
assert_eq!(json["TxLists"][0][2]["type"], "0x2");
assert_eq!(
json["TxLists"][0][2]["hash"],
"0xb105d9f16e8fb913093c8a2c595bf4257328d256f218a05be8dcc626ddeb4193"
);
rpc_server.stop().await;
}

#[tokio::test]
async fn test_advance_head_to_new_l2_block() {
let (mut rpc_server, taiko) = setup_rpc_server_and_taiko(3040).await;
let value = serde_json::json!({
"TxLists": [
[
{
"type": "0x0",
"chainId": "0x28c61",
"nonce": "0x1",
"to": "0xbfadd5365bb2890ad832038837115e60b71f7cbb",
"gas": "0x267ac",
"gasPrice": "0x5e76e0800",
"maxPriorityFeePerGas": null,
"maxFeePerGas": null,
"value": "0x0",
"input": "0x40d097c30000000000000000000000004cea2c7d358e313f5d0287c475f9ae943fe1a913",
"v": "0x518e6",
"r": "0xb22da5cdc4c091ec85d2dda9054aa497088e55bd9f0335f39864ae1c598dd35",
"s": "0x6eee1bcfe6a1855e89dd23d40942c90a036f273159b4c4fd217d58169493f055",
"hash": "0x7c76b9906579e54df54fe77ad1706c47aca706b3eb5cfd8a30ccc3c5a19e8ecd"
}
]
]
});

let response = taiko
.advance_head_to_new_l2_block(value, 1234)
.await
.unwrap();
assert_eq!(
response["result"],
"Request received and processed successfully"
);
rpc_server.stop().await;
}

async fn setup_rpc_server_and_taiko(port: u16) -> (RpcServer, Taiko) {
// Start the RPC server
let mut rpc_server = RpcServer::new();
let addr: SocketAddr = "127.0.0.1:3030".parse().unwrap();
let addr: SocketAddr = format!("127.0.0.1:{}", port).parse().unwrap();
rpc_server.start_test_responses(addr).await.unwrap();

let taiko = Taiko::new("http://127.0.0.1:3030");
let json = taiko.get_pending_l2_tx_lists().await.unwrap();

assert_eq!(json["result"]["TxLists"].as_array().unwrap().len(), 1);
assert_eq!(json["result"]["TxLists"][0].as_array().unwrap().len(), 3);
assert_eq!(json["result"]["TxLists"][0][0]["type"], "0x0");
assert_eq!(json["result"]["TxLists"][0][0]["hash"], "0x7c76b9906579e54df54fe77ad1706c47aca706b3eb5cfd8a30ccc3c5a19e8ecd");
assert_eq!(json["result"]["TxLists"][0][1]["type"], "0x2");
assert_eq!(json["result"]["TxLists"][0][1]["hash"], "0xece2a3c6ca097cfe5d97aad4e79393240f63865210f9c763703d1136f065298b");
assert_eq!(json["result"]["TxLists"][0][2]["type"], "0x2");
assert_eq!(json["result"]["TxLists"][0][2]["hash"], "0xb105d9f16e8fb913093c8a2c595bf4257328d256f218a05be8dcc626ddeb4193");
rpc_server.stop().await;
let taiko = Taiko::new(
&format!("http://127.0.0.1:{}", port),
&format!("http://127.0.0.1:{}", port),
);
(rpc_server, taiko)
}
}
Loading