Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(message relayer): Split message relayer into reusable parts #117

Merged
merged 17 commits into from
Sep 3, 2024
Merged
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion gear-rpc-client/src/dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub struct MerkleProof {
pub leaf_index: u64,
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct Message {
pub nonce_le: [u8; 32],
pub source: [u8; 32],
Expand Down
1 change: 1 addition & 0 deletions relayer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ axum.workspace = true
checkpoint_light_client-io = { workspace = true, features = ["std"] }
checkpoint_light_client = { workspace = true, features = ["std"] }
clap.workspace = true
derive_more.workspace = true
dotenv.workspace = true
futures.workspace = true
gear-core.workspace = true
Expand Down
55 changes: 39 additions & 16 deletions relayer/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
extern crate pretty_env_logger;

use std::time::Duration;

use clap::{Args, Parser, Subcommand};
use message_relayer::{all_token_transfers, paid_token_transfers};
use pretty_env_logger::env_logger::fmt::TimestampPrecision;

use ethereum_client::EthApi;
use gear_rpc_client::GearApi;
use message_relayer::MessageRelayer;
use proof_storage::{FileSystemProofStorage, GearProofStorage, ProofStorage};
use relay_merkle_roots::MerkleRootRelayer;
use utils_prometheus::MetricsBuilder;
Expand Down Expand Up @@ -221,33 +223,54 @@ async fn main() {
let gear_api = create_gear_client(&args.vara_endpoint).await;
let eth_api = create_eth_client(&args.ethereum_args);

let bridging_payment_address = args.bridging_payment_address.map(|addr| {
let addr = if &addr[..2] == "0x" {
&addr[2..]
if let Some(bridging_payment_address) = args.bridging_payment_address {
let bridging_payment_address = if &bridging_payment_address[..2] == "0x" {
&bridging_payment_address[2..]
} else {
&addr
&bridging_payment_address
};

let arr: [u8; 32] = hex::decode(addr)
let bridging_payment_address: [u8; 32] = hex::decode(bridging_payment_address)
.expect("Wrong format of bridging-payment-address")
.try_into()
.expect("Wrong format of bridging-payment-address");

arr.into()
});
let bridging_payment_address = bridging_payment_address.into();

let relayer =
MessageRelayer::new(gear_api, eth_api, args.from_block, bridging_payment_address)
let relayer = paid_token_transfers::Relayer::new(
gear_api,
eth_api,
args.from_block,
bridging_payment_address,
)
.await
.unwrap();

MetricsBuilder::new()
.register_service(&relayer)
.build()
.run(args.prometheus_args.endpoint)
.await;

relayer.run();
} else {
let relayer = all_token_transfers::Relayer::new(gear_api, eth_api, args.from_block)
.await
.unwrap();

MetricsBuilder::new()
.register_service(&relayer)
.build()
.run(args.prometheus_args.endpoint)
.await;
MetricsBuilder::new()
.register_service(&relayer)
.build()
.run(args.prometheus_args.endpoint)
.await;

relayer.run();
}

relayer.run().await.unwrap();
loop {
// relayer.run() spawns thread and exits, so we need to add this loop after calling run.
std::thread::sleep(Duration::from_millis(100));
}
}
CliCommands::RelayCheckpoints(args) => ethereum_checkpoints::relay(args).await,
CliCommands::FetchAuthoritySetState(args) => {
Expand Down
80 changes: 80 additions & 0 deletions relayer/src/message_relayer/all_token_transfers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use std::iter;

use ethereum_client::EthApi;
use gear_rpc_client::GearApi;
use utils_prometheus::MeteredService;

use super::common::{
ethereum_block_listener::EthereumBlockListener, ethereum_message_sender::EthereumMessageSender,
gear_block_listener::GearBlockListener, merkle_root_extractor::MerkleRootExtractor,
message_queued_event_extractor::MessageQueuedEventExtractor,
};

pub struct Relayer {
gear_block_listener: GearBlockListener,
ethereum_block_listener: EthereumBlockListener,

message_sent_listener: MessageQueuedEventExtractor,

merkle_root_extractor: MerkleRootExtractor,
message_sender: EthereumMessageSender,
}

impl MeteredService for Relayer {
fn get_sources(&self) -> impl IntoIterator<Item = Box<dyn prometheus::core::Collector>> {
iter::empty()
.chain(self.gear_block_listener.get_sources())
.chain(self.ethereum_block_listener.get_sources())
.chain(self.message_sent_listener.get_sources())
.chain(self.merkle_root_extractor.get_sources())
.chain(self.message_sender.get_sources())
}
}

impl Relayer {
pub async fn new(
gear_api: GearApi,
eth_api: EthApi,
from_block: Option<u32>,
) -> anyhow::Result<Self> {
let from_gear_block = if let Some(block) = from_block {
block
} else {
let block = gear_api.latest_finalized_block().await?;
gear_api.block_hash_to_number(block).await?
};

let from_eth_block = eth_api.block_number().await?;

let gear_block_listener = GearBlockListener::new(gear_api.clone(), from_gear_block);

let ethereum_block_listener = EthereumBlockListener::new(eth_api.clone(), from_eth_block);

let message_sent_listener = MessageQueuedEventExtractor::new(gear_api.clone());

let merkle_root_listener = MerkleRootExtractor::new(eth_api.clone(), gear_api.clone());

let message_sender = EthereumMessageSender::new(eth_api, gear_api);

Ok(Self {
gear_block_listener,
ethereum_block_listener,

message_sent_listener,

merkle_root_extractor: merkle_root_listener,
message_sender,
})
}

pub fn run(self) {
let [gear_blocks] = self.gear_block_listener.run();
let ethereum_blocks = self.ethereum_block_listener.run();

let messages = self.message_sent_listener.run(gear_blocks);

let merkle_roots = self.merkle_root_extractor.run(ethereum_blocks);

self.message_sender.run(messages, merkle_roots);
}
}
93 changes: 93 additions & 0 deletions relayer/src/message_relayer/common/ethereum_block_listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use std::{
sync::mpsc::{channel, Receiver, Sender},
time::Duration,
};

use ethereum_client::EthApi;
use prometheus::IntGauge;
use utils_prometheus::{impl_metered_service, MeteredService};

use super::EthereumBlockNumber;

const ETHEREUM_BLOCK_TIME_APPROX: Duration = Duration::from_secs(12);

pub struct EthereumBlockListener {
eth_api: EthApi,
from_block: u64,

metrics: Metrics,
}

impl MeteredService for EthereumBlockListener {
fn get_sources(&self) -> impl IntoIterator<Item = Box<dyn prometheus::core::Collector>> {
self.metrics.get_sources()
}
}

impl_metered_service! {
struct Metrics {
latest_block: IntGauge,
}
}

impl Metrics {
fn new() -> Self {
Self::new_inner().expect("Failed to create metrics")
}

fn new_inner() -> prometheus::Result<Self> {
Ok(Self {
latest_block: IntGauge::new(
"ethereum_block_listener_latest_block",
"Latest ethereum block discovered by listener",
)?,
})
}
}

impl EthereumBlockListener {
pub fn new(eth_api: EthApi, from_block: u64) -> Self {
Self {
eth_api,
from_block,

metrics: Metrics::new(),
}
}

pub fn run(self) -> Receiver<EthereumBlockNumber> {
let (sender, receiver) = channel();

tokio::spawn(async move {
loop {
let res = self.run_inner(&sender).await;
if let Err(err) = res {
log::error!("Ethereum block listener failed: {}", err);
}
}
});

receiver
}

async fn run_inner(&self, sender: &Sender<EthereumBlockNumber>) -> anyhow::Result<()> {
let mut current_block = self.from_block;

self.metrics.latest_block.set(current_block as i64);

loop {
let latest = self.eth_api.block_number().await?;
if latest >= current_block {
for block in current_block..=latest {
sender.send(EthereumBlockNumber(block))?;
}

current_block = latest + 1;

self.metrics.latest_block.set(latest as i64);
} else {
tokio::time::sleep(ETHEREUM_BLOCK_TIME_APPROX / 2).await;
}
}
}
}
Loading
Loading