Skip to content

Commit

Permalink
feat : updated l2 messaging implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ocdbytes committed Dec 27, 2024
1 parent 62fb337 commit d4aa988
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 68 deletions.
21 changes: 0 additions & 21 deletions crates/client/settlement_client/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
use crate::eth::StarknetCoreContract::StarknetCoreContractInstance;
use crate::gas_price::L1BlockMetrics;
use crate::state_update::StateUpdate;
use alloy::contract::Event;
use alloy::providers::RootProvider;
use alloy::sol_types::SolEvent;
use alloy::transports::http::{Client, Http};
use async_trait::async_trait;
use mc_db::l1_db::LastSyncedEventBlock;
use mc_db::MadaraBackend;
Expand All @@ -15,21 +10,6 @@ use starknet_api::transaction::L1HandlerTransaction;
use starknet_types_core::felt::Felt;
use std::sync::Arc;

pub enum CoreContractInstance {
Ethereum(StarknetCoreContractInstance<Http<Client>, RootProvider<Http<Client>>>),
Starknet(Felt),
}

impl CoreContractInstance {
#[allow(clippy::type_complexity)]
pub fn event_filter<T: SolEvent>(&self) -> anyhow::Result<Event<Http<Client>, &RootProvider<Http<Client>>, T>> {
match self {
CoreContractInstance::Ethereum(contract) => Ok(contract.event_filter()),
CoreContractInstance::Starknet(_) => Err(anyhow::anyhow!("Starknet doesn't support event filters")),
}
}
}

