diff --git a/Cargo.lock b/Cargo.lock index 8bc380ed..46b939a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9736,6 +9736,7 @@ dependencies = [ "checkpoint_light_client", "checkpoint_light_client-io", "clap", + "derive_more", "dotenv", "ethereum-client", "futures", diff --git a/gear-rpc-client/src/dto.rs b/gear-rpc-client/src/dto.rs index 40ab4ccd..4faa1ebe 100644 --- a/gear-rpc-client/src/dto.rs +++ b/gear-rpc-client/src/dto.rs @@ -47,7 +47,7 @@ pub struct MerkleProof { pub leaf_index: u64, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Message { pub nonce_le: [u8; 32], pub source: [u8; 32], diff --git a/relayer/Cargo.toml b/relayer/Cargo.toml index b9be71cd..01e91825 100644 --- a/relayer/Cargo.toml +++ b/relayer/Cargo.toml @@ -16,6 +16,7 @@ axum.workspace = true checkpoint_light_client-io = { workspace = true, features = ["std"] } checkpoint_light_client = { workspace = true, features = ["std"] } clap.workspace = true +derive_more.workspace = true dotenv.workspace = true futures.workspace = true gear-core.workspace = true diff --git a/relayer/src/main.rs b/relayer/src/main.rs index a9cc3a1d..775da11c 100644 --- a/relayer/src/main.rs +++ b/relayer/src/main.rs @@ -1,11 +1,13 @@ extern crate pretty_env_logger; +use std::time::Duration; + use clap::{Args, Parser, Subcommand}; +use message_relayer::{all_token_transfers, paid_token_transfers}; use pretty_env_logger::env_logger::fmt::TimestampPrecision; use ethereum_client::EthApi; use gear_rpc_client::GearApi; -use message_relayer::MessageRelayer; use proof_storage::{FileSystemProofStorage, GearProofStorage, ProofStorage}; use relay_merkle_roots::MerkleRootRelayer; use utils_prometheus::MetricsBuilder; @@ -221,33 +223,54 @@ async fn main() { let gear_api = create_gear_client(&args.vara_endpoint).await; let eth_api = create_eth_client(&args.ethereum_args); - let bridging_payment_address = args.bridging_payment_address.map(|addr| { - let addr = if &addr[..2] == "0x" { - &addr[2..] + if let Some(bridging_payment_address) = args.bridging_payment_address { + let bridging_payment_address = if &bridging_payment_address[..2] == "0x" { + &bridging_payment_address[2..] } else { - &addr + &bridging_payment_address }; - let arr: [u8; 32] = hex::decode(addr) + let bridging_payment_address: [u8; 32] = hex::decode(bridging_payment_address) .expect("Wrong format of bridging-payment-address") .try_into() .expect("Wrong format of bridging-payment-address"); - arr.into() - }); + let bridging_payment_address = bridging_payment_address.into(); - let relayer = - MessageRelayer::new(gear_api, eth_api, args.from_block, bridging_payment_address) + let relayer = paid_token_transfers::Relayer::new( + gear_api, + eth_api, + args.from_block, + bridging_payment_address, + ) + .await + .unwrap(); + + MetricsBuilder::new() + .register_service(&relayer) + .build() + .run(args.prometheus_args.endpoint) + .await; + + relayer.run(); + } else { + let relayer = all_token_transfers::Relayer::new(gear_api, eth_api, args.from_block) .await .unwrap(); - MetricsBuilder::new() - .register_service(&relayer) - .build() - .run(args.prometheus_args.endpoint) - .await; + MetricsBuilder::new() + .register_service(&relayer) + .build() + .run(args.prometheus_args.endpoint) + .await; + + relayer.run(); + } - relayer.run().await.unwrap(); + loop { + // relayer.run() spawns thread and exits, so we need to add this loop after calling run. + std::thread::sleep(Duration::from_millis(100)); + } } CliCommands::RelayCheckpoints(args) => ethereum_checkpoints::relay(args).await, CliCommands::FetchAuthoritySetState(args) => { diff --git a/relayer/src/message_relayer/all_token_transfers.rs b/relayer/src/message_relayer/all_token_transfers.rs new file mode 100644 index 00000000..19cb4404 --- /dev/null +++ b/relayer/src/message_relayer/all_token_transfers.rs @@ -0,0 +1,80 @@ +use std::iter; + +use ethereum_client::EthApi; +use gear_rpc_client::GearApi; +use utils_prometheus::MeteredService; + +use super::common::{ + ethereum_block_listener::EthereumBlockListener, ethereum_message_sender::EthereumMessageSender, + gear_block_listener::GearBlockListener, merkle_root_extractor::MerkleRootExtractor, + message_queued_event_extractor::MessageQueuedEventExtractor, +}; + +pub struct Relayer { + gear_block_listener: GearBlockListener, + ethereum_block_listener: EthereumBlockListener, + + message_sent_listener: MessageQueuedEventExtractor, + + merkle_root_extractor: MerkleRootExtractor, + message_sender: EthereumMessageSender, +} + +impl MeteredService for Relayer { + fn get_sources(&self) -> impl IntoIterator> { + iter::empty() + .chain(self.gear_block_listener.get_sources()) + .chain(self.ethereum_block_listener.get_sources()) + .chain(self.message_sent_listener.get_sources()) + .chain(self.merkle_root_extractor.get_sources()) + .chain(self.message_sender.get_sources()) + } +} + +impl Relayer { + pub async fn new( + gear_api: GearApi, + eth_api: EthApi, + from_block: Option, + ) -> anyhow::Result { + let from_gear_block = if let Some(block) = from_block { + block + } else { + let block = gear_api.latest_finalized_block().await?; + gear_api.block_hash_to_number(block).await? + }; + + let from_eth_block = eth_api.block_number().await?; + + let gear_block_listener = GearBlockListener::new(gear_api.clone(), from_gear_block); + + let ethereum_block_listener = EthereumBlockListener::new(eth_api.clone(), from_eth_block); + + let message_sent_listener = MessageQueuedEventExtractor::new(gear_api.clone()); + + let merkle_root_listener = MerkleRootExtractor::new(eth_api.clone(), gear_api.clone()); + + let message_sender = EthereumMessageSender::new(eth_api, gear_api); + + Ok(Self { + gear_block_listener, + ethereum_block_listener, + + message_sent_listener, + + merkle_root_extractor: merkle_root_listener, + message_sender, + }) + } + + pub fn run(self) { + let [gear_blocks] = self.gear_block_listener.run(); + let ethereum_blocks = self.ethereum_block_listener.run(); + + let messages = self.message_sent_listener.run(gear_blocks); + + let merkle_roots = self.merkle_root_extractor.run(ethereum_blocks); + + self.message_sender.run(messages, merkle_roots); + } +} diff --git a/relayer/src/message_relayer/common/ethereum_block_listener.rs b/relayer/src/message_relayer/common/ethereum_block_listener.rs new file mode 100644 index 00000000..f02e4d0c --- /dev/null +++ b/relayer/src/message_relayer/common/ethereum_block_listener.rs @@ -0,0 +1,93 @@ +use std::{ + sync::mpsc::{channel, Receiver, Sender}, + time::Duration, +}; + +use ethereum_client::EthApi; +use prometheus::IntGauge; +use utils_prometheus::{impl_metered_service, MeteredService}; + +use super::EthereumBlockNumber; + +const ETHEREUM_BLOCK_TIME_APPROX: Duration = Duration::from_secs(12); + +pub struct EthereumBlockListener { + eth_api: EthApi, + from_block: u64, + + metrics: Metrics, +} + +impl MeteredService for EthereumBlockListener { + fn get_sources(&self) -> impl IntoIterator> { + self.metrics.get_sources() + } +} + +impl_metered_service! { + struct Metrics { + latest_block: IntGauge, + } +} + +impl Metrics { + fn new() -> Self { + Self::new_inner().expect("Failed to create metrics") + } + + fn new_inner() -> prometheus::Result { + Ok(Self { + latest_block: IntGauge::new( + "ethereum_block_listener_latest_block", + "Latest ethereum block discovered by listener", + )?, + }) + } +} + +impl EthereumBlockListener { + pub fn new(eth_api: EthApi, from_block: u64) -> Self { + Self { + eth_api, + from_block, + + metrics: Metrics::new(), + } + } + + pub fn run(self) -> Receiver { + let (sender, receiver) = channel(); + + tokio::spawn(async move { + loop { + let res = self.run_inner(&sender).await; + if let Err(err) = res { + log::error!("Ethereum block listener failed: {}", err); + } + } + }); + + receiver + } + + async fn run_inner(&self, sender: &Sender) -> anyhow::Result<()> { + let mut current_block = self.from_block; + + self.metrics.latest_block.set(current_block as i64); + + loop { + let latest = self.eth_api.block_number().await?; + if latest >= current_block { + for block in current_block..=latest { + sender.send(EthereumBlockNumber(block))?; + } + + current_block = latest + 1; + + self.metrics.latest_block.set(latest as i64); + } else { + tokio::time::sleep(ETHEREUM_BLOCK_TIME_APPROX / 2).await; + } + } + } +} diff --git a/relayer/src/message_relayer/common/ethereum_message_sender/era.rs b/relayer/src/message_relayer/common/ethereum_message_sender/era.rs new file mode 100644 index 00000000..7b92fd91 --- /dev/null +++ b/relayer/src/message_relayer/common/ethereum_message_sender/era.rs @@ -0,0 +1,287 @@ +use std::collections::{btree_map::Entry, BTreeMap}; + +use ethereum_client::{EthApi, TxHash, TxStatus}; +use gear_rpc_client::{dto::Message, GearApi}; +use keccak_hash::keccak_256; +use primitive_types::H256; +use prometheus::IntCounter; +use utils_prometheus::impl_metered_service; + +use crate::message_relayer::common::{GearBlockNumber, MessageInBlock, RelayedMerkleRoot}; + +pub struct Era { + latest_merkle_root: Option, + messages: BTreeMap>, + pending_txs: Vec, + + metrics: Metrics, +} + +struct RelayMessagePendingTx { + hash: TxHash, + message_block: GearBlockNumber, + message: Message, +} + +impl_metered_service! { + pub struct Metrics { + total_submitted_txs: IntCounter, + total_failed_txs: IntCounter, + total_failed_txs_because_processed: IntCounter, + } +} + +impl Metrics { + pub fn new() -> Self { + Self::new_inner().expect("Failed to create metrics") + } + + fn new_inner() -> prometheus::Result { + Ok(Self { + total_submitted_txs: IntCounter::new( + "ethereum_message_sender_total_submitted_txs", + "Total amount of txs sent to ethereum", + )?, + total_failed_txs: IntCounter::new( + "ethereum_message_sender_total_failed_txs", + "Total amount of txs sent to ethereum and failed", + )?, + total_failed_txs_because_processed: IntCounter::new( + "ethereum_message_sender_total_failed_txs_because_processed", + "Amount of txs sent to ethereum and failed because they've already been processed", + )?, + }) + } +} + +impl Era { + pub fn new(metrics: Metrics) -> Self { + Self { + latest_merkle_root: None, + messages: BTreeMap::new(), + pending_txs: vec![], + + metrics, + } + } + + pub fn push_message(&mut self, message: MessageInBlock) { + match self.messages.entry(message.block) { + Entry::Occupied(mut entry) => { + entry.get_mut().push(message.message); + } + Entry::Vacant(entry) => { + entry.insert(vec![message.message]); + } + } + } + + pub fn push_merkle_root(&mut self, merkle_root: RelayedMerkleRoot) { + if let Some(mr) = self.latest_merkle_root.as_ref() { + if mr.block < merkle_root.block { + self.latest_merkle_root = Some(merkle_root); + } + } else { + self.latest_merkle_root = Some(merkle_root); + } + } + + pub fn pending_tx_count(&self) -> usize { + self.pending_txs.len() + } + + pub async fn process(&mut self, gear_api: &GearApi, eth_api: &EthApi) -> anyhow::Result<()> { + let Some(latest_merkle_root) = self.latest_merkle_root else { + return Ok(()); + }; + + let mut processed_blocks = vec![]; + + for (&message_block, messages) in self.messages.iter() { + if message_block > latest_merkle_root.block { + break; + } + + let merkle_root_block_hash = gear_api + .block_number_to_hash(latest_merkle_root.block.0) + .await?; + + for message in messages { + let tx_hash = submit_message( + gear_api, + eth_api, + message, + latest_merkle_root.block, + merkle_root_block_hash, + ) + .await?; + + self.metrics.total_submitted_txs.inc(); + + self.pending_txs.push(RelayMessagePendingTx { + hash: tx_hash, + message_block, + message: message.clone(), + }); + } + + processed_blocks.push(message_block); + } + + for block in processed_blocks { + self.messages.remove_entry(&block); + } + + Ok(()) + } + + pub async fn try_finalize( + &mut self, + eth_api: &EthApi, + gear_api: &GearApi, + ) -> anyhow::Result { + for i in (0..self.pending_txs.len()).rev() { + if self.try_finalize_tx(i, eth_api, gear_api).await? { + self.pending_txs.remove(i); + } + } + + Ok(self.pending_txs.is_empty()) + } + + async fn try_finalize_tx( + &mut self, + tx: usize, + eth_api: &EthApi, + gear_api: &GearApi, + ) -> anyhow::Result { + let tx = &mut self.pending_txs[tx]; + let status = eth_api.get_tx_status(tx.hash).await?; + + let nonce = H256::from(tx.message.nonce_le); + + match status { + TxStatus::Finalized => { + log::info!( + "Message at block #{} with nonce {} finalized", + tx.message_block, + nonce + ); + Ok(true) + } + TxStatus::Pending => { + log::info!( + "Tx for message at block #{} with nonce {} is waiting for finalization", + tx.message_block, + nonce + ); + Ok(false) + } + TxStatus::Failed => { + self.metrics.total_failed_txs.inc(); + + let already_processed = eth_api.is_message_processed(tx.message.nonce_le).await?; + + if already_processed { + self.metrics.total_failed_txs_because_processed.inc(); + return Ok(true); + } + + let merkle_root_block = self + .latest_merkle_root + .ok_or(anyhow::anyhow!( + "Cannot finalize era without any merkle roots" + ))? + .block; + + if merkle_root_block < tx.message_block { + anyhow::bail!( + "Cannot relay message at block #{}: latest merkle root is at block #{}", + tx.message_block, + merkle_root_block + ); + } + + let merkle_root_block_hash = + gear_api.block_number_to_hash(merkle_root_block.0).await?; + + let tx_hash = submit_message( + gear_api, + eth_api, + &tx.message, + merkle_root_block, + merkle_root_block_hash, + ) + .await?; + + self.metrics.total_submitted_txs.inc(); + + log::warn!( + "Retrying to send failed tx {} for message #{}. New tx: {}", + hex::encode(tx.hash.0), + nonce, + hex::encode(tx_hash.0) + ); + + tx.hash = tx_hash; + + Ok(false) + } + } + } +} + +async fn submit_message( + gear_api: &GearApi, + eth_api: &EthApi, + message: &Message, + merkle_root_block: GearBlockNumber, + merkle_root_block_hash: H256, +) -> anyhow::Result { + let message_hash = message_hash(message); + + log::info!( + "Relaying message with hash {} and nonce {}", + hex::encode(message_hash), + hex::encode(message.nonce_le) + ); + + let proof = gear_api + .fetch_message_inclusion_merkle_proof(merkle_root_block_hash, message_hash.into()) + .await?; + + let tx_hash = eth_api + .provide_content_message( + merkle_root_block.0, + proof.num_leaves as u32, + proof.leaf_index as u32, + message.nonce_le, + message.source, + message.destination, + message.payload.to_vec(), + proof.proof, + ) + .await?; + + log::info!( + "Message with nonce {} relaying started", + hex::encode(message.nonce_le) + ); + + Ok(tx_hash) +} + +fn message_hash(message: &Message) -> [u8; 32] { + let data = [ + message.nonce_le.as_ref(), + message.source.as_ref(), + message.destination.as_ref(), + message.payload.as_ref(), + ] + .concat(); + + let mut hash = [0; 32]; + keccak_256(&data, &mut hash); + + hash +} diff --git a/relayer/src/message_relayer/common/ethereum_message_sender/mod.rs b/relayer/src/message_relayer/common/ethereum_message_sender/mod.rs new file mode 100644 index 00000000..fca39c71 --- /dev/null +++ b/relayer/src/message_relayer/common/ethereum_message_sender/mod.rs @@ -0,0 +1,162 @@ +use std::{ + collections::{btree_map::Entry, BTreeMap}, + sync::mpsc::Receiver, +}; + +use ethereum_client::EthApi; +use futures::executor::block_on; +use gear_rpc_client::GearApi; +use prometheus::{Gauge, IntGauge}; +use utils_prometheus::{impl_metered_service, MeteredService}; + +use crate::message_relayer::common::{AuthoritySetId, MessageInBlock}; + +mod era; +use era::{Era, Metrics as EraMetrics}; + +use super::RelayedMerkleRoot; + +pub struct EthereumMessageSender { + eth_api: EthApi, + gear_api: GearApi, + + metrics: Metrics, + era_metrics: EraMetrics, +} + +impl MeteredService for EthereumMessageSender { + fn get_sources(&self) -> impl IntoIterator> { + self.metrics + .get_sources() + .into_iter() + .chain(self.era_metrics.get_sources()) + } +} + +impl_metered_service! { + struct Metrics { + pending_tx_count: IntGauge, + fee_payer_balance: Gauge + } +} + +impl Metrics { + fn new() -> Self { + Self::new_inner().expect("Failed to create metrics") + } + + fn new_inner() -> prometheus::Result { + Ok(Self { + pending_tx_count: IntGauge::new( + "ethereum_message_sender_pending_tx_count", + "Amount of txs pending finalization on ethereum", + )?, + fee_payer_balance: Gauge::new( + "ethereum_message_sender_fee_payer_balance", + "Transaction fee payer balance", + )?, + }) + } +} + +impl EthereumMessageSender { + pub fn new(eth_api: EthApi, gear_api: GearApi) -> Self { + Self { + eth_api, + gear_api, + + metrics: Metrics::new(), + era_metrics: EraMetrics::new(), + } + } + + pub fn run( + self, + messages: Receiver, + merkle_roots: Receiver, + ) { + tokio::task::spawn_blocking(move || loop { + let res = block_on(self.run_inner(&messages, &merkle_roots)); + if let Err(err) = res { + log::error!("Ethereum message sender failed: {}", err); + } + }); + } + + async fn run_inner( + &self, + messages: &Receiver, + merkle_roots: &Receiver, + ) -> anyhow::Result<()> { + let mut eras: BTreeMap = BTreeMap::new(); + + loop { + let fee_payer_balance = self.eth_api.get_approx_balance().await?; + self.metrics.fee_payer_balance.set(fee_payer_balance); + + for message in messages.try_iter() { + let authority_set_id = AuthoritySetId( + self.gear_api + .signed_by_authority_set_id(message.block_hash) + .await?, + ); + + match eras.entry(authority_set_id) { + Entry::Occupied(mut entry) => { + entry.get_mut().push_message(message); + } + Entry::Vacant(entry) => { + let mut era = Era::new(self.era_metrics.clone()); + era.push_message(message); + + entry.insert(era); + } + } + } + + for merkle_root in merkle_roots.try_iter() { + match eras.entry(merkle_root.authority_set_id) { + Entry::Occupied(mut entry) => { + entry.get_mut().push_merkle_root(merkle_root); + } + Entry::Vacant(entry) => { + let mut era = Era::new(self.era_metrics.clone()); + era.push_merkle_root(merkle_root); + + entry.insert(era); + } + } + } + + let latest_era = eras.last_key_value().map(|(k, _)| *k); + let Some(latest_era) = latest_era else { + continue; + }; + + let mut finalized_eras = vec![]; + + for (&era_id, era) in eras.iter_mut() { + let res = era.process(&self.gear_api, &self.eth_api).await; + if let Err(err) = res { + log::error!("Failed to process era #{}: {}", era_id, err); + continue; + } + + let finalized = era.try_finalize(&self.eth_api, &self.gear_api).await?; + + // Latest era cannot be finalized. + if finalized && era_id != latest_era { + log::info!("Era #{} finalized", era_id); + finalized_eras.push(era_id); + } + } + + let pending_tx_count: usize = eras.iter().map(|era| era.1.pending_tx_count()).sum(); + self.metrics.pending_tx_count.set(pending_tx_count as i64); + + for finalized in finalized_eras { + eras.remove(&finalized); + } + } + } +} diff --git a/relayer/src/message_relayer/common/gear_block_listener.rs b/relayer/src/message_relayer/common/gear_block_listener.rs new file mode 100644 index 00000000..3873554a --- /dev/null +++ b/relayer/src/message_relayer/common/gear_block_listener.rs @@ -0,0 +1,99 @@ +use std::{ + sync::mpsc::{channel, Receiver, Sender}, + time::Duration, +}; + +use gear_rpc_client::GearApi; +use prometheus::IntGauge; +use utils_prometheus::{impl_metered_service, MeteredService}; + +use super::GearBlockNumber; + +const GEAR_BLOCK_TIME_APPROX: Duration = Duration::from_secs(3); + +pub struct GearBlockListener { + gear_api: GearApi, + from_block: u32, + + metrics: Metrics, +} + +impl MeteredService for GearBlockListener { + fn get_sources(&self) -> impl IntoIterator> { + self.metrics.get_sources() + } +} + +impl_metered_service! { + struct Metrics { + latest_block: IntGauge, + } +} + +impl Metrics { + fn new() -> Self { + Self::new_inner().expect("Failed to create metrics") + } + + fn new_inner() -> prometheus::Result { + Ok(Self { + latest_block: IntGauge::new( + "gear_block_listener_latest_block", + "Latest gear block discovered by gear block listener", + )?, + }) + } +} + +impl GearBlockListener { + pub fn new(gear_api: GearApi, from_block: u32) -> Self { + Self { + gear_api, + from_block, + + metrics: Metrics::new(), + } + } + + pub fn run(self) -> [Receiver; RECEIVER_COUNT] { + let (senders, receivers): (Vec<_>, Vec<_>) = (0..RECEIVER_COUNT).map(|_| channel()).unzip(); + + tokio::spawn(async move { + loop { + let res = self.run_inner(&senders).await; + if let Err(err) = res { + log::error!("Gear block listener failed: {}", err); + } + } + }); + + receivers + .try_into() + .expect("Expected Vec of correct length") + } + + async fn run_inner(&self, senders: &[Sender]) -> anyhow::Result<()> { + let mut current_block = self.from_block; + + self.metrics.latest_block.set(current_block as i64); + + loop { + let finalized_head = self.gear_api.latest_finalized_block().await?; + let finalized_head = self.gear_api.block_hash_to_number(finalized_head).await?; + + if finalized_head >= current_block { + for block in current_block..=finalized_head { + for sender in senders { + sender.send(GearBlockNumber(block))?; + } + + self.metrics.latest_block.inc(); + } + + current_block = finalized_head + 1; + } else { + tokio::time::sleep(GEAR_BLOCK_TIME_APPROX).await; + } + } + } +} diff --git a/relayer/src/message_relayer/merkle_root_listener.rs b/relayer/src/message_relayer/common/merkle_root_extractor.rs similarity index 50% rename from relayer/src/message_relayer/merkle_root_listener.rs rename to relayer/src/message_relayer/common/merkle_root_extractor.rs index 005ab2df..6bcbd511 100644 --- a/relayer/src/message_relayer/merkle_root_listener.rs +++ b/relayer/src/message_relayer/common/merkle_root_extractor.rs @@ -1,27 +1,21 @@ -use std::{ - sync::mpsc::{channel, Receiver, Sender}, - time::Duration, -}; +use std::sync::mpsc::{channel, Receiver, Sender}; use ethereum_client::EthApi; +use futures::executor::block_on; use gear_rpc_client::GearApi; use prometheus::IntGauge; - use utils_prometheus::{impl_metered_service, MeteredService}; -use super::RelayedMerkleRoot; - -const ETHEREUM_BLOCK_TIME_APPROX: Duration = Duration::from_secs(12); +use super::{AuthoritySetId, EthereumBlockNumber, GearBlockNumber, RelayedMerkleRoot}; -pub struct MerkleRootListener { +pub struct MerkleRootExtractor { eth_api: EthApi, gear_api: GearApi, - from_block: u64, metrics: Metrics, } -impl MeteredService for MerkleRootListener { +impl MeteredService for MerkleRootExtractor { fn get_sources(&self) -> impl IntoIterator> { self.metrics.get_sources() } @@ -29,7 +23,6 @@ impl MeteredService for MerkleRootListener { impl_metered_service! { struct Metrics { - latest_processed_block: IntGauge, latest_merkle_root_for_block: IntGauge } } @@ -41,62 +34,55 @@ impl Metrics { fn new_inner() -> prometheus::Result { Ok(Self { - latest_processed_block: IntGauge::new( - "message_relayer_merkle_root_listener_latest_processed_block", - "Latest ethereum block processed by merkle root listener", - )?, latest_merkle_root_for_block: IntGauge::new( - "message_relayer_merkle_root_listener_latest_merkle_root_for_block", + "merkle_root_extractor_latest_merkle_root_for_block", "Latest gear block present in found merkle roots", )?, }) } } -impl MerkleRootListener { - pub fn new(eth_api: EthApi, gear_api: GearApi, from_block: u64) -> Self { +impl MerkleRootExtractor { + pub fn new(eth_api: EthApi, gear_api: GearApi) -> Self { Self { eth_api, gear_api, - from_block, metrics: Metrics::new(), } } - pub fn run(self) -> Receiver { + pub fn run(self, blocks: Receiver) -> Receiver { let (sender, receiver) = channel(); - tokio::spawn(async move { - loop { - let res = self.run_inner(&sender).await; - if let Err(err) = res { - log::error!("Merkle root listener failed: {}", err); - } + tokio::task::spawn_blocking(move || loop { + let res = block_on(self.run_inner(&blocks, &sender)); + if let Err(err) = res { + log::error!("Merkle root extractor failed: {}", err); } }); receiver } - async fn run_inner(&self, sender: &Sender) -> anyhow::Result<()> { - let mut current_block = self.from_block; - - self.metrics - .latest_processed_block - .set(current_block as i64); - + async fn run_inner( + &self, + blocks: &Receiver, + sender: &Sender, + ) -> anyhow::Result<()> { loop { - let latest = self.eth_api.block_number().await?; - if latest >= current_block { - log::info!("Processing ethereum blocks #{}..#{}", current_block, latest); + for block in blocks.try_iter() { let merkle_roots = self .eth_api - .fetch_merkle_roots_in_range(current_block, latest) + .fetch_merkle_roots_in_range(block.0, block.0) .await?; if !merkle_roots.is_empty() { - log::info!("Found {} merkle roots", merkle_roots.len()); + log::info!( + "Found {} merkle roots at block #{}", + merkle_roots.len(), + block + ); } for merkle_root in merkle_roots { @@ -110,7 +96,7 @@ impl MerkleRootListener { .await?; let authority_set_id = - self.gear_api.signed_by_authority_set_id(block_hash).await?; + AuthoritySetId(self.gear_api.signed_by_authority_set_id(block_hash).await?); log::info!( "Found merkle root for gear block #{} and era #{}", @@ -119,15 +105,10 @@ impl MerkleRootListener { ); sender.send(RelayedMerkleRoot { - gear_block: merkle_root.block_number as u32, + block: GearBlockNumber(merkle_root.block_number as u32), authority_set_id, })?; } - - current_block = latest + 1; - self.metrics.latest_processed_block.inc(); - } else { - tokio::time::sleep(ETHEREUM_BLOCK_TIME_APPROX / 2).await; } } } diff --git a/relayer/src/message_relayer/common/message_paid_event_extractor.rs b/relayer/src/message_relayer/common/message_paid_event_extractor.rs new file mode 100644 index 00000000..32d996ad --- /dev/null +++ b/relayer/src/message_relayer/common/message_paid_event_extractor.rs @@ -0,0 +1,112 @@ +use std::sync::mpsc::{channel, Receiver, Sender}; + +use bridging_payment::services::BridgingPaymentEvents; +use futures::executor::block_on; +use gear_rpc_client::GearApi; +use parity_scale_codec::Decode; +use primitive_types::H256; +use prometheus::IntCounter; +use utils_prometheus::{impl_metered_service, MeteredService}; + +use super::{GearBlockNumber, PaidMessage}; + +pub struct MessagePaidEventExtractor { + bridging_payment_address: H256, + + gear_api: GearApi, + + metrics: Metrics, +} + +impl MeteredService for MessagePaidEventExtractor { + fn get_sources(&self) -> impl IntoIterator> { + self.metrics.get_sources() + } +} + +impl_metered_service! { + struct Metrics { + total_messages_found: IntCounter, + } +} + +impl Metrics { + fn new() -> Self { + Self::new_inner().expect("Failed to create metrics") + } + + fn new_inner() -> prometheus::Result { + Ok(Self { + total_messages_found: IntCounter::new( + "message_paid_event_extractor_total_messages_found", + "Total amount of paid messages discovered", + )?, + }) + } +} + +impl MessagePaidEventExtractor { + pub fn new(gear_api: GearApi, bridging_payment_address: H256) -> Self { + Self { + bridging_payment_address, + gear_api, + metrics: Metrics::new(), + } + } + + pub fn run(self, blocks: Receiver) -> Receiver { + let (sender, receiver) = channel(); + + tokio::task::spawn_blocking(move || loop { + let res = block_on(self.run_inner(&sender, &blocks)); + if let Err(err) = res { + log::error!("Message paid event extractor failed: {}", err); + } + }); + + receiver + } + + async fn run_inner( + &self, + sender: &Sender, + blocks: &Receiver, + ) -> anyhow::Result<()> { + loop { + for block in blocks.try_iter() { + self.process_block_events(block.0, sender).await?; + } + } + } + + async fn process_block_events( + &self, + block: u32, + sender: &Sender, + ) -> anyhow::Result<()> { + let block_hash = self.gear_api.block_number_to_hash(block).await?; + + let messages = self + .gear_api + .user_message_sent_events(self.bridging_payment_address, block_hash) + .await?; + if !messages.is_empty() { + log::info!("Found {} paid messages at block #{}", messages.len(), block); + self.metrics + .total_messages_found + .inc_by(messages.len() as u64); + + for message in messages { + let user_reply = BridgingPaymentEvents::decode(&mut &message.payload[..])?; + let BridgingPaymentEvents::TeleportVaraToEth { nonce, .. } = user_reply; + + let mut nonce_le = [0; 32]; + nonce.to_little_endian(&mut nonce_le); + + sender.send(PaidMessage { nonce: nonce_le })?; + } + } + + Ok(()) + } +} diff --git a/relayer/src/message_relayer/common/message_queued_event_extractor.rs b/relayer/src/message_relayer/common/message_queued_event_extractor.rs new file mode 100644 index 00000000..5aab0c5e --- /dev/null +++ b/relayer/src/message_relayer/common/message_queued_event_extractor.rs @@ -0,0 +1,105 @@ +use std::sync::mpsc::{channel, Receiver, Sender}; + +use futures::executor::block_on; +use gear_rpc_client::GearApi; +use prometheus::IntCounter; +use utils_prometheus::{impl_metered_service, MeteredService}; + +use super::{GearBlockNumber, MessageInBlock}; + +pub struct MessageQueuedEventExtractor { + gear_api: GearApi, + + metrics: Metrics, +} + +impl MeteredService for MessageQueuedEventExtractor { + fn get_sources(&self) -> impl IntoIterator> { + self.metrics.get_sources() + } +} + +impl_metered_service! { + struct Metrics { + total_messages_found: IntCounter, + } +} + +impl Metrics { + fn new() -> Self { + Self::new_inner().expect("Failed to create metrics") + } + + fn new_inner() -> prometheus::Result { + Ok(Self { + total_messages_found: IntCounter::new( + "message_queued_event_extractor_total_messages_found", + "Total amount of messages discovered", + )?, + }) + } +} + +impl MessageQueuedEventExtractor { + pub fn new(gear_api: GearApi) -> Self { + Self { + gear_api, + metrics: Metrics::new(), + } + } + + pub fn run(self, blocks: Receiver) -> Receiver { + let (sender, receiver) = channel(); + + tokio::task::spawn_blocking(move || loop { + let res = block_on(self.run_inner(&sender, &blocks)); + if let Err(err) = res { + log::error!("Message queued extractor failed: {}", err); + } + }); + + receiver + } + + async fn run_inner( + &self, + sender: &Sender, + blocks: &Receiver, + ) -> anyhow::Result<()> { + loop { + for block in blocks.try_iter() { + self.process_block_events(block, sender).await?; + } + } + } + + async fn process_block_events( + &self, + block: GearBlockNumber, + sender: &Sender, + ) -> anyhow::Result<()> { + let block_hash = self.gear_api.block_number_to_hash(block.0).await?; + + let messages = self.gear_api.message_queued_events(block_hash).await?; + if !messages.is_empty() { + log::info!( + "Found {} queued messages in block #{}", + messages.len(), + block + ); + self.metrics + .total_messages_found + .inc_by(messages.len() as u64); + + for message in messages { + sender.send(MessageInBlock { + message, + block, + block_hash, + })?; + } + } + + Ok(()) + } +} diff --git a/relayer/src/message_relayer/common/mod.rs b/relayer/src/message_relayer/common/mod.rs new file mode 100644 index 00000000..8924cfdc --- /dev/null +++ b/relayer/src/message_relayer/common/mod.rs @@ -0,0 +1,37 @@ +use gear_rpc_client::dto::Message; +use primitive_types::H256; + +pub mod ethereum_block_listener; +pub mod ethereum_message_sender; +pub mod gear_block_listener; +pub mod merkle_root_extractor; +pub mod message_paid_event_extractor; +pub mod message_queued_event_extractor; +pub mod paid_messages_filter; + +#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Debug, derive_more::Display)] +pub struct AuthoritySetId(pub u64); + +#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Debug, derive_more::Display)] +pub struct GearBlockNumber(pub u32); + +#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Debug, derive_more::Display)] +pub struct EthereumBlockNumber(pub u64); + +#[derive(Clone, Debug)] +pub struct MessageInBlock { + pub message: Message, + pub block: GearBlockNumber, + pub block_hash: H256, +} + +#[derive(Clone, Copy, Debug)] +pub struct PaidMessage { + pub nonce: [u8; 32], +} + +#[derive(Clone, Copy, Debug)] +pub struct RelayedMerkleRoot { + pub block: GearBlockNumber, + pub authority_set_id: AuthoritySetId, +} diff --git a/relayer/src/message_relayer/common/paid_messages_filter.rs b/relayer/src/message_relayer/common/paid_messages_filter.rs new file mode 100644 index 00000000..7d28296f --- /dev/null +++ b/relayer/src/message_relayer/common/paid_messages_filter.rs @@ -0,0 +1,107 @@ +use std::{ + collections::HashMap, + sync::mpsc::{channel, Receiver, Sender}, +}; + +use prometheus::IntGauge; +use utils_prometheus::{impl_metered_service, MeteredService}; + +use super::{MessageInBlock, PaidMessage}; + +pub struct PaidMessagesFilter { + pending_messages: HashMap<[u8; 32], MessageInBlock>, + pending_nonces: Vec<[u8; 32]>, + + metrics: Metrics, +} + +impl MeteredService for PaidMessagesFilter { + fn get_sources(&self) -> impl IntoIterator> { + self.metrics.get_sources() + } +} + +impl_metered_service! { + struct Metrics { + pending_messages_count: IntGauge + } +} + +impl Metrics { + fn new() -> Self { + Self::new_inner().expect("Failed to create metrics") + } + + fn new_inner() -> prometheus::Result { + Ok(Self { + pending_messages_count: IntGauge::new( + "paid_messages_filter_pending_messages_count", + "Amount of discovered but not paid messages", + )?, + }) + } +} + +impl PaidMessagesFilter { + pub fn new() -> Self { + Self { + pending_messages: HashMap::default(), + pending_nonces: vec![], + + metrics: Metrics::new(), + } + } + + pub fn run( + mut self, + messages: Receiver, + paid_messages: Receiver, + ) -> Receiver { + let (sender, receiver) = channel(); + + tokio::spawn(async move { + loop { + let res = self.run_inner(&sender, &messages, &paid_messages); + if let Err(err) = res { + log::error!("Paid messages filter failed: {}", err); + } + } + }); + + receiver + } + + fn run_inner( + &mut self, + sender: &Sender, + messages: &Receiver, + paid_messages: &Receiver, + ) -> anyhow::Result<()> { + loop { + for message in messages.try_iter() { + self.pending_messages + .insert(message.message.nonce_le, message) + .expect("Received 2 messages with the same nonce"); + } + + for PaidMessage { nonce } in paid_messages.try_iter() { + self.pending_nonces.push(nonce); + } + + for i in (0..self.pending_nonces.len()).rev() { + if let Some(message) = self.pending_messages.remove(&self.pending_nonces[i]) { + sender.send(message)?; + self.pending_nonces.remove(i); + } + } + + if !self.pending_nonces.is_empty() { + log::warn!("Discovered message that was paid but it's contents haven't discovered"); + } + + self.metrics + .pending_messages_count + .set(self.pending_messages.len() as i64); + } + } +} diff --git a/relayer/src/message_relayer/event_listener.rs b/relayer/src/message_relayer/event_listener.rs deleted file mode 100644 index 00bd3f27..00000000 --- a/relayer/src/message_relayer/event_listener.rs +++ /dev/null @@ -1,160 +0,0 @@ -use std::{ - sync::mpsc::{channel, Receiver, Sender}, - time::Duration, -}; - -use bridging_payment::services::BridgingPaymentEvents; -use gear_rpc_client::GearApi; -use parity_scale_codec::Decode; -use primitive_types::H256; -use prometheus::{IntCounter, IntGauge}; - -use utils_prometheus::{impl_metered_service, MeteredService}; - -use super::{BlockEvent, MessageInBlock}; - -const GEAR_BLOCK_TIME_APPROX: Duration = Duration::from_secs(3); - -pub struct EventListener { - gear_api: GearApi, - from_block: u32, - bridging_payment_address: Option, - - metrics: EventListenerMetrics, -} - -impl MeteredService for EventListener { - fn get_sources(&self) -> impl IntoIterator> { - self.metrics.get_sources() - } -} - -impl_metered_service! { - struct EventListenerMetrics { - processed_block: IntGauge, - total_messages_found: IntCounter, - total_paid_messages_found: IntCounter, - } -} - -impl EventListenerMetrics { - fn new() -> Self { - Self::new_inner().expect("Failed to create metrics") - } - - fn new_inner() -> prometheus::Result { - Ok(Self { - processed_block: IntGauge::new( - "message_relayer_event_listener_processed_block", - "Gear block processed by event listener", - )?, - total_messages_found: IntCounter::new( - "message_relayer_event_listener_total_messages_found", - "Total amount of messages found by event listener, including not paid", - )?, - total_paid_messages_found: IntCounter::new( - "message_relayer_event_listener_total_paid_messages_found", - "Total amount of paid messages found by event listener", - )?, - }) - } -} - -impl EventListener { - pub fn new(gear_api: GearApi, from_block: u32, bridging_payment_address: Option) -> Self { - Self { - gear_api, - from_block, - bridging_payment_address, - - metrics: EventListenerMetrics::new(), - } - } - - pub fn run(self) -> Receiver { - let (sender, receiver) = channel(); - - tokio::spawn(async move { - loop { - let res = self.run_inner(&sender).await; - if let Err(err) = res { - log::error!("Event processor failed: {}", err); - } - } - }); - - receiver - } - - async fn run_inner(&self, sender: &Sender) -> anyhow::Result<()> { - let mut current_block = self.from_block; - - self.metrics.processed_block.set(current_block as i64); - - loop { - let finalized_head = self.gear_api.latest_finalized_block().await?; - let finalized_head = self.gear_api.block_hash_to_number(finalized_head).await?; - - if finalized_head >= current_block { - for block in current_block..=finalized_head { - self.process_block_events(block, sender).await?; - - self.metrics.processed_block.inc(); - } - - current_block = finalized_head + 1; - } else { - tokio::time::sleep(GEAR_BLOCK_TIME_APPROX).await; - } - } - } - - async fn process_block_events( - &self, - block: u32, - sender: &Sender, - ) -> anyhow::Result<()> { - log::info!("Processing gear block #{}", block); - let block_hash = self.gear_api.block_number_to_hash(block).await?; - - let messages = self.gear_api.message_queued_events(block_hash).await?; - if !messages.is_empty() { - log::info!("Found {} sent messages", messages.len()); - self.metrics - .total_messages_found - .inc_by(messages.len() as u64); - - for message in messages { - sender.send(BlockEvent::MessageSent { - message: MessageInBlock { - message, - block, - block_hash, - }, - })?; - } - } - - if let Some(bridging_payment_address) = self.bridging_payment_address { - let messages = self - .gear_api - .user_message_sent_events(bridging_payment_address, block_hash) - .await?; - if !messages.is_empty() { - log::info!("Found {} paid messages", messages.len()); - self.metrics - .total_paid_messages_found - .inc_by(messages.len() as u64); - - for message in messages { - let user_reply = BridgingPaymentEvents::decode(&mut &message.payload[..])?; - let BridgingPaymentEvents::TeleportVaraToEth { nonce, .. } = user_reply; - - sender.send(BlockEvent::MessagePaid { nonce })?; - } - } - } - - Ok(()) - } -} diff --git a/relayer/src/message_relayer/message_processor.rs b/relayer/src/message_relayer/message_processor.rs deleted file mode 100644 index cf751688..00000000 --- a/relayer/src/message_relayer/message_processor.rs +++ /dev/null @@ -1,423 +0,0 @@ -use keccak_hash::keccak_256; -use std::{ - collections::{btree_map::Entry, BTreeMap, HashSet}, - sync::mpsc::Receiver, -}; - -use ethereum_client::{EthApi, TxHash, TxStatus}; -use gear_rpc_client::{dto::Message, GearApi}; -use primitive_types::H256; -use prometheus::{Gauge, IntCounter, IntGauge}; - -use utils_prometheus::{impl_metered_service, MeteredService}; - -use super::{AuthoritySetId, BlockEvent, BlockNumber, RelayedMerkleRoot}; - -pub struct MessageProcessor { - eth_api: EthApi, - gear_api: GearApi, - - metrics: Metrics, - era_metrics: EraMetrics, -} - -struct Era { - latest_merkle_root: Option, - messages: BTreeMap>, - pending_txs: Vec, - - metrics: EraMetrics, -} - -impl_metered_service! { - struct EraMetrics { - total_submitted_txs: IntCounter, - total_failed_txs: IntCounter, - total_failed_txs_because_processed: IntCounter, - } -} - -impl EraMetrics { - fn new() -> Self { - Self::new_inner().expect("Failed to create metrics") - } - - fn new_inner() -> prometheus::Result { - Ok(Self { - total_submitted_txs: IntCounter::new( - "message_relayer_message_processor_total_submitted_txs", - "Total amount of txs sent to ethereum", - )?, - total_failed_txs: IntCounter::new( - "message_relayer_message_processor_total_failed_txs", - "Total amount of txs sent to ethereum and failed", - )?, - total_failed_txs_because_processed: IntCounter::new( - "message_relayer_message_processor_total_failed_txs_because_processed", - "Amount of txs sent to ethereum and failed because they've already bee processed", - )?, - }) - } -} - -struct RelayMessagePendingTx { - hash: TxHash, - message_block: u32, - message: Message, -} - -impl_metered_service! { - struct Metrics { - pending_tx_count: IntGauge, - fee_payer_balance: Gauge - } -} - -impl MeteredService for MessageProcessor { - fn get_sources(&self) -> impl IntoIterator> { - self.metrics - .get_sources() - .into_iter() - .chain(self.era_metrics.get_sources()) - } -} - -impl Metrics { - fn new() -> Self { - Self::new_inner().expect("Failed to create metrics") - } - - fn new_inner() -> prometheus::Result { - Ok(Self { - pending_tx_count: IntGauge::new( - "message_relayer_message_processor_pending_tx_count", - "Amount of txs pending finalization on ethereum", - )?, - fee_payer_balance: Gauge::new( - "message_relayer_message_processor_fee_payer_balance", - "Transaction fee payer balance", - )?, - }) - } -} - -impl MessageProcessor { - pub fn new(eth_api: EthApi, gear_api: GearApi) -> Self { - Self { - eth_api, - gear_api, - metrics: Metrics::new(), - era_metrics: EraMetrics::new(), - } - } - - pub async fn run( - self, - block_events: Receiver, - merkle_roots: Receiver, - ) { - loop { - let res = self.run_inner(&block_events, &merkle_roots).await; - if let Err(err) = res { - log::error!("Message relayer failed: {}", err); - } - } - } - - async fn run_inner( - &self, - block_events: &Receiver, - merkle_roots: &Receiver, - ) -> anyhow::Result<()> { - let mut eras: BTreeMap = BTreeMap::new(); - - let mut paid_messages = HashSet::new(); - - loop { - let fee_payer_balance = self.eth_api.get_approx_balance().await?; - self.metrics.fee_payer_balance.set(fee_payer_balance); - - for event in block_events.try_iter() { - match event { - BlockEvent::MessageSent { message } => { - let authority_set_id = self - .gear_api - .signed_by_authority_set_id(message.block_hash) - .await?; - - match eras.entry(authority_set_id) { - Entry::Occupied(mut entry) => { - match entry.get_mut().messages.entry(message.block) { - Entry::Occupied(mut entry) => { - entry.get_mut().push(message.message); - } - Entry::Vacant(entry) => { - entry.insert(vec![message.message]); - } - } - } - Entry::Vacant(entry) => { - let mut messages = BTreeMap::new(); - messages.insert(message.block, vec![message.message]); - - entry.insert(Era { - latest_merkle_root: None, - messages, - pending_txs: vec![], - metrics: self.era_metrics.clone(), - }); - } - } - } - BlockEvent::MessagePaid { nonce } => { - paid_messages.insert(nonce); - } - } - } - - for new_merkle_root in merkle_roots.try_iter() { - match eras.entry(new_merkle_root.authority_set_id) { - Entry::Occupied(mut entry) => { - let era = entry.get_mut(); - - if let Some(mr) = era.latest_merkle_root.as_ref() { - if mr.gear_block < new_merkle_root.gear_block { - era.latest_merkle_root = Some(new_merkle_root); - } - } else { - era.latest_merkle_root = Some(new_merkle_root); - } - } - Entry::Vacant(entry) => { - entry.insert(Era { - latest_merkle_root: Some(new_merkle_root), - messages: BTreeMap::new(), - pending_txs: vec![], - metrics: self.era_metrics.clone(), - }); - } - } - } - - let latest_era = eras.last_key_value().map(|(k, _)| *k); - let Some(latest_era) = latest_era else { - continue; - }; - - let mut finalized_eras = vec![]; - - for (&era_id, era) in eras.iter_mut() { - let res = era.process(&self.gear_api, &self.eth_api).await; - if let Err(err) = res { - log::error!("Failed to process era #{}: {}", era_id, err); - continue; - } - - let finalized = era.try_finalize(&self.eth_api, &self.gear_api).await?; - - // Latest era cannot be finalized. - if finalized && era_id != latest_era { - log::info!("Era #{} finalized", era_id); - finalized_eras.push(era_id); - } - } - - let pending_tx_count: usize = eras.iter().map(|era| era.1.pending_txs.len()).sum(); - self.metrics.pending_tx_count.set(pending_tx_count as i64); - - for finalized in finalized_eras { - eras.remove(&finalized); - } - } - } -} - -impl Era { - pub async fn process(&mut self, gear_api: &GearApi, eth_api: &EthApi) -> anyhow::Result<()> { - let Some(latest_merkle_root) = self.latest_merkle_root else { - return Ok(()); - }; - - let mut processed_blocks = vec![]; - - for (&message_block, messages) in self.messages.iter() { - if message_block > latest_merkle_root.gear_block { - break; - } - - let merkle_root_block_hash = gear_api - .block_number_to_hash(latest_merkle_root.gear_block) - .await?; - - for message in messages { - let tx_hash = submit_message( - gear_api, - eth_api, - message, - latest_merkle_root.gear_block, - merkle_root_block_hash, - ) - .await?; - - self.metrics.total_submitted_txs.inc(); - - self.pending_txs.push(RelayMessagePendingTx { - hash: tx_hash, - message_block, - message: message.clone(), - }); - } - - processed_blocks.push(message_block); - } - - for block in processed_blocks { - self.messages.remove_entry(&block); - } - - Ok(()) - } - - pub async fn try_finalize( - &mut self, - eth_api: &EthApi, - gear_api: &GearApi, - ) -> anyhow::Result { - for i in (0..self.pending_txs.len()).rev() { - if self.try_finalize_tx(i, eth_api, gear_api).await? { - self.pending_txs.remove(i); - } - } - - Ok(self.pending_txs.is_empty()) - } - - async fn try_finalize_tx( - &mut self, - tx: usize, - eth_api: &EthApi, - gear_api: &GearApi, - ) -> anyhow::Result { - let tx = &mut self.pending_txs[tx]; - let status = eth_api.get_tx_status(tx.hash).await?; - - let nonce = H256::from(tx.message.nonce_le); - - match status { - TxStatus::Finalized => { - log::info!( - "Message at block #{} with nonce {} finalized", - tx.message_block, - nonce - ); - Ok(true) - } - TxStatus::Pending => { - log::info!( - "Tx for message at block #{} with nonce {} is waiting for finalization", - tx.message_block, - nonce - ); - Ok(false) - } - TxStatus::Failed => { - self.metrics.total_failed_txs.inc(); - - let already_processed = eth_api.is_message_processed(tx.message.nonce_le).await?; - - if already_processed { - self.metrics.total_failed_txs_because_processed.inc(); - return Ok(true); - } - - let merkle_root_block = self - .latest_merkle_root - .ok_or(anyhow::anyhow!( - "Cannot finalize era without any merkle roots" - ))? - .gear_block; - - if merkle_root_block < tx.message_block { - anyhow::bail!( - "Cannot relay message at block #{}: latest merkle root is at block #{}", - tx.message_block, - merkle_root_block - ); - } - - let merkle_root_block_hash = - gear_api.block_number_to_hash(merkle_root_block).await?; - - let tx_hash = submit_message( - gear_api, - eth_api, - &tx.message, - merkle_root_block, - merkle_root_block_hash, - ) - .await?; - - self.metrics.total_submitted_txs.inc(); - - log::warn!( - "Retrying to send failed tx {} for message #{}. New tx: {}", - hex::encode(tx.hash.0), - nonce, - hex::encode(tx_hash.0) - ); - - tx.hash = tx_hash; - - Ok(false) - } - } - } -} - -async fn submit_message( - gear_api: &GearApi, - eth_api: &EthApi, - message: &Message, - merkle_root_block: u32, - merkle_root_block_hash: H256, -) -> anyhow::Result { - let message_hash = message_hash(message); - - log::info!("Relaying message with hash {}", hex::encode(message_hash)); - - let proof = gear_api - .fetch_message_inclusion_merkle_proof(merkle_root_block_hash, message_hash.into()) - .await?; - - let tx_hash = eth_api - .provide_content_message( - merkle_root_block, - proof.num_leaves as u32, - proof.leaf_index as u32, - message.nonce_le, - message.source, - message.destination, - message.payload.to_vec(), - proof.proof, - ) - .await?; - - log::info!("Message #{:?} relaying started", message.nonce_le); - - Ok(tx_hash) -} - -fn message_hash(message: &Message) -> [u8; 32] { - let data = [ - message.nonce_le.as_ref(), - message.source.as_ref(), - message.destination.as_ref(), - message.payload.as_ref(), - ] - .concat(); - - let mut hash = [0; 32]; - keccak_256(&data, &mut hash); - - hash -} diff --git a/relayer/src/message_relayer/mod.rs b/relayer/src/message_relayer/mod.rs index 117cb8b9..d6dca88f 100644 --- a/relayer/src/message_relayer/mod.rs +++ b/relayer/src/message_relayer/mod.rs @@ -1,97 +1,4 @@ -use ethereum_client::EthApi; -use gear_rpc_client::{dto::Message, GearApi}; -use primitive_types::{H256, U256}; +mod common; -use utils_prometheus::MeteredService; - -mod event_listener; -mod merkle_root_listener; -mod message_processor; - -use event_listener::EventListener; -use merkle_root_listener::MerkleRootListener; -use message_processor::MessageProcessor; - -type AuthoritySetId = u64; -type BlockNumber = u32; - -enum BlockEvent { - MessageSent { message: MessageInBlock }, - MessagePaid { nonce: U256 }, -} - -struct MessageInBlock { - message: Message, - block: u32, - block_hash: H256, -} - -#[derive(Clone, Copy)] -struct RelayedMerkleRoot { - gear_block: u32, - authority_set_id: AuthoritySetId, -} - -pub struct MessageRelayer { - event_processor: EventListener, - merkle_root_listener: MerkleRootListener, - message_processor: MessageProcessor, -} - -impl MeteredService for MessageRelayer { - fn get_sources(&self) -> impl IntoIterator> { - self.event_processor - .get_sources() - .into_iter() - .chain(self.merkle_root_listener.get_sources()) - .chain(self.message_processor.get_sources()) - } -} - -impl MessageRelayer { - pub async fn new( - gear_api: GearApi, - eth_api: EthApi, - from_block: Option, - bridging_payment_address: Option, - ) -> anyhow::Result { - let from_gear_block = if let Some(block) = from_block { - block - } else { - let block = gear_api.latest_finalized_block().await?; - gear_api.block_hash_to_number(block).await? - }; - - let from_eth_block = eth_api.block_number().await?; - - log::info!( - "Starting gear event processing from block #{}", - from_gear_block - ); - log::info!("Starting ethereum listener from block #{}", from_eth_block); - - let event_processor = - EventListener::new(gear_api.clone(), from_gear_block, bridging_payment_address); - - let merkle_root_listener = - MerkleRootListener::new(eth_api.clone(), gear_api.clone(), from_eth_block); - - let message_processor = MessageProcessor::new(eth_api, gear_api); - - Ok(Self { - event_processor, - merkle_root_listener, - message_processor, - }) - } - - pub async fn run(self) -> anyhow::Result<()> { - let messages = self.event_processor.run(); - let merkle_roots = self.merkle_root_listener.run(); - - log::info!("Starting message relayer"); - self.message_processor.run(messages, merkle_roots).await; - - Ok(()) - } -} +pub mod all_token_transfers; +pub mod paid_token_transfers; diff --git a/relayer/src/message_relayer/paid_token_transfers.rs b/relayer/src/message_relayer/paid_token_transfers.rs new file mode 100644 index 00000000..09dc3968 --- /dev/null +++ b/relayer/src/message_relayer/paid_token_transfers.rs @@ -0,0 +1,106 @@ +use std::iter; + +use ethereum_client::EthApi; +use gear_rpc_client::GearApi; +use primitive_types::H256; +use utils_prometheus::MeteredService; + +use super::common::{ + ethereum_block_listener::EthereumBlockListener, ethereum_message_sender::EthereumMessageSender, + gear_block_listener::GearBlockListener, merkle_root_extractor::MerkleRootExtractor, + message_paid_event_extractor::MessagePaidEventExtractor, + message_queued_event_extractor::MessageQueuedEventExtractor, + paid_messages_filter::PaidMessagesFilter, +}; + +pub struct Relayer { + gear_block_listener: GearBlockListener, + ethereum_block_listener: EthereumBlockListener, + + message_sent_listener: MessageQueuedEventExtractor, + message_paid_listener: MessagePaidEventExtractor, + + paid_messages_filter: PaidMessagesFilter, + + merkle_root_extractor: MerkleRootExtractor, + message_sender: EthereumMessageSender, +} + +impl MeteredService for Relayer { + fn get_sources(&self) -> impl IntoIterator> { + iter::empty() + .chain(self.gear_block_listener.get_sources()) + .chain(self.ethereum_block_listener.get_sources()) + .chain(self.message_sent_listener.get_sources()) + .chain(self.message_paid_listener.get_sources()) + .chain(self.paid_messages_filter.get_sources()) + .chain(self.merkle_root_extractor.get_sources()) + .chain(self.message_sender.get_sources()) + } +} + +impl Relayer { + pub async fn new( + gear_api: GearApi, + eth_api: EthApi, + from_block: Option, + bridging_payment_address: H256, + ) -> anyhow::Result { + let from_gear_block = if let Some(block) = from_block { + block + } else { + let block = gear_api.latest_finalized_block().await?; + gear_api.block_hash_to_number(block).await? + }; + + let from_eth_block = eth_api.block_number().await?; + + log::info!( + "Starting gear event processing from block #{}", + from_gear_block + ); + log::info!("Starting ethereum listener from block #{}", from_eth_block); + + let gear_block_listener = GearBlockListener::new(gear_api.clone(), from_gear_block); + + let ethereum_block_listener = EthereumBlockListener::new(eth_api.clone(), from_eth_block); + + let message_sent_listener = MessageQueuedEventExtractor::new(gear_api.clone()); + + let message_paid_listener = + MessagePaidEventExtractor::new(gear_api.clone(), bridging_payment_address); + + let paid_messages_filter = PaidMessagesFilter::new(); + + let merkle_root_listener = MerkleRootExtractor::new(eth_api.clone(), gear_api.clone()); + + let message_sender = EthereumMessageSender::new(eth_api, gear_api); + + Ok(Self { + gear_block_listener, + ethereum_block_listener, + + message_sent_listener, + message_paid_listener, + + paid_messages_filter, + + merkle_root_extractor: merkle_root_listener, + message_sender, + }) + } + + pub fn run(self) { + let [gear_blocks_0, gear_blocks_1] = self.gear_block_listener.run(); + let ethereum_blocks = self.ethereum_block_listener.run(); + + let messages = self.message_sent_listener.run(gear_blocks_0); + let paid_messages = self.message_paid_listener.run(gear_blocks_1); + + let filtered_messages = self.paid_messages_filter.run(messages, paid_messages); + + let merkle_roots = self.merkle_root_extractor.run(ethereum_blocks); + + self.message_sender.run(filtered_messages, merkle_roots); + } +}