Skip to content

Commit

Permalink
lp-gateway: Handle message proofs for inbound and outbound messages
Browse files Browse the repository at this point in the history
  • Loading branch information
cdamian committed Jul 23, 2024
1 parent 790f427 commit b37b4b3
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 43 deletions.
5 changes: 5 additions & 0 deletions libs/traits/src/liquidity_pools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ pub trait LPEncoding: Sized {
fn deserialize(input: &[u8]) -> Result<Self, DispatchError>;
}

pub trait LPMessage: LPEncoding {
fn get_message_proof(&self) -> Option<[u8; 32]>;
fn to_message_proof(&self) -> Self;
}

#[cfg(any(test, feature = "std"))]
pub mod test_util {
use parity_scale_codec::{Decode, Encode, MaxEncodedLen};
Expand Down
4 changes: 2 additions & 2 deletions pallets/liquidity-pools-gateway/queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::fmt::Debug;

use cfg_traits::liquidity_pools::{MessageProcessor, MessageQueue};
use cfg_traits::liquidity_pools::{MessageProcessor, MessageQueue as MessageQueueT};
use frame_support::pallet_prelude::*;
use frame_system::pallet_prelude::*;
pub use pallet::*;
Expand Down Expand Up @@ -235,7 +235,7 @@ pub mod pallet {
}
}

