diff --git a/Cargo.toml b/Cargo.toml index 5f49e2a5..e9c67fca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,3 +85,13 @@ rand = "0.8.5" bigdecimal = "0.4.5" strum = "0.26.3" strum_macros = "0.26.3" + +[patch.'https://github.com/anoma/namada'] +namada_core = { git = "https://github.com/joel-u410/namada.git", tag = "v0.42.0-u410" } +namada_sdk = { git = "https://github.com/joel-u410/namada.git", tag = "v0.42.0-u410" } +namada_tx = { git = "https://github.com/joel-u410/namada.git", tag = "v0.42.0-u410" } +namada_governance = { git = "https://github.com/joel-u410/namada.git", tag = "v0.42.0-u410" } +namada_ibc = { git = "https://github.com/joel-u410/namada.git", tag = "v0.42.0-u410" } +namada_token = { git = "https://github.com/joel-u410/namada.git", tag = "v0.42.0-u410" } +namada_parameters = { git = "https://github.com/joel-u410/namada.git", tag = "v0.42.0-u410" } +namada_proof_of_stake = { git = "https://github.com/joel-u410/namada.git", tag = "v0.42.0-u410" } diff --git a/chain/src/main.rs b/chain/src/main.rs index c5973abe..72de4ab0 100644 --- a/chain/src/main.rs +++ b/chain/src/main.rs @@ -157,9 +157,10 @@ async fn crawling_fn( .into_rpc_error()?; let addresses = block.addresses_with_balance_change(native_token); - let balances = namada_service::query_balance(&client, &addresses) - .await - .into_rpc_error()?; + let balances = + namada_service::query_balance(&client, &addresses, block_height) + .await + .into_rpc_error()?; tracing::info!("Updating balance for {} addresses...", addresses.len()); let next_governance_proposal_id = @@ -326,7 +327,9 @@ async fn initial_query( sleep(Duration::from_secs(initial_query_retry_time)).await; } - let balances = query_all_balances(client).await.into_rpc_error()?; + let balances = query_all_balances(client, block_height) + .await + .into_rpc_error()?; tracing::info!("Querying bonds and unbonds..."); let (bonds, unbonds) = query_all_bonds_and_unbonds(client, None, None) diff --git a/chain/src/repository/balance.rs b/chain/src/repository/balance.rs index 5510ed95..ea408202 100644 --- a/chain/src/repository/balance.rs +++ b/chain/src/repository/balance.rs @@ -1,29 +1,39 @@ use anyhow::Context; -use diesel::upsert::excluded; -use diesel::{ExpressionMethods, PgConnection, RunQueryDsl}; -use orm::balances::BalancesInsertDb; -use orm::schema::balances; +use diesel::{PgConnection, RunQueryDsl}; +use orm::balances::BalanceChangesInsertDb; +use orm::schema::balance_changes; use shared::balance::Balances; pub fn insert_balance( transaction_conn: &mut PgConnection, balances: Balances, ) -> anyhow::Result<()> { - diesel::insert_into(balances::table) - .values::<&Vec>( - &balances - .into_iter() - .map(BalancesInsertDb::from_balance) - .collect::>(), - ) - .on_conflict((balances::columns::owner, balances::columns::token)) - .do_update() - .set( - balances::columns::raw_amount - .eq(excluded(balances::columns::raw_amount)), - ) - .execute(transaction_conn) - .context("Failed to update balances in db")?; + tracing::info!("Inserting {} balances into db", balances.len()); + + // Group balances into chunks to avoid hitting the limit of the number of bind parameters in one query. + balances + .chunks(10000) + .into_iter() + .try_for_each(|balances_chunk| { + diesel::insert_into(balance_changes::table) + .values::<&Vec>( + &balances_chunk + .to_vec() + .into_iter() + .map(BalanceChangesInsertDb::from_balance) + .collect::>(), + ) + .on_conflict(( + balance_changes::columns::owner, + balance_changes::columns::token, + balance_changes::columns::height, + )) + .do_nothing() + .execute(transaction_conn) + .context("Failed to update balances in db")?; + + anyhow::Ok(()) + })?; anyhow::Ok(()) } diff --git a/chain/src/services/namada.rs b/chain/src/services/namada.rs index 5c57d59d..f43fdc75 100644 --- a/chain/src/services/namada.rs +++ b/chain/src/services/namada.rs @@ -69,6 +69,7 @@ pub async fn get_epoch_at_block_height( pub async fn query_balance( client: &HttpClient, balance_changes: &HashSet, + block_height: BlockHeight, ) -> anyhow::Result { Ok(futures::stream::iter(balance_changes) .filter_map(|balance_change| async move { @@ -86,14 +87,20 @@ pub async fn query_balance( .context("Failed to parse token address") .ok()?; - let amount = rpc::get_token_balance(client, &token, &owner) - .await - .unwrap_or_default(); + let amount = rpc::get_token_balance( + client, + &token, + &owner, + Some(to_block_height(block_height)), + ) + .await + .unwrap_or_default(); Some(Balance { owner: Id::from(owner), token: Id::from(token), amount: Amount::from(amount), + height: block_height, }) }) .map(futures::future::ready) @@ -104,6 +111,7 @@ pub async fn query_balance( pub async fn query_all_balances( client: &HttpClient, + height: BlockHeight, ) -> anyhow::Result { let token_addr = RPC .shell() @@ -113,10 +121,13 @@ pub async fn query_all_balances( let balance_prefix = namada_token::storage_key::balance_prefix(&token_addr); - let balances = - query_storage_prefix::(client, &balance_prefix) - .await - .context("Failed to query all balances")?; + let balances = query_storage_prefix::( + client, + &balance_prefix, + Some(height), + ) + .await + .context("Failed to query all balances")?; let mut all_balances: Balances = vec![]; @@ -133,6 +144,7 @@ pub async fn query_all_balances( owner: Id::from(o), token: Id::from(t), amount: Amount::from(b), + height, }); } } @@ -522,6 +534,8 @@ pub async fn query_all_votes( anyhow::Ok(votes.iter().flatten().cloned().collect()) } -fn to_block_height(block_height: u32) -> NamadaSdkBlockHeight { +pub(super) fn to_block_height( + block_height: BlockHeight, +) -> NamadaSdkBlockHeight { NamadaSdkBlockHeight::from(block_height as u64) } diff --git a/chain/src/services/utils.rs b/chain/src/services/utils.rs index 4e131158..64dcfc57 100644 --- a/chain/src/services/utils.rs +++ b/chain/src/services/utils.rs @@ -3,19 +3,28 @@ use namada_sdk::queries::RPC; use namada_sdk::storage::{self, PrefixValue}; use tendermint_rpc::HttpClient; +use shared::block::BlockHeight; + /// Query a range of storage values with a matching prefix and decode them with /// [`BorshDeserialize`]. Returns an iterator of the storage keys paired with /// their associated values. pub async fn query_storage_prefix( client: &HttpClient, key: &storage::Key, + height: Option, ) -> anyhow::Result>> where T: BorshDeserialize, { let values = RPC .shell() - .storage_prefix(client, None, None, false, key) + .storage_prefix( + client, + None, + height.map(super::namada::to_block_height), + false, + key, + ) .await?; let decode = |PrefixValue { key, value }: PrefixValue| { diff --git a/orm/migrations/2024-07-19-203433_add_height_to_balances/down.sql b/orm/migrations/2024-07-19-203433_add_height_to_balances/down.sql new file mode 100644 index 00000000..db8efdcd --- /dev/null +++ b/orm/migrations/2024-07-19-203433_add_height_to_balances/down.sql @@ -0,0 +1,18 @@ +-- This file should undo anything in `up.sql` +DROP VIEW balances; + +CREATE INDEX index_balances_owner ON balance_changes (OWNER, token); + +DROP INDEX index_balance_changes_owner_token_height; + +ALTER TABLE balance_changes + DROP CONSTRAINT balance_changes_owner_token_height_key; + +ALTER TABLE balance_changes RENAME TO balances; + +ALTER TABLE balances + ADD CONSTRAINT balances_owner_token_key UNIQUE (OWNER, token); + +ALTER TABLE balances + DROP COLUMN height; + diff --git a/orm/migrations/2024-07-19-203433_add_height_to_balances/up.sql b/orm/migrations/2024-07-19-203433_add_height_to_balances/up.sql new file mode 100644 index 00000000..53617c6c --- /dev/null +++ b/orm/migrations/2024-07-19-203433_add_height_to_balances/up.sql @@ -0,0 +1,40 @@ +-- Your SQL goes here +ALTER TABLE balances + ADD COLUMN height integer NOT NULL DEFAULT 0; + +ALTER TABLE balances + ALTER COLUMN height DROP DEFAULT; + +ALTER TABLE balances + DROP CONSTRAINT balances_owner_token_key; + +ALTER TABLE balances RENAME TO balance_changes; + +ALTER TABLE balance_changes + ADD CONSTRAINT balance_changes_owner_token_height_key UNIQUE (OWNER, token, height); + +CREATE INDEX index_balance_changes_owner_token_height ON balance_changes (OWNER, token, height); + +DROP INDEX index_balances_owner; + +CREATE VIEW balances AS +SELECT + bc.id, + bc.owner, + bc.token, + bc.raw_amount +FROM + balance_changes bc + JOIN ( + SELECT + OWNER, + token, + MAX(height) AS max_height + FROM + balance_changes + GROUP BY + OWNER, + token) max_heights ON bc.owner = max_heights.owner + AND bc.token = max_heights.token + AND bc.height = max_heights.max_height; + diff --git a/orm/src/balances.rs b/orm/src/balances.rs index 0f5d2eb6..f81afa48 100644 --- a/orm/src/balances.rs +++ b/orm/src/balances.rs @@ -4,26 +4,38 @@ use bigdecimal::BigDecimal; use diesel::{Insertable, Queryable, Selectable}; use shared::balance::Balance; -use crate::schema::balances; +use crate::schema::balance_changes; +use crate::views::balances; #[derive(Insertable, Clone, Queryable, Selectable)] -#[diesel(table_name = balances)] +#[diesel(table_name = balance_changes)] #[diesel(check_for_backend(diesel::pg::Pg))] -pub struct BalancesInsertDb { +pub struct BalanceChangesInsertDb { pub owner: String, pub token: String, pub raw_amount: BigDecimal, + pub height: i32, } -pub type BalanceDb = BalancesInsertDb; +pub type BalanceChangeDb = BalanceChangesInsertDb; + +#[derive(Clone, Queryable, Selectable)] +#[diesel(table_name = balances)] +#[diesel(check_for_backend(diesel::pg::Pg))] +pub struct BalanceDb { + pub owner: String, + pub token: String, + pub raw_amount: BigDecimal, +} -impl BalancesInsertDb { +impl BalanceChangesInsertDb { pub fn from_balance(balance: Balance) -> Self { Self { owner: balance.owner.to_string(), token: balance.token.to_string(), raw_amount: BigDecimal::from_str(&balance.amount.to_string()) .expect("Invalid amount"), + height: balance.height as i32, } } } diff --git a/orm/src/lib.rs b/orm/src/lib.rs index c289f572..2fa49de7 100644 --- a/orm/src/lib.rs +++ b/orm/src/lib.rs @@ -13,3 +13,4 @@ pub mod schema; pub mod transactions; pub mod unbond; pub mod validators; +pub mod views; diff --git a/orm/src/schema.rs b/orm/src/schema.rs index 9e90f252..86b9590e 100644 --- a/orm/src/schema.rs +++ b/orm/src/schema.rs @@ -67,11 +67,12 @@ pub mod sql_types { } diesel::table! { - balances (id) { + balance_changes (id) { id -> Int4, owner -> Varchar, token -> Varchar, raw_amount -> Numeric, + height -> Int4, } } @@ -256,7 +257,7 @@ diesel::joinable!(pos_rewards -> validators (validator_id)); diesel::joinable!(unbonds -> validators (validator_id)); diesel::allow_tables_to_appear_in_same_query!( - balances, + balance_changes, bonds, chain_parameters, crawler_state, diff --git a/orm/src/views.rs b/orm/src/views.rs new file mode 100644 index 00000000..b44b5c4d --- /dev/null +++ b/orm/src/views.rs @@ -0,0 +1,9 @@ +// Manually create schema for views - see also https://github.com/diesel-rs/diesel/issues/1482 +diesel::table! { + balances (id) { + id -> Int4, + owner -> Varchar, + token -> Varchar, + raw_amount -> Numeric, + } +} diff --git a/seeder/src/main.rs b/seeder/src/main.rs index 17a8865d..d763cd14 100644 --- a/seeder/src/main.rs +++ b/seeder/src/main.rs @@ -2,7 +2,7 @@ use anyhow::Context; use clap::Parser; use clap_verbosity_flag::LevelFilter; use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl, SelectableHelper}; -use orm::balances::BalancesInsertDb; +use orm::balances::BalanceChangesInsertDb; use orm::bond::BondInsertDb; use orm::governance_proposal::{ GovernanceProposalInsertDb, GovernanceProposalUpdateStatusDb, @@ -10,8 +10,8 @@ use orm::governance_proposal::{ use orm::governance_votes::GovernanceProposalVoteInsertDb; use orm::pos_rewards::PosRewardInsertDb; use orm::schema::{ - balances, bonds, governance_proposals, governance_votes, pos_rewards, - unbonds, validators, + balance_changes, bonds, governance_proposals, governance_votes, + pos_rewards, unbonds, validators, }; use orm::unbond::UnbondInsertDb; use orm::validators::{ValidatorDb, ValidatorInsertDb}; @@ -138,7 +138,7 @@ async fn main() -> anyhow::Result<(), MainError> { .execute(transaction_conn) .context("Failed to remove all validators")?; - diesel::delete(balances::table) + diesel::delete(balance_changes::table) .execute(transaction_conn) .context("Failed to remove all validators")?; @@ -201,12 +201,12 @@ async fn main() -> anyhow::Result<(), MainError> { .execute(transaction_conn) .context("Failed to insert pos rewards in db")?; - diesel::insert_into(balances::table) - .values::<&Vec>( + diesel::insert_into(balance_changes::table) + .values::<&Vec>( &balances .into_iter() .map(|balance| { - BalancesInsertDb::from_balance(balance) + BalanceChangesInsertDb::from_balance(balance) }) .collect::>(), ) diff --git a/shared/src/balance.rs b/shared/src/balance.rs index eec0d1d6..9db8eb84 100644 --- a/shared/src/balance.rs +++ b/shared/src/balance.rs @@ -77,6 +77,7 @@ pub struct Balance { pub owner: Id, pub token: Id, pub amount: Amount, + pub height: u32, } pub type Balances = Vec; @@ -92,6 +93,7 @@ impl Balance { owner: Id::Account(address.to_string()), token: Id::Account(token_address.to_string()), amount: Amount::fake(), + height: (0..10000).fake::(), } } } diff --git a/transactions/src/repository/transactions.rs b/transactions/src/repository/transactions.rs index 8fb98217..1257fc43 100644 --- a/transactions/src/repository/transactions.rs +++ b/transactions/src/repository/transactions.rs @@ -18,6 +18,7 @@ pub fn insert_inner_transactions( .map(InnerTransactionInsertDb::from) .collect::>(), ) + .on_conflict_do_nothing() .execute(transaction_conn) .context("Failed to insert inner transactions in db")?; @@ -34,6 +35,7 @@ pub fn insert_wrapper_transactions( .map(WrapperTransactionInsertDb::from) .collect::>(), ) + .on_conflict_do_nothing() .execute(transaction_conn) .context("Failed to insert wrapper transactions in db")?; diff --git a/webserver/src/repository/balance.rs b/webserver/src/repository/balance.rs index 702fe645..f428b212 100644 --- a/webserver/src/repository/balance.rs +++ b/webserver/src/repository/balance.rs @@ -1,7 +1,7 @@ use axum::async_trait; use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl, SelectableHelper}; use orm::balances::BalanceDb; -use orm::schema::balances; +use orm::views::balances; use crate::appstate::AppState;