From 1616f7dc6c516c0c5efc4da4e4f0e469e01cae4f Mon Sep 17 00:00:00 2001 From: Kayanski Date: Thu, 20 Jun 2024 15:40:56 +0000 Subject: [PATCH 1/5] Channel creator --- cw-orch-daemon/src/sender.rs | 4 + packages/interchain/hermes-relayer/Cargo.toml | 34 ++ .../hermes-relayer/examples/pion-xion.rs | 53 +++ .../interchain/hermes-relayer/src/channel.rs | 0 .../interchain/hermes-relayer/src/config.rs | 65 ++++ .../interchain/hermes-relayer/src/core.rs | 79 ++++ .../hermes-relayer/src/interchain_env.rs | 344 ++++++++++++++++++ .../interchain/hermes-relayer/src/keys.rs | 29 ++ packages/interchain/hermes-relayer/src/lib.rs | 5 + .../interchain/hermes-relayer/src/packet.rs | 0 .../interchain/hermes-relayer/src/relayer.rs | 149 ++++++++ .../interchain-daemon/src/interchain_env.rs | 2 +- .../interchain/interchain-daemon/src/lib.rs | 1 + .../interchain-daemon/src/packet_inspector.rs | 2 +- 14 files changed, 765 insertions(+), 2 deletions(-) create mode 100644 packages/interchain/hermes-relayer/Cargo.toml create mode 100644 packages/interchain/hermes-relayer/examples/pion-xion.rs create mode 100644 packages/interchain/hermes-relayer/src/channel.rs create mode 100644 packages/interchain/hermes-relayer/src/config.rs create mode 100644 packages/interchain/hermes-relayer/src/core.rs create mode 100644 packages/interchain/hermes-relayer/src/interchain_env.rs create mode 100644 packages/interchain/hermes-relayer/src/keys.rs create mode 100644 packages/interchain/hermes-relayer/src/lib.rs create mode 100644 packages/interchain/hermes-relayer/src/packet.rs create mode 100644 packages/interchain/hermes-relayer/src/relayer.rs diff --git a/cw-orch-daemon/src/sender.rs b/cw-orch-daemon/src/sender.rs index 2cc790f9b..fb996799c 100644 --- a/cw-orch-daemon/src/sender.rs +++ b/cw-orch-daemon/src/sender.rs @@ -218,6 +218,10 @@ impl Sender { } } + pub fn options(&self) -> &SenderOptions { + &self.options + } + fn cosmos_private_key(&self) -> SigningKey { SigningKey::from_slice(&self.private_key.raw_key()).unwrap() } diff --git a/packages/interchain/hermes-relayer/Cargo.toml b/packages/interchain/hermes-relayer/Cargo.toml new file mode 100644 index 000000000..d3aa04f0b --- /dev/null +++ b/packages/interchain/hermes-relayer/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "hermes-relayer" +version = "0.1.0" +authors.workspace = true +edition.workspace = true +license.workspace = true +repository.workspace = true + +[dependencies] +anyhow.workspace = true +cosmwasm-std.workspace = true +cw-orch-interchain-core = { version = "0.3.3", path = "../interchain-core" } +cw-orch-interchain-daemon = { version = "0.3.2", path = "../interchain-daemon" } +cw-orch-networks = { version = "0.22.2", path = "../../cw-orch-networks" } +cw-orch-core = { version = "1.1.1", path = "../../cw-orch-core" } +hdpath = "0.6.3" +ibc-relayer = "0.28.0" +ibc-relayer-cli = "1.9.0" +ibc-relayer-types = { version = "0.28.0" } +old-ibc-relayer-types = { package = "ibc-relayer-types", version = "0.25.0" } +tokio.workspace = true +cw-orch-daemon = { version = "0.23.5", path = "../../../cw-orch-daemon" } +log.workspace = true +cosmrs.workspace = true +futures = "0.3.30" +tonic.workspace = true + +[dev-dependencies] +cw-orch = { path = "../../../cw-orch", features = ["daemon"] } +cw-orch-interchain = { path = "../../../cw-orch-interchain", features = [ + "daemon", +] } +dotenv = "0.15.0" +pretty_env_logger = "0.5.0" diff --git a/packages/interchain/hermes-relayer/examples/pion-xion.rs b/packages/interchain/hermes-relayer/examples/pion-xion.rs new file mode 100644 index 000000000..f63dfbf0a --- /dev/null +++ b/packages/interchain/hermes-relayer/examples/pion-xion.rs @@ -0,0 +1,53 @@ +use cw_orch::{ + daemon::networks::{PION_1, XION_TESTNET_1}, + tokio::runtime::Runtime, +}; +use cw_orch_interchain::prelude::*; +use hermes_relayer::relayer::HermesRelayer; +use old_ibc_relayer_types::core::ics24_host::identifier::PortId; + +pub fn main() -> cw_orch::anyhow::Result<()> { + dotenv::dotenv()?; + pretty_env_logger::init(); + let rt = Runtime::new()?; + + let relayer = HermesRelayer::new( + rt.handle(), + vec![ + ( + PION_1, + None, + true, + "https://rpc-falcron.pion-1.ntrn.tech/".to_string(), + ), + ( + XION_TESTNET_1, + None, + false, + "https://xion-testnet-rpc.polkachu.com".to_string(), + ), + ], + vec![( + ( + XION_TESTNET_1.chain_id.to_string(), + PION_1.chain_id.to_string(), + ), + "connection-63".to_string(), + )] + .into_iter() + .collect(), + )?; + + let interchain = relayer.interchain_env(); + + interchain.create_channel( + "xion-testnet-1", + "pion-1", + &PortId::transfer(), + &PortId::transfer(), + "ics20-1", + None, + )?; + + Ok(()) +} diff --git a/packages/interchain/hermes-relayer/src/channel.rs b/packages/interchain/hermes-relayer/src/channel.rs new file mode 100644 index 000000000..e69de29bb diff --git a/packages/interchain/hermes-relayer/src/config.rs b/packages/interchain/hermes-relayer/src/config.rs new file mode 100644 index 000000000..b5da234ed --- /dev/null +++ b/packages/interchain/hermes-relayer/src/config.rs @@ -0,0 +1,65 @@ +use std::time::Duration; + +use cw_orch_core::environment::ChainInfoOwned; +use ibc_relayer::chain::cosmos::config::CosmosSdkConfig; +use ibc_relayer::config::gas_multiplier::GasMultiplier; +use ibc_relayer::config::types::{MaxMsgNum, MaxTxSize, Memo}; +use ibc_relayer::config::{AddressType, ChainConfig, EventSourceMode, GasPrice, RefreshRate}; +use ibc_relayer::keyring::Store; +use ibc_relayer_types::core::ics02_client::trust_threshold::TrustThreshold; +use ibc_relayer_types::core::ics24_host::identifier::{self}; + +pub const KEY_NAME: &str = "relayer"; + +pub fn chain_config( + chain: &str, + rpc_url: &str, + chain_data: &ChainInfoOwned, + is_consumer_chain: bool, +) -> ChainConfig { + ChainConfig::CosmosSdk(CosmosSdkConfig { + id: identifier::ChainId::from_string(chain), + + rpc_addr: rpc_url.parse().unwrap(), + grpc_addr: chain_data.grpc_urls[0].parse().unwrap(), + event_source: EventSourceMode::Pull { + interval: Duration::from_secs(4), + max_retries: 4, + }, + rpc_timeout: Duration::from_secs(10), + trusted_node: false, + account_prefix: chain_data.network_info.pub_address_prefix.to_string(), + key_name: KEY_NAME.to_string(), + key_store_type: Store::Memory, + key_store_folder: None, + store_prefix: "ibc".to_string(), + default_gas: Some(100000), + max_gas: Some(2000000), + genesis_restart: None, + gas_adjustment: None, + gas_multiplier: Some(GasMultiplier::new(1.3).unwrap()), + fee_granter: None, + max_msg_num: MaxMsgNum::new(30).unwrap(), + max_tx_size: MaxTxSize::new(180000).unwrap(), + max_grpc_decoding_size: 33554432u64.into(), + clock_drift: Duration::from_secs(5), + max_block_time: Duration::from_secs(30), + trusting_period: None, + ccv_consumer_chain: is_consumer_chain, + memo_prefix: Memo::new("").unwrap(), + sequential_batch_tx: false, + proof_specs: None, + trust_threshold: TrustThreshold::new(1, 3).unwrap(), + gas_price: GasPrice::new(chain_data.gas_price, chain_data.gas_denom.to_string()), + packet_filter: Default::default(), + address_type: AddressType::Cosmos, + extension_options: Default::default(), + query_packets_chunk_size: 10, + client_refresh_rate: RefreshRate::new(5, 1), + memo_overwrite: Default::default(), + dynamic_gas_price: Default::default(), + compat_mode: Default::default(), + clear_interval: Default::default(), + excluded_sequences: Default::default(), + }) +} diff --git a/packages/interchain/hermes-relayer/src/core.rs b/packages/interchain/hermes-relayer/src/core.rs new file mode 100644 index 000000000..388b90015 --- /dev/null +++ b/packages/interchain/hermes-relayer/src/core.rs @@ -0,0 +1,79 @@ +use std::collections::HashMap; + +use cw_orch_core::environment::ChainInfoOwned; +use cw_orch_core::environment::ChainState; +use cw_orch_daemon::Daemon; +use cw_orch_interchain_daemon::{IcDaemonResult, Mnemonic}; +use tokio::runtime::Handle; +#[derive(Clone)] +pub struct HermesRelayer { + /// Daemon objects representing all the chains available inside the starship environment + pub daemons: HashMap, + /// Runtime handle for awaiting async functions + pub rt_handle: Handle, + + pub connection_ids: HashMap<(String, String), String>, +} + +impl HermesRelayer { + /// Builds a new `InterchainEnv` instance. + /// For use with starship, we advise to use `Starship::interchain_env` instead + pub fn new( + runtime: &Handle, + chains: Vec<(T, Option, bool, String)>, + connections: HashMap<(String, String), String>, + ) -> IcDaemonResult + where + T: Into, + { + let mut env = Self::raw(runtime); + + // We create daemons for each chains + for (chain_data, mnemonic, is_consumer_chain, rpc) in chains { + let daemon = env.build_daemon(runtime, chain_data.into(), mnemonic)?; + env.daemons.insert( + daemon.state().chain_data.chain_id.to_string(), + (daemon, is_consumer_chain, rpc), + ); + } + env.connection_ids = connections; + + Ok(env) + } + + fn raw(rt: &Handle) -> Self { + Self { + daemons: HashMap::new(), + rt_handle: rt.clone(), + connection_ids: Default::default(), + } + } + + /// Build a daemon from chain data and mnemonic and add it to the current configuration + fn build_daemon( + &mut self, + runtime: &Handle, + chain_data: ChainInfoOwned, + mnemonic: Option, + ) -> IcDaemonResult { + let mut daemon_builder = Daemon::builder(); + let mut daemon_builder = daemon_builder.chain(chain_data.clone()).handle(runtime); + + daemon_builder = if let Some(mn) = mnemonic { + daemon_builder.mnemonic(mn) + } else { + daemon_builder + }; + + // State is shared between daemons, so if a daemon already exists, we use its state + daemon_builder = if let Some((daemon, _, _)) = self.daemons.values().next() { + daemon_builder.state(daemon.state()) + } else { + daemon_builder + }; + + let daemon = daemon_builder.build().unwrap(); + + Ok(daemon) + } +} diff --git a/packages/interchain/hermes-relayer/src/interchain_env.rs b/packages/interchain/hermes-relayer/src/interchain_env.rs new file mode 100644 index 000000000..3f3718144 --- /dev/null +++ b/packages/interchain/hermes-relayer/src/interchain_env.rs @@ -0,0 +1,344 @@ +use cosmwasm_std::IbcOrder; +use cw_orch_core::environment::{ChainInfoOwned, ChainState, IndexResponse}; +use cw_orch_daemon::queriers::{Ibc, Node}; +use cw_orch_daemon::{CosmTxResponse, Daemon, DaemonError}; +use cw_orch_interchain_core::channel::{IbcPort, InterchainChannel}; +use cw_orch_interchain_core::env::{ChainId, ChannelCreation}; +use cw_orch_interchain_core::InterchainEnv; + +use crate::relayer::HermesRelayer; +use cw_orch_interchain_daemon::packet_inspector::PacketInspector; +use cw_orch_interchain_daemon::InterchainDaemonError; +use old_ibc_relayer_types::core::ics04_channel::packet::Sequence; +use old_ibc_relayer_types::core::ics24_host::identifier::{ChannelId, PortId}; +use tokio::time::sleep; +use tonic::transport::Channel; + +use cw_orch_interchain_core::types::{ + ChannelCreationTransactionsResult, IbcTxAnalysis, InternalChannelCreationResult, NetworkId, + SimpleIbcPacketAnalysis, +}; +use cw_orch_interchain_daemon::ChannelCreator; +use futures::future::try_join4; +use std::collections::HashMap; +use std::str::FromStr; +use std::time::Duration; +use tokio::runtime::Handle; + +impl InterchainEnv for HermesRelayer { + type ChannelCreationResult = (); + + type Error = InterchainDaemonError; + + /// Get the daemon for a network-id in the interchain. + fn chain(&self, chain_id: impl ToString) -> Result { + self.daemons + .get(&chain_id.to_string()) + .map(|(d, _, _)| d) + .ok_or(InterchainDaemonError::DaemonNotFound(chain_id.to_string())) + .cloned() + } + + // In a daemon environmment, you don't create a channel between 2 chains, instead you just do it with external tools and returns here when the channel is ready + fn _internal_create_channel( + &self, + src_chain: ChainId, + dst_chain: ChainId, + src_port: &PortId, + dst_port: &PortId, + version: &str, + order: Option, + ) -> Result, Self::Error> { + let connection_id = + self.create_ibc_channel(src_chain, dst_chain, src_port, dst_port, version, order)?; + + Ok(InternalChannelCreationResult { + result: (), + src_connection_id: connection_id, + }) + } + + // This function creates a channel and returns the 4 transactions hashes for channel creation + fn get_channel_creation_txs( + &self, + src_chain: ChainId, + ibc_channel: &mut InterchainChannel, + _channel_creation_result: (), + ) -> Result, Self::Error> { + let (src_port, dst_port) = ibc_channel.get_mut_ordered_ports_from(src_chain)?; + + // We start by getting the connection-id of the counterparty chain + let connection_end = self.rt_handle.block_on( + Ibc::new_async(src_port.chain.clone()) + ._connection_end(src_port.connection_id.clone().unwrap()), + )?; + + dst_port.connection_id = Some(connection_end.unwrap().counterparty.unwrap().connection_id); + + // Then we make sure the channel is indeed created between the two chains + let channel_creation = self + .rt_handle + .block_on(self.find_channel_creation_tx(src_chain, ibc_channel))?; + + let src_channel_id = channel_creation + .ack + .event_attr_value("channel_open_ack", "channel_id")?; + let dst_channel_id = channel_creation + .confirm + .event_attr_value("channel_open_confirm", "channel_id")?; + + log::info!("Successfully created a channel between {} and {} on '{}:{}' and channels {}:'{}'(txhash : {}) and {}:'{}' (txhash : {})", + ibc_channel.port_a.port.clone(), + ibc_channel.port_b.port.clone(), + ibc_channel.port_a.connection_id.clone().unwrap(), + ibc_channel.port_b.connection_id.clone().unwrap(), + ibc_channel.port_a.chain_id.clone(), + src_channel_id, + channel_creation.ack.txhash, + ibc_channel.port_b.chain_id.clone(), + dst_channel_id, + channel_creation.confirm.txhash, + ); + + Ok(ChannelCreationTransactionsResult { + src_channel_id: ChannelId::from_str(&src_channel_id)?, + dst_channel_id: ChannelId::from_str(&dst_channel_id)?, + channel_creation_txs: channel_creation, + }) + } + + // This function follows every IBC packet sent out in a tx result + fn wait_ibc( + &self, + chain_id: ChainId, + tx_response: CosmTxResponse, + ) -> Result, Self::Error> { + log::info!( + target: chain_id, + "Investigating sent packet events on tx {}", + tx_response.txhash + ); + + // We crate an interchain env object that is safe to send between threads + let interchain_env = self.rt_handle.block_on(PacketInspector::new( + self.daemons.values().map(|(d, _, _)| d).collect(), + ))?; + + // We follow the trail + let ibc_trail = self + .rt_handle + .block_on(interchain_env.wait_ibc(chain_id.to_string(), tx_response))?; + + Ok(ibc_trail) + } + + // This function follow the execution of an IBC packet across the chain + fn follow_packet( + &self, + src_chain: ChainId, + src_port: PortId, + src_channel: ChannelId, + dst_chain: ChainId, + sequence: Sequence, + ) -> Result, Self::Error> { + // We crate an interchain env object that is safe to send between threads + let interchain_env = self.rt_handle.block_on(PacketInspector::new( + self.daemons.values().map(|(d, _, _)| d).collect(), + ))?; + + // We try to relay the packets using the HERMES relayer + + self.force_packet_relay()?; + + // We follow the trail + let ibc_trail = self.rt_handle.block_on(interchain_env.follow_packet( + src_chain, + src_port, + src_channel, + dst_chain, + sequence, + ))?; + + Ok(ibc_trail) + } +} + +impl HermesRelayer { + /// This function follows every IBC packet sent out in a tx result + /// This allows only providing the transaction hash when you don't have access to the whole response object + pub fn wait_ibc_from_txhash( + &self, + chain_id: ChainId, + packet_send_tx_hash: String, + ) -> Result, InterchainDaemonError> { + let grpc_channel1 = self.chain(chain_id)?.channel(); + + let tx = self.rt_handle.block_on( + Node::new_async(grpc_channel1.clone())._find_tx(packet_send_tx_hash.clone()), + )?; + + let ibc_trail = self.wait_ibc(chain_id, tx)?; + + Ok(ibc_trail) + } + + async fn find_channel_creation_tx<'a>( + &self, + src_chain: ChainId<'a>, + ibc_channel: &InterchainChannel, + ) -> Result, InterchainDaemonError> { + for _ in 0..5 { + match self.get_last_channel_creation(src_chain, ibc_channel).await { + Ok(tx) => { + if tx.init.is_some() + && tx.r#try.is_some() + && tx.ack.is_some() + && tx.confirm.is_some() + { + let creation = ChannelCreation { + init: tx.init.unwrap(), + r#try: tx.r#try.unwrap(), + ack: tx.ack.unwrap(), + confirm: tx.confirm.unwrap(), + }; + return Ok(creation); + } + log::debug!("No new TX by events found"); + log::debug!("Waiting 20s"); + sleep(Duration::from_secs(20)).await; + } + Err(e) => { + log::debug!("{:?}", e); + break; + } + } + } + + Err(InterchainDaemonError::ChannelCreationEventsNotFound { + src_chain: src_chain.to_string(), + channel: ibc_channel.clone(), + }) + } + + /// Queries the last transactions that is related to creating a channel from chain from to the counterparty chain defined in the structure + async fn get_last_channel_creation<'a>( + &self, + src_chain: ChainId<'a>, + ibc_channel: &InterchainChannel, + ) -> Result>, InterchainDaemonError> { + let (channel_init, channel_try, channel_ack, channel_confirm) = try_join4( + self.get_channel_creation_init(src_chain, ibc_channel), + self.get_channel_creation_try(src_chain, ibc_channel), + self.get_channel_creation_ack(src_chain, ibc_channel), + self.get_channel_creation_confirm(src_chain, ibc_channel), + ) + .await?; + + Ok(ChannelCreation::new( + channel_init, + channel_try, + channel_ack, + channel_confirm, + )) + } + + async fn get_channel_creation_init<'a>( + &self, + from: ChainId<'a>, + ibc_channel: &'a InterchainChannel, + ) -> Result, InterchainDaemonError> { + let (src_port, dst_port) = ibc_channel.get_ordered_ports_from(from)?; + + let channel_creation_events_init_events = vec![ + format!("channel_open_init.port_id='{}'", src_port.port), + format!("channel_open_init.counterparty_port_id='{}'", dst_port.port), + format!( + "channel_open_init.connection_id='{}'", + src_port.connection_id.clone().unwrap() + ), + ]; + + Ok(find_one_tx_by_events(src_port, channel_creation_events_init_events).await?) + } + + async fn get_channel_creation_try<'a>( + &self, + from: ChainId<'a>, + ibc_channel: &'a InterchainChannel, + ) -> Result, InterchainDaemonError> { + let (src_port, dst_port) = ibc_channel.get_ordered_ports_from(from)?; + + let channel_creation_try_events = vec![ + format!("channel_open_try.port_id='{}'", dst_port.port), + format!("channel_open_try.counterparty_port_id='{}'", src_port.port), + format!( + "channel_open_try.connection_id='{}'", + dst_port.connection_id.clone().unwrap() + ), + ]; + + log::debug!( + "Try {} {:?}", + dst_port.chain_id, + channel_creation_try_events + ); + + Ok(find_one_tx_by_events(dst_port, channel_creation_try_events).await?) + } + + async fn get_channel_creation_ack<'a>( + &self, + from: ChainId<'a>, + ibc_channel: &'a InterchainChannel, + ) -> Result, InterchainDaemonError> { + let (src_port, dst_port) = ibc_channel.get_ordered_ports_from(from)?; + + let channel_creation_ack_events = vec![ + format!("channel_open_ack.port_id='{}'", src_port.port), + format!("channel_open_ack.counterparty_port_id='{}'", dst_port.port), + format!( + "channel_open_ack.connection_id='{}'", + src_port.connection_id.clone().unwrap() + ), + ]; + + Ok(find_one_tx_by_events(src_port, channel_creation_ack_events).await?) + } + + async fn get_channel_creation_confirm<'a>( + &self, + from: ChainId<'a>, + ibc_channel: &'a InterchainChannel, + ) -> Result, InterchainDaemonError> { + let (src_port, dst_port) = ibc_channel.get_ordered_ports_from(from)?; + + let channel_creation_confirm_events = vec![ + format!("channel_open_confirm.port_id='{}'", dst_port.port), + format!( + "channel_open_confirm.counterparty_port_id='{}'", + src_port.port + ), + format!( + "channel_open_confirm.connection_id='{}'", + dst_port.connection_id.clone().unwrap() + ), + ]; + + Ok(find_one_tx_by_events(dst_port, channel_creation_confirm_events).await?) + } +} + +async fn find_one_tx_by_events( + port: IbcPort, + events: Vec, +) -> Result, DaemonError> { + let optional_tx = Node::new_async(port.chain.clone()) + ._find_tx_by_events( + events, + None, + Some(cosmrs::proto::cosmos::tx::v1beta1::OrderBy::Desc), + ) + .await?; + + Ok(optional_tx.first().cloned()) +} diff --git a/packages/interchain/hermes-relayer/src/keys.rs b/packages/interchain/hermes-relayer/src/keys.rs new file mode 100644 index 000000000..5e1e88cb4 --- /dev/null +++ b/packages/interchain/hermes-relayer/src/keys.rs @@ -0,0 +1,29 @@ +use cw_orch_core::environment::ChainInfoOwned; +use hdpath::{Purpose, StandardHDPath}; +use ibc_relayer::{ + config::AddressType, + keyring::{AnySigningKeyPair, Secp256k1KeyPair, SigningKeyPair}, +}; + +pub fn restore_key( + mnemonic: String, + hdpath_index: u32, + chain_data: &ChainInfoOwned, +) -> anyhow::Result { + let hdpath = StandardHDPath::new( + Purpose::Pubkey, + chain_data.network_info.coin_type, + 0, + 0, + hdpath_index, + ); + + let key_pair = Secp256k1KeyPair::from_mnemonic( + &mnemonic, + &hdpath, + &AddressType::Cosmos, + &chain_data.network_info.pub_address_prefix, + )?; + + Ok(key_pair.into()) +} diff --git a/packages/interchain/hermes-relayer/src/lib.rs b/packages/interchain/hermes-relayer/src/lib.rs new file mode 100644 index 000000000..c8aa022d7 --- /dev/null +++ b/packages/interchain/hermes-relayer/src/lib.rs @@ -0,0 +1,5 @@ +pub mod config; +pub mod interchain_env; +pub mod keys; +pub mod relayer; +pub mod core; \ No newline at end of file diff --git a/packages/interchain/hermes-relayer/src/packet.rs b/packages/interchain/hermes-relayer/src/packet.rs new file mode 100644 index 000000000..e69de29bb diff --git a/packages/interchain/hermes-relayer/src/relayer.rs b/packages/interchain/hermes-relayer/src/relayer.rs new file mode 100644 index 000000000..eb392e82a --- /dev/null +++ b/packages/interchain/hermes-relayer/src/relayer.rs @@ -0,0 +1,149 @@ +use std::collections::HashMap; + +use crate::config::{chain_config, KEY_NAME}; +use crate::keys::restore_key; +use cosmwasm_std::IbcOrder; +use cw_orch_core::environment::ChainInfoOwned; +use cw_orch_core::environment::ChainState; +use cw_orch_daemon::Daemon; +use cw_orch_interchain_core::env::ChainId; +use cw_orch_interchain_daemon::ChannelCreator; +use cw_orch_interchain_daemon::DaemonInterchainEnv; +use cw_orch_interchain_daemon::InterchainDaemonError; +use cw_orch_interchain_daemon::{IcDaemonResult, Mnemonic}; +use ibc_relayer::chain::requests::{IncludeProof, QueryClientStateRequest, QueryConnectionRequest}; +use ibc_relayer::chain::{handle::ChainHandle, requests::QueryHeight}; +use ibc_relayer::channel::Channel; +use ibc_relayer::config::{Config, RestConfig, TelemetryConfig}; +use ibc_relayer::connection::Connection; +use ibc_relayer::foreign_client::ForeignClient; +use ibc_relayer_cli::cli_utils::spawn_chain_runtime; +use ibc_relayer_types::core::ics03_connection::connection::IdentifiedConnectionEnd; +use ibc_relayer_types::core::ics04_channel::channel::Ordering; +use ibc_relayer_types::core::ics24_host::identifier::{self}; +use tokio::runtime::Handle; + +impl ChannelCreator for HermesRelayer { + fn create_ibc_channel( + &self, + src_chain: ChainId, + dst_chain: ChainId, + src_port: &old_ibc_relayer_types::core::ics24_host::identifier::PortId, + dst_port: &old_ibc_relayer_types::core::ics24_host::identifier::PortId, + version: &str, + order: Option, + ) -> Result { + // TODO connection should be a parameter + let src_connection = self + .connection_ids + .get(&(src_chain.to_string(), dst_chain.to_string())) + .unwrap(); + + let (src_daemon, src_is_consumer_chain, src_rpc_url) = self.daemons.get(src_chain).unwrap(); + let src_chain_data = &src_daemon.state().chain_data; + let src_hd_path = src_daemon.wallet().options().hd_index; + + let (dst_daemon, dst_is_consumer_chain, dst_rpc_url) = self.daemons.get(dst_chain).unwrap(); + let dst_chain_data = &dst_daemon.state().chain_data; + let dst_hd_path = dst_daemon.wallet().options().hd_index; + + let mnemonic = std::env::var("TEST_MNEMONIC").unwrap(); + + let config = Config { + global: ibc_relayer::config::GlobalConfig { + log_level: ibc_relayer::config::LogLevel::Debug, + }, + mode: ibc_relayer::config::ModeConfig::default(), + rest: RestConfig::default(), + telemetry: TelemetryConfig::default(), + chains: vec![ + chain_config( + src_chain, + src_rpc_url, + src_chain_data, + *src_is_consumer_chain, + ), + chain_config( + dst_chain, + dst_rpc_url, + dst_chain_data, + *dst_is_consumer_chain, + ), + ], + tracing_server: Default::default(), + }; + + // Validate & spawn runtime for side a. + let chain_a = + spawn_chain_runtime(&config, &identifier::ChainId::from_string(src_chain)).unwrap(); + + let src_key = + restore_key(mnemonic.clone(), src_hd_path.unwrap_or(0), src_chain_data).unwrap(); + chain_a.add_key(KEY_NAME.to_string(), src_key).unwrap(); + + // Query the connection end. + let (conn_end, _) = chain_a + .query_connection( + QueryConnectionRequest { + connection_id: src_connection.parse().unwrap(), + height: QueryHeight::Latest, + }, + IncludeProof::No, + ) + .unwrap(); + + // Query the client state, obtain the identifier of chain b. + let chain_b = chain_a + .query_client_state( + QueryClientStateRequest { + client_id: conn_end.client_id().clone(), + height: QueryHeight::Latest, + }, + IncludeProof::No, + ) + .map(|(cs, _)| cs.chain_id()) + .unwrap(); + + // Spawn the runtime for side b. + let chain_b = spawn_chain_runtime(&config, &chain_b).unwrap(); + let dst_key = restore_key(mnemonic, dst_hd_path.unwrap_or(0), dst_chain_data).unwrap(); + chain_b.add_key(KEY_NAME.to_string(), dst_key).unwrap(); + + // Create the foreign client handles. + let client_a = + ForeignClient::find(chain_b.clone(), chain_a.clone(), conn_end.client_id()).unwrap(); + + let client_b = + ForeignClient::find(chain_a, chain_b, conn_end.counterparty().client_id()).unwrap(); + + let identified_end = + IdentifiedConnectionEnd::new(src_connection.parse().unwrap(), conn_end); + + let connection = Connection::find(client_a, client_b, &identified_end).unwrap(); + + let channel = Channel::new( + connection, + cosmwasm_to_hermes_order(order), + src_port.to_string().parse().unwrap(), + dst_port.to_string().parse().unwrap(), + Some(version.to_string().into()), + ) + .unwrap(); + + Ok(src_connection.to_string()) + } + + fn interchain_env(&self) -> cw_orch_interchain_daemon::DaemonInterchainEnv { + panic!(); + } +} + +fn cosmwasm_to_hermes_order(order: Option) -> Ordering { + match order { + Some(order) => match order { + IbcOrder::Unordered => Ordering::Unordered, + IbcOrder::Ordered => Ordering::Ordered, + }, + None => Ordering::Unordered, + } +} diff --git a/packages/interchain/interchain-daemon/src/interchain_env.rs b/packages/interchain/interchain-daemon/src/interchain_env.rs index b0ec67507..ff2b6f649 100644 --- a/packages/interchain/interchain-daemon/src/interchain_env.rs +++ b/packages/interchain/interchain-daemon/src/interchain_env.rs @@ -41,7 +41,7 @@ pub struct DaemonInterchainEnv { rt_handle: Handle, } -type Mnemonic = String; +pub type Mnemonic = String; impl DaemonInterchainEnv { /// Builds a new `InterchainEnv` instance. diff --git a/packages/interchain/interchain-daemon/src/lib.rs b/packages/interchain/interchain-daemon/src/lib.rs index 7640acd1e..69b685bbb 100644 --- a/packages/interchain/interchain-daemon/src/lib.rs +++ b/packages/interchain/interchain-daemon/src/lib.rs @@ -19,3 +19,4 @@ pub type IcDaemonResult = Result; pub use channel_creator::{ChannelCreationValidator, ChannelCreator}; pub use interchain_env::DaemonInterchainEnv; +pub use interchain_env::Mnemonic; diff --git a/packages/interchain/interchain-daemon/src/packet_inspector.rs b/packages/interchain/interchain-daemon/src/packet_inspector.rs index 8689a3936..8ae9c6a7c 100644 --- a/packages/interchain/interchain-daemon/src/packet_inspector.rs +++ b/packages/interchain/interchain-daemon/src/packet_inspector.rs @@ -30,7 +30,7 @@ use std::collections::HashMap; /// This struct is safe to be sent between threads /// In contrary to InterchainStructure that holds Daemon in its definition which is not sendable #[derive(Default, Clone)] -pub(crate) struct PacketInspector { +pub struct PacketInspector { registered_chains: HashMap, } From c2a172e96a1b5a6a13301b0664fa04e53d3a5d2a Mon Sep 17 00:00:00 2001 From: Kayanski Date: Thu, 20 Jun 2024 17:19:57 +0000 Subject: [PATCH 2/5] cw-orch packet relaying mvp --- packages/interchain/hermes-relayer/Cargo.toml | 5 + .../hermes-relayer/examples/pion-xion.rs | 55 +- .../interchain/hermes-relayer/src/channel.rs | 117 ++++ .../interchain/hermes-relayer/src/core.rs | 71 ++ .../hermes-relayer/src/interchain_env.rs | 18 +- packages/interchain/hermes-relayer/src/lib.rs | 6 +- .../interchain/hermes-relayer/src/packet.rs | 136 ++++ .../hermes-relayer/src/packet_inspector.rs | 625 ++++++++++++++++++ .../interchain/hermes-relayer/src/relayer.rs | 149 ----- .../interchain-daemon/src/interchain_env.rs | 1 + 10 files changed, 1021 insertions(+), 162 deletions(-) create mode 100644 packages/interchain/hermes-relayer/src/packet_inspector.rs delete mode 100644 packages/interchain/hermes-relayer/src/relayer.rs diff --git a/packages/interchain/hermes-relayer/Cargo.toml b/packages/interchain/hermes-relayer/Cargo.toml index d3aa04f0b..9daba6bf8 100644 --- a/packages/interchain/hermes-relayer/Cargo.toml +++ b/packages/interchain/hermes-relayer/Cargo.toml @@ -24,6 +24,10 @@ log.workspace = true cosmrs.workspace = true futures = "0.3.30" tonic.workspace = true +cw-orch-traits = { version = "0.22.0", path = "../../cw-orch-traits" } +futures-util = "0.3.30" +async-recursion = "1.1.1" +serde_json.workspace = true [dev-dependencies] cw-orch = { path = "../../../cw-orch", features = ["daemon"] } @@ -31,4 +35,5 @@ cw-orch-interchain = { path = "../../../cw-orch-interchain", features = [ "daemon", ] } dotenv = "0.15.0" +ibc-proto = "0.32.0" pretty_env_logger = "0.5.0" diff --git a/packages/interchain/hermes-relayer/examples/pion-xion.rs b/packages/interchain/hermes-relayer/examples/pion-xion.rs index f63dfbf0a..446fa2d05 100644 --- a/packages/interchain/hermes-relayer/examples/pion-xion.rs +++ b/packages/interchain/hermes-relayer/examples/pion-xion.rs @@ -1,10 +1,18 @@ +use cosmrs::Any; +use cw_orch::prelude::*; use cw_orch::{ daemon::networks::{PION_1, XION_TESTNET_1}, tokio::runtime::Runtime, }; use cw_orch_interchain::prelude::*; -use hermes_relayer::relayer::HermesRelayer; +use cw_orch_traits::Stargate; +use hermes_relayer::core::HermesRelayer; use old_ibc_relayer_types::core::ics24_host::identifier::PortId; +use old_ibc_relayer_types::tx_msg::Msg; +use old_ibc_relayer_types::{ + applications::transfer::msgs::transfer::MsgTransfer, + core::ics04_channel::timeout::TimeoutHeight, timestamp::Timestamp, +}; pub fn main() -> cw_orch::anyhow::Result<()> { dotenv::dotenv()?; @@ -38,9 +46,7 @@ pub fn main() -> cw_orch::anyhow::Result<()> { .collect(), )?; - let interchain = relayer.interchain_env(); - - interchain.create_channel( + let channel = relayer.create_channel( "xion-testnet-1", "pion-1", &PortId::transfer(), @@ -49,5 +55,46 @@ pub fn main() -> cw_orch::anyhow::Result<()> { None, )?; + let xion = relayer.chain("xion-testnet-1")?; + let pion = relayer.chain("pion-1")?; + + let msg = MsgTransfer { + source_port: PortId::transfer(), + source_channel: channel + .interchain_channel + .get_chain("xion-testnet-1")? + .channel + .unwrap(), + token: ibc_proto::cosmos::base::v1beta1::Coin { + denom: "uxion".to_string(), + amount: "1987".to_string(), + }, + + sender: xion.sender().to_string().parse().unwrap(), + receiver: pion.sender().to_string().parse().unwrap(), + timeout_height: TimeoutHeight::Never, + timeout_timestamp: Timestamp::from_nanoseconds(1_800_000_000_000_000_000)?, + memo: None, + }; + let response = xion.commit_any::( + vec![Any { + type_url: msg.type_url(), + value: msg.to_any().value, + }], + None, + )?; + + relayer.follow_packet( + "xion-testnet-1", + PortId::transfer(), + channel + .interchain_channel + .get_chain("xion-testnet-1")? + .channel + .unwrap(), + "pion-1", + 1.into(), + )?; + Ok(()) } diff --git a/packages/interchain/hermes-relayer/src/channel.rs b/packages/interchain/hermes-relayer/src/channel.rs index e69de29bb..daf1eb7de 100644 --- a/packages/interchain/hermes-relayer/src/channel.rs +++ b/packages/interchain/hermes-relayer/src/channel.rs @@ -0,0 +1,117 @@ +use crate::config::KEY_NAME; +use crate::core::HermesRelayer; +use crate::keys::restore_key; +use cosmwasm_std::IbcOrder; +use cw_orch_core::environment::ChainState; +use cw_orch_interchain_core::env::ChainId; +use cw_orch_interchain_core::InterchainEnv; +use cw_orch_interchain_daemon::ChannelCreator; +use cw_orch_interchain_daemon::InterchainDaemonError; +use ibc_relayer::chain::requests::{IncludeProof, QueryClientStateRequest, QueryConnectionRequest}; +use ibc_relayer::chain::{handle::ChainHandle, requests::QueryHeight}; +use ibc_relayer::channel::Channel; +use ibc_relayer::connection::Connection; +use ibc_relayer::foreign_client::ForeignClient; +use ibc_relayer_cli::cli_utils::spawn_chain_runtime; +use ibc_relayer_types::core::ics03_connection::connection::IdentifiedConnectionEnd; +use ibc_relayer_types::core::ics04_channel::channel::Ordering; +use ibc_relayer_types::core::ics24_host::identifier::{self}; + +impl ChannelCreator for HermesRelayer { + fn create_ibc_channel( + &self, + src_chain: ChainId, + dst_chain: ChainId, + src_port: &old_ibc_relayer_types::core::ics24_host::identifier::PortId, + dst_port: &old_ibc_relayer_types::core::ics24_host::identifier::PortId, + version: &str, + order: Option, + ) -> Result { + // TODO connection should be a parameter + let src_connection = self + .connection_ids + .get(&(src_chain.to_string(), dst_chain.to_string())) + .unwrap(); + + let (src_daemon, _, _) = self.daemons.get(src_chain).unwrap(); + let src_chain_data = &src_daemon.state().chain_data; + let src_hd_path = src_daemon.wallet().options().hd_index; + + let (dst_daemon, _, _) = self.daemons.get(dst_chain).unwrap(); + let dst_chain_data = &dst_daemon.state().chain_data; + let dst_hd_path = dst_daemon.wallet().options().hd_index; + + let mnemonic = std::env::var("TEST_MNEMONIC").unwrap(); + + let config = self.duplex_config(src_chain, dst_chain); + + // Validate & spawn runtime for side a. + let chain_a = + spawn_chain_runtime(&config, &identifier::ChainId::from_string(src_chain)).unwrap(); + + self.add_key(&chain_a); + // Query the connection end. + let (conn_end, _) = chain_a + .query_connection( + QueryConnectionRequest { + connection_id: src_connection.parse().unwrap(), + height: QueryHeight::Latest, + }, + IncludeProof::No, + ) + .unwrap(); + + // Query the client state, obtain the identifier of chain b. + let chain_b = chain_a + .query_client_state( + QueryClientStateRequest { + client_id: conn_end.client_id().clone(), + height: QueryHeight::Latest, + }, + IncludeProof::No, + ) + .map(|(cs, _)| cs.chain_id()) + .unwrap(); + + // Spawn the runtime for side b. + let chain_b = spawn_chain_runtime(&config, &chain_b).unwrap(); + self.add_key(&chain_b); + + // Create the foreign client handles. + let client_a = + ForeignClient::find(chain_b.clone(), chain_a.clone(), conn_end.client_id()).unwrap(); + + let client_b = + ForeignClient::find(chain_a, chain_b, conn_end.counterparty().client_id()).unwrap(); + + let identified_end = + IdentifiedConnectionEnd::new(src_connection.parse().unwrap(), conn_end); + + let connection = Connection::find(client_a, client_b, &identified_end).unwrap(); + + let channel = Channel::new( + connection, + cosmwasm_to_hermes_order(order), + src_port.to_string().parse().unwrap(), + dst_port.to_string().parse().unwrap(), + Some(version.to_string().into()), + ) + .unwrap(); + + Ok(src_connection.to_string()) + } + + fn interchain_env(&self) -> cw_orch_interchain_daemon::DaemonInterchainEnv { + unimplemented!() + } +} + +fn cosmwasm_to_hermes_order(order: Option) -> Ordering { + match order { + Some(order) => match order { + IbcOrder::Unordered => Ordering::Unordered, + IbcOrder::Ordered => Ordering::Ordered, + }, + None => Ordering::Unordered, + } +} diff --git a/packages/interchain/hermes-relayer/src/core.rs b/packages/interchain/hermes-relayer/src/core.rs index 388b90015..bc46cc177 100644 --- a/packages/interchain/hermes-relayer/src/core.rs +++ b/packages/interchain/hermes-relayer/src/core.rs @@ -3,8 +3,18 @@ use std::collections::HashMap; use cw_orch_core::environment::ChainInfoOwned; use cw_orch_core::environment::ChainState; use cw_orch_daemon::Daemon; +use cw_orch_interchain_core::env::ChainId; +use cw_orch_interchain_core::InterchainEnv; use cw_orch_interchain_daemon::{IcDaemonResult, Mnemonic}; +use ibc_relayer::chain::handle::ChainHandle; use tokio::runtime::Handle; + +use ibc_relayer::config::{Config, RestConfig, TelemetryConfig}; + +use crate::config::chain_config; +use crate::config::KEY_NAME; +use crate::keys::restore_key; + #[derive(Clone)] pub struct HermesRelayer { /// Daemon objects representing all the chains available inside the starship environment @@ -41,6 +51,20 @@ impl HermesRelayer { Ok(env) } + /// This creates an interchain environment from existing daemon instances + /// The `channel_creator` argument will be responsible for creation interchain channel + /// If using starship, prefer using Starship::interchain_env for environment creation + pub fn from_daemons(rt: &Handle, daemons: Vec<(Daemon, bool, String)>) -> Self { + let mut env = Self::raw(rt); + for (daemon, is_consumer_chain, rpc) in daemons { + env.daemons.insert( + daemon.state().chain_data.chain_id.to_string(), + (daemon, is_consumer_chain, rpc), + ); + } + env + } + fn raw(rt: &Handle) -> Self { Self { daemons: HashMap::new(), @@ -76,4 +100,51 @@ impl HermesRelayer { Ok(daemon) } + + pub fn duplex_config(&self, src_chain: ChainId, dst_chain: ChainId) -> Config { + let (src_daemon, src_is_consumer_chain, src_rpc_url) = self.daemons.get(src_chain).unwrap(); + let src_chain_data = &src_daemon.state().chain_data; + + let (dst_daemon, dst_is_consumer_chain, dst_rpc_url) = self.daemons.get(dst_chain).unwrap(); + let dst_chain_data = &dst_daemon.state().chain_data; + + Config { + global: ibc_relayer::config::GlobalConfig { + log_level: ibc_relayer::config::LogLevel::Info, + }, + mode: ibc_relayer::config::ModeConfig::default(), + rest: RestConfig::default(), + telemetry: TelemetryConfig::default(), + chains: vec![ + chain_config( + src_chain, + src_rpc_url, + src_chain_data, + *src_is_consumer_chain, + ), + chain_config( + dst_chain, + dst_rpc_url, + dst_chain_data, + *dst_is_consumer_chain, + ), + ], + tracing_server: Default::default(), + } + } + + pub fn add_key(&self, chain: &impl ChainHandle) { + let chain_id = chain.config().unwrap().id().to_string(); + + let (daemon, _, rpc) = self.daemons.get(&chain_id).unwrap(); + + let chain_data = &daemon.state().chain_data; + let hd_path = daemon.wallet().options().hd_index; + let key = restore_key(self.mnemonic().clone(), hd_path.unwrap_or(0), chain_data).unwrap(); + chain.add_key(KEY_NAME.to_string(), key).unwrap(); + } + + fn mnemonic(&self) -> String { + std::env::var("TEST_MNEMONIC").unwrap() + } } diff --git a/packages/interchain/hermes-relayer/src/interchain_env.rs b/packages/interchain/hermes-relayer/src/interchain_env.rs index 3f3718144..cc216e6d2 100644 --- a/packages/interchain/hermes-relayer/src/interchain_env.rs +++ b/packages/interchain/hermes-relayer/src/interchain_env.rs @@ -1,13 +1,13 @@ use cosmwasm_std::IbcOrder; -use cw_orch_core::environment::{ChainInfoOwned, ChainState, IndexResponse}; +use cw_orch_core::environment::IndexResponse; use cw_orch_daemon::queriers::{Ibc, Node}; use cw_orch_daemon::{CosmTxResponse, Daemon, DaemonError}; use cw_orch_interchain_core::channel::{IbcPort, InterchainChannel}; use cw_orch_interchain_core::env::{ChainId, ChannelCreation}; use cw_orch_interchain_core::InterchainEnv; -use crate::relayer::HermesRelayer; -use cw_orch_interchain_daemon::packet_inspector::PacketInspector; +use crate::core::HermesRelayer; +use crate::packet_inspector::PacketInspector; use cw_orch_interchain_daemon::InterchainDaemonError; use old_ibc_relayer_types::core::ics04_channel::packet::Sequence; use old_ibc_relayer_types::core::ics24_host::identifier::{ChannelId, PortId}; @@ -15,15 +15,13 @@ use tokio::time::sleep; use tonic::transport::Channel; use cw_orch_interchain_core::types::{ - ChannelCreationTransactionsResult, IbcTxAnalysis, InternalChannelCreationResult, NetworkId, + ChannelCreationTransactionsResult, IbcTxAnalysis, InternalChannelCreationResult, SimpleIbcPacketAnalysis, }; use cw_orch_interchain_daemon::ChannelCreator; use futures::future::try_join4; -use std::collections::HashMap; use std::str::FromStr; use std::time::Duration; -use tokio::runtime::Handle; impl InterchainEnv for HermesRelayer { type ChannelCreationResult = (); @@ -148,7 +146,13 @@ impl InterchainEnv for HermesRelayer { // We try to relay the packets using the HERMES relayer - self.force_packet_relay()?; + self.force_packet_relay( + src_chain, + src_port.clone(), + src_channel.clone(), + dst_chain, + sequence, + ); // We follow the trail let ibc_trail = self.rt_handle.block_on(interchain_env.follow_packet( diff --git a/packages/interchain/hermes-relayer/src/lib.rs b/packages/interchain/hermes-relayer/src/lib.rs index c8aa022d7..fcbc28a3c 100644 --- a/packages/interchain/hermes-relayer/src/lib.rs +++ b/packages/interchain/hermes-relayer/src/lib.rs @@ -1,5 +1,7 @@ +pub mod channel; pub mod config; +pub mod core; pub mod interchain_env; pub mod keys; -pub mod relayer; -pub mod core; \ No newline at end of file +pub mod packet; +pub mod packet_inspector; diff --git a/packages/interchain/hermes-relayer/src/packet.rs b/packages/interchain/hermes-relayer/src/packet.rs index e69de29bb..ebc4ff354 100644 --- a/packages/interchain/hermes-relayer/src/packet.rs +++ b/packages/interchain/hermes-relayer/src/packet.rs @@ -0,0 +1,136 @@ +use crate::{config::KEY_NAME, core::HermesRelayer, keys::restore_key}; +use cw_orch_core::environment::QuerierGetter; +use cw_orch_daemon::queriers::Ibc; +use cw_orch_interchain_core::env::ChainId; +use ibc_relayer::{ + chain::handle::ChainHandle, + link::{Link, LinkParameters}, +}; +use ibc_relayer_cli::cli_utils::ChainHandlePair; +use ibc_relayer_types::core::{ics04_channel, ics24_host::identifier}; +use old_ibc_relayer_types::core::{ + ics04_channel::packet::Sequence, + ics24_host::identifier::{ChannelId, PortId}, +}; + +impl HermesRelayer { + pub fn force_packet_relay( + &self, + src_chain: ChainId, + src_port: PortId, + src_channel: ChannelId, + dst_chain: ChainId, + sequence: Sequence, + ) { + self.receive_packet( + src_chain, + src_port.clone(), + src_channel.clone(), + dst_chain, + sequence, + ); + self.ack_packet( + src_chain, + src_port.clone(), + src_channel.clone(), + dst_chain, + sequence, + ); + } + + pub fn receive_packet( + &self, + src_chain: ChainId, + src_port: PortId, + src_channel: ChannelId, + dst_chain: ChainId, + sequence: Sequence, + ) { + let config = self.duplex_config(src_chain, dst_chain); + let chains = ChainHandlePair::spawn( + &config, + &identifier::ChainId::from_string(src_chain), + &identifier::ChainId::from_string(dst_chain), + ) + .unwrap(); + + let opts = LinkParameters { + src_port_id: src_port.to_string().parse().unwrap(), + src_channel_id: src_channel.to_string().parse().unwrap(), + max_memo_size: config.mode.packets.ics20_max_memo_size, + max_receiver_size: config.mode.packets.ics20_max_receiver_size, + + // Packets are only excluded when clearing + exclude_src_sequences: vec![], + }; + + self.add_key(&chains.src); + self.add_key(&chains.dst); + + let link = Link::new_from_opts(chains.src, chains.dst, opts, false, false).unwrap(); + + let sequence: u64 = sequence.into(); + let sequence = ics04_channel::packet::Sequence::from(sequence); + + let res = link + .relay_recv_packet_and_timeout_messages_with_packet_data_query_height( + vec![sequence..=sequence], + None, + ) + .unwrap(); + } + + pub fn ack_packet( + &self, + src_chain: ChainId, + src_port: PortId, + src_channel: ChannelId, + dst_chain: ChainId, + sequence: Sequence, + ) { + let config = self.duplex_config(src_chain, dst_chain); + + let chains = ChainHandlePair::spawn( + &config, + &identifier::ChainId::from_string(dst_chain), + &identifier::ChainId::from_string(src_chain), + ) + .unwrap(); + + let (d, _, _) = self.daemons.get(src_chain).unwrap(); + + let ibc: Ibc = d.querier(); + + let counterparty = d + .rt_handle + .block_on(ibc._channel(src_port.to_string(), src_channel.to_string())) + .unwrap() + .counterparty + .unwrap(); + + let opts = LinkParameters { + src_port_id: counterparty.port_id.to_string().parse().unwrap(), + src_channel_id: counterparty.channel_id.to_string().parse().unwrap(), + max_memo_size: config.mode.packets.ics20_max_memo_size, + max_receiver_size: config.mode.packets.ics20_max_receiver_size, + + // Packets are only excluded when clearing + exclude_src_sequences: vec![], + }; + + self.add_key(&chains.src); + self.add_key(&chains.dst); + + let link = Link::new_from_opts(chains.src, chains.dst, opts, false, false).unwrap(); + + let sequence: u64 = sequence.into(); + let sequence = ics04_channel::packet::Sequence::from(sequence); + + let res = link + .relay_ack_packet_messages_with_packet_data_query_height( + vec![sequence..=sequence], + None, + ) + .unwrap(); + } +} diff --git a/packages/interchain/hermes-relayer/src/packet_inspector.rs b/packages/interchain/hermes-relayer/src/packet_inspector.rs new file mode 100644 index 000000000..8b5142a4b --- /dev/null +++ b/packages/interchain/hermes-relayer/src/packet_inspector.rs @@ -0,0 +1,625 @@ +//! Module for tracking a specific packet inside the interchain + +use cw_orch_core::environment::{ChainInfoOwned, ChainState}; +use cw_orch_daemon::networks::parse_network; +use cw_orch_daemon::queriers::{Ibc, Node}; +use cw_orch_daemon::GrpcChannel; +use cw_orch_daemon::TxResultBlockEvent; +use cw_orch_daemon::{CosmTxResponse, Daemon, DaemonError}; +use cw_orch_interchain_core::channel::{IbcPort, InterchainChannel}; +use cw_orch_interchain_core::env::ChainId; +use cw_orch_interchain_daemon::IcDaemonResult; +use futures_util::future::select_all; +use futures_util::FutureExt; +use ibc_relayer_types::core::ics04_channel::channel::State; + +use cw_orch_interchain_core::types::{ + FullIbcPacketAnalysis, IbcPacketAnalysis, IbcPacketInfo, IbcPacketOutcome, IbcTxAnalysis, + NetworkId, SimpleIbcPacketAnalysis, TxId, +}; +use cw_orch_interchain_daemon::InterchainDaemonError; + +use futures::future::try_join_all; +use old_ibc_relayer_types::core::ics04_channel::packet::Sequence; +use old_ibc_relayer_types::core::ics24_host::identifier::{ChannelId, PortId}; +use tonic::transport::Channel; + +use std::collections::HashMap; + +use crate::core::HermesRelayer; + +/// Environment used to track IBC execution and updates on multiple chains. +/// This can be used to track specific IBC packets or get general information update on channels between multiple chains +/// This struct is safe to be sent between threads +/// In contrary to InterchainStructure that holds Daemon in its definition which is not sendable +#[derive(Default, Clone)] +pub struct PacketInspector { + registered_chains: HashMap, +} + +// / TODO, change this doc comment that is not up to date anymore +// / Follow all IBC packets included in a transaction (recursively). +// / ## Example +// / ```no_run +// / use cw_orch::prelude::PacketInspector; +// / # tokio_test::block_on(async { +// / PacketInspector::default() +// / .await_ibc_execution( +// / "juno-1".to_string(), +// / "2E68E86FEFED8459144D19968B36C6FB8928018D720CC29689B4793A7DE50BD5".to_string() +// / ).await.unwrap(); +// / # }) +// / ``` +impl PacketInspector { + /// Adds a custom chain to the environment + /// While following IBC packet execution, this struct will need to get specific chain information from the `chain_id` only + /// More precisely, it will need to get a gRPC channel from a `chain_id`. + /// This struct will use the `crate::prelude::networks::parse_network` function by default to do so. + /// To override this behavior for specific chains (for example for local testing), you can specify a channel for a specific chain_id + pub async fn new(custom_chains: Vec<&Daemon>) -> IcDaemonResult { + let mut env = PacketInspector::default(); + + for chain in custom_chains { + env.registered_chains.insert( + chain.state().chain_data.chain_id.to_string(), + chain.channel(), + ); + } + Ok(env) + } + + /// Following the IBC documentation of packets here : https://github.com/CosmWasm/cosmwasm/blob/main/IBC.md + /// This function retrieves all ibc packets sent out during a transaction and follows them until they are acknoledged back on the sending chain + /// + /// 1. Send Packet. The provided transaction hash is used to retrieve all transaction logs from the sending chain. + /// In the logs, we can find all details that allow us to identify the transaction in which the packet is received in the remote chain + /// These include : + /// - The connection_id + /// - The destination port + /// - The destination channel + /// - The packet sequence (to identify a specific packet in the channel) + /// + /// ## Remarks + /// - The packet data is also retrieved for logging + /// - Multiple packets can be sent out during the same transaction. + /// In order to identify them, we assume the order of the events is the same for all events of a single packet. + /// Ex: packet_connection = ["connection_id_of_packet_1", "connection_id_of_packet_2"] + /// Ex: packet_dst_port = ["packet_dst_port_of_packet_1", "packet_dst_port_of_packet_2"] + /// - The chain id of the destination chain is not available directly in the logs. + /// However, it is possible to query the node for the chain id of the counterparty chain linked by a connection + /// + /// 2. Follow all IBC packets until they are acknowledged on the origin chain + /// + /// 3. Scan all encountered transactions along the way for additional IBC packets + #[async_recursion::async_recursion(?Send)] + pub async fn wait_ibc( + &self, + src_chain: NetworkId, + tx: CosmTxResponse, + ) -> IcDaemonResult> { + // 1. Getting IBC related events for the current tx + finding all IBC packets sent out in the transaction + let grpc_channel1 = self.get_grpc_channel(&src_chain).await?; + + let sent_packets = + find_ibc_packets_sent_in_tx(src_chain.clone(), grpc_channel1.clone(), tx.clone()) + .await?; + + // 2. We follow the packet history for each packet found inside the transaction + let ibc_packet_results = try_join_all( + sent_packets + .iter() + .map(|packet| { + self.clone().follow_packet( + &src_chain, + packet.src_port.clone(), + packet.src_channel.clone(), + &packet.dst_chain_id, + packet.sequence, + ) + }) + .collect::>(), + ) + .await? + .into_iter() + .collect::>(); + + let send_tx_id = TxId { + chain_id: src_chain.clone(), + response: tx, + }; + + // We follow all results from outgoing packets in the resulting transactions + let full_results = try_join_all(ibc_packet_results.into_iter().map(|ibc_result| async { + let txs_to_analyze = match ibc_result.outcome.clone() { + IbcPacketOutcome::Timeout { timeout_tx } => vec![timeout_tx], + IbcPacketOutcome::Success { + receive_tx, ack_tx, .. + } => vec![receive_tx, ack_tx], + }; + + let txs_results = try_join_all( + txs_to_analyze + .iter() + .map(|tx| { + let chain_id = tx.chain_id.clone(); + let response = tx.response.clone(); + self.wait_ibc(chain_id.clone(), response) + }) + .collect::>(), + ) + .await?; + + let analyzed_outcome = match ibc_result.outcome { + IbcPacketOutcome::Timeout { .. } => IbcPacketOutcome::Timeout { + timeout_tx: txs_results[0].clone(), + }, + IbcPacketOutcome::Success { ack, .. } => IbcPacketOutcome::Success { + ack: ack.clone(), + receive_tx: txs_results[0].clone(), + ack_tx: txs_results[1].clone(), + }, + }; + + let analyzed_result = FullIbcPacketAnalysis { + send_tx: Some(send_tx_id.clone()), + outcome: analyzed_outcome, + }; + + Ok::<_, InterchainDaemonError>(analyzed_result.clone()) + })) + .await?; + + let tx_identification = IbcTxAnalysis { + tx_id: send_tx_id.clone(), + packets: full_results, + }; + + Ok(tx_identification) + } + + /// Gets the grpc channel associed with a specific `chain_id` + /// If it's not registered in this struct (using the `add_custom_chain` member), it will query the grpc from the chain regisry (`networks::parse_network` function) + async fn get_grpc_channel<'a>(&self, chain_id: ChainId<'a>) -> IcDaemonResult { + let grpc_channel = self.registered_chains.get(chain_id); + + if let Some(dst_grpc_channel) = grpc_channel { + Ok(dst_grpc_channel.clone()) + } else { + // If no custom channel was registered, we try to get it from the registry + let chain_data: ChainInfoOwned = parse_network(chain_id).unwrap().into(); // TODO, no unwrap here ? + Ok(GrpcChannel::connect(&chain_data.grpc_urls, chain_id).await?) + } + } + + /// This is a wrapper to follow a packet directly in a single future + /// Prefer the use of `await_ibc_execution` for following IBC packets related to a transaction + pub async fn follow_packet<'a>( + self, + src_chain: ChainId<'a>, + src_port: PortId, + src_channel: ChannelId, + dst_chain: ChainId<'a>, + sequence: Sequence, + ) -> IcDaemonResult> { + let src_grpc_channel = self.get_grpc_channel(src_chain).await?; + let dst_grpc_channel = self.get_grpc_channel(dst_chain).await?; + + // Then we check that the channel indeed exists + let registered_channel = Ibc::new_async(src_grpc_channel.clone()) + ._channel(src_port.to_string(), src_channel.to_string()) + .await?; + + // We log to warn when the channel state is not right + if registered_channel.state != State::Open as i32 { + log::warn!("Channel is not in an open state, the packet will most likely not be relayed. Channel information {:?}", registered_channel); + } + + let counterparty = registered_channel.counterparty.unwrap(); + + // Here the connection id is not used, because a port_id and a channel_id are sufficient to track a channel + let ibc_channel = InterchainChannel::new( + IbcPort { + connection_id: None, + chain: src_grpc_channel, + chain_id: src_chain.to_string(), + port: src_port, + channel: Some(src_channel), + }, + IbcPort { + connection_id: None, + chain: dst_grpc_channel, + chain_id: dst_chain.to_string(), + port: counterparty.port_id.parse().unwrap(), + channel: Some(counterparty.channel_id.parse().unwrap()), + }, + ); + + // We try to transfer the packets + + // There are 2 possible outcomes for an IBC packet transfer + // 1. The transfer succeeds, this is covered by the `InterchainChannel::follow_packet_cycle` method + // 2. The transfer errors and the packet times out. This is covered by the `InterchainChannel::follow_packet_timeout` method + // If either of those functions succeeds, the other one will never succeeds. That's why we are racing those 2 functions here. + + let (result, _, _) = select_all(vec![ + self.follow_packet_cycle(src_chain, &ibc_channel, sequence) + .boxed(), + self.follow_packet_timeout(src_chain, &ibc_channel, sequence) + .boxed(), + ]) + .await; + + result + } + + /// This functions follows an IBC packet on the remote chain and back on its origin chain. It returns all encountered tx hashes + /// 1. Receive packet. We use the identification of the packet to find the tx in which the packet was received + /// We make sure that only one transaction tracks receiving this packet. + /// If not, we sent out an error (this error actually comes from the code not identifying an IBC packet properly) + /// If such an error happens, it means this function is not implemented properly + /// We verify this transaction is not errored (it should never error) + /// + /// 2. Acknowledgment. The last part of the packet lifetime is the acknowledgement the remote chain sents back. + /// In the same transaction as the one in which the packet is received, an packet acknowledgement should be sent back to the origin chain + /// We get this acknowledgment and deserialize it according to https://github.com/cosmos/cosmos-sdk/blob/v0.42.4/proto/ibc/core/channel/v1/channel.proto#L134-L147 + /// If the acknowledgement doesn't follow the standard, we don't mind and continue + /// 3. Identify the acknowledgement receive packet on the origin chain + /// Finally, we get the transaction hash of the transaction in which the acknowledgement is received on the origin chain. + /// This is also logged for debugging purposes + /// + /// We return the tx hash of the received packet on the remote chain as well as the ack packet transaction on the origin chain + pub async fn follow_packet_cycle<'a>( + &self, + from: ChainId<'a>, + ibc_channel: &'a InterchainChannel, + sequence: Sequence, + ) -> Result, InterchainDaemonError> { + let (src_port, dst_port) = ibc_channel.get_ordered_ports_from(from)?; + + // 0. Query the send tx hash for analysis + let send_tx = self.get_packet_send_tx(from, ibc_channel, sequence).await?; + + // 1. Query the tx hash on the remote chains related to the packet the origin chain sent + let received_tx = self + .get_packet_receive_tx(from, ibc_channel, sequence) + .await?; + // We check if the tx errors (this shouldn't happen in IBC connections) + if received_tx.code != 0 { + return Err(DaemonError::TxFailed { + code: received_tx.code, + reason: format!( + "Raw log on {} : {}", + dst_port.chain_id, + received_tx.raw_log.clone() + ), + } + .into()); + } + + // 2. We get the events related to the acknowledgements sent back on the remote chain + + let all_recv_events = received_tx.get_events("write_acknowledgement"); + let recv_event = all_recv_events + .iter() + .filter(|e| { + e.get_first_attribute_value("packet_sequence").unwrap() == sequence.to_string() + }) + .collect::>()[0]; + + let recv_packet_sequence = recv_event + .get_first_attribute_value("packet_sequence") + .unwrap(); + let recv_packet_data = recv_event.get_first_attribute_value("packet_data").unwrap(); + let acknowledgment = recv_event.get_first_attribute_value("packet_ack").unwrap(); + + // We try to unpack the acknowledgement if possible, when it's following the standard format (is not enforced so it's not always possible) + let decoded_ack_string = + serde_json::from_str(&acknowledgment).unwrap_or(format!("{:x?}", acknowledgment)); + + log::info!( + target: &dst_port.chain_id, + "IBC packet n°{} : {}, received on {} on tx {}, with acknowledgment sent back: {}", + recv_packet_sequence, + recv_packet_data, + dst_port.chain_id, + received_tx.txhash, + decoded_ack_string + ); + + // 3. We look for the acknowledgement packet on the origin chain + let ack_tx = self + .get_packet_ack_receive_tx(&src_port.chain_id, ibc_channel, sequence) + .await?; + // First we check if the tx errors (this shouldn't happen in IBC connections) + if ack_tx.code != 0 { + return Err(DaemonError::TxFailed { + code: ack_tx.code, + reason: format!( + "Raw log on {} : {}", + src_port.chain_id.clone(), + ack_tx.raw_log + ), + } + .into()); + } + log::info!( + target: &src_port.chain_id, + "IBC packet n°{} acknowledgment received on {} on tx {}", + sequence, + src_port.chain_id.clone(), + ack_tx.txhash + ); + + Ok(IbcPacketAnalysis { + send_tx: Some(TxId { + response: send_tx, + chain_id: src_port.chain_id.clone(), + }), + outcome: IbcPacketOutcome::Success { + receive_tx: TxId { + chain_id: dst_port.chain_id.clone(), + response: received_tx, + }, + ack_tx: TxId { + chain_id: src_port.chain_id.clone(), + response: ack_tx, + }, + ack: acknowledgment.as_bytes().into(), + }, + }) + } + + /// This functions looks for timeouts of an IBC packet on its origin chain. It returns the tx hash of the timeout tx. + pub async fn follow_packet_timeout<'a>( + &self, + from: ChainId<'a>, + ibc_channel: &'a InterchainChannel, + sequence: Sequence, + ) -> Result, InterchainDaemonError> { + // 0. Query the send tx hash for analysis + let send_tx = self.get_packet_send_tx(from, ibc_channel, sequence).await?; + + let (src_port, _dst_port) = ibc_channel.get_ordered_ports_from(from)?; + + // We query the tx hash of the timeout packet on the source chain + let timeout_tx = self + .get_packet_timeout_tx(from, ibc_channel, sequence) + .await?; + // We check if the tx errors (this shouldn't happen in IBC connections) + if timeout_tx.code != 0 { + return Err(DaemonError::TxFailed { + code: timeout_tx.code, + reason: format!( + "Raw log on {} : {}", + src_port.chain_id, + timeout_tx.raw_log.clone() + ), + } + .into()); + } + + log::error!( + target: &src_port.chain_id, + "IBC packet n° {} : + port : {}, + channel: {} received a timeout and was not broadcasted successfully on tx {}", + sequence, + src_port.port, + src_port.channel.unwrap(), + timeout_tx.txhash + ); + + // We return the tx hash of this transaction for future analysis + Ok(IbcPacketAnalysis { + send_tx: Some(TxId { + chain_id: src_port.chain_id.clone(), + response: send_tx, + }), + outcome: IbcPacketOutcome::Timeout { + timeout_tx: TxId { + chain_id: src_port.chain_id.clone(), + response: timeout_tx, + }, + }, + }) + } + + async fn get_tx_by_events_and_assert_one( + channel: Channel, + events: Vec, + ) -> Result { + let txs = Node::new_async(channel.clone()) + ._find_some_tx_by_events(events, None, None) + .await?; + if txs.len() != 1 { + return Err(DaemonError::ibc_err("Found multiple transactions matching a send packet event, this is impossible (or cw-orch impl is at fault)").into()); + } + Ok(txs[0].clone()) + } + + // From is the channel from which the send packet has been sent + pub async fn get_packet_send_tx<'a>( + &self, + from: ChainId<'a>, + ibc_channel: &'a InterchainChannel, + packet_sequence: Sequence, + ) -> Result { + let (src_port, dst_port) = ibc_channel.get_ordered_ports_from(from)?; + + let send_events_string = vec![ + format!("send_packet.packet_dst_port='{}'", dst_port.port), + format!( + "send_packet.packet_dst_channel='{}'", + dst_port + .channel + .clone() + .ok_or(DaemonError::ibc_err(format!( + "No channel registered between {:?} and {:?}", + src_port, dst_port + )))? + ), + format!("send_packet.packet_sequence='{}'", packet_sequence), + ]; + + Self::get_tx_by_events_and_assert_one(src_port.chain, send_events_string).await + } + + // on is the chain on which the packet will be received + pub async fn get_packet_receive_tx<'a>( + &self, + from: ChainId<'a>, + ibc_channel: &'a InterchainChannel, + packet_sequence: Sequence, + ) -> Result { + let (src_port, dst_port) = ibc_channel.get_ordered_ports_from(from)?; + + let receive_events_string = vec![ + format!("recv_packet.packet_dst_port='{}'", dst_port.port), + format!( + "recv_packet.packet_dst_channel='{}'", + dst_port + .channel + .clone() + .ok_or(DaemonError::ibc_err(format!( + "No channel registered between {:?} and {:?}", + src_port, dst_port + )))? + ), + format!("recv_packet.packet_sequence='{}'", packet_sequence), + ]; + + Self::get_tx_by_events_and_assert_one(dst_port.chain, receive_events_string).await + } + + // on is the chain on which the packet will be received + pub async fn get_packet_timeout_tx<'a>( + &self, + from: ChainId<'a>, + ibc_channel: &'a InterchainChannel, + packet_sequence: Sequence, + ) -> Result { + let (src_port, dst_port) = ibc_channel.get_ordered_ports_from(from)?; + + let timeout_events_string = vec![ + format!("timeout_packet.packet_dst_port='{}'", dst_port.port), + format!( + "timeout_packet.packet_dst_channel='{}'", + dst_port + .channel + .clone() + .ok_or(DaemonError::ibc_err(format!( + "No channel registered between {:?} and {:?}", + src_port, dst_port + )))? + ), + format!("timeout_packet.packet_sequence='{}'", packet_sequence), + ]; + + Self::get_tx_by_events_and_assert_one(src_port.chain, timeout_events_string).await + } + + // From is the channel from which the original send packet has been sent + pub async fn get_packet_ack_receive_tx<'a>( + &self, + from: ChainId<'a>, + ibc_channel: &'a InterchainChannel, + packet_sequence: Sequence, + ) -> Result { + let (src_port, dst_port) = ibc_channel.get_ordered_ports_from(from)?; + + let ack_events_string = vec![ + format!("acknowledge_packet.packet_dst_port='{}'", dst_port.port), + format!( + "acknowledge_packet.packet_dst_channel='{}'", + dst_port + .channel + .clone() + .ok_or(DaemonError::ibc_err(format!( + "No channel registered between {:?} and {:?}", + src_port, dst_port + )))? + ), + format!("acknowledge_packet.packet_sequence='{}'", packet_sequence), + ]; + + Self::get_tx_by_events_and_assert_one(src_port.chain, ack_events_string).await + } +} + +fn get_events(events: &[TxResultBlockEvent], attr_name: &str) -> Vec { + events + .iter() + .map(|e| e.get_first_attribute_value(attr_name).unwrap()) + .collect() +} + +async fn find_ibc_packets_sent_in_tx( + chain: NetworkId, + grpc_channel: Channel, + tx: CosmTxResponse, +) -> IcDaemonResult> { + let send_packet_events = tx.get_events("send_packet"); + if send_packet_events.is_empty() { + return Ok(vec![]); + } + + let connections = get_events(&send_packet_events, "packet_connection"); + let src_ports = get_events(&send_packet_events, "packet_src_port"); + let src_channels = get_events(&send_packet_events, "packet_src_channel"); + let sequences = get_events(&send_packet_events, "packet_sequence"); + let packet_datas = get_events(&send_packet_events, "packet_data"); + let chain_ids = try_join_all( + connections + .iter() + .map(|c| async { + Ok::<_, InterchainDaemonError>( + Ibc::new_async(grpc_channel.clone()) + ._connection_client(c.clone()) + .await? + .chain_id, + ) + }) + .collect::>(), + ) + .await?; + + let mut ibc_packets = vec![]; + for i in 0..src_ports.len() { + // We create the ibcPacketInfo struct + ibc_packets.push(IbcPacketInfo { + src_port: src_ports[i].parse()?, + src_channel: src_channels[i].parse()?, + sequence: sequences[i].parse()?, + dst_chain_id: chain_ids[i].clone(), + }); + + // We query the destination ports and channels to log as well + let ibc = Ibc::new_async(grpc_channel.clone()); + let counterparty = ibc + ._channel(src_ports[i].clone(), src_channels[i].clone()) + .await? + .counterparty + .expect( + "Unreachable, Channel needs to be open on both sides to be able to send packet! ", + ); + + // We log the packets we follow. + log::info!( + target: &chain, + "IBC packet n° {} : + src_port : {}, + src_channel: {}, + dst_port : {}, + dst_channel: {}, + data: {}", + sequences[i], + src_ports[i], + src_channels[i], + counterparty.port_id, + counterparty.channel_id, + packet_datas[i] + ); + } + + Ok(ibc_packets) +} diff --git a/packages/interchain/hermes-relayer/src/relayer.rs b/packages/interchain/hermes-relayer/src/relayer.rs deleted file mode 100644 index eb392e82a..000000000 --- a/packages/interchain/hermes-relayer/src/relayer.rs +++ /dev/null @@ -1,149 +0,0 @@ -use std::collections::HashMap; - -use crate::config::{chain_config, KEY_NAME}; -use crate::keys::restore_key; -use cosmwasm_std::IbcOrder; -use cw_orch_core::environment::ChainInfoOwned; -use cw_orch_core::environment::ChainState; -use cw_orch_daemon::Daemon; -use cw_orch_interchain_core::env::ChainId; -use cw_orch_interchain_daemon::ChannelCreator; -use cw_orch_interchain_daemon::DaemonInterchainEnv; -use cw_orch_interchain_daemon::InterchainDaemonError; -use cw_orch_interchain_daemon::{IcDaemonResult, Mnemonic}; -use ibc_relayer::chain::requests::{IncludeProof, QueryClientStateRequest, QueryConnectionRequest}; -use ibc_relayer::chain::{handle::ChainHandle, requests::QueryHeight}; -use ibc_relayer::channel::Channel; -use ibc_relayer::config::{Config, RestConfig, TelemetryConfig}; -use ibc_relayer::connection::Connection; -use ibc_relayer::foreign_client::ForeignClient; -use ibc_relayer_cli::cli_utils::spawn_chain_runtime; -use ibc_relayer_types::core::ics03_connection::connection::IdentifiedConnectionEnd; -use ibc_relayer_types::core::ics04_channel::channel::Ordering; -use ibc_relayer_types::core::ics24_host::identifier::{self}; -use tokio::runtime::Handle; - -impl ChannelCreator for HermesRelayer { - fn create_ibc_channel( - &self, - src_chain: ChainId, - dst_chain: ChainId, - src_port: &old_ibc_relayer_types::core::ics24_host::identifier::PortId, - dst_port: &old_ibc_relayer_types::core::ics24_host::identifier::PortId, - version: &str, - order: Option, - ) -> Result { - // TODO connection should be a parameter - let src_connection = self - .connection_ids - .get(&(src_chain.to_string(), dst_chain.to_string())) - .unwrap(); - - let (src_daemon, src_is_consumer_chain, src_rpc_url) = self.daemons.get(src_chain).unwrap(); - let src_chain_data = &src_daemon.state().chain_data; - let src_hd_path = src_daemon.wallet().options().hd_index; - - let (dst_daemon, dst_is_consumer_chain, dst_rpc_url) = self.daemons.get(dst_chain).unwrap(); - let dst_chain_data = &dst_daemon.state().chain_data; - let dst_hd_path = dst_daemon.wallet().options().hd_index; - - let mnemonic = std::env::var("TEST_MNEMONIC").unwrap(); - - let config = Config { - global: ibc_relayer::config::GlobalConfig { - log_level: ibc_relayer::config::LogLevel::Debug, - }, - mode: ibc_relayer::config::ModeConfig::default(), - rest: RestConfig::default(), - telemetry: TelemetryConfig::default(), - chains: vec![ - chain_config( - src_chain, - src_rpc_url, - src_chain_data, - *src_is_consumer_chain, - ), - chain_config( - dst_chain, - dst_rpc_url, - dst_chain_data, - *dst_is_consumer_chain, - ), - ], - tracing_server: Default::default(), - }; - - // Validate & spawn runtime for side a. - let chain_a = - spawn_chain_runtime(&config, &identifier::ChainId::from_string(src_chain)).unwrap(); - - let src_key = - restore_key(mnemonic.clone(), src_hd_path.unwrap_or(0), src_chain_data).unwrap(); - chain_a.add_key(KEY_NAME.to_string(), src_key).unwrap(); - - // Query the connection end. - let (conn_end, _) = chain_a - .query_connection( - QueryConnectionRequest { - connection_id: src_connection.parse().unwrap(), - height: QueryHeight::Latest, - }, - IncludeProof::No, - ) - .unwrap(); - - // Query the client state, obtain the identifier of chain b. - let chain_b = chain_a - .query_client_state( - QueryClientStateRequest { - client_id: conn_end.client_id().clone(), - height: QueryHeight::Latest, - }, - IncludeProof::No, - ) - .map(|(cs, _)| cs.chain_id()) - .unwrap(); - - // Spawn the runtime for side b. - let chain_b = spawn_chain_runtime(&config, &chain_b).unwrap(); - let dst_key = restore_key(mnemonic, dst_hd_path.unwrap_or(0), dst_chain_data).unwrap(); - chain_b.add_key(KEY_NAME.to_string(), dst_key).unwrap(); - - // Create the foreign client handles. - let client_a = - ForeignClient::find(chain_b.clone(), chain_a.clone(), conn_end.client_id()).unwrap(); - - let client_b = - ForeignClient::find(chain_a, chain_b, conn_end.counterparty().client_id()).unwrap(); - - let identified_end = - IdentifiedConnectionEnd::new(src_connection.parse().unwrap(), conn_end); - - let connection = Connection::find(client_a, client_b, &identified_end).unwrap(); - - let channel = Channel::new( - connection, - cosmwasm_to_hermes_order(order), - src_port.to_string().parse().unwrap(), - dst_port.to_string().parse().unwrap(), - Some(version.to_string().into()), - ) - .unwrap(); - - Ok(src_connection.to_string()) - } - - fn interchain_env(&self) -> cw_orch_interchain_daemon::DaemonInterchainEnv { - panic!(); - } -} - -fn cosmwasm_to_hermes_order(order: Option) -> Ordering { - match order { - Some(order) => match order { - IbcOrder::Unordered => Ordering::Unordered, - IbcOrder::Ordered => Ordering::Ordered, - }, - None => Ordering::Unordered, - } -} diff --git a/packages/interchain/interchain-daemon/src/interchain_env.rs b/packages/interchain/interchain-daemon/src/interchain_env.rs index ff2b6f649..8ab6a87ac 100644 --- a/packages/interchain/interchain-daemon/src/interchain_env.rs +++ b/packages/interchain/interchain-daemon/src/interchain_env.rs @@ -41,6 +41,7 @@ pub struct DaemonInterchainEnv { rt_handle: Handle, } +/// Mnemonic type to clarify code pub type Mnemonic = String; impl DaemonInterchainEnv { From 8c07e21ca462530186c36ea77b4cbec8342ff193 Mon Sep 17 00:00:00 2001 From: Kayanski Date: Fri, 21 Jun 2024 07:45:37 +0000 Subject: [PATCH 3/5] Cleaned up hermes --- .../hermes-relayer/examples/pion-xion.rs | 12 +- .../interchain/hermes-relayer/src/channel.rs | 22 +- .../interchain/hermes-relayer/src/core.rs | 3 +- .../hermes-relayer/src/interchain_env.rs | 86 ++- packages/interchain/hermes-relayer/src/lib.rs | 1 - .../interchain/hermes-relayer/src/packet.rs | 29 +- .../hermes-relayer/src/packet_inspector.rs | 625 ------------------ .../interchain-daemon/src/packet_inspector.rs | 11 +- 8 files changed, 101 insertions(+), 688 deletions(-) delete mode 100644 packages/interchain/hermes-relayer/src/packet_inspector.rs diff --git a/packages/interchain/hermes-relayer/examples/pion-xion.rs b/packages/interchain/hermes-relayer/examples/pion-xion.rs index 446fa2d05..85c9101c1 100644 --- a/packages/interchain/hermes-relayer/examples/pion-xion.rs +++ b/packages/interchain/hermes-relayer/examples/pion-xion.rs @@ -84,17 +84,7 @@ pub fn main() -> cw_orch::anyhow::Result<()> { None, )?; - relayer.follow_packet( - "xion-testnet-1", - PortId::transfer(), - channel - .interchain_channel - .get_chain("xion-testnet-1")? - .channel - .unwrap(), - "pion-1", - 1.into(), - )?; + relayer.check_ibc("xion-testnet-1", response)?; Ok(()) } diff --git a/packages/interchain/hermes-relayer/src/channel.rs b/packages/interchain/hermes-relayer/src/channel.rs index daf1eb7de..42b1a603e 100644 --- a/packages/interchain/hermes-relayer/src/channel.rs +++ b/packages/interchain/hermes-relayer/src/channel.rs @@ -1,10 +1,6 @@ -use crate::config::KEY_NAME; use crate::core::HermesRelayer; -use crate::keys::restore_key; use cosmwasm_std::IbcOrder; -use cw_orch_core::environment::ChainState; use cw_orch_interchain_core::env::ChainId; -use cw_orch_interchain_core::InterchainEnv; use cw_orch_interchain_daemon::ChannelCreator; use cw_orch_interchain_daemon::InterchainDaemonError; use ibc_relayer::chain::requests::{IncludeProof, QueryClientStateRequest, QueryConnectionRequest}; @@ -27,22 +23,11 @@ impl ChannelCreator for HermesRelayer { version: &str, order: Option, ) -> Result { - // TODO connection should be a parameter let src_connection = self .connection_ids .get(&(src_chain.to_string(), dst_chain.to_string())) .unwrap(); - let (src_daemon, _, _) = self.daemons.get(src_chain).unwrap(); - let src_chain_data = &src_daemon.state().chain_data; - let src_hd_path = src_daemon.wallet().options().hd_index; - - let (dst_daemon, _, _) = self.daemons.get(dst_chain).unwrap(); - let dst_chain_data = &dst_daemon.state().chain_data; - let dst_hd_path = dst_daemon.wallet().options().hd_index; - - let mnemonic = std::env::var("TEST_MNEMONIC").unwrap(); - let config = self.duplex_config(src_chain, dst_chain); // Validate & spawn runtime for side a. @@ -89,7 +74,7 @@ impl ChannelCreator for HermesRelayer { let connection = Connection::find(client_a, client_b, &identified_end).unwrap(); - let channel = Channel::new( + Channel::new( connection, cosmwasm_to_hermes_order(order), src_port.to_string().parse().unwrap(), @@ -102,7 +87,10 @@ impl ChannelCreator for HermesRelayer { } fn interchain_env(&self) -> cw_orch_interchain_daemon::DaemonInterchainEnv { - unimplemented!() + unimplemented!(" + The Hermes Relayer is a channel creator as well as an Interchain env. + You don't need to use this function, you can simply await packets directly on this structure" + ) } } diff --git a/packages/interchain/hermes-relayer/src/core.rs b/packages/interchain/hermes-relayer/src/core.rs index bc46cc177..8a0d292a1 100644 --- a/packages/interchain/hermes-relayer/src/core.rs +++ b/packages/interchain/hermes-relayer/src/core.rs @@ -4,7 +4,6 @@ use cw_orch_core::environment::ChainInfoOwned; use cw_orch_core::environment::ChainState; use cw_orch_daemon::Daemon; use cw_orch_interchain_core::env::ChainId; -use cw_orch_interchain_core::InterchainEnv; use cw_orch_interchain_daemon::{IcDaemonResult, Mnemonic}; use ibc_relayer::chain::handle::ChainHandle; use tokio::runtime::Handle; @@ -136,7 +135,7 @@ impl HermesRelayer { pub fn add_key(&self, chain: &impl ChainHandle) { let chain_id = chain.config().unwrap().id().to_string(); - let (daemon, _, rpc) = self.daemons.get(&chain_id).unwrap(); + let (daemon, _, _) = self.daemons.get(&chain_id).unwrap(); let chain_data = &daemon.state().chain_data; let hd_path = daemon.wallet().options().hd_index; diff --git a/packages/interchain/hermes-relayer/src/interchain_env.rs b/packages/interchain/hermes-relayer/src/interchain_env.rs index cc216e6d2..5cfeff1c7 100644 --- a/packages/interchain/hermes-relayer/src/interchain_env.rs +++ b/packages/interchain/hermes-relayer/src/interchain_env.rs @@ -4,10 +4,12 @@ use cw_orch_daemon::queriers::{Ibc, Node}; use cw_orch_daemon::{CosmTxResponse, Daemon, DaemonError}; use cw_orch_interchain_core::channel::{IbcPort, InterchainChannel}; use cw_orch_interchain_core::env::{ChainId, ChannelCreation}; +use cw_orch_interchain_core::types::{FullIbcPacketAnalysis, IbcPacketOutcome, TxId}; use cw_orch_interchain_core::InterchainEnv; +use cw_orch_interchain_daemon::packet_inspector::find_ibc_packets_sent_in_tx; +use cw_orch_interchain_daemon::packet_inspector::PacketInspector; use crate::core::HermesRelayer; -use crate::packet_inspector::PacketInspector; use cw_orch_interchain_daemon::InterchainDaemonError; use old_ibc_relayer_types::core::ics04_channel::packet::Sequence; use old_ibc_relayer_types::core::ics24_host::identifier::{ChannelId, PortId}; @@ -117,17 +119,81 @@ impl InterchainEnv for HermesRelayer { tx_response.txhash ); - // We crate an interchain env object that is safe to send between threads - let interchain_env = self.rt_handle.block_on(PacketInspector::new( - self.daemons.values().map(|(d, _, _)| d).collect(), - ))?; + // 1. Getting IBC related events for the current tx + finding all IBC packets sent out in the transaction + let daemon_1 = self.chain(chain_id)?; + let grpc_channel1 = daemon_1.channel(); - // We follow the trail - let ibc_trail = self - .rt_handle - .block_on(interchain_env.wait_ibc(chain_id.to_string(), tx_response))?; + let sent_packets = daemon_1.rt_handle.block_on(find_ibc_packets_sent_in_tx( + chain_id.to_string(), + grpc_channel1.clone(), + tx_response.clone(), + ))?; - Ok(ibc_trail) + // 2. We follow the packet history for each packet found inside the transaction + let ibc_packet_results = sent_packets + .iter() + .map(|packet| { + self.clone().follow_packet( + chain_id, + packet.src_port.clone(), + packet.src_channel.clone(), + &packet.dst_chain_id, + packet.sequence, + ) + }) + .collect::, _>>()?; + + let send_tx_id = TxId { + chain_id: chain_id.to_string(), + response: tx_response, + }; + + // We follow all results from outgoing packets in the resulting transactions + let full_results = ibc_packet_results + .into_iter() + .map(|ibc_result| { + let txs_to_analyze = match ibc_result.outcome.clone() { + IbcPacketOutcome::Timeout { timeout_tx } => vec![timeout_tx], + IbcPacketOutcome::Success { + receive_tx, ack_tx, .. + } => vec![receive_tx, ack_tx], + }; + + let txs_results = txs_to_analyze + .iter() + .map(|tx| { + let chain_id = tx.chain_id.clone(); + let response = tx.response.clone(); + self.wait_ibc(&chain_id, response) + }) + .collect::, _>>()?; + + let analyzed_outcome = match ibc_result.outcome { + IbcPacketOutcome::Timeout { .. } => IbcPacketOutcome::Timeout { + timeout_tx: txs_results[0].clone(), + }, + IbcPacketOutcome::Success { ack, .. } => IbcPacketOutcome::Success { + ack: ack.clone(), + receive_tx: txs_results[0].clone(), + ack_tx: txs_results[1].clone(), + }, + }; + + let analyzed_result = FullIbcPacketAnalysis { + send_tx: Some(send_tx_id.clone()), + outcome: analyzed_outcome, + }; + + Ok::<_, InterchainDaemonError>(analyzed_result.clone()) + }) + .collect::>()?; + + let tx_identification = IbcTxAnalysis { + tx_id: send_tx_id.clone(), + packets: full_results, + }; + + Ok(tx_identification) } // This function follow the execution of an IBC packet across the chain diff --git a/packages/interchain/hermes-relayer/src/lib.rs b/packages/interchain/hermes-relayer/src/lib.rs index fcbc28a3c..5339baa3c 100644 --- a/packages/interchain/hermes-relayer/src/lib.rs +++ b/packages/interchain/hermes-relayer/src/lib.rs @@ -4,4 +4,3 @@ pub mod core; pub mod interchain_env; pub mod keys; pub mod packet; -pub mod packet_inspector; diff --git a/packages/interchain/hermes-relayer/src/packet.rs b/packages/interchain/hermes-relayer/src/packet.rs index ebc4ff354..18e0a352b 100644 --- a/packages/interchain/hermes-relayer/src/packet.rs +++ b/packages/interchain/hermes-relayer/src/packet.rs @@ -1,11 +1,8 @@ -use crate::{config::KEY_NAME, core::HermesRelayer, keys::restore_key}; +use crate::core::HermesRelayer; use cw_orch_core::environment::QuerierGetter; use cw_orch_daemon::queriers::Ibc; use cw_orch_interchain_core::env::ChainId; -use ibc_relayer::{ - chain::handle::ChainHandle, - link::{Link, LinkParameters}, -}; +use ibc_relayer::link::{Link, LinkParameters}; use ibc_relayer_cli::cli_utils::ChainHandlePair; use ibc_relayer_types::core::{ics04_channel, ics24_host::identifier}; use old_ibc_relayer_types::core::{ @@ -72,12 +69,11 @@ impl HermesRelayer { let sequence: u64 = sequence.into(); let sequence = ics04_channel::packet::Sequence::from(sequence); - let res = link - .relay_recv_packet_and_timeout_messages_with_packet_data_query_height( - vec![sequence..=sequence], - None, - ) - .unwrap(); + link.relay_recv_packet_and_timeout_messages_with_packet_data_query_height( + vec![sequence..=sequence], + None, + ) + .unwrap(); } pub fn ack_packet( @@ -126,11 +122,10 @@ impl HermesRelayer { let sequence: u64 = sequence.into(); let sequence = ics04_channel::packet::Sequence::from(sequence); - let res = link - .relay_ack_packet_messages_with_packet_data_query_height( - vec![sequence..=sequence], - None, - ) - .unwrap(); + link.relay_ack_packet_messages_with_packet_data_query_height( + vec![sequence..=sequence], + None, + ) + .unwrap(); } } diff --git a/packages/interchain/hermes-relayer/src/packet_inspector.rs b/packages/interchain/hermes-relayer/src/packet_inspector.rs deleted file mode 100644 index 8b5142a4b..000000000 --- a/packages/interchain/hermes-relayer/src/packet_inspector.rs +++ /dev/null @@ -1,625 +0,0 @@ -//! Module for tracking a specific packet inside the interchain - -use cw_orch_core::environment::{ChainInfoOwned, ChainState}; -use cw_orch_daemon::networks::parse_network; -use cw_orch_daemon::queriers::{Ibc, Node}; -use cw_orch_daemon::GrpcChannel; -use cw_orch_daemon::TxResultBlockEvent; -use cw_orch_daemon::{CosmTxResponse, Daemon, DaemonError}; -use cw_orch_interchain_core::channel::{IbcPort, InterchainChannel}; -use cw_orch_interchain_core::env::ChainId; -use cw_orch_interchain_daemon::IcDaemonResult; -use futures_util::future::select_all; -use futures_util::FutureExt; -use ibc_relayer_types::core::ics04_channel::channel::State; - -use cw_orch_interchain_core::types::{ - FullIbcPacketAnalysis, IbcPacketAnalysis, IbcPacketInfo, IbcPacketOutcome, IbcTxAnalysis, - NetworkId, SimpleIbcPacketAnalysis, TxId, -}; -use cw_orch_interchain_daemon::InterchainDaemonError; - -use futures::future::try_join_all; -use old_ibc_relayer_types::core::ics04_channel::packet::Sequence; -use old_ibc_relayer_types::core::ics24_host::identifier::{ChannelId, PortId}; -use tonic::transport::Channel; - -use std::collections::HashMap; - -use crate::core::HermesRelayer; - -/// Environment used to track IBC execution and updates on multiple chains. -/// This can be used to track specific IBC packets or get general information update on channels between multiple chains -/// This struct is safe to be sent between threads -/// In contrary to InterchainStructure that holds Daemon in its definition which is not sendable -#[derive(Default, Clone)] -pub struct PacketInspector { - registered_chains: HashMap, -} - -// / TODO, change this doc comment that is not up to date anymore -// / Follow all IBC packets included in a transaction (recursively). -// / ## Example -// / ```no_run -// / use cw_orch::prelude::PacketInspector; -// / # tokio_test::block_on(async { -// / PacketInspector::default() -// / .await_ibc_execution( -// / "juno-1".to_string(), -// / "2E68E86FEFED8459144D19968B36C6FB8928018D720CC29689B4793A7DE50BD5".to_string() -// / ).await.unwrap(); -// / # }) -// / ``` -impl PacketInspector { - /// Adds a custom chain to the environment - /// While following IBC packet execution, this struct will need to get specific chain information from the `chain_id` only - /// More precisely, it will need to get a gRPC channel from a `chain_id`. - /// This struct will use the `crate::prelude::networks::parse_network` function by default to do so. - /// To override this behavior for specific chains (for example for local testing), you can specify a channel for a specific chain_id - pub async fn new(custom_chains: Vec<&Daemon>) -> IcDaemonResult { - let mut env = PacketInspector::default(); - - for chain in custom_chains { - env.registered_chains.insert( - chain.state().chain_data.chain_id.to_string(), - chain.channel(), - ); - } - Ok(env) - } - - /// Following the IBC documentation of packets here : https://github.com/CosmWasm/cosmwasm/blob/main/IBC.md - /// This function retrieves all ibc packets sent out during a transaction and follows them until they are acknoledged back on the sending chain - /// - /// 1. Send Packet. The provided transaction hash is used to retrieve all transaction logs from the sending chain. - /// In the logs, we can find all details that allow us to identify the transaction in which the packet is received in the remote chain - /// These include : - /// - The connection_id - /// - The destination port - /// - The destination channel - /// - The packet sequence (to identify a specific packet in the channel) - /// - /// ## Remarks - /// - The packet data is also retrieved for logging - /// - Multiple packets can be sent out during the same transaction. - /// In order to identify them, we assume the order of the events is the same for all events of a single packet. - /// Ex: packet_connection = ["connection_id_of_packet_1", "connection_id_of_packet_2"] - /// Ex: packet_dst_port = ["packet_dst_port_of_packet_1", "packet_dst_port_of_packet_2"] - /// - The chain id of the destination chain is not available directly in the logs. - /// However, it is possible to query the node for the chain id of the counterparty chain linked by a connection - /// - /// 2. Follow all IBC packets until they are acknowledged on the origin chain - /// - /// 3. Scan all encountered transactions along the way for additional IBC packets - #[async_recursion::async_recursion(?Send)] - pub async fn wait_ibc( - &self, - src_chain: NetworkId, - tx: CosmTxResponse, - ) -> IcDaemonResult> { - // 1. Getting IBC related events for the current tx + finding all IBC packets sent out in the transaction - let grpc_channel1 = self.get_grpc_channel(&src_chain).await?; - - let sent_packets = - find_ibc_packets_sent_in_tx(src_chain.clone(), grpc_channel1.clone(), tx.clone()) - .await?; - - // 2. We follow the packet history for each packet found inside the transaction - let ibc_packet_results = try_join_all( - sent_packets - .iter() - .map(|packet| { - self.clone().follow_packet( - &src_chain, - packet.src_port.clone(), - packet.src_channel.clone(), - &packet.dst_chain_id, - packet.sequence, - ) - }) - .collect::>(), - ) - .await? - .into_iter() - .collect::>(); - - let send_tx_id = TxId { - chain_id: src_chain.clone(), - response: tx, - }; - - // We follow all results from outgoing packets in the resulting transactions - let full_results = try_join_all(ibc_packet_results.into_iter().map(|ibc_result| async { - let txs_to_analyze = match ibc_result.outcome.clone() { - IbcPacketOutcome::Timeout { timeout_tx } => vec![timeout_tx], - IbcPacketOutcome::Success { - receive_tx, ack_tx, .. - } => vec![receive_tx, ack_tx], - }; - - let txs_results = try_join_all( - txs_to_analyze - .iter() - .map(|tx| { - let chain_id = tx.chain_id.clone(); - let response = tx.response.clone(); - self.wait_ibc(chain_id.clone(), response) - }) - .collect::>(), - ) - .await?; - - let analyzed_outcome = match ibc_result.outcome { - IbcPacketOutcome::Timeout { .. } => IbcPacketOutcome::Timeout { - timeout_tx: txs_results[0].clone(), - }, - IbcPacketOutcome::Success { ack, .. } => IbcPacketOutcome::Success { - ack: ack.clone(), - receive_tx: txs_results[0].clone(), - ack_tx: txs_results[1].clone(), - }, - }; - - let analyzed_result = FullIbcPacketAnalysis { - send_tx: Some(send_tx_id.clone()), - outcome: analyzed_outcome, - }; - - Ok::<_, InterchainDaemonError>(analyzed_result.clone()) - })) - .await?; - - let tx_identification = IbcTxAnalysis { - tx_id: send_tx_id.clone(), - packets: full_results, - }; - - Ok(tx_identification) - } - - /// Gets the grpc channel associed with a specific `chain_id` - /// If it's not registered in this struct (using the `add_custom_chain` member), it will query the grpc from the chain regisry (`networks::parse_network` function) - async fn get_grpc_channel<'a>(&self, chain_id: ChainId<'a>) -> IcDaemonResult { - let grpc_channel = self.registered_chains.get(chain_id); - - if let Some(dst_grpc_channel) = grpc_channel { - Ok(dst_grpc_channel.clone()) - } else { - // If no custom channel was registered, we try to get it from the registry - let chain_data: ChainInfoOwned = parse_network(chain_id).unwrap().into(); // TODO, no unwrap here ? - Ok(GrpcChannel::connect(&chain_data.grpc_urls, chain_id).await?) - } - } - - /// This is a wrapper to follow a packet directly in a single future - /// Prefer the use of `await_ibc_execution` for following IBC packets related to a transaction - pub async fn follow_packet<'a>( - self, - src_chain: ChainId<'a>, - src_port: PortId, - src_channel: ChannelId, - dst_chain: ChainId<'a>, - sequence: Sequence, - ) -> IcDaemonResult> { - let src_grpc_channel = self.get_grpc_channel(src_chain).await?; - let dst_grpc_channel = self.get_grpc_channel(dst_chain).await?; - - // Then we check that the channel indeed exists - let registered_channel = Ibc::new_async(src_grpc_channel.clone()) - ._channel(src_port.to_string(), src_channel.to_string()) - .await?; - - // We log to warn when the channel state is not right - if registered_channel.state != State::Open as i32 { - log::warn!("Channel is not in an open state, the packet will most likely not be relayed. Channel information {:?}", registered_channel); - } - - let counterparty = registered_channel.counterparty.unwrap(); - - // Here the connection id is not used, because a port_id and a channel_id are sufficient to track a channel - let ibc_channel = InterchainChannel::new( - IbcPort { - connection_id: None, - chain: src_grpc_channel, - chain_id: src_chain.to_string(), - port: src_port, - channel: Some(src_channel), - }, - IbcPort { - connection_id: None, - chain: dst_grpc_channel, - chain_id: dst_chain.to_string(), - port: counterparty.port_id.parse().unwrap(), - channel: Some(counterparty.channel_id.parse().unwrap()), - }, - ); - - // We try to transfer the packets - - // There are 2 possible outcomes for an IBC packet transfer - // 1. The transfer succeeds, this is covered by the `InterchainChannel::follow_packet_cycle` method - // 2. The transfer errors and the packet times out. This is covered by the `InterchainChannel::follow_packet_timeout` method - // If either of those functions succeeds, the other one will never succeeds. That's why we are racing those 2 functions here. - - let (result, _, _) = select_all(vec![ - self.follow_packet_cycle(src_chain, &ibc_channel, sequence) - .boxed(), - self.follow_packet_timeout(src_chain, &ibc_channel, sequence) - .boxed(), - ]) - .await; - - result - } - - /// This functions follows an IBC packet on the remote chain and back on its origin chain. It returns all encountered tx hashes - /// 1. Receive packet. We use the identification of the packet to find the tx in which the packet was received - /// We make sure that only one transaction tracks receiving this packet. - /// If not, we sent out an error (this error actually comes from the code not identifying an IBC packet properly) - /// If such an error happens, it means this function is not implemented properly - /// We verify this transaction is not errored (it should never error) - /// - /// 2. Acknowledgment. The last part of the packet lifetime is the acknowledgement the remote chain sents back. - /// In the same transaction as the one in which the packet is received, an packet acknowledgement should be sent back to the origin chain - /// We get this acknowledgment and deserialize it according to https://github.com/cosmos/cosmos-sdk/blob/v0.42.4/proto/ibc/core/channel/v1/channel.proto#L134-L147 - /// If the acknowledgement doesn't follow the standard, we don't mind and continue - /// 3. Identify the acknowledgement receive packet on the origin chain - /// Finally, we get the transaction hash of the transaction in which the acknowledgement is received on the origin chain. - /// This is also logged for debugging purposes - /// - /// We return the tx hash of the received packet on the remote chain as well as the ack packet transaction on the origin chain - pub async fn follow_packet_cycle<'a>( - &self, - from: ChainId<'a>, - ibc_channel: &'a InterchainChannel, - sequence: Sequence, - ) -> Result, InterchainDaemonError> { - let (src_port, dst_port) = ibc_channel.get_ordered_ports_from(from)?; - - // 0. Query the send tx hash for analysis - let send_tx = self.get_packet_send_tx(from, ibc_channel, sequence).await?; - - // 1. Query the tx hash on the remote chains related to the packet the origin chain sent - let received_tx = self - .get_packet_receive_tx(from, ibc_channel, sequence) - .await?; - // We check if the tx errors (this shouldn't happen in IBC connections) - if received_tx.code != 0 { - return Err(DaemonError::TxFailed { - code: received_tx.code, - reason: format!( - "Raw log on {} : {}", - dst_port.chain_id, - received_tx.raw_log.clone() - ), - } - .into()); - } - - // 2. We get the events related to the acknowledgements sent back on the remote chain - - let all_recv_events = received_tx.get_events("write_acknowledgement"); - let recv_event = all_recv_events - .iter() - .filter(|e| { - e.get_first_attribute_value("packet_sequence").unwrap() == sequence.to_string() - }) - .collect::>()[0]; - - let recv_packet_sequence = recv_event - .get_first_attribute_value("packet_sequence") - .unwrap(); - let recv_packet_data = recv_event.get_first_attribute_value("packet_data").unwrap(); - let acknowledgment = recv_event.get_first_attribute_value("packet_ack").unwrap(); - - // We try to unpack the acknowledgement if possible, when it's following the standard format (is not enforced so it's not always possible) - let decoded_ack_string = - serde_json::from_str(&acknowledgment).unwrap_or(format!("{:x?}", acknowledgment)); - - log::info!( - target: &dst_port.chain_id, - "IBC packet n°{} : {}, received on {} on tx {}, with acknowledgment sent back: {}", - recv_packet_sequence, - recv_packet_data, - dst_port.chain_id, - received_tx.txhash, - decoded_ack_string - ); - - // 3. We look for the acknowledgement packet on the origin chain - let ack_tx = self - .get_packet_ack_receive_tx(&src_port.chain_id, ibc_channel, sequence) - .await?; - // First we check if the tx errors (this shouldn't happen in IBC connections) - if ack_tx.code != 0 { - return Err(DaemonError::TxFailed { - code: ack_tx.code, - reason: format!( - "Raw log on {} : {}", - src_port.chain_id.clone(), - ack_tx.raw_log - ), - } - .into()); - } - log::info!( - target: &src_port.chain_id, - "IBC packet n°{} acknowledgment received on {} on tx {}", - sequence, - src_port.chain_id.clone(), - ack_tx.txhash - ); - - Ok(IbcPacketAnalysis { - send_tx: Some(TxId { - response: send_tx, - chain_id: src_port.chain_id.clone(), - }), - outcome: IbcPacketOutcome::Success { - receive_tx: TxId { - chain_id: dst_port.chain_id.clone(), - response: received_tx, - }, - ack_tx: TxId { - chain_id: src_port.chain_id.clone(), - response: ack_tx, - }, - ack: acknowledgment.as_bytes().into(), - }, - }) - } - - /// This functions looks for timeouts of an IBC packet on its origin chain. It returns the tx hash of the timeout tx. - pub async fn follow_packet_timeout<'a>( - &self, - from: ChainId<'a>, - ibc_channel: &'a InterchainChannel, - sequence: Sequence, - ) -> Result, InterchainDaemonError> { - // 0. Query the send tx hash for analysis - let send_tx = self.get_packet_send_tx(from, ibc_channel, sequence).await?; - - let (src_port, _dst_port) = ibc_channel.get_ordered_ports_from(from)?; - - // We query the tx hash of the timeout packet on the source chain - let timeout_tx = self - .get_packet_timeout_tx(from, ibc_channel, sequence) - .await?; - // We check if the tx errors (this shouldn't happen in IBC connections) - if timeout_tx.code != 0 { - return Err(DaemonError::TxFailed { - code: timeout_tx.code, - reason: format!( - "Raw log on {} : {}", - src_port.chain_id, - timeout_tx.raw_log.clone() - ), - } - .into()); - } - - log::error!( - target: &src_port.chain_id, - "IBC packet n° {} : - port : {}, - channel: {} received a timeout and was not broadcasted successfully on tx {}", - sequence, - src_port.port, - src_port.channel.unwrap(), - timeout_tx.txhash - ); - - // We return the tx hash of this transaction for future analysis - Ok(IbcPacketAnalysis { - send_tx: Some(TxId { - chain_id: src_port.chain_id.clone(), - response: send_tx, - }), - outcome: IbcPacketOutcome::Timeout { - timeout_tx: TxId { - chain_id: src_port.chain_id.clone(), - response: timeout_tx, - }, - }, - }) - } - - async fn get_tx_by_events_and_assert_one( - channel: Channel, - events: Vec, - ) -> Result { - let txs = Node::new_async(channel.clone()) - ._find_some_tx_by_events(events, None, None) - .await?; - if txs.len() != 1 { - return Err(DaemonError::ibc_err("Found multiple transactions matching a send packet event, this is impossible (or cw-orch impl is at fault)").into()); - } - Ok(txs[0].clone()) - } - - // From is the channel from which the send packet has been sent - pub async fn get_packet_send_tx<'a>( - &self, - from: ChainId<'a>, - ibc_channel: &'a InterchainChannel, - packet_sequence: Sequence, - ) -> Result { - let (src_port, dst_port) = ibc_channel.get_ordered_ports_from(from)?; - - let send_events_string = vec![ - format!("send_packet.packet_dst_port='{}'", dst_port.port), - format!( - "send_packet.packet_dst_channel='{}'", - dst_port - .channel - .clone() - .ok_or(DaemonError::ibc_err(format!( - "No channel registered between {:?} and {:?}", - src_port, dst_port - )))? - ), - format!("send_packet.packet_sequence='{}'", packet_sequence), - ]; - - Self::get_tx_by_events_and_assert_one(src_port.chain, send_events_string).await - } - - // on is the chain on which the packet will be received - pub async fn get_packet_receive_tx<'a>( - &self, - from: ChainId<'a>, - ibc_channel: &'a InterchainChannel, - packet_sequence: Sequence, - ) -> Result { - let (src_port, dst_port) = ibc_channel.get_ordered_ports_from(from)?; - - let receive_events_string = vec![ - format!("recv_packet.packet_dst_port='{}'", dst_port.port), - format!( - "recv_packet.packet_dst_channel='{}'", - dst_port - .channel - .clone() - .ok_or(DaemonError::ibc_err(format!( - "No channel registered between {:?} and {:?}", - src_port, dst_port - )))? - ), - format!("recv_packet.packet_sequence='{}'", packet_sequence), - ]; - - Self::get_tx_by_events_and_assert_one(dst_port.chain, receive_events_string).await - } - - // on is the chain on which the packet will be received - pub async fn get_packet_timeout_tx<'a>( - &self, - from: ChainId<'a>, - ibc_channel: &'a InterchainChannel, - packet_sequence: Sequence, - ) -> Result { - let (src_port, dst_port) = ibc_channel.get_ordered_ports_from(from)?; - - let timeout_events_string = vec![ - format!("timeout_packet.packet_dst_port='{}'", dst_port.port), - format!( - "timeout_packet.packet_dst_channel='{}'", - dst_port - .channel - .clone() - .ok_or(DaemonError::ibc_err(format!( - "No channel registered between {:?} and {:?}", - src_port, dst_port - )))? - ), - format!("timeout_packet.packet_sequence='{}'", packet_sequence), - ]; - - Self::get_tx_by_events_and_assert_one(src_port.chain, timeout_events_string).await - } - - // From is the channel from which the original send packet has been sent - pub async fn get_packet_ack_receive_tx<'a>( - &self, - from: ChainId<'a>, - ibc_channel: &'a InterchainChannel, - packet_sequence: Sequence, - ) -> Result { - let (src_port, dst_port) = ibc_channel.get_ordered_ports_from(from)?; - - let ack_events_string = vec![ - format!("acknowledge_packet.packet_dst_port='{}'", dst_port.port), - format!( - "acknowledge_packet.packet_dst_channel='{}'", - dst_port - .channel - .clone() - .ok_or(DaemonError::ibc_err(format!( - "No channel registered between {:?} and {:?}", - src_port, dst_port - )))? - ), - format!("acknowledge_packet.packet_sequence='{}'", packet_sequence), - ]; - - Self::get_tx_by_events_and_assert_one(src_port.chain, ack_events_string).await - } -} - -fn get_events(events: &[TxResultBlockEvent], attr_name: &str) -> Vec { - events - .iter() - .map(|e| e.get_first_attribute_value(attr_name).unwrap()) - .collect() -} - -async fn find_ibc_packets_sent_in_tx( - chain: NetworkId, - grpc_channel: Channel, - tx: CosmTxResponse, -) -> IcDaemonResult> { - let send_packet_events = tx.get_events("send_packet"); - if send_packet_events.is_empty() { - return Ok(vec![]); - } - - let connections = get_events(&send_packet_events, "packet_connection"); - let src_ports = get_events(&send_packet_events, "packet_src_port"); - let src_channels = get_events(&send_packet_events, "packet_src_channel"); - let sequences = get_events(&send_packet_events, "packet_sequence"); - let packet_datas = get_events(&send_packet_events, "packet_data"); - let chain_ids = try_join_all( - connections - .iter() - .map(|c| async { - Ok::<_, InterchainDaemonError>( - Ibc::new_async(grpc_channel.clone()) - ._connection_client(c.clone()) - .await? - .chain_id, - ) - }) - .collect::>(), - ) - .await?; - - let mut ibc_packets = vec![]; - for i in 0..src_ports.len() { - // We create the ibcPacketInfo struct - ibc_packets.push(IbcPacketInfo { - src_port: src_ports[i].parse()?, - src_channel: src_channels[i].parse()?, - sequence: sequences[i].parse()?, - dst_chain_id: chain_ids[i].clone(), - }); - - // We query the destination ports and channels to log as well - let ibc = Ibc::new_async(grpc_channel.clone()); - let counterparty = ibc - ._channel(src_ports[i].clone(), src_channels[i].clone()) - .await? - .counterparty - .expect( - "Unreachable, Channel needs to be open on both sides to be able to send packet! ", - ); - - // We log the packets we follow. - log::info!( - target: &chain, - "IBC packet n° {} : - src_port : {}, - src_channel: {}, - dst_port : {}, - dst_channel: {}, - data: {}", - sequences[i], - src_ports[i], - src_channels[i], - counterparty.port_id, - counterparty.channel_id, - packet_datas[i] - ); - } - - Ok(ibc_packets) -} diff --git a/packages/interchain/interchain-daemon/src/packet_inspector.rs b/packages/interchain/interchain-daemon/src/packet_inspector.rs index 8ae9c6a7c..669d1c343 100644 --- a/packages/interchain/interchain-daemon/src/packet_inspector.rs +++ b/packages/interchain/interchain-daemon/src/packet_inspector.rs @@ -433,7 +433,7 @@ impl PacketInspector { } // From is the channel from which the send packet has been sent - pub async fn get_packet_send_tx<'a>( + async fn get_packet_send_tx<'a>( &self, from: ChainId<'a>, ibc_channel: &'a InterchainChannel, @@ -460,7 +460,7 @@ impl PacketInspector { } // on is the chain on which the packet will be received - pub async fn get_packet_receive_tx<'a>( + async fn get_packet_receive_tx<'a>( &self, from: ChainId<'a>, ibc_channel: &'a InterchainChannel, @@ -487,7 +487,7 @@ impl PacketInspector { } // on is the chain on which the packet will be received - pub async fn get_packet_timeout_tx<'a>( + async fn get_packet_timeout_tx<'a>( &self, from: ChainId<'a>, ibc_channel: &'a InterchainChannel, @@ -514,7 +514,7 @@ impl PacketInspector { } // From is the channel from which the original send packet has been sent - pub async fn get_packet_ack_receive_tx<'a>( + async fn get_packet_ack_receive_tx<'a>( &self, from: ChainId<'a>, ibc_channel: &'a InterchainChannel, @@ -548,7 +548,8 @@ fn get_events(events: &[TxResultBlockEvent], attr_name: &str) -> Vec { .collect() } -async fn find_ibc_packets_sent_in_tx( +/// Find all the ibc packets that were sent during a transaction from the transaction events +pub async fn find_ibc_packets_sent_in_tx( chain: NetworkId, grpc_channel: Channel, tx: CosmTxResponse, From b034cb4b849728a0275d8a38567f245204cecb77 Mon Sep 17 00:00:00 2001 From: Kayanski Date: Fri, 21 Jun 2024 07:45:44 +0000 Subject: [PATCH 4/5] formatting --- packages/interchain/hermes-relayer/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/interchain/hermes-relayer/Cargo.toml b/packages/interchain/hermes-relayer/Cargo.toml index 9daba6bf8..e61887339 100644 --- a/packages/interchain/hermes-relayer/Cargo.toml +++ b/packages/interchain/hermes-relayer/Cargo.toml @@ -32,7 +32,7 @@ serde_json.workspace = true [dev-dependencies] cw-orch = { path = "../../../cw-orch", features = ["daemon"] } cw-orch-interchain = { path = "../../../cw-orch-interchain", features = [ - "daemon", + "daemon", ] } dotenv = "0.15.0" ibc-proto = "0.32.0" From efeed5490024471c4b7f867a5cf7086db6547c35 Mon Sep 17 00:00:00 2001 From: Kayanski Date: Fri, 21 Jun 2024 07:46:17 +0000 Subject: [PATCH 5/5] Nit --- packages/interchain/hermes-relayer/src/interchain_env.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/interchain/hermes-relayer/src/interchain_env.rs b/packages/interchain/hermes-relayer/src/interchain_env.rs index 5cfeff1c7..87e4d73e7 100644 --- a/packages/interchain/hermes-relayer/src/interchain_env.rs +++ b/packages/interchain/hermes-relayer/src/interchain_env.rs @@ -133,7 +133,7 @@ impl InterchainEnv for HermesRelayer { let ibc_packet_results = sent_packets .iter() .map(|packet| { - self.clone().follow_packet( + self.follow_packet( chain_id, packet.src_port.clone(), packet.src_channel.clone(), @@ -211,7 +211,6 @@ impl InterchainEnv for HermesRelayer { ))?; // We try to relay the packets using the HERMES relayer - self.force_packet_relay( src_chain, src_port.clone(),