Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(consensus): enabled syncing pregenesis blocks over p2p #3192

Merged
merged 3 commits into the base branch from the source branch (branch names lost in extraction)
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 1 addition & 21 deletions core/node/consensus/src/en.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,13 @@ pub(super) struct EN {
impl EN {
/// Task running a consensus node for the external node.
/// It may be a validator, but it cannot be a leader (cannot propose blocks).
///
/// If `enable_pregenesis` is false,
/// before starting the consensus node it fetches all the blocks
/// older than consensus genesis from the main node using json RPC.
/// NOTE: currently `enable_pregenesis` is hardcoded to `false` in `era.rs`.
/// True is used only in tests. Once the `block_metadata` RPC is enabled everywhere
/// this flag should be removed and fetching pregenesis blocks will always be done
/// over the gossip network.
pub async fn run(
self,
ctx: &ctx::Ctx,
actions: ActionQueueSender,
cfg: ConsensusConfig,
secrets: ConsensusSecrets,
build_version: Option<semver::Version>,
enable_pregenesis: bool,
) -> anyhow::Result<()> {
let attester = config::attester_key(&secrets).context("attester_key")?;

Expand All @@ -74,24 +65,13 @@ impl EN {
.await
.wrap("try_update_global_config()")?;

let mut payload_queue = conn
let payload_queue = conn
.new_payload_queue(ctx, actions, self.sync_state.clone())
.await
.wrap("new_payload_queue()")?;

drop(conn);

// Fetch blocks before the genesis.
if !enable_pregenesis {
self.fetch_blocks(
ctx,
&mut payload_queue,
Some(global_config.genesis.first_block),
)
.await
.wrap("fetch_blocks()")?;
}

// Monitor the genesis of the main node.
// If it changes, it means that a hard fork occurred and we need to reset the consensus state.
s.spawn_bg::<()>({
Expand Down
14 changes: 2 additions & 12 deletions core/node/consensus/src/era.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,8 @@ pub async fn run_external_node(
is_validator = secrets.validator_key.is_some(),
"running external node"
);
// We will enable it once the main node on all envs supports
// `block_metadata()` JSON RPC method.
let enable_pregenesis = false;
en.run(
ctx,
actions,
cfg,
secrets,
Some(build_version),
enable_pregenesis,
)
.await
en.run(ctx, actions, cfg, secrets, Some(build_version))
.await
}
None => {
tracing::info!("running fetcher");
Expand Down
11 changes: 1 addition & 10 deletions core/node/consensus/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ pub(super) struct ConfigSet {
net: network::Config,
pub(super) config: config::ConsensusConfig,
pub(super) secrets: config::ConsensusSecrets,
pub(super) enable_pregenesis: bool,
}

impl ConfigSet {
Expand All @@ -83,17 +82,11 @@ impl ConfigSet {
config: make_config(&net, None),
secrets: make_secrets(&net, None),
net,
enable_pregenesis: self.enable_pregenesis,
}
}
}

pub(super) fn new_configs(
rng: &mut impl Rng,
setup: &Setup,
seed_peers: usize,
pregenesis: bool,
) -> Vec<ConfigSet> {
pub(super) fn new_configs(rng: &mut impl Rng, setup: &Setup, seed_peers: usize) -> Vec<ConfigSet> {
let net_cfgs = network::testonly::new_configs(rng, setup, 0);
let genesis_spec = config::GenesisSpec {
chain_id: setup.genesis.chain_id.0.try_into().unwrap(),
Expand Down Expand Up @@ -133,7 +126,6 @@ pub(super) fn new_configs(
config: make_config(&net, Some(genesis_spec.clone())),
secrets: make_secrets(&net, setup.attester_keys.get(i).cloned()),
net,
enable_pregenesis: pregenesis,
})
.collect()
}
Expand Down Expand Up @@ -473,7 +465,6 @@ impl StateKeeper {
cfgs.config,
cfgs.secrets,
cfgs.net.build_version,
cfgs.enable_pregenesis,
)
.await
}
Expand Down
10 changes: 5 additions & 5 deletions core/node/consensus/src/tests/attestation.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Context as _;
use rand::Rng as _;
use test_casing::{test_casing, Product};
use test_casing::test_casing;
use tracing::Instrument as _;
use zksync_concurrency::{ctx, error::Wrap, scope};
use zksync_consensus_roles::{
Expand All @@ -12,7 +12,7 @@ use zksync_test_account::Account;
use zksync_types::ProtocolVersionId;
use zksync_web3_decl::namespaces::EnNamespaceClient as _;

use super::{POLL_INTERVAL, PREGENESIS, VERSIONS};
use super::{POLL_INTERVAL, VERSIONS};
use crate::{
mn::run_main_node,
registry::{testonly, Registry},
Expand Down Expand Up @@ -126,9 +126,9 @@ async fn test_attestation_status_api(version: ProtocolVersionId) {
// Test running a couple of attesters (which are also validators).
// Main node is expected to collect all certificates.
// External nodes are expected to just vote for the batch.
#[test_casing(4, Product((VERSIONS,PREGENESIS)))]
#[test_casing(2, VERSIONS)]
#[tokio::test]
async fn test_multiple_attesters(version: ProtocolVersionId, pregenesis: bool) {
async fn test_multiple_attesters(version: ProtocolVersionId) {
const NODES: usize = 4;

zksync_concurrency::testonly::abort_on_panic();
Expand All @@ -137,7 +137,7 @@ async fn test_multiple_attesters(version: ProtocolVersionId, pregenesis: bool) {
let account = &mut Account::random();
let to_fund = &[account.address];
let setup = Setup::new(rng, 4);
let mut cfgs = new_configs(rng, &setup, NODES, pregenesis);
let mut cfgs = new_configs(rng, &setup, NODES);
scope::run!(ctx, |ctx, s| async {
let validator_pool = ConnectionPool::test(false, version).await;
let (mut validator, runner) = StateKeeper::new(ctx, validator_pool.clone()).await?;
Expand Down
56 changes: 25 additions & 31 deletions core/node/consensus/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ mod attestation;

const VERSIONS: [ProtocolVersionId; 2] = [ProtocolVersionId::latest(), ProtocolVersionId::next()];
const FROM_SNAPSHOT: [bool; 2] = [true, false];
const PREGENESIS: [bool; 2] = [true, false];
const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(500);

#[test_casing(2, VERSIONS)]
Expand Down Expand Up @@ -190,14 +189,14 @@ async fn test_validator_block_store(version: ProtocolVersionId) {
// In the current implementation, consensus certificates are created asynchronously
// for the L2 blocks constructed by the StateKeeper. This means that consensus actor
// is effectively just back filling the consensus certificates for the L2 blocks in storage.
#[test_casing(8, Product((FROM_SNAPSHOT,VERSIONS,PREGENESIS)))]
#[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))]
#[tokio::test]
async fn test_validator(from_snapshot: bool, version: ProtocolVersionId, pregenesis: bool) {
async fn test_validator(from_snapshot: bool, version: ProtocolVersionId) {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.));
let rng = &mut ctx.rng();
let setup = Setup::new(rng, 1);
let cfg = testonly::new_configs(rng, &setup, 0, pregenesis)[0].clone();
let cfg = testonly::new_configs(rng, &setup, 0)[0].clone();
let account = &mut Account::random();

scope::run!(ctx, |ctx, s| async {
Expand Down Expand Up @@ -254,14 +253,14 @@ async fn test_validator(from_snapshot: bool, version: ProtocolVersionId, pregene
}

// Test running a validator node and 2 full nodes recovered from different snapshots.
#[test_casing(4, Product((VERSIONS,PREGENESIS)))]
#[test_casing(2, VERSIONS)]
#[tokio::test]
async fn test_nodes_from_various_snapshots(version: ProtocolVersionId, pregenesis: bool) {
async fn test_nodes_from_various_snapshots(version: ProtocolVersionId) {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.));
let rng = &mut ctx.rng();
let setup = Setup::new(rng, 1);
let validator_cfg = testonly::new_configs(rng, &setup, 0, pregenesis)[0].clone();
let validator_cfg = testonly::new_configs(rng, &setup, 0)[0].clone();
let account = &mut Account::random();

scope::run!(ctx, |ctx, s| async {
Expand Down Expand Up @@ -335,14 +334,14 @@ async fn test_nodes_from_various_snapshots(version: ProtocolVersionId, pregenesi
.unwrap();
}

#[test_casing(8, Product((FROM_SNAPSHOT,VERSIONS,PREGENESIS)))]
#[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))]
#[tokio::test]
async fn test_config_change(from_snapshot: bool, version: ProtocolVersionId, pregenesis: bool) {
async fn test_config_change(from_snapshot: bool, version: ProtocolVersionId) {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.));
let rng = &mut ctx.rng();
let setup = Setup::new(rng, 1);
let mut validator_cfg = testonly::new_configs(rng, &setup, 0, pregenesis)[0].clone();
let mut validator_cfg = testonly::new_configs(rng, &setup, 0)[0].clone();
let node_cfg = validator_cfg.new_fullnode(rng);
let account = &mut Account::random();

Expand Down Expand Up @@ -412,16 +411,16 @@ async fn test_config_change(from_snapshot: bool, version: ProtocolVersionId, pre
// Test running a validator node and a couple of full nodes.
// Validator is producing signed blocks and fetchers are expected to fetch
// them directly or indirectly.
#[test_casing(8, Product((FROM_SNAPSHOT,VERSIONS,PREGENESIS)))]
#[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))]
#[tokio::test]
async fn test_full_nodes(from_snapshot: bool, version: ProtocolVersionId, pregenesis: bool) {
async fn test_full_nodes(from_snapshot: bool, version: ProtocolVersionId) {
const NODES: usize = 2;

zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.));
let rng = &mut ctx.rng();
let setup = Setup::new(rng, 1);
let validator_cfg = testonly::new_configs(rng, &setup, 0, pregenesis)[0].clone();
let validator_cfg = testonly::new_configs(rng, &setup, 0)[0].clone();
let account = &mut Account::random();

// topology:
Expand Down Expand Up @@ -500,16 +499,16 @@ async fn test_full_nodes(from_snapshot: bool, version: ProtocolVersionId, pregen
}

// Test running external node (non-leader) validators.
#[test_casing(8, Product((FROM_SNAPSHOT,VERSIONS,PREGENESIS)))]
#[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))]
#[tokio::test]
async fn test_en_validators(from_snapshot: bool, version: ProtocolVersionId, pregenesis: bool) {
async fn test_en_validators(from_snapshot: bool, version: ProtocolVersionId) {
const NODES: usize = 3;

zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.));
let rng = &mut ctx.rng();
let setup = Setup::new(rng, NODES);
let cfgs = testonly::new_configs(rng, &setup, 1, pregenesis);
let cfgs = testonly::new_configs(rng, &setup, 1);
let account = &mut Account::random();

// Run all nodes in parallel.
Expand Down Expand Up @@ -583,18 +582,14 @@ async fn test_en_validators(from_snapshot: bool, version: ProtocolVersionId, pre
}

// Test fetcher back filling missing certs.
#[test_casing(8, Product((FROM_SNAPSHOT,VERSIONS,PREGENESIS)))]
#[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))]
#[tokio::test]
async fn test_p2p_fetcher_backfill_certs(
from_snapshot: bool,
version: ProtocolVersionId,
pregenesis: bool,
) {
async fn test_p2p_fetcher_backfill_certs(from_snapshot: bool, version: ProtocolVersionId) {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.));
let rng = &mut ctx.rng();
let setup = Setup::new(rng, 1);
let validator_cfg = testonly::new_configs(rng, &setup, 0, pregenesis)[0].clone();
let validator_cfg = testonly::new_configs(rng, &setup, 0)[0].clone();
let node_cfg = validator_cfg.new_fullnode(rng);
let account = &mut Account::random();

Expand Down Expand Up @@ -668,16 +663,16 @@ async fn test_p2p_fetcher_backfill_certs(
}

// Test temporary fetcher fetching blocks if a lot of certs are missing.
#[test_casing(8, Product((FROM_SNAPSHOT,VERSIONS,PREGENESIS)))]
#[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))]
#[tokio::test]
async fn test_temporary_fetcher(from_snapshot: bool, version: ProtocolVersionId, pregenesis: bool) {
async fn test_temporary_fetcher(from_snapshot: bool, version: ProtocolVersionId) {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.));
let rng = &mut ctx.rng();
// We force certs to be missing on EN by having 1 of the validators permanently offline.
// This way no blocks will be finalized at all, so no one will have certs.
let setup = Setup::new(rng, 2);
let validator_cfg = testonly::new_configs(rng, &setup, 0, pregenesis)[0].clone();
let validator_cfg = testonly::new_configs(rng, &setup, 0)[0].clone();
let node_cfg = validator_cfg.new_fullnode(rng);
let account = &mut Account::random();

Expand Down Expand Up @@ -749,8 +744,7 @@ async fn test_temporary_fetcher_termination(from_snapshot: bool, version: Protoc
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.));
let rng = &mut ctx.rng();
let setup = Setup::new(rng, 1);
let pregenesis = true;
let validator_cfg = testonly::new_configs(rng, &setup, 0, pregenesis)[0].clone();
let validator_cfg = testonly::new_configs(rng, &setup, 0)[0].clone();
let node_cfg = validator_cfg.new_fullnode(rng);
let account = &mut Account::random();

Expand Down Expand Up @@ -797,14 +791,14 @@ async fn test_temporary_fetcher_termination(from_snapshot: bool, version: Protoc
.unwrap();
}

#[test_casing(4, Product((VERSIONS,PREGENESIS)))]
#[test_casing(2, VERSIONS)]
#[tokio::test]
async fn test_with_pruning(version: ProtocolVersionId, pregenesis: bool) {
async fn test_with_pruning(version: ProtocolVersionId) {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::RealClock);
let rng = &mut ctx.rng();
let setup = Setup::new(rng, 1);
let validator_cfg = testonly::new_configs(rng, &setup, 0, pregenesis)[0].clone();
let validator_cfg = testonly::new_configs(rng, &setup, 0)[0].clone();
let node_cfg = validator_cfg.new_fullnode(rng);
let account = &mut Account::random();

Expand Down
Loading