Skip to content

Commit

Permalink
DA: Consensus sampling (#708)
Browse files Browse the repository at this point in the history
* Add sampling relay to consensus and massage all generics

* Pipe in sampling filtering of blob info

* Add mark in block

* Pipe validate block

* Refactor mark_in_block -> mark_complete

* Fix generics on tests

* Fix generics on tests

* Fix rebase

* Cargo fmt after rebase

* Sampling service configuration

* Sampling service config in indexer integration tests

---------

Co-authored-by: Gusto <[email protected]>
  • Loading branch information
danielSanchezQ and bacv committed Sep 3, 2024
1 parent 6f6bb61 commit a13f861
Show file tree
Hide file tree
Showing 23 changed files with 678 additions and 85 deletions.
3 changes: 3 additions & 0 deletions nodes/nomos-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d
tracing = "0.1"
multiaddr = "0.18"
nomos-core = { path = "../../nomos-core" }
nomos-da-sampling = { path = "../../nomos-services/data-availability/sampling" }
nomos-da-verifier = { path = "../../nomos-services/data-availability/verifier", features = ["rocksdb-backend", "libp2p"] }
nomos-da-indexer = { path = "../../nomos-services/data-availability/indexer", features = ["rocksdb-backend"] }
nomos-da-network-service = { path = "../../nomos-services/data-availability/network" }
Expand All @@ -40,6 +41,8 @@ nomos-system-sig = { path = "../../nomos-services/system-sig" }
tracing-subscriber = "0.3"
cryptarchia-engine = { path = "../../consensus/cryptarchia-engine" }
cryptarchia-ledger = { path = "../../ledger/cryptarchia-ledger" }
rand = "0.8"
rand_chacha = "0.3"
tokio = { version = "1.24", features = ["sync"] }
serde_json = "1.0"
serde_yaml = "0.9"
Expand Down
168 changes: 155 additions & 13 deletions nodes/nomos-node/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use nomos_core::da::blob::info::DispersedBlobInfo;
use nomos_core::da::blob::metadata::Metadata;
use nomos_core::da::DaVerifier as CoreDaVerifier;
use nomos_core::{da::blob::Blob, header::HeaderId, tx::Transaction};
use nomos_da_sampling::backend::DaSamplingServiceBackend;
use nomos_da_verifier::backend::VerifierBackend;
use nomos_mempool::{
network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter,
Expand All @@ -29,6 +30,7 @@ use nomos_mempool::{
use nomos_network::backends::libp2p::Libp2p as NetworkBackend;
use nomos_storage::backends::StorageSerde;
use overwatch_rs::overwatch::handle::OverwatchHandle;
use rand::{RngCore, SeedableRng};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tower_http::{
cors::{Any, CorsLayer},
Expand All @@ -47,7 +49,19 @@ pub struct AxumBackendSettings {
pub cors_origins: Vec<String>,
}

pub struct AxumBackend<A, B, C, V, VB, T, S, const SIZE: usize> {
pub struct AxumBackend<
A,
B,
C,
V,
VB,
T,
S,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
const SIZE: usize,
> {
settings: AxumBackendSettings,
_attestation: core::marker::PhantomData<A>,
_blob: core::marker::PhantomData<B>,
Expand All @@ -56,6 +70,9 @@ pub struct AxumBackend<A, B, C, V, VB, T, S, const SIZE: usize> {
_verifier_backend: core::marker::PhantomData<VB>,
_tx: core::marker::PhantomData<T>,
_storage_serde: core::marker::PhantomData<S>,
_sampling_backend: core::marker::PhantomData<SamplingBackend>,
_sampling_network_adapter: core::marker::PhantomData<SamplingNetworkAdapter>,
_sampling_rng: core::marker::PhantomData<SamplingRng>,
}

#[derive(OpenApi)]
Expand All @@ -72,7 +89,32 @@ pub struct AxumBackend<A, B, C, V, VB, T, S, const SIZE: usize> {
struct ApiDoc;

#[async_trait::async_trait]
impl<A, B, C, V, VB, T, S, const SIZE: usize> Backend for AxumBackend<A, B, C, V, VB, T, S, SIZE>
impl<
A,
B,
C,
V,
VB,
T,
S,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
const SIZE: usize,
> Backend
for AxumBackend<
A,
B,
C,
V,
VB,
T,
S,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SIZE,
>
where
A: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
B: Blob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
Expand Down Expand Up @@ -119,6 +161,12 @@ where
<T as nomos_core::tx::Transaction>::Hash:
Serialize + for<'de> Deserialize<'de> + std::cmp::Ord + Debug + Send + Sync + 'static,
S: StorageSerde + Send + Sync + 'static,
SamplingRng: SeedableRng + RngCore + Send + 'static,
SamplingBackend: DaSamplingServiceBackend<SamplingRng> + Send + 'static,
SamplingBackend::Settings: Clone,
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter + Send + 'static,
{
type Error = hyper::Error;
type Settings = AxumBackendSettings;
Expand All @@ -136,6 +184,9 @@ where
_verifier_backend: core::marker::PhantomData,
_tx: core::marker::PhantomData,
_storage_serde: core::marker::PhantomData,
_sampling_backend: core::marker::PhantomData,
_sampling_network_adapter: core::marker::PhantomData,
_sampling_rng: core::marker::PhantomData,
})
}

Expand Down Expand Up @@ -166,16 +217,45 @@ where
.route("/cl/status", routing::post(cl_status::<T>))
.route(
"/cryptarchia/info",
routing::get(cryptarchia_info::<T, S, SIZE>),
routing::get(
cryptarchia_info::<
T,
S,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SIZE,
>,
),
)
.route(
"/cryptarchia/headers",
routing::get(cryptarchia_headers::<T, S, SIZE>),
routing::get(
cryptarchia_headers::<
T,
S,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SIZE,
>,
),
)
.route("/da/add_blob", routing::post(add_blob::<A, B, VB, S>))
.route(
"/da/get_range",
routing::post(get_range::<T, C, V, S, SIZE>),
routing::post(
get_range::<
T,
C,
V,
S,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SIZE,
>,
),
)
.route("/network/info", routing::get(libp2p_info))
.route("/storage/block", routing::post(block::<S, T>))
Expand Down Expand Up @@ -263,7 +343,14 @@ struct QueryParams {
(status = 500, description = "Internal server error", body = String),
)
)]
async fn cryptarchia_info<Tx, SS, const SIZE: usize>(
async fn cryptarchia_info<
Tx,
SS,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
const SIZE: usize,
>(
State(handle): State<OverwatchHandle>,
) -> Response
where
Expand All @@ -279,8 +366,21 @@ where
+ 'static,
<Tx as Transaction>::Hash: std::cmp::Ord + Debug + Send + Sync + 'static,
SS: StorageSerde + Send + Sync + 'static,
SamplingRng: SeedableRng + RngCore,
SamplingBackend: DaSamplingServiceBackend<SamplingRng> + Send,
SamplingBackend::Settings: Clone,
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
{
make_request_and_return_response!(consensus::cryptarchia_info::<Tx, SS, SIZE>(&handle))
make_request_and_return_response!(consensus::cryptarchia_info::<
Tx,
SS,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SIZE,
>(&handle))
}

#[utoipa::path(
Expand All @@ -291,7 +391,14 @@ where
(status = 500, description = "Internal server error", body = String),
)
)]
async fn cryptarchia_headers<Tx, SS, const SIZE: usize>(
async fn cryptarchia_headers<
Tx,
SS,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
const SIZE: usize,
>(
State(store): State<OverwatchHandle>,
Query(query): Query<QueryParams>,
) -> Response
Expand All @@ -308,11 +415,22 @@ where
+ 'static,
<Tx as Transaction>::Hash: std::cmp::Ord + Debug + Send + Sync + 'static,
SS: StorageSerde + Send + Sync + 'static,
SamplingRng: SeedableRng + RngCore,
SamplingBackend: DaSamplingServiceBackend<SamplingRng> + Send,
SamplingBackend::Settings: Clone,
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
{
let QueryParams { from, to } = query;
make_request_and_return_response!(consensus::cryptarchia_headers::<Tx, SS, SIZE>(
&store, from, to
))
make_request_and_return_response!(consensus::cryptarchia_headers::<
Tx,
SS,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SIZE,
>(&store, from, to))
}

#[utoipa::path(
Expand Down Expand Up @@ -358,7 +476,16 @@ where
(status = 500, description = "Internal server error", body = String),
)
)]
async fn get_range<Tx, C, V, SS, const SIZE: usize>(
async fn get_range<
Tx,
C,
V,
SS,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
const SIZE: usize,
>(
State(handle): State<OverwatchHandle>,
Json(GetRangeReq { app_id, range }): Json<GetRangeReq<V>>,
) -> Response
Expand Down Expand Up @@ -400,8 +527,23 @@ where
<V as Metadata>::Index:
AsRef<[u8]> + Clone + Serialize + DeserializeOwned + PartialOrd + Send + Sync,
SS: StorageSerde + Send + Sync + 'static,
SamplingRng: SeedableRng + RngCore,
SamplingBackend: DaSamplingServiceBackend<SamplingRng> + Send,
SamplingBackend::Settings: Clone,
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
{
make_request_and_return_response!(da::get_range::<Tx, C, V, SS, SIZE>(&handle, app_id, range))
make_request_and_return_response!(da::get_range::<
Tx,
C,
V,
SS,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SIZE,
>(&handle, app_id, range))
}

#[utoipa::path(
Expand Down
1 change: 1 addition & 0 deletions nodes/nomos-node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ pub struct Config {
<DaNetworkService<DaNetworkValidatorBackend<FillFromNodeList>> as ServiceData>::Settings,
pub da_indexer: <crate::DaIndexer as ServiceData>::Settings,
pub da_verifier: <crate::DaVerifier as ServiceData>::Settings,
pub da_sampling: <crate::DaSampling as ServiceData>::Settings,
pub http: <NomosApiService as ServiceData>::Settings,
pub cryptarchia: <crate::Cryptarchia as ServiceData>::Settings,
}
Expand Down
39 changes: 36 additions & 3 deletions nodes/nomos-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapter as IndexerStorage
use nomos_da_indexer::DataIndexerService;
use nomos_da_network_service::backends::libp2p::validator::DaNetworkValidatorBackend;
use nomos_da_network_service::NetworkService as DaNetworkService;
use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackend;
use nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter as SamplingLibp2pAdapter;
use nomos_da_sampling::DaSamplingService;
use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifier;
use nomos_da_verifier::network::adapters::libp2p::Libp2pAdapter as VerifierNetworkAdapter;
use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapter as VerifierStorageAdapter;
Expand All @@ -41,13 +44,30 @@ use nomos_storage::{
use nomos_system_sig::SystemSig;
use overwatch_derive::*;
use overwatch_rs::services::handle::ServiceHandle;
use rand_chacha::ChaCha20Rng;
use serde::{de::DeserializeOwned, Serialize};
use subnetworks_assignations::versions::v1::FillFromNodeList;
// internal
pub use tx::Tx;

pub type NomosApiService =
ApiService<AxumBackend<(), DaBlob, BlobInfo, BlobInfo, KzgrsDaVerifier, Tx, Wire, MB16>>;
/// Membership used by the DA Network service.
pub type NomosDaMembership = FillFromNodeList;

pub type NomosApiService = ApiService<
AxumBackend<
(),
DaBlob,
BlobInfo,
BlobInfo,
KzgrsDaVerifier,
Tx,
Wire,
KzgrsSamplingBackend<ChaCha20Rng>,
nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter<NomosDaMembership>,
ChaCha20Rng,
MB16,
>,
>;

pub const CL_TOPIC: &str = "cl";
pub const DA_TOPIC: &str = "da";
Expand All @@ -62,6 +82,9 @@ pub type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus<
FillSizeWithTx<MB16, Tx>,
FillSizeWithBlobs<MB16, BlobInfo>,
RocksBackend<Wire>,
KzgrsSamplingBackend<ChaCha20Rng>,
nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter<NomosDaMembership>,
ChaCha20Rng,
>;

pub type TxMempool = TxMempoolService<
Expand All @@ -88,6 +111,15 @@ pub type DaIndexer = DataIndexerService<
FillSizeWithTx<MB16, Tx>,
FillSizeWithBlobs<MB16, BlobInfo>,
RocksBackend<Wire>,
KzgrsSamplingBackend<ChaCha20Rng>,
nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter<NomosDaMembership>,
ChaCha20Rng,
>;

pub type DaSampling = DaSamplingService<
KzgrsSamplingBackend<ChaCha20Rng>,
SamplingLibp2pAdapter<NomosDaMembership>,
ChaCha20Rng,
>;

pub type DaVerifier = DaVerifierService<
Expand All @@ -103,7 +135,8 @@ pub struct Nomos {
network: ServiceHandle<NetworkService<NetworkBackend>>,
da_indexer: ServiceHandle<DaIndexer>,
da_verifier: ServiceHandle<DaVerifier>,
da_network: ServiceHandle<DaNetworkService<DaNetworkValidatorBackend<FillFromNodeList>>>,
da_sampling: ServiceHandle<DaSampling>,
da_network: ServiceHandle<DaNetworkService<DaNetworkValidatorBackend<NomosDaMembership>>>,
cl_mempool: ServiceHandle<TxMempool>,
da_mempool: ServiceHandle<DaMempool>,
cryptarchia: ServiceHandle<Cryptarchia>,
Expand Down
1 change: 1 addition & 0 deletions nodes/nomos-node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ fn main() -> Result<()> {
},
da_network: config.da_network,
da_indexer: config.da_indexer,
da_sampling: config.da_sampling,
da_verifier: config.da_verifier,
cryptarchia: config.cryptarchia,
#[cfg(feature = "metrics")]
Expand Down
2 changes: 1 addition & 1 deletion nomos-core/src/block/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ where
}

#[must_use]
pub fn with_blobs_certificates(
pub fn with_blobs_info(
mut self,
blobs_certificates: impl Iterator<Item = B> + 'static,
) -> Self {
Expand Down
Loading

0 comments on commit a13f861

Please sign in to comment.