Skip to content

Commit

Permalink
MessagePaidListener
Browse files Browse the repository at this point in the history
  • Loading branch information
mertwole committed Sep 2, 2024
1 parent 7325dd8 commit d83eaaf
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 15 deletions.
107 changes: 107 additions & 0 deletions relayer/src/message_relayer/common/message_paid_listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
use std::sync::mpsc::{channel, Receiver, Sender};

use bridging_payment::services::BridgingPaymentEvents;
use futures::executor::block_on;
use gear_rpc_client::GearApi;
use parity_scale_codec::Decode;
use primitive_types::{H256, U256};
use prometheus::IntCounter;

use utils_prometheus::{impl_metered_service, MeteredService};

use super::block_listener::BlockNumber;

pub struct PaidMessage {
pub nonce: U256,

Check failure on line 15 in relayer/src/message_relayer/common/message_paid_listener.rs

View workflow job for this annotation

GitHub Actions / lints

field `nonce` is never read

Check failure on line 15 in relayer/src/message_relayer/common/message_paid_listener.rs

View workflow job for this annotation

GitHub Actions / build

field `nonce` is never read
}

pub struct MessagePaidListener {
bridging_payment_address: H256,

gear_api: GearApi,

metrics: MessagePaidListenerMetrics,
}

impl MeteredService for MessagePaidListener {
fn get_sources(&self) -> impl IntoIterator<Item = Box<dyn prometheus::core::Collector>> {
self.metrics.get_sources()
}
}

impl_metered_service! {
struct MessagePaidListenerMetrics {
total_paid_messages_found: IntCounter,
}
}

impl MessagePaidListenerMetrics {
fn new() -> Self {
Self::new_inner().expect("Failed to create metrics")
}

fn new_inner() -> prometheus::Result<Self> {
Ok(Self {
total_paid_messages_found: IntCounter::new(
"message_relayer_message_paid_listener_total_paid_messages_found",
"Total amount of paid messages found by event listener",
)?,
})
}
}

impl MessagePaidListener {
pub fn new(gear_api: GearApi, bridging_payment_address: H256) -> Self {
Self {
bridging_payment_address,
gear_api,
metrics: MessagePaidListenerMetrics::new(),
}
}

pub fn run(self, blocks: Receiver<BlockNumber>) -> Receiver<PaidMessage> {
let (sender, receiver) = channel();

tokio::spawn(async move {
loop {
for block in blocks.try_iter() {
let res = block_on(self.process_block_events(block.0, &sender));
if let Err(err) = res {
log::error!("Event processor failed: {}", err);
}
}
}
});

receiver
}

async fn process_block_events(
&self,
block: u32,
sender: &Sender<PaidMessage>,
) -> anyhow::Result<()> {
log::info!("Processing gear block #{}", block);
let block_hash = self.gear_api.block_number_to_hash(block).await?;

let messages = self
.gear_api
.user_message_sent_events(self.bridging_payment_address, block_hash)
.await?;
if !messages.is_empty() {
log::info!("Found {} paid messages", messages.len());
self.metrics
.total_paid_messages_found
.inc_by(messages.len() as u64);

for message in messages {
let user_reply = BridgingPaymentEvents::decode(&mut &message.payload[..])?;
let BridgingPaymentEvents::TeleportVaraToEth { nonce, .. } = user_reply;

sender.send(PaidMessage { nonce })?;
}
}

Ok(())
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::sync::mpsc::{channel, Receiver, Sender};

use block_listener::BlockNumber;
use futures::executor::block_on;
use gear_rpc_client::GearApi;
use prometheus::IntCounter;
Expand All @@ -9,27 +8,27 @@ use utils_prometheus::{impl_metered_service, MeteredService};

use crate::message_relayer::MessageInBlock;

pub mod block_listener;
use super::block_listener::BlockNumber;

pub struct EventListener {
pub struct MessageQueuedListener {
gear_api: GearApi,

metrics: EventListenerMetrics,
metrics: MessageQueuedListenerMetrics,
}

impl MeteredService for EventListener {
impl MeteredService for MessageQueuedListener {
fn get_sources(&self) -> impl IntoIterator<Item = Box<dyn prometheus::core::Collector>> {
self.metrics.get_sources()
}
}

impl_metered_service! {
struct EventListenerMetrics {
struct MessageQueuedListenerMetrics {
total_messages_found: IntCounter,
}
}

impl EventListenerMetrics {
impl MessageQueuedListenerMetrics {
fn new() -> Self {
Self::new_inner().expect("Failed to create metrics")
}
Expand All @@ -44,11 +43,11 @@ impl EventListenerMetrics {
}
}

impl EventListener {
impl MessageQueuedListener {
pub fn new(gear_api: GearApi) -> Self {
Self {
gear_api,
metrics: EventListenerMetrics::new(),
metrics: MessageQueuedListenerMetrics::new(),
}
}

Expand Down
4 changes: 3 additions & 1 deletion relayer/src/message_relayer/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub mod gear_event_listener;
pub mod block_listener;
pub mod merkle_root_listener;
pub mod message_paid_listener;
pub mod message_queued_listener;
pub mod message_sender;
9 changes: 4 additions & 5 deletions relayer/src/message_relayer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ use utils_prometheus::MeteredService;
mod common;

use common::{
gear_event_listener::{block_listener::BlockListener, EventListener},
merkle_root_listener::MerkleRootListener,
message_sender::MessageSender,
block_listener::BlockListener, merkle_root_listener::MerkleRootListener,
message_queued_listener::MessageQueuedListener, message_sender::MessageSender,
};

type AuthoritySetId = u64;
Expand All @@ -22,7 +21,7 @@ struct MessageInBlock {

pub struct MessageRelayer {
block_listener: BlockListener,
event_listener: EventListener,
event_listener: MessageQueuedListener,
merkle_root_listener: MerkleRootListener,
message_sender: MessageSender,
}
Expand Down Expand Up @@ -61,7 +60,7 @@ impl MessageRelayer {

let block_listener = BlockListener::new(gear_api.clone(), from_gear_block);

let event_listener = EventListener::new(gear_api.clone());
let event_listener = MessageQueuedListener::new(gear_api.clone());

let merkle_root_listener =
MerkleRootListener::new(eth_api.clone(), gear_api.clone(), from_eth_block);
Expand Down

0 comments on commit d83eaaf

Please sign in to comment.