impl<T: Config> MessageQueue for Pallet<T> {
impl<T: Config> MessageQueueT for Pallet<T> {
type Message = T::Message;

fn submit(message: Self::Message) -> DispatchResult {
Expand Down
145 changes: 112 additions & 33 deletions pallets/liquidity-pools-gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
use core::fmt::Debug;

use cfg_traits::{
liquidity_pools::{InboundMessageHandler, LPEncoding, OutboundQueue, Router as DomainRouter},
liquidity_pools::{
InboundMessageHandler, LPMessage, MessageQueue, OutboundQueue, Router as DomainRouter,
},
TryConvert,
};
use cfg_types::domain_address::{Domain, DomainAddress};
Expand Down Expand Up @@ -82,7 +84,7 @@ pub mod pallet {
/// https://github.com/centrifuge/centrifuge-chain/pull/1696#discussion_r1456370592
const DEFAULT_WEIGHT_REF_TIME: u64 = 5_000_000_000;

use cfg_traits::liquidity_pools::{MessageProcessor, MessageQueue};
use cfg_traits::liquidity_pools::{LPEncoding, MessageProcessor, MessageQueue};
use cfg_types::gateway::GatewayMessage;
use frame_support::dispatch::PostDispatchInfo;
use sp_runtime::DispatchErrorWithPostInfo;
Expand Down Expand Up @@ -121,13 +123,7 @@ pub mod pallet {
type AdminOrigin: EnsureOrigin<<Self as frame_system::Config>::RuntimeOrigin>;

/// The LP message type.
type LPMessage: LPEncoding
+ Clone
+ Debug
+ PartialEq
+ MaxEncodedLen
+ TypeInfo
+ FullCodec;
type LPMessage: LPMessage + Clone + Debug + PartialEq + MaxEncodedLen + TypeInfo + FullCodec;

/// The message router type that is stored for each domain.
type Router: DomainRouter<Sender = Self::AccountId>
Expand All @@ -142,7 +138,7 @@ pub mod pallet {
/// The type that handles incoming messages.
type InboundMessageHandler: InboundMessageHandler<
Sender = DomainAddress,
Message = Self::Message,
Message = Self::LPMessage,
>;

/// A way to recover a domain address from two byte slices.
Expand All @@ -165,7 +161,7 @@ pub mod pallet {
type MaxRouterCount: Get<u32>;

/// Type used for queueing messages.
type MessageQueue: MessageQueue;
type MessageQueue: MessageQueue<Message = GatewayMessage<Self::AccountId, Self::LPMessage>>;
}

#[pallet::event]
Expand Down Expand Up @@ -226,6 +222,17 @@ pub mod pallet {
pub type RelayerList<T: Config> =
StorageDoubleMap<_, Blake2_128Concat, Domain, Blake2_128Concat, DomainAddress, ()>;

/// Storage that keeps track of incoming message proofs.
#[pallet::storage]
#[pallet::getter(fn inbound_message_proof_count)]
pub type InboundMessageProofCount<T: Config> =
StorageMap<_, Blake2_128Concat, [u8; 32], u32, ValueQuery>;

/// Storage that keeps track of incoming messages.
#[pallet::storage]
#[pallet::getter(fn inbound_messages)]
pub type InboundMessages<T: Config> = StorageMap<_, Blake2_128Concat, [u8; 32], T::LPMessage>;

#[pallet::error]
pub enum Error<T> {
/// Router initialization failed.
Expand Down Expand Up @@ -556,12 +563,84 @@ pub mod pallet {
domain_address: DomainAddress,
message: T::LPMessage,
) -> DispatchResultWithPostInfo {
Ok(PostDispatchInfo::default())
let mut weight = T::DbWeight::get().reads(1);

let mut post_dispatch_info = PostDispatchInfo {
actual_weight: Some(weight),
pays_fee: Pays::Yes,
};

let routers_count = DomainMultiRouters::<T>::get(domain_address.domain())
.ok_or(DispatchErrorWithPostInfo {
post_info: post_dispatch_info,
error: Error::<T>::MultiRouterNotFound.into(),
})?
.len();

let expected_proof_count = routers_count - 1;

let (message_proof, message_proof_count) = match message.get_message_proof() {
None => {
let message_proof = message.to_message_proof().get_message_proof().unwrap();

InboundMessages::<T>::insert(message_proof, message);

(
message_proof,
InboundMessageProofCount::<T>::get(message_proof),
)
}
Some(message_proof) => {
let message_proof_count =
InboundMessageProofCount::<T>::try_mutate(message_proof, |count| {
*count += 1;

Ok::<u32, DispatchError>(*count)
})?;

(message_proof, message_proof_count)
}
};

post_dispatch_info.actual_weight = Some(
post_dispatch_info
.actual_weight
.unwrap()
.saturating_add(weight.saturating_add(T::DbWeight::get().reads_writes(1, 1))),
);

// IMPORTANT - the number of routers on Centrifuge Chain and other domains are
// the same. It is always expected that one router sends the message and the
// others are sending the proofs, i.e. if we have 3 routers, we expect 1 message
// and 2 message proofs.
if message_proof_count != expected_proof_count as u32 {
return Ok(post_dispatch_info);
}

let message = InboundMessages::<T>::get(message_proof).unwrap();

InboundMessages::<T>::remove(message_proof);
InboundMessageProofCount::<T>::remove(message_proof);

post_dispatch_info.actual_weight = Some(
post_dispatch_info
.actual_weight
.unwrap()
.saturating_add(weight.saturating_add(T::DbWeight::get().reads_writes(1, 2))),
);

match T::InboundMessageHandler::handle(domain_address, message) {
Ok(_) => Ok(post_dispatch_info),
Err(e) => Err(DispatchErrorWithPostInfo {
post_info: post_dispatch_info,
error: e,
}),
}
}

/// Retrieves the routers stored for the provided domain and sends the
/// message using each, calculating and returning the required weight
/// for these operations in the `DispatchResultWithPostInfo`.
/// message and message proofs, calculating and returning the required
/// weight for these operations in the `DispatchResultWithPostInfo`.
fn process_outbound_message(
domain: Domain,
sender: T::AccountId,
Expand All @@ -571,32 +650,37 @@ pub mod pallet {

let read_weight = T::DbWeight::get().reads(1);

let mut post_dispatch_info = PostDispatchInfo {
actual_weight: Some(read_weight),
pays_fee: Pays::Yes,
};

let routers =
DomainMultiRouters::<T>::get(domain).ok_or(DispatchErrorWithPostInfo {
post_info: PostDispatchInfo {
actual_weight: Some(read_weight),
pays_fee: Pays::Yes,
},
post_info: post_dispatch_info,
error: Error::<T>::MultiRouterNotFound.into(),
})?;

let mut post_dispatch_info = PostDispatchInfo {
actual_weight: None,
pays_fee: Pays::Yes,
};
let message_proof = message.to_message_proof();
let mut message_opt = Some(message);

for router in routers {
match router.send(sender.clone(), message.serialize()) {
// Ensure that we only send the actual message once, using one router.
// The remaining routers will send the message proof.
let router_msg = match message_opt.take() {
Some(m) => m.serialize(),
None => message_proof.serialize(),
};

match router.send(sender.clone(), router_msg) {
Ok(dispatch_info) => Self::update_total_post_dispatch_info_weight(
&mut post_dispatch_info,
dispatch_info.actual_weight,
read_weight,
),
Err(e) => {
Self::update_total_post_dispatch_info_weight(
&mut post_dispatch_info,
e.post_info.actual_weight,
read_weight,
);

return Err(DispatchErrorWithPostInfo {
Expand All @@ -613,10 +697,9 @@ pub mod pallet {
fn update_total_post_dispatch_info_weight(
post_dispatch_info: &mut PostDispatchInfo,
router_call_weight: Option<Weight>,
extra_weight: Weight,
) {
let router_call_weight =
Self::get_outbound_message_processing_weight(router_call_weight, extra_weight);
Self::get_outbound_message_processing_weight(router_call_weight);

post_dispatch_info.actual_weight = Some(
post_dispatch_info
Expand All @@ -628,10 +711,7 @@ pub mod pallet {

/// Calculates the weight used by a router when processing an outbound
/// message.
fn get_outbound_message_processing_weight(
router_call_weight: Option<Weight>,
extra_weight: Weight,
) -> Weight {
fn get_outbound_message_processing_weight(router_call_weight: Option<Weight>) -> Weight {
let pov_weight: u64 = (Domain::max_encoded_len()
+ T::AccountId::max_encoded_len()
+ T::LPMessage::max_encoded_len())
Expand All @@ -641,7 +721,6 @@ pub mod pallet {
router_call_weight
.unwrap_or(Weight::from_parts(DEFAULT_WEIGHT_REF_TIME, 0))
.saturating_add(Weight::from_parts(0, pov_weight))
.saturating_add(extra_weight)
}
}

Expand All @@ -662,7 +741,7 @@ pub mod pallet {
// Make sure we use the gateway sender.
let sender = T::Sender::get();

Self::process_message(destination, sender, message)
Self::process_outbound_message(destination, sender, message)
}
}
}
Expand Down
29 changes: 25 additions & 4 deletions pallets/liquidity-pools/src/hooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
// GNU General Public License for more details.

use cfg_traits::{
investments::TrancheCurrency, liquidity_pools::OutboundQueue, StatusNotificationHook,
investments::TrancheCurrency,
liquidity_pools::{MessageQueue, OutboundQueue},
StatusNotificationHook,
};
use cfg_types::{
domain_address::DomainAddress,
gateway::GatewayMessage,
investments::{ExecutedForeignCollect, ExecutedForeignDecreaseInvest},
};
use frame_support::{
Expand Down Expand Up @@ -67,7 +70,14 @@ where
currency_payout: status.amount_decreased.into(),
remaining_invest_amount: status.amount_remaining.into(),
};
T::OutboundQueue::submit(T::TreasuryAccount::get(), domain_address.domain(), message)?;

let msg = GatewayMessage::<T::AccountId, Message>::Outbound {
sender: T::TreasuryAccount::get(),
destination: domain_address.domain(),
message,
};

T::MessageQueue::submit(msg)?;

Ok(())
}
Expand Down Expand Up @@ -112,8 +122,13 @@ where
remaining_redeem_amount: status.amount_remaining.into(),
};

T::OutboundQueue::submit(T::TreasuryAccount::get(), domain_address.domain(), message)?;
let msg = GatewayMessage::<T::AccountId, Message>::Outbound {
sender: T::TreasuryAccount::get(),
destination: domain_address.domain(),
message,
};

T::MessageQueue::submit(msg)?;
Ok(())
}
}
Expand Down Expand Up @@ -156,7 +171,13 @@ where
remaining_invest_amount: status.amount_remaining.into(),
};

T::OutboundQueue::submit(T::TreasuryAccount::get(), domain_address.domain(), message)?;
let msg = GatewayMessage::<T::AccountId, Message>::Outbound {
sender: T::TreasuryAccount::get(),
destination: domain_address.domain(),
message,
};

T::MessageQueue::submit(msg)?;

Ok(())
}
Expand Down
21 changes: 19 additions & 2 deletions pallets/liquidity-pools/src/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
// GNU General Public License for more details.

use cfg_traits::{
investments::ForeignInvestment, liquidity_pools::OutboundQueue, Permissions, TimeAsSecs,
investments::ForeignInvestment,
liquidity_pools::{MessageQueue, OutboundQueue},
Permissions, TimeAsSecs,
};
use cfg_types::{
domain_address::{Domain, DomainAddress},
gateway::GatewayMessage,
permissions::{PermissionScope, PoolRole, Role},
};
use frame_support::{
Expand Down Expand Up @@ -270,11 +273,25 @@ where
.into(),
};

T::OutboundQueue::submit(T::TreasuryAccount::get(), destination.domain(), message)?;
Self::send_outbound_message(T::TreasuryAccount::get(), destination.domain(), message)?;

Ok(())
}

fn send_outbound_message(
sender: T::AccountId,
destination: Domain,
message: Message,
) -> DispatchResult {
let msg = GatewayMessage::<T::AccountId, Message>::Outbound {
sender,
destination,
message,
};

T::MessageQueue::submit(msg)
}

/// Cancels an existing redemption order of the investor by decreasing the
/// redemption by the entire unprocessed amount.
///
Expand Down
2 changes: 1 addition & 1 deletion pallets/liquidity-pools/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ pub mod pallet {
type DomainAccountToDomainAddress: Convert<(Domain, [u8; 32]), DomainAddress>;

/// Type used for queueing messages.
type MessageQueue: MessageQueue;
type MessageQueue: MessageQueue<Message = GatewayMessage<Self::AccountId, Message>>;

/// The prefix for currencies added via the LiquidityPools feature.
#[pallet::constant]
Expand Down
Loading

0 comments on commit b37b4b3

Please sign in to comment.