From 3b538e0efe8bcc377b5faa90b603e16141473667 Mon Sep 17 00:00:00 2001 From: satan Date: Mon, 22 Jan 2024 14:52:06 +0100 Subject: [PATCH 1/7] wip: Adding shielded sync types and futures --- crates/apps/src/lib/client/rpc.rs | 7 - crates/sdk/src/masp.rs | 845 +++++++++++++++++------------- 2 files changed, 491 insertions(+), 361 deletions(-) diff --git a/crates/apps/src/lib/client/rpc.rs b/crates/apps/src/lib/client/rpc.rs index 3a3fe9ac75..86277edfd6 100644 --- a/crates/apps/src/lib/client/rpc.rs +++ b/crates/apps/src/lib/client/rpc.rs @@ -849,13 +849,6 @@ pub async fn query_shielded_balance( { let mut shielded = context.shielded_mut().await; let _ = shielded.load().await; - let fvks: Vec<_> = viewing_keys - .iter() - .map(|fvk| ExtendedFullViewingKey::from(*fvk).fvk.vk) - .collect(); - shielded.fetch(context.client(), &[], &fvks).await.unwrap(); - // Save the update state so that future fetches can be short-circuited - let _ = shielded.save().await; } // The epoch is required to identify timestamped tokens let epoch = query_and_print_epoch(context).await; diff --git a/crates/sdk/src/masp.rs b/crates/sdk/src/masp.rs index a80f208aff..efced79fff 100644 --- a/crates/sdk/src/masp.rs +++ b/crates/sdk/src/masp.rs @@ -4,9 +4,12 @@ use std::cmp::Ordering; use std::collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet}; use std::env; use std::fmt::Debug; -use std::ops::Deref; +use std::future::Future; +use std::ops::{ControlFlow, Deref}; use std::path::{Path, PathBuf}; +use std::pin::Pin; use std::str::FromStr; +use std::task::{Context, Poll}; // use async_std::io::prelude::WriteExt; // use async_std::io::{self}; @@ -67,6 +70,8 @@ use rand_core::{CryptoRng, OsRng, RngCore}; use ripemd::Digest as RipemdDigest; use sha2::Digest; use thiserror::Error; +use tokio::sync::oneshot::error::TryRecvError; +use namada_core::ledger::inflation::RewardsController; #[cfg(feature = "testing")] use crate::error::EncodingError; @@ -501,10 +506,30 @@ pub type TransferDelta = HashMap; /// Represents the changes that were made to a list of shielded accounts pub type TransactionDelta = HashMap; +/// A marker type indicating that the shielded context is currently +/// syncing. +struct Syncing; + +/// A marker type indicating that the shielded context is not currently +/// syncing. +struct NotSyncing; + +trait SyncStatus { + type InterruptHandler; +} + +impl SyncStatus for NotSyncing { + type InterruptHandler = (); +} + +impl SyncStatus for Syncing { + type InterruptHandler = tokio::sync::oneshot::Receiver<()>; +} + /// Represents the current state of the shielded pool from the perspective of /// the chosen viewing keys. #[derive(BorshSerialize, BorshDeserialize, Debug)] -pub struct ShieldedContext { +pub struct ShieldedContext { /// Location where this shielded context is saved #[borsh(skip)] pub utils: U, @@ -533,6 +558,9 @@ pub struct ShieldedContext { pub asset_types: HashMap)>, /// Maps note positions to their corresponding viewing keys pub vk_map: HashMap, + /// A type that informs the shielded context that whatever operation is + /// being performed should be interrupted. + pub interrupt: S::InterruptHandler, } /// Default implementation to ease construction of TxContexts. Derive cannot be @@ -553,26 +581,17 @@ impl Default for ShieldedContext { delta_map: BTreeMap::default(), asset_types: HashMap::default(), vk_map: HashMap::default(), + interrupt: NotSyncing::InterruptHandler::default(), } } } -impl ShieldedContext { - /// Try to load the last saved shielded context from the given context - /// directory. If this fails, then leave the current context unchanged. - pub async fn load(&mut self) -> std::io::Result<()> { - self.utils.clone().load(self).await - } - - /// Save this shielded context into its associated context directory - pub async fn save(&self) -> std::io::Result<()> { - self.utils.save(self).await - } +impl ShieldedContext { /// Merge data from the given shielded context into the current shielded /// context. It must be the case that the two shielded contexts share the /// same last transaction ID and share identical commitment trees. - pub fn merge(&mut self, new_ctx: ShieldedContext) { + pub fn merge(&mut self, new_ctx: ShieldedContext) { debug_assert_eq!(self.last_indexed, new_ctx.last_indexed); // Merge by simply extending maps. Identical keys should contain // identical values, so overwriting should not be problematic. @@ -598,153 +617,6 @@ impl ShieldedContext { } } - /// Fetch the current state of the multi-asset shielded pool into a - /// ShieldedContext - pub async fn fetch( - &mut self, - client: &C, - sks: &[ExtendedSpendingKey], - fvks: &[ViewingKey], - ) -> Result<(), Error> { - // First determine which of the keys requested to be fetched are new. - // Necessary because old transactions will need to be scanned for new - // keys. - let mut unknown_keys = Vec::new(); - for esk in sks { - let vk = to_viewing_key(esk).vk; - if !self.pos_map.contains_key(&vk) { - unknown_keys.push(vk); - } - } - for vk in fvks { - if !self.pos_map.contains_key(vk) { - unknown_keys.push(*vk); - } - } - - // If unknown keys are being used, we need to scan older transactions - // for any unspent notes - let (txs, mut tx_iter); - if !unknown_keys.is_empty() { - // Load all transactions accepted until this point - txs = Self::fetch_shielded_transfers(client, None).await?; - tx_iter = txs.iter(); - // Do this by constructing a shielding context only for unknown keys - let mut tx_ctx = Self { - utils: self.utils.clone(), - ..Default::default() - }; - for vk in unknown_keys { - tx_ctx.pos_map.entry(vk).or_insert_with(BTreeSet::new); - } - // Update this unknown shielded context until it is level with self - while tx_ctx.last_indexed != self.last_indexed { - if let Some((indexed_tx, (epoch, tx, stx))) = tx_iter.next() { - tx_ctx.scan_tx(*indexed_tx, *epoch, tx, stx)?; - } else { - break; - } - } - // Merge the context data originating from the unknown keys into the - // current context - self.merge(tx_ctx); - } else { - // Load only transactions accepted from last_txid until this point - txs = Self::fetch_shielded_transfers(client, self.last_indexed) - .await?; - tx_iter = txs.iter(); - } - // Now that we possess the unspent notes corresponding to both old and - // new keys up until tx_pos, proceed to scan the new transactions. - for (indexed_tx, (epoch, tx, stx)) in &mut tx_iter { - self.scan_tx(*indexed_tx, *epoch, tx, stx)?; - } - Ok(()) - } - - /// Obtain a chronologically-ordered list of all accepted shielded - /// transactions from a node. - pub async fn fetch_shielded_transfers( - client: &C, - last_indexed_tx: Option, - ) -> Result, Error> - { - // Query for the last produced block height - let last_block_height = query_block(client) - .await? - .map_or_else(BlockHeight::first, |block| block.height); - - let mut shielded_txs = BTreeMap::new(); - // Fetch all the transactions we do not have yet - let first_height_to_query = - last_indexed_tx.map_or_else(|| 1, |last| last.height.0); - let first_idx_to_query = - last_indexed_tx.map_or_else(|| 0, |last| last.index.0 + 1); - for height in first_height_to_query..=last_block_height.0 { - // Get the valid masp transactions at the specified height - let epoch = query_epoch_at_height(client, height.into()) - .await? - .ok_or_else(|| { - Error::from(QueryError::General( - "Queried height is greater than the last committed \ - block height" - .to_string(), - )) - })?; - - let first_index_to_query = if height == first_height_to_query { - Some(TxIndex(first_idx_to_query)) - } else { - None - }; - - let txs_results = match get_indexed_masp_events_at_height( - client, - height.into(), - first_index_to_query, - ) - .await? - { - Some(events) => events, - None => continue, - }; - - // Query the actual block to get the txs bytes. If we only need one - // tx it might be slightly better to query the /tx endpoint to - // reduce the amount of data sent over the network, but this is a - // minimal improvement and it's even hard to tell how many times - // we'd need a single masp tx to make this worth it - let block = client - .block(height as u32) - .await - .map_err(|e| Error::from(QueryError::General(e.to_string())))? - .block - .data; - - for (idx, tx_event) in txs_results { - let tx = Tx::try_from(block[idx.0 as usize].as_ref()) - .map_err(|e| Error::Other(e.to_string()))?; - let (transfer, masp_transaction) = Self::extract_masp_tx( - &tx, - ExtractShieldedActionArg::Event::(tx_event), - true, - ) - .await?; - - // Collect the current transaction - shielded_txs.insert( - IndexedTx { - height: height.into(), - index: idx, - }, - (epoch, transfer, masp_transaction), - ); - } - } - - Ok(shielded_txs) - } - /// Extract the relevant shield portions of a [`Tx`], if any. async fn extract_masp_tx<'args, C: Client + Sync>( tx: &Tx, @@ -845,138 +717,6 @@ impl ShieldedContext { Ok(tx) } - /// Applies the given transaction to the supplied context. More precisely, - /// the shielded transaction's outputs are added to the commitment tree. - /// Newly discovered notes are associated to the supplied viewing keys. Note - /// nullifiers are mapped to their originating notes. Note positions are - /// associated to notes, memos, and diversifiers. And the set of notes that - /// we have spent are updated. The witness map is maintained to make it - /// easier to construct note merkle paths in other code. See - /// - pub fn scan_tx( - &mut self, - indexed_tx: IndexedTx, - epoch: Epoch, - tx: &Transfer, - shielded: &Transaction, - ) -> Result<(), Error> { - // For tracking the account changes caused by this Transaction - let mut transaction_delta = TransactionDelta::new(); - // Listen for notes sent to our viewing keys - for so in shielded - .sapling_bundle() - .map_or(&vec![], |x| &x.shielded_outputs) - { - // Create merkle tree leaf node from note commitment - let node = Node::new(so.cmu.to_repr()); - // Update each merkle tree in the witness map with the latest - // addition - for (_, witness) in self.witness_map.iter_mut() { - witness.append(node).map_err(|()| { - Error::Other("note commitment tree is full".to_string()) - })?; - } - let note_pos = self.tree.size(); - self.tree.append(node).map_err(|()| { - Error::Other("note commitment tree is full".to_string()) - })?; - // Finally, make it easier to construct merkle paths to this new - // note - let witness = IncrementalWitness::::from_tree(&self.tree); - self.witness_map.insert(note_pos, witness); - // Let's try to see if any of our viewing keys can decrypt latest - // note - let mut pos_map = HashMap::new(); - std::mem::swap(&mut pos_map, &mut self.pos_map); - for (vk, notes) in pos_map.iter_mut() { - let decres = try_sapling_note_decryption::<_, OutputDescription<<::SaplingAuth as masp_primitives::transaction::components::sapling::Authorization>::Proof>>( - &NETWORK, - 1.into(), - &PreparedIncomingViewingKey::new(&vk.ivk()), - so, - ); - // So this current viewing key does decrypt this current note... - if let Some((note, pa, memo)) = decres { - // Add this note to list of notes decrypted by this viewing - // key - notes.insert(note_pos); - // Compute the nullifier now to quickly recognize when spent - let nf = note.nf( - &vk.nk, - note_pos.try_into().map_err(|_| { - Error::Other("Can not get nullifier".to_string()) - })?, - ); - self.note_map.insert(note_pos, note); - self.memo_map.insert(note_pos, memo); - // The payment address' diversifier is required to spend - // note - self.div_map.insert(note_pos, *pa.diversifier()); - self.nf_map.insert(nf, note_pos); - // Note the account changes - let balance = transaction_delta - .entry(*vk) - .or_insert_with(I128Sum::zero); - *balance += I128Sum::from_nonnegative( - note.asset_type, - note.value as i128, - ) - .map_err(|()| { - Error::Other( - "found note with invalid value or asset type" - .to_string(), - ) - })?; - - self.vk_map.insert(note_pos, *vk); - break; - } - } - std::mem::swap(&mut pos_map, &mut self.pos_map); - } - // Cancel out those of our notes that have been spent - for ss in shielded - .sapling_bundle() - .map_or(&vec![], |x| &x.shielded_spends) - { - // If the shielded spend's nullifier is in our map, then target note - // is rendered unusable - if let Some(note_pos) = self.nf_map.get(&ss.nullifier) { - self.spents.insert(*note_pos); - // Note the account changes - let balance = transaction_delta - .entry(self.vk_map[note_pos]) - .or_insert_with(I128Sum::zero); - let note = self.note_map[note_pos]; - *balance -= I128Sum::from_nonnegative( - note.asset_type, - note.value as i128, - ) - .map_err(|()| { - Error::Other( - "found note with invalid value or asset type" - .to_string(), - ) - })?; - } - } - // Record the changes to the transparent accounts - let mut transfer_delta = TransferDelta::new(); - let token_addr = tx.token.clone(); - transfer_delta.insert( - tx.source.clone(), - MaspChange { - asset: token_addr, - change: -tx.amount.amount().change(), - }, - ); - self.last_indexed = Some(indexed_tx); - - self.delta_map - .insert(indexed_tx, (epoch, transfer_delta, transaction_delta)); - Ok(()) - } - /// Summarize the effects on shielded and transparent accounts of each /// Transfer in this context pub fn get_tx_deltas( @@ -1624,12 +1364,6 @@ impl ShieldedContext { // possess let mut shielded = context.shielded_mut().await; let _ = shielded.load().await; - shielded - .fetch(context.client(), &spending_keys, &[]) - .await?; - // Save the update state so that future fetches can be - // short-circuited - let _ = shielded.save().await; } // Determine epoch in which to submit potential shielded transaction let epoch = rpc::query_epoch(context.client()).await?; @@ -2055,13 +1789,6 @@ impl ShieldedContext { const TXS_PER_PAGE: u8 = 100; let _ = self.load().await; let vks = viewing_keys; - let fvks: Vec<_> = vks - .values() - .map(|fvk| ExtendedFullViewingKey::from(*fvk).fvk.vk) - .collect(); - self.fetch(client, &[], &fvks).await?; - // Save the update state so that future fetches can be short-circuited - let _ = self.save().await; // Required for filtering out rejected transactions from Tendermint // responses let block_results = rpc::query_results(client).await?; @@ -2156,66 +1883,441 @@ impl ShieldedContext { } } } - Ok(transfers) + Ok(transfers) + } + + + /// Get the asset type with the given epoch, token, and denomination. If it + /// does not exist in the protocol, then remove the timestamp. Make sure to + /// store the derived AssetType so that future decoding is possible. + pub async fn get_asset_type( + &mut self, + client: &C, + epoch: Epoch, + token: Address, + denom: MaspDenom, + ) -> Result { + let mut asset_type = encode_asset_type(Some(epoch), &token, denom) + .map_err(|_| { + Error::Other("unable to create asset type".to_string()) + })?; + if self.decode_asset_type(client, asset_type).await.is_none() { + // If we fail to decode the epoched asset type, then remove the + // epoch + asset_type = + encode_asset_type(None, &token, denom).map_err(|_| { + Error::Other("unable to create asset type".to_string()) + })?; + self.asset_types.insert(asset_type, (token, denom, None)); + } + Ok(asset_type) + } + + /// Convert Anoma amount and token type to MASP equivalents + async fn convert_amount( + &mut self, + client: &C, + epoch: Epoch, + token: &Address, + val: token::Amount, + ) -> Result<([AssetType; 4], U64Sum), Error> { + let mut amount = U64Sum::zero(); + let mut asset_types = Vec::new(); + for denom in MaspDenom::iter() { + let asset_type = self + .get_asset_type(client, epoch, token.clone(), denom) + .await?; + // Combine the value and unit into one amount + amount += + U64Sum::from_nonnegative(asset_type, denom.denominate(&val)) + .map_err(|_| { + Error::Other("invalid value for amount".to_string()) + })?; + asset_types.push(asset_type); + } + Ok(( + asset_types + .try_into() + .expect("there must be exactly 4 denominations"), + amount, + )) + } +} + +impl ShieldedContext { + /// Try to load the last saved shielded context from the given context + /// directory. If this fails, then leave the current context unchanged. + pub async fn load(&mut self) -> std::io::Result<()> { + self.utils.clone().load(self).await + } + + /// Save this shielded context into its associated context directory + pub async fn save(&self) -> std::io::Result<()> { + self.utils.save(self).await + } + + pub async fn syncing( + self, + client: &C, + last_query_height: Option, + sks: &[ExtendedSpendingKey], + fvks: &[ViewingKey], + ){ + let shutdown_signal = async { + let (tx, rx) = tokio::sync::oneshot::channel(); + crate::control_flow::shutdown_send(tx).await; + rx.await + }; + + let (signal_sx, signal_rx) = tokio::sync::oneshot::channel(); + let mut syncing: ShieldedContext = ShieldedContext { + utils: self.utils, + last_indexed: self.last_indexed, + tree: self.tree, + pos_map: self.pos_map, + nf_map: self.nf_map, + note_map: self.note_map, + memo_map: self.memo_map, + div_map: self.div_map, + witness_map: self.witness_map, + delta_map: self.delta_map, + spents: self.spents, + asset_types: self.asset_types, + vk_map: self.vk_map, + interrupt: signal_rx, + }; + let sync = async move { + let ControlFlow::Break(res) = syncing.fetch(client, last_query_height, sks, fvks).await else { + unreachable!() + }; + res.map(|_| syncing.stop_sync()) + }; + let result = ShieldedSync { + sync, + signal: shutdown_signal, + channel: signal_sx, + signal_recieved: false + }.await; + } +} + +impl ShieldedContext { + + /// Check if a signal to stop syncing has been received. + fn interrupted(&mut self) -> ControlFlow> { + match self.interrupt.try_recv() { + Ok(_) => ControlFlow::Break(Ok(())), + Err(TryRecvError::Empty) => ControlFlow::Continue(()), + _ => panic!("The channel for interrupting shielded sync has unexpectedly hung up.") + } + } + + /// Transition back into a non-syncing context + fn stop_sync(self) -> ShieldedContext { + ShieldedContext { + utils: self.utils, + last_indexed: self.last_indexed, + tree: self.tree, + pos_map: self.pos_map, + nf_map: self.nf_map, + note_map: self.note_map, + memo_map: self.memo_map, + div_map: self.div_map, + witness_map: self.witness_map, + delta_map: self.delta_map, + spents: self.spents, + asset_types: self.asset_types, + vk_map: self.vk_map, + interrupt: (), + } + } + + /// Fetch the current state of the multi-asset shielded pool into a + /// ShieldedContext + pub async fn fetch( + &mut self, + client: &C, + last_query_height: Option, + sks: &[ExtendedSpendingKey], + fvks: &[ViewingKey], + ) -> ControlFlow> { + // First determine which of the keys requested to be fetched are new. + // Necessary because old transactions will need to be scanned for new + // keys. + let mut unknown_keys = Vec::new(); + for esk in sks { + let vk = to_viewing_key(esk).vk; + if !self.pos_map.contains_key(&vk) { + unknown_keys.push(vk); + } + } + for vk in fvks { + if !self.pos_map.contains_key(vk) { + unknown_keys.push(*vk); + } + } + + // If unknown keys are being used, we need to scan older transactions + // for any unspent notes + let (txs, mut tx_iter); + if !unknown_keys.is_empty() { + // Load all transactions accepted until this point + txs = Self::fetch_shielded_transfers(client, None, last_query_height).await?; + tx_iter = txs.iter(); + // Do this by constructing a shielding context only for unknown keys + let mut tx_ctx = Self { + utils: self.utils.clone(), + ..Default::default() + }; + for vk in unknown_keys { + tx_ctx.pos_map.entry(vk).or_insert_with(BTreeSet::new); + } + // Update this unknown shielded context until it is level with self + while tx_ctx.last_indexed != self.last_indexed { + if let Some((indexed_tx, (epoch, tx, stx))) = tx_iter.next() { + tx_ctx.scan_tx(client, *indexed_tx, *epoch, tx, stx)?; + } else { + break; + } + } + // Merge the context data originating from the unknown keys into the + // current context + self.merge(tx_ctx); + } else { + // Load only transactions accepted from last_txid until this point + txs = Self::fetch_shielded_transfers(client, self.last_indexed, last_query_height) + .await?; + tx_iter = txs.iter(); + } + // Now that we possess the unspent notes corresponding to both old and + // new keys up until tx_pos, proceed to scan the new transactions. + for (indexed_tx, (epoch, tx, stx)) in &mut tx_iter { + self.scan_tx(client, *indexed_tx, *epoch, tx, stx)?; + } + ControlFlow::Break(Ok(())) } - /// Get the asset type with the given epoch, token, and denomination. If it - /// does not exist in the protocol, then remove the timestamp. Make sure to - /// store the derived AssetType so that future decoding is possible. - pub async fn get_asset_type( - &mut self, + /// Obtain a chronologically-ordered list of all accepted shielded + /// transactions from a node. + pub async fn fetch_shielded_transfers( client: &C, - epoch: Epoch, - token: Address, - denom: MaspDenom, - ) -> Result { - let mut asset_type = encode_asset_type(Some(epoch), &token, denom) - .map_err(|_| { - Error::Other("unable to create asset type".to_string()) - })?; - if self.decode_asset_type(client, asset_type).await.is_none() { - // If we fail to decode the epoched asset type, then remove the - // epoch - asset_type = - encode_asset_type(None, &token, denom).map_err(|_| { - Error::Other("unable to create asset type".to_string()) + last_indexed_tx: Option, + last_query_height: Option, + ) -> ControlFlow, Error>> + { + // Query for the last produced block height + let last_block_height = query_block(client) + .await + .map_err(ControlFlow::Break)? + .map_or_else(BlockHeight::first, |block| block.height); + let last_query_height = last_query_height.unwrap_or(last_block_height); + + let mut shielded_txs = BTreeMap::new(); + // Fetch all the transactions we do not have yet + let first_height_to_query = + last_indexed_tx.map_or_else(|| 1, |last| last.height.0); + let first_idx_to_query = + last_indexed_tx.map_or_else(|| 0, |last| last.index.0 + 1); + for height in first_height_to_query..=last_query_height.0 { + // Get the valid masp transactions at the specified height + let epoch = query_epoch_at_height(client, height.into()) + .await + .map_err(ControlFlow::Break)? + .ok_or_else(|| { + Error::from(QueryError::General( + "Queried height is greater than the last committed \ + block height" + .to_string(), + )) })?; - self.asset_types.insert(asset_type, (token, denom, None)); + + let first_index_to_query = if height == first_height_to_query { + Some(TxIndex(first_idx_to_query)) + } else { + None + }; + + let txs_results = match get_indexed_masp_events_at_height( + client, + height.into(), + first_index_to_query, + ) + .await.map_err(ControlFlow::Break)? + { + Some(events) => events, + None => continue, + }; + + // Query the actual block to get the txs bytes. If we only need one + // tx it might be slightly better to query the /tx endpoint to + // reduce the amount of data sent over the network, but this is a + // minimal improvement and it's even hard to tell how many times + // we'd need a single masp tx to make this worth it + let block = client + .block(height as u32) + .await + .map_err(|e| ControlFlow::Break(Error::from(QueryError::General(e.to_string()))))? + .block + .data; + + for (idx, tx_event) in txs_results { + let tx = Tx::try_from(block[idx.0 as usize].as_ref()) + .map_err(|e| ControlFlow::Break(Error::Other(e.to_string())))?; + let (transfer, masp_transaction) = Self::extract_masp_tx( + &tx, + ExtractShieldedActionArg::Event::(tx_event), + true, + ) + .await + .map_err(ControlFlow::Break)?; + + // Collect the current transaction + shielded_txs.insert( + IndexedTx { + height: height.into(), + index: idx, + }, + (epoch, transfer, masp_transaction), + ); + } } - Ok(asset_type) + + ControlFlow::Break(Ok(shielded_txs)) } - /// Convert Anoma amount and token type to MASP equivalents - async fn convert_amount( + /// Applies the given transaction to the supplied context. More precisely, + /// the shielded transaction's outputs are added to the commitment tree. + /// Newly discovered notes are associated to the supplied viewing keys. Note + /// nullifiers are mapped to their originating notes. Note positions are + /// associated to notes, memos, and diversifiers. And the set of notes that + /// we have spent are updated. The witness map is maintained to make it + /// easier to construct note merkle paths in other code. See + /// + pub fn scan_tx( &mut self, - client: &C, + indexed_tx: IndexedTx, epoch: Epoch, - token: &Address, - val: token::Amount, - ) -> Result<([AssetType; 4], U64Sum), Error> { - let mut amount = U64Sum::zero(); - let mut asset_types = Vec::new(); - for denom in MaspDenom::iter() { - let asset_type = self - .get_asset_type(client, epoch, token.clone(), denom) - .await?; - // Combine the value and unit into one amount - amount += - U64Sum::from_nonnegative(asset_type, denom.denominate(&val)) - .map_err(|_| { - Error::Other("invalid value for amount".to_string()) + tx: &Transfer, + shielded: &Transaction, + ) -> ControlFlow> { + // For tracking the account changes caused by this Transaction + let mut transaction_delta = TransactionDelta::new(); + // Listen for notes sent to our viewing keys + for so in shielded + .sapling_bundle() + .map_or(&vec![], |x| &x.shielded_outputs) + { + // Create merkle tree leaf node from note commitment + let node = Node::new(so.cmu.to_repr()); + // Update each merkle tree in the witness map with the latest + // addition + for (_, witness) in self.witness_map.iter_mut() { + witness.append(node).map_err(|()| { + ControlFlow::Break(Error::Other("note commitment tree is full".to_string())) + })?; + } + let note_pos = self.tree.size(); + self.tree.append(node).map_err(|()| { + ControlFlow::Break(Error::Other("note commitment tree is full".to_string())) + })?; + // Finally, make it easier to construct merkle paths to this new + // note + let witness = IncrementalWitness::::from_tree(&self.tree); + self.witness_map.insert(note_pos, witness); + // Let's try to see if any of our viewing keys can decrypt latest + // note + let mut pos_map = HashMap::new(); + std::mem::swap(&mut pos_map, &mut self.pos_map); + for (vk, notes) in pos_map.iter_mut() { + let decres = try_sapling_note_decryption::<_, OutputDescription<<::SaplingAuth as masp_primitives::transaction::components::sapling::Authorization>::Proof>>( + &NETWORK, + 1.into(), + &PreparedIncomingViewingKey::new(&vk.ivk()), + so, + ); + // So this current viewing key does decrypt this current note... + if let Some((note, pa, memo)) = decres { + // Add this note to list of notes decrypted by this viewing + // key + notes.insert(note_pos); + // Compute the nullifier now to quickly recognize when spent + let nf = note.nf( + &vk.nk, + note_pos.try_into().map_err(|_| { + Error::Other("Can not get nullifier".to_string()) + })?, + ); + self.note_map.insert(note_pos, note); + self.memo_map.insert(note_pos, memo); + // The payment address' diversifier is required to spend + // note + self.div_map.insert(note_pos, *pa.diversifier()); + self.nf_map.insert(nf, note_pos); + // Note the account changes + let balance = transaction_delta + .entry(*vk) + .or_insert_with(MaspAmount::default); + *balance += I128Sum::from_nonnegative( + note.asset_type, + note.value as i128, + ) + .map_err(|()| { + Error::Other( + "found note with invalid value or asset type" + .to_string(), + ) + })?; + self.vk_map.insert(note_pos, *vk); + break; + } + } + std::mem::swap(&mut pos_map, &mut self.pos_map); + } + // Cancel out those of our notes that have been spent + for ss in shielded + .sapling_bundle() + .map_or(&vec![], |x| &x.shielded_spends) + { + // If the shielded spend's nullifier is in our map, then target note + // is rendered unusable + if let Some(note_pos) = self.nf_map.get(&ss.nullifier) { + self.spents.insert(*note_pos); + // Note the account changes + let balance = transaction_delta + .entry(self.vk_map[note_pos]) + .or_insert_with(MaspAmount::default); + let note = self.note_map[note_pos]; + *balance -= I128Sum::from_nonnegative( + note.asset_type, + note.value as i128, + ) + .map_err(|()| { + Error::Other( + "found note with invalid value or asset type" + .to_string(), + ) })?; - asset_types.push(asset_type); + } } - Ok(( - asset_types - .try_into() - .expect("there must be exactly 4 denominations"), - amount, - )) + // Record the changes to the transparent accounts + let mut transfer_delta = TransferDelta::new(); + let token_addr = tx.token.clone(); + transfer_delta.insert( + tx.source.clone(), + MaspChange { + asset: token_addr, + change: -tx.amount.amount().change(), + }, + ); + self.last_indexed = Some(indexed_tx); + + self.delta_map + .insert(indexed_tx, (epoch, transfer_delta, transaction_delta)); + ControlFlow::Break(Ok(())) } } + /// Extract the payload from the given Tx object fn extract_payload( tx: Tx, @@ -2325,8 +2427,8 @@ async fn extract_payload_from_shielded_action<'args, C: Client + Sync>( namada_core::types::ibc::get_shielded_transfer( ibc_event, ) - .ok() - .flatten(); + .ok() + .flatten(); if event.is_some() { return event; } @@ -2593,3 +2695,38 @@ pub mod fs { } } } + + +/// A future where one future must finish but the other is optional +struct ShieldedSync +where + A: Future, Error>>, + B: Future>, +{ + sync: A, + signal: B, + channel: tokio::sync::oneshot::Sender<()>, + signal_recieved: bool, +} + +impl Future for ShieldedSync +where + A: Future, Error>>, + B: Future>, +{ + type Output = Result, Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if let Poll::Ready(res) = self.sync.poll(cx) { + Poll::Ready(res) + } else { + if !self.signal_recieved { + if let Poll::Ready(_) = self.signal.poll(cx) { + self.signal_recieved = true; + _ = self.channel.send(()); + } + } + Poll::Pending + } + } +} \ No newline at end of file From ed76f16e5a588688915e25b507b9549cf3caf540 Mon Sep 17 00:00:00 2001 From: satan Date: Mon, 22 Jan 2024 15:03:29 +0100 Subject: [PATCH 2/7] Factored sheilded sync into separate functions --- crates/core/src/types/storage.rs | 19 +- crates/sdk/src/masp.rs | 637 ++++++++++++++++--------------- 2 files changed, 344 insertions(+), 312 deletions(-) diff --git a/crates/core/src/types/storage.rs b/crates/core/src/types/storage.rs index 1ac99d11b4..c5803f82a7 100644 --- a/crates/core/src/types/storage.rs +++ b/crates/core/src/types/storage.rs @@ -1,4 +1,5 @@ //! Storage types +use std::cmp::Ordering; use std::collections::VecDeque; use std::convert::{TryFrom, TryInto}; use std::fmt::Display; @@ -1461,8 +1462,6 @@ impl GetEventNonce for InnerEthEventsQueue { BorshDeserialize, Eq, PartialEq, - Ord, - PartialOrd, )] pub struct IndexedTx { /// The block height of the indexed tx @@ -1471,6 +1470,22 @@ pub struct IndexedTx { pub index: TxIndex, } +impl PartialOrd for IndexedTx { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for IndexedTx { + fn cmp(&self, other: &Self) -> Ordering { + if self.height == other.height { + self.index.cmp(&other.index) + } else { + self.height.cmp(&other.height) + } + } +} + #[cfg(test)] /// Tests and strategies for storage pub mod tests { diff --git a/crates/sdk/src/masp.rs b/crates/sdk/src/masp.rs index efced79fff..aad95a479f 100644 --- a/crates/sdk/src/masp.rs +++ b/crates/sdk/src/masp.rs @@ -70,8 +70,6 @@ use rand_core::{CryptoRng, OsRng, RngCore}; use ripemd::Digest as RipemdDigest; use sha2::Digest; use thiserror::Error; -use tokio::sync::oneshot::error::TryRecvError; -use namada_core::ledger::inflation::RewardsController; #[cfg(feature = "testing")] use crate::error::EncodingError; @@ -508,13 +506,13 @@ pub type TransactionDelta = HashMap; /// A marker type indicating that the shielded context is currently /// syncing. -struct Syncing; +pub struct Syncing; /// A marker type indicating that the shielded context is not currently /// syncing. -struct NotSyncing; +pub struct NotSyncing; -trait SyncStatus { +pub trait SyncStatus { type InterruptHandler; } @@ -523,7 +521,7 @@ impl SyncStatus for NotSyncing { } impl SyncStatus for Syncing { - type InterruptHandler = tokio::sync::oneshot::Receiver<()>; + type InterruptHandler = tokio::sync::watch::Receiver; } /// Represents the current state of the shielded pool from the perspective of @@ -534,7 +532,7 @@ pub struct ShieldedContext { #[borsh(skip)] pub utils: U, /// The last indexed transaction to be processed in this context - pub last_indexed: Option, + pub last_scanned: Option, /// The commitment tree produced by scanning all transactions up to tx_pos pub tree: CommitmentTree, /// Maps viewing keys to applicable note positions @@ -561,6 +559,8 @@ pub struct ShieldedContext { /// A type that informs the shielded context that whatever operation is /// being performed should be interrupted. pub interrupt: S::InterruptHandler, + /// Fetched data that we did not scan because syncing was interrupted + pub unscanned: BTreeMap, } /// Default implementation to ease construction of TxContexts. Derive cannot be @@ -569,7 +569,7 @@ impl Default for ShieldedContext { fn default() -> ShieldedContext { ShieldedContext:: { utils: U::default(), - last_indexed: None, + last_scanned: None, tree: CommitmentTree::empty(), pos_map: HashMap::default(), nf_map: HashMap::default(), @@ -581,40 +581,18 @@ impl Default for ShieldedContext { delta_map: BTreeMap::default(), asset_types: HashMap::default(), vk_map: HashMap::default(), - interrupt: NotSyncing::InterruptHandler::default(), + interrupt: ::InterruptHandler::default(), + unscanned: BTreeMap::default(), } } } impl ShieldedContext { - /// Merge data from the given shielded context into the current shielded - /// context. It must be the case that the two shielded contexts share the - /// same last transaction ID and share identical commitment trees. - pub fn merge(&mut self, new_ctx: ShieldedContext) { - debug_assert_eq!(self.last_indexed, new_ctx.last_indexed); - // Merge by simply extending maps. Identical keys should contain - // identical values, so overwriting should not be problematic. - self.pos_map.extend(new_ctx.pos_map); - self.nf_map.extend(new_ctx.nf_map); - self.note_map.extend(new_ctx.note_map); - self.memo_map.extend(new_ctx.memo_map); - self.div_map.extend(new_ctx.div_map); - self.witness_map.extend(new_ctx.witness_map); - self.spents.extend(new_ctx.spents); - self.asset_types.extend(new_ctx.asset_types); - self.vk_map.extend(new_ctx.vk_map); - // The deltas are the exception because different keys can reveal - // different parts of the same transaction. Hence each delta needs to be - // merged separately. - for (height, (ep, ntfer_delta, ntx_delta)) in new_ctx.delta_map { - let (_ep, tfer_delta, tx_delta) = self - .delta_map - .entry(height) - .or_insert((ep, TransferDelta::new(), TransactionDelta::new())); - tfer_delta.extend(ntfer_delta); - tx_delta.extend(ntx_delta); - } + /// If we have unscanned txs in the local cache, get the index of the + /// latest one. + pub fn latest_unscanned(&self) -> Option { + self.unscanned.keys().max().cloned() } /// Extract the relevant shield portions of a [`Tx`], if any. @@ -1329,6 +1307,187 @@ impl ShieldedContext( + &mut self, + client: &C, + epoch: Epoch, + token: Address, + denom: MaspDenom, + ) -> Result { + let mut asset_type = encode_asset_type(Some(epoch), &token, denom) + .map_err(|_| { + Error::Other("unable to create asset type".to_string()) + })?; + if self.decode_asset_type(client, asset_type).await.is_none() { + // If we fail to decode the epoched asset type, then remove the + // epoch + asset_type = + encode_asset_type(None, &token, denom).map_err(|_| { + Error::Other("unable to create asset type".to_string()) + })?; + self.asset_types.insert(asset_type, (token, denom, None)); + } + Ok(asset_type) + } + + /// Convert Anoma amount and token type to MASP equivalents + async fn convert_amount( + &mut self, + client: &C, + epoch: Epoch, + token: &Address, + val: token::Amount, + ) -> Result<([AssetType; 4], U64Sum), Error> { + let mut amount = U64Sum::zero(); + let mut asset_types = Vec::new(); + for denom in MaspDenom::iter() { + let asset_type = self + .get_asset_type(client, epoch, token.clone(), denom) + .await?; + // Combine the value and unit into one amount + amount += + U64Sum::from_nonnegative(asset_type, denom.denominate(&val)) + .map_err(|_| { + Error::Other("invalid value for amount".to_string()) + })?; + asset_types.push(asset_type); + } + Ok(( + asset_types + .try_into() + .expect("there must be exactly 4 denominations"), + amount, + )) + } +} + +impl ShieldedContext { + /// Try to load the last saved shielded context from the given context + /// directory. If this fails, then leave the current context unchanged. + pub async fn load(&mut self) -> std::io::Result<()> { + self.utils.clone().load(self).await + } + + /// Save this shielded context into its associated context directory + pub async fn save(&self) -> std::io::Result<()> { + self.utils.save(self).await + } + + /// Obtain the known effects of all accepted shielded and transparent + /// transactions. If an owner is specified, then restrict the set to only + /// transactions crediting/debiting the given owner. If token is specified, + /// then restrict set to only transactions involving the given token. + pub async fn query_tx_deltas( + &mut self, + client: &C, + query_owner: &Either>, + query_token: &Option
, + ) -> Result< + BTreeMap, + Error, + > { + const TXS_PER_PAGE: u8 = 100; + let _ = self.load().await; + // Required for filtering out rejected transactions from Tendermint + // responses + let block_results = rpc::query_results(client).await?; + let mut transfers = self.get_tx_deltas().clone(); + // Construct the set of addresses relevant to user's query + let relevant_addrs = match &query_owner { + Either::Left(BalanceOwner::Address(owner)) => vec![owner.clone()], + // MASP objects are dealt with outside of tx_search + Either::Left(BalanceOwner::FullViewingKey(_viewing_key)) => vec![], + Either::Left(BalanceOwner::PaymentAddress(_owner)) => vec![], + // Unspecified owner means all known addresses are considered + // relevant + Either::Right(addrs) => addrs.clone(), + }; + // Find all transactions to or from the relevant address set + for addr in relevant_addrs { + for prop in ["transfer.source", "transfer.target"] { + // Query transactions involving the current address + let mut tx_query = Query::eq(prop, addr.encode()); + // Elaborate the query if requested by the user + if let Some(token) = &query_token { + tx_query = + tx_query.and_eq("transfer.token", token.encode()); + } + for page in 1.. { + let txs = &client + .tx_search( + tx_query.clone(), + true, + page, + TXS_PER_PAGE, + Order::Ascending, + ) + .await + .map_err(|e| { + Error::from(QueryError::General(format!( + "for transaction: {e}" + ))) + })? + .txs; + for response_tx in txs { + let height = BlockHeight(response_tx.height.value()); + let idx = TxIndex(response_tx.index); + // Only process yet unprocessed transactions which have + // been accepted by node VPs + let should_process = !transfers + .contains_key(&IndexedTx { height, index: idx }) + && block_results[u64::from(height) as usize] + .is_accepted(idx.0 as usize); + if !should_process { + continue; + } + let tx = Tx::try_from(response_tx.tx.as_ref()) + .map_err(|e| Error::Other(e.to_string()))?; + let mut wrapper = None; + let mut transfer = None; + extract_payload(tx, &mut wrapper, &mut transfer)?; + // Epoch data is not needed for transparent transactions + let epoch = + wrapper.map(|x| x.epoch).unwrap_or_default(); + if let Some(transfer) = transfer { + // Skip MASP addresses as they are already handled + // by ShieldedContext + if transfer.source == MASP + || transfer.target == MASP + { + continue; + } + // Describe how a Transfer simply subtracts from one + // account and adds the same to another + + let delta = TransferDelta::from([( + transfer.source.clone(), + MaspChange { + asset: transfer.token.clone(), + change: -transfer.amount.amount().change(), + }, + )]); + + // No shielded accounts are affected by this + // Transfer + transfers.insert( + IndexedTx { height, index: idx }, + (epoch, delta, TransactionDelta::new()), + ); + } + } + // An incomplete page signifies no more transactions + if (txs.len() as u8) < TXS_PER_PAGE { + break; + } + } + } + } + Ok(transfers) + } + /// Make shielded components to embed within a Transfer object. If no /// shielded payment address nor spending key is specified, then no /// shielded components are produced. Otherwise a transaction containing @@ -1358,7 +1517,6 @@ impl ShieldedContext = spending_key.into_iter().collect(); { // Load the current shielded context given the spending key we // possess @@ -1410,7 +1568,7 @@ impl ShieldedContext ShieldedContext( - &mut self, - client: &C, - query_owner: &Either>, - query_token: &Option
, - viewing_keys: &HashMap, - ) -> Result< - BTreeMap, - Error, - > { - const TXS_PER_PAGE: u8 = 100; - let _ = self.load().await; - let vks = viewing_keys; - // Required for filtering out rejected transactions from Tendermint - // responses - let block_results = rpc::query_results(client).await?; - let mut transfers = self.get_tx_deltas().clone(); - // Construct the set of addresses relevant to user's query - let relevant_addrs = match &query_owner { - Either::Left(BalanceOwner::Address(owner)) => vec![owner.clone()], - // MASP objects are dealt with outside of tx_search - Either::Left(BalanceOwner::FullViewingKey(_viewing_key)) => vec![], - Either::Left(BalanceOwner::PaymentAddress(_owner)) => vec![], - // Unspecified owner means all known addresses are considered - // relevant - Either::Right(addrs) => addrs.clone(), - }; - // Find all transactions to or from the relevant address set - for addr in relevant_addrs { - for prop in ["transfer.source", "transfer.target"] { - // Query transactions involving the current address - let mut tx_query = Query::eq(prop, addr.encode()); - // Elaborate the query if requested by the user - if let Some(token) = &query_token { - tx_query = - tx_query.and_eq("transfer.token", token.encode()); - } - for page in 1.. { - let txs = &client - .tx_search( - tx_query.clone(), - true, - page, - TXS_PER_PAGE, - Order::Ascending, - ) - .await - .map_err(|e| { - Error::from(QueryError::General(format!( - "for transaction: {e}" - ))) - })? - .txs; - for response_tx in txs { - let height = BlockHeight(response_tx.height.value()); - let idx = TxIndex(response_tx.index); - // Only process yet unprocessed transactions which have - // been accepted by node VPs - let should_process = !transfers - .contains_key(&IndexedTx { height, index: idx }) - && block_results[u64::from(height) as usize] - .is_accepted(idx.0 as usize); - if !should_process { - continue; - } - let tx = Tx::try_from(response_tx.tx.as_ref()) - .map_err(|e| Error::Other(e.to_string()))?; - let mut wrapper = None; - let mut transfer = None; - extract_payload(tx, &mut wrapper, &mut transfer)?; - // Epoch data is not needed for transparent transactions - let epoch = - wrapper.map(|x| x.epoch).unwrap_or_default(); - if let Some(transfer) = transfer { - // Skip MASP addresses as they are already handled - // by ShieldedContext - if transfer.source == MASP - || transfer.target == MASP - { - continue; - } - // Describe how a Transfer simply subtracts from one - // account and adds the same to another - - let delta = TransferDelta::from([( - transfer.source.clone(), - MaspChange { - asset: transfer.token.clone(), - change: -transfer.amount.amount().change(), - }, - )]); - - // No shielded accounts are affected by this - // Transfer - transfers.insert( - IndexedTx { height, index: idx }, - (epoch, delta, TransactionDelta::new()), - ); - } - } - // An incomplete page signifies no more transactions - if (txs.len() as u8) < TXS_PER_PAGE { - break; - } - } - } - } - Ok(transfers) - } - - - /// Get the asset type with the given epoch, token, and denomination. If it - /// does not exist in the protocol, then remove the timestamp. Make sure to - /// store the derived AssetType so that future decoding is possible. - pub async fn get_asset_type( - &mut self, - client: &C, - epoch: Epoch, - token: Address, - denom: MaspDenom, - ) -> Result { - let mut asset_type = encode_asset_type(Some(epoch), &token, denom) - .map_err(|_| { - Error::Other("unable to create asset type".to_string()) - })?; - if self.decode_asset_type(client, asset_type).await.is_none() { - // If we fail to decode the epoched asset type, then remove the - // epoch - asset_type = - encode_asset_type(None, &token, denom).map_err(|_| { - Error::Other("unable to create asset type".to_string()) - })?; - self.asset_types.insert(asset_type, (token, denom, None)); - } - Ok(asset_type) - } - - /// Convert Anoma amount and token type to MASP equivalents - async fn convert_amount( - &mut self, - client: &C, - epoch: Epoch, - token: &Address, - val: token::Amount, - ) -> Result<([AssetType; 4], U64Sum), Error> { - let mut amount = U64Sum::zero(); - let mut asset_types = Vec::new(); - for denom in MaspDenom::iter() { - let asset_type = self - .get_asset_type(client, epoch, token.clone(), denom) - .await?; - // Combine the value and unit into one amount - amount += - U64Sum::from_nonnegative(asset_type, denom.denominate(&val)) - .map_err(|_| { - Error::Other("invalid value for amount".to_string()) - })?; - asset_types.push(asset_type); + fn into_syncing(self, channel: &tokio::sync::watch::Receiver) -> ShieldedContext { + ShieldedContext { + utils: self.utils, + last_scanned: self.last_scanned, + tree: self.tree, + pos_map: self.pos_map, + nf_map: self.nf_map, + note_map: self.note_map, + memo_map: self.memo_map, + div_map: self.div_map, + witness_map: self.witness_map, + delta_map: self.delta_map, + spents: self.spents, + asset_types: self.asset_types, + vk_map: self.vk_map, + interrupt: channel.clone(), + unscanned: self.unscanned, } - Ok(( - asset_types - .try_into() - .expect("there must be exactly 4 denominations"), - amount, - )) - } -} - -impl ShieldedContext { - /// Try to load the last saved shielded context from the given context - /// directory. If this fails, then leave the current context unchanged. - pub async fn load(&mut self) -> std::io::Result<()> { - self.utils.clone().load(self).await - } - - /// Save this shielded context into its associated context directory - pub async fn save(&self) -> std::io::Result<()> { - self.utils.save(self).await } pub async fn syncing( self, client: &C, + io: &impl Io, last_query_height: Option, sks: &[ExtendedSpendingKey], fvks: &[ViewingKey], - ){ + ) -> Result<(), Error> { let shutdown_signal = async { let (tx, rx) = tokio::sync::oneshot::channel(); crate::control_flow::shutdown_send(tx).await; rx.await }; - let (signal_sx, signal_rx) = tokio::sync::oneshot::channel(); - let mut syncing: ShieldedContext = ShieldedContext { - utils: self.utils, - last_indexed: self.last_indexed, - tree: self.tree, - pos_map: self.pos_map, - nf_map: self.nf_map, - note_map: self.note_map, - memo_map: self.memo_map, - div_map: self.div_map, - witness_map: self.witness_map, - delta_map: self.delta_map, - spents: self.spents, - asset_types: self.asset_types, - vk_map: self.vk_map, - interrupt: signal_rx, - }; + let (signal_sx, signal_rx) = tokio::sync::watch::channel(false); + let mut syncing = self.into_syncing(&signal_rx); let sync = async move { let ControlFlow::Break(res) = syncing.fetch(client, last_query_height, sks, fvks).await else { unreachable!() }; res.map(|_| syncing.stop_sync()) }; - let result = ShieldedSync { - sync, - signal: shutdown_signal, - channel: signal_sx, - signal_recieved: false - }.await; + let shielded = ShieldedSync { + sync: Box::pin(sync), + signal: Box::pin(shutdown_signal), + watch: signal_sx, + }.await?; + + shielded.save().await.map_err(|e| Error::Other(e.to_string())) } } impl ShieldedContext { /// Check if a signal to stop syncing has been received. - fn interrupted(&mut self) -> ControlFlow> { - match self.interrupt.try_recv() { - Ok(_) => ControlFlow::Break(Ok(())), - Err(TryRecvError::Empty) => ControlFlow::Continue(()), - _ => panic!("The channel for interrupting shielded sync has unexpectedly hung up.") - } + fn interrupted(&self) -> bool { + *self.interrupt.borrow() } /// Transition back into a non-syncing context fn stop_sync(self) -> ShieldedContext { ShieldedContext { utils: self.utils, - last_indexed: self.last_indexed, + last_scanned: self.last_scanned, tree: self.tree, pos_map: self.pos_map, nf_map: self.nf_map, @@ -2029,6 +2006,39 @@ impl ShieldedContext { asset_types: self.asset_types, vk_map: self.vk_map, interrupt: (), + unscanned: self.unscanned, + } + } + + /// Merge data from the given shielded context into the current shielded + /// context. It must be the case that the two shielded contexts share the + /// same last transaction ID and share identical commitment trees. + pub fn merge(&mut self, new_ctx: ShieldedContext) { + debug_assert_eq!(self.last_scanned, new_ctx.last_scanned); + // Merge by simply extending maps. Identical keys should contain + // identical values, so overwriting should not be problematic. + if !self.interrupted() { + self.pos_map.extend(new_ctx.pos_map); + } + self.nf_map.extend(new_ctx.nf_map); + self.note_map.extend(new_ctx.note_map); + self.memo_map.extend(new_ctx.memo_map); + self.div_map.extend(new_ctx.div_map); + self.witness_map.extend(new_ctx.witness_map); + self.spents.extend(new_ctx.spents); + self.asset_types.extend(new_ctx.asset_types); + self.vk_map.extend(new_ctx.vk_map); + self.unscanned = new_ctx.unscanned; + // The deltas are the exception because different keys can reveal + // different parts of the same transaction. Hence each delta needs to be + // merged separately. + for (height, (ep, ntfer_delta, ntx_delta)) in new_ctx.delta_map { + let (_ep, tfer_delta, tx_delta) = self + .delta_map + .entry(height) + .or_insert((ep, TransferDelta::new(), TransactionDelta::new())); + tfer_delta.extend(ntfer_delta); + tx_delta.extend(ntx_delta); } } @@ -2037,10 +2047,11 @@ impl ShieldedContext { pub async fn fetch( &mut self, client: &C, + io: &impl Io, last_query_height: Option, sks: &[ExtendedSpendingKey], fvks: &[ViewingKey], - ) -> ControlFlow> { + ) -> Result<(), Error> { // First determine which of the keys requested to be fetched are new. // Necessary because old transactions will need to be scanned for new // keys. @@ -2059,38 +2070,53 @@ impl ShieldedContext { // If unknown keys are being used, we need to scan older transactions // for any unspent notes - let (txs, mut tx_iter); if !unknown_keys.is_empty() { // Load all transactions accepted until this point - txs = Self::fetch_shielded_transfers(client, None, last_query_height).await?; - tx_iter = txs.iter(); + let fetched = self.fetch_shielded_transfers(client, self.latest_unscanned(), last_query_height).await?; + self.unscanned.extend(fetched); + if self.interrupted() { + return Ok(()); + }; // Do this by constructing a shielding context only for unknown keys - let mut tx_ctx = Self { + let mut tx_ctx = ShieldedContext { utils: self.utils.clone(), ..Default::default() - }; + }.into_syncing(&self.interrupt); for vk in unknown_keys { tx_ctx.pos_map.entry(vk).or_insert_with(BTreeSet::new); } // Update this unknown shielded context until it is level with self - while tx_ctx.last_indexed != self.last_indexed { - if let Some((indexed_tx, (epoch, tx, stx))) = tx_iter.next() { + let txs = self.unscanned.clone(); + while tx_ctx.last_scanned != self.last_scanned { + for (indexed_tx, (epoch, tx, stx)) in &txs { + if self.interrupted() { + break; + } tx_ctx.scan_tx(client, *indexed_tx, *epoch, tx, stx)?; - } else { - break; + self.unscanned.remove(indexed_tx); } } // Merge the context data originating from the unknown keys into the // current context self.merge(tx_ctx); } else { - // Load only transactions accepted from last_txid until this point - txs = Self::fetch_shielded_transfers(client, self.last_indexed, last_query_height) - .await?; - tx_iter = txs.iter(); + let resume_point = std::cmp::max(self.latest_unscanned(), self.last_scanned); + // fetch only new transactions + let fetched = self.fetch_shielded_transfers(client, resume_point, last_query_height).await?; + self.unscanned.extend(fetched); } // Now that we possess the unspent notes corresponding to both old and // new keys up until tx_pos, proceed to scan the new transactions. + let mut txs = BTreeMap::new(); + std::mem::swap(&mut self.unscanned, &mut txs); + for (indexed_tx, (epoch, tx, stx)) in txs { + if self.interrupted() { + self.unscanned.insert(indexed_tx, (epoch, tx, stx)); + } else { + self.scan_tx(client, indexed_tx, epoch, &tx, &stx)?; + } + } + Ok(()) for (indexed_tx, (epoch, tx, stx)) in &mut tx_iter { self.scan_tx(client, *indexed_tx, *epoch, tx, stx)?; } @@ -2100,15 +2126,15 @@ impl ShieldedContext { /// Obtain a chronologically-ordered list of all accepted shielded /// transactions from a node. pub async fn fetch_shielded_transfers( + &mut self, client: &C, last_indexed_tx: Option, last_query_height: Option, - ) -> ControlFlow, Error>> + ) -> Result, Error> { // Query for the last produced block height let last_block_height = query_block(client) - .await - .map_err(ControlFlow::Break)? + .await? .map_or_else(BlockHeight::first, |block| block.height); let last_query_height = last_query_height.unwrap_or(last_block_height); @@ -2118,18 +2144,18 @@ impl ShieldedContext { last_indexed_tx.map_or_else(|| 1, |last| last.height.0); let first_idx_to_query = last_indexed_tx.map_or_else(|| 0, |last| last.index.0 + 1); + for height in first_height_to_query..=last_query_height.0 { + if self.interrupted() { + return Ok(shielded_txs) + } // Get the valid masp transactions at the specified height let epoch = query_epoch_at_height(client, height.into()) - .await - .map_err(ControlFlow::Break)? - .ok_or_else(|| { - Error::from(QueryError::General( - "Queried height is greater than the last committed \ - block height" - .to_string(), - )) - })?; + .await? + .ok_or_else(|| Error::from(QueryError::General( + "Queried height is greater than the last committed \ + block height".to_string(), + )))?; let first_index_to_query = if height == first_height_to_query { Some(TxIndex(first_idx_to_query)) @@ -2141,8 +2167,7 @@ impl ShieldedContext { client, height.into(), first_index_to_query, - ) - .await.map_err(ControlFlow::Break)? + ).await? { Some(events) => events, None => continue, @@ -2153,23 +2178,21 @@ impl ShieldedContext { // reduce the amount of data sent over the network, but this is a // minimal improvement and it's even hard to tell how many times // we'd need a single masp tx to make this worth it - let block = client - .block(height as u32) + let block = client.block(height as u32) .await - .map_err(|e| ControlFlow::Break(Error::from(QueryError::General(e.to_string()))))? + .map_err(|e| Error::from(QueryError::General(e.to_string())))? .block .data; for (idx, tx_event) in txs_results { let tx = Tx::try_from(block[idx.0 as usize].as_ref()) - .map_err(|e| ControlFlow::Break(Error::Other(e.to_string())))?; + .map_err(|e| Error::Other(e.to_string()))?; let (transfer, masp_transaction) = Self::extract_masp_tx( &tx, ExtractShieldedActionArg::Event::(tx_event), true, ) - .await - .map_err(ControlFlow::Break)?; + .await?; // Collect the current transaction shielded_txs.insert( @@ -2181,8 +2204,7 @@ impl ShieldedContext { ); } } - - ControlFlow::Break(Ok(shielded_txs)) + Ok(shielded_txs) } /// Applies the given transaction to the supplied context. More precisely, @@ -2199,7 +2221,7 @@ impl ShieldedContext { epoch: Epoch, tx: &Transfer, shielded: &Transaction, - ) -> ControlFlow> { + ) -> Result<(), Error> { // For tracking the account changes caused by this Transaction let mut transaction_delta = TransactionDelta::new(); // Listen for notes sent to our viewing keys @@ -2213,12 +2235,12 @@ impl ShieldedContext { // addition for (_, witness) in self.witness_map.iter_mut() { witness.append(node).map_err(|()| { - ControlFlow::Break(Error::Other("note commitment tree is full".to_string())) + Error::Other("note commitment tree is full".to_string()) })?; } let note_pos = self.tree.size(); self.tree.append(node).map_err(|()| { - ControlFlow::Break(Error::Other("note commitment tree is full".to_string())) + Error::Other("note commitment tree is full".to_string()) })?; // Finally, make it easier to construct merkle paths to this new // note @@ -2309,15 +2331,13 @@ impl ShieldedContext { change: -tx.amount.amount().change(), }, ); - self.last_indexed = Some(indexed_tx); - + self.last_scanned = Some(indexed_tx); self.delta_map .insert(indexed_tx, (epoch, transfer_delta, transaction_delta)); - ControlFlow::Break(Ok(())) + Ok(()) } } - /// Extract the payload from the given Tx object fn extract_payload( tx: Tx, @@ -2696,35 +2716,32 @@ pub mod fs { } } - /// A future where one future must finish but the other is optional -struct ShieldedSync +struct ShieldedSync where - A: Future, Error>>, + F: ShieldedUtils, + A: Future, Error>>, B: Future>, { - sync: A, - signal: B, - channel: tokio::sync::oneshot::Sender<()>, - signal_recieved: bool, + sync: Pin>, + signal: Pin>, + watch: tokio::sync::watch::Sender, } -impl Future for ShieldedSync +impl Future for ShieldedSync where - A: Future, Error>>, + F: ShieldedUtils, + A: Future, Error>>, B: Future>, { - type Output = Result, Error>; + type Output = Result, Error>; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if let Poll::Ready(res) = self.sync.poll(cx) { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if let Poll::Ready(res) = self.sync.as_mut().poll(cx) { Poll::Ready(res) } else { - if !self.signal_recieved { - if let Poll::Ready(_) = self.signal.poll(cx) { - self.signal_recieved = true; - _ = self.channel.send(()); - } + if !(*self.watch.borrow()) && self.signal.as_mut().poll(cx).is_ready() { + self.watch.send(true).unwrap(); } Poll::Pending } From 1b1e338e57abb09188d7e3d9c128a16f0596262d Mon Sep 17 00:00:00 2001 From: satan Date: Mon, 22 Jan 2024 15:11:18 +0100 Subject: [PATCH 3/7] [feat]: Added shielded sync --- crates/apps/src/lib/bench_utils.rs | 6 +- crates/apps/src/lib/cli.rs | 87 +++++++++ crates/apps/src/lib/cli/client.rs | 26 +++ crates/apps/src/lib/client/rpc.rs | 1 - crates/core/src/types/masp.rs | 7 + crates/sdk/src/args.rs | 20 +- crates/sdk/src/masp.rs | 238 +++++++++++++++++------ crates/tests/src/e2e/ibc_tests.rs | 22 ++- crates/tests/src/e2e/ledger_tests.rs | 12 +- crates/tests/src/integration/masp.rs | 276 ++++++++++++++++++++++++++- 10 files changed, 625 insertions(+), 70 deletions(-) diff --git a/crates/apps/src/lib/bench_utils.rs b/crates/apps/src/lib/bench_utils.rs index d6364d0d45..dfcfab631c 100644 --- a/crates/apps/src/lib/bench_utils.rs +++ b/crates/apps/src/lib/bench_utils.rs @@ -957,9 +957,11 @@ impl BenchShieldedCtx { .wallet .find_spending_key(ALBERT_SPENDING_KEY, None) .unwrap(); - async_runtime - .block_on(self.shielded.fetch( + self.shielded = async_runtime + .block_on(self.shielded.syncing( &self.shell, + &StdIo, + None, &[spending_key.into()], &[], )) diff --git a/crates/apps/src/lib/cli.rs b/crates/apps/src/lib/cli.rs index deded0a797..2fb561d090 100644 --- a/crates/apps/src/lib/cli.rs +++ b/crates/apps/src/lib/cli.rs @@ -267,6 +267,7 @@ pub mod cmds { .subcommand(QueryMetaData::def().display_order(5)) // Actions .subcommand(SignTx::def().display_order(6)) + .subcommand(ShieldedSync::def().display_order(6)) .subcommand(GenIbcShieldedTransafer::def().display_order(6)) // Utils .subcommand(Utils::def().display_order(7)) @@ -346,6 +347,7 @@ pub mod cmds { let add_to_eth_bridge_pool = Self::parse_with_ctx(matches, AddToEthBridgePool); let sign_tx = Self::parse_with_ctx(matches, SignTx); + let shielded_sync = Self::parse_with_ctx(matches, ShieldedSync); let gen_ibc_shielded = Self::parse_with_ctx(matches, GenIbcShieldedTransafer); let utils = SubCmd::parse(matches).map(Self::WithoutContext); @@ -397,6 +399,7 @@ pub mod cmds { .or(query_metadata) .or(query_account) .or(sign_tx) + .or(shielded_sync) .or(gen_ibc_shielded) .or(utils) } @@ -483,6 +486,7 @@ pub mod cmds { QueryValidatorState(QueryValidatorState), QueryRewards(QueryRewards), SignTx(SignTx), + ShieldedSync(ShieldedSync), GenIbcShieldedTransafer(GenIbcShieldedTransafer), } @@ -1344,6 +1348,30 @@ pub mod cmds { } } + #[derive(Clone, Debug)] + pub struct ShieldedSync( + pub args::ShieldedSync + ); + + impl SubCmd for ShieldedSync { + const CMD: &'static str = "shielded-sync"; + + fn parse(matches: &ArgMatches) -> Option { + matches.subcommand_matches(Self::CMD).map(|matches| { + ShieldedSync(args::ShieldedSync::parse(matches)) + }) + } + + fn def() -> App { + App::new(Self::CMD) + .about( + "Sync the local shielded context with MASP notes owned by the provided \ + viewing / spending keys up to an optional specified block height." + ) + .add_args::>() + } + } + #[derive(Clone, Debug)] pub struct Bond(pub args::Bond); @@ -3056,6 +3084,7 @@ pub mod args { pub const SIGNATURES: ArgMulti = arg_multi("signatures"); pub const SOURCE: Arg = arg("source"); pub const SOURCE_OPT: ArgOpt = SOURCE.opt(); + pub const SPENDING_KEYS: ArgMulti = arg_multi("spending-keys"); pub const STEWARD: Arg = arg("steward"); pub const SOURCE_VALIDATOR: Arg = arg("source-validator"); pub const STORAGE_KEY: Arg = arg("storage-key"); @@ -3092,6 +3121,7 @@ pub mod args { pub const VALUE: Arg = arg("value"); pub const VOTER_OPT: ArgOpt = arg_opt("voter"); pub const VIEWING_KEY: Arg = arg("key"); + pub const VIEWING_KEYS: ArgMulti = arg_multi("viewing-keys"); pub const VP: ArgOpt = arg_opt("vp"); pub const WALLET_ALIAS_FORCE: ArgFlag = flag("wallet-alias-force"); pub const WASM_CHECKSUMS_PATH: Arg = arg("wasm-checksums-path"); @@ -5598,6 +5628,62 @@ pub mod args { } } + impl Args for ShieldedSync { + fn parse(matches: &ArgMatches) -> Self { + let ledger_address = LEDGER_ADDRESS_DEFAULT.parse(matches); + let last_query_height = BLOCK_HEIGHT_OPT.parse(matches); + let spending_keys = SPENDING_KEYS.parse(matches); + let viewing_keys = VIEWING_KEYS.parse(matches); + Self { + ledger_address, + last_query_height, + spending_keys, + viewing_keys, + } + } + + fn def(app: App) -> App { + app.arg( + LEDGER_ADDRESS_DEFAULT.def().help(LEDGER_ADDRESS_ABOUT), + ) + .arg( + BLOCK_HEIGHT_OPT.def().help( + "Option block height to sync up to. Default is latest." + ) + ) + .arg( + SPENDING_KEYS.def().help( + "List of new spending keys with which to check note ownership. \ + These will be added to the shielded context." + ) + ) + .arg( + VIEWING_KEYS.def().help( + "List of new viewing keys with which to check note ownership. \ + These will be added to the shielded context." + ) + ) + } + } + + impl CliToSdk> for ShieldedSync { + fn to_sdk(self, ctx: &mut Context) -> ShieldedSync { + let chain_ctx = ctx.borrow_mut_chain_or_exit(); + ShieldedSync { + ledger_address: (), + last_query_height: self.last_query_height, + spending_keys: self.spending_keys + .iter() + .map(|sk| chain_ctx.get_cached(sk)) + .collect(), + viewing_keys: self.viewing_keys + .iter() + .map(|vk| chain_ctx.get_cached(vk)) + .collect(), + } + } + } + impl CliToSdk> for GenIbcShieldedTransafer { @@ -5885,6 +5971,7 @@ pub mod args { type TransferSource = WalletTransferSource; type TransferTarget = WalletTransferTarget; type ViewingKey = WalletViewingKey; + type SpendingKey = WalletSpendingKey; } impl CliToSdk> for Tx { diff --git a/crates/apps/src/lib/cli/client.rs b/crates/apps/src/lib/cli/client.rs index 2ae441d2d2..bc1c9b38c0 100644 --- a/crates/apps/src/lib/cli/client.rs +++ b/crates/apps/src/lib/cli/client.rs @@ -298,6 +298,32 @@ impl CliApi { tx::submit_validator_metadata_change(&namada, args) .await?; } + Sub::ShieldedSync(ShieldedSync(mut args)) => { + let client = client.unwrap_or_else(|| { + C::from_tendermint_address( + &mut args.ledger_address, + ) + }); + client.wait_until_node_is_synced(&io).await?; + let args = args.to_sdk(&mut ctx); + let mut chain_ctx = ctx.take_chain_or_exit(); + let sks = args.spending_keys + .into_iter() + .map(|sk| sk.into()) + .collect::>(); + let vks = args.viewing_keys + .into_iter() + .map(|vk| vk.into()) + .collect::>(); + _ = chain_ctx.shielded.load().await; + chain_ctx.shielded.syncing( + &client, + &io, + args.last_query_height, + &sks, + &vks, + ).await?; + } // Eth bridge Sub::AddToEthBridgePool(args) => { let mut args = args.0; diff --git a/crates/apps/src/lib/client/rpc.rs b/crates/apps/src/lib/client/rpc.rs index 86277edfd6..e8683bdc40 100644 --- a/crates/apps/src/lib/client/rpc.rs +++ b/crates/apps/src/lib/client/rpc.rs @@ -137,7 +137,6 @@ pub async fn query_transfers( context.client(), &query_owner, &query_token, - &wallet.get_viewing_keys(), ) .await .unwrap(); diff --git a/crates/core/src/types/masp.rs b/crates/core/src/types/masp.rs index 9290ad800e..a77cb9bb2d 100644 --- a/crates/core/src/types/masp.rs +++ b/crates/core/src/types/masp.rs @@ -147,6 +147,13 @@ impl From } } +impl From for masp_primitives::sapling::ViewingKey { + fn from(value: ExtendedViewingKey) -> Self { + let fvk = masp_primitives::zip32::ExtendedFullViewingKey::from(value); + fvk.fvk.vk + } +} + impl serde::Serialize for ExtendedViewingKey { fn serialize( &self, diff --git a/crates/sdk/src/args.rs b/crates/sdk/src/args.rs index 575ed22c86..3467e69ed6 100644 --- a/crates/sdk/src/args.rs +++ b/crates/sdk/src/args.rs @@ -11,7 +11,7 @@ use namada_core::types::ethereum_events::EthAddress; use namada_core::types::keccak::KeccakHash; use namada_core::types::key::{common, SchemeType}; use namada_core::types::masp::PaymentAddress; -use namada_core::types::storage::Epoch; +use namada_core::types::storage::{BlockHeight, Epoch}; use namada_core::types::time::DateTimeUtc; use namada_core::types::{storage, token}; use namada_governance::cli::onchain::{ @@ -56,6 +56,8 @@ pub trait NamadaTypes: Clone + std::fmt::Debug { type EthereumAddress: Clone + std::fmt::Debug; /// Represents a viewing key type ViewingKey: Clone + std::fmt::Debug; + /// Represents a spending key + type SpendingKey: Clone + std::fmt::Debug; /// Represents the owner of a balance type BalanceOwner: Clone + std::fmt::Debug; /// Represents a public key @@ -98,6 +100,7 @@ impl NamadaTypes for SdkTypes { type TransferSource = namada_core::types::masp::TransferSource; type TransferTarget = namada_core::types::masp::TransferTarget; type ViewingKey = namada_core::types::masp::ExtendedViewingKey; + type SpendingKey = namada_core::types::masp::ExtendedSpendingKey; } /// Common query arguments @@ -1823,6 +1826,21 @@ pub struct SignTx { pub owner: C::Address, } +#[derive(Clone, Debug)] +/// Sync notes from MASP owned by the provided spending / +/// viewing keys. Syncing can be told to stop at a given +/// block height. +pub struct ShieldedSync { + /// The ledger address + pub ledger_address: C::TendermintAddress, + /// Height to sync up to. Defaults to most recent + pub last_query_height: Option, + /// Spending keys used to determine note ownership + pub spending_keys: Vec, + /// Viewing keys used to determine note ownership + pub viewing_keys: Vec, +} + /// Query PoS commission rate #[derive(Clone, Debug)] pub struct QueryCommissionRate { diff --git a/crates/sdk/src/masp.rs b/crates/sdk/src/masp.rs index aad95a479f..572f641f8c 100644 --- a/crates/sdk/src/masp.rs +++ b/crates/sdk/src/masp.rs @@ -5,7 +5,7 @@ use std::collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet}; use std::env; use std::fmt::Debug; use std::future::Future; -use std::ops::{ControlFlow, Deref}; +use std::ops::Deref; use std::path::{Path, PathBuf}; use std::pin::Pin; use std::str::FromStr; @@ -54,10 +54,11 @@ use masp_proofs::bellman::groth16::PreparedVerifyingKey; use masp_proofs::bls12_381::Bls12; use masp_proofs::prover::LocalTxProver; use masp_proofs::sapling::SaplingVerificationContext; +use owo_colors::OwoColorize; use namada_core::types::address::{Address, MASP}; use namada_core::types::ibc::IbcShieldedTransfer; use namada_core::types::masp::{ - encode_asset_type, BalanceOwner, ExtendedViewingKey, PaymentAddress, + encode_asset_type, BalanceOwner, PaymentAddress, TransferSource, TransferTarget, }; use namada_core::types::storage::{BlockHeight, Epoch, IndexedTx, TxIndex}; @@ -79,7 +80,7 @@ use crate::queries::Client; use crate::rpc::{query_block, query_conversion, query_epoch_at_height}; use crate::tendermint_rpc::query::Query; use crate::tendermint_rpc::Order; -use crate::{display_line, edisplay_line, rpc, MaybeSend, MaybeSync, Namada}; +use crate::{display_line, edisplay_line, rpc, MaybeSend, MaybeSync, Namada, display}; /// Env var to point to a dir with MASP parameters. When not specified, /// the default OS specific path is used. @@ -504,10 +505,12 @@ pub type TransferDelta = HashMap; /// Represents the changes that were made to a list of shielded accounts pub type TransactionDelta = HashMap; +#[derive(Debug, Clone)] /// A marker type indicating that the shielded context is currently /// syncing. pub struct Syncing; +#[derive(Debug, Clone)] /// A marker type indicating that the shielded context is not currently /// syncing. pub struct NotSyncing; @@ -524,6 +527,12 @@ impl SyncStatus for Syncing { type InterruptHandler = tokio::sync::watch::Receiver; } +#[derive(BorshSerialize, BorshDeserialize, Debug, Default)] +pub struct Unscanned { + txs: BTreeMap, + vks: BTreeSet, +} + /// Represents the current state of the shielded pool from the perspective of /// the chosen viewing keys. #[derive(BorshSerialize, BorshDeserialize, Debug)] @@ -531,8 +540,8 @@ pub struct ShieldedContext { /// Location where this shielded context is saved #[borsh(skip)] pub utils: U, - /// The last indexed transaction to be processed in this context - pub last_scanned: Option, + /// The last indexed transaction fetched by the context + pub last_fetched: Option, /// The commitment tree produced by scanning all transactions up to tx_pos pub tree: CommitmentTree, /// Maps viewing keys to applicable note positions @@ -560,7 +569,7 @@ pub struct ShieldedContext { /// being performed should be interrupted. pub interrupt: S::InterruptHandler, /// Fetched data that we did not scan because syncing was interrupted - pub unscanned: BTreeMap, + pub unscanned: Unscanned, } /// Default implementation to ease construction of TxContexts. Derive cannot be @@ -569,7 +578,7 @@ impl Default for ShieldedContext { fn default() -> ShieldedContext { ShieldedContext:: { utils: U::default(), - last_scanned: None, + last_fetched: None, tree: CommitmentTree::empty(), pos_map: HashMap::default(), nf_map: HashMap::default(), @@ -582,7 +591,7 @@ impl Default for ShieldedContext { asset_types: HashMap::default(), vk_map: HashMap::default(), interrupt: ::InterruptHandler::default(), - unscanned: BTreeMap::default(), + unscanned: Default::default(), } } } @@ -592,7 +601,7 @@ impl ShieldedContext Option { - self.unscanned.keys().max().cloned() + self.unscanned.txs.keys().max().cloned() } /// Extract the relevant shield portions of a [`Tx`], if any. @@ -1568,7 +1577,7 @@ impl ShieldedContext { context.client(), &max_expected_time_per_block_key, ) - .await?; + .await?; let delta_blocks = u32::try_from( delta_time.num_seconds() / max_block_time.0 as i64, @@ -1933,7 +1942,7 @@ impl ShieldedContext { fn into_syncing(self, channel: &tokio::sync::watch::Receiver) -> ShieldedContext { ShieldedContext { utils: self.utils, - last_scanned: self.last_scanned, + last_fetched: self.last_fetched, tree: self.tree, pos_map: self.pos_map, nf_map: self.nf_map, @@ -1957,7 +1966,7 @@ impl ShieldedContext { last_query_height: Option, sks: &[ExtendedSpendingKey], fvks: &[ViewingKey], - ) -> Result<(), Error> { + ) -> Result { let shutdown_signal = async { let (tx, rx) = tokio::sync::oneshot::channel(); crate::control_flow::shutdown_send(tx).await; @@ -1965,20 +1974,33 @@ impl ShieldedContext { }; let (signal_sx, signal_rx) = tokio::sync::watch::channel(false); + + display_line!(io, "{}", "==== Shielded sync started ====".on_white()); + if let Some(indexed) = self.last_fetched.as_ref() { + display_line!(io, " -> last fetched tx: block height {}, index {}", indexed.height, indexed.index); + } + if !self.unscanned.vks.is_empty() { + display_line!(io, " -> viewing keys still to be synced: {}", self.unscanned.vks.len()); + } + if !self.unscanned.txs.is_empty() { + display_line!(io, " -> fetched txs to scan: {}", self.unscanned.txs.len()); + } + display_line!(io, "\n\n"); + let mut syncing = self.into_syncing(&signal_rx); let sync = async move { - let ControlFlow::Break(res) = syncing.fetch(client, last_query_height, sks, fvks).await else { - unreachable!() - }; - res.map(|_| syncing.stop_sync()) + syncing.fetch(client, io, last_query_height, sks, fvks) + .await + .map(|_| syncing.stop_sync()) }; let shielded = ShieldedSync { sync: Box::pin(sync), signal: Box::pin(shutdown_signal), watch: signal_sx, }.await?; - - shielded.save().await.map_err(|e| Error::Other(e.to_string())) + display_line!(io, "Saving shielded ctx... "); + shielded.save().await.map_err(|e| Error::Other(e.to_string()))?; + Ok(shielded) } } @@ -1993,7 +2015,7 @@ impl ShieldedContext { fn stop_sync(self) -> ShieldedContext { ShieldedContext { utils: self.utils, - last_scanned: self.last_scanned, + last_fetched: self.last_fetched, tree: self.tree, pos_map: self.pos_map, nf_map: self.nf_map, @@ -2014,12 +2036,8 @@ impl ShieldedContext { /// context. It must be the case that the two shielded contexts share the /// same last transaction ID and share identical commitment trees. pub fn merge(&mut self, new_ctx: ShieldedContext) { - debug_assert_eq!(self.last_scanned, new_ctx.last_scanned); // Merge by simply extending maps. Identical keys should contain // identical values, so overwriting should not be problematic. - if !self.interrupted() { - self.pos_map.extend(new_ctx.pos_map); - } self.nf_map.extend(new_ctx.nf_map); self.note_map.extend(new_ctx.note_map); self.memo_map.extend(new_ctx.memo_map); @@ -2028,7 +2046,13 @@ impl ShieldedContext { self.spents.extend(new_ctx.spents); self.asset_types.extend(new_ctx.asset_types); self.vk_map.extend(new_ctx.vk_map); - self.unscanned = new_ctx.unscanned; + // if we successfully finished scanning, we can clear + // out viewing keys from our unscanned data and add them to the + // pos map. + if !self.interrupted() { + self.pos_map.extend(new_ctx.pos_map); + self.unscanned.vks.clear(); + } // The deltas are the exception because different keys can reveal // different parts of the same transaction. Hence each delta needs to be // merged separately. @@ -2070,11 +2094,14 @@ impl ShieldedContext { // If unknown keys are being used, we need to scan older transactions // for any unspent notes - if !unknown_keys.is_empty() { + if !unknown_keys.is_empty() || !self.unscanned.vks.is_empty() { + display_line!(io, "Scanning historical notes to synchronize new viewing key(s)."); + self.unscanned.vks.extend(unknown_keys.clone()); // Load all transactions accepted until this point - let fetched = self.fetch_shielded_transfers(client, self.latest_unscanned(), last_query_height).await?; - self.unscanned.extend(fetched); + let fetched = self.fetch_shielded_transfers(client, io, self.latest_unscanned(), last_query_height).await?; + self.unscanned.txs.extend(fetched); if self.interrupted() { + display_line!(io, ""); return Ok(()); }; // Do this by constructing a shielding context only for unknown keys @@ -2086,41 +2113,37 @@ impl ShieldedContext { tx_ctx.pos_map.entry(vk).or_insert_with(BTreeSet::new); } // Update this unknown shielded context until it is level with self - let txs = self.unscanned.clone(); - while tx_ctx.last_scanned != self.last_scanned { - for (indexed_tx, (epoch, tx, stx)) in &txs { - if self.interrupted() { - break; - } - tx_ctx.scan_tx(client, *indexed_tx, *epoch, tx, stx)?; - self.unscanned.remove(indexed_tx); + let txs = ProgressLogging::new(self.unscanned.txs.clone(), io, ProgressType::Scan); + for (indexed_tx, (epoch, tx, stx)) in txs.iter() { + if self.interrupted() || self.last_fetched.is_none() || indexed_tx >= self.last_fetched.as_ref().unwrap() { + break; } + tx_ctx.scan_tx(client, *indexed_tx, *epoch, tx, stx)?; + self.unscanned.txs.remove(indexed_tx); } // Merge the context data originating from the unknown keys into the // current context self.merge(tx_ctx); } else { - let resume_point = std::cmp::max(self.latest_unscanned(), self.last_scanned); + let resume_point = std::cmp::max(self.latest_unscanned(), self.last_fetched); // fetch only new transactions - let fetched = self.fetch_shielded_transfers(client, resume_point, last_query_height).await?; - self.unscanned.extend(fetched); + let fetched = self.fetch_shielded_transfers(client, io, resume_point, last_query_height).await?; + self.unscanned.txs.extend(fetched); + display_line!(io, ""); } // Now that we possess the unspent notes corresponding to both old and // new keys up until tx_pos, proceed to scan the new transactions. let mut txs = BTreeMap::new(); - std::mem::swap(&mut self.unscanned, &mut txs); + std::mem::swap(&mut self.unscanned.txs, &mut txs); + let txs = ProgressLogging::new(txs, io, ProgressType::Scan); for (indexed_tx, (epoch, tx, stx)) in txs { if self.interrupted() { - self.unscanned.insert(indexed_tx, (epoch, tx, stx)); + self.unscanned.txs.insert(indexed_tx, (epoch, tx, stx)); } else { self.scan_tx(client, indexed_tx, epoch, &tx, &stx)?; } - } - Ok(()) - for (indexed_tx, (epoch, tx, stx)) in &mut tx_iter { - self.scan_tx(client, *indexed_tx, *epoch, tx, stx)?; } - ControlFlow::Break(Ok(())) + Ok(()) } /// Obtain a chronologically-ordered list of all accepted shielded @@ -2128,6 +2151,7 @@ impl ShieldedContext { pub async fn fetch_shielded_transfers( &mut self, client: &C, + io: &impl Io, last_indexed_tx: Option, last_query_height: Option, ) -> Result, Error> @@ -2144,8 +2168,8 @@ impl ShieldedContext { last_indexed_tx.map_or_else(|| 1, |last| last.height.0); let first_idx_to_query = last_indexed_tx.map_or_else(|| 0, |last| last.index.0 + 1); - - for height in first_height_to_query..=last_query_height.0 { + let heights = ProgressLogging::new(first_height_to_query..=last_query_height.0, io, ProgressType::Fetch); + for height in heights { if self.interrupted() { return Ok(shielded_txs) } @@ -2184,6 +2208,10 @@ impl ShieldedContext { .block .data; + self.last_fetched = txs_results + .last() + .map(|(idx, _)| IndexedTx{height: height.into(), index: *idx}) + .or_else(|| Some(IndexedTx{height: height.into(), index: Default::default()})); for (idx, tx_event) in txs_results { let tx = Tx::try_from(block[idx.0 as usize].as_ref()) .map_err(|e| Error::Other(e.to_string()))?; @@ -2331,7 +2359,6 @@ impl ShieldedContext { change: -tx.amount.amount().change(), }, ); - self.last_scanned = Some(indexed_tx); self.delta_map .insert(indexed_tx, (epoch, transfer_delta, transaction_delta)); Ok(()) @@ -2447,8 +2474,8 @@ async fn extract_payload_from_shielded_action<'args, C: Client + Sync>( namada_core::types::ibc::get_shielded_transfer( ibc_event, ) - .ok() - .flatten(); + .ok() + .flatten(); if event.is_some() { return event; } @@ -2705,13 +2732,9 @@ pub mod fs { // Atomicity is required to prevent other client instances from // reading corrupt data. std::fs::rename( - tmp_path.clone(), + tmp_path, self.context_dir.join(FILE_NAME), - )?; - // Finally, remove our temporary file to allow future saving of - // shielded contexts. - std::fs::remove_file(tmp_path)?; - Ok(()) + ) } } } @@ -2746,4 +2769,105 @@ where Poll::Pending } } -} \ No newline at end of file +} + + +#[derive(Debug, Copy, Clone)] +enum ProgressType { + Fetch, + Scan, +} + +struct ProgressLogging<'io, T, U: Io> { + items: Vec, + index: usize, + length: usize, + io: &'io U, + r#type: ProgressType, +} + +impl<'io, T, U: Io> ProgressLogging<'io, T, U> { + fn new(items: I, io: &'io U, r#type: ProgressType) -> Self + where + I: IntoIterator + { + let items: Vec<_> = items.into_iter().collect(); + Self { + length: items.len(), + items, + index: 0, + io, + r#type, + } + } + + fn iter(&self) -> ProgressLoggingIterator<'_, 'io, T, U> { + ProgressLoggingIterator { + items: &self.items, + index: 0, + io: self.io, + r#type: self.r#type, + } + } +} + +impl<'io, T: Debug, U: Io> Iterator for ProgressLogging<'io, T, U> +{ + type Item = T; + + fn next(&mut self) -> Option { + if self.index == 0 { + self.items = { + let mut new_items = vec![]; + std::mem::swap(&mut new_items, &mut self.items); + new_items.into_iter().rev().collect() + }; + } + if self.items.is_empty() { + return None; + } + self.index += 1; + let percent = (100 * self.index) / self.length; + let completed: String = vec!['#'; percent].iter().collect(); + let incomplete: String = vec!['.'; 100 - percent].iter().collect(); + display_line!(self.io, "\x1b[2A\x1b[J"); + match self.r#type { + ProgressType::Fetch => display_line!(self.io, "Fetched block {:?} of {:?}", self.items.last().unwrap(), self.items[0]), + ProgressType::Scan => display_line!(self.io, "Scanning {} of {}", self.index, self.length), + } + display!(self.io, "[{}{}] ~~ {} %", completed, incomplete, percent); + self.io.flush(); + self.items.pop() + } +} + +struct ProgressLoggingIterator<'iter, 'io, T, U: Io> { + items: &'iter [T], + index: usize, + io: &'io U, + r#type: ProgressType +} + +impl<'iter, 'io, T: Debug, U: Io> Iterator for ProgressLoggingIterator<'iter, 'io, T, U> { + type Item = &'iter T; + + fn next(&mut self) -> Option { + if self.index >= self.items.len() { + return None; + } + self.index += 1; + let percent = (100 * self.index) / self.items.len(); + let completed: String = vec!['#'; percent].iter().collect(); + let incomplete: String = vec!['.'; 100 - percent].iter().collect(); + display_line!(self.io, "\x1b[2A\x1b[J"); + match self.r#type { + ProgressType::Fetch => display_line!(self.io, "Fetched block {:?} of {:?}", self.items.last().unwrap(), self.items[0]), + ProgressType::Scan => display_line!(self.io, "Scanning {} of {}", self.index, self.items.len()), + } + display!(self.io, "[{}{}] ~~ {} %", completed, incomplete, percent); + self.io.flush(); + self.items.get(self.index - 1) + } +} + + diff --git a/crates/tests/src/e2e/ibc_tests.rs b/crates/tests/src/e2e/ibc_tests.rs index 4cd825b8c8..9ee3922e26 100644 --- a/crates/tests/src/e2e/ibc_tests.rs +++ b/crates/tests/src/e2e/ibc_tests.rs @@ -1130,7 +1130,7 @@ fn transfer_on_chain( "--node", &rpc, ]; - let mut client = run!(test, Bin::Client, tx_args, Some(40))?; + let mut client = run!(test, Bin::Client, tx_args, Some(120))?; client.exp_string(TX_ACCEPTED)?; client.exp_string(TX_APPLIED_SUCCESS)?; client.assert_success(); @@ -1297,6 +1297,16 @@ fn shielded_transfer( // Send a token to the shielded address on Chain A transfer_on_chain(test_a, ALBERT, AA_PAYMENT_ADDRESS, BTC, 10, ALBERT_KEY)?; + let rpc = get_actor_rpc(test_a, Who::Validator(0)); + let tx_args = vec![ + "shielded-sync", + "--viewing-keys", + AA_VIEWING_KEY, + "--node", + &rpc, + ]; + let mut client = run!(test_a, Bin::Client, tx_args, Some(120))?; + client.assert_success(); // Send a token from SP(A) on Chain A to PA(B) on Chain B let amount = Amount::native_whole(10).to_string_native(); @@ -1895,6 +1905,16 @@ fn check_shielded_balances( ) -> Result<()> { // Check the balance on Chain B std::env::set_var(ENV_VAR_CHAIN_ID, test_a.net.chain_id.to_string()); + let rpc_b = get_actor_rpc(test_b, Who::Validator(0)); + let tx_args = vec![ + "shielded-sync", + "--viewing-keys", + AB_VIEWING_KEY, + "--node", + &rpc_b, + ]; + let mut client = run!(test_a, Bin::Client, tx_args, Some(120))?; + client.assert_success(); // PA(B) on Chain B has received BTC on chain A let token_addr = find_address(test_a, BTC)?.to_string(); std::env::set_var(ENV_VAR_CHAIN_ID, test_b.net.chain_id.to_string()); diff --git a/crates/tests/src/e2e/ledger_tests.rs b/crates/tests/src/e2e/ledger_tests.rs index 1185828196..e3b8dee732 100644 --- a/crates/tests/src/e2e/ledger_tests.rs +++ b/crates/tests/src/e2e/ledger_tests.rs @@ -725,7 +725,7 @@ fn wrapper_disposable_signer() -> Result<()> { "--token", NAM, "--amount", - "50", + "50000", "--ledger-address", &validator_one_rpc, ]; @@ -735,6 +735,16 @@ fn wrapper_disposable_signer() -> Result<()> { client.exp_string(TX_APPLIED_SUCCESS)?; let _ep1 = epoch_sleep(&test, &validator_one_rpc, 720)?; + let tx_args = vec![ + "shielded-sync", + "--viewing-keys", + AA_VIEWING_KEY, + "--node", + &validator_one_rpc, + ]; + let mut client = run!(test, Bin::Client, tx_args, Some(120))?; + client.assert_success(); + let tx_args = vec![ "transfer", "--source", diff --git a/crates/tests/src/integration/masp.rs b/crates/tests/src/integration/masp.rs index 548e38e091..29d7bba6d3 100644 --- a/crates/tests/src/integration/masp.rs +++ b/crates/tests/src/integration/masp.rs @@ -50,6 +50,21 @@ fn masp_incentives() -> Result<()> { )?; node.assert_success(); + // sync the shielded context + run( + &node, + Bin::Client, + vec![ + "shielded-sync", + "--viewing-keys", + AA_VIEWING_KEY, + AB_VIEWING_KEY, + "--node", + validator_one_rpc, + ], + )?; + node.assert_success(); + // Assert BTC balance at VK(A) is 1 let captured = CapturedOutput::of(|| { run( @@ -91,6 +106,18 @@ fn masp_incentives() -> Result<()> { // Wait till epoch boundary node.next_epoch(); + // sync the shielded context + run( + &node, + Bin::Client, + vec![ + "shielded-sync", + "--node", + validator_one_rpc, + ], + )?; + node.assert_success(); + // Assert BTC balance at VK(A) is still 1 let captured = CapturedOutput::of(|| { run( @@ -153,6 +180,18 @@ fn masp_incentives() -> Result<()> { // Wait till epoch boundary node.next_epoch(); + // sync the shielded context + run( + &node, + Bin::Client, + vec![ + "shielded-sync", + "--node", + validator_one_rpc, + ], + )?; + node.assert_success(); + // Assert BTC balance at VK(A) is still 1 let captured = CapturedOutput::of(|| { run( @@ -235,6 +274,18 @@ fn masp_incentives() -> Result<()> { )?; node.assert_success(); + // sync the shielded context + run( + &node, + Bin::Client, + vec![ + "shielded-sync", + "--node", + validator_one_rpc, + ], + )?; + node.assert_success(); + // Assert ETH balance at VK(B) is 0.001 let captured = CapturedOutput::of(|| { run( @@ -276,6 +327,18 @@ fn masp_incentives() -> Result<()> { // Wait till epoch boundary node.next_epoch(); + // sync the shielded context + run( + &node, + Bin::Client, + vec![ + "shielded-sync", + "--node", + validator_one_rpc, + ], + )?; + node.assert_success(); + // Assert ETH balance at VK(B) is still 0.001 let captured = CapturedOutput::of(|| { run( @@ -337,7 +400,6 @@ fn masp_incentives() -> Result<()> { // Wait till epoch boundary node.next_epoch(); - // Send 0.001 ETH from SK(B) to Christel run( &node, @@ -360,6 +422,19 @@ fn masp_incentives() -> Result<()> { )?; node.assert_success(); + // sync the shielded context + run( + &node, + Bin::Client, + vec![ + "shielded-sync", + "--node", + validator_one_rpc, + ], + )?; + node.assert_success(); + + // Assert ETH balance at VK(B) is 0 let captured = CapturedOutput::of(|| { run( @@ -380,6 +455,17 @@ fn masp_incentives() -> Result<()> { assert!(captured.contains("No shielded eth balance found")); node.next_epoch(); + // sync the shielded context + run( + &node, + Bin::Client, + vec![ + "shielded-sync", + "--node", + validator_one_rpc, + ], + )?; + node.assert_success(); // Assert VK(B) retains the NAM rewards dispensed in the correct // amount. @@ -402,6 +488,17 @@ fn masp_incentives() -> Result<()> { assert!(captured.contains("nam: 0.09112")); node.next_epoch(); + // sync the shielded context + run( + &node, + Bin::Client, + vec![ + "shielded-sync", + "--node", + validator_one_rpc, + ], + )?; + node.assert_success(); // Assert NAM balance at MASP pool is // the accumulation of rewards from the shielded assets (BTC and ETH) @@ -448,6 +545,18 @@ fn masp_incentives() -> Result<()> { )?; node.assert_success(); + // sync the shielded context + run( + &node, + Bin::Client, + vec![ + "shielded-sync", + "--node", + validator_one_rpc, + ], + )?; + node.assert_success(); + // Assert BTC balance at VK(A) is 0 let captured = CapturedOutput::of(|| { run( @@ -508,6 +617,17 @@ fn masp_incentives() -> Result<()> { // Wait till epoch boundary node.next_epoch(); + // sync the shielded context + run( + &node, + Bin::Client, + vec![ + "shielded-sync", + "--node", + validator_one_rpc, + ], + )?; + node.assert_success(); // Assert NAM balance at VK(A) is the rewards dispensed earlier // (since VK(A) has no shielded assets, no further rewards should @@ -574,7 +694,17 @@ fn masp_incentives() -> Result<()> { // Wait till epoch boundary to prevent conversion expiry during transaction // construction node.next_epoch(); - + // sync the shielded context + run( + &node, + Bin::Client, + vec![ + "shielded-sync", + "--node", + validator_one_rpc, + ], + )?; + node.assert_success(); // Send all NAM rewards from SK(B) to Christel run( &node, @@ -599,7 +729,17 @@ fn masp_incentives() -> Result<()> { // Wait till epoch boundary node.next_epoch(); - + // sync the shielded context + run( + &node, + Bin::Client, + vec![ + "shielded-sync", + "--node", + validator_one_rpc, + ], + )?; + node.assert_success(); // Send all NAM rewards from SK(A) to Bertha run( &node, @@ -641,6 +781,18 @@ fn masp_incentives() -> Result<()> { assert!(captured.result.is_ok()); assert!(captured.contains("No shielded nam balance found")); + + // sync the shielded context + run( + &node, + Bin::Client, + vec![ + "shielded-sync", + "--node", + validator_one_rpc, + ], + )?; + node.assert_success(); // Assert NAM balance at VK(B) is 0 let captured = CapturedOutput::of(|| { run( @@ -699,6 +851,20 @@ fn masp_pinned_txs() -> Result<()> { // Wait till epoch boundary let _ep0 = node.next_epoch(); + // sync shielded context + run( + &node, + Bin::Client, + vec![ + "shielded-sync", + "--viewing-keys", + AC_VIEWING_KEY, + "--node", + validator_one_rpc, + ] + )?; + node.assert_success(); + // Assert PPA(C) cannot be recognized by incorrect viewing key let captured = CapturedOutput::with_input(AB_VIEWING_KEY.into()).run(|| { @@ -768,6 +934,18 @@ fn masp_pinned_txs() -> Result<()> { // This makes it more consistent for some reason? let _ep2 = node.next_epoch(); + // sync shielded context + run( + &node, + Bin::Client, + vec![ + "shielded-sync", + "--node", + validator_one_rpc, + ] + )?; + node.assert_success(); + // Assert PPA(C) has the 20 BTC transaction pinned to it let captured = CapturedOutput::with_input(AC_VIEWING_KEY.into()).run(|| { @@ -811,6 +989,18 @@ fn masp_pinned_txs() -> Result<()> { // Wait till epoch boundary let _ep1 = node.next_epoch(); + // sync shielded context + run( + &node, + Bin::Client, + vec![ + "shielded-sync", + "--node", + validator_one_rpc, + ] + )?; + node.assert_success(); + // Assert PPA(C) does not NAM pinned to it on epoch boundary let captured = CapturedOutput::with_input(AC_VIEWING_KEY.into()).run(|| { @@ -865,6 +1055,22 @@ fn masp_txs_and_queries() -> Result<()> { let (mut node, _services) = setup::setup()?; _ = node.next_epoch(); + + // add necessary viewing keys to shielded context + run( + &node, + Bin::Client, + vec![ + "shielded-sync", + "--viewing-keys", + AA_VIEWING_KEY, + AB_VIEWING_KEY, + "--node", + validator_one_rpc, + ] + )?; + node.assert_success(); + let txs_args = vec![ // 0. Attempt to spend 10 BTC at SK(A) to PA(B) ( @@ -1075,11 +1281,26 @@ fn masp_txs_and_queries() -> Result<()> { ]; for (tx_args, tx_result) in &txs_args { - // We ensure transfers don't cross epoch boundaries. - if tx_args[0] == "transfer" { + // sync shielded context + run( + &node, + Bin::Client, + vec![ + "shielded-sync", + "--node", + validator_one_rpc, + ] + )?; + node.assert_success(); + // there is no need to dry run balance queries + let dry_run_args = if tx_args[0] == "transfer" { + // We ensure transfers don't cross epoch boundaries. node.next_epoch(); - } - for &dry_run in &[true, false] { + vec![true, false] + } else { + vec![false] + }; + for &dry_run in &dry_run_args { let tx_args = if dry_run && tx_args[0] == "transfer" { vec![tx_args.clone(), vec!["--dry-run"]].concat() } else { @@ -1191,6 +1412,21 @@ fn wrapper_fee_unshielding() -> Result<()> { node.assert_success(); _ = node.next_epoch(); + // sync shielded context + run( + &node, + Bin::Client, + vec![ + "shielded-sync", + "--viewing-keys", + AA_VIEWING_KEY, + AB_VIEWING_KEY, + "--node", + validator_one_rpc, + ] + )?; + node.assert_success(); + // 2. Valid unshielding run( &node, @@ -1215,6 +1451,18 @@ fn wrapper_fee_unshielding() -> Result<()> { )?; node.assert_success(); + // sync shielded context + run( + &node, + Bin::Client, + vec![ + "shielded-sync", + "--node", + validator_one_rpc, + ] + )?; + node.assert_success(); + // 3. Invalid unshielding let tx_args = vec![ "transfer", @@ -1280,6 +1528,20 @@ fn cross_epoch_tx() -> Result<()> { )?; node.assert_success(); + // sync the shielded context + run( + &node, + Bin::Client, + vec![ + "shielded-sync", + "--viewing-keys", + AA_VIEWING_KEY, + "--node", + validator_one_rpc, + ] + )?; + node.assert_success(); + // 2. Generate the tx in the current epoch let tempdir = tempfile::tempdir().unwrap(); run( From 787ff5333cdb60f0970f09c593a43d86602af39f Mon Sep 17 00:00:00 2001 From: satan Date: Mon, 22 Jan 2024 15:22:39 +0100 Subject: [PATCH 4/7] Rebased on v0.30 --- crates/apps/src/lib/cli.rs | 61 ++++--- crates/apps/src/lib/cli/client.rs | 27 ++-- crates/apps/src/lib/client/rpc.rs | 6 +- crates/core/src/types/storage.rs | 9 +- crates/sdk/src/args.rs | 2 +- crates/sdk/src/masp.rs | 230 ++++++++++++++++++--------- crates/tests/src/integration/masp.rs | 106 +++--------- 7 files changed, 224 insertions(+), 217 deletions(-) diff --git a/crates/apps/src/lib/cli.rs b/crates/apps/src/lib/cli.rs index 2fb561d090..8913808119 100644 --- a/crates/apps/src/lib/cli.rs +++ b/crates/apps/src/lib/cli.rs @@ -1349,24 +1349,23 @@ pub mod cmds { } #[derive(Clone, Debug)] - pub struct ShieldedSync( - pub args::ShieldedSync - ); + pub struct ShieldedSync(pub args::ShieldedSync); impl SubCmd for ShieldedSync { const CMD: &'static str = "shielded-sync"; fn parse(matches: &ArgMatches) -> Option { - matches.subcommand_matches(Self::CMD).map(|matches| { - ShieldedSync(args::ShieldedSync::parse(matches)) - }) + matches + .subcommand_matches(Self::CMD) + .map(|matches| ShieldedSync(args::ShieldedSync::parse(matches))) } fn def() -> App { App::new(Self::CMD) .about( - "Sync the local shielded context with MASP notes owned by the provided \ - viewing / spending keys up to an optional specified block height." + "Sync the local shielded context with MASP notes owned by \ + the provided viewing / spending keys up to an optional \ + specified block height.", ) .add_args::>() } @@ -3084,7 +3083,8 @@ pub mod args { pub const SIGNATURES: ArgMulti = arg_multi("signatures"); pub const SOURCE: Arg = arg("source"); pub const SOURCE_OPT: ArgOpt = SOURCE.opt(); - pub const SPENDING_KEYS: ArgMulti = arg_multi("spending-keys"); + pub const SPENDING_KEYS: ArgMulti = + arg_multi("spending-keys"); pub const STEWARD: Arg = arg("steward"); pub const SOURCE_VALIDATOR: Arg = arg("source-validator"); pub const STORAGE_KEY: Arg = arg("storage-key"); @@ -3121,7 +3121,8 @@ pub mod args { pub const VALUE: Arg = arg("value"); pub const VOTER_OPT: ArgOpt = arg_opt("voter"); pub const VIEWING_KEY: Arg = arg("key"); - pub const VIEWING_KEYS: ArgMulti = arg_multi("viewing-keys"); + pub const VIEWING_KEYS: ArgMulti = + arg_multi("viewing-keys"); pub const VP: ArgOpt = arg_opt("vp"); pub const WALLET_ALIAS_FORCE: ArgFlag = flag("wallet-alias-force"); pub const WASM_CHECKSUMS_PATH: Arg = arg("wasm-checksums-path"); @@ -5643,26 +5644,18 @@ pub mod args { } fn def(app: App) -> App { - app.arg( - LEDGER_ADDRESS_DEFAULT.def().help(LEDGER_ADDRESS_ABOUT), - ) - .arg( - BLOCK_HEIGHT_OPT.def().help( - "Option block height to sync up to. Default is latest." - ) - ) - .arg( - SPENDING_KEYS.def().help( - "List of new spending keys with which to check note ownership. \ - These will be added to the shielded context." - ) - ) - .arg( - VIEWING_KEYS.def().help( - "List of new viewing keys with which to check note ownership. \ - These will be added to the shielded context." - ) - ) + app.arg(LEDGER_ADDRESS_DEFAULT.def().help(LEDGER_ADDRESS_ABOUT)) + .arg(BLOCK_HEIGHT_OPT.def().help( + "Option block height to sync up to. Default is latest.", + )) + .arg(SPENDING_KEYS.def().help( + "List of new spending keys with which to check note \ + ownership. These will be added to the shielded context.", + )) + .arg(VIEWING_KEYS.def().help( + "List of new viewing keys with which to check note \ + ownership. These will be added to the shielded context.", + )) } } @@ -5672,11 +5665,13 @@ pub mod args { ShieldedSync { ledger_address: (), last_query_height: self.last_query_height, - spending_keys: self.spending_keys + spending_keys: self + .spending_keys .iter() .map(|sk| chain_ctx.get_cached(sk)) .collect(), - viewing_keys: self.viewing_keys + viewing_keys: self + .viewing_keys .iter() .map(|vk| chain_ctx.get_cached(vk)) .collect(), @@ -5967,11 +5962,11 @@ pub mod args { type Keypair = WalletKeypair; type NativeAddress = (); type PublicKey = WalletPublicKey; + type SpendingKey = WalletSpendingKey; type TendermintAddress = TendermintAddress; type TransferSource = WalletTransferSource; type TransferTarget = WalletTransferTarget; type ViewingKey = WalletViewingKey; - type SpendingKey = WalletSpendingKey; } impl CliToSdk> for Tx { diff --git a/crates/apps/src/lib/cli/client.rs b/crates/apps/src/lib/cli/client.rs index bc1c9b38c0..6c68c7c157 100644 --- a/crates/apps/src/lib/cli/client.rs +++ b/crates/apps/src/lib/cli/client.rs @@ -300,29 +300,32 @@ impl CliApi { } Sub::ShieldedSync(ShieldedSync(mut args)) => { let client = client.unwrap_or_else(|| { - C::from_tendermint_address( - &mut args.ledger_address, - ) + C::from_tendermint_address(&mut args.ledger_address) }); client.wait_until_node_is_synced(&io).await?; let args = args.to_sdk(&mut ctx); let mut chain_ctx = ctx.take_chain_or_exit(); - let sks = args.spending_keys + let sks = args + .spending_keys .into_iter() .map(|sk| sk.into()) .collect::>(); - let vks = args.viewing_keys + let vks = args + .viewing_keys .into_iter() .map(|vk| vk.into()) .collect::>(); _ = chain_ctx.shielded.load().await; - chain_ctx.shielded.syncing( - &client, - &io, - args.last_query_height, - &sks, - &vks, - ).await?; + chain_ctx + .shielded + .syncing( + &client, + &io, + args.last_query_height, + &sks, + &vks, + ) + .await?; } // Eth bridge Sub::AddToEthBridgePool(args) => { diff --git a/crates/apps/src/lib/client/rpc.rs b/crates/apps/src/lib/client/rpc.rs index e8683bdc40..8ccf71bbf0 100644 --- a/crates/apps/src/lib/client/rpc.rs +++ b/crates/apps/src/lib/client/rpc.rs @@ -133,11 +133,7 @@ pub async fn query_transfers( let _ = shielded.load().await; // Obtain the effects of all shielded and transparent transactions let transfers = shielded - .query_tx_deltas( - context.client(), - &query_owner, - &query_token, - ) + .query_tx_deltas(context.client(), &query_owner, &query_token) .await .unwrap(); // To facilitate lookups of human-readable token names diff --git a/crates/core/src/types/storage.rs b/crates/core/src/types/storage.rs index c5803f82a7..304dfde543 100644 --- a/crates/core/src/types/storage.rs +++ b/crates/core/src/types/storage.rs @@ -1454,14 +1454,7 @@ impl GetEventNonce for InnerEthEventsQueue { /// Represents the pointers of an indexed tx, which are the block height and the /// index inside that block #[derive( - Default, - Debug, - Copy, - Clone, - BorshSerialize, - BorshDeserialize, - Eq, - PartialEq, + Default, Debug, Copy, Clone, BorshSerialize, BorshDeserialize, Eq, PartialEq, )] pub struct IndexedTx { /// The block height of the indexed tx diff --git a/crates/sdk/src/args.rs b/crates/sdk/src/args.rs index 3467e69ed6..cc7b18c76d 100644 --- a/crates/sdk/src/args.rs +++ b/crates/sdk/src/args.rs @@ -96,11 +96,11 @@ impl NamadaTypes for SdkTypes { type Keypair = namada_core::types::key::common::SecretKey; type NativeAddress = Address; type PublicKey = namada_core::types::key::common::PublicKey; + type SpendingKey = namada_core::types::masp::ExtendedSpendingKey; type TendermintAddress = (); type TransferSource = namada_core::types::masp::TransferSource; type TransferTarget = namada_core::types::masp::TransferTarget; type ViewingKey = namada_core::types::masp::ExtendedViewingKey; - type SpendingKey = namada_core::types::masp::ExtendedSpendingKey; } /// Common query arguments diff --git a/crates/sdk/src/masp.rs b/crates/sdk/src/masp.rs index 572f641f8c..2817bad460 100644 --- a/crates/sdk/src/masp.rs +++ b/crates/sdk/src/masp.rs @@ -54,12 +54,11 @@ use masp_proofs::bellman::groth16::PreparedVerifyingKey; use masp_proofs::bls12_381::Bls12; use masp_proofs::prover::LocalTxProver; use masp_proofs::sapling::SaplingVerificationContext; -use owo_colors::OwoColorize; use namada_core::types::address::{Address, MASP}; use namada_core::types::ibc::IbcShieldedTransfer; use namada_core::types::masp::{ - encode_asset_type, BalanceOwner, PaymentAddress, - TransferSource, TransferTarget, + encode_asset_type, BalanceOwner, PaymentAddress, TransferSource, + TransferTarget, }; use namada_core::types::storage::{BlockHeight, Epoch, IndexedTx, TxIndex}; use namada_core::types::time::{DateTimeUtc, DurationSecs}; @@ -67,6 +66,7 @@ use namada_ibc::IbcMessage; use namada_token::{self as token, MaspDenom, Transfer}; use namada_tx::data::{TxResult, WrapperTx}; use namada_tx::Tx; +use owo_colors::OwoColorize; use rand_core::{CryptoRng, OsRng, RngCore}; use ripemd::Digest as RipemdDigest; use sha2::Digest; @@ -80,7 +80,9 @@ use crate::queries::Client; use crate::rpc::{query_block, query_conversion, query_epoch_at_height}; use crate::tendermint_rpc::query::Query; use crate::tendermint_rpc::Order; -use crate::{display_line, edisplay_line, rpc, MaybeSend, MaybeSync, Namada, display}; +use crate::{ + display, display_line, edisplay_line, rpc, MaybeSend, MaybeSync, Namada, +}; /// Env var to point to a dir with MASP parameters. When not specified, /// the default OS specific path is used. @@ -596,8 +598,9 @@ impl Default for ShieldedContext { } } -impl ShieldedContext { - +impl + ShieldedContext +{ /// If we have unscanned txs in the local cache, get the index of the /// latest one. pub fn latest_unscanned(&self) -> Option { @@ -1448,7 +1451,7 @@ impl ShieldedContext { let should_process = !transfers .contains_key(&IndexedTx { height, index: idx }) && block_results[u64::from(height) as usize] - .is_accepted(idx.0 as usize); + .is_accepted(idx.0 as usize); if !should_process { continue; } @@ -1939,7 +1942,10 @@ impl ShieldedContext { } } - fn into_syncing(self, channel: &tokio::sync::watch::Receiver) -> ShieldedContext { + fn into_syncing( + self, + channel: &tokio::sync::watch::Receiver, + ) -> ShieldedContext { ShieldedContext { utils: self.utils, last_fetched: self.last_fetched, @@ -1977,19 +1983,33 @@ impl ShieldedContext { display_line!(io, "{}", "==== Shielded sync started ====".on_white()); if let Some(indexed) = self.last_fetched.as_ref() { - display_line!(io, " -> last fetched tx: block height {}, index {}", indexed.height, indexed.index); + display_line!( + io, + " -> last fetched tx: block height {}, index {}", + indexed.height, + indexed.index + ); } if !self.unscanned.vks.is_empty() { - display_line!(io, " -> viewing keys still to be synced: {}", self.unscanned.vks.len()); + display_line!( + io, + " -> viewing keys still to be synced: {}", + self.unscanned.vks.len() + ); } if !self.unscanned.txs.is_empty() { - display_line!(io, " -> fetched txs to scan: {}", self.unscanned.txs.len()); + display_line!( + io, + " -> fetched txs to scan: {}", + self.unscanned.txs.len() + ); } display_line!(io, "\n\n"); let mut syncing = self.into_syncing(&signal_rx); let sync = async move { - syncing.fetch(client, io, last_query_height, sks, fvks) + syncing + .fetch(client, io, last_query_height, sks, fvks) .await .map(|_| syncing.stop_sync()) }; @@ -1997,18 +2017,21 @@ impl ShieldedContext { sync: Box::pin(sync), signal: Box::pin(shutdown_signal), watch: signal_sx, - }.await?; + } + .await?; display_line!(io, "Saving shielded ctx... "); - shielded.save().await.map_err(|e| Error::Other(e.to_string()))?; + shielded + .save() + .await + .map_err(|e| Error::Other(e.to_string()))?; Ok(shielded) } } impl ShieldedContext { - /// Check if a signal to stop syncing has been received. fn interrupted(&self) -> bool { - *self.interrupt.borrow() + *self.interrupt.borrow() } /// Transition back into a non-syncing context @@ -2095,10 +2118,20 @@ impl ShieldedContext { // If unknown keys are being used, we need to scan older transactions // for any unspent notes if !unknown_keys.is_empty() || !self.unscanned.vks.is_empty() { - display_line!(io, "Scanning historical notes to synchronize new viewing key(s)."); + display_line!( + io, + "Scanning historical notes to synchronize new viewing key(s)." + ); self.unscanned.vks.extend(unknown_keys.clone()); // Load all transactions accepted until this point - let fetched = self.fetch_shielded_transfers(client, io, self.latest_unscanned(), last_query_height).await?; + let fetched = self + .fetch_shielded_transfers( + client, + io, + self.latest_unscanned(), + last_query_height, + ) + .await?; self.unscanned.txs.extend(fetched); if self.interrupted() { display_line!(io, ""); @@ -2108,26 +2141,42 @@ impl ShieldedContext { let mut tx_ctx = ShieldedContext { utils: self.utils.clone(), ..Default::default() - }.into_syncing(&self.interrupt); + } + .into_syncing(&self.interrupt); for vk in unknown_keys { tx_ctx.pos_map.entry(vk).or_insert_with(BTreeSet::new); } // Update this unknown shielded context until it is level with self - let txs = ProgressLogging::new(self.unscanned.txs.clone(), io, ProgressType::Scan); + let txs = ProgressLogging::new( + self.unscanned.txs.clone(), + io, + ProgressType::Scan, + ); for (indexed_tx, (epoch, tx, stx)) in txs.iter() { - if self.interrupted() || self.last_fetched.is_none() || indexed_tx >= self.last_fetched.as_ref().unwrap() { + if self.interrupted() + || self.last_fetched.is_none() + || indexed_tx >= self.last_fetched.as_ref().unwrap() + { break; } - tx_ctx.scan_tx(client, *indexed_tx, *epoch, tx, stx)?; + tx_ctx.scan_tx(*indexed_tx, *epoch, tx, stx)?; self.unscanned.txs.remove(indexed_tx); } // Merge the context data originating from the unknown keys into the // current context self.merge(tx_ctx); } else { - let resume_point = std::cmp::max(self.latest_unscanned(), self.last_fetched); + let resume_point = + std::cmp::max(self.latest_unscanned(), self.last_fetched); // fetch only new transactions - let fetched = self.fetch_shielded_transfers(client, io, resume_point, last_query_height).await?; + let fetched = self + .fetch_shielded_transfers( + client, + io, + resume_point, + last_query_height, + ) + .await?; self.unscanned.txs.extend(fetched); display_line!(io, ""); } @@ -2140,7 +2189,7 @@ impl ShieldedContext { if self.interrupted() { self.unscanned.txs.insert(indexed_tx, (epoch, tx, stx)); } else { - self.scan_tx(client, indexed_tx, epoch, &tx, &stx)?; + self.scan_tx(indexed_tx, epoch, &tx, &stx)?; } } Ok(()) @@ -2168,18 +2217,25 @@ impl ShieldedContext { last_indexed_tx.map_or_else(|| 1, |last| last.height.0); let first_idx_to_query = last_indexed_tx.map_or_else(|| 0, |last| last.index.0 + 1); - let heights = ProgressLogging::new(first_height_to_query..=last_query_height.0, io, ProgressType::Fetch); + let heights = ProgressLogging::new( + first_height_to_query..=last_query_height.0, + io, + ProgressType::Fetch, + ); for height in heights { if self.interrupted() { - return Ok(shielded_txs) + return Ok(shielded_txs); } // Get the valid masp transactions at the specified height let epoch = query_epoch_at_height(client, height.into()) .await? - .ok_or_else(|| Error::from(QueryError::General( - "Queried height is greater than the last committed \ - block height".to_string(), - )))?; + .ok_or_else(|| { + Error::from(QueryError::General( + "Queried height is greater than the last committed \ + block height" + .to_string(), + )) + })?; let first_index_to_query = if height == first_height_to_query { Some(TxIndex(first_idx_to_query)) @@ -2191,7 +2247,8 @@ impl ShieldedContext { client, height.into(), first_index_to_query, - ).await? + ) + .await? { Some(events) => events, None => continue, @@ -2202,7 +2259,8 @@ impl ShieldedContext { // reduce the amount of data sent over the network, but this is a // minimal improvement and it's even hard to tell how many times // we'd need a single masp tx to make this worth it - let block = client.block(height as u32) + let block = client + .block(height as u32) .await .map_err(|e| Error::from(QueryError::General(e.to_string())))? .block @@ -2210,8 +2268,16 @@ impl ShieldedContext { self.last_fetched = txs_results .last() - .map(|(idx, _)| IndexedTx{height: height.into(), index: *idx}) - .or_else(|| Some(IndexedTx{height: height.into(), index: Default::default()})); + .map(|(idx, _)| IndexedTx { + height: height.into(), + index: *idx, + }) + .or_else(|| { + Some(IndexedTx { + height: height.into(), + index: Default::default(), + }) + }); for (idx, tx_event) in txs_results { let tx = Tx::try_from(block[idx.0 as usize].as_ref()) .map_err(|e| Error::Other(e.to_string()))?; @@ -2306,17 +2372,17 @@ impl ShieldedContext { // Note the account changes let balance = transaction_delta .entry(*vk) - .or_insert_with(MaspAmount::default); + .or_insert_with(I128Sum::zero); *balance += I128Sum::from_nonnegative( note.asset_type, note.value as i128, ) - .map_err(|()| { - Error::Other( - "found note with invalid value or asset type" - .to_string(), - ) - })?; + .map_err(|()| { + Error::Other( + "found note with invalid value or asset type" + .to_string(), + ) + })?; self.vk_map.insert(note_pos, *vk); break; } @@ -2335,18 +2401,18 @@ impl ShieldedContext { // Note the account changes let balance = transaction_delta .entry(self.vk_map[note_pos]) - .or_insert_with(MaspAmount::default); + .or_insert_with(I128Sum::zero); let note = self.note_map[note_pos]; *balance -= I128Sum::from_nonnegative( note.asset_type, note.value as i128, ) - .map_err(|()| { - Error::Other( - "found note with invalid value or asset type" - .to_string(), - ) - })?; + .map_err(|()| { + Error::Other( + "found note with invalid value or asset type" + .to_string(), + ) + })?; } } // Record the changes to the transparent accounts @@ -2731,10 +2797,7 @@ pub mod fs { // Atomically update the old shielded context file with new data. // Atomicity is required to prevent other client instances from // reading corrupt data. - std::fs::rename( - tmp_path, - self.context_dir.join(FILE_NAME), - ) + std::fs::rename(tmp_path, self.context_dir.join(FILE_NAME)) } } } @@ -2743,8 +2806,8 @@ pub mod fs { struct ShieldedSync where F: ShieldedUtils, - A: Future, Error>>, - B: Future>, + A: Future, Error>>, + B: Future>, { sync: Pin>, signal: Pin>, @@ -2754,16 +2817,21 @@ where impl Future for ShieldedSync where F: ShieldedUtils, - A: Future, Error>>, - B: Future>, + A: Future, Error>>, + B: Future>, { type Output = Result, Error>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll { if let Poll::Ready(res) = self.sync.as_mut().poll(cx) { Poll::Ready(res) } else { - if !(*self.watch.borrow()) && self.signal.as_mut().poll(cx).is_ready() { + if !(*self.watch.borrow()) + && self.signal.as_mut().poll(cx).is_ready() + { self.watch.send(true).unwrap(); } Poll::Pending @@ -2771,7 +2839,6 @@ where } } - #[derive(Debug, Copy, Clone)] enum ProgressType { Fetch, @@ -2788,8 +2855,8 @@ struct ProgressLogging<'io, T, U: Io> { impl<'io, T, U: Io> ProgressLogging<'io, T, U> { fn new(items: I, io: &'io U, r#type: ProgressType) -> Self - where - I: IntoIterator + where + I: IntoIterator, { let items: Vec<_> = items.into_iter().collect(); Self { @@ -2811,8 +2878,7 @@ impl<'io, T, U: Io> ProgressLogging<'io, T, U> { } } -impl<'io, T: Debug, U: Io> Iterator for ProgressLogging<'io, T, U> -{ +impl<'io, T: Debug, U: Io> Iterator for ProgressLogging<'io, T, U> { type Item = T; fn next(&mut self) -> Option { @@ -2832,8 +2898,18 @@ impl<'io, T: Debug, U: Io> Iterator for ProgressLogging<'io, T, U> let incomplete: String = vec!['.'; 100 - percent].iter().collect(); display_line!(self.io, "\x1b[2A\x1b[J"); match self.r#type { - ProgressType::Fetch => display_line!(self.io, "Fetched block {:?} of {:?}", self.items.last().unwrap(), self.items[0]), - ProgressType::Scan => display_line!(self.io, "Scanning {} of {}", self.index, self.length), + ProgressType::Fetch => display_line!( + self.io, + "Fetched block {:?} of {:?}", + self.items.last().unwrap(), + self.items[0] + ), + ProgressType::Scan => display_line!( + self.io, + "Scanning {} of {}", + self.index, + self.length + ), } display!(self.io, "[{}{}] ~~ {} %", completed, incomplete, percent); self.io.flush(); @@ -2845,10 +2921,12 @@ struct ProgressLoggingIterator<'iter, 'io, T, U: Io> { items: &'iter [T], index: usize, io: &'io U, - r#type: ProgressType + r#type: ProgressType, } -impl<'iter, 'io, T: Debug, U: Io> Iterator for ProgressLoggingIterator<'iter, 'io, T, U> { +impl<'iter, 'io, T: Debug, U: Io> Iterator + for ProgressLoggingIterator<'iter, 'io, T, U> +{ type Item = &'iter T; fn next(&mut self) -> Option { @@ -2861,13 +2939,21 @@ impl<'iter, 'io, T: Debug, U: Io> Iterator for ProgressLoggingIterator<'iter, 'i let incomplete: String = vec!['.'; 100 - percent].iter().collect(); display_line!(self.io, "\x1b[2A\x1b[J"); match self.r#type { - ProgressType::Fetch => display_line!(self.io, "Fetched block {:?} of {:?}", self.items.last().unwrap(), self.items[0]), - ProgressType::Scan => display_line!(self.io, "Scanning {} of {}", self.index, self.items.len()), + ProgressType::Fetch => display_line!( + self.io, + "Fetched block {:?} of {:?}", + self.items.last().unwrap(), + self.items[0] + ), + ProgressType::Scan => display_line!( + self.io, + "Scanning {} of {}", + self.index, + self.items.len() + ), } display!(self.io, "[{}{}] ~~ {} %", completed, incomplete, percent); self.io.flush(); self.items.get(self.index - 1) } } - - diff --git a/crates/tests/src/integration/masp.rs b/crates/tests/src/integration/masp.rs index 29d7bba6d3..4353478ea2 100644 --- a/crates/tests/src/integration/masp.rs +++ b/crates/tests/src/integration/masp.rs @@ -110,11 +110,7 @@ fn masp_incentives() -> Result<()> { run( &node, Bin::Client, - vec![ - "shielded-sync", - "--node", - validator_one_rpc, - ], + vec!["shielded-sync", "--node", validator_one_rpc], )?; node.assert_success(); @@ -184,11 +180,7 @@ fn masp_incentives() -> Result<()> { run( &node, Bin::Client, - vec![ - "shielded-sync", - "--node", - validator_one_rpc, - ], + vec!["shielded-sync", "--node", validator_one_rpc], )?; node.assert_success(); @@ -278,11 +270,7 @@ fn masp_incentives() -> Result<()> { run( &node, Bin::Client, - vec![ - "shielded-sync", - "--node", - validator_one_rpc, - ], + vec!["shielded-sync", "--node", validator_one_rpc], )?; node.assert_success(); @@ -331,11 +319,7 @@ fn masp_incentives() -> Result<()> { run( &node, Bin::Client, - vec![ - "shielded-sync", - "--node", - validator_one_rpc, - ], + vec!["shielded-sync", "--node", validator_one_rpc], )?; node.assert_success(); @@ -426,15 +410,10 @@ fn masp_incentives() -> Result<()> { run( &node, Bin::Client, - vec![ - "shielded-sync", - "--node", - validator_one_rpc, - ], + vec!["shielded-sync", "--node", validator_one_rpc], )?; node.assert_success(); - // Assert ETH balance at VK(B) is 0 let captured = CapturedOutput::of(|| { run( @@ -459,11 +438,7 @@ fn masp_incentives() -> Result<()> { run( &node, Bin::Client, - vec![ - "shielded-sync", - "--node", - validator_one_rpc, - ], + vec!["shielded-sync", "--node", validator_one_rpc], )?; node.assert_success(); @@ -492,11 +467,7 @@ fn masp_incentives() -> Result<()> { run( &node, Bin::Client, - vec![ - "shielded-sync", - "--node", - validator_one_rpc, - ], + vec!["shielded-sync", "--node", validator_one_rpc], )?; node.assert_success(); @@ -549,11 +520,7 @@ fn masp_incentives() -> Result<()> { run( &node, Bin::Client, - vec![ - "shielded-sync", - "--node", - validator_one_rpc, - ], + vec!["shielded-sync", "--node", validator_one_rpc], )?; node.assert_success(); @@ -621,11 +588,7 @@ fn masp_incentives() -> Result<()> { run( &node, Bin::Client, - vec![ - "shielded-sync", - "--node", - validator_one_rpc, - ], + vec!["shielded-sync", "--node", validator_one_rpc], )?; node.assert_success(); @@ -698,11 +661,7 @@ fn masp_incentives() -> Result<()> { run( &node, Bin::Client, - vec![ - "shielded-sync", - "--node", - validator_one_rpc, - ], + vec!["shielded-sync", "--node", validator_one_rpc], )?; node.assert_success(); // Send all NAM rewards from SK(B) to Christel @@ -733,11 +692,7 @@ fn masp_incentives() -> Result<()> { run( &node, Bin::Client, - vec![ - "shielded-sync", - "--node", - validator_one_rpc, - ], + vec!["shielded-sync", "--node", validator_one_rpc], )?; node.assert_success(); // Send all NAM rewards from SK(A) to Bertha @@ -781,16 +736,11 @@ fn masp_incentives() -> Result<()> { assert!(captured.result.is_ok()); assert!(captured.contains("No shielded nam balance found")); - // sync the shielded context run( &node, Bin::Client, - vec![ - "shielded-sync", - "--node", - validator_one_rpc, - ], + vec!["shielded-sync", "--node", validator_one_rpc], )?; node.assert_success(); // Assert NAM balance at VK(B) is 0 @@ -861,7 +811,7 @@ fn masp_pinned_txs() -> Result<()> { AC_VIEWING_KEY, "--node", validator_one_rpc, - ] + ], )?; node.assert_success(); @@ -938,11 +888,7 @@ fn masp_pinned_txs() -> Result<()> { run( &node, Bin::Client, - vec![ - "shielded-sync", - "--node", - validator_one_rpc, - ] + vec!["shielded-sync", "--node", validator_one_rpc], )?; node.assert_success(); @@ -993,11 +939,7 @@ fn masp_pinned_txs() -> Result<()> { run( &node, Bin::Client, - vec![ - "shielded-sync", - "--node", - validator_one_rpc, - ] + vec!["shielded-sync", "--node", validator_one_rpc], )?; node.assert_success(); @@ -1067,7 +1009,7 @@ fn masp_txs_and_queries() -> Result<()> { AB_VIEWING_KEY, "--node", validator_one_rpc, - ] + ], )?; node.assert_success(); @@ -1285,11 +1227,7 @@ fn masp_txs_and_queries() -> Result<()> { run( &node, Bin::Client, - vec![ - "shielded-sync", - "--node", - validator_one_rpc, - ] + vec!["shielded-sync", "--node", validator_one_rpc], )?; node.assert_success(); // there is no need to dry run balance queries @@ -1423,7 +1361,7 @@ fn wrapper_fee_unshielding() -> Result<()> { AB_VIEWING_KEY, "--node", validator_one_rpc, - ] + ], )?; node.assert_success(); @@ -1455,11 +1393,7 @@ fn wrapper_fee_unshielding() -> Result<()> { run( &node, Bin::Client, - vec![ - "shielded-sync", - "--node", - validator_one_rpc, - ] + vec!["shielded-sync", "--node", validator_one_rpc], )?; node.assert_success(); @@ -1538,7 +1472,7 @@ fn cross_epoch_tx() -> Result<()> { AA_VIEWING_KEY, "--node", validator_one_rpc, - ] + ], )?; node.assert_success(); From 44179532799fd88eec4bc694105887d460ef9257 Mon Sep 17 00:00:00 2001 From: satan Date: Tue, 23 Jan 2024 10:17:40 +0100 Subject: [PATCH 5/7] Fixing tests and checks --- crates/apps/src/lib/bench_utils.rs | 5 +---- crates/sdk/src/masp.rs | 1 + crates/tests/src/e2e/ibc_tests.rs | 4 ++-- crates/tests/src/e2e/ledger_tests.rs | 7 +++++++ crates/tests/src/e2e/setup.rs | 2 +- crates/tests/src/integration/masp.rs | 8 ++++++++ 6 files changed, 20 insertions(+), 7 deletions(-) diff --git a/crates/apps/src/lib/bench_utils.rs b/crates/apps/src/lib/bench_utils.rs index dfcfab631c..f2f7171846 100644 --- a/crates/apps/src/lib/bench_utils.rs +++ b/crates/apps/src/lib/bench_utils.rs @@ -685,12 +685,9 @@ impl ShieldedUtils for BenchShieldedUtils { // Atomicity is required to prevent other client instances from reading // corrupt data. std::fs::rename( - tmp_path.clone(), + tmp_path, self.context_dir.0.path().to_path_buf().join(FILE_NAME), )?; - // Finally, remove our temporary file to allow future saving of shielded - // contexts. - std::fs::remove_file(tmp_path)?; Ok(()) } } diff --git a/crates/sdk/src/masp.rs b/crates/sdk/src/masp.rs index 2817bad460..7c4e45be40 100644 --- a/crates/sdk/src/masp.rs +++ b/crates/sdk/src/masp.rs @@ -1965,6 +1965,7 @@ impl ShieldedContext { } } + #[cfg(not(target_arch = "wasm32"))] pub async fn syncing( self, client: &C, diff --git a/crates/tests/src/e2e/ibc_tests.rs b/crates/tests/src/e2e/ibc_tests.rs index 9ee3922e26..39f54e9daa 100644 --- a/crates/tests/src/e2e/ibc_tests.rs +++ b/crates/tests/src/e2e/ibc_tests.rs @@ -187,7 +187,7 @@ fn run_ledger_ibc() -> Result<()> { } #[test] -fn run_ledger_ibc_with_hermes() -> Result<()> { +fn drun_ledger_ibc_with_hermes() -> Result<()> { let update_genesis = |mut genesis: templates::All, base_dir: &_| { genesis.parameters.parameters.epochs_per_year = 31536; @@ -1913,7 +1913,7 @@ fn check_shielded_balances( "--node", &rpc_b, ]; - let mut client = run!(test_a, Bin::Client, tx_args, Some(120))?; + let mut client = run!(test_b, Bin::Client, tx_args, Some(120))?; client.assert_success(); // PA(B) on Chain B has received BTC on chain A let token_addr = find_address(test_a, BTC)?.to_string(); diff --git a/crates/tests/src/e2e/ledger_tests.rs b/crates/tests/src/e2e/ledger_tests.rs index e3b8dee732..6a74a31fb7 100644 --- a/crates/tests/src/e2e/ledger_tests.rs +++ b/crates/tests/src/e2e/ledger_tests.rs @@ -766,6 +766,13 @@ fn wrapper_disposable_signer() -> Result<()> { client.exp_string(TX_ACCEPTED)?; client.exp_string(TX_APPLIED_SUCCESS)?; let _ep1 = epoch_sleep(&test, &validator_one_rpc, 720)?; + let tx_args = vec![ + "shielded-sync", + "--node", + &validator_one_rpc, + ]; + let mut client = run!(test, Bin::Client, tx_args, Some(120))?; + client.assert_success(); let tx_args = vec![ "transfer", "--source", diff --git a/crates/tests/src/e2e/setup.rs b/crates/tests/src/e2e/setup.rs index 8720c2af65..451453372b 100644 --- a/crates/tests/src/e2e/setup.rs +++ b/crates/tests/src/e2e/setup.rs @@ -287,7 +287,7 @@ where let mut sign_pre_genesis_txs = run_cmd( Bin::Client, args, - Some(5), + Some(10), &working_dir(), base_dir, format!("{}:{}", std::file!(), std::line!()), diff --git a/crates/tests/src/integration/masp.rs b/crates/tests/src/integration/masp.rs index 4353478ea2..34238f137c 100644 --- a/crates/tests/src/integration/masp.rs +++ b/crates/tests/src/integration/masp.rs @@ -717,6 +717,14 @@ fn masp_incentives() -> Result<()> { )?; node.assert_success(); + // sync the shielded context + run( + &node, + Bin::Client, + vec!["shielded-sync", "--node", validator_one_rpc], + )?; + node.assert_success(); + // Assert NAM balance at VK(A) is 0 let captured = CapturedOutput::of(|| { run( From 80978b81e9645a0da7ca33a65baeff319ed81e97 Mon Sep 17 00:00:00 2001 From: satan Date: Tue, 23 Jan 2024 10:56:00 +0100 Subject: [PATCH 6/7] formatting --- crates/tests/src/e2e/ledger_tests.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/crates/tests/src/e2e/ledger_tests.rs b/crates/tests/src/e2e/ledger_tests.rs index 6a74a31fb7..06576511d9 100644 --- a/crates/tests/src/e2e/ledger_tests.rs +++ b/crates/tests/src/e2e/ledger_tests.rs @@ -766,11 +766,7 @@ fn wrapper_disposable_signer() -> Result<()> { client.exp_string(TX_ACCEPTED)?; client.exp_string(TX_APPLIED_SUCCESS)?; let _ep1 = epoch_sleep(&test, &validator_one_rpc, 720)?; - let tx_args = vec![ - "shielded-sync", - "--node", - &validator_one_rpc, - ]; + let tx_args = vec!["shielded-sync", "--node", &validator_one_rpc]; let mut client = run!(test, Bin::Client, tx_args, Some(120))?; client.assert_success(); let tx_args = vec![ From dfa371572a6e13779b6d76adde635d07d6cece81 Mon Sep 17 00:00:00 2001 From: satan Date: Tue, 23 Jan 2024 11:30:07 +0100 Subject: [PATCH 7/7] Fixed ibc e2e test --- crates/tests/src/e2e/ibc_tests.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/tests/src/e2e/ibc_tests.rs b/crates/tests/src/e2e/ibc_tests.rs index 39f54e9daa..c0f6fd4864 100644 --- a/crates/tests/src/e2e/ibc_tests.rs +++ b/crates/tests/src/e2e/ibc_tests.rs @@ -1905,6 +1905,9 @@ fn check_shielded_balances( ) -> Result<()> { // Check the balance on Chain B std::env::set_var(ENV_VAR_CHAIN_ID, test_a.net.chain_id.to_string()); + // PA(B) on Chain B has received BTC on chain A + let token_addr = find_address(test_a, BTC)?.to_string(); + std::env::set_var(ENV_VAR_CHAIN_ID, test_b.net.chain_id.to_string()); let rpc_b = get_actor_rpc(test_b, Who::Validator(0)); let tx_args = vec![ "shielded-sync", @@ -1915,10 +1918,6 @@ fn check_shielded_balances( ]; let mut client = run!(test_b, Bin::Client, tx_args, Some(120))?; client.assert_success(); - // PA(B) on Chain B has received BTC on chain A - let token_addr = find_address(test_a, BTC)?.to_string(); - std::env::set_var(ENV_VAR_CHAIN_ID, test_b.net.chain_id.to_string()); - let rpc_b = get_actor_rpc(test_b, Who::Validator(0)); let ibc_denom = format!("{dest_port_id}/{dest_channel_id}/btc"); let query_args = vec![ "balance",