#[async_trait]
pub trait ClientTrait: Send + Sync {
// Configuration type used for initialization
Expand All @@ -39,7 +19,6 @@ pub trait ClientTrait: Send + Sync {

// Basic getter functions
fn get_l1_block_metrics(&self) -> &L1BlockMetrics;
fn get_core_contract_instance(&self) -> CoreContractInstance;

// Create a new instance of the client
async fn new(config: Self::Config) -> anyhow::Result<Self>
Expand Down
16 changes: 5 additions & 11 deletions crates/client/settlement_client/src/eth/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::client::{ClientTrait, CoreContractInstance};
use crate::client::ClientTrait;
use crate::eth::StarknetCoreContract::{LogMessageToL2, StarknetCoreContractInstance};
use crate::gas_price::L1BlockMetrics;
use crate::state_update::{update_l1, StateUpdate};
Expand Down Expand Up @@ -68,10 +68,6 @@ impl ClientTrait for EthereumClient {
&self.l1_block_metrics
}

fn get_core_contract_instance(&self) -> CoreContractInstance {
CoreContractInstance::Ethereum(self.l1_core_contract.clone())
}

/// Create a new EthereumClient instance with the given RPC URL
async fn new(config: EthereumClientConfig) -> anyhow::Result<Self> {
let provider = ProviderBuilder::new().on_http(config.url);
Expand Down Expand Up @@ -152,10 +148,9 @@ impl ClientTrait for EthereumClient {
mut ctx: ServiceContext,
) -> anyhow::Result<()> {
// Listen to LogStateUpdate (0x77552641) update and send changes continuously
let contract_instance = self.get_core_contract_instance();
let event_filter = contract_instance.event_filter::<StarknetCoreContract::LogStateUpdate>();
let event_filter = self.l1_core_contract.event_filter::<StarknetCoreContract::LogStateUpdate>();

let mut event_stream = match ctx.run_until_cancelled(event_filter?.watch()).await {
let mut event_stream = match ctx.run_until_cancelled(event_filter.watch()).await {
Some(res) => res.context(ERR_ARCHIVE)?.into_stream(),
None => return anyhow::Ok(()),
};
Expand All @@ -178,10 +173,9 @@ impl ClientTrait for EthereumClient {
chain_id: ChainId,
mempool: Arc<Mempool>,
) -> anyhow::Result<()> {
let contract_instance = self.get_core_contract_instance();
let event_filter = contract_instance.event_filter::<LogMessageToL2>();
let event_filter = self.l1_core_contract.event_filter::<LogMessageToL2>();

let mut event_stream = event_filter?
let mut event_stream = event_filter
.from_block(last_synced_event_block.block_number)
.to_block(BlockNumberOrTag::Finalized)
.watch()
Expand Down
76 changes: 57 additions & 19 deletions crates/client/settlement_client/src/starknet/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::client::{ClientTrait, CoreContractInstance};
use crate::client::ClientTrait;
use crate::gas_price::L1BlockMetrics;
use crate::messaging::MessageSent;
use crate::state_update::{update_l1, StateUpdate};
Expand Down Expand Up @@ -62,10 +62,6 @@ impl ClientTrait for StarknetClient {
&self.l1_block_metrics
}

fn get_core_contract_instance(&self) -> CoreContractInstance {
CoreContractInstance::Starknet(self.l2_core_contract)
}

async fn new(config: Self::Config) -> anyhow::Result<Self>
where
Self: Sized,
Expand Down Expand Up @@ -201,13 +197,17 @@ impl ClientTrait for StarknetClient {
let mut sync_flag = false;

loop {
let latest_block_number = self.get_latest_block_number().await?;
let starting_block = if !sync_flag {
last_synced_event_block.block_number
} else if latest_block_number < 100 {
0
} else {
latest_block_number - 100
};
let events_response = ctx.run_until_cancelled(self.get_events(
BlockId::Number(if !sync_flag {
last_synced_event_block.block_number
} else {
self.get_latest_block_number().await?
}),
BlockId::Number(self.get_latest_block_number().await?),
BlockId::Number(starting_block),
BlockId::Number(latest_block_number),
self.l2_core_contract,
vec![get_selector_from_name("MessageSent")?],
));
Expand All @@ -218,6 +218,11 @@ impl ClientTrait for StarknetClient {
match events_response.await {
Some(Ok(emitted_events)) => {
for event in emitted_events {
// Check if the message is already processed, if yes then skip that message
if backend.has_l1_messaging_nonce(Nonce(event.data[4]))? {
continue;
}

tracing::info!(
"🔵 Processing L2 Message from block: {:?}, transaction_hash: {:?}, fromAddress: {:?}",
event.block_number,
Expand Down Expand Up @@ -482,6 +487,12 @@ pub mod starknet_client_tests {
use crate::starknet::{StarknetClient, StarknetClientConfig};
use crate::state_update::StateUpdate;
use serial_test::serial;
use starknet_accounts::ConnectedAccount;
use starknet_core::types::BlockId;
use starknet_core::types::MaybePendingBlockWithTxHashes::{Block, PendingBlock};
use starknet_providers::jsonrpc::HttpTransport;
use starknet_providers::ProviderError::StarknetError;
use starknet_providers::{JsonRpcClient, Provider};
use starknet_types_core::felt::Felt;
use std::str::FromStr;
use std::time::Duration;
Expand Down Expand Up @@ -552,8 +563,7 @@ pub mod starknet_client_tests {
)
.await?;

// It takes time on madara for events to be stored
sleep(Duration::from_secs(10)).await;
poll_on_block_completion(last_event_block_number, account.provider(), 100).await?;

let latest_event_block_number = starknet_client.get_last_event_block_number().await?;
assert_eq!(latest_event_block_number, last_event_block_number, "Latest event should have block number 100");
Expand All @@ -574,13 +584,13 @@ pub mod starknet_client_tests {

// sending state updates :
let data_felt = Felt::from_hex("0xdeadbeef")?;
send_state_update(
let block_number = send_state_update(
&account,
deployed_address,
StateUpdate { block_number: 100, global_root: data_felt, block_hash: data_felt },
)
.await?;
sleep(Duration::from_secs(5)).await;
poll_on_block_completion(block_number, account.provider(), 100).await?;

let last_verified_block_hash = starknet_client.get_last_verified_block_hash().await?;
assert_eq!(last_verified_block_hash, data_felt, "Block hash should match");
Expand All @@ -602,13 +612,13 @@ pub mod starknet_client_tests {

// sending state updates :
let data_felt = Felt::from_hex("0xdeadbeef")?;
send_state_update(
let block_number = send_state_update(
&account,
deployed_address,
StateUpdate { block_number: 100, global_root: data_felt, block_hash: data_felt },
)
.await?;
sleep(Duration::from_secs(5)).await;
poll_on_block_completion(block_number, account.provider(), 100).await?;

let last_verified_state_root = starknet_client.get_last_state_root().await?;
assert_eq!(last_verified_state_root, data_felt, "Last state root should match");
Expand All @@ -631,17 +641,45 @@ pub mod starknet_client_tests {
// sending state updates :
let data_felt = Felt::from_hex("0xdeadbeef")?;
let block_number = 100;
send_state_update(
let event_block_number = send_state_update(
&account,
deployed_address,
StateUpdate { block_number, global_root: data_felt, block_hash: data_felt },
)
.await?;
sleep(Duration::from_secs(5)).await;
poll_on_block_completion(event_block_number, account.provider(), 100).await?;

let last_verified_block_number = starknet_client.get_last_verified_block_number().await?;
assert_eq!(last_verified_block_number, block_number, "Last verified block should match");

Ok(())
}

const RETRY_DELAY: Duration = Duration::from_millis(100);

pub async fn poll_on_block_completion(
block_number: u64,
provider: &JsonRpcClient<HttpTransport>,
max_retries: u64,
) -> anyhow::Result<()> {
for try_count in 0..=max_retries {
match provider.get_block_with_tx_hashes(BlockId::Number(block_number)).await {
Ok(Block(_)) => {
return Ok(());
}
Ok(PendingBlock(_)) | Err(StarknetError(starknet_core::types::StarknetError::BlockNotFound)) => {
if try_count == max_retries {
return Err(anyhow::anyhow!("Max retries reached while polling for block {}", block_number));
}
sleep(RETRY_DELAY).await;
}
Err(e) => {
return Err(anyhow::anyhow!("Provider error while polling block {}: {}", block_number, e));
}
}
}

// This line should never be reached due to the return in the loop
Err(anyhow::anyhow!("Max retries reached while polling for block {}", block_number))
}
}
26 changes: 12 additions & 14 deletions crates/node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,20 +171,18 @@ async fn main() -> anyhow::Result<()> {
)
.await
.context("Initializing the l1 sync service")?,
MadaraSettlementLayer::Starknet => {
L1SyncService::<StarknetClientConfig, MessageSent>::create(
&run_cmd.l1_sync_params,
&service_db,
l1_gas_setter,
chain_config.chain_id.clone(),
chain_config.eth_core_contract_address.clone(),
run_cmd.is_sequencer(),
run_cmd.is_devnet(),
Arc::clone(&mempool),
)
.await
.context("Initializing the l1 sync service")?
}
MadaraSettlementLayer::Starknet => L1SyncService::<StarknetClientConfig, MessageSent>::create(
&run_cmd.l1_sync_params,
&service_db,
l1_gas_setter,
chain_config.chain_id.clone(),
chain_config.eth_core_contract_address.clone(),
run_cmd.is_sequencer(),
run_cmd.is_devnet(),
Arc::clone(&mempool),
)
.await
.context("Initializing the l1 sync service")?,
};

// L2 Sync
Expand Down
5 changes: 2 additions & 3 deletions crates/node/src/service/l1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,8 @@ impl StarknetSyncService {
.context("Creating starknet client")?;

// StarknetClientConfig, Arc<JsonRpcClient<HttpTransport>>, Felt
let client_converted: Box<
dyn ClientTrait<Config = StarknetClientConfig, EventStruct = MessageSent>,
> = Box::new(client);
let client_converted: Box<dyn ClientTrait<Config = StarknetClientConfig, EventStruct = MessageSent>> =
Box::new(client);
Some(Arc::new(client_converted))
} else {
anyhow::bail!(
Expand Down

0 comments on commit d4aa988

Please sign in to comment.