diff --git a/.github/workflows/smoke.yml b/.github/workflows/smoke.yml index 7454c26cc0..37d13d09af 100644 --- a/.github/workflows/smoke.yml +++ b/.github/workflows/smoke.yml @@ -5,12 +5,13 @@ on: paths-ignore: - 'docs/**' +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + jobs: smoke_test: runs-on: buildjet-16vcpu-ubuntu-2204 - concurrency: - group: ${{ github.workflow }}-${{ github.ref }} - cancel-in-progress: true environment: smoke-test steps: - uses: actions/checkout@v4 @@ -39,3 +40,30 @@ jobs: - name: Display smoke-test logs if: always() run: cat deployments/logs/smoke-*.log + + pmonitor-integration: + runs-on: buildjet-16vcpu-ubuntu-2204 + steps: + - uses: actions/checkout@v4 + with: + lfs: true + + - name: install nix + uses: nixbuild/nix-quick-install-action@v28 + + - name: setup nix cache + uses: nix-community/cache-nix-action@v5 + with: + primary-key: nix-${{ runner.os }}-${{ hashFiles('**/*.nix') }} + restore-prefixes-first-match: nix-${{ runner.os }}- + backend: buildjet + + - name: Load rust cache + uses: astriaorg/buildjet-rust-cache@v2.5.1 + + # Confirm that the nix devshell is buildable and runs at all. + - name: validate nix env + run: nix develop --command echo hello + + - name: run the pmonitor integration tests + run: nix develop --command just test-pmonitor diff --git a/Cargo.lock b/Cargo.lock index 4681e50097..965eea0d35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5904,6 +5904,43 @@ dependencies = [ "plotters-backend", ] +[[package]] +name = "pmonitor" +version = "0.80.6" +dependencies = [ + "anyhow", + "assert_cmd", + "camino", + "clap", + "colored", + "directories", + "futures", + "indicatif", + "once_cell", + "pcli", + "penumbra-app", + "penumbra-asset", + "penumbra-compact-block", + "penumbra-keys", + "penumbra-num", + "penumbra-proto", + "penumbra-shielded-pool", + "penumbra-stake", + "penumbra-tct", + "penumbra-view", + "regex", + "serde", + "serde_json", + "tempfile", + "tokio", + "toml 0.7.8", + "tonic", + "tracing", + "tracing-subscriber 0.3.18", + "url", + "uuid", +] + [[package]] name = "polling" version = "2.8.0" @@ -8588,6 +8625,7 @@ checksum = "5de17fd2f7da591098415cff336e12965a28061ddace43b59cb3c430179c9439" dependencies = [ "getrandom", "rand", + "serde", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 7ae2d4d471..3d2de2f1bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ members = [ "crates/bin/pclientd", "crates/bin/pd", "crates/bin/pindexer", + "crates/bin/pmonitor", "crates/cnidarium", "crates/cnidarium-component", "crates/core/app", diff --git a/crates/bin/pcli/src/opt.rs b/crates/bin/pcli/src/opt.rs index ca4f644f01..bfa9a6cc1c 100644 --- a/crates/bin/pcli/src/opt.rs +++ b/crates/bin/pcli/src/opt.rs @@ -156,7 +156,7 @@ impl Opt { tracing::info!(%path, "using local view service"); let registry_path = self.home.join("registry.json"); - // Check if the path exists or set it to nojne + // Check if the path exists or set it to none let registry_path = if registry_path.exists() { Some(registry_path) } else { diff --git a/crates/bin/pmonitor/Cargo.toml b/crates/bin/pmonitor/Cargo.toml new file mode 100644 index 0000000000..71612426ba --- /dev/null +++ b/crates/bin/pmonitor/Cargo.toml @@ -0,0 +1,46 @@ +[package] +name = "pmonitor" +version = { workspace = true } +authors = { workspace = true } +edition = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +publish = false + +# See more keys and their definitions at 
https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = {workspace = true} +camino = {workspace = true} +clap = {workspace = true, features = ["derive", "env"]} +colored = "2.1.0" +directories = {workspace = true} +futures = {workspace = true} +indicatif = {workspace = true} +pcli = {path = "../pcli", default-features = true} +penumbra-app = {workspace = true} +penumbra-asset = {workspace = true, default-features = false} +penumbra-compact-block = {workspace = true, default-features = false} +penumbra-keys = {workspace = true, default-features = false} +penumbra-num = {workspace = true, default-features = false} +penumbra-proto = {workspace = true} +penumbra-shielded-pool = {workspace = true, default-features = false} +penumbra-stake = {workspace = true, default-features = false} +penumbra-tct = {workspace = true, default-features = false} +penumbra-view = {workspace = true} +regex = {workspace = true} +serde = {workspace = true, features = ["derive"]} +serde_json = {workspace = true} +tokio = {workspace = true, features = ["full"]} +toml = {workspace = true} +tonic = {workspace = true, features = ["tls-webpki-roots", "tls"]} +tracing = {workspace = true} +tracing-subscriber = { workspace = true, features = ["env-filter", "ansi"] } +url = {workspace = true, features = ["serde"]} +uuid = { version = "1.3", features = ["v4", "serde"] } + +[dev-dependencies] +assert_cmd = {workspace = true} +once_cell = {workspace = true} +tempfile = {workspace = true} diff --git a/crates/bin/pmonitor/src/config.rs b/crates/bin/pmonitor/src/config.rs new file mode 100644 index 0000000000..8b521ff8a1 --- /dev/null +++ b/crates/bin/pmonitor/src/config.rs @@ -0,0 +1,118 @@ +//! Logic for reading and writing config files for `pmonitor`, in the TOML format. +use anyhow::Result; +use regex::Regex; +use serde::{Deserialize, Serialize}; +use url::Url; +use uuid::Uuid; + +use penumbra_keys::FullViewingKey; +use penumbra_num::Amount; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct FvkEntry { + pub fvk: FullViewingKey, + pub wallet_id: Uuid, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +/// Representation of a single Penumbra wallet to track. +pub struct AccountConfig { + /// The initial [FullViewingKey] has specified during `pmonitor init`. + /// + /// Distinct because the tool understands account migrations. + original: FvkEntry, + /// The amount held by the account at the time of genesis. + genesis_balance: Amount, + /// List of account migrations, performed via `pcli migrate balance`, if any. + migrations: Vec, +} + +impl AccountConfig { + pub fn new(original: FvkEntry, genesis_balance: Amount) -> Self { + Self { + original, + genesis_balance, + migrations: vec![], + } + } + + /// Get original/genesis FVK. + pub fn original_fvk(&self) -> FullViewingKey { + self.original.fvk.clone() + } + + /// Get genesis balance. + pub fn genesis_balance(&self) -> Amount { + self.genesis_balance + } + + /// Add migration to the account config. + pub fn add_migration(&mut self, fvk_entry: FvkEntry) { + self.migrations.push(fvk_entry); + } + + /// Get the active wallet, which is the last migration or the original FVK if no migrations have occurred. 
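+    ///
+    /// A minimal sketch of the expected behavior, assuming `original_entry` and
+    /// `migrated_entry` are `FvkEntry` values (illustrative, not compiled):
+    ///
+    /// ```ignore
+    /// let mut cfg = AccountConfig::new(original_entry.clone(), Amount::from(0u64));
+    /// assert_eq!(cfg.active_wallet().wallet_id, original_entry.wallet_id);
+    /// cfg.add_migration(migrated_entry.clone());
+    /// assert_eq!(cfg.active_wallet().wallet_id, migrated_entry.wallet_id);
+    /// ```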
+ pub fn active_wallet(&self) -> FvkEntry { + if self.migrations.is_empty() { + self.original.clone() + } else { + self.migrations + .last() + .expect("migrations must not be empty") + .clone() + } + } + + pub fn active_fvk(&self) -> FullViewingKey { + self.active_wallet().fvk + } + + pub fn active_uuid(&self) -> Uuid { + self.active_wallet().wallet_id + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +/// The primary TOML file for configuring `pmonitor`, containing all its account info. +/// +/// During `pmonitor audit` runs, the config will be automatically updated +/// if tracked FVKs were detected to migrate, via `pcli migrate balance`, to save time +/// on future syncs. +pub struct PmonitorConfig { + /// The gRPC URL for a Penumbra node's `pd` endpoint, used for retrieving account activity. + grpc_url: Url, + /// The list of Penumbra wallets to track. + accounts: Vec, +} + +impl PmonitorConfig { + pub fn new(grpc_url: Url, accounts: Vec) -> Self { + Self { grpc_url, accounts } + } + + pub fn grpc_url(&self) -> Url { + self.grpc_url.clone() + } + + pub fn accounts(&self) -> &Vec { + &self.accounts + } + + pub fn set_account(&mut self, index: usize, account: AccountConfig) { + self.accounts[index] = account; + } +} + +/// Get the destination FVK from a migration memo. +pub fn parse_dest_fvk_from_memo(memo: &str) -> Result { + let re = Regex::new(r"Migrating balance from .+ to (.+)")?; + if let Some(captures) = re.captures(memo) { + if let Some(dest_fvk_str) = captures.get(1) { + return dest_fvk_str + .as_str() + .parse::() + .map_err(|_| anyhow::anyhow!("Invalid destination FVK in memo")); + } + } + Err(anyhow::anyhow!("Could not parse destination FVK from memo")) +} diff --git a/crates/bin/pmonitor/src/genesis.rs b/crates/bin/pmonitor/src/genesis.rs new file mode 100644 index 0000000000..a526e2899d --- /dev/null +++ b/crates/bin/pmonitor/src/genesis.rs @@ -0,0 +1,117 @@ +//! Logic for inspecting the [CompactBlock] at genesis of the target chain. +//! Used to compute balances for tracked FVKs at genesis time. The initial genesis balance is +//! stored in the `pmonitor` config file, so that audit actions can reference it. +use std::{collections::BTreeMap, str::FromStr}; + +use penumbra_asset::STAKING_TOKEN_ASSET_ID; +use penumbra_compact_block::{CompactBlock, StatePayload}; +use penumbra_keys::FullViewingKey; +use penumbra_num::Amount; +use penumbra_shielded_pool::{Note, NotePayload}; +use penumbra_stake::{ + rate::{BaseRateData, RateData}, + DelegationToken, +}; +use penumbra_tct::StateCommitment; + +#[derive(Debug, Clone)] +pub struct FilteredGenesisBlock { + // Notes per FVK + #[allow(dead_code)] + pub notes: BTreeMap>, + // UM-equivalent balances per FVK + pub balances: BTreeMap, +} + +/// Scanning of the genesis `CompactBlock` with a list of FVKs to determine the +/// initial balances of the relevant addresses. +/// +/// Assumption: There are no swaps or nullifiers in the genesis block. +pub async fn scan_genesis_block( + CompactBlock { + height, + state_payloads, + .. + }: CompactBlock, + fvks: Vec, +) -> anyhow::Result { + assert_eq!(height, 0); + + let mut notes = BTreeMap::new(); + let mut balances = BTreeMap::new(); + + // Calculate the rate data for each validator in the initial validator set. + let base_rate = BaseRateData { + epoch_index: 0, + base_reward_rate: 0u128.into(), + base_exchange_rate: 1_0000_0000u128.into(), + }; + + // We proceed one FVK at a time. 
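+    // Each FVK trial-decrypts every note payload in the genesis block, so the
+    // work here scales with (number of FVKs) x (number of genesis notes); each
+    // decryption attempt is spawned onto its own tokio task.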
+ for fvk in fvks { + // Trial-decrypt a note with our a specific viewing key + let trial_decrypt_note = + |note_payload: NotePayload| -> tokio::task::JoinHandle> { + let fvk2 = fvk.clone(); + tokio::spawn(async move { note_payload.trial_decrypt(&fvk2) }) + }; + + // Trial-decrypt the notes in this block, keeping track of the ones that were meant for the FVK + // we're monitoring. + let mut note_decryptions = Vec::new(); + + // We only care about notes, so we're ignoring swaps and rolled-up commitments. + for payload in state_payloads.iter() { + if let StatePayload::Note { note, .. } = payload { + note_decryptions.push(trial_decrypt_note((**note).clone())); + } + } + + let mut notes_for_this_fvk = BTreeMap::new(); + for decryption in note_decryptions { + if let Some(note) = decryption + .await + .expect("able to join tokio note decryption handle") + { + notes_for_this_fvk.insert(note.commit(), note.clone()); + + // Balance is expected to be in the staking or delegation token + let note_value = note.value(); + if note_value.asset_id == *STAKING_TOKEN_ASSET_ID { + balances + .entry(fvk.to_string()) + .and_modify(|existing_amount| *existing_amount += note.amount()) + .or_insert(note.amount()); + } else if let Ok(delegation_token) = + DelegationToken::from_str(¬e_value.asset_id.to_string()) + { + // We need to convert the amount to the UM-equivalent amount + let rate_data = RateData { + identity_key: delegation_token.validator(), + validator_reward_rate: 0u128.into(), + validator_exchange_rate: base_rate.base_exchange_rate, + }; + let um_equivalent_balance = rate_data.unbonded_amount(note.amount()); + + balances + .entry(fvk.to_string()) + .and_modify(|existing_amount| *existing_amount += um_equivalent_balance) + .or_insert(um_equivalent_balance); + } else { + tracing::warn!( + "ignoring note with unknown asset id: {}", + note_value.asset_id + ); + } + } + } + + // Save all the notes for this FVK, and continue. + notes.insert(fvk.to_string(), notes_for_this_fvk); + } + + // Construct filtered genesis block with allocations + let result = FilteredGenesisBlock { notes, balances }; + + Ok(result) +} diff --git a/crates/bin/pmonitor/src/main.rs b/crates/bin/pmonitor/src/main.rs new file mode 100644 index 0000000000..372f425adb --- /dev/null +++ b/crates/bin/pmonitor/src/main.rs @@ -0,0 +1,598 @@ +//! The `pmonitor` tool tracks the balances of Penumbra wallets, as identified +//! by a [FullViewingKey] (FVK), in order to perform auditing. It accepts a JSON file +//! of FVKs and a `pd` gRPC URL to initialize: +//! +//! pmonitor init --grpc-url http://127.0.0.1:8080 --fvks fvks.json +//! +//! The audit functionality runs as a single operation, evaluating compliance up to the +//! current block height: +//! +//! pmonitor audit +//! +//! If regular auditing is desired, consider automating the `pmonitor audit` action via +//! cron or similar. `pmonitor` will cache view databases for each tracked FVK, so that future +//! `audit` actions need only inspect the blocks generated between the previous audit and the +//! current height. 
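+//!
+//! The `--fvks` argument expects the path to a JSON file containing an array of
+//! FVK strings, e.g. (illustrative placeholder values):
+//!
+//! ```text
+//! ["penumbrafullviewingkey1abc...", "penumbrafullviewingkey1xyz..."]
+//! ```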
+ +use anyhow::{Context, Result}; +use camino::Utf8PathBuf; +use clap::{self, Parser}; +use directories::ProjectDirs; +use futures::StreamExt; +use penumbra_asset::STAKING_TOKEN_ASSET_ID; +use std::fs; +use std::io::IsTerminal as _; +use std::str::FromStr; +use tonic::transport::{Channel, ClientTlsConfig}; +use tracing_subscriber::{prelude::*, EnvFilter}; +use url::Url; +use uuid::Uuid; + +use colored::Colorize; + +use pcli::config::PcliConfig; +use penumbra_compact_block::CompactBlock; +use penumbra_keys::FullViewingKey; +use penumbra_num::Amount; +use penumbra_proto::box_grpc_svc; +use penumbra_proto::view::v1::{ + view_service_client::ViewServiceClient, view_service_server::ViewServiceServer, +}; +use penumbra_proto::{ + core::component::compact_block::v1::CompactBlockRequest, + core::component::stake::v1::query_service_client::QueryServiceClient as StakeQueryServiceClient, + penumbra::core::component::compact_block::v1::query_service_client::QueryServiceClient as CompactBlockQueryServiceClient, +}; +use penumbra_stake::rate::RateData; +use penumbra_stake::DelegationToken; +use penumbra_view::{Storage, ViewClient, ViewServer}; + +mod config; +mod genesis; + +use config::{parse_dest_fvk_from_memo, AccountConfig, FvkEntry, PmonitorConfig}; + +/// The maximum size of a compact block, in bytes (12MB). +const MAX_CB_SIZE_BYTES: usize = 12 * 1024 * 1024; + +/// The name of the view database file +const VIEW_FILE_NAME: &str = "pcli-view.sqlite"; + +/// The permitted difference between genesis balance and current balance, +/// specified in number of staking tokens. +const ALLOWED_DISCREPANCY: f64 = 0.1; + +/// Configure tracing_subscriber for logging messages +fn init_tracing() -> anyhow::Result<()> { + // Instantiate tracing layers. + // The `FmtLayer` is used to print to the console. + let fmt_layer = tracing_subscriber::fmt::layer() + .with_ansi(std::io::stdout().is_terminal()) + .with_writer(std::io::stderr) + .with_target(true); + // The `EnvFilter` layer is used to filter events based on `RUST_LOG`. + let filter_layer = EnvFilter::try_from_default_env() + .or_else(|_| EnvFilter::try_new("info,penumbra_view=off"))?; + + // Register the tracing subscribers. + let registry = tracing_subscriber::registry() + .with(filter_layer) + .with(fmt_layer); + registry.init(); + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<()> { + let opt = Opt::parse(); + init_tracing()?; + tracing::info!(?opt, version = env!("CARGO_PKG_VERSION"), "running command"); + opt.exec().await +} + +/// The path to the default `pmonitor` home directory. +/// +/// Can be overridden on the command-line via `--home`. +pub fn default_home() -> Utf8PathBuf { + let path = ProjectDirs::from("zone", "penumbra", "pmonitor") + .expect("Failed to get platform data dir") + .data_dir() + .to_path_buf(); + Utf8PathBuf::from_path_buf(path).expect("Platform default data dir was not UTF-8") +} + +#[derive(Debug, Parser)] +#[clap( + name = "pmonitor", + about = "The Penumbra account activity monitor.", + version +)] +pub struct Opt { + /// Command to run. + #[clap(subcommand)] + pub cmd: Command, + /// The path used to store pmonitor state. + #[clap(long, default_value_t = default_home(), env = "PENUMBRA_PMONITOR_HOME")] + pub home: Utf8PathBuf, +} + +#[derive(Debug, clap::Subcommand)] +pub enum Command { + /// Generate configs for `pmonitor`. + Init { + /// Provide JSON file with the list of full viewing keys to monitor. 
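+        /// The value is interpreted as a filesystem path; the file contents must be a
+        /// JSON array of FVK strings (see the module documentation above).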
+ #[clap(long, display_order = 200)] + fvks: String, + /// Sets the URL of the gRPC endpoint used to sync the wallets. + #[clap( + long, + display_order = 900, + parse(try_from_str = Url::parse) + )] + grpc_url: Url, + }, + /// Sync to latest block height and verify all configured wallets have the correct balance. + Audit {}, + /// Delete `pmonitor` storage to reset local state. + Reset {}, +} + +impl Opt { + /// Set up the view service for a given wallet. + pub async fn view( + &self, + path: Utf8PathBuf, + fvk: FullViewingKey, + grpc_url: Url, + ) -> Result> { + let registry_path = path.join("registry.json"); + // Check if the path exists or set it to none + let registry_path = if registry_path.exists() { + Some(registry_path) + } else { + None + }; + let db_path: Utf8PathBuf = path.join(VIEW_FILE_NAME); + + let svc: ViewServer = + ViewServer::load_or_initialize(Some(db_path), registry_path, &fvk, grpc_url).await?; + + let svc: ViewServiceServer = ViewServiceServer::new(svc); + let view_service = ViewServiceClient::new(box_grpc_svc::local(svc)); + Ok(view_service) + } + + /// Get the path to the wallet directory for a given wallet ID. + pub fn wallet_path(&self, wallet_id: &Uuid) -> Utf8PathBuf { + self.home.join(format!("wallet_{}", wallet_id)) + } + + /// Sync a given wallet to the latest block height. + pub async fn sync( + &self, + view_service: &mut ViewServiceClient, + ) -> Result<()> { + let mut status_stream = ViewClient::status_stream(view_service).await?; + + let initial_status = status_stream + .next() + .await + .transpose()? + .ok_or_else(|| anyhow::anyhow!("view service did not report sync status"))?; + + tracing::debug!( + "scanning blocks from last sync height {} to latest height {}", + initial_status.full_sync_height, + initial_status.latest_known_block_height, + ); + + // use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle}; + // let progress_bar = ProgressBar::with_draw_target( + // initial_status.latest_known_block_height - initial_status.full_sync_height, + // ProgressDrawTarget::stdout(), + // ) + // .with_style( + // ProgressStyle::default_bar() + // .template("[{elapsed}] {bar:50.cyan/blue} {pos:>7}/{len:7} {per_sec} ETA: {eta}"), + // ); + // progress_bar.set_position(0); + + // On large networks, logging an update every 100k blocks or so seems reasonable. + // let log_every_n_blocks = 100000; + let log_every_n_blocks = 100; + while let Some(status) = status_stream.next().await.transpose()? { + if status.full_sync_height % log_every_n_blocks == 0 { + tracing::debug!("synced {} blocks", status.full_sync_height); + } + // progress_bar.set_position(status.full_sync_height - initial_status.full_sync_height); + } + // progress_bar.finish(); + + Ok(()) + } + + /// Fetch the genesis compact block + pub async fn fetch_genesis_compact_block(&self, grpc_url: Url) -> Result { + let height = 0; + let mut client = CompactBlockQueryServiceClient::connect(grpc_url.to_string()) + .await? + .max_decoding_message_size(MAX_CB_SIZE_BYTES); + let compact_block = client + .compact_block(CompactBlockRequest { height }) + .await? + .into_inner() + .compact_block + .expect("response has compact block"); + compact_block.try_into() + } + + /// Stolen from pcli + pub async fn pd_channel(&self, grpc_url: Url) -> anyhow::Result { + match grpc_url.scheme() { + "http" => Ok(Channel::from_shared(grpc_url.to_string())? + .connect() + .await?), + "https" => Ok(Channel::from_shared(grpc_url.to_string())? + .tls_config(ClientTlsConfig::new())? 
+ .connect() + .await?), + other => Err(anyhow::anyhow!("unknown url scheme {other}")) + .with_context(|| format!("could not connect to {}", grpc_url)), + } + } + + /// Create wallet given a path and fvk + pub async fn create_wallet( + &self, + wallet_dir: &Utf8PathBuf, + fvk: &FullViewingKey, + grpc_url: &Url, + ) -> Result<()> { + // Create the wallet directory if it doesn't exist + if !wallet_dir.exists() { + fs::create_dir_all(&wallet_dir)?; + } + + // Use FVK to build a pcli config file, + // which we'll reference when syncing wallets. + let pcli_config = PcliConfig { + grpc_url: grpc_url.clone(), + view_url: None, + governance_custody: None, + full_viewing_key: fvk.clone(), + disable_warning: true, + custody: pcli::config::CustodyConfig::ViewOnly, + }; + + let pcli_config_path = wallet_dir.join("config.toml"); + pcli_config.save(pcli_config_path).with_context(|| { + format!("failed to initialize wallet in {}", wallet_dir.to_string()) + })?; + + Ok(()) + } + + /// Compute the UM-equivalent balance for a given (synced) wallet. + pub async fn compute_um_equivalent_balance( + &self, + view_client: &mut ViewServiceClient, + stake_client: &mut StakeQueryServiceClient, + ) -> Result { + let notes = view_client.unspent_notes_by_asset_and_address().await?; + let mut total_um_equivalent_amount = Amount::from(0u64); + for (asset_id, map) in notes.iter() { + if *asset_id == *STAKING_TOKEN_ASSET_ID { + let total_amount = map + .iter() + .map(|(_, spendable_notes)| { + spendable_notes + .iter() + .map(|spendable_note| spendable_note.note.amount()) + .sum::() + }) + .sum::(); + total_um_equivalent_amount += total_amount; + } else if let Ok(delegation_token) = DelegationToken::from_str(&asset_id.to_string()) { + let total_amount = map + .iter() + .map(|(_, spendable_notes)| { + spendable_notes + .iter() + .map(|spendable_note| spendable_note.note.amount()) + .sum::() + }) + .sum::(); + + // We need to convert the amount to the UM-equivalent amount using the appropriate rate data + let rate_data: RateData = stake_client + .current_validator_rate(tonic::Request::new( + (delegation_token.validator()).into(), + )) + .await? + .into_inner() + .try_into()?; + let um_equivalent_balance = rate_data.unbonded_amount(total_amount); + total_um_equivalent_amount += um_equivalent_balance; + }; + } + Ok(total_um_equivalent_amount) + } + + /// Execute the specified command. + pub async fn exec(&self) -> Result<()> { + let opt = self; + match &opt.cmd { + Command::Reset {} => { + // Delete the home directory + fs::remove_dir_all(&opt.home)?; + println!( + "Successfully cleaned up pmonitor directory: \"{}\"", + opt.home + ); + Ok(()) + } + Command::Init { fvks, grpc_url } => { + // Parse the JSON file into a list of full viewing keys + let fvks_str = fs::read_to_string(fvks)?; + + // Take elements from the array and parse them into FullViewingKeys + let fvk_string_list: Vec = serde_json::from_str(&fvks_str)?; + let fvk_list: Vec = fvk_string_list + .iter() + .map(|fvk| FullViewingKey::from_str(&fvk)) + .collect::>>()?; + println!("Successfully read FVKs from provided file"); + + // Create the home directory if it doesn't exist + if !opt.home.exists() { + fs::create_dir_all(&opt.home)?; + } else { + anyhow::bail!("pmonitor home directory already exists: {}", opt.home); + } + + // During init, we also compute and save the genesis balance for each + // FVK, since that won't change in the future. 
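+                // Balances are recorded in UM-equivalent terms: staking tokens count at
+                // face value, and delegation tokens are converted using the genesis base
+                // exchange rate of 1 (see `genesis::scan_genesis_block`).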
+ let genesis_compact_block = + self.fetch_genesis_compact_block(grpc_url.clone()).await?; + println!("About to scan the genesis block... this may take a moment"); + let genesis_filtered_block = + genesis::scan_genesis_block(genesis_compact_block, fvk_list.clone()).await?; + + let mut accounts = Vec::new(); + + // Now we need to make subdirectories for each of the FVKs and setup their + // config files, with the selected FVK and GRPC URL. + for fvk in fvk_list.iter() { + let wallet_id = Uuid::new_v4(); + let wallet_dir = self.wallet_path(&wallet_id); + tracing::debug!("creating wallet at {}", wallet_dir.to_string()); + self.create_wallet(&wallet_dir, &fvk, &grpc_url).await?; + + accounts.push(AccountConfig::new( + FvkEntry { + fvk: fvk.clone(), + wallet_id, + }, + *(genesis_filtered_block + .balances + .get(&fvk.to_string()) + .unwrap_or(&Amount::from(0u64))), + )); + } + + tracing::info!("successfully initialized {} wallets", accounts.len()); + let pmonitor_config = PmonitorConfig::new(grpc_url.clone(), accounts); + + // Save the config + let config_path = opt.home.join("pmonitor_config.toml"); + fs::write(config_path, toml::to_string(&pmonitor_config)?)?; + + Ok(()) + } + Command::Audit {} => { + // Parse the config file to get the accounts to monitor. + // + // Note that each logical genesis entry might now have one or more FVKs, depending on if the + // user migrated their account to a new FVK, i.e. if they migrated once, they'll have two + // FVKs. This can happen an unlimited number of times. + let config_path = opt.home.join("pmonitor_config.toml"); + let pmonitor_config: PmonitorConfig = + toml::from_str(&fs::read_to_string(config_path.clone()).context(format!( + "failed to load pmonitor config file: {}", + config_path + ))?)?; + + let mut stake_client = StakeQueryServiceClient::new( + self.pd_channel(pmonitor_config.grpc_url()).await?, + ); + + // Sync each wallet to the latest block height, check for new migrations, and check the balance. + let mut updated_config = pmonitor_config.clone(); + let mut config_updated = false; + + let num_accounts = pmonitor_config.accounts().len(); + + // Create bucket for documenting non-compliant FVKs, for reporting in summary. + let mut failures: Vec<&AccountConfig> = vec![]; + + for (index, config) in pmonitor_config.accounts().iter().enumerate() { + let active_fvk = config.active_fvk(); + let active_path = self.wallet_path(&config.active_uuid()); + tracing::info!( + "syncing wallet {}/{}: {}", + index + 1, + num_accounts, + active_path.to_string() + ); + + let mut view_client = self + .view( + active_path.clone(), + active_fvk.clone(), + pmonitor_config.grpc_url(), + ) + .await?; + + // todo: do this in parallel? + self.sync(&mut view_client).await?; + tracing::debug!("finished syncing wallet {}/{}", index + 1, num_accounts); + + // Check if the account has been migrated + let storage = Storage::load_or_initialize( + Some(active_path.join(VIEW_FILE_NAME)), + &active_fvk, + pmonitor_config.grpc_url(), + ) + .await?; + + let migration_tx = storage + .transactions_matching_memo(format!( + // N.B. the `%` symbol is an SQLite wildcard, required to match the + // remainder of the memo field. + "Migrating balance from {}%", + active_fvk.to_string() + )) + .await?; + if migration_tx.is_empty() { + tracing::debug!( + "account has not been migrated, continuing using existing FVK..." + ); + } else if migration_tx.len() == 1 { + tracing::warn!( + "❗ account has been migrated to new FVK, continuing using new FVK..." 
+ ); + let (_, _, _tx, memo_text) = &migration_tx[0]; + let new_fvk = parse_dest_fvk_from_memo(&memo_text)?; + let wallet_id = Uuid::new_v4(); + let wallet_dir = self.wallet_path(&wallet_id); + self.create_wallet(&wallet_dir, &new_fvk, &pmonitor_config.grpc_url()) + .await?; + + let new_fvk_entry = FvkEntry { + fvk: new_fvk.clone(), + wallet_id, + }; + // Mark that the config needs to get saved again for the next time we run the audit command. + config_updated = true; + + // We need to update the config with the new FVK and path on disk + // to the wallet for the next time we run the audit command. + let mut new_config_entry = config.clone(); + new_config_entry.add_migration(new_fvk_entry); + updated_config.set_account(index, new_config_entry.clone()); + + view_client = self + .view(wallet_dir, new_fvk.clone(), pmonitor_config.grpc_url()) + .await?; + + tracing::info!("syncing migrated wallet"); + self.sync(&mut view_client).await?; + tracing::info!("finished syncing migrated wallet"); + // Now we can exit the else if statement and continue by computing the balance, + // which will use the new migrated wallet. + } else { + // we expect a single migration tx per FVK, if this assumption is violated we should bail. + anyhow::bail!( + "Expected a single migration tx, found {}", + migration_tx.len() + ); + } + + let current_um_equivalent_amount = self + .compute_um_equivalent_balance(&mut view_client, &mut stake_client) + .await?; + + tracing::debug!("original FVK: {:?}", config.original_fvk()); + + let genesis_um_equivalent_amount = config.genesis_balance(); + // Let the user know if the balance is unexpected or not + if check_wallet_compliance( + genesis_um_equivalent_amount, + current_um_equivalent_amount, + ) { + tracing::info!( + ?genesis_um_equivalent_amount, + ?current_um_equivalent_amount, + "✅ expected balance! current balance is within compliant range of the genesis balance", + ); + } else { + tracing::error!( + ?genesis_um_equivalent_amount, + ?current_um_equivalent_amount, + "❌ unexpected balance! current balance is less than the genesis balance, by more than {ALLOWED_DISCREPANCY}UM", + ); + failures.push(config); + } + } + + // If at any point we marked the config for updating, we need to save it. + if config_updated { + fs::write(config_path.clone(), toml::to_string(&updated_config)?)?; + } + + // Print summary message + emit_summary_message(pmonitor_config.accounts(), failures)?; + + Ok(()) + } + } + } +} + +/// Prepare a human-readable text summary at the end of the audit run. +/// This is important, as errors logged during scanning are likely to be off-screen +/// due to backscroll. 
+fn emit_summary_message( + all_accounts: &Vec, + failures: Vec<&AccountConfig>, +) -> Result<()> { + println!("#######################"); + println!("Summary of FVK scanning"); + println!("#######################"); + println!("Total number of FVKs scanned: {}", all_accounts.len(),); + let compliant_count = format!( + "Number deemed compliant: {}", + all_accounts.len() - failures.len(), + ); + let failure_count = format!("Number deemed in violation: {}", failures.len(),); + if failures.is_empty() { + println!("{}", compliant_count.green()); + println!("{}", failure_count); + } else { + println!("{}", compliant_count.yellow()); + println!("{}", failure_count.red()); + println!("The non-compliant FVKs are:"); + println!(""); + for f in &failures { + println!("\t* {}", f.active_fvk().to_string()); + } + println!(""); + // println!("{}", "Error: non-compliant balances were detected".red()); + anyhow::bail!("non-compliant balances were detected".red()); + } + Ok(()) +} + +/// Check whether the wallet is compliant. +/// +/// Rather than a naive comparison that the current balance is greater than or +/// equal to the genesis balance, we permit less than within a tolerance of +/// 0.1UM. Doing so allows for discrepancies due to gas fees, for instance +/// if `pcli migrate balance` was used. +fn check_wallet_compliance(genesis_balance: Amount, current_balance: Amount) -> bool { + // Since the `Amount` of the staking token will be in millionths, + // we multiply 0.1 * 1_000_000. + let allowed_discrepancy = ALLOWED_DISCREPANCY * 1_000_000 as f64; + let mut result = false; + if current_balance >= genesis_balance { + result = true; + } else { + let actual_discrepancy = genesis_balance - current_balance; + let discrepancy_formatted = f64::from(actual_discrepancy) / 1_000_000 as f64; + tracing::trace!("detected low balance, missing {}UM", discrepancy_formatted); + if f64::from(actual_discrepancy) <= allowed_discrepancy { + result = true + } + } + result +} diff --git a/crates/bin/pmonitor/tests/common/mod.rs b/crates/bin/pmonitor/tests/common/mod.rs new file mode 100644 index 0000000000..a1465ee712 --- /dev/null +++ b/crates/bin/pmonitor/tests/common/mod.rs @@ -0,0 +1,332 @@ +//! Integration test helpers for `pmonitor`. +//! Contains logic to bootstrap a local devnet, complete with genesis +//! allocations for pre-existing wallets, so that `pmonitor` can audit +//! the behavior of those wallets on the target chain. + +use anyhow::{Context, Result}; +use assert_cmd::Command as AssertCommand; +use once_cell::sync::Lazy; +use pcli::config::PcliConfig; +use penumbra_keys::address::Address; +use std::fs::{create_dir_all, remove_dir_all, File}; +use std::io::{BufWriter, Write}; +use std::path::PathBuf; +use std::process::{Child, Command, Stdio}; +use std::time::Duration; +pub mod pcli_helpers; +use crate::common::pcli_helpers::{pcli_init_softkms, pcli_view_address}; + +/// The TCP port for the process-compose API, used to start/stop devnet. +const PROCESS_COMPOSE_PORT: u16 = 8888; + +/// The path in-repo to the `process-compose` manifest used for running a devnet, +/// relative to the current crate root. This is a minimal manifest, that only runs pd & cometbft. +static PROCESS_COMPOSE_MANIFEST_FILEPATH: Lazy = Lazy::new(|| { + let p: PathBuf = [ + env!("CARGO_MANIFEST_DIR"), + "..", + "..", + "..", + "deployments", + "compose", + "process-compose.yml", + ] + .iter() + .collect(); + p +}); + +/// The path to the root of the git repo, used for setting the working directory +/// when running `process-compose`. 
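+/// Computed as three directories up from this crate (`crates/bin/pmonitor`).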
+static REPO_ROOT: Lazy = Lazy::new(|| { + let p: PathBuf = [env!("CARGO_MANIFEST_DIR"), "../", "../", "../"] + .iter() + .collect(); + p +}); + +/// Manager for running suites of integration tests for `pmonitor`. +/// Only one instance should exist at a time! The test suites +/// assume access to global resources such as 8080/TCP for pd, +/// and a hardcoded directory in `/tmp/` for the pmonitor configs. +pub struct PmonitorTestRunner { + /// Top-level directory for storing all integration test info, + /// such as wallets and pd network state. + pmonitor_integration_test_dir: PathBuf, + /// How many client wallets to create for testing. + num_wallets: u16, +} + +/// Make sure to halt the running devnet, regardless of test pass/fail. +impl Drop for PmonitorTestRunner { + fn drop(&mut self) { + let _result = self.stop_devnet(); + } +} + +impl PmonitorTestRunner { + /// Create a new test runner environment. + /// Caller must ensure no other instances exist, because this method + /// will destroy existing test data directories. + pub fn new() -> Self { + // Ideally we'd use a tempdir but using a hardcoded dir for debugging. + let p: PathBuf = ["/tmp", "pmonitor-integration-test"].iter().collect(); + // Nuke any pre-existing state + if p.exists() { + remove_dir_all(&p).expect("failed to remove directory for pmonitor integration tests"); + } + // Ensure parent dir exists; other methods will create subdirs as necessary. + create_dir_all(&p).expect("failed to create directory for pmonitor integration tests"); + Self { + pmonitor_integration_test_dir: p, + num_wallets: 10, + } + } + // Return path for pmonitor home directory. + // Does not create the path, because `pmonitor` will fail if its home already exists. + pub fn pmonitor_home(&self) -> PathBuf { + self.pmonitor_integration_test_dir.join("pmonitor") + } + // Create directory and return path for storing client wallets + pub fn wallets_dir(&self) -> Result { + let p = self.pmonitor_integration_test_dir.join("wallets"); + create_dir_all(&p)?; + Ok(p) + } + + /// Initialize local pcli configs for all wallets specified in config. + pub fn create_pcli_wallets(&self) -> anyhow::Result<()> { + for i in 0..self.num_wallets - 1 { + let pcli_home = self.wallets_dir()?.join(format!("wallet-{}", i)); + pcli_init_softkms(&pcli_home)?; + } + Ok(()) + } + + /// Iterate over all client wallets and return a `PcliConfig` for each. + pub fn get_pcli_wallet_configs(&self) -> anyhow::Result> { + let mut results = Vec::::new(); + for i in 0..self.num_wallets - 1 { + let pcli_home = self.wallets_dir()?.join(format!("wallet-{}", i)); + let pcli_config_path = pcli_home.join("config.toml"); + let pcli_config = PcliConfig::load( + pcli_config_path + .to_str() + .expect("failed to convert pcli wallet path to str"), + )?; + results.push(pcli_config); + } + Ok(results) + } + + /// Iterate over all client wallets and return address 0 for each. + pub fn get_pcli_wallet_addresses(&self) -> anyhow::Result> { + let mut results = Vec::
::new(); + for i in 0..self.num_wallets - 1 { + let pcli_home = self.wallets_dir()?.join(format!("wallet-{}", i)); + let penumbra_address = pcli_view_address(&pcli_home)?; + results.push(penumbra_address); + } + Ok(results) + } + /// Iterate over all client wallets, grab an FVK for each, write those + /// FVKs to a local JSON file, and return the path to that file. + pub fn get_pcli_wallet_fvks_filepath(&self) -> anyhow::Result { + let p = self.pmonitor_integration_test_dir.join("fvks.json"); + if !p.exists() { + // We use a Vec rather than Vec so we get the string + // representations + let fvks: Vec = self + .get_pcli_wallet_configs()? + .into_iter() + .map(|c| c.full_viewing_key.to_string()) + .collect(); + let mut w = BufWriter::new(File::create(&p)?); + serde_json::to_writer(&mut w, &fvks)?; + w.flush()?; + } + Ok(p) + } + + /// Create a CSV file of genesis allocations for all pcli test wallets. + pub fn generate_genesis_allocations(&self) -> anyhow::Result { + let allocations_filepath = self.pmonitor_integration_test_dir.join("allocations.csv"); + + // Generate file contents + if !allocations_filepath.exists() { + let mut w = BufWriter::new(File::create(&allocations_filepath)?); + let csv_header = String::from("amount,denom,address\n"); + w.write(csv_header.as_bytes())?; + for a in self.get_pcli_wallet_addresses()? { + let allo = format!("1_000_000__000_000,upenumbra,{}\n1000,test_usd,{}\n", a, a); + w.write(allo.as_bytes())?; + } + w.flush()?; + } + Ok(allocations_filepath) + } + + /// Create a genesis event for the local devnet, with genesis allocations for all pcli wallets. + /// This is a *destructive* action, as it removes the contents of the default pd network_data + /// directory prior to generation. + pub fn generate_network_data(&self) -> anyhow::Result<()> { + // TODO: it'd be nice if we wrote all this network_data to a tempdir, + // but instead we just reuse the default pd home. + + let reset_cmd = AssertCommand::cargo_bin("pd")? + .args(["network", "unsafe-reset-all"]) + .output(); + assert!( + reset_cmd.unwrap().status.success(), + "failed to clear out prior local devnet config" + ); + + // Ideally we'd use a rust interface to compose the network config, rather than shelling + // out to `pd`, but the current API for network config isn't ergonomic. Also, we get free + // integration testing for the `pd` CLI by shelling out, which is nice. + let cmd = AssertCommand::cargo_bin("pd")? + .args([ + "network", + "generate", + "--chain-id", + "penumbra-devnet-pmonitor", + "--unbonding-delay", + "50", + "--epoch-duration", + "50", + "--proposal-voting-blocks", + "50", + "--timeout-commit", + "3s", + // we must opt in to fees, in order to test the migration functionality! + "--gas-price-simple", + "500", + // include allocations for the generated pcli wallets + "--allocations-input-file", + &self + .generate_genesis_allocations()? + .to_str() + .expect("failed to convert allocations csv to str"), + ]) + .output(); + assert!( + cmd.unwrap().status.success(), + "failed to generate local devnet config" + ); + Ok(()) + } + + /// Generate a config directory for `pmonitor`, based on input FVKs. + pub fn initialize_pmonitor(&self) -> anyhow::Result<()> { + let cmd = AssertCommand::cargo_bin("pmonitor")? + .args([ + "--home", + self.pmonitor_home() + .to_str() + .expect("failed to convert pmonitor home to str"), + "init", + "--grpc-url", + "http://127.0.0.1:8080", + "--fvks", + self.get_pcli_wallet_fvks_filepath() + .context("failed to get wallet fvks")? 
+ .to_str() + .expect("failed to convert fvks json filepath to str"), + ]) + .output(); + + assert!( + cmd.unwrap().status.success(), + "failed to initialize pmonitor" + ); + Ok(()) + } + + /// Run `pmonitor audit` based on the pcli wallets and associated FVKs. + pub fn pmonitor_audit(&self) -> anyhow::Result<()> { + let p = self.pmonitor_integration_test_dir.join("pmonitor"); + let cmd = AssertCommand::cargo_bin("pmonitor")? + .args([ + "--home", + p.to_str().expect("failed to convert pmonitor home to str"), + "audit", + ]) + .ok(); + if cmd.is_ok() { + Ok(()) + } else { + anyhow::bail!("failed during 'pmonitor audit'") + } + } + + /// Halt any pre-existing local devnet for these integration tests. + /// We assume that the port `8888` is unique to the process-compose API for this test suite. + fn stop_devnet(&self) -> anyhow::Result<()> { + // Confirm that process-compose is installed, otherwise integration tests can't run. + Command::new("process-compose") + .arg("--help") + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status() + .expect("process-compose is not available on PATH; activate the nix dev env"); + + // Stop an existing devnet on the custom port; ignore error, since we don't know one is + // running. + let cmd = Command::new("process-compose") + .env("PC_PORT_NUM", PROCESS_COMPOSE_PORT.to_string()) + .arg("down") + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status(); + + match cmd { + Ok(_c) => { + tracing::trace!( + "'process-compose down' completed, sleeping briefly during teardown" + ); + + std::thread::sleep(Duration::from_secs(3)); + return Ok(()); + } + Err(_e) => { + tracing::trace!( + "'process-compose down' failed, presumably no prior network running" + ); + Ok(()) + } + } + } + + /// Run a local devnet based on input config. Returns a handle to the spawned process, + /// so that cleanup can be handled gracefully. + /// We assume that the port `8888` is unique to the process-compose API for this test suite. + pub fn start_devnet(&self) -> anyhow::Result { + // Ensure no other instance is currently running; + self.stop_devnet()?; + + self.generate_network_data()?; + + // Stop an existing devnet on the custom port; ignore error, since we don't know one is + // running. + let child = Command::new("process-compose") + .env("PC_PORT_NUM", PROCESS_COMPOSE_PORT.to_string()) + .current_dir(REPO_ROOT.as_os_str()) + .args([ + "up", + "--detached", + "--config", + PROCESS_COMPOSE_MANIFEST_FILEPATH + .to_str() + .expect("failed to convert process-compose manifest to str"), + ]) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .spawn() + .expect("failed to execute devnet start cmd"); + // Sleep a bit, to let network start + // TODO: use process-compose API to check for "Running" status on pd. + std::thread::sleep(Duration::from_secs(8)); + Ok(child) + } +} diff --git a/crates/bin/pmonitor/tests/common/pcli_helpers.rs b/crates/bin/pmonitor/tests/common/pcli_helpers.rs new file mode 100644 index 0000000000..c62cc4c057 --- /dev/null +++ b/crates/bin/pmonitor/tests/common/pcli_helpers.rs @@ -0,0 +1,63 @@ +//! Convenience methods for wrangling `pcli` CLI invocations, +//! via `cargo bin` commands, for use in integration testing. + +use anyhow::{Context, Result}; +use assert_cmd::Command as AssertCommand; +use penumbra_keys::{address::Address, FullViewingKey}; +use std::path::PathBuf; +use std::str::FromStr; + +/// Initialize a new pcli wallet at the target directory. +/// Discards the generated seed phrase. 
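+/// Roughly equivalent to running
+/// `pcli --home <dir> init --grpc-url http://127.0.0.1:8080 soft-kms generate`
+/// and accepting the seed-phrase prompt with empty stdin.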
+pub fn pcli_init_softkms(pcli_home: &PathBuf) -> Result<()> { + let mut cmd = AssertCommand::cargo_bin("pcli")?; + cmd.args([ + "--home", + pcli_home + .to_str() + .expect("can convert wallet path to string"), + "init", + "--grpc-url", + "http://127.0.0.1:8080", + "soft-kms", + "generate", + ]) + // send empty string to accept the interstitial seed phrase display + .write_stdin(""); + cmd.assert().success(); + Ok(()) +} + +/// Convenience method for looking up `address 0` from +/// pcli wallet stored at `pcli_home`. +pub fn pcli_view_address(pcli_home: &PathBuf) -> Result
{ + let output = AssertCommand::cargo_bin("pcli")? + .args(["--home", pcli_home.to_str().unwrap(), "view", "address"]) + .output() + .expect("failed to retrieve address from pcli wallet"); + + // Convert output to String, to trim trailing newline. + let mut a = String::from_utf8_lossy(&output.stdout).to_string(); + if a.ends_with('\n') { + a.pop(); + } + Address::from_str(&a).with_context(|| format!("failed to convert str to Address: '{}'", a)) +} + +/// Perform a `pcli migrate balance` transaction from the wallet at `pcli_home`, +/// transferring funds to the destination `FullViewingKey`. +pub fn pcli_migrate_balance(pcli_home: &PathBuf, fvk: &FullViewingKey) -> Result<()> { + let mut cmd = AssertCommand::cargo_bin("pcli")?; + cmd.args([ + "--home", + pcli_home + .to_str() + .expect("can convert wallet path to string"), + "migrate", + "balance", + ]) + // pipe FVK to stdin + .write_stdin(fvk.to_string()); + cmd.assert().success(); + Ok(()) +} diff --git a/crates/bin/pmonitor/tests/network_integration.rs b/crates/bin/pmonitor/tests/network_integration.rs new file mode 100644 index 0000000000..c5bad684fb --- /dev/null +++ b/crates/bin/pmonitor/tests/network_integration.rs @@ -0,0 +1,247 @@ +//! Integration integration testing of `pmonitor` against a local devnet. +//! Sets up various scenarios of genesis allocations, and ensures the tool reports +//! violations as errors. +//! +//! As a convenience to developers, there's a commented-out `sleep` call in the +//! `audit_passes_on_compliant_wallets` test. If enabled, the setup testbed can be interacted with +//! manually, which helps when trying to diagnose behavior of the tool. +use anyhow::Context; +use assert_cmd::Command as AssertCommand; +use pcli::config::PcliConfig; +mod common; +use crate::common::pcli_helpers::{pcli_init_softkms, pcli_migrate_balance, pcli_view_address}; +use crate::common::PmonitorTestRunner; + +#[ignore] +#[test] +/// Tests the simplest happy path for pmonitor: all wallets have genesis balances, +/// they never transferred any funds out, nor migrated balances, so all +/// current balances equal the genesis balances. In this case `pmonitor` +/// should exit 0. +fn audit_passes_on_compliant_wallets() -> anyhow::Result<()> { + tracing_subscriber::fmt::try_init().ok(); + let p = PmonitorTestRunner::new(); + p.create_pcli_wallets()?; + let _network = p.start_devnet()?; + p.initialize_pmonitor()?; + + // Debugging: uncomment the sleep line below if you want to interact with the pmonitor testbed + // that was set up already. Use e.g.: + // + // cargo run --bin pmonitor -- --home /tmp/pmonitor-integration-test/pmonitor audit + // + // to view the output locally. + // + // std::thread::sleep(std::time::Duration::from_secs(3600)); + + p.pmonitor_audit()?; + Ok(()) +} + +#[ignore] +#[test] +/// Tests another happy path for pmonitor: all wallets have genesis balances, +/// one of the wallets ran `pcli migrate balance` once. This means that all +/// wallets still have their genesis balance, save one, which has the genesis +/// balance minus gas fees. In this case, `pmonitor` should exit 0, +/// because it understood the balance migration and updated the FVK. +fn audit_passes_on_wallets_that_migrated_once() -> anyhow::Result<()> { + let p = PmonitorTestRunner::new(); + p.create_pcli_wallets()?; + let _network = p.start_devnet()?; + // Run audit once, to confirm compliance on clean slate. + p.initialize_pmonitor()?; + p.pmonitor_audit()?; + + // Create an empty wallet, with no genesis funds, to which we'll migrate a balance. 
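+    // ("Alice" has no genesis allocation, so pmonitor only learns about this wallet
+    // through the migration memo recorded on-chain.)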
+ let alice_pcli_home = p.wallets_dir()?.join("wallet-alice"); + pcli_init_softkms(&alice_pcli_home)?; + let alice_pcli_config = PcliConfig::load( + alice_pcli_home + .join("config.toml") + .to_str() + .expect("failed to convert alice wallet to str"), + )?; + + // Take the second wallet, and migrate its balance to Alice. + let migrated_wallet = p.wallets_dir()?.join("wallet-1"); + pcli_migrate_balance(&migrated_wallet, &alice_pcli_config.full_viewing_key)?; + + // Now re-run the audit tool: it should report OK again, because all we did was migrate. + p.pmonitor_audit()?; + Ok(()) +} + +#[ignore] +#[test] +/// Tests another happy path for pmonitor: all wallets have genesis balances, +/// one of the wallets ran `pcli migrate balance` once, then that receiving +/// wallet ran `pcli migrate balance` itself, so the genesis funds are now +/// two (2) FVKs away from the original account. In this case, +/// pmonitor` should exit 0, because it understood all balance migrations +/// and updated the FVK in its config file accordingly. +fn audit_passes_on_wallets_that_migrated_twice() -> anyhow::Result<()> { + let p = PmonitorTestRunner::new(); + p.create_pcli_wallets()?; + let _network = p.start_devnet()?; + // Run audit once, to confirm compliance on clean slate. + p.initialize_pmonitor()?; + p.pmonitor_audit() + .context("failed unexpectedly during initial audit run")?; + + // Create an empty wallet, with no genesis funds, to which we'll migrate a balance. + let alice_pcli_home = p.wallets_dir()?.join("wallet-alice"); + pcli_init_softkms(&alice_pcli_home)?; + let alice_pcli_config = PcliConfig::load( + alice_pcli_home + .join("config.toml") + .to_str() + .expect("failed to convert alice wallet to str"), + )?; + + // Take the second wallet, and migrate its balance to Alice. + let migrated_wallet = p.wallets_dir()?.join("wallet-1"); + pcli_migrate_balance(&migrated_wallet, &alice_pcli_config.full_viewing_key)?; + + // Now re-run the audit tool: it should report OK again, because all we did was migrate. + p.pmonitor_audit() + .context("failed unexpectedly during second audit run")?; + + // Create another empty wallet, with no genesis funds, to which we'll migrate a balance. + let bob_pcli_home = p.wallets_dir()?.join("wallet-bob"); + pcli_init_softkms(&bob_pcli_home)?; + let bob_pcli_config = PcliConfig::load( + bob_pcli_home + .join("config.toml") + .to_str() + .expect("failed to convert bob wallet to str"), + )?; + + // Re-migrate the balance from Alice to Bob. + pcli_migrate_balance(&alice_pcli_home, &bob_pcli_config.full_viewing_key)?; + + // Now re-run the audit tool: it should report OK again, confirming that it + // successfully tracks multiple migratrions. + p.pmonitor_audit() + .context("failed unexpectedly during final audit run in test")?; + + Ok(()) +} +#[ignore] +#[test] +/// Tests an unhappy path for `pmonitor`: a single wallet has sent all its funds +/// to non-genesis account, via `pcli tx send` rather than `pcli migrate balance`. +/// In this case, `pmonitor` should exit non-zero. +fn audit_fails_on_misbehaving_wallet_that_sent_funds() -> anyhow::Result<()> { + let p = PmonitorTestRunner::new(); + p.create_pcli_wallets()?; + let _network = p.start_devnet()?; + // Run audit once, to confirm compliance on clean slate. + p.initialize_pmonitor()?; + p.pmonitor_audit()?; + + // Create an empty wallet, with no genesis funds, to which we'll + // manually send balance. 
+ let alice_pcli_home = p.wallets_dir()?.join("wallet-alice"); + pcli_init_softkms(&alice_pcli_home)?; + + let alice_address = pcli_view_address(&alice_pcli_home)?; + + // Take the second wallet, and send most of its funds of staking tokens to Alice. + let misbehaving_wallet = p.wallets_dir()?.join("wallet-1"); + + let send_cmd = AssertCommand::cargo_bin("pcli")? + .args([ + "--home", + misbehaving_wallet.to_str().unwrap(), + "tx", + "send", + "--to", + &alice_address.to_string(), + "900penumbra", + ]) + .output() + .expect("failed to execute sending tx to alice wallet"); + assert!(send_cmd.status.success(), "failed to send tx to alice"); + + // Now re-run the audit tool: it should report failure, via a non-zero exit code, + // because of the missing funds. + let result = p.pmonitor_audit(); + assert!( + result.is_err(), + "expected pmonitor to fail due to missing funds" + ); + Ok(()) +} + +#[ignore] +#[test] +/// Tests a happy path for `pmonitor`: a single wallet has sent all its funds +/// to non-genesis account, via `pcli tx send` rather than `pcli migrate balance`, +/// but the receiving wallet then sent those funds back. +/// In this case, `pmonitor` should exit zero. +fn audit_passes_on_misbehaving_wallet_that_sent_funds_but_got_them_back() -> anyhow::Result<()> { + tracing_subscriber::fmt::try_init().ok(); + let p = PmonitorTestRunner::new(); + p.create_pcli_wallets()?; + let _network = p.start_devnet()?; + // Run audit once, to confirm compliance on clean slate. + p.initialize_pmonitor()?; + p.pmonitor_audit()?; + + // Create an empty wallet, with no genesis funds, to which we'll + // manually send balance. + let alice_pcli_home = p.wallets_dir()?.join("wallet-alice"); + pcli_init_softkms(&alice_pcli_home)?; + + let alice_address = pcli_view_address(&alice_pcli_home)?; + + // Take the second wallet, and send most of its funds of staking tokens to Alice. + let misbehaving_wallet = p.wallets_dir()?.join("wallet-1"); + + let send_cmd = AssertCommand::cargo_bin("pcli")? + .args([ + "--home", + misbehaving_wallet.to_str().unwrap(), + "tx", + "send", + "--to", + &alice_address.to_string(), + "900penumbra", + ]) + .output() + .expect("failed to execute sending tx to alice wallet"); + assert!(send_cmd.status.success(), "failed to send tx to alice"); + + // The audit tool detects this state as a failure, since funds are missing. + let result = p.pmonitor_audit(); + assert!( + result.is_err(), + "expected pmonitor to fail due to missing funds" + ); + + // Send the funds from alice back to the misbehaving wallet. + let misbehaving_address = pcli_view_address(&misbehaving_wallet)?; + let refund_cmd = AssertCommand::cargo_bin("pcli")? + .args([ + "--home", + alice_pcli_home.to_str().unwrap(), + "tx", + "send", + "--to", + &misbehaving_address.to_string(), + // We intentionally specify a bit less than we received, to account for gas. + "899.99penumbra", + ]) + .output() + .expect("failed to execute refund tx from alice wallet"); + assert!( + refund_cmd.status.success(), + "failed to send refund tx from alice" + ); + + // The audit tool detects this state as compliant again, because the funds were returned. 
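+    // (The refunded amount is slightly less than what was originally sent, but the
+    // shortfall plus gas fees is expected to stay within pmonitor's 0.1 UM tolerance.)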
+ p.pmonitor_audit()?; + + Ok(()) +} diff --git a/crates/view/src/storage.rs b/crates/view/src/storage.rs index 8da8a3d74c..330b47c7a9 100644 --- a/crates/view/src/storage.rs +++ b/crates/view/src/storage.rs @@ -1258,11 +1258,11 @@ impl Storage { dbtx.execute( "INSERT INTO notes (note_commitment, address, amount, asset_id, rseed) VALUES (?1, ?2, ?3, ?4, ?5) - ON CONFLICT (note_commitment) - DO UPDATE SET - address = excluded.address, - amount = excluded.amount, - asset_id = excluded.asset_id, + ON CONFLICT (note_commitment) + DO UPDATE SET + address = excluded.address, + amount = excluded.amount, + asset_id = excluded.asset_id, rseed = excluded.rseed", (note_commitment, address, amount, asset_id, rseed), )?; @@ -1432,7 +1432,7 @@ impl Storage { let params_bytes = params.encode_to_vec(); // We expect app_params to be present already but may as well use an upsert dbtx.execute( - "INSERT INTO kv (k, v) VALUES ('app_params', ?1) + "INSERT INTO kv (k, v) VALUES ('app_params', ?1) ON CONFLICT(k) DO UPDATE SET v = excluded.v", [¶ms_bytes[..]], )?; @@ -1460,12 +1460,12 @@ impl Storage { (note_commitment, nullifier, position, height_created, address_index, source, height_spent, tx_hash) VALUES (?1, ?2, ?3, ?4, ?5, ?6, NULL, ?7) ON CONFLICT (note_commitment) - DO UPDATE SET nullifier = excluded.nullifier, - position = excluded.position, - height_created = excluded.height_created, - address_index = excluded.address_index, - source = excluded.source, - height_spent = excluded.height_spent, + DO UPDATE SET nullifier = excluded.nullifier, + position = excluded.position, + height_created = excluded.height_created, + address_index = excluded.address_index, + source = excluded.source, + height_spent = excluded.height_spent, tx_hash = excluded.tx_hash", ( ¬e_commitment, @@ -1492,12 +1492,12 @@ impl Storage { dbtx.execute( "INSERT INTO swaps (swap_commitment, swap, position, nullifier, output_data, height_claimed, source) VALUES (?1, ?2, ?3, ?4, ?5, NULL, ?6) - ON CONFLICT (swap_commitment) - DO UPDATE SET swap = excluded.swap, - position = excluded.position, - nullifier = excluded.nullifier, - output_data = excluded.output_data, - height_claimed = excluded.height_claimed, + ON CONFLICT (swap_commitment) + DO UPDATE SET swap = excluded.swap, + position = excluded.position, + nullifier = excluded.nullifier, + output_data = excluded.output_data, + height_claimed = excluded.height_claimed, source = excluded.source", ( &swap_commitment, @@ -1589,13 +1589,15 @@ impl Storage { let tx_hash_owned = sha2::Sha256::digest(&tx_bytes); let tx_hash = tx_hash_owned.as_slice(); let tx_block_height = filtered_block.height as i64; - let return_address = transaction.decrypt_memo(&fvk).map_or(None, |x| Some(x.return_address().to_vec())); + let decrypted_memo = transaction.decrypt_memo(&fvk).ok(); + let memo_text = decrypted_memo.clone().map_or(None,|x| Some(x.text().to_string())); + let return_address = decrypted_memo.map_or(None, |x| Some(x.return_address().to_vec())); tracing::debug!(tx_hash = ?hex::encode(tx_hash), "recording extended transaction"); dbtx.execute( - "INSERT OR IGNORE INTO tx (tx_hash, tx_bytes, block_height, return_address) VALUES (?1, ?2, ?3, ?4)", - (&tx_hash, &tx_bytes, tx_block_height, return_address), + "INSERT OR IGNORE INTO tx (tx_hash, tx_bytes, block_height, return_address, memo_text) VALUES (?1, ?2, ?3, ?4, ?5)", + (&tx_hash, &tx_bytes, tx_block_height, return_address, memo_text), )?; // Associate all of the spent nullifiers with the transaction by hash. 
@@ -1756,4 +1758,31 @@ impl Storage {
 
         Ok(records)
     }
+
+    /// Get all transactions with a matching memo text. The `pattern` argument
+    /// should include SQL `LIKE` wildcards (`%` for any substring, `_` for a
+    /// single character), e.g. `%foo%`.
+    pub async fn transactions_matching_memo(
+        &self,
+        pattern: String,
+    ) -> anyhow::Result<Vec<(u64, Vec<u8>, Transaction, String)>> {
+        let pattern = pattern.to_owned();
+        tracing::trace!(?pattern, "searching for memos matching");
+        let pool = self.pool.clone();
+
+        spawn_blocking(move || {
+            pool.get()?
+                .prepare_cached("SELECT block_height, tx_hash, tx_bytes, memo_text FROM tx WHERE memo_text LIKE ?1 ESCAPE '\\'")?
+                .query_and_then([pattern], |row| {
+                    let block_height: u64 = row.get("block_height")?;
+                    let tx_hash: Vec<u8> = row.get("tx_hash")?;
+                    let tx_bytes: Vec<u8> = row.get("tx_bytes")?;
+                    let tx = Transaction::decode(tx_bytes.as_slice())?;
+                    let memo_text: String = row.get("memo_text")?;
+                    anyhow::Ok((block_height, tx_hash, tx, memo_text))
+                })?
+                .collect()
+        })
+        .await?
+    }
 }
diff --git a/crates/view/src/storage/schema.sql b/crates/view/src/storage/schema.sql
index f50e6d12f8..41c52ba820 100644
--- a/crates/view/src/storage/schema.sql
+++ b/crates/view/src/storage/schema.sql
@@ -54,7 +54,8 @@ CREATE TABLE tx (
     tx_hash                 BLOB PRIMARY KEY NOT NULL,
     tx_bytes                BLOB NOT NULL,
     block_height            BIGINT NOT NULL,
-    return_address          BLOB
+    return_address          BLOB,
+    memo_text               TEXT
 );
 
 -- This table just records the mapping from note commitments to note plaintexts.
diff --git a/deployments/containerfiles/Dockerfile b/deployments/containerfiles/Dockerfile
index 5a4ff4b3c2..b33402c255 100644
--- a/deployments/containerfiles/Dockerfile
+++ b/deployments/containerfiles/Dockerfile
@@ -49,6 +49,7 @@ COPY --from=build-env \
     /usr/src/penumbra/target/release/pclientd \
     /usr/src/penumbra/target/release/pd \
     /usr/src/penumbra/target/release/pindexer \
+    /usr/src/penumbra/target/release/pmonitor \
     /usr/src/penumbra/target/release/tct-live-edit \
     /usr/bin/
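Note on calling the new `Storage::transactions_matching_memo` above: because the query applies `LIKE` with a backslash `ESCAPE` character, a caller searching for a literal phrase should wrap it in `%` wildcards and backslash-escape any wildcard characters it contains. A hypothetical helper, sketched only for illustration (`memo_search_pattern` and the `storage` handle below are not part of this diff):

    /// Wrap a literal search term in SQL LIKE wildcards, escaping `\`, `%`,
    /// and `_` so they match literally rather than as wildcards.
    fn memo_search_pattern(term: &str) -> String {
        let escaped = term
            .replace('\\', "\\\\")
            .replace('%', "\\%")
            .replace('_', "\\_");
        format!("%{escaped}%")
    }

    // e.g.:
    // let matches = storage
    //     .transactions_matching_memo(memo_search_pattern("tell no one"))
    //     .await?;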
diff --git a/deployments/scripts/pmonitor-integration-test.sh b/deployments/scripts/pmonitor-integration-test.sh
new file mode 100755
index 0000000000..51d2f3acc5
--- /dev/null
+++ b/deployments/scripts/pmonitor-integration-test.sh
@@ -0,0 +1,110 @@
+#!/bin/bash
+# quick script to test the `pmonitor` tool during review
+# set -euo pipefail
+set -eu
+
+>&2 echo "Preparing pmonitor test bed..."
+num_wallets=10
+
+# ideally we'd use a tempdir, but we use a hardcoded dir for easier debugging
+# pmonitor_integration_test_dir="$(mktemp -p /tmp -d pmonitor-integration-test.XXXXXX)"
+pmonitor_integration_test_dir="/tmp/pmonitor-integration-test"
+rm -rf "$pmonitor_integration_test_dir"
+mkdir "$pmonitor_integration_test_dir"
+
+pmonitor_home="${pmonitor_integration_test_dir}/pmonitor"
+wallets_dir="${pmonitor_integration_test_dir}/wallets"
+wallet_addresses="${pmonitor_integration_test_dir}/addresses.txt"
+allocations_csv="${pmonitor_integration_test_dir}/pmonitor-test-allocations.csv"
+fvks_json="${pmonitor_integration_test_dir}/fvks.json"
+cargo run --release --bin pd -- network unsafe-reset-all || true
+cargo run --release --bin pmonitor -- reset || true
+mkdir "$wallets_dir"
+# override process-compose default port of 8080, which we use for pd
+export PC_PORT_NUM="8888"
+process-compose down || true
+
+>&2 echo "creating pcli wallets"
+for i in $(seq 1 "$num_wallets"); do
+    yes | cargo run -q --release --bin pcli -- --home "${wallets_dir}/wallet-$i" init --grpc-url http://localhost:8080 soft-kms generate
+done
+
+# collect addresses
+>&2 echo "collecting pcli wallet addresses"
+for i in $(seq 1 "$num_wallets"); do
+    cargo run -q --release --bin pcli -- --home "${wallets_dir}/wallet-$i" view address
+done > "$wallet_addresses"
+
+# generate genesis allocations
+>&2 echo "generating genesis allocations"
+printf 'amount,denom,address\n' > "$allocations_csv"
+while read -r a ; do
+    printf '1_000_000__000_000,upenumbra,%s\n1000,test_usd,%s\n' "$a" "$a"
+done < "$wallet_addresses" >> "$allocations_csv"
+
+# generate network data
+>&2 echo "generating network data"
+cargo run --release --bin pd -- network generate \
+    --chain-id penumbra-devnet-pmonitor \
+    --unbonding-delay 50 \
+    --epoch-duration 50 \
+    --proposal-voting-blocks 50 \
+    --timeout-commit 3s \
+    --gas-price-simple 500 \
+    --allocations-input-file "$allocations_csv"
+
+# run network
+>&2 echo "running local devnet"
+process-compose up --detached --config deployments/compose/process-compose.yml
+
+# ensure network is torn down afterward; comment this out if you want
+# to interact with the network after tests complete.
+trap 'process-compose down || true' EXIT
+
+# wait for network to come up; lazily sleeping, rather than polling process-compose for "ready" state
+sleep 8
+
+>&2 echo "collecting fvks"
+fd config.toml "$wallets_dir" -x toml get {} full_viewing_key | jq -s > "$fvks_json"
+
+>&2 echo "initializing pmonitor"
+cargo run --release --bin pmonitor -- \
+    --home "$pmonitor_home" \
+    init --fvks "$fvks_json" --grpc-url http://localhost:8080
+
+>&2 echo "running pmonitor audit"
+# happy path: we expect this audit to exit 0, because no transfers have occurred yet
+cargo run --release --bin pmonitor -- \
+    --home "$pmonitor_home" \
+    audit
+
+>&2 echo "exiting BEFORE misbehavior"
+exit 0
+
+>&2 echo "committing misbehavior"
+alice_wallet="${wallets_dir}/wallet-alice"
+yes | cargo run --quiet --release --bin pcli -- --home "$alice_wallet" init --grpc-url http://localhost:8080 soft-kms generate
+alice_address="$(cargo run --quiet --release --bin pcli -- --home "$alice_wallet" view address)"
+misbehaving_wallet="${wallets_dir}/wallet-2"
+cargo run --quiet --release --bin pcli -- --home "$misbehaving_wallet" tx send --memo "take these tokens, but tell no one" 500penumbra --to "$alice_address"
+
+>&2 echo "re-running pmonitor audit"
+# unhappy path: we expect this audit to detect the missing funds; for now we confirm
+# that via the log output below, rather than via the exit code.
+# TODO: make pmonitor exit non-zero when there's misbehavior, then check the exit code here.
+cargo run --release --bin pmonitor -- \
+    --home "$pmonitor_home" \
+    audit | tee "${wallets_dir}/pmonitor-log-1.txt"
+
+printf '#################################\n'
+printf 'PMONITOR INTEGRATION TEST SUMMARY\n'
+printf '#################################\n'
+
+if grep -q "Unexpected balance! Balance is less than the genesis balance" "${wallets_dir}/pmonitor-log-1.txt" ; then
+    >&2 echo "OK: 'pmonitor audit' reported unexpected balance, due to misbehavior"
+else
+    >&2 echo "ERROR: 'pmonitor audit' failed to identify misbehavior, which we know occurred"
+    exit 1
+fi
diff --git a/deployments/scripts/rust-docs b/deployments/scripts/rust-docs
index 89428747ff..571418bb2f 100755
--- a/deployments/scripts/rust-docs
+++ b/deployments/scripts/rust-docs
@@ -31,6 +31,7 @@ cargo +nightly doc --no-deps \
   -p pcli \
   -p pclientd \
   -p pd \
+  -p pmonitor \
   -p penumbra-app \
   -p penumbra-asset \
   -p penumbra-community-pool \
diff --git a/flake.nix b/flake.nix
index 3c094cb3e5..eba0a4f28f 100644
--- a/flake.nix
+++ b/flake.nix
@@ -81,7 +81,7 @@
               [clang openssl rocksdb];
             inherit system PKG_CONFIG_PATH LIBCLANG_PATH ROCKSDB_LIB_DIR;
-            cargoExtraArgs = "-p pd -p pcli -p pclientd -p pindexer";
+            cargoExtraArgs = "-p pd -p pcli -p pclientd -p pindexer -p pmonitor";
             meta = {
               description = "A fully private proof-of-stake network and decentralized exchange for the Cosmos ecosystem";
               homepage = "https://penumbra.zone";
@@ -137,6 +137,8 @@
             pclientd.program = "${penumbra}/bin/pclientd";
             pindexer.type = "app";
             pindexer.program = "${penumbra}/bin/pindexer";
+            pmonitor.type = "app";
+            pmonitor.program = "${penumbra}/bin/pmonitor";
             cometbft.type = "app";
             cometbft.program = "${cometbft}/bin/cometbft";
           };
diff --git a/justfile b/justfile
index e7c72fe9c6..bb5f5d29d6 100644
--- a/justfile
+++ b/justfile
@@ -2,6 +2,15 @@
 default:
     @just --list
 
+# Run integration tests for the pmonitor tool
+test-pmonitor:
+    # prebuild cargo binaries required for integration tests
+    cargo -q build --package pcli --package pd --package pmonitor
+    cargo -q run --release --bin pd -- network unsafe-reset-all
+    rm -rf /tmp/pmonitor-integration-test
+    cargo nextest run -p pmonitor --run-ignored=ignored-only --test-threads 1
+    # cargo test -p pmonitor -- --ignored --test-threads 1 --nocapture
+
 # Creates and runs a local devnet with a solo validator. Includes ancillary services
 # like metrics, postgres for storing ABCI events, and pindexer for munging those events.
 dev: