diff --git a/Cargo.lock b/Cargo.lock index 6545ad86..f63ea7e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8527,9 +8527,12 @@ name = "relayer" version = "0.1.0" dependencies = [ "anyhow", + "ark-serialize 0.4.2", "axum", "bridging_payment", "cgo_oligami", + "checkpoint_light_client", + "checkpoint_light_client-io", "clap", "dotenv", "ethereum-client", @@ -8550,6 +8553,7 @@ dependencies = [ "prometheus", "prover", "rand 0.8.5", + "reqwest 0.11.27", "serde", "serde_json", "thiserror", diff --git a/Cargo.toml b/Cargo.toml index e77df405..128d273b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ bridging_payment = { path = "gear-programs/bridging-payment" } gear_proof_storage = { path = "gear-programs/proof-storage" } checkpoint_light_client-io = { path = "gear-programs/checkpoint-light-client/io", default-features = false } utils-prometheus = { path = "utils-prometheus" } +checkpoint_light_client = { path = "gear-programs/checkpoint-light-client", default-features = false } plonky2 = { git = "https://github.com/gear-tech/plonky2.git", rev = "4a620f4d79efe9233d0e7682df5a2fc625b5420e" } plonky2_field = { git = "https://github.com/gear-tech/plonky2.git", rev = "4a620f4d79efe9233d0e7682df5a2fc625b5420e" } diff --git a/ethereum-common/src/utils.rs b/ethereum-common/src/utils.rs index 215bc1e5..e70cb231 100644 --- a/ethereum-common/src/utils.rs +++ b/ethereum-common/src/utils.rs @@ -9,6 +9,10 @@ pub fn calculate_period(slot: u64) -> u64 { calculate_epoch(slot) / EPOCHS_PER_SYNC_COMMITTEE } +pub fn calculate_slot(period: u64) -> u64 { + period * SLOTS_PER_EPOCH * EPOCHS_PER_SYNC_COMMITTEE +} + pub fn decode_hex_bytes<'de, D>(deserializer: D) -> Result, D::Error> where D: serde::Deserializer<'de>, diff --git a/gear-programs/checkpoint-light-client/src/lib.rs b/gear-programs/checkpoint-light-client/src/lib.rs index 4cbc6ba9..dc89fc13 100644 --- a/gear-programs/checkpoint-light-client/src/lib.rs +++ b/gear-programs/checkpoint-light-client/src/lib.rs @@ -15,6 +15,3 @@ use gstd::{Box, Vec}; #[cfg(not(feature = "std"))] mod wasm; - -#[cfg(test)] -mod tests; diff --git a/gear-programs/checkpoint-light-client/src/tests/mod.rs b/gear-programs/checkpoint-light-client/src/tests/mod.rs deleted file mode 100644 index 747420f0..00000000 --- a/gear-programs/checkpoint-light-client/src/tests/mod.rs +++ /dev/null @@ -1,641 +0,0 @@ -use crate::WASM_BINARY; -use anyhow::Error as AnyError; -use ark_bls12_381::{G1Projective as G1, G2Projective as G2}; -use ark_serialize::CanonicalDeserialize; -use checkpoint_light_client_io::{ - ethereum_common::{ - base_types::{BytesFixed, FixedArray}, - beacon::{BLSPubKey, Bytes32, SignedBeaconBlockHeader, SyncAggregate, SyncCommittee}, - network::Network, - utils as eth_utils, SLOTS_PER_EPOCH, - }, - replay_back, sync_update, - tree_hash::TreeHash, - ArkScale, BeaconBlockHeader, G1TypeInfo, G2TypeInfo, Handle, HandleResult, Init, - SyncCommitteeUpdate, -}; -use gclient::{EventListener, EventProcessor, GearApi, Result}; -use gstd::prelude::*; -use reqwest::{Client, RequestBuilder}; -use serde::{de::DeserializeOwned, Deserialize}; -use tokio::time::{self, Duration}; - -// https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/light-client/p2p-interface.md#configuration -pub const MAX_REQUEST_LIGHT_CLIENT_UPDATES: u8 = 128; -const RPC_URL: &str = "http://127.0.0.1:5052"; - -const FINALITY_UPDATE_5_254_112: &[u8; 4_940] = - include_bytes!("./sepolia-finality-update-5_254_112.json"); -const FINALITY_UPDATE_5_263_072: &[u8; 4_941] = - include_bytes!("./sepolia-finality-update-5_263_072.json"); - -#[derive(Deserialize)] -#[serde(untagged)] -enum LightClientHeader { - Unwrapped(BeaconBlockHeader), - Wrapped(Beacon), -} - -#[derive(Deserialize)] -struct Beacon { - beacon: BeaconBlockHeader, -} - -#[derive(Deserialize, Debug)] -struct BeaconBlockHeaderResponse { - data: BeaconBlockHeaderData, -} - -#[derive(Deserialize, Debug)] -struct BeaconBlockHeaderData { - header: SignedBeaconBlockHeader, -} - -pub fn header_deserialize<'de, D>(deserializer: D) -> Result -where - D: serde::Deserializer<'de>, -{ - let header: LightClientHeader = Deserialize::deserialize(deserializer)?; - - Ok(match header { - LightClientHeader::Unwrapped(header) => header, - LightClientHeader::Wrapped(header) => header.beacon, - }) -} - -#[derive(Deserialize, Debug)] -pub struct Bootstrap { - #[serde(deserialize_with = "header_deserialize")] - pub header: BeaconBlockHeader, - pub current_sync_committee: SyncCommittee, - pub current_sync_committee_branch: Vec, -} - -#[derive(Deserialize, Debug)] -struct BootstrapResponse { - data: Bootstrap, -} - -#[derive(Deserialize)] -struct FinalityUpdateResponse { - data: FinalityUpdate, -} - -#[derive(Deserialize)] -pub struct FinalityUpdate { - #[serde(deserialize_with = "header_deserialize")] - pub attested_header: BeaconBlockHeader, - #[serde(deserialize_with = "header_deserialize")] - pub finalized_header: BeaconBlockHeader, - pub finality_branch: Vec, - pub sync_aggregate: SyncAggregate, - #[serde(deserialize_with = "eth_utils::deserialize_u64")] - pub signature_slot: u64, -} - -#[derive(Debug, Clone, Deserialize)] -pub struct Update { - #[serde(deserialize_with = "header_deserialize")] - pub attested_header: BeaconBlockHeader, - pub next_sync_committee: SyncCommittee, - pub next_sync_committee_branch: Vec, - #[serde(deserialize_with = "header_deserialize")] - pub finalized_header: BeaconBlockHeader, - pub finality_branch: Vec, - pub sync_aggregate: SyncAggregate, - #[serde(deserialize_with = "eth_utils::deserialize_u64")] - pub signature_slot: u64, -} - -#[derive(Debug, Clone, Deserialize)] -struct UpdateData { - data: Update, -} - -type UpdateResponse = Vec; - -async fn get(request_builder: RequestBuilder) -> Result { - let bytes = request_builder - .send() - .await - .map_err(AnyError::from)? - .bytes() - .await - .map_err(AnyError::from)?; - - Ok(serde_json::from_slice::(&bytes).map_err(AnyError::from)?) -} - -async fn get_bootstrap(client: &mut Client, checkpoint: &str) -> Result { - let checkpoint_no_prefix = match checkpoint.starts_with("0x") { - true => &checkpoint[2..], - false => checkpoint, - }; - - let url = format!("{RPC_URL}/eth/v1/beacon/light_client/bootstrap/0x{checkpoint_no_prefix}",); - - get::(client.get(&url)) - .await - .map(|response| response.data) -} - -async fn get_finality_update(client: &mut Client) -> Result { - let url = format!("{RPC_URL}/eth/v1/beacon/light_client/finality_update"); - - get::(client.get(&url)) - .await - .map(|response| response.data) -} - -async fn get_updates(client: &mut Client, period: u64, count: u8) -> Result { - let count = cmp::min(count, MAX_REQUEST_LIGHT_CLIENT_UPDATES); - let url = format!( - "{RPC_URL}/eth/v1/beacon/light_client/updates?start_period={period}&count={count}", - ); - - get::(client.get(&url)).await -} - -async fn get_block_header(client: &Client, slot: u64) -> Result { - let url = format!("{RPC_URL}/eth/v1/beacon/headers/{slot}"); - - get::(client.get(&url)) - .await - .map(|response| response.data.header.message) -} - -fn map_public_keys(compressed_public_keys: &[BLSPubKey]) -> Vec> { - compressed_public_keys - .iter() - .map(|BytesFixed(pub_key_compressed)| { - let pub_key = ::deserialize_compressed_unchecked( - &pub_key_compressed.0[..], - ) - .unwrap(); - let ark_scale: ArkScale = G1TypeInfo(pub_key).into(); - - ark_scale - }) - .collect() -} - -fn sync_update_from_finality( - signature: G2, - finality_update: FinalityUpdate, -) -> SyncCommitteeUpdate { - SyncCommitteeUpdate { - signature_slot: finality_update.signature_slot, - attested_header: finality_update.attested_header, - finalized_header: finality_update.finalized_header, - sync_aggregate: finality_update.sync_aggregate, - sync_committee_next_aggregate_pubkey: None, - sync_committee_signature: G2TypeInfo(signature).into(), - sync_committee_next_pub_keys: None, - sync_committee_next_branch: None, - finality_branch: finality_update - .finality_branch - .into_iter() - .map(|BytesFixed(array)| array.0) - .collect::<_>(), - } -} - -fn sync_update_from_update(update: Update) -> SyncCommitteeUpdate { - let signature = ::deserialize_compressed( - &update.sync_aggregate.sync_committee_signature.0 .0[..], - ) - .unwrap(); - - let next_sync_committee_keys = map_public_keys(&update.next_sync_committee.pubkeys.0); - - SyncCommitteeUpdate { - signature_slot: update.signature_slot, - attested_header: update.attested_header, - finalized_header: update.finalized_header, - sync_aggregate: update.sync_aggregate, - sync_committee_next_aggregate_pubkey: Some(update.next_sync_committee.aggregate_pubkey), - sync_committee_signature: G2TypeInfo(signature).into(), - sync_committee_next_pub_keys: Some(Box::new(FixedArray( - next_sync_committee_keys.try_into().unwrap(), - ))), - sync_committee_next_branch: Some( - update - .next_sync_committee_branch - .into_iter() - .map(|BytesFixed(array)| array.0) - .collect::<_>(), - ), - finality_branch: update - .finality_branch - .into_iter() - .map(|BytesFixed(array)| array.0) - .collect::<_>(), - } -} - -async fn common_upload_program( - client: &GearApi, - code: Vec, - payload: impl Encode, -) -> Result<([u8; 32], [u8; 32])> { - let encoded_payload = payload.encode(); - let gas_limit = client - .calculate_upload_gas(None, code.clone(), encoded_payload, 0, true) - .await? - .min_limit; - println!("init gas {gas_limit:?}"); - let (message_id, program_id, _) = client - .upload_program( - code, - gclient::now_micros().to_le_bytes(), - payload, - gas_limit, - 0, - ) - .await?; - - Ok((message_id.into(), program_id.into())) -} - -async fn upload_program( - client: &GearApi, - listener: &mut EventListener, - payload: impl Encode, -) -> Result<[u8; 32]> { - let (message_id, program_id) = - common_upload_program(client, WASM_BINARY.to_vec(), payload).await?; - - assert!(listener - .message_processed(message_id.into()) - .await? - .succeed()); - - Ok(program_id) -} - -#[tokio::test] -async fn init_and_updating() -> Result<()> { - let mut client_http = Client::new(); - - // use the latest finality header as a checkpoint for bootstrapping - let finality_update = get_finality_update(&mut client_http).await?; - let current_period = eth_utils::calculate_period(finality_update.finalized_header.slot); - let mut updates = get_updates(&mut client_http, current_period, 1).await?; - - let update = match updates.pop() { - Some(update) if updates.is_empty() => update.data, - _ => unreachable!("Requested single update"), - }; - - let checkpoint = update.finalized_header.tree_hash_root(); - let checkpoint_hex = hex::encode(checkpoint); - - let bootstrap = get_bootstrap(&mut client_http, &checkpoint_hex).await?; - let sync_update = sync_update_from_update(update); - - let pub_keys = map_public_keys(&bootstrap.current_sync_committee.pubkeys.0); - let init = Init { - network: Network::Sepolia, - sync_committee_current_pub_keys: Box::new(FixedArray(pub_keys.try_into().unwrap())), - sync_committee_current_aggregate_pubkey: bootstrap.current_sync_committee.aggregate_pubkey, - sync_committee_current_branch: bootstrap - .current_sync_committee_branch - .into_iter() - .map(|BytesFixed(bytes)| bytes.0) - .collect(), - update: sync_update, - }; - - // let client = GearApi::dev_from_path("../target/release/gear").await?; - let client = GearApi::dev().await?; - let mut listener = client.subscribe().await?; - - let program_id = upload_program(&client, &mut listener, init).await?; - - println!("program_id = {:?}", hex::encode(program_id)); - - println!(); - println!(); - - for _ in 0..30 { - let update = get_finality_update(&mut client_http).await?; - - let slot: u64 = update.finalized_header.slot; - let current_period = eth_utils::calculate_period(slot); - let mut updates = get_updates(&mut client_http, current_period, 1).await?; - match updates.pop() { - Some(update) if updates.is_empty() && update.data.finalized_header.slot >= slot => { - println!("update sync committee"); - let payload = Handle::SyncUpdate(sync_update_from_update(update.data)); - let gas_limit = client - .calculate_handle_gas(None, program_id.into(), payload.encode(), 0, true) - .await? - .min_limit; - println!("update gas_limit {gas_limit:?}"); - - let (message_id, _) = client - .send_message(program_id.into(), payload, gas_limit, 0) - .await?; - - let (_message_id, payload, _value) = listener.reply_bytes_on(message_id).await?; - let result_decoded = HandleResult::decode(&mut &payload.unwrap()[..]).unwrap(); - assert!( - matches!(result_decoded, HandleResult::SyncUpdate(result) if result.is_ok()) - ); - } - - _ => { - println!( - "slot = {slot:?}, attested slot = {:?}, signature slot = {:?}", - update.attested_header.slot, update.signature_slot - ); - let signature = ::deserialize_compressed( - &update.sync_aggregate.sync_committee_signature.0 .0[..], - ); - - let Ok(signature) = signature else { - println!("failed to deserialize point on G2"); - continue; - }; - - let payload = Handle::SyncUpdate(sync_update_from_finality(signature, update)); - - let gas_limit = client - .calculate_handle_gas(None, program_id.into(), payload.encode(), 0, true) - .await? - .min_limit; - println!("finality_update gas_limit {gas_limit:?}"); - - let (message_id, _) = client - .send_message(program_id.into(), payload, gas_limit, 0) - .await?; - - let (_message_id, payload, _value) = listener.reply_bytes_on(message_id).await?; - let result_decoded = HandleResult::decode(&mut &payload.unwrap()[..]).unwrap(); - assert!( - matches!(result_decoded, HandleResult::SyncUpdate(result) if result.is_ok()) - ); - } - } - - println!(); - println!(); - - time::sleep(Duration::from_secs(6 * 60)).await; - } - - Ok(()) -} - -#[tokio::test] -async fn replaying_back() -> Result<()> { - let mut client_http = Client::new(); - - let finality_update: FinalityUpdateResponse = - serde_json::from_slice(FINALITY_UPDATE_5_254_112).unwrap(); - let finality_update = finality_update.data; - println!( - "finality_update slot = {}", - finality_update.finalized_header.slot - ); - - // This SyncCommittee operated for about 13K slots, so we make adjustments - let current_period = eth_utils::calculate_period(finality_update.finalized_header.slot); - let mut updates = get_updates(&mut client_http, current_period - 1, 1).await?; - - let update = match updates.pop() { - Some(update) if updates.is_empty() => update.data, - _ => unreachable!("Requested single update"), - }; - let checkpoint = update.finalized_header.tree_hash_root(); - let checkpoint_hex = hex::encode(checkpoint); - - let bootstrap = get_bootstrap(&mut client_http, &checkpoint_hex).await?; - println!("bootstrap slot = {}", bootstrap.header.slot); - - println!("update slot = {}", update.finalized_header.slot); - let sync_update = sync_update_from_update(update); - let slot_start = sync_update.finalized_header.slot; - let slot_end = finality_update.finalized_header.slot; - println!( - "Replaying back from {slot_start} to {slot_end} ({} headers)", - slot_end - slot_start - ); - - let pub_keys = map_public_keys(&bootstrap.current_sync_committee.pubkeys.0); - let init = Init { - network: Network::Sepolia, - sync_committee_current_pub_keys: Box::new(FixedArray(pub_keys.try_into().unwrap())), - sync_committee_current_aggregate_pubkey: bootstrap.current_sync_committee.aggregate_pubkey, - sync_committee_current_branch: bootstrap - .current_sync_committee_branch - .into_iter() - .map(|BytesFixed(bytes)| bytes.0) - .collect(), - update: sync_update, - }; - - // let client = GearApi::dev_from_path("../target/release/gear").await?; - let client = GearApi::dev().await?; - let mut listener = client.subscribe().await?; - - let program_id = upload_program(&client, &mut listener, init).await?; - - println!("program_id = {:?}", hex::encode(program_id)); - - println!(); - println!(); - - // start to replay back - let count_headers = 26 * SLOTS_PER_EPOCH; - let mut requests_headers = Vec::with_capacity(count_headers as usize); - for i in 1..count_headers { - requests_headers.push(get_block_header(&client_http, slot_end - i)); - } - - let headers = futures::future::join_all(requests_headers) - .await - .into_iter() - .filter_map(|maybe_header| maybe_header.ok()) - .collect::>(); - - let signature = ::deserialize_compressed( - &finality_update.sync_aggregate.sync_committee_signature.0 .0[..], - ) - .unwrap(); - - let payload = Handle::ReplayBackStart { - sync_update: sync_update_from_finality(signature, finality_update), - headers, - }; - - let gas_limit = client - .calculate_handle_gas(None, program_id.into(), payload.encode(), 0, true) - .await? - .min_limit; - println!("ReplayBackStart gas_limit {gas_limit:?}"); - - let (message_id, _) = client - .send_message(program_id.into(), payload, gas_limit, 0) - .await?; - - let (_message_id, payload, _value) = listener.reply_bytes_on(message_id).await?; - let result_decoded = HandleResult::decode(&mut &payload.unwrap()[..]).unwrap(); - assert!(matches!( - result_decoded, - HandleResult::ReplayBackStart(Ok(replay_back::StatusStart::InProgress)) - )); - - // continue to replay back - let mut slot_end = slot_end - count_headers; - let count_headers = 29 * SLOTS_PER_EPOCH; - let count_batch = (slot_end - slot_start) / count_headers; - - for _batch in 0..count_batch { - let mut requests_headers = Vec::with_capacity(count_headers as usize); - for i in 0..count_headers { - requests_headers.push(get_block_header(&client_http, slot_end - i)); - } - - let headers = futures::future::join_all(requests_headers) - .await - .into_iter() - .filter_map(|maybe_header| maybe_header.ok()) - .collect::>(); - - let payload = Handle::ReplayBack(headers); - - let gas_limit = client - .calculate_handle_gas(None, program_id.into(), payload.encode(), 0, true) - .await? - .min_limit; - println!("ReplayBack gas_limit {gas_limit:?}"); - - let (message_id, _) = client - .send_message(program_id.into(), payload, gas_limit, 0) - .await?; - - let (_message_id, payload, _value) = listener.reply_bytes_on(message_id).await?; - let result_decoded = HandleResult::decode(&mut &payload.unwrap()[..]).unwrap(); - assert!(matches!( - result_decoded, - HandleResult::ReplayBack(Some(replay_back::Status::InProcess)) - )); - - slot_end -= count_headers; - } - - // remaining headers - let mut requests_headers = Vec::with_capacity(count_headers as usize); - for i in slot_start..=slot_end { - requests_headers.push(get_block_header(&client_http, i)); - } - - let headers = futures::future::join_all(requests_headers) - .await - .into_iter() - .filter_map(|maybe_header| maybe_header.ok()) - .collect::>(); - - let payload = Handle::ReplayBack(headers); - - let gas_limit = client - .calculate_handle_gas(None, program_id.into(), payload.encode(), 0, true) - .await? - .min_limit; - println!("ReplayBack gas_limit {gas_limit:?}"); - - let (message_id, _) = client - .send_message(program_id.into(), payload, gas_limit, 0) - .await?; - - let (_message_id, payload, _value) = listener.reply_bytes_on(message_id).await?; - let result_decoded = HandleResult::decode(&mut &payload.unwrap()[..]).unwrap(); - assert!(matches!( - result_decoded, - HandleResult::ReplayBack(Some(replay_back::Status::Finished)) - )); - - Ok(()) -} - -#[tokio::test] -async fn sync_update_requires_replaying_back() -> Result<()> { - let mut client_http = Client::new(); - - let finality_update: FinalityUpdateResponse = - serde_json::from_slice(FINALITY_UPDATE_5_263_072).unwrap(); - let finality_update = finality_update.data; - println!( - "finality_update slot = {}", - finality_update.finalized_header.slot - ); - - let slot = finality_update.finalized_header.slot; - let current_period = eth_utils::calculate_period(slot); - let mut updates = get_updates(&mut client_http, current_period, 1).await?; - - let update = match updates.pop() { - Some(update) if updates.is_empty() => update.data, - _ => unreachable!("Requested single update"), - }; - - let checkpoint = update.finalized_header.tree_hash_root(); - let checkpoint_hex = hex::encode(checkpoint); - - let bootstrap = get_bootstrap(&mut client_http, &checkpoint_hex).await?; - let sync_update = sync_update_from_update(update); - - let pub_keys = map_public_keys(&bootstrap.current_sync_committee.pubkeys.0); - let init = Init { - network: Network::Sepolia, - sync_committee_current_pub_keys: Box::new(FixedArray(pub_keys.try_into().unwrap())), - sync_committee_current_aggregate_pubkey: bootstrap.current_sync_committee.aggregate_pubkey, - sync_committee_current_branch: bootstrap - .current_sync_committee_branch - .into_iter() - .map(|BytesFixed(bytes)| bytes.0) - .collect(), - update: sync_update, - }; - - let client = GearApi::dev().await?; - let mut listener = client.subscribe().await?; - - let program_id = upload_program(&client, &mut listener, init).await?; - - println!("program_id = {:?}", hex::encode(program_id)); - - println!(); - println!(); - - println!( - "slot = {slot:?}, attested slot = {:?}, signature slot = {:?}", - finality_update.attested_header.slot, finality_update.signature_slot - ); - let signature = ::deserialize_compressed( - &finality_update.sync_aggregate.sync_committee_signature.0 .0[..], - ) - .unwrap(); - - let payload = Handle::SyncUpdate(sync_update_from_finality(signature, finality_update)); - - let gas_limit = client - .calculate_handle_gas(None, program_id.into(), payload.encode(), 0, true) - .await? - .min_limit; - println!("finality_update gas_limit {gas_limit:?}"); - - let (message_id, _) = client - .send_message(program_id.into(), payload, gas_limit, 0) - .await?; - - let (_message_id, payload, _value) = listener.reply_bytes_on(message_id).await?; - let result_decoded = HandleResult::decode(&mut &payload.unwrap()[..]).unwrap(); - assert!(matches!( - result_decoded, - HandleResult::SyncUpdate(Err(sync_update::Error::ReplayBackRequired { .. })) - )); - - Ok(()) -} diff --git a/gear-programs/checkpoint-light-client/src/wasm/mod.rs b/gear-programs/checkpoint-light-client/src/wasm/mod.rs index 9c8fb362..d4b46f02 100644 --- a/gear-programs/checkpoint-light-client/src/wasm/mod.rs +++ b/gear-programs/checkpoint-light-client/src/wasm/mod.rs @@ -51,7 +51,8 @@ async fn init() { panic!("Current sync committee proof is not valid"); } - finalized_header.slot -= 1; + let period = eth_utils::calculate_period(finalized_header.slot) - 1; + finalized_header.slot = eth_utils::calculate_slot(period); match sync_update::verify( &network, &finalized_header, @@ -79,7 +80,11 @@ async fn init() { }) }, - _ => panic!("Incorrect initial sync committee update"), + Ok((finalized_header, sync_committee_next)) => panic!( + "Incorrect initial sync committee update ({}, {})", + finalized_header.is_some(), + sync_committee_next.is_some() + ), } } diff --git a/relayer/Cargo.toml b/relayer/Cargo.toml index cf671d10..bae262ac 100644 --- a/relayer/Cargo.toml +++ b/relayer/Cargo.toml @@ -11,7 +11,10 @@ gear-rpc-client.workspace = true prover.workspace = true anyhow.workspace = true +ark-serialize = { workspace = true, features = ["std"] } axum.workspace = true +checkpoint_light_client-io = { workspace = true, features = ["std"] } +checkpoint_light_client = { workspace = true, features = ["std"] } clap.workspace = true dotenv.workspace = true futures.workspace = true @@ -28,6 +31,7 @@ pretty_env_logger.workspace = true primitive-types = { workspace = true, features = ["std"] } prometheus.workspace = true rand.workspace = true +reqwest.workspace = true serde.workspace = true serde_json.workspace = true thiserror.workspace = true diff --git a/relayer/src/ethereum_checkpoints/metrics.rs b/relayer/src/ethereum_checkpoints/metrics.rs new file mode 100644 index 00000000..714d9c47 --- /dev/null +++ b/relayer/src/ethereum_checkpoints/metrics.rs @@ -0,0 +1,43 @@ +use prometheus::{IntCounter, IntGauge}; +use utils_prometheus::impl_metered_service; + +impl_metered_service! { + pub struct Updates { + pub fetched_sync_update_slot: IntGauge, + pub total_fetched_finality_updates: IntCounter, + pub total_fetched_committee_updates: IntCounter, + pub processed_finality_updates: IntCounter, + pub processed_committee_updates: IntCounter, + } +} + +impl Updates { + pub fn new() -> Self { + Self::new_inner().expect("Failed to create metrics") + } + + fn new_inner() -> prometheus::Result { + Ok(Self { + fetched_sync_update_slot: IntGauge::new( + "checkpoints_relayer_fetched_sync_update_slot", + "The slot of the last applied update", + )?, + total_fetched_finality_updates: IntCounter::new( + "checkpoints_relayer_total_fetched_finality_updates", + "Total amount of fetched finality updates", + )?, + total_fetched_committee_updates: IntCounter::new( + "checkpoints_relayer_total_fetched_committee_updates", + "Total amount of fetched committee updates", + )?, + processed_finality_updates: IntCounter::new( + "checkpoints_relayer_processed_finality_updates", + "Amount of processed finality updates", + )?, + processed_committee_updates: IntCounter::new( + "checkpoints_relayer_processed_committee_updates", + "Amount of processed committee updates", + )?, + }) + } +} diff --git a/relayer/src/ethereum_checkpoints/mod.rs b/relayer/src/ethereum_checkpoints/mod.rs new file mode 100644 index 00000000..635841e0 --- /dev/null +++ b/relayer/src/ethereum_checkpoints/mod.rs @@ -0,0 +1,199 @@ +use super::*; +use anyhow::{anyhow, Result as AnyResult}; +use checkpoint_light_client_io::{ + ethereum_common::{utils as eth_utils, SLOTS_PER_EPOCH}, + tree_hash::Hash256, + Handle, HandleResult, Slot, SyncCommitteeUpdate, G2, +}; +use futures::{ + future::{self, Either}, + pin_mut, +}; +use gclient::{EventListener, EventProcessor, GearApi, WSAddress}; +use parity_scale_codec::Decode; +use reqwest::Client; +use tokio::{ + signal::unix::{self, SignalKind}, + sync::mpsc::{self, Sender}, + time::{self, Duration}, +}; +use utils::{slots_batch::Iter as SlotsBatchIter, MAX_REQUEST_LIGHT_CLIENT_UPDATES}; + +#[cfg(test)] +mod tests; + +mod metrics; +mod replay_back; +mod sync_update; +mod utils; + +const SIZE_CHANNEL: usize = 100_000; +const SIZE_BATCH: u64 = 40 * SLOTS_PER_EPOCH; +const COUNT_FAILURE: usize = 3; +const DELAY_SECS_UPDATE_REQUEST: u64 = 30; + +pub async fn relay(args: RelayCheckpointsArgs) { + log::info!("Started"); + + let RelayCheckpointsArgs { + program_id, + beacon_endpoint, + vara_domain, + vara_port, + vara_suri, + prometheus_args: PrometheusArgs { + endpoint: endpoint_prometheus, + }, + } = args; + + let program_id_no_prefix = match program_id.starts_with("0x") { + true => &program_id[2..], + false => &program_id, + }; + + let program_id = hex::decode(program_id_no_prefix) + .ok() + .and_then(|bytes| <[u8; 32]>::try_from(bytes).ok()) + .expect("Expecting correct ProgramId"); + + let mut signal_interrupt = unix::signal(SignalKind::interrupt()).expect("Set SIGINT handler"); + + let (sender, mut receiver) = mpsc::channel(SIZE_CHANNEL); + let client_http = Client::new(); + + sync_update::spawn_receiver(client_http.clone(), beacon_endpoint.clone(), sender); + + let client = GearApi::init_with(WSAddress::new(vara_domain, vara_port), vara_suri) + .await + .expect("GearApi client should be created"); + + let gas_limit_block = client + .block_gas_limit() + .expect("Block gas limit should be determined"); + + // use 95% of block gas limit for all extrinsics + let gas_limit = gas_limit_block / 100 * 95; + log::info!("Gas limit for extrinsics: {gas_limit}"); + + let mut listener = client + .subscribe() + .await + .expect("Events listener should be created"); + + let sync_update = receiver + .recv() + .await + .expect("Updates receiver should be open before the loop"); + + let mut slot_last = sync_update.finalized_header.slot; + + match sync_update::try_to_apply( + &client, + &mut listener, + program_id, + sync_update.clone(), + gas_limit, + ) + .await + { + Err(e) => { + log::error!("{e:?}"); + return; + } + Ok(Err(sync_update::Error::ReplayBackRequired { + replayed_slot, + checkpoint, + })) => { + if let Err(e) = replay_back::execute( + &client_http, + &beacon_endpoint, + &client, + &mut listener, + program_id, + gas_limit, + replayed_slot, + checkpoint, + sync_update, + ) + .await + { + log::error!("{e:?}"); + } + + log::info!("Exiting"); + + return; + } + Ok(Ok(_) | Err(sync_update::Error::NotActual)) => (), + _ => { + slot_last = 0; + } + } + + let update_metrics = metrics::Updates::new(); + MetricsBuilder::new() + .register_service(&update_metrics) + .build() + .run(endpoint_prometheus) + .await; + + log::info!("Metrics service spawned"); + + loop { + let future_interrupt = signal_interrupt.recv(); + pin_mut!(future_interrupt); + + let future_update = receiver.recv(); + pin_mut!(future_update); + + let sync_update = match future::select(future_interrupt, future_update).await { + Either::Left((_interrupted, _)) => { + log::info!("Caught SIGINT. Exiting"); + return; + } + + Either::Right((Some(sync_update), _)) => sync_update, + Either::Right((None, _)) => { + log::info!("Updates receiver has been closed. Exiting"); + return; + } + }; + let slot = sync_update.finalized_header.slot; + + update_metrics + .fetched_sync_update_slot + .set(i64::from_le_bytes(slot.to_le_bytes())); + + if slot == slot_last { + update_metrics.total_fetched_finality_updates.inc(); + + continue; + } + + let committee_update = sync_update.sync_committee_next_pub_keys.is_some(); + match sync_update::try_to_apply(&client, &mut listener, program_id, sync_update, gas_limit) + .await + { + Ok(Ok(_)) => { + slot_last = slot; + + if committee_update { + update_metrics.total_fetched_committee_updates.inc(); + update_metrics.processed_committee_updates.inc(); + } else { + update_metrics.total_fetched_finality_updates.inc(); + update_metrics.processed_finality_updates.inc(); + } + } + Ok(Err(sync_update::Error::ReplayBackRequired { .. })) => { + log::info!("Replay back within the main loop. Exiting"); + return; + } + Ok(Err(e)) => log::info!("The program failed with: {e:?}. Skipping"), + Err(e) => { + log::error!("{e:?}"); + return; + } + } + } +} diff --git a/relayer/src/ethereum_checkpoints/replay_back.rs b/relayer/src/ethereum_checkpoints/replay_back.rs new file mode 100644 index 00000000..3443c381 --- /dev/null +++ b/relayer/src/ethereum_checkpoints/replay_back.rs @@ -0,0 +1,251 @@ +use super::*; +use checkpoint_light_client_io::BeaconBlockHeader; +use utils::ErrorNotFound; + +#[allow(clippy::too_many_arguments)] +pub async fn execute( + client_http: &Client, + beacon_endpoint: &str, + client: &GearApi, + listener: &mut EventListener, + program_id: [u8; 32], + gas_limit: u64, + replayed_slot: Option, + checkpoint: (Slot, Hash256), + sync_update: SyncCommitteeUpdate, +) -> AnyResult<()> { + log::info!("Replaying back started"); + + let (mut slot_start, _) = checkpoint; + if let Some(slot_end) = replayed_slot { + let slots_batch_iter = SlotsBatchIter::new(slot_start, slot_end, SIZE_BATCH) + .ok_or(anyhow!("Failed to create slots_batch::Iter with slot_start = {slot_start}, slot_end = {slot_end}."))?; + + replay_back_slots( + client_http, + beacon_endpoint, + client, + listener, + program_id, + gas_limit, + slots_batch_iter, + ) + .await?; + + log::info!("The ongoing replaying back finished"); + + return Ok(()); + } + + let period_start = 1 + eth_utils::calculate_period(slot_start); + let updates = utils::get_updates( + client_http, + beacon_endpoint, + period_start, + MAX_REQUEST_LIGHT_CLIENT_UPDATES, + ) + .await + .map_err(|e| anyhow!("Failed to get updates for period {period_start}: {e:?}"))?; + + let slot_last = sync_update.finalized_header.slot; + for update in updates { + let slot_end = update.data.finalized_header.slot; + let mut slots_batch_iter = SlotsBatchIter::new(slot_start, slot_end, SIZE_BATCH) + .ok_or(anyhow!("Failed to create slots_batch::Iter with slot_start = {slot_start}, slot_end = {slot_end}."))?; + + slot_start = slot_end; + + let signature = ::deserialize_compressed( + &update.data.sync_aggregate.sync_committee_signature.0 .0[..], + ) + .map_err(|e| anyhow!("Failed to deserialize point on G2 (replay back): {e:?}"))?; + + let sync_update = utils::sync_update_from_update(signature, update.data); + replay_back_slots_start( + client_http, + beacon_endpoint, + client, + listener, + program_id, + gas_limit, + slots_batch_iter.next(), + sync_update, + ) + .await?; + + replay_back_slots( + client_http, + beacon_endpoint, + client, + listener, + program_id, + gas_limit, + slots_batch_iter, + ) + .await?; + + if slot_end == slot_last { + // the provided sync_update is a sync committee update + return Ok(()); + } + } + + let mut slots_batch_iter = SlotsBatchIter::new(slot_start, slot_last, SIZE_BATCH) + .ok_or(anyhow!("Failed to create slots_batch::Iter with slot_start = {slot_start}, slot_last = {slot_last}."))?; + + replay_back_slots_start( + client_http, + beacon_endpoint, + client, + listener, + program_id, + gas_limit, + slots_batch_iter.next(), + sync_update, + ) + .await?; + + replay_back_slots( + client_http, + beacon_endpoint, + client, + listener, + program_id, + gas_limit, + slots_batch_iter, + ) + .await?; + + log::info!("Replaying back finished"); + + Ok(()) +} + +async fn replay_back_slots( + client_http: &Client, + beacon_endpoint: &str, + client: &GearApi, + listener: &mut EventListener, + program_id: [u8; 32], + gas_limit: u64, + slots_batch_iter: SlotsBatchIter, +) -> AnyResult<()> { + for (slot_start, slot_end) in slots_batch_iter { + replay_back_slots_inner( + client_http, + beacon_endpoint, + client, + listener, + program_id, + slot_start, + slot_end, + gas_limit, + ) + .await?; + } + + Ok(()) +} + +#[allow(clippy::too_many_arguments)] +async fn replay_back_slots_inner( + client_http: &Client, + beacon_endpoint: &str, + client: &GearApi, + listener: &mut EventListener, + program_id: [u8; 32], + slot_start: Slot, + slot_end: Slot, + gas_limit: u64, +) -> AnyResult<()> { + let payload = Handle::ReplayBack( + request_headers(client_http, beacon_endpoint, slot_start, slot_end).await?, + ); + + let (message_id, _) = client + .send_message(program_id.into(), payload, gas_limit, 0) + .await + .map_err(|e| anyhow!("Failed to send ReplayBack message: {e:?}"))?; + + let (_message_id, payload, _value) = listener + .reply_bytes_on(message_id) + .await + .map_err(|e| anyhow!("Failed to get reply to ReplayBack message: {e:?}"))?; + let payload = + payload.map_err(|e| anyhow!("Failed to get replay payload to ReplayBack: {e:?}"))?; + let result_decoded = HandleResult::decode(&mut &payload[..]) + .map_err(|e| anyhow!("Failed to decode HandleResult of ReplayBack: {e:?}"))?; + + log::debug!("replay_back_slots_inner; result_decoded = {result_decoded:?}"); + + match result_decoded { + HandleResult::ReplayBack(Some(_)) => Ok(()), + HandleResult::ReplayBack(None) => Err(anyhow!("Replaying back wasn't started")), + _ => Err(anyhow!("Wrong handle result to ReplayBack")), + } +} + +#[allow(clippy::too_many_arguments)] +async fn replay_back_slots_start( + client_http: &Client, + beacon_endpoint: &str, + client: &GearApi, + listener: &mut EventListener, + program_id: [u8; 32], + gas_limit: u64, + slots: Option<(Slot, Slot)>, + sync_update: SyncCommitteeUpdate, +) -> AnyResult<()> { + let Some((slot_start, slot_end)) = slots else { + return Ok(()); + }; + + let payload = Handle::ReplayBackStart { + sync_update, + headers: request_headers(client_http, beacon_endpoint, slot_start, slot_end).await?, + }; + + let (message_id, _) = client + .send_message(program_id.into(), payload, gas_limit, 0) + .await + .map_err(|e| anyhow!("Failed to send ReplayBackStart message: {e:?}"))?; + + let (_message_id, payload, _value) = listener + .reply_bytes_on(message_id) + .await + .map_err(|e| anyhow!("Failed to get reply to ReplayBackStart message: {e:?}"))?; + let payload = + payload.map_err(|e| anyhow!("Failed to get replay payload to ReplayBackStart: {e:?}"))?; + let result_decoded = HandleResult::decode(&mut &payload[..]) + .map_err(|e| anyhow!("Failed to decode HandleResult of ReplayBackStart: {e:?}"))?; + + log::debug!("replay_back_slots_start; result_decoded = {result_decoded:?}"); + + match result_decoded { + HandleResult::ReplayBackStart(Ok(_)) => Ok(()), + HandleResult::ReplayBackStart(Err(e)) => Err(anyhow!("ReplayBackStart failed: {e:?}")), + _ => Err(anyhow!("Wrong handle result to ReplayBackStart")), + } +} + +async fn request_headers( + client_http: &Client, + beacon_endpoint: &str, + slot_start: Slot, + slot_end: Slot, +) -> AnyResult> { + let batch_size = (slot_end - slot_start) as usize; + let mut requests_headers = Vec::with_capacity(batch_size); + for i in slot_start..slot_end { + requests_headers.push(utils::get_block_header(client_http, beacon_endpoint, i)); + } + + futures::future::join_all(requests_headers) + .await + .into_iter() + .filter(|maybe_header| !matches!(maybe_header, Err(e) if e.downcast_ref::().is_some())) + .collect::, _>>() + .map_err(|e| { + anyhow!("Failed to fetch block headers ([{slot_start}; {slot_end})): {e:?}") + }) +} diff --git a/relayer/src/ethereum_checkpoints/sync_update.rs b/relayer/src/ethereum_checkpoints/sync_update.rs new file mode 100644 index 00000000..dad03014 --- /dev/null +++ b/relayer/src/ethereum_checkpoints/sync_update.rs @@ -0,0 +1,104 @@ +use super::*; +pub use checkpoint_light_client_io::sync_update::Error; +use std::ops::ControlFlow::{self, *}; + +pub fn spawn_receiver( + client_http: Client, + beacon_endpoint: String, + sender: Sender, +) { + tokio::spawn(async move { + log::info!("Update receiver spawned"); + + let mut failures = 0; + loop { + match receive(&client_http, &beacon_endpoint, &sender).await { + Ok(Break(_)) => break, + Ok(Continue(_)) => (), + Err(e) => { + log::error!("{e:?}"); + + failures += 1; + if failures >= COUNT_FAILURE { + break; + } + } + } + + time::sleep(Duration::from_secs(DELAY_SECS_UPDATE_REQUEST)).await; + } + }); +} + +async fn receive( + client_http: &Client, + beacon_endpoint: &str, + sender: &Sender, +) -> AnyResult> { + let finality_update = utils::get_finality_update(client_http, beacon_endpoint) + .await + .map_err(|e| anyhow!("Unable to fetch FinalityUpdate: {e:?}"))?; + + let period = eth_utils::calculate_period(finality_update.finalized_header.slot); + let mut updates = utils::get_updates(client_http, beacon_endpoint, period, 1) + .await + .map_err(|e| anyhow!("Unable to fetch Updates: {e:?}"))?; + + let update = match updates.pop() { + Some(update) if updates.is_empty() => update.data, + _ => return Err(anyhow!("Requested single update")), + }; + + let reader_signature = if update.finalized_header.slot >= finality_update.finalized_header.slot + { + &update.sync_aggregate.sync_committee_signature.0 .0[..] + } else { + &finality_update.sync_aggregate.sync_committee_signature.0 .0[..] + }; + + let signature = + ::deserialize_compressed(reader_signature) + .map_err(|e| anyhow!("Failed to deserialize point on G2: {e:?}"))?; + + let sync_update = if update.finalized_header.slot >= finality_update.finalized_header.slot { + utils::sync_update_from_update(signature, update) + } else { + utils::sync_update_from_finality(signature, finality_update) + }; + + if sender.send(sync_update).await.is_err() { + return Ok(Break(())); + } + + Ok(Continue(())) +} + +pub async fn try_to_apply( + client: &GearApi, + listener: &mut EventListener, + program_id: [u8; 32], + sync_update: SyncCommitteeUpdate, + gas_limit: u64, +) -> AnyResult> { + let payload = Handle::SyncUpdate(sync_update); + let (message_id, _) = client + .send_message(program_id.into(), payload, gas_limit, 0) + .await + .map_err(|e| anyhow!("Failed to send message: {e:?}"))?; + + let (_message_id, payload, _value) = listener + .reply_bytes_on(message_id) + .await + .map_err(|e| anyhow!("Failed to get reply: {e:?}"))?; + let payload = + payload.map_err(|e| anyhow!("Failed to get replay payload to SyncUpdate: {e:?}"))?; + let result_decoded = HandleResult::decode(&mut &payload[..]) + .map_err(|e| anyhow!("Failed to decode HandleResult of SyncUpdate: {e:?}"))?; + + log::debug!("try_to_apply; result_decoded = {result_decoded:?}"); + + match result_decoded { + HandleResult::SyncUpdate(result) => Ok(result), + _ => Err(anyhow!("Wrong response type")), + } +} diff --git a/relayer/src/ethereum_checkpoints/tests/mod.rs b/relayer/src/ethereum_checkpoints/tests/mod.rs new file mode 100644 index 00000000..b7b4418a --- /dev/null +++ b/relayer/src/ethereum_checkpoints/tests/mod.rs @@ -0,0 +1,506 @@ +use super::utils::{self, slots_batch, FinalityUpdateResponse}; +use checkpoint_light_client::WASM_BINARY; +use checkpoint_light_client_io::{ + ethereum_common::{ + base_types::BytesFixed, network::Network, utils as eth_utils, SLOTS_PER_EPOCH, + }, + replay_back, sync_update, + tree_hash::TreeHash, + Handle, HandleResult, Init, G2, +}; +use gclient::{EventListener, EventProcessor, GearApi, Result}; +use parity_scale_codec::{Decode, Encode}; +use reqwest::Client; +use tokio::time::{self, Duration}; + +const RPC_URL: &str = "http://127.0.0.1:5052"; + +const FINALITY_UPDATE_5_254_112: &[u8; 4_940] = + include_bytes!("./sepolia-finality-update-5_254_112.json"); +const FINALITY_UPDATE_5_263_072: &[u8; 4_941] = + include_bytes!("./sepolia-finality-update-5_263_072.json"); + +async fn common_upload_program( + client: &GearApi, + code: Vec, + payload: impl Encode, +) -> Result<([u8; 32], [u8; 32])> { + let encoded_payload = payload.encode(); + let gas_limit = client + .calculate_upload_gas(None, code.clone(), encoded_payload, 0, true) + .await? + .min_limit; + println!("init gas {gas_limit:?}"); + let (message_id, program_id, _) = client + .upload_program( + code, + gclient::now_micros().to_le_bytes(), + payload, + gas_limit, + 0, + ) + .await?; + + Ok((message_id.into(), program_id.into())) +} + +async fn upload_program( + client: &GearApi, + listener: &mut EventListener, + payload: impl Encode, +) -> Result<[u8; 32]> { + let (message_id, program_id) = + common_upload_program(client, WASM_BINARY.to_vec(), payload).await?; + + assert!(listener + .message_processed(message_id.into()) + .await? + .succeed()); + + Ok(program_id) +} + +async fn init(network: Network) -> Result<()> { + let client_http = Client::new(); + + // use the latest finality header as a checkpoint for bootstrapping + let finality_update = utils::get_finality_update(&client_http, RPC_URL).await?; + let current_period = eth_utils::calculate_period(finality_update.finalized_header.slot); + let mut updates = utils::get_updates(&client_http, RPC_URL, current_period, 1).await?; + + println!( + "finality_update slot = {}, period = {}", + finality_update.finalized_header.slot, current_period + ); + + let update = match updates.pop() { + Some(update) if updates.is_empty() => update.data, + _ => unreachable!("Requested single update"), + }; + + let checkpoint = update.finalized_header.tree_hash_root(); + let checkpoint_hex = hex::encode(checkpoint); + + println!( + "checkpoint slot = {}, hash = {}", + update.finalized_header.slot, checkpoint_hex + ); + + let bootstrap = utils::get_bootstrap(&client_http, RPC_URL, &checkpoint_hex).await?; + + let signature = ::deserialize_compressed( + &update.sync_aggregate.sync_committee_signature.0 .0[..], + ) + .unwrap(); + let sync_update = utils::sync_update_from_update(signature, update); + + println!("bootstrap slot = {}", bootstrap.header.slot); + + let pub_keys = utils::map_public_keys(&bootstrap.current_sync_committee.pubkeys); + let init = Init { + network, + sync_committee_current_pub_keys: pub_keys, + sync_committee_current_aggregate_pubkey: bootstrap.current_sync_committee.aggregate_pubkey, + sync_committee_current_branch: bootstrap + .current_sync_committee_branch + .into_iter() + .map(|BytesFixed(bytes)| bytes.0) + .collect(), + update: sync_update, + }; + + let client = GearApi::dev().await?; + let mut listener = client.subscribe().await?; + + let program_id = upload_program(&client, &mut listener, init).await?; + + println!("program_id = {:?}", hex::encode(program_id)); + + Ok(()) +} + +#[ignore] +#[tokio::test] +async fn init_sepolia() -> Result<()> { + init(Network::Sepolia).await +} + +#[ignore] +#[tokio::test] +async fn init_holesky() -> Result<()> { + init(Network::Holesky).await +} + +#[ignore] +#[tokio::test] +async fn init_mainnet() -> Result<()> { + init(Network::Mainnet).await +} + +#[ignore] +#[tokio::test] +async fn init_and_updating() -> Result<()> { + let client_http = Client::new(); + + // use the latest finality header as a checkpoint for bootstrapping + let finality_update = utils::get_finality_update(&client_http, RPC_URL).await?; + let current_period = eth_utils::calculate_period(finality_update.finalized_header.slot); + let mut updates = utils::get_updates(&client_http, RPC_URL, current_period, 1).await?; + + println!( + "finality_update slot = {}, period = {}", + finality_update.finalized_header.slot, current_period + ); + + let update = match updates.pop() { + Some(update) if updates.is_empty() => update.data, + _ => unreachable!("Requested single update"), + }; + + let checkpoint = update.finalized_header.tree_hash_root(); + let checkpoint_hex = hex::encode(checkpoint); + + println!( + "checkpoint slot = {}, hash = {}", + update.finalized_header.slot, checkpoint_hex + ); + + let bootstrap = utils::get_bootstrap(&client_http, RPC_URL, &checkpoint_hex).await?; + + let signature = ::deserialize_compressed( + &update.sync_aggregate.sync_committee_signature.0 .0[..], + ) + .unwrap(); + let sync_update = utils::sync_update_from_update(signature, update); + + println!("bootstrap slot = {}", bootstrap.header.slot); + + let pub_keys = utils::map_public_keys(&bootstrap.current_sync_committee.pubkeys); + let init = Init { + network: Network::Holesky, + sync_committee_current_pub_keys: pub_keys, + sync_committee_current_aggregate_pubkey: bootstrap.current_sync_committee.aggregate_pubkey, + sync_committee_current_branch: bootstrap + .current_sync_committee_branch + .into_iter() + .map(|BytesFixed(bytes)| bytes.0) + .collect(), + update: sync_update, + }; + + let client = GearApi::dev().await?; + let mut listener = client.subscribe().await?; + + let program_id = upload_program(&client, &mut listener, init).await?; + + println!("program_id = {:?}", hex::encode(program_id)); + + println!(); + println!(); + + for _ in 0..30 { + let update = utils::get_finality_update(&client_http, RPC_URL).await?; + + let slot: u64 = update.finalized_header.slot; + let current_period = eth_utils::calculate_period(slot); + let mut updates = utils::get_updates(&client_http, RPC_URL, current_period, 1).await?; + match updates.pop() { + Some(update) if updates.is_empty() && update.data.finalized_header.slot >= slot => { + println!("update sync committee"); + let signature = + ::deserialize_compressed( + &update.data.sync_aggregate.sync_committee_signature.0 .0[..], + ) + .unwrap(); + let payload = + Handle::SyncUpdate(utils::sync_update_from_update(signature, update.data)); + let gas_limit = client + .calculate_handle_gas(None, program_id.into(), payload.encode(), 0, true) + .await? + .min_limit; + println!("update gas_limit {gas_limit:?}"); + + let (message_id, _) = client + .send_message(program_id.into(), payload, gas_limit, 0) + .await?; + + let (_message_id, payload, _value) = listener.reply_bytes_on(message_id).await?; + let result_decoded = HandleResult::decode(&mut &payload.unwrap()[..]).unwrap(); + assert!( + matches!(result_decoded, HandleResult::SyncUpdate(result) if result.is_ok()) + ); + } + + _ => { + println!( + "slot = {slot:?}, attested slot = {:?}, signature slot = {:?}", + update.attested_header.slot, update.signature_slot + ); + let signature = ::deserialize_compressed( + &update.sync_aggregate.sync_committee_signature.0 .0[..], + ); + + let Ok(signature) = signature else { + println!("failed to deserialize point on G2"); + continue; + }; + + let payload = + Handle::SyncUpdate(utils::sync_update_from_finality(signature, update)); + + let gas_limit = client + .calculate_handle_gas(None, program_id.into(), payload.encode(), 0, true) + .await? + .min_limit; + println!("finality_update gas_limit {gas_limit:?}"); + + let (message_id, _) = client + .send_message(program_id.into(), payload, gas_limit, 0) + .await?; + + let (_message_id, payload, _value) = listener.reply_bytes_on(message_id).await?; + let result_decoded = HandleResult::decode(&mut &payload.unwrap()[..]).unwrap(); + assert!( + matches!(result_decoded, HandleResult::SyncUpdate(result) if result.is_ok()) + ); + } + } + + println!(); + println!(); + + time::sleep(Duration::from_secs(6 * 60)).await; + } + + Ok(()) +} + +#[ignore] +#[tokio::test] +async fn replaying_back() -> Result<()> { + let client_http = Client::new(); + + let finality_update: FinalityUpdateResponse = + serde_json::from_slice(FINALITY_UPDATE_5_254_112).unwrap(); + let finality_update = finality_update.data; + println!( + "finality_update slot = {}", + finality_update.finalized_header.slot + ); + + // This SyncCommittee operated for about 13K slots, so we make adjustments + let current_period = eth_utils::calculate_period(finality_update.finalized_header.slot); + let mut updates = utils::get_updates(&client_http, RPC_URL, current_period - 1, 1).await?; + + let update = match updates.pop() { + Some(update) if updates.is_empty() => update.data, + _ => unreachable!("Requested single update"), + }; + let checkpoint = update.finalized_header.tree_hash_root(); + let checkpoint_hex = hex::encode(checkpoint); + + let bootstrap = utils::get_bootstrap(&client_http, RPC_URL, &checkpoint_hex).await?; + println!("bootstrap slot = {}", bootstrap.header.slot); + + println!("update slot = {}", update.finalized_header.slot); + let signature = ::deserialize_compressed( + &update.sync_aggregate.sync_committee_signature.0 .0[..], + ) + .unwrap(); + let sync_update = utils::sync_update_from_update(signature, update); + let slot_start = sync_update.finalized_header.slot; + let slot_end = finality_update.finalized_header.slot; + println!( + "Replaying back from {slot_start} to {slot_end} ({} headers)", + slot_end - slot_start + ); + + let pub_keys = utils::map_public_keys(&bootstrap.current_sync_committee.pubkeys); + let init = Init { + network: Network::Sepolia, + sync_committee_current_pub_keys: pub_keys, + sync_committee_current_aggregate_pubkey: bootstrap.current_sync_committee.aggregate_pubkey, + sync_committee_current_branch: bootstrap + .current_sync_committee_branch + .into_iter() + .map(|BytesFixed(bytes)| bytes.0) + .collect(), + update: sync_update, + }; + + let client = GearApi::dev().await?; + let mut listener = client.subscribe().await?; + + let program_id = upload_program(&client, &mut listener, init).await?; + + println!("program_id = {:?}", hex::encode(program_id)); + + println!(); + println!(); + + let batch_size = 44 * SLOTS_PER_EPOCH; + let mut slots_batch_iter = slots_batch::Iter::new(slot_start, slot_end, batch_size).unwrap(); + // start to replay back + if let Some((slot_start, slot_end)) = slots_batch_iter.next() { + let mut requests_headers = Vec::with_capacity(batch_size as usize); + for i in slot_start..slot_end { + requests_headers.push(utils::get_block_header(&client_http, RPC_URL, i)); + } + + let headers = futures::future::join_all(requests_headers) + .await + .into_iter() + .filter_map(|maybe_header| maybe_header.ok()) + .collect::>(); + + let signature = ::deserialize_compressed( + &finality_update.sync_aggregate.sync_committee_signature.0 .0[..], + ) + .unwrap(); + + let payload = Handle::ReplayBackStart { + sync_update: utils::sync_update_from_finality(signature, finality_update), + headers, + }; + + let gas_limit = client + .calculate_handle_gas(None, program_id.into(), payload.encode(), 0, true) + .await? + .min_limit; + println!("ReplayBackStart gas_limit {gas_limit:?}"); + + let (message_id, _) = client + .send_message(program_id.into(), payload, gas_limit, 0) + .await?; + + let (_message_id, payload, _value) = listener.reply_bytes_on(message_id).await?; + let result_decoded = HandleResult::decode(&mut &payload.unwrap()[..]).unwrap(); + assert!(matches!( + result_decoded, + HandleResult::ReplayBackStart(Ok(replay_back::StatusStart::InProgress)) + )); + } + + // replaying the blocks back + for (slot_start, slot_end) in slots_batch_iter { + let mut requests_headers = Vec::with_capacity(batch_size as usize); + for i in slot_start..slot_end { + requests_headers.push(utils::get_block_header(&client_http, RPC_URL, i)); + } + + let headers = futures::future::join_all(requests_headers) + .await + .into_iter() + .filter_map(|maybe_header| maybe_header.ok()) + .collect::>(); + + let payload = Handle::ReplayBack(headers); + + let gas_limit = client + .calculate_handle_gas(None, program_id.into(), payload.encode(), 0, true) + .await? + .min_limit; + println!("ReplayBack gas_limit {gas_limit:?}"); + + let (message_id, _) = client + .send_message(program_id.into(), payload, gas_limit, 0) + .await?; + + let (_message_id, payload, _value) = listener.reply_bytes_on(message_id).await?; + let result_decoded = HandleResult::decode(&mut &payload.unwrap()[..]).unwrap(); + assert!(matches!( + result_decoded, + HandleResult::ReplayBack(Some( + replay_back::Status::InProcess | replay_back::Status::Finished + )) + )); + } + + Ok(()) +} + +#[ignore] +#[tokio::test] +async fn sync_update_requires_replaying_back() -> Result<()> { + let client_http = Client::new(); + + let finality_update: FinalityUpdateResponse = + serde_json::from_slice(FINALITY_UPDATE_5_263_072).unwrap(); + let finality_update = finality_update.data; + println!( + "finality_update slot = {}", + finality_update.finalized_header.slot + ); + + let slot = finality_update.finalized_header.slot; + let current_period = eth_utils::calculate_period(slot); + let mut updates = utils::get_updates(&client_http, RPC_URL, current_period, 1).await?; + + let update = match updates.pop() { + Some(update) if updates.is_empty() => update.data, + _ => unreachable!("Requested single update"), + }; + + let checkpoint = update.finalized_header.tree_hash_root(); + let checkpoint_hex = hex::encode(checkpoint); + + let bootstrap = utils::get_bootstrap(&client_http, RPC_URL, &checkpoint_hex).await?; + let signature = ::deserialize_compressed( + &update.sync_aggregate.sync_committee_signature.0 .0[..], + ) + .unwrap(); + let sync_update = utils::sync_update_from_update(signature, update); + + let pub_keys = utils::map_public_keys(&bootstrap.current_sync_committee.pubkeys); + let init = Init { + network: Network::Sepolia, + sync_committee_current_pub_keys: pub_keys, + sync_committee_current_aggregate_pubkey: bootstrap.current_sync_committee.aggregate_pubkey, + sync_committee_current_branch: bootstrap + .current_sync_committee_branch + .into_iter() + .map(|BytesFixed(bytes)| bytes.0) + .collect(), + update: sync_update, + }; + + let client = GearApi::dev().await?; + let mut listener = client.subscribe().await?; + + let program_id = upload_program(&client, &mut listener, init).await?; + + println!("program_id = {:?}", hex::encode(program_id)); + + println!(); + println!(); + + println!( + "slot = {slot:?}, attested slot = {:?}, signature slot = {:?}", + finality_update.attested_header.slot, finality_update.signature_slot + ); + let signature = ::deserialize_compressed( + &finality_update.sync_aggregate.sync_committee_signature.0 .0[..], + ) + .unwrap(); + + let payload = Handle::SyncUpdate(utils::sync_update_from_finality(signature, finality_update)); + + let gas_limit = client + .calculate_handle_gas(None, program_id.into(), payload.encode(), 0, true) + .await? + .min_limit; + println!("finality_update gas_limit {gas_limit:?}"); + + let (message_id, _) = client + .send_message(program_id.into(), payload, gas_limit, 0) + .await?; + + let (_message_id, payload, _value) = listener.reply_bytes_on(message_id).await?; + let result_decoded = HandleResult::decode(&mut &payload.unwrap()[..]).unwrap(); + assert!(matches!( + result_decoded, + HandleResult::SyncUpdate(Err(sync_update::Error::ReplayBackRequired { .. })) + )); + + Ok(()) +} diff --git a/gear-programs/checkpoint-light-client/src/tests/sepolia-finality-update-5_254_112.json b/relayer/src/ethereum_checkpoints/tests/sepolia-finality-update-5_254_112.json similarity index 100% rename from gear-programs/checkpoint-light-client/src/tests/sepolia-finality-update-5_254_112.json rename to relayer/src/ethereum_checkpoints/tests/sepolia-finality-update-5_254_112.json diff --git a/gear-programs/checkpoint-light-client/src/tests/sepolia-finality-update-5_263_072.json b/relayer/src/ethereum_checkpoints/tests/sepolia-finality-update-5_263_072.json similarity index 100% rename from gear-programs/checkpoint-light-client/src/tests/sepolia-finality-update-5_263_072.json rename to relayer/src/ethereum_checkpoints/tests/sepolia-finality-update-5_263_072.json diff --git a/relayer/src/ethereum_checkpoints/utils/mod.rs b/relayer/src/ethereum_checkpoints/utils/mod.rs new file mode 100644 index 00000000..4552d246 --- /dev/null +++ b/relayer/src/ethereum_checkpoints/utils/mod.rs @@ -0,0 +1,261 @@ +use anyhow::{Error as AnyError, Result as AnyResult}; +use ark_serialize::CanonicalDeserialize; +use checkpoint_light_client_io::{ + ethereum_common::{ + base_types::{BytesFixed, FixedArray}, + beacon::{BLSPubKey, Bytes32, SignedBeaconBlockHeader, SyncAggregate, SyncCommittee}, + utils as eth_utils, + }, + ArkScale, BeaconBlockHeader, G1TypeInfo, G2TypeInfo, SyncCommitteeKeys, SyncCommitteeUpdate, + G1, G2, SYNC_COMMITTEE_SIZE, +}; +use reqwest::{Client, RequestBuilder}; +use serde::{de::DeserializeOwned, Deserialize}; +use std::{cmp, error::Error, fmt}; + +pub mod slots_batch; + +// https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/light-client/p2p-interface.md#configuration +pub const MAX_REQUEST_LIGHT_CLIENT_UPDATES: u8 = 128; + +#[derive(Deserialize)] +#[serde(untagged)] +pub enum LightClientHeader { + Unwrapped(BeaconBlockHeader), + Wrapped(Beacon), +} + +#[derive(Deserialize)] +pub struct Beacon { + pub beacon: BeaconBlockHeader, +} + +#[derive(Deserialize, Debug)] +pub struct BeaconBlockHeaderResponse { + pub data: BeaconBlockHeaderData, +} + +#[derive(Deserialize, Debug)] +pub struct BeaconBlockHeaderData { + pub header: SignedBeaconBlockHeader, +} + +#[allow(dead_code)] +#[derive(Deserialize, Debug)] +pub struct Bootstrap { + #[serde(deserialize_with = "deserialize_header")] + pub header: BeaconBlockHeader, + pub current_sync_committee: SyncCommittee, + pub current_sync_committee_branch: Vec, +} + +#[allow(dead_code)] +#[derive(Deserialize, Debug)] +pub struct BootstrapResponse { + pub data: Bootstrap, +} + +pub fn deserialize_header<'de, D>(deserializer: D) -> Result +where + D: serde::Deserializer<'de>, +{ + let header: LightClientHeader = Deserialize::deserialize(deserializer)?; + + Ok(match header { + LightClientHeader::Unwrapped(header) => header, + LightClientHeader::Wrapped(header) => header.beacon, + }) +} + +#[derive(Deserialize)] +pub struct FinalityUpdateResponse { + pub data: FinalityUpdate, +} + +#[derive(Clone, Deserialize)] +pub struct FinalityUpdate { + #[serde(deserialize_with = "deserialize_header")] + pub attested_header: BeaconBlockHeader, + #[serde(deserialize_with = "deserialize_header")] + pub finalized_header: BeaconBlockHeader, + pub finality_branch: Vec, + pub sync_aggregate: SyncAggregate, + #[serde(deserialize_with = "eth_utils::deserialize_u64")] + pub signature_slot: u64, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct Update { + #[serde(deserialize_with = "deserialize_header")] + pub attested_header: BeaconBlockHeader, + pub next_sync_committee: SyncCommittee, + pub next_sync_committee_branch: Vec, + #[serde(deserialize_with = "deserialize_header")] + pub finalized_header: BeaconBlockHeader, + pub finality_branch: Vec, + pub sync_aggregate: SyncAggregate, + #[serde(deserialize_with = "eth_utils::deserialize_u64")] + pub signature_slot: u64, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct UpdateData { + pub data: Update, +} + +pub type UpdateResponse = Vec; + +#[derive(Clone, Debug)] +pub struct ErrorNotFound; + +impl fmt::Display for ErrorNotFound { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt("Not found (404)", f) + } +} + +impl Error for ErrorNotFound {} + +#[allow(dead_code)] +#[derive(Deserialize)] +struct CodeResponse { + code: u64, + message: String, +} + +pub async fn get(request_builder: RequestBuilder) -> AnyResult { + let bytes = request_builder + .send() + .await + .map_err(AnyError::from)? + .bytes() + .await + .map_err(AnyError::from)?; + + match serde_json::from_slice::(&bytes) { + Ok(code_response) if code_response.code == 404 => Err(ErrorNotFound.into()), + _ => Ok(serde_json::from_slice::(&bytes).map_err(AnyError::from)?), + } +} + +#[cfg(test)] +pub async fn get_bootstrap( + client: &Client, + rpc_url: &str, + checkpoint: &str, +) -> AnyResult { + let checkpoint_no_prefix = match checkpoint.starts_with("0x") { + true => &checkpoint[2..], + false => checkpoint, + }; + + let url = format!("{rpc_url}/eth/v1/beacon/light_client/bootstrap/0x{checkpoint_no_prefix}",); + + get::(client.get(&url)) + .await + .map(|response| response.data) +} + +pub async fn get_updates( + client: &Client, + rpc_url: &str, + period: u64, + count: u8, +) -> AnyResult { + let count = cmp::min(count, MAX_REQUEST_LIGHT_CLIENT_UPDATES); + let url = format!( + "{rpc_url}/eth/v1/beacon/light_client/updates?start_period={period}&count={count}", + ); + + get::(client.get(&url)).await +} + +pub async fn get_block_header( + client: &Client, + rpc_url: &str, + slot: u64, +) -> AnyResult { + let url = format!("{rpc_url}/eth/v1/beacon/headers/{slot}"); + + get::(client.get(&url)) + .await + .map(|response| response.data.header.message) +} + +pub async fn get_finality_update(client: &Client, rpc_url: &str) -> AnyResult { + let url = format!("{rpc_url}/eth/v1/beacon/light_client/finality_update"); + + get::(client.get(&url)) + .await + .map(|response| response.data) +} + +pub fn map_public_keys( + compressed_public_keys: &FixedArray, +) -> Box { + let keys = compressed_public_keys + .0 + .iter() + .map(|BytesFixed(pub_key_compressed)| { + let pub_key = ::deserialize_compressed_unchecked( + &pub_key_compressed.0[..], + ) + .expect("Public keys have the required size"); + + let ark_scale: ArkScale = G1TypeInfo(pub_key).into(); + + ark_scale + }) + .collect::>(); + + Box::new(FixedArray(keys.try_into().expect( + "The size of keys array is guaranteed on the type level", + ))) +} + +pub fn sync_update_from_finality( + signature: G2, + finality_update: FinalityUpdate, +) -> SyncCommitteeUpdate { + SyncCommitteeUpdate { + signature_slot: finality_update.signature_slot, + attested_header: finality_update.attested_header, + finalized_header: finality_update.finalized_header, + sync_aggregate: finality_update.sync_aggregate, + sync_committee_next_aggregate_pubkey: None, + sync_committee_signature: G2TypeInfo(signature).into(), + sync_committee_next_pub_keys: None, + sync_committee_next_branch: None, + finality_branch: finality_update + .finality_branch + .into_iter() + .map(|BytesFixed(array)| array.0) + .collect::<_>(), + } +} + +pub fn sync_update_from_update(signature: G2, update: Update) -> SyncCommitteeUpdate { + let next_sync_committee_keys = map_public_keys(&update.next_sync_committee.pubkeys); + + SyncCommitteeUpdate { + signature_slot: update.signature_slot, + attested_header: update.attested_header, + finalized_header: update.finalized_header, + sync_aggregate: update.sync_aggregate, + sync_committee_next_aggregate_pubkey: Some(update.next_sync_committee.aggregate_pubkey), + sync_committee_signature: G2TypeInfo(signature).into(), + sync_committee_next_pub_keys: Some(next_sync_committee_keys), + sync_committee_next_branch: Some( + update + .next_sync_committee_branch + .into_iter() + .map(|BytesFixed(array)| array.0) + .collect::<_>(), + ), + finality_branch: update + .finality_branch + .into_iter() + .map(|BytesFixed(array)| array.0) + .collect::<_>(), + } +} diff --git a/relayer/src/ethereum_checkpoints/utils/slots_batch.rs b/relayer/src/ethereum_checkpoints/utils/slots_batch.rs new file mode 100644 index 00000000..1fd30ac1 --- /dev/null +++ b/relayer/src/ethereum_checkpoints/utils/slots_batch.rs @@ -0,0 +1,78 @@ +use checkpoint_light_client_io::Slot; + +/// Iterator produces right open intervals of the specific size in backward direction. +pub struct Iter { + slot_start: Slot, + slot_end: Slot, + batch_size: Slot, +} + +impl Iter { + pub fn new(slot_start: Slot, slot_end: Slot, batch_size: Slot) -> Option { + if batch_size < 2 || slot_start >= slot_end { + return None; + } + + Some(Self { + slot_start, + slot_end, + batch_size, + }) + } +} + +impl Iterator for Iter { + // [slot_start; slot_end) + type Item = (Slot, Slot); + + fn next(&mut self) -> Option { + if self.slot_start + self.batch_size <= self.slot_end { + let slot_start = self.slot_end - self.batch_size + 1; + let slot_end = self.slot_end; + + self.slot_end = slot_start; + + return Some((slot_start, slot_end)); + } + + if self.slot_start < self.slot_end { + let slot_end = self.slot_end; + + self.slot_end = self.slot_start; + + return Some((self.slot_start, slot_end)); + } + + None + } +} + +#[test] +fn test_slots_batch_iterator() { + assert!(Iter::new(3, 10, 0).is_none()); + assert!(Iter::new(3, 10, 1).is_none()); + assert!(Iter::new(3, 3, 2).is_none()); + assert!(Iter::new(10, 3, 2).is_none()); + assert!(Iter::new(10, 3, 0).is_none()); + + let mut iter = Iter::new(3, 10, 2).unwrap(); + + // [9; 10), [8; 9), etc + assert_eq!(iter.next(), Some((9, 10))); + assert_eq!(iter.next(), Some((8, 9))); + assert_eq!(iter.next(), Some((7, 8))); + assert_eq!(iter.next(), Some((6, 7))); + assert_eq!(iter.next(), Some((5, 6))); + assert_eq!(iter.next(), Some((4, 5))); + assert_eq!(iter.next(), Some((3, 4))); + assert!(iter.next().is_none()); + + let mut iter = Iter::new(3, 10, 3).unwrap(); + + // [8; 10), [6; 8), [4; 6), [3; 4) + assert_eq!(iter.next(), Some((8, 10))); + assert_eq!(iter.next(), Some((6, 8))); + assert_eq!(iter.next(), Some((4, 6))); + assert_eq!(iter.next(), Some((3, 4))); + assert!(iter.next().is_none()); +} diff --git a/relayer/src/main.rs b/relayer/src/main.rs index 6abb31e8..0d08fd52 100644 --- a/relayer/src/main.rs +++ b/relayer/src/main.rs @@ -11,6 +11,7 @@ use prover::proving::GenesisConfig; use relay_merkle_roots::MerkleRootRelayer; use utils_prometheus::MetricsBuilder; +mod ethereum_checkpoints; mod message_relayer; mod proof_storage; mod prover_interface; @@ -37,6 +38,7 @@ struct Cli { command: CliCommands, } +#[allow(clippy::enum_variant_names)] #[derive(Subcommand)] enum CliCommands { /// Start service constantly relaying messages to ethereum @@ -45,6 +47,8 @@ enum CliCommands { /// Relay message to ethereum #[clap(visible_alias("rm"))] RelayMessages(RelayMessagesArgs), + /// Start service constantly relaying Ethereum checkpoints to the Vara program + RelayCheckpoints(RelayCheckpointsArgs), } #[derive(Args)] @@ -124,6 +128,35 @@ struct ProofStorageArgs { gear_fee_payer: Option, } +#[derive(Args)] +struct RelayCheckpointsArgs { + /// Specify ProgramId of the Checkpoint-light-client program + #[arg(long, env = "CHECKPOINT_LIGHT_CLIENT_ADDRESS")] + program_id: String, + + /// Specify an endpoint providing Beacon API + #[arg(long, env = "BEACON_ENDPOINT")] + beacon_endpoint: String, + + /// Domain of the VARA RPC endpoint + #[arg(long, default_value = "ws://127.0.0.1", env = "VARA_DOMAIN")] + vara_domain: String, + + /// Port of the VARA RPC endpoint + #[arg(long, default_value = "9944", env = "VARA_PORT")] + vara_port: u16, + + /// Substrate URI that identifies a user by a mnemonic phrase or + /// provides default users from the keyring (e.g., "//Alice", "//Bob", + /// etc.). The password for URI should be specified in the same `suri`, + /// separated by the ':' char + #[arg(long, default_value = "//Alice", env = "VARA_SURI")] + vara_suri: String, + + #[clap(flatten)] + prometheus_args: PrometheusArgs, +} + #[tokio::main] async fn main() { let _ = dotenv::dotenv(); @@ -136,6 +169,7 @@ async fn main() { .filter(Some("ethereum-client"), log::LevelFilter::Info) .filter(Some("metrics"), log::LevelFilter::Info) .format_timestamp(Some(TimestampPrecision::Seconds)) + .parse_default_env() .init(); let cli = Cli::parse(); @@ -201,6 +235,8 @@ async fn main() { relayer.run().await.unwrap(); } + + CliCommands::RelayCheckpoints(args) => ethereum_checkpoints::relay(args).await, }; }