From 5b02e51c24ff65c6dd8c4a8b2269eb1b442988a9 Mon Sep 17 00:00:00 2001 From: Mahmoud Date: Mon, 13 Feb 2023 17:48:50 -0500 Subject: [PATCH] feat: narwhal (#369) * narwhal init --- Cargo.toml | 3 +- README.md | 2 +- crates/ursa-consensus/Cargo.toml | 38 +++++ crates/ursa-consensus/README.md | 6 + crates/ursa-consensus/src/config.rs | 73 ++++++++ crates/ursa-consensus/src/execution.rs | 50 ++++++ crates/ursa-consensus/src/lib.rs | 225 +++++++++++++++++++++++++ crates/ursa-consensus/src/main.rs | 45 +++++ crates/ursa-consensus/src/validator.rs | 26 +++ 9 files changed, 466 insertions(+), 2 deletions(-) create mode 100644 crates/ursa-consensus/Cargo.toml create mode 100644 crates/ursa-consensus/README.md create mode 100644 crates/ursa-consensus/src/config.rs create mode 100644 crates/ursa-consensus/src/execution.rs create mode 100644 crates/ursa-consensus/src/lib.rs create mode 100644 crates/ursa-consensus/src/main.rs create mode 100644 crates/ursa-consensus/src/validator.rs diff --git a/Cargo.toml b/Cargo.toml index 628faa3f..ed5bbeea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ "crates/ursa", + "crates/ursa-consensus", "crates/ursa-gateway", "crates/ursa-index-provider", "crates/ursa-metrics", @@ -47,7 +48,7 @@ futures-util = "0.3.25" fvm_ipld_blockstore = { git = "https://github.com/filecoin-project/ref-fvm/" } fvm_ipld_car = { git = "https://github.com/filecoin-project/ref-fvm/" } fvm_ipld_encoding = "=0.3.2" -graphsync = { git = "https://github.com/kckeiks/rs-graphsync.git", branch = "downgrade-cid" } +graphsync = { git = "https://github.com/kckeiks/rs-graphsync.git", branch = "downgrade-cid" } hyper = { version = "0.14.23", features = ["full"] } hyper-tls = "0.5.0" imara-diff = "0.1.5" diff --git a/README.md b/README.md index 171eb4f9..3c0a867e 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ [![rust-ci](https://img.shields.io/github/actions/workflow/status/fleek-network/ursa/rust.yml?branch=main&label=Tests&style=for-the-badge)](https://github.com/fleek-network/ursa/actions/workflows/rust.yml)  [![docker-build](https://img.shields.io/github/actions/workflow/status/fleek-network/ursa/docker-publish.yml?branch=main&label=Docker%20Build&style=for-the-badge)](https://github.com/fleek-network/ursa/pkgs/container/ursa)  -Ursa, a decentralized content delivery network. +> Ursa, a decentralized content delivery network. ## Run a node diff --git a/crates/ursa-consensus/Cargo.toml b/crates/ursa-consensus/Cargo.toml new file mode 100644 index 00000000..5426fb3d --- /dev/null +++ b/crates/ursa-consensus/Cargo.toml @@ -0,0 +1,38 @@ +[package] +name = "ursa-consensus" +version = "0.1.0" +edition = "2021" +authors = ["b0xtch "] + +[dependencies] +anyhow = "1.0" +arc-swap = { version = "1.6.0", features = ["serde"] } +async-trait = "0.1" +bytes = "1.3.0" + +fastcrypto = { git = "https://github.com/MystenLabs/fastcrypto", rev = "235211dc8195590f5353d38135f5ee51a267521e" } +fastcrypto-zkp = { git = "https://github.com/MystenLabs/fastcrypto", rev = "235211dc8195590f5353d38135f5ee51a267521e", package = "fastcrypto-zkp" } +fastcrypto-tbls = { git = "https://github.com/MystenLabs/fastcrypto", rev = "235211dc8195590f5353d38135f5ee51a267521e", package = "fastcrypto-tbls" } + +futures = "0.3.23" +multiaddr = "0.17.0" +mysten-metrics = { git = "https://github.com/MystenLabs/sui.git", branch = "main", package = "mysten-metrics" } +mysten-network = { git = "https://github.com/MystenLabs/sui.git", branch = "main", package = "mysten-network" } + +narwhal-config = { git = "https://github.com/MystenLabs/sui.git", branch = "main", package = "narwhal-config" } +narwhal-consensus = { git = "https://github.com/MystenLabs/sui.git", branch = "main", package = "narwhal-consensus" } +narwhal-crypto = { git = "https://github.com/MystenLabs/sui.git", branch = "main", package = "narwhal-crypto" } +narwhal-executor = { git = "https://github.com/MystenLabs/sui.git", branch = "main", package = "narwhal-executor" } +narwhal-node = { git = "https://github.com/MystenLabs/sui.git", branch = "main", package = "narwhal-node" } +narwhal-primary = { git = "https://github.com/MystenLabs/sui.git", branch = "main", package = "narwhal-primary" } +narwhal-types = { git = "https://github.com/MystenLabs/sui.git", branch = "main", package = "narwhal-types" } +narwhal-worker = { git = "https://github.com/MystenLabs/sui.git", branch = "main", package = "narwhal-worker" } +prometheus = "0.13.3" +rand = "0.8.5" +serde = { version = "1.0", features = ["derive"] } +tempfile = "3.3.0" +tokio = { version = "1.24.2", features = ["sync", "rt", "macros"] } +tokio-stream = { version = "0.1.11", features = ["net"] } +tracing = "0.1.37" + +workspace-hack = { git = "https://github.com/MystenLabs/sui.git", branch = "main", package = "workspace-hack" } diff --git a/crates/ursa-consensus/README.md b/crates/ursa-consensus/README.md new file mode 100644 index 00000000..572cc993 --- /dev/null +++ b/crates/ursa-consensus/README.md @@ -0,0 +1,6 @@ +# Narwhal and Bullshark - Mempool and Consensus + +> Narwhal and Bullshark (N/B) - Fleek Network's consensus and ordering algorithms. A DAG based consensus with total ordering and reliable broadcast. + +This code has been adapted from the [MystenLabs Sui](https://github.com/MystenLabs/sui) + diff --git a/crates/ursa-consensus/src/config.rs b/crates/ursa-consensus/src/config.rs new file mode 100644 index 00000000..e4a07398 --- /dev/null +++ b/crates/ursa-consensus/src/config.rs @@ -0,0 +1,73 @@ +// Copyright 2022-2023 Fleek Network +// SPDX-License-Identifier: Apache-2.0, MIT + +use std::{path::PathBuf, sync::Arc}; + +use fastcrypto::{bls12381::min_sig::BLS12381KeyPair, ed25519::Ed25519KeyPair}; +use multiaddr::Multiaddr; +use mysten_metrics::RegistryService; +use narwhal_config::{Parameters, WorkerId}; +use narwhal_crypto::NetworkKeyPair as NarwhalNetworkKeyPair; +use serde::{Deserialize, Serialize}; +use tokio::sync::OnceCell; + +pub type KeyPair = Ed25519KeyPair; +pub type AuthorityKeyPair = BLS12381KeyPair; + +#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] +pub struct ValidatorKeyPair { + #[serde(skip)] + keypair: OnceCell>, +} + +#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] +pub struct NetworkKeyPair { + #[serde(skip)] + keypair: OnceCell>, +} + +impl ValidatorKeyPair { + pub fn new(keypair: AuthorityKeyPair) -> Self { + let cell = OnceCell::new(); + cell.set(Arc::new(keypair)) + .expect("Failed to set authority keypair"); + Self { keypair: cell } + } + + pub fn authority_keypair(&self) -> &AuthorityKeyPair { + self.keypair.get().as_ref().unwrap() + } +} + +impl NetworkKeyPair { + pub fn new(keypair: KeyPair) -> Self { + let cell = OnceCell::new(); + cell.set(Arc::new(keypair)) + .expect("Failed to set authority keypair"); + Self { keypair: cell } + } + + pub fn keypair(&self) -> &KeyPair { + self.keypair.get().as_ref().unwrap() + } +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct NodeConfig { + pub keypair: ValidatorKeyPair, + pub worker_keypair: NetworkKeyPair, + pub account_keypair: NetworkKeyPair, + pub network_keypair: NetworkKeyPair, + pub network_address: Multiaddr, + pub db_path: PathBuf, +} + +pub struct NarwhalConfig { + pub keypair: ValidatorKeyPair, + pub network_keypair: NetworkKeyPair, + pub registry_service: RegistryService, + pub ids_and_keypairs: Vec<(WorkerId, NarwhalNetworkKeyPair)>, + pub internal_consensus: bool, + pub parameters: Parameters, + pub storage_base_path: PathBuf, +} diff --git a/crates/ursa-consensus/src/execution.rs b/crates/ursa-consensus/src/execution.rs new file mode 100644 index 00000000..eaae30bf --- /dev/null +++ b/crates/ursa-consensus/src/execution.rs @@ -0,0 +1,50 @@ +// Copyright 2022-2023 Fleek Network +// SPDX-License-Identifier: Apache-2.0, MIT + +use async_trait::async_trait; +use bytes::Bytes; +use narwhal_executor::ExecutionState; +use narwhal_types::ConsensusOutput; +use tokio::sync::mpsc::Sender; +use tracing::error; + +type Epoch = u64; + +pub struct Execution { + /// a store for epoch + _store: N, + /// current epoch store implementation + epoch: Epoch, + /// managing certificates generated by narwhal + transactions: Sender>, +} + +impl Execution { + pub fn new(store: N, epoch: Epoch, transactions: Sender>) -> Self { + Self { + _store: store, + epoch, + transactions, + } + } +} + +#[async_trait] +impl ExecutionState for Execution { + async fn handle_consensus_output(&self, consensus_output: ConsensusOutput) { + for (_, batches) in consensus_output.batches { + for batch in batches { + for transaction in batch.transactions.into_iter() { + assert_eq!(transaction, Bytes::from(self.epoch.to_be_bytes().to_vec())); + if let Err(err) = self.transactions.send(transaction).await { + error!("Failed to send txn: {}", err); + } + } + } + } + } + + async fn last_executed_sub_dag_index(&self) -> u64 { + 0 + } +} diff --git a/crates/ursa-consensus/src/lib.rs b/crates/ursa-consensus/src/lib.rs new file mode 100644 index 00000000..726e0d69 --- /dev/null +++ b/crates/ursa-consensus/src/lib.rs @@ -0,0 +1,225 @@ +// Copyright 2022-2023 Fleek Network +// SPDX-License-Identifier: Apache-2.0, MIT + +use anyhow::Result; +use std::{path::PathBuf, sync::Arc}; + +use arc_swap::ArcSwap; +use async_trait::async_trait; +use config::{NarwhalConfig, NetworkKeyPair, NodeConfig, ValidatorKeyPair}; +use execution::Execution; +use fastcrypto::traits::KeyPair as PrimaryKeyPair; +use multiaddr::Multiaddr; +use mysten_metrics::RegistryService; +use narwhal_config::{ + Authority, Committee, Epoch, Parameters, SharedWorkerCache, Stake, WorkerCache, WorkerId, + WorkerIndex, WorkerInfo, +}; +use narwhal_crypto::NetworkKeyPair as NarwhalNetworkKeyPair; +use narwhal_executor::ExecutionState; +use narwhal_node::{primary_node::PrimaryNode, worker_node::WorkerNodes, NodeStorage}; +use narwhal_worker::TransactionValidator; +use prometheus::Registry; +use tokio::sync::mpsc::channel; +use validator::Validator; + +pub mod config; +pub mod execution; +pub mod validator; + +pub const URSA_VERSION: &str = env!("CARGO_PKG_VERSION"); + +pub struct ValidatorActor { + stake: Stake, + keypair: ValidatorKeyPair, + primary_address: Multiaddr, + worker_keypair: NetworkKeyPair, + network_keypair: NetworkKeyPair, +} + +#[async_trait] +pub trait Consensus { + async fn start( + self, + committee: Arc, + shared_worker_cache: SharedWorkerCache, + execution_state: Arc, + tx_validator: V, + ) -> Result<()> + where + S: ExecutionState + Send + Sync + 'static; +} + +pub struct Narwhal { + keypair: ValidatorKeyPair, + parameters: Parameters, + internal_consensus: bool, + storage_base_path: PathBuf, + // todo(botch): abstract this elsewhere + validators: Vec, + network_keypair: NetworkKeyPair, + ids_and_keypairs: Vec<(WorkerId, NarwhalNetworkKeyPair)>, + worker_nodes: narwhal_node::worker_node::WorkerNodes, + primary_node: narwhal_node::primary_node::PrimaryNode, +} + +#[async_trait] +impl Consensus for Narwhal { + async fn start( + self, + committee: Arc, + shared_worker_cache: SharedWorkerCache, + execution_state: Arc, + tx_validator: V, + ) -> Result<()> + where + S: ExecutionState + Send + Sync + 'static, + { + let mut store_path = self.storage_base_path.clone(); + store_path.push(format!("epoch{}", committee.epoch())); + let store = NodeStorage::reopen(store_path); + let primary_key = self.keypair.authority_keypair().public().clone(); + + self.primary_node + .start( + self.keypair.authority_keypair().copy(), + self.network_keypair.keypair().copy(), + Arc::new(ArcSwap::new(committee.clone().into())), + shared_worker_cache.clone(), + &store, + execution_state, + ) + .await?; + + self.worker_nodes + .start( + primary_key, + self.ids_and_keypairs, + Arc::new(ArcSwap::new(committee.clone().into())), + shared_worker_cache, + &store, + tx_validator, + ) + .await?; + + Ok(()) + } +} + +impl Narwhal { + pub fn new(config: NarwhalConfig, validators: Vec) -> Self { + let primary_node = PrimaryNode::new( + config.parameters.clone(), + config.internal_consensus, + config.registry_service.clone(), + ); + let worker_nodes = + WorkerNodes::new(config.registry_service.clone(), config.parameters.clone()); + + Self { + parameters: config.parameters, + keypair: config.keypair, + internal_consensus: config.internal_consensus, + storage_base_path: config.storage_base_path, + network_keypair: config.network_keypair, + ids_and_keypairs: config.ids_and_keypairs, + worker_nodes, + primary_node, + validators, + } + } +} + +pub struct Service { + config: NodeConfig, +} + +impl Service { + pub async fn start(config: &NodeConfig) -> Result<()> { + let validators: Vec = vec![ValidatorActor { + stake: 1, + keypair: config.keypair.clone(), + primary_address: config.network_address.clone(), + worker_keypair: config.worker_keypair.clone(), + network_keypair: config.network_keypair.clone(), + }]; + + let authorities = validators + .iter() + .map(|validator| { + ( + validator.keypair.authority_keypair().public().clone(), + Authority { + stake: 1, + primary_address: validator.primary_address.clone(), + network_key: validator.network_keypair.keypair().public().clone(), + }, + ) + }) + .collect(); + + let worker_cache = WorkerCache { + epoch: Epoch::default(), + // BTreeMap, + workers: validators + .iter() + .map(|validator| { + let worker_address: Multiaddr = "/ip4/127.0.0.1/udp/0".parse().unwrap(); + let worker_transactions = "/ip4/127.0.0.1/tcp/0/http".parse().unwrap(); + + ( + validator.keypair.authority_keypair().public().clone(), + WorkerIndex( + [( + 0, + WorkerInfo { + name: validator.worker_keypair.keypair().public().clone(), + transactions: worker_transactions, + worker_address, + }, + )] + .into_iter() + .collect(), + ), + ) + }) + .collect(), + }; + + let committee = Committee { + epoch: Epoch::default(), + authorities, + }; + + // todo(botch): handle unwrap + // cloning much? + let narwhal_config = NarwhalConfig { + keypair: config.keypair.clone(), + network_keypair: config.network_keypair.clone(), + // pass this in from the instantiation + registry_service: RegistryService::new(Registry::new()), + ids_and_keypairs: vec![(0, config.worker_keypair.keypair().copy())], + internal_consensus: Default::default(), + parameters: Parameters::default(), + storage_base_path: config.db_path.clone(), + }; + + let narwhal = Narwhal::new(narwhal_config, validators); + let (tx, _rx) = channel(1); + let shared_worker_cache = SharedWorkerCache::from(worker_cache); + let execution_state = Arc::new(Execution::new(unimplemented!(), Epoch::default(), tx)); + + let tx_validator = Validator::new(); + + narwhal + .start( + committee.into(), + shared_worker_cache, + execution_state, + tx_validator, + ) + .await?; + + Ok(()) + } +} diff --git a/crates/ursa-consensus/src/main.rs b/crates/ursa-consensus/src/main.rs new file mode 100644 index 00000000..d3dadc21 --- /dev/null +++ b/crates/ursa-consensus/src/main.rs @@ -0,0 +1,45 @@ +// Copyright 2022-2023 Fleek Network +// SPDX-License-Identifier: Apache-2.0, MIT + +use anyhow::Result; +use fastcrypto::traits::KeyPair as PrimaryKeyPair; +use multiaddr::Multiaddr; +use narwhal_crypto::{KeyPair, NetworkKeyPair as NarwhalNetworkKeyPair}; +use rand::{rngs::OsRng, SeedableRng}; +use tracing::info; +use ursa_consensus::{ + config::{NetworkKeyPair, NodeConfig, ValidatorKeyPair}, + Service, +}; + +#[tokio::main] +async fn main() -> Result<()> { + let listen_address: Multiaddr = "/ip4/0.0.0.0/tcp/5678".parse().unwrap(); + + // todo(botch): Abstract + let mut rng = rand::rngs::StdRng::from_rng(OsRng).unwrap(); + + let dir = tempfile::TempDir::new().unwrap().into_path(); + let keypair = KeyPair::generate(&mut rng); + let worker_keypair = NarwhalNetworkKeyPair::generate(&mut rng); + let account_keypair = NarwhalNetworkKeyPair::generate(&mut rng); + let network_keypair = NarwhalNetworkKeyPair::generate(&mut rng); + + let primary_address: Multiaddr = "/ip4/127.0.0.1/udp/0".parse().unwrap(); + let network_address: Multiaddr = "/ip4/127.0.0.1/tcp/0/http".parse().unwrap(); + + let config = NodeConfig { + keypair: ValidatorKeyPair::new(keypair), + worker_keypair: NetworkKeyPair::new(worker_keypair), + account_keypair: NetworkKeyPair::new(account_keypair), + network_keypair: NetworkKeyPair::new(network_keypair), + network_address, + db_path: dir, + }; + + info!("Started narwhal listening on {}", listen_address); + + Service::start(&config).await?; + + Ok(()) +} diff --git a/crates/ursa-consensus/src/validator.rs b/crates/ursa-consensus/src/validator.rs new file mode 100644 index 00000000..d66396b0 --- /dev/null +++ b/crates/ursa-consensus/src/validator.rs @@ -0,0 +1,26 @@ +// Copyright 2022-2023 Fleek Network +// SPDX-License-Identifier: Apache-2.0, MIT + +use narwhal_worker::TransactionValidator; +use std::io::Error; + +#[derive(Clone)] +pub struct Validator {} + +impl Validator { + pub fn new() -> Self { + Self {} + } +} + +impl TransactionValidator for Validator { + type Error = Error; + + fn validate(&self, _t: &[u8]) -> Result<(), Self::Error> { + Ok(()) + } + + fn validate_batch(&self, _b: &narwhal_types::Batch) -> Result<(), Self::Error> { + Ok(()) + } +}