Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make L1 client async again #2240

Merged
merged 6 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ derive_more = "0.99.17"
es-version = { git = "https://github.com/EspressoSystems/es-version.git", branch = "main" }
dotenvy = "0.15"
dyn-clone = "1.0"
ethers = { version = "2.0", features = ["solc"] }
ethers = { version = "2.0", features = ["solc", "ws"] }
futures = "0.3"

hotshot = { git = "https://github.com/EspressoSystems/hotshot", tag = "0.5.79" }
Expand Down Expand Up @@ -114,6 +114,7 @@ jf-relation = { git = "https://github.com/EspressoSystems/jellyfish", tag = "0.4
jf-utils = { git = "https://github.com/EspressoSystems/jellyfish", tag = "0.4.5" }
libp2p = { version = "0.53", default-features = false }
log-panics = { version = "2.0", features = ["with-backtrace"] }
lru = "0.12"
strum = { version = "0.26", features = ["derive"] }
surf-disco = "0.9"
sqlx = "0.8"
Expand Down
6 changes: 4 additions & 2 deletions builder/src/bin/permissionless-builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ async fn run<V: Versions>(
) -> anyhow::Result<()> {
let l1_params = L1Params {
url: opt.l1_provider_url,
events_max_block_range: 10000,
options: Default::default(),
};

let builder_key_pair = EthKeyPair::from_mnemonic(&opt.eth_mnemonic, opt.eth_account_index)?;
Expand All @@ -138,7 +138,9 @@ async fn run<V: Versions>(
let builder_server_url: Url = format!("http://0.0.0.0:{}", opt.port).parse().unwrap();

let instance_state =
build_instance_state::<V>(genesis.chain_config, l1_params, opt.state_peers).unwrap();
build_instance_state::<V>(genesis.chain_config, l1_params, opt.state_peers)
.await
.unwrap();

let base_fee = genesis.max_base_fee();
tracing::info!(?base_fee, "base_fee");
Expand Down
8 changes: 4 additions & 4 deletions builder/src/non_permissioned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use async_broadcast::broadcast;
use async_compatibility_layer::art::async_spawn;
use async_std::sync::{Arc, RwLock};
use espresso_types::{
eth_signature_key::EthKeyPair, v0_3::ChainConfig, FeeAmount, L1Client, NodeState, Payload,
SeqTypes, ValidatedState,
eth_signature_key::EthKeyPair, v0_3::ChainConfig, FeeAmount, NodeState, Payload, SeqTypes,
ValidatedState,
};
use hotshot::traits::BlockPayload;
use hotshot_builder_core::{
Expand Down Expand Up @@ -39,12 +39,12 @@ pub struct BuilderConfig {
pub hotshot_builder_apis_url: Url,
}

pub fn build_instance_state<V: Versions>(
pub async fn build_instance_state<V: Versions>(
chain_config: ChainConfig,
l1_params: L1Params,
state_peers: Vec<Url>,
) -> anyhow::Result<NodeState> {
let l1_client = L1Client::new(l1_params.url, l1_params.events_max_block_range);
let l1_client = l1_params.options.connect(l1_params.url).await?;
let instance_state = NodeState::new(
u64::MAX, // dummy node ID, only used for debugging
chain_config,
Expand Down
6 changes: 4 additions & 2 deletions marketplace-builder/src/bin/marketplace-builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ async fn run<V: Versions>(
) -> anyhow::Result<()> {
let l1_params = L1Params {
url: opt.l1_provider_url,
events_max_block_range: 10000,
options: Default::default(),
};

let is_reserve = opt.is_reserve;
Expand All @@ -165,7 +165,9 @@ async fn run<V: Versions>(
let builder_server_url: Url = format!("http://0.0.0.0:{}", opt.port).parse().unwrap();

let instance_state =
build_instance_state::<V>(genesis.chain_config, l1_params, opt.state_peers).unwrap();
build_instance_state::<V>(genesis.chain_config, l1_params, opt.state_peers)
.await
.unwrap();

let base_fee = genesis.max_base_fee();
tracing::info!(?base_fee, "base_fee");
Expand Down
4 changes: 2 additions & 2 deletions marketplace-builder/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ pub struct BuilderConfig {
pub hotshot_builder_apis_url: Url,
}

pub fn build_instance_state<V: Versions>(
pub async fn build_instance_state<V: Versions>(
chain_config: ChainConfig,
l1_params: L1Params,
state_peers: Vec<Url>,
) -> anyhow::Result<NodeState> {
let l1_client = L1Client::new(l1_params.url, l1_params.events_max_block_range);
let l1_client = l1_params.options.connect(l1_params.url).await?;

let instance_state = NodeState::new(
u64::MAX, // dummy node ID, only used for debugging
Expand Down
4 changes: 4 additions & 0 deletions sequencer/api/public-env-vars.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ variables = [
"ESPRESSO_SEQUENCER_HOTSHOT_ADDRESS",
"ESPRESSO_SEQUENCER_HOTSHOT_EVENT_STREAMING_API_PORT",
"ESPRESSO_SEQUENCER_IS_DA",
"ESPRESSO_SEQUENCER_L1_BLOCKS_CACHE_SIZE",
"ESPRESSO_SEQUENCER_L1_EVENTS_CHANNEL_CAPACITY",
"ESPRESSO_SEQUENCER_L1_EVENTS_MAX_BLOCK_RANGE",
"ESPRESSO_SEQUENCER_L1_POLLING_INTERVAL",
"ESPRESSO_SEQUENCER_L1_RETRY_DELAY",
"ESPRESSO_SEQUENCER_LIBP2P_ADVERTISE_ADDRESS",
"ESPRESSO_SEQUENCER_LIBP2P_BIND_ADDRESS",
"ESPRESSO_SEQUENCER_MAX_CONNECTIONS",
Expand Down
4 changes: 4 additions & 0 deletions sequencer/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> Sequence
.create_gauge("node_index".into(), None)
.set(instance_state.node_id as usize);

// Start L1 client if it isn't already.
instance_state.l1_client.start().await;

// Load saved consensus state from storage.
let (initializer, anchor_view) = persistence
.load_consensus_state::<V>(instance_state.clone())
Expand Down Expand Up @@ -321,6 +324,7 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> Sequence
tracing::info!("shutting down SequencerContext");
self.handle.write().await.shut_down().await;
self.tasks.shut_down().await;
self.node_state.l1_client.stop().await;

// Since we've already shut down, we can set `detached` so the drop
// handler doesn't call `shut_down` again.
Expand Down
9 changes: 5 additions & 4 deletions sequencer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use async_std::sync::RwLock;
use catchup::StatePeers;
use context::SequencerContext;
use espresso_types::{
traits::EventConsumer, BackoffParams, L1Client, NodeState, PubKey, SeqTypes,
traits::EventConsumer, BackoffParams, L1Client, L1ClientOptions, NodeState, PubKey, SeqTypes,
SolverAuctionResultsProvider, ValidatedState,
};
use ethers::types::U256;
Expand Down Expand Up @@ -182,7 +182,7 @@ pub struct NetworkParams {

pub struct L1Params {
pub url: Url,
pub events_max_block_range: u64,
pub options: L1ClientOptions,
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -473,7 +473,8 @@ pub async fn init_node<P: PersistenceOptions, V: Versions>(
genesis_state.prefund_account(address, amount);
}

let l1_client = L1Client::new(l1_params.url, l1_params.events_max_block_range);
let l1_client = l1_params.options.connect(l1_params.url).await?;
l1_client.start().await;
let l1_genesis = match genesis.l1_finalized {
L1Finalized::Block(b) => b,
L1Finalized::Number { number } => l1_client.wait_for_finalized_block(number).await,
Expand Down Expand Up @@ -975,7 +976,7 @@ pub mod testing {
let node_state = NodeState::new(
i as u64,
state.chain_config.resolve().unwrap_or_default(),
L1Client::new(self.l1_url.clone(), 1000),
L1Client::new(self.l1_url.clone()).await.unwrap(),
catchup::local_and_remote(persistence_opt.clone(), catchup).await,
V::Base::VERSION,
)
Expand Down
2 changes: 1 addition & 1 deletion sequencer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ where
let (private_staking_key, private_state_key) = opt.private_keys()?;
let l1_params = L1Params {
url: opt.l1_provider_url,
events_max_block_range: opt.l1_events_max_block_range,
options: opt.l1_options,
};

let network_params = NetworkParams {
Expand Down
12 changes: 4 additions & 8 deletions sequencer/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{
use anyhow::{bail, Context};
use clap::{error::ErrorKind, Args, FromArgMatches, Parser};
use derivative::Derivative;
use espresso_types::{parse_duration, BackoffParams};
use espresso_types::{parse_duration, BackoffParams, L1ClientOptions};
use hotshot_types::{light_client::StateSignKey, signature_key::BLSPrivKey};
use libp2p::Multiaddr;
use url::Url;
Expand Down Expand Up @@ -334,13 +334,9 @@ pub struct Options {
#[derivative(Debug(format_with = "Display::fmt"))]
pub l1_provider_url: Url,

/// Maximum number of L1 blocks that can be scanned for events in a single query.
#[clap(
long,
env = "ESPRESSO_SEQUENCER_L1_EVENTS_MAX_BLOCK_RANGE",
default_value = "10000"
)]
pub l1_events_max_block_range: u64,
/// Configuration for the L1 client.
#[clap(flatten)]
pub l1_options: L1ClientOptions,

/// Whether or not we are a DA node.
#[clap(long, env = "ESPRESSO_SEQUENCER_IS_DA", action)]
Expand Down
4 changes: 3 additions & 1 deletion sequencer/src/restart_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ impl<S: TestableSequencerDataSource> TestNode<S> {
.join(","),
"--l1-provider-url",
network.l1_provider,
"--l1-polling-interval",
"1s",
]);
opt.is_da = node.is_da;
Self {
Expand Down Expand Up @@ -557,7 +559,7 @@ impl TestNetwork {
};

let anvil_port = ports.pick();
let anvil = Anvil::new().port(anvil_port).spawn();
let anvil = Anvil::new().port(anvil_port).block_time(1u64).spawn();
let anvil_endpoint = anvil.endpoint();

let api_ports = node_params
Expand Down
2 changes: 2 additions & 0 deletions types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ testing = ["hotshot-testing"]

anyhow = { workspace = true }
ark-serialize = { workspace = true }
async-broadcast = { workspace = true }
async-compatibility-layer = { workspace = true }
async-std = { workspace = true }
async-trait = { workspace = true }
Expand All @@ -36,6 +37,7 @@ itertools = { workspace = true }
jf-merkle-tree = { workspace = true }
jf-utils = { workspace = true } # TODO temporary: used only for test_rng()
jf-vid = { workspace = true }
lru = { workspace = true }
num-traits = { workspace = true }
paste = { workspace = true }
pretty_assertions = { workspace = true }
Expand Down
6 changes: 5 additions & 1 deletion types/src/v0/impls/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1406,7 +1406,11 @@ mod test_headers {

let anvil = Anvil::new().block_time(1u32).spawn();
let mut genesis_state = NodeState::mock()
.with_l1(L1Client::new(anvil.endpoint().parse().unwrap(), 1))
.with_l1(
L1Client::new(anvil.endpoint().parse().unwrap())
.await
.unwrap(),
)
.with_current_version(StaticVersion::<0, 1>::version());

let genesis = GenesisForTest::default().await;
Expand Down
8 changes: 4 additions & 4 deletions types/src/v0/impls/instance_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl NodeState {
Self::new(
0,
ChainConfig::default(),
L1Client::new("http://localhost:3331".parse().unwrap(), 10000),
L1Client::http("http://localhost:3331".parse().unwrap()),
mock::MockStateCatchup::default(),
StaticVersion::<0, 1>::version(),
)
Expand All @@ -84,7 +84,7 @@ impl NodeState {
Self::new(
0,
ChainConfig::default(),
L1Client::new("http://localhost:3331".parse().unwrap(), 10000),
L1Client::http("http://localhost:3331".parse().unwrap()),
mock::MockStateCatchup::default(),
StaticVersion::<0, 2>::version(),
)
Expand All @@ -97,7 +97,7 @@ impl NodeState {
Self::new(
0,
ChainConfig::default(),
L1Client::new("http://localhost:3331".parse().unwrap(), 10000),
L1Client::http("http://localhost:3331".parse().unwrap()),
mock::MockStateCatchup::default(),
StaticVersion::<0, 3>::version(),
)
Expand Down Expand Up @@ -137,7 +137,7 @@ impl Default for NodeState {
Self::new(
1u64,
ChainConfig::default(),
L1Client::new("http://localhost:3331".parse().unwrap(), 10000),
L1Client::http("http://localhost:3331".parse().unwrap()),
mock::MockStateCatchup::default(),
StaticVersion::<0, 1>::version(),
)
Expand Down
Loading
Loading