diff --git a/chain/src/main.rs b/chain/src/main.rs index 3f388430..d6cf5a0c 100644 --- a/chain/src/main.rs +++ b/chain/src/main.rs @@ -151,7 +151,7 @@ async fn crawling_fn( let addresses = block.addresses_with_balance_change(native_token); let balances = - namada_service::query_balance(&client, &addresses, Some(block_height)) + namada_service::query_balance(&client, &addresses, block_height) .await .into_rpc_error()?; tracing::info!("Updating balance for {} addresses...", addresses.len()); @@ -292,7 +292,7 @@ async fn initial_query( sleep(Duration::from_secs(initial_query_retry_time)).await; } - let balances = query_all_balances(client, Some(block_height)) + let balances = query_all_balances(client, block_height) .await .into_rpc_error()?; diff --git a/chain/src/repository/balance.rs b/chain/src/repository/balance.rs index 5510ed95..aec3fed9 100644 --- a/chain/src/repository/balance.rs +++ b/chain/src/repository/balance.rs @@ -1,27 +1,26 @@ use anyhow::Context; -use diesel::upsert::excluded; -use diesel::{ExpressionMethods, PgConnection, RunQueryDsl}; -use orm::balances::BalancesInsertDb; -use orm::schema::balances; +use diesel::{PgConnection, RunQueryDsl}; +use orm::balances::BalanceChangesInsertDb; +use orm::schema::balance_changes; use shared::balance::Balances; pub fn insert_balance( transaction_conn: &mut PgConnection, balances: Balances, ) -> anyhow::Result<()> { - diesel::insert_into(balances::table) - .values::<&Vec>( + diesel::insert_into(balance_changes::table) + .values::<&Vec>( &balances .into_iter() - .map(BalancesInsertDb::from_balance) + .map(BalanceChangesInsertDb::from_balance) .collect::>(), ) - .on_conflict((balances::columns::owner, balances::columns::token)) - .do_update() - .set( - balances::columns::raw_amount - .eq(excluded(balances::columns::raw_amount)), - ) + .on_conflict(( + balance_changes::columns::owner, + balance_changes::columns::token, + balance_changes::columns::height, + )) + .do_nothing() .execute(transaction_conn) .context("Failed to update balances in db")?; diff --git a/chain/src/services/namada.rs b/chain/src/services/namada.rs index 7833da09..bec7f023 100644 --- a/chain/src/services/namada.rs +++ b/chain/src/services/namada.rs @@ -57,7 +57,7 @@ pub async fn get_epoch_at_block_height( pub async fn query_balance( client: &HttpClient, balance_changes: &HashSet, - block_height: Option, + block_height: BlockHeight, ) -> anyhow::Result { Ok(futures::stream::iter(balance_changes) .filter_map(|balance_change| async move { @@ -79,7 +79,7 @@ pub async fn query_balance( client, &token, &owner, - block_height.map(to_block_height), + Some(to_block_height(block_height)), ) .await .unwrap_or_default(); @@ -88,6 +88,7 @@ pub async fn query_balance( owner: Id::from(owner), token: Id::from(token), amount: Amount::from(amount), + height: block_height, }) }) .map(futures::future::ready) @@ -98,7 +99,7 @@ pub async fn query_balance( pub async fn query_all_balances( client: &HttpClient, - height: Option, + height: BlockHeight, ) -> anyhow::Result { let token_addr = RPC .shell() @@ -108,10 +109,13 @@ pub async fn query_all_balances( let balance_prefix = namada_token::storage_key::balance_prefix(&token_addr); - let balances = - query_storage_prefix::(client, &balance_prefix, height) - .await - .context("Failed to query all balances")?; + let balances = query_storage_prefix::( + client, + &balance_prefix, + Some(height), + ) + .await + .context("Failed to query all balances")?; let mut all_balances: Balances = vec![]; @@ -128,6 +132,7 @@ pub async fn query_all_balances( owner: Id::from(o), token: Id::from(t), amount: Amount::from(b), + height, }); } } diff --git a/orm/migrations/2024-07-19-203433_add_height_to_balances/down.sql b/orm/migrations/2024-07-19-203433_add_height_to_balances/down.sql new file mode 100644 index 00000000..db8efdcd --- /dev/null +++ b/orm/migrations/2024-07-19-203433_add_height_to_balances/down.sql @@ -0,0 +1,18 @@ +-- This file should undo anything in `up.sql` +DROP VIEW balances; + +CREATE INDEX index_balances_owner ON balance_changes (OWNER, token); + +DROP INDEX index_balance_changes_owner_token_height; + +ALTER TABLE balance_changes + DROP CONSTRAINT balance_changes_owner_token_height_key; + +ALTER TABLE balance_changes RENAME TO balances; + +ALTER TABLE balances + ADD CONSTRAINT balances_owner_token_key UNIQUE (OWNER, token); + +ALTER TABLE balances + DROP COLUMN height; + diff --git a/orm/migrations/2024-07-19-203433_add_height_to_balances/up.sql b/orm/migrations/2024-07-19-203433_add_height_to_balances/up.sql new file mode 100644 index 00000000..53617c6c --- /dev/null +++ b/orm/migrations/2024-07-19-203433_add_height_to_balances/up.sql @@ -0,0 +1,40 @@ +-- Your SQL goes here +ALTER TABLE balances + ADD COLUMN height integer NOT NULL DEFAULT 0; + +ALTER TABLE balances + ALTER COLUMN height DROP DEFAULT; + +ALTER TABLE balances + DROP CONSTRAINT balances_owner_token_key; + +ALTER TABLE balances RENAME TO balance_changes; + +ALTER TABLE balance_changes + ADD CONSTRAINT balance_changes_owner_token_height_key UNIQUE (OWNER, token, height); + +CREATE INDEX index_balance_changes_owner_token_height ON balance_changes (OWNER, token, height); + +DROP INDEX index_balances_owner; + +CREATE VIEW balances AS +SELECT + bc.id, + bc.owner, + bc.token, + bc.raw_amount +FROM + balance_changes bc + JOIN ( + SELECT + OWNER, + token, + MAX(height) AS max_height + FROM + balance_changes + GROUP BY + OWNER, + token) max_heights ON bc.owner = max_heights.owner + AND bc.token = max_heights.token + AND bc.height = max_heights.max_height; + diff --git a/orm/src/balances.rs b/orm/src/balances.rs index 0f5d2eb6..f81afa48 100644 --- a/orm/src/balances.rs +++ b/orm/src/balances.rs @@ -4,26 +4,38 @@ use bigdecimal::BigDecimal; use diesel::{Insertable, Queryable, Selectable}; use shared::balance::Balance; -use crate::schema::balances; +use crate::schema::balance_changes; +use crate::views::balances; #[derive(Insertable, Clone, Queryable, Selectable)] -#[diesel(table_name = balances)] +#[diesel(table_name = balance_changes)] #[diesel(check_for_backend(diesel::pg::Pg))] -pub struct BalancesInsertDb { +pub struct BalanceChangesInsertDb { pub owner: String, pub token: String, pub raw_amount: BigDecimal, + pub height: i32, } -pub type BalanceDb = BalancesInsertDb; +pub type BalanceChangeDb = BalanceChangesInsertDb; + +#[derive(Clone, Queryable, Selectable)] +#[diesel(table_name = balances)] +#[diesel(check_for_backend(diesel::pg::Pg))] +pub struct BalanceDb { + pub owner: String, + pub token: String, + pub raw_amount: BigDecimal, +} -impl BalancesInsertDb { +impl BalanceChangesInsertDb { pub fn from_balance(balance: Balance) -> Self { Self { owner: balance.owner.to_string(), token: balance.token.to_string(), raw_amount: BigDecimal::from_str(&balance.amount.to_string()) .expect("Invalid amount"), + height: balance.height as i32, } } } diff --git a/orm/src/lib.rs b/orm/src/lib.rs index c289f572..2fa49de7 100644 --- a/orm/src/lib.rs +++ b/orm/src/lib.rs @@ -13,3 +13,4 @@ pub mod schema; pub mod transactions; pub mod unbond; pub mod validators; +pub mod views; diff --git a/orm/src/schema.rs b/orm/src/schema.rs index ad7f4d0b..dac977ac 100644 --- a/orm/src/schema.rs +++ b/orm/src/schema.rs @@ -67,11 +67,12 @@ pub mod sql_types { } diesel::table! { - balances (id) { + balance_changes (id) { id -> Int4, owner -> Varchar, token -> Varchar, raw_amount -> Numeric, + height -> Int4, } } @@ -255,7 +256,7 @@ diesel::joinable!(pos_rewards -> validators (validator_id)); diesel::joinable!(unbonds -> validators (validator_id)); diesel::allow_tables_to_appear_in_same_query!( - balances, + balance_changes, bonds, chain_parameters, crawler_state, diff --git a/orm/src/views.rs b/orm/src/views.rs new file mode 100644 index 00000000..b44b5c4d --- /dev/null +++ b/orm/src/views.rs @@ -0,0 +1,9 @@ +// Manually create schema for views - see also https://github.com/diesel-rs/diesel/issues/1482 +diesel::table! { + balances (id) { + id -> Int4, + owner -> Varchar, + token -> Varchar, + raw_amount -> Numeric, + } +} diff --git a/seeder/src/main.rs b/seeder/src/main.rs index 17a8865d..d763cd14 100644 --- a/seeder/src/main.rs +++ b/seeder/src/main.rs @@ -2,7 +2,7 @@ use anyhow::Context; use clap::Parser; use clap_verbosity_flag::LevelFilter; use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl, SelectableHelper}; -use orm::balances::BalancesInsertDb; +use orm::balances::BalanceChangesInsertDb; use orm::bond::BondInsertDb; use orm::governance_proposal::{ GovernanceProposalInsertDb, GovernanceProposalUpdateStatusDb, @@ -10,8 +10,8 @@ use orm::governance_proposal::{ use orm::governance_votes::GovernanceProposalVoteInsertDb; use orm::pos_rewards::PosRewardInsertDb; use orm::schema::{ - balances, bonds, governance_proposals, governance_votes, pos_rewards, - unbonds, validators, + balance_changes, bonds, governance_proposals, governance_votes, + pos_rewards, unbonds, validators, }; use orm::unbond::UnbondInsertDb; use orm::validators::{ValidatorDb, ValidatorInsertDb}; @@ -138,7 +138,7 @@ async fn main() -> anyhow::Result<(), MainError> { .execute(transaction_conn) .context("Failed to remove all validators")?; - diesel::delete(balances::table) + diesel::delete(balance_changes::table) .execute(transaction_conn) .context("Failed to remove all validators")?; @@ -201,12 +201,12 @@ async fn main() -> anyhow::Result<(), MainError> { .execute(transaction_conn) .context("Failed to insert pos rewards in db")?; - diesel::insert_into(balances::table) - .values::<&Vec>( + diesel::insert_into(balance_changes::table) + .values::<&Vec>( &balances .into_iter() .map(|balance| { - BalancesInsertDb::from_balance(balance) + BalanceChangesInsertDb::from_balance(balance) }) .collect::>(), ) diff --git a/shared/src/balance.rs b/shared/src/balance.rs index eec0d1d6..9db8eb84 100644 --- a/shared/src/balance.rs +++ b/shared/src/balance.rs @@ -77,6 +77,7 @@ pub struct Balance { pub owner: Id, pub token: Id, pub amount: Amount, + pub height: u32, } pub type Balances = Vec; @@ -92,6 +93,7 @@ impl Balance { owner: Id::Account(address.to_string()), token: Id::Account(token_address.to_string()), amount: Amount::fake(), + height: (0..10000).fake::(), } } } diff --git a/webserver/src/repository/balance.rs b/webserver/src/repository/balance.rs index 702fe645..f428b212 100644 --- a/webserver/src/repository/balance.rs +++ b/webserver/src/repository/balance.rs @@ -1,7 +1,7 @@ use axum::async_trait; use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl, SelectableHelper}; use orm::balances::BalanceDb; -use orm::schema::balances; +use orm::views::balances; use crate::appstate::AppState;