diff --git a/cli/polka-storage-provider/server/src/rpc.rs b/cli/polka-storage-provider/server/src/rpc.rs index 0089a46bf..e97fac220 100644 --- a/cli/polka-storage-provider/server/src/rpc.rs +++ b/cli/polka-storage-provider/server/src/rpc.rs @@ -4,6 +4,7 @@ use jsonrpsee::server::Server; use polka_storage_provider_common::rpc::{RpcError, ServerInfo, StorageProviderRpcServer}; use primitives_commitment::commd::compute_unsealed_sector_commitment; use storagext::{ + runtime::{market::events as MarketEvents, ResultType}, types::market::{ClientDealProposal as SxtClientDealProposal, DealProposal as SxtDealProposal}, MarketClientExt, }; @@ -108,21 +109,20 @@ impl StorageProviderRpcServer for RpcServerState { // however, due to https://github.com/paritytech/subxt/issues/1668 it may wrongly fail. // Fixing this requires the xt_client not wait for the finalization, it's not hard to do // it just requires some API design - let result = self + let event: MarketEvents::DealPublished = self .xt_client - .publish_signed_storage_deals(&self.xt_keypair, vec![deal]) - .await?; + .publish_signed_storage_deals(&self.xt_keypair, vec![deal], ResultType::Event) + .await? + .unwrap_event(); - let published_deals = result - .events - .find::() - .collect::, _>>() - .map_err(|err| RpcError::internal_error(err, None))?; + // let published_deals = result + // .unwrap_events() + // .find::() + // .collect::, _>>() + // .map_err(|err| RpcError::internal_error(err, None))?; // We currently just support a single deal and if there's no published deals, // an error MUST've happened - debug_assert_eq!(published_deals.len(), 1); - let unsealed_dir = self.unsealed_piece_storage_dir.clone(); let sector_size = self.server_info.seal_proof.sector_size(); @@ -151,7 +151,7 @@ impl StorageProviderRpcServer for RpcServerState { tracing::info!("{:?}", comm_d); }); - Ok(published_deals[0].deal_id) + Ok(event.deal_id) } } diff --git a/cli/polka-storage/storagext-cli/src/cmd/market.rs b/cli/polka-storage/storagext-cli/src/cmd/market.rs index d5e74edeb..49b04f1d4 100644 --- a/cli/polka-storage/storagext-cli/src/cmd/market.rs +++ b/cli/polka-storage/storagext-cli/src/cmd/market.rs @@ -6,7 +6,7 @@ use primitives_proofs::DealId; use storagext::{ deser::DeserializablePath, multipair::{DebugPair, MultiPairSigner}, - runtime::SubmissionResult, + runtime::{market::events as MarketEvents, HashOfPsc, ResultType, SubmissionResult}, types::market::DealProposal as SxtDealProposal, MarketClientExt, PolkaStorageConfig, }; @@ -81,6 +81,7 @@ impl MarketCommand { account_keypair: Option, n_retries: u32, retry_interval: Duration, + result_type: ResultType, output_format: OutputFormat, ) -> Result<(), anyhow::Error> { let client = storagext::Client::new(node_rpc, n_retries, retry_interval).await?; @@ -110,7 +111,7 @@ impl MarketCommand { return Err(missing_keypair_error::().into()); }; else_ - .with_keypair(client, account_keypair, output_format) + .with_keypair(client, account_keypair, result_type, output_format) .await?; } }; @@ -122,26 +123,38 @@ impl MarketCommand { self, client: Client, account_keypair: MultiPairSigner, - output_format: OutputFormat, + result_type: ResultType, + _output_format: OutputFormat, ) -> Result<(), anyhow::Error> where Client: MarketClientExt, { operation_takes_a_while(); - let submission_result = match self { + match self { MarketCommand::AddBalance { amount } => { - Self::add_balance(client, account_keypair, amount).await? + let (hash, event) = Self::add_balance(client, account_keypair, amount, result_type) + .await? + .unwrap_both(); + println!("{hash:?}: {event:?}"); } MarketCommand::SettleDealPayments { deal_ids } => { if deal_ids.is_empty() { bail!("No deals provided to settle"); } - Self::settle_deal_payments(client, account_keypair, deal_ids).await? + let (hash, event) = + Self::settle_deal_payments(client, account_keypair, deal_ids, result_type) + .await? + .unwrap_both(); + println!("{hash:?}: {event:?}"); } MarketCommand::WithdrawBalance { amount } => { - Self::withdraw_balance(client, account_keypair, amount).await? + let (hash, event) = + Self::withdraw_balance(client, account_keypair, amount, result_type) + .await? + .unwrap_both(); + println!("{hash:?}: {event:?}"); } MarketCommand::PublishStorageDeals { deals, @@ -156,34 +169,20 @@ impl MarketCommand { client_ed25519_key.map(DebugPair::into_inner) ) .expect("client is required to submit at least one key, this should've been handled by clap's ArgGroup"); - Self::publish_storage_deals(client, account_keypair, client_keypair, deals).await? + let (hash, event) = Self::publish_storage_deals( + client, + account_keypair, + client_keypair, + deals, + result_type, + ) + .await? + .unwrap_both(); + println!("{hash:?}: {event:?}"); } _unsigned => unreachable!("unsigned commands should have been previously handled"), }; - let hash = submission_result.hash; - // This monstrosity first converts incoming events into a "generic" (subxt generated) event, - // and then we extract only the Market events. We could probably extract this into a proper - // iterator but the effort to improvement ratio seems low (for 2 pallets at least). - let submission_results = submission_result - .events - .iter() - .flat_map(|event| { - event.map(|details| details.as_root_event::()) - }) - .filter_map(|event| match event { - Ok(storagext::runtime::Event::Market(e)) => Some(Ok(e)), - Err(err) => Some(Err(err)), - _ => None, - }); - for event in submission_results { - let event = event?; - let output = output_format.format(&event)?; - match output_format { - OutputFormat::Plain => println!("[{}] {}", hash, output), - OutputFormat::Json => println!("{}", output), - } - } Ok(()) } @@ -191,14 +190,17 @@ impl MarketCommand { client: Client, account_keypair: MultiPairSigner, amount: u128, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result, subxt::Error> where Client: MarketClientExt, { - let submission_result = client.add_balance(&account_keypair, amount).await?; + let submission_result = client + .add_balance(&account_keypair, amount, result_type) + .await?; tracing::debug!( "[{}] Successfully added {} to Market Balance", - submission_result.hash, + submission_result, amount ); @@ -210,7 +212,8 @@ impl MarketCommand { account_keypair: MultiPairSigner, client_keypair: MultiPairSigner, deals: Vec, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result, subxt::Error> where Client: MarketClientExt, { @@ -219,11 +222,12 @@ impl MarketCommand { &account_keypair, &client_keypair, deals.into_iter().map(Into::into).collect(), + result_type, ) .await?; tracing::debug!( "[{}] Successfully published storage deals", - submission_result.hash + submission_result ); Ok(submission_result) @@ -233,17 +237,15 @@ impl MarketCommand { client: Client, account_keypair: MultiPairSigner, deal_ids: Vec, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result, subxt::Error> where Client: MarketClientExt, { let submission_result = client - .settle_deal_payments(&account_keypair, deal_ids) + .settle_deal_payments(&account_keypair, deal_ids, result_type) .await?; - tracing::debug!( - "[{}] Successfully settled deal payments", - submission_result.hash - ); + tracing::debug!("[{}] Successfully settled deal payments", submission_result,); Ok(submission_result) } @@ -252,14 +254,17 @@ impl MarketCommand { client: Client, account_keypair: MultiPairSigner, amount: u128, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result, subxt::Error> where Client: MarketClientExt, { - let submission_result = client.withdraw_balance(&account_keypair, amount).await?; + let submission_result = client + .withdraw_balance(&account_keypair, amount, result_type) + .await?; tracing::debug!( "[{}] Successfully withdrew {} from Market Balance", - submission_result.hash, + submission_result, amount ); diff --git a/cli/polka-storage/storagext-cli/src/cmd/storage_provider.rs b/cli/polka-storage/storagext-cli/src/cmd/storage_provider.rs index 2d2bfbe8b..42c80883e 100644 --- a/cli/polka-storage/storagext-cli/src/cmd/storage_provider.rs +++ b/cli/polka-storage/storagext-cli/src/cmd/storage_provider.rs @@ -7,7 +7,7 @@ use storagext::{ multipair::MultiPairSigner, runtime::{ runtime_types::pallet_storage_provider::sector::ProveCommitSector as RuntimeProveCommitSector, - SubmissionResult, + storage_provider::events as SpEvents, HashOfPsc, ResultType, SubmissionResult, }, types::storage_provider::{ FaultDeclaration as SxtFaultDeclaration, ProveCommitSector as SxtProveCommitSector, @@ -16,7 +16,7 @@ use storagext::{ SubmitWindowedPoStParams as SxtSubmitWindowedPoStParams, TerminationDeclaration as SxtTerminationDeclaration, }, - PolkaStorageConfig, StorageProviderClientExt, + StorageProviderClientExt, }; use url::Url; @@ -102,6 +102,7 @@ impl StorageProviderCommand { account_keypair: Option, n_retries: u32, retry_interval: Duration, + result_type: ResultType, output_format: OutputFormat, ) -> Result<(), anyhow::Error> { let client = storagext::Client::new(node_rpc, n_retries, retry_interval).await?; @@ -130,7 +131,7 @@ impl StorageProviderCommand { return Err(missing_keypair_error::().into()); }; else_ - .with_keypair(client, account_keypair, output_format) + .with_keypair(client, account_keypair, result_type, output_format) .await?; } }; @@ -142,64 +143,98 @@ impl StorageProviderCommand { self, client: Client, account_keypair: MultiPairSigner, - output_format: OutputFormat, + result_type: ResultType, + _output_format: OutputFormat, ) -> Result<(), anyhow::Error> where Client: StorageProviderClientExt, { operation_takes_a_while(); - let submission_result = match self { + match self { StorageProviderCommand::RegisterStorageProvider { peer_id, post_proof, } => { - Self::register_storage_provider(client, account_keypair, peer_id, post_proof) - .await? + let (hash, event) = Self::register_storage_provider( + client, + account_keypair, + peer_id, + post_proof, + result_type, + ) + .await? + .unwrap_both(); + println!("{hash:?}: {event:?}"); } StorageProviderCommand::PreCommit { pre_commit_sectors } => { - Self::pre_commit(client, account_keypair, pre_commit_sectors).await? + let (hash, event) = + Self::pre_commit(client, account_keypair, pre_commit_sectors, result_type) + .await? + .unwrap_both(); + println!("{hash:?}: {event:?}"); } StorageProviderCommand::ProveCommit { prove_commit_sectors, - } => Self::prove_commit(client, account_keypair, prove_commit_sectors).await?, + } => { + let (hash, event) = + Self::prove_commit(client, account_keypair, prove_commit_sectors, result_type) + .await? + .unwrap_both(); + println!("{hash:?}: {event:?}"); + } StorageProviderCommand::SubmitWindowedProofOfSpaceTime { windowed_post } => { - Self::submit_windowed_post(client, account_keypair, windowed_post).await? + let (hash, event) = + Self::submit_windowed_post(client, account_keypair, windowed_post, result_type) + .await? + .unwrap_both(); + println!("{hash:?}: {event:?}"); } StorageProviderCommand::DeclareFaults { faults } => { - Self::declare_faults(client, account_keypair, faults).await? + let (hash, event) = + Self::declare_faults(client, account_keypair, faults, result_type) + .await? + .unwrap_both(); + println!("{hash:?}: {event:?}"); } StorageProviderCommand::DeclareFaultsRecovered { recoveries } => { - Self::declare_faults_recovered(client, account_keypair, recoveries).await? + let (hash, event) = Self::declare_faults_recovered( + client, + account_keypair, + recoveries, + result_type, + ) + .await? + .unwrap_both(); + println!("{hash:?}: {event:?}"); } StorageProviderCommand::TerminateSectors { terminations } => { Self::terminate_sectors(client, account_keypair, terminations).await? } _unsigned => unreachable!("unsigned commands should have been previously handled"), - }; - - // This monstrosity first converts incoming events into a "generic" (subxt generated) event, - // and then we extract only the Market events. We could probably extract this into a proper - // iterator but the effort to improvement ratio seems low (for 2 pallets at least). - let submission_results = submission_result - .events - .iter() - .flat_map(|event| { - event.map(|details| details.as_root_event::()) - }) - .filter_map(|event| match event { - Ok(storagext::runtime::Event::StorageProvider(e)) => Some(Ok(e)), - Err(err) => Some(Err(err)), - _ => None, - }); - for event in submission_results { - let event = event?; - let output = output_format.format(&event)?; - match output_format { - OutputFormat::Plain => println!("[{}] {}", submission_result.hash, output), - OutputFormat::Json => println!("{}", output), - } } + + // // This monstrosity first converts incoming events into a "generic" (subxt generated) event, + // // and then we extract only the Market events. We could probably extract this into a proper + // // iterator but the effort to improvement ratio seems low (for 2 pallets at least). + // let events = events + // .iter() + // .flat_map(|event| { + // event.map(|details| details.as_root_event::()) + // }) + // .filter_map(|event| match event { + // Ok(storagext::runtime::Event::StorageProvider(e)) => Some(Ok(e)), + // Err(err) => Some(Err(err)), + // _ => None, + // }); + // for event in events { + // let event = event?; + // let output = output_format.format(&event)?; + // match output_format { + // OutputFormat::Plain => println!("[{}] {}", hash, output), + // OutputFormat::Json => println!("{}", output), + // } + // } Ok(()) } @@ -208,16 +243,17 @@ impl StorageProviderCommand { account_keypair: MultiPairSigner, peer_id: String, post_proof: RegisteredPoStProof, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result, subxt::Error> where Client: StorageProviderClientExt, { let submission_result = client - .register_storage_provider(&account_keypair, peer_id.clone(), post_proof) + .register_storage_provider(&account_keypair, peer_id.clone(), post_proof, result_type) .await?; tracing::debug!( "[{}] Successfully registered {}, seal: {:?} in Storage Provider Pallet", - submission_result.hash, + submission_result, peer_id, post_proof ); @@ -229,7 +265,8 @@ impl StorageProviderCommand { client: Client, account_keypair: MultiPairSigner, pre_commit_sectors: Vec, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result, subxt::Error> where Client: StorageProviderClientExt, { @@ -240,11 +277,11 @@ impl StorageProviderCommand { .unzip(); let submission_result = client - .pre_commit_sectors(&account_keypair, pre_commit_sectors) + .pre_commit_sectors(&account_keypair, pre_commit_sectors, result_type) .await?; tracing::debug!( "[{}] Successfully pre-commited sectors {:?}.", - submission_result.hash, + submission_result, sector_numbers ); @@ -255,7 +292,8 @@ impl StorageProviderCommand { client: Client, account_keypair: MultiPairSigner, prove_commit_sectors: Vec, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result, subxt::Error> where Client: StorageProviderClientExt, { @@ -270,11 +308,11 @@ impl StorageProviderCommand { }) .unzip(); let submission_result = client - .prove_commit_sectors(&account_keypair, prove_commit_sectors) + .prove_commit_sectors(&account_keypair, prove_commit_sectors, result_type) .await?; tracing::debug!( "[{}] Successfully proven sector {:?}.", - submission_result.hash, + submission_result, sector_numbers ); @@ -285,14 +323,15 @@ impl StorageProviderCommand { client: Client, account_keypair: MultiPairSigner, windowed_post: SxtSubmitWindowedPoStParams, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result, subxt::Error> where Client: StorageProviderClientExt, { let submission_result = client - .submit_windowed_post(&account_keypair, windowed_post.into()) + .submit_windowed_post(&account_keypair, windowed_post.into(), result_type) .await?; - tracing::debug!("[{}] Successfully submitted proof.", submission_result.hash); + tracing::debug!("[{}] Successfully submitted proof.", submission_result); Ok(submission_result) } @@ -301,12 +340,15 @@ impl StorageProviderCommand { client: Client, account_keypair: MultiPairSigner, faults: Vec, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result, subxt::Error> where Client: StorageProviderClientExt, { - let submission_result = client.declare_faults(&account_keypair, faults).await?; - tracing::debug!("[{}] Successfully declared faults.", submission_result.hash); + let submission_result = client + .declare_faults(&account_keypair, faults, result_type) + .await?; + tracing::debug!("[{}] Successfully declared faults.", submission_result); Ok(submission_result) } @@ -315,14 +357,15 @@ impl StorageProviderCommand { client: Client, account_keypair: MultiPairSigner, recoveries: Vec, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result, subxt::Error> where Client: StorageProviderClientExt, { let submission_result = client - .declare_faults_recovered(&account_keypair, recoveries) + .declare_faults_recovered(&account_keypair, recoveries, result_type) .await?; - tracing::debug!("[{}] Successfully declared faults.", submission_result.hash); + tracing::debug!("[{}] Successfully declared faults.", submission_result); Ok(submission_result) } diff --git a/cli/polka-storage/storagext-cli/src/main.rs b/cli/polka-storage/storagext-cli/src/main.rs index b512d357a..74346016e 100644 --- a/cli/polka-storage/storagext-cli/src/main.rs +++ b/cli/polka-storage/storagext-cli/src/main.rs @@ -6,7 +6,10 @@ use std::{fmt::Debug, time::Duration}; use clap::{ArgGroup, Parser, Subcommand}; use cmd::{market::MarketCommand, storage_provider::StorageProviderCommand, system::SystemCommand}; -use storagext::multipair::{DebugPair, MultiPairSigner}; +use storagext::{ + multipair::{DebugPair, MultiPairSigner}, + runtime::ResultType, +}; use subxt::ext::sp_core::{ ecdsa::Pair as ECDSAPair, ed25519::Pair as Ed25519Pair, sr25519::Pair as Sr25519Pair, }; @@ -77,6 +80,10 @@ struct Cli { #[arg(long, env, default_value = DEFAULT_RETRY_INTERVAL_MS, value_parser = parse_ms)] pub retry_interval: Duration, + /// The expected return type, i.e. none, hash, events, etc.. + #[arg(long, env, default_value_t = ResultType::Success, value_parser = ResultType::value_parser)] + pub result_type: ResultType, + /// Output format. #[arg(long, env, value_parser = OutputFormat::value_parser, default_value_t = OutputFormat::Plain)] pub format: OutputFormat, @@ -101,6 +108,7 @@ impl SubCommand { account_keypair: Option, n_retries: u32, retry_interval: Duration, + result_type: ResultType, output_format: OutputFormat, ) -> Result<(), anyhow::Error> { match self { @@ -110,6 +118,7 @@ impl SubCommand { account_keypair, n_retries, retry_interval, + result_type, output_format, ) .await?; @@ -120,6 +129,7 @@ impl SubCommand { account_keypair, n_retries, retry_interval, + result_type, output_format, ) .await?; @@ -179,6 +189,7 @@ async fn main() -> Result<(), anyhow::Error> { multi_pair_signer, cli_arguments.n_retries, cli_arguments.retry_interval, + cli_arguments.result_type, cli_arguments.format, ) .await?; diff --git a/cli/polka-storage/storagext/src/clients/market.rs b/cli/polka-storage/storagext/src/clients/market.rs index ee7b9e6e5..f529a2fcf 100644 --- a/cli/polka-storage/storagext/src/clients/market.rs +++ b/cli/polka-storage/storagext/src/clients/market.rs @@ -6,7 +6,8 @@ use subxt::{ext::sp_core::crypto::Ss58Codec, utils::Static}; use crate::{ runtime::{ self, - client::SubmissionResult, + client::{HashOfPsc, ResultType, SubmissionResult}, + market::events as MarketEvents, runtime_types::pallet_market::pallet::{ BalanceEntry, ClientDealProposal as RuntimeClientDealProposal, }, @@ -34,7 +35,10 @@ pub trait MarketClientExt { &self, account_keypair: &Keypair, amount: Currency, - ) -> impl Future, subxt::Error>> + result_type: ResultType, + ) -> impl Future< + Output = Result, subxt::Error>, + > where Keypair: subxt::tx::Signer; @@ -43,7 +47,10 @@ pub trait MarketClientExt { &self, account_keypair: &Keypair, amount: Currency, - ) -> impl Future, subxt::Error>> + result_type: ResultType, + ) -> impl Future< + Output = Result, subxt::Error>, + > where Keypair: subxt::tx::Signer; @@ -54,7 +61,10 @@ pub trait MarketClientExt { &self, account_keypair: &Keypair, deal_ids: Vec, - ) -> impl Future, subxt::Error>> + result_type: ResultType, + ) -> impl Future< + Output = Result, subxt::Error>, + > where Keypair: subxt::tx::Signer; @@ -66,7 +76,10 @@ pub trait MarketClientExt { account_keypair: &Keypair, client_keypair: &ClientKeypair, deals: Vec, - ) -> impl Future, subxt::Error>> + result_type: ResultType, + ) -> impl Future< + Output = Result, subxt::Error>, + > where Keypair: subxt::tx::Signer, ClientKeypair: subxt::tx::Signer; @@ -78,7 +91,10 @@ pub trait MarketClientExt { &self, account_keypair: &Keypair, deals: Vec, - ) -> impl Future, subxt::Error>> + result_type: ResultType, + ) -> impl Future< + Output = Result, subxt::Error>, + > where Keypair: subxt::tx::Signer; @@ -102,12 +118,14 @@ impl MarketClientExt for crate::runtime::client::Client { &self, account_keypair: &Keypair, amount: Currency, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result, subxt::Error> where Keypair: subxt::tx::Signer, { let payload = runtime::tx().market().withdraw_balance(amount); - self.traced_submission(&payload, account_keypair).await + self.traced_submission(&payload, account_keypair, result_type) + .await } #[tracing::instrument( @@ -122,12 +140,14 @@ impl MarketClientExt for crate::runtime::client::Client { &self, account_keypair: &Keypair, amount: Currency, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result, subxt::Error> where Keypair: subxt::tx::Signer, { let payload = runtime::tx().market().add_balance(amount); - self.traced_submission(&payload, account_keypair).await + self.traced_submission(&payload, account_keypair, result_type) + .await } #[tracing::instrument( @@ -142,7 +162,8 @@ impl MarketClientExt for crate::runtime::client::Client { &self, account_keypair: &Keypair, mut deal_ids: Vec, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result, subxt::Error> where Keypair: subxt::tx::Signer, { @@ -160,7 +181,8 @@ impl MarketClientExt for crate::runtime::client::Client { .market() .settle_deal_payments(bounded_unbounded_deal_ids); - self.traced_submission(&payload, account_keypair).await + self.traced_submission(&payload, account_keypair, result_type) + .await } #[tracing::instrument( @@ -175,7 +197,8 @@ impl MarketClientExt for crate::runtime::client::Client { account_keypair: &Keypair, client_keypair: &ClientKeypair, mut deals: Vec, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result, subxt::Error> where Keypair: subxt::tx::Signer, ClientKeypair: subxt::tx::Signer, @@ -202,7 +225,8 @@ impl MarketClientExt for crate::runtime::client::Client { .market() .publish_storage_deals(bounded_unbounded_deals); - self.traced_submission(&payload, account_keypair).await + self.traced_submission(&payload, account_keypair, result_type) + .await } #[tracing::instrument( @@ -216,7 +240,8 @@ impl MarketClientExt for crate::runtime::client::Client { &self, account_keypair: &Keypair, mut deals: Vec, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result, subxt::Error> where Keypair: subxt::tx::Signer, { @@ -240,7 +265,8 @@ impl MarketClientExt for crate::runtime::client::Client { .market() .publish_storage_deals(bounded_unbounded_deals); - self.traced_submission(&payload, account_keypair).await + self.traced_submission(&payload, account_keypair, result_type) + .await } #[tracing::instrument( diff --git a/cli/polka-storage/storagext/src/clients/storage_provider.rs b/cli/polka-storage/storagext/src/clients/storage_provider.rs index 26ee45c58..0adc5d257 100644 --- a/cli/polka-storage/storagext/src/clients/storage_provider.rs +++ b/cli/polka-storage/storagext/src/clients/storage_provider.rs @@ -11,25 +11,32 @@ use crate::{ runtime::{ self, bounded_vec::IntoBoundedByteVec, - client::SubmissionResult, + client::{HashOfPsc, ResultType, SubmissionResult}, runtime_types::pallet_storage_provider::{ proofs::SubmitWindowedPoStParams, sector::ProveCommitSector, storage_provider::StorageProviderState, }, - storage_provider::calls::types::register_storage_provider::PeerId, + storage_provider::{calls::types::register_storage_provider::PeerId, events as SpEvents}, }, types::storage_provider::{ FaultDeclaration, RecoveryDeclaration, SectorPreCommitInfo, TerminationDeclaration, }, BlockNumber, Currency, PolkaStorageConfig, }; + pub trait StorageProviderClientExt { fn register_storage_provider( &self, account_keypair: &Keypair, peer_id: String, post_proof: RegisteredPoStProof, - ) -> impl Future, subxt::Error>> + result_type: ResultType, + ) -> impl Future< + Output = Result< + SubmissionResult, + subxt::Error, + >, + > where Keypair: subxt::tx::Signer; @@ -37,7 +44,10 @@ pub trait StorageProviderClientExt { &self, account_keypair: &Keypair, sectors: Vec, - ) -> impl Future, subxt::Error>> + result_type: ResultType, + ) -> impl Future< + Output = Result, subxt::Error>, + > where Keypair: subxt::tx::Signer; @@ -45,7 +55,8 @@ pub trait StorageProviderClientExt { &self, account_keypair: &Keypair, sectors: Vec, - ) -> impl Future, subxt::Error>> + result_type: ResultType, + ) -> impl Future, subxt::Error>> where Keypair: subxt::tx::Signer; @@ -53,7 +64,10 @@ pub trait StorageProviderClientExt { &self, account_keypair: &Keypair, windowed_post: SubmitWindowedPoStParams, - ) -> impl Future, subxt::Error>> + result_type: ResultType, + ) -> impl Future< + Output = Result, subxt::Error>, + > where Keypair: subxt::tx::Signer; @@ -61,7 +75,8 @@ pub trait StorageProviderClientExt { &self, account_keypair: &Keypair, faults: Vec, - ) -> impl Future, subxt::Error>> + result_type: ResultType, + ) -> impl Future, subxt::Error>> where Keypair: subxt::tx::Signer; @@ -69,7 +84,8 @@ pub trait StorageProviderClientExt { &self, account_keypair: &Keypair, recoveries: Vec, - ) -> impl Future, subxt::Error>> + result_type: ResultType, + ) -> impl Future, subxt::Error>> where Keypair: subxt::tx::Signer; @@ -106,7 +122,8 @@ impl StorageProviderClientExt for crate::runtime::client::Client { account_keypair: &Keypair, peer_id: String, post_proof: RegisteredPoStProof, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result, subxt::Error> where Keypair: subxt::tx::Signer, { @@ -114,7 +131,8 @@ impl StorageProviderClientExt for crate::runtime::client::Client { .storage_provider() .register_storage_provider(peer_id.into_bounded_byte_vec(), post_proof); - self.traced_submission(&payload, account_keypair).await + self.traced_submission(&payload, account_keypair, result_type) + .await } #[tracing::instrument( @@ -128,14 +146,16 @@ impl StorageProviderClientExt for crate::runtime::client::Client { &self, account_keypair: &Keypair, sectors: Vec, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result, subxt::Error> where Keypair: subxt::tx::Signer, { let sectors = BoundedVec(sectors.into_iter().map(Into::into).collect()); let payload = runtime::tx().storage_provider().pre_commit_sectors(sectors); - self.traced_submission(&payload, account_keypair).await + self.traced_submission(&payload, account_keypair, result_type) + .await } #[tracing::instrument( @@ -149,7 +169,8 @@ impl StorageProviderClientExt for crate::runtime::client::Client { &self, account_keypair: &Keypair, sectors: Vec, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result, subxt::Error> where Keypair: subxt::tx::Signer, { @@ -158,7 +179,8 @@ impl StorageProviderClientExt for crate::runtime::client::Client { .storage_provider() .prove_commit_sectors(sectors); - self.traced_submission(&payload, account_keypair).await + self.traced_submission(&payload, account_keypair, result_type) + .await } #[tracing::instrument( @@ -172,7 +194,8 @@ impl StorageProviderClientExt for crate::runtime::client::Client { &self, account_keypair: &Keypair, windowed_post: SubmitWindowedPoStParams, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result, subxt::Error> where Keypair: subxt::tx::Signer, { @@ -180,7 +203,8 @@ impl StorageProviderClientExt for crate::runtime::client::Client { .storage_provider() .submit_windowed_post(windowed_post); - self.traced_submission(&payload, account_keypair).await + self.traced_submission(&payload, account_keypair, result_type) + .await } #[tracing::instrument( @@ -194,7 +218,8 @@ impl StorageProviderClientExt for crate::runtime::client::Client { &self, account_keypair: &Keypair, faults: Vec, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result, subxt::Error> where Keypair: subxt::tx::Signer, { @@ -202,7 +227,8 @@ impl StorageProviderClientExt for crate::runtime::client::Client { .storage_provider() .declare_faults(faults.into()); - self.traced_submission(&payload, account_keypair).await + self.traced_submission(&payload, account_keypair, result_type) + .await } #[tracing::instrument( @@ -216,7 +242,8 @@ impl StorageProviderClientExt for crate::runtime::client::Client { &self, account_keypair: &Keypair, recoveries: Vec, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result, subxt::Error> where Keypair: subxt::tx::Signer, { @@ -224,7 +251,8 @@ impl StorageProviderClientExt for crate::runtime::client::Client { .storage_provider() .declare_faults_recovered(recoveries.into()); - self.traced_submission(&payload, account_keypair).await + self.traced_submission(&payload, account_keypair, result_type) + .await } #[tracing::instrument(level = "debug", skip_all)] diff --git a/cli/polka-storage/storagext/src/lib.rs b/cli/polka-storage/storagext/src/lib.rs index d4da7aed4..6db1c592f 100644 --- a/cli/polka-storage/storagext/src/lib.rs +++ b/cli/polka-storage/storagext/src/lib.rs @@ -19,7 +19,7 @@ pub type Currency = u128; pub type BlockNumber = u64; /// Parachain configuration for subxt. -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum PolkaStorageConfig {} // Types are fully qualified ON PURPOSE! diff --git a/cli/polka-storage/storagext/src/runtime/client.rs b/cli/polka-storage/storagext/src/runtime/client.rs index 5cf101e78..8d3ffc34d 100644 --- a/cli/polka-storage/storagext/src/runtime/client.rs +++ b/cli/polka-storage/storagext/src/runtime/client.rs @@ -1,20 +1,113 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use hex::ToHex; -use subxt::{blocks::ExtrinsicEvents, OnlineClient}; +use subxt::{utils::AccountId32, OnlineClient}; +use tokio::sync::RwLock; -use crate::PolkaStorageConfig; +use crate::{ + runtime::{market::events as MarketEvents, storage_provider::events as SpEvents}, + PolkaStorageConfig, +}; + +type HashOf = ::Hash; +pub type HashOfPsc = HashOf; +type ParaEvents = Arc, subxt::events::EventDetails)>>>; /// Helper type for [`Client::traced_submission`] successful results. -pub struct SubmissionResult +pub enum SubmissionResult { + /// No return value contained. + None, + /// Submission block hash. + Hash { block_hash: Hash }, + /// Resulting extrinsic's events. + Event { event: Event }, + /// Combined. Return of a hash and the events. + Both { block_hash: Hash, event: Event }, +} + +impl SubmissionResult where - Config: subxt::Config, + Hash: subxt::config::BlockHash, + Event: subxt::events::StaticEvent, { - /// Submission block hash. - pub hash: Config::Hash, + /// Like other `unwrap()` implementations that unwraps the expected `Config::Hash`. + /// Method will panic in case it does not contain the expected return data type. + pub fn unwrap_block_hash(self) -> Hash { + match self { + SubmissionResult::Hash { block_hash } => block_hash, + _ => panic!("expected block-hash but contained something different"), + } + } - /// Resulting extrinsic's events. - pub events: ExtrinsicEvents, + /// Like other `unwrap()` implementations that unwraps the expected `Config::Hash`. + /// Method will panic in case it does not contain the expected return data type. + pub fn unwrap_event(self) -> Event { + match self { + SubmissionResult::Event { event } => event, + _ => panic!("expected events but contained something different"), + } + } + + /// Like other `unwrap()` implementations that unwraps the combined tuple. + /// Method will panic in case it does not contain the expected return data type. + pub fn unwrap_both(self) -> (Hash, Event) { + match self { + SubmissionResult::Both { block_hash, event } => (block_hash, event), + _ => panic!("expected events but contained something different"), + } + } +} + +impl std::fmt::Display for SubmissionResult +where + Hash: subxt::config::BlockHash + std::fmt::Debug, + Event: subxt::events::StaticEvent + std::fmt::Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SubmissionResult::None => write!(f, "None"), + SubmissionResult::Hash { block_hash } => write!(f, "{block_hash:?}"), + SubmissionResult::Event { event } => write!(f, "{event:?}"), + SubmissionResult::Both { block_hash, event } => { + write!(f, "Block-Hash: {block_hash:?}, Event: {event:?}") + } + } + } +} + +/// Selector type for the result type. +#[derive(Clone, Debug, Default, PartialEq)] +pub enum ResultType { + /// Nothing at all, this means like fire-and-forget. + Nothing, + /// Just wait for success but do not return any value. + #[default] + Success, + /// The block's hash of the submission shall be returned. + Hash, + /// The extrinsic's event of the submission shall be returned. + Event, + /// Both types will be returned as combined return value. + Both, +} + +impl ResultType { + pub fn value_parser(s: &str) -> Result { + match s.to_lowercase().as_str() { + "nothing" => Ok(ResultType::Nothing), + "success" => Ok(ResultType::Success), + "hash" => Ok(ResultType::Hash), + "event" => Ok(ResultType::Event), + "both" => Ok(ResultType::Both), + format => Err(format!("unknown format: {}", format)), + } + } +} + +impl std::fmt::Display for ResultType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } } /// Client to interact with a pallet extrinsics. @@ -66,15 +159,19 @@ impl Client { /// /// Equivalent to performing [`OnlineClient::sign_and_submit_then_watch_default`], /// followed by [`TxInBlock::wait_for_finalized`] and [`TxInBlock::wait_for_success`]. - pub(crate) async fn traced_submission( + pub(crate) async fn traced_submission( &self, call: &Call, account_keypair: &Keypair, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result, subxt::Error> where - Call: subxt::tx::Payload, + Call: subxt::tx::Payload + std::fmt::Debug, Keypair: subxt::tx::Signer, + Event: subxt::events::StaticEvent + std::fmt::Debug, + EventMetaProvider: EventMetaType, { + println!("call: {call:?}"); tracing::trace!("submitting extrinsic"); let submission_progress = self .client @@ -82,24 +179,51 @@ impl Client { .sign_and_submit_then_watch_default(call, account_keypair) .await?; + if result_type == ResultType::Nothing { + return Ok(SubmissionResult::None); + } tracing::trace!( extrinsic_hash = submission_progress.extrinsic_hash().encode_hex::(), "waiting for finalization" ); - let finalized_xt = submission_progress.wait_for_finalized().await?; - let block_hash = finalized_xt.block_hash(); - tracing::trace!( - block_hash = block_hash.encode_hex::(), - "successfully submitted extrinsic" - ); - // finalized != successful - let xt_events = finalized_xt.wait_for_success().await?; + let para_events: ParaEvents = Arc::new(RwLock::new(Vec::new())); + let account_id = AccountId32::from(account_keypair.account_id()); + let listener_flag = Arc::new(RwLock::new(true)); + + let p_api = self.client.clone(); + let p_events = para_events.clone(); + let p_flag = listener_flag.clone(); - Ok(SubmissionResult { - hash: block_hash, - events: xt_events, - }) + let _ = tokio::spawn(async move { + para_watcher(p_api, p_flag, p_events).await; + }); + + static EVENT_META_PROVIDER: EventMetaProvider = EventMetaProvider; + let (block_hash, event) = wait_for_para_event::( + para_events.clone(), + >::pallet_name(&EVENT_META_PROVIDER), + >::variant_name(&EVENT_META_PROVIDER), + >::filter_fn( + &EVENT_META_PROVIDER, + account_id, + ), + ) + .await; + { + let mut flag_guard = listener_flag.write().await; + *flag_guard = false; + } + + match result_type { + ResultType::Success => Ok(SubmissionResult::None), + ResultType::Hash => Ok(SubmissionResult::Hash { block_hash }), + ResultType::Event => Ok(SubmissionResult::Event { event }), + ResultType::Both => Ok(SubmissionResult::Both { block_hash, event }), + ResultType::Nothing => { + panic!("should never happen, return this shit to developer"); + } + } } } @@ -108,3 +232,200 @@ impl From> for Client { Self { client } } } + +/// TODO +async fn wait_for_para_event( + events: ParaEvents, + pallet: &'static str, + variant: &'static str, + predicate: impl Fn(&E) -> bool, +) -> (HashOf, E) +where + C: subxt::Config + Clone + std::fmt::Debug, + E: subxt::events::StaticEvent + std::fmt::Debug, +{ + loop { + let mut events = events.write().await; + if let Some(entry) = events.iter().find(|&e| { + e.2.pallet_name() == pallet + && e.2.variant_name() == variant + && predicate(&e.2.as_event::().unwrap().unwrap()) + }) { + let entry = entry.clone(); + events.retain(|e| e.0 > entry.0); + tracing::trace!( + "Found related event {}::{} on block {}", + pallet, + variant, + entry.0 + ); + return (entry.1, entry.2.as_event::().unwrap().unwrap()); + } + drop(events); + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + } +} + +/// TODO +async fn para_watcher( + api: OnlineClient, + flag: Arc>, + events: ParaEvents, +) where + ::Number: std::fmt::Display, +{ + tracing::trace!("start listening to events on finalised blocks"); + let mut blocks_sub = api.blocks().subscribe_finalized().await.unwrap(); + + while *flag.read().await { + while let Some(block) = blocks_sub.next().await { + let block = block.unwrap(); + let hash = block.hash(); + + for event in block.events().await.unwrap().iter() { + let event = event.unwrap(); + tracing::trace!( + "Listened in block {} event: {}::{}", + block.number(), + event.pallet_name(), + event.variant_name() + ); + { + events + .write() + .await + .push((block.number().into(), hash.clone(), event.clone())); + } + } + } + } + tracing::trace!("stoped event-listener"); +} + +pub trait EventMetaType { + fn pallet_name(&self) -> &str; + fn variant_name(&self) -> &str; + fn filter_fn(&self, acc: AccountId32) -> impl Fn(&Event) -> bool; +} + +pub struct EventMetaProvider; + +impl EventMetaType for EventMetaProvider { + fn pallet_name(&self) -> &str { + "Market" + } + fn variant_name(&self) -> &str { + "BalanceAdded" + } + fn filter_fn(&self, acc: AccountId32) -> impl Fn(&MarketEvents::BalanceAdded) -> bool { + move |e: &MarketEvents::BalanceAdded| e.who == acc + } +} + +impl EventMetaType for EventMetaProvider { + fn pallet_name(&self) -> &str { + "Market" + } + fn variant_name(&self) -> &str { + "BalanceWithdrawn" + } + fn filter_fn(&self, acc: AccountId32) -> impl Fn(&MarketEvents::BalanceWithdrawn) -> bool { + move |e: &MarketEvents::BalanceWithdrawn| e.who == acc + } +} + +impl EventMetaType for EventMetaProvider { + fn pallet_name(&self) -> &str { + "Market" + } + fn variant_name(&self) -> &str { + "DealsSettled" + } + fn filter_fn(&self, _: AccountId32) -> impl Fn(&MarketEvents::DealsSettled) -> bool { + move |_: &MarketEvents::DealsSettled| true + } +} + +impl EventMetaType for EventMetaProvider { + fn pallet_name(&self) -> &str { + "Market" + } + fn variant_name(&self) -> &str { + "DealPublished" + } + fn filter_fn(&self, _: AccountId32) -> impl Fn(&MarketEvents::DealPublished) -> bool { + move |_: &MarketEvents::DealPublished| true + } +} + +impl EventMetaType for EventMetaProvider { + fn pallet_name(&self) -> &str { + "StorageProvider" + } + fn variant_name(&self) -> &str { + "StorageProviderRegistered" + } + fn filter_fn(&self, acc: AccountId32) -> impl Fn(&SpEvents::StorageProviderRegistered) -> bool { + move |e: &SpEvents::StorageProviderRegistered| e.owner == acc + } +} + +impl EventMetaType for EventMetaProvider { + fn pallet_name(&self) -> &str { + "StorageProvider" + } + fn variant_name(&self) -> &str { + "SectorsPreCommitted" + } + fn filter_fn(&self, acc: AccountId32) -> impl Fn(&SpEvents::SectorsPreCommitted) -> bool { + move |e: &SpEvents::SectorsPreCommitted| e.owner == acc + } +} + +impl EventMetaType for EventMetaProvider { + fn pallet_name(&self) -> &str { + "StorageProvider" + } + fn variant_name(&self) -> &str { + "SectorsProven" + } + fn filter_fn(&self, acc: AccountId32) -> impl Fn(&SpEvents::SectorsProven) -> bool { + move |e: &SpEvents::SectorsProven| e.owner == acc + } +} + +impl EventMetaType for EventMetaProvider { + fn pallet_name(&self) -> &str { + "StorageProvider" + } + fn variant_name(&self) -> &str { + "ValidPoStSubmitted" + } + fn filter_fn(&self, acc: AccountId32) -> impl Fn(&SpEvents::ValidPoStSubmitted) -> bool { + move |e: &SpEvents::ValidPoStSubmitted| e.owner == acc + } +} + +impl EventMetaType for EventMetaProvider { + fn pallet_name(&self) -> &str { + "StorageProvider" + } + fn variant_name(&self) -> &str { + "FaultsDeclared" + } + fn filter_fn(&self, acc: AccountId32) -> impl Fn(&SpEvents::FaultsDeclared) -> bool { + move |e: &SpEvents::FaultsDeclared| e.owner == acc + } +} + +impl EventMetaType for EventMetaProvider { + fn pallet_name(&self) -> &str { + "StorageProvider" + } + fn variant_name(&self) -> &str { + "FaultsRecovered" + } + fn filter_fn(&self, acc: AccountId32) -> impl Fn(&SpEvents::FaultsRecovered) -> bool { + move |e: &SpEvents::FaultsRecovered| e.owner == acc + } +} diff --git a/cli/polka-storage/storagext/src/runtime/mod.rs b/cli/polka-storage/storagext/src/runtime/mod.rs index 484a1d436..e5b104422 100644 --- a/cli/polka-storage/storagext/src/runtime/mod.rs +++ b/cli/polka-storage/storagext/src/runtime/mod.rs @@ -103,7 +103,7 @@ pub mod display; mod polka_storage_runtime {} // Using self keeps the import separate from the others -pub use client::SubmissionResult; +pub use client::{HashOfPsc, ResultType, SubmissionResult}; pub use self::polka_storage_runtime::*; #[cfg(test)]