Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[enhancement] store balances at block heights - implementation #81

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,13 @@ rand = "0.8.5"
bigdecimal = "0.4.5"
strum = "0.26.3"
strum_macros = "0.26.3"

[patch.'https://github.com/anoma/namada']
namada_core = { git = "https://github.com/joel-u410/namada.git", tag = "v0.42.0-u410" }
namada_sdk = { git = "https://github.com/joel-u410/namada.git", tag = "v0.42.0-u410" }
namada_tx = { git = "https://github.com/joel-u410/namada.git", tag = "v0.42.0-u410" }
namada_governance = { git = "https://github.com/joel-u410/namada.git", tag = "v0.42.0-u410" }
namada_ibc = { git = "https://github.com/joel-u410/namada.git", tag = "v0.42.0-u410" }
namada_token = { git = "https://github.com/joel-u410/namada.git", tag = "v0.42.0-u410" }
namada_parameters = { git = "https://github.com/joel-u410/namada.git", tag = "v0.42.0-u410" }
namada_proof_of_stake = { git = "https://github.com/joel-u410/namada.git", tag = "v0.42.0-u410" }
11 changes: 7 additions & 4 deletions chain/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,10 @@ async fn crawling_fn(
.into_rpc_error()?;

let addresses = block.addresses_with_balance_change(native_token);
let balances = namada_service::query_balance(&client, &addresses)
.await
.into_rpc_error()?;
let balances =
namada_service::query_balance(&client, &addresses, block_height)
.await
.into_rpc_error()?;
tracing::info!("Updating balance for {} addresses...", addresses.len());

let next_governance_proposal_id =
Expand Down Expand Up @@ -326,7 +327,9 @@ async fn initial_query(
sleep(Duration::from_secs(initial_query_retry_time)).await;
}

let balances = query_all_balances(client).await.into_rpc_error()?;
let balances = query_all_balances(client, block_height)
.await
.into_rpc_error()?;

tracing::info!("Querying bonds and unbonds...");
let (bonds, unbonds) = query_all_bonds_and_unbonds(client, None, None)
Expand Down
48 changes: 29 additions & 19 deletions chain/src/repository/balance.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,39 @@
use anyhow::Context;
use diesel::upsert::excluded;
use diesel::{ExpressionMethods, PgConnection, RunQueryDsl};
use orm::balances::BalancesInsertDb;
use orm::schema::balances;
use diesel::{PgConnection, RunQueryDsl};
use orm::balances::BalanceChangesInsertDb;
use orm::schema::balance_changes;
use shared::balance::Balances;

pub fn insert_balance(
transaction_conn: &mut PgConnection,
balances: Balances,
) -> anyhow::Result<()> {
diesel::insert_into(balances::table)
.values::<&Vec<BalancesInsertDb>>(
&balances
.into_iter()
.map(BalancesInsertDb::from_balance)
.collect::<Vec<_>>(),
)
.on_conflict((balances::columns::owner, balances::columns::token))
.do_update()
.set(
balances::columns::raw_amount
.eq(excluded(balances::columns::raw_amount)),
)
.execute(transaction_conn)
.context("Failed to update balances in db")?;
tracing::info!("Inserting {} balances into db", balances.len());

// Group balances into chunks to avoid hitting the limit of the number of bind parameters in one query.
balances
.chunks(10000)
.into_iter()
.try_for_each(|balances_chunk| {
diesel::insert_into(balance_changes::table)
.values::<&Vec<BalanceChangesInsertDb>>(
&balances_chunk
.to_vec()
.into_iter()
.map(BalanceChangesInsertDb::from_balance)
.collect::<Vec<_>>(),
)
.on_conflict((
balance_changes::columns::owner,
balance_changes::columns::token,
balance_changes::columns::height,
))
.do_nothing()
.execute(transaction_conn)
.context("Failed to update balances in db")?;

anyhow::Ok(())
})?;

anyhow::Ok(())
}
30 changes: 22 additions & 8 deletions chain/src/services/namada.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ pub async fn get_epoch_at_block_height(
pub async fn query_balance(
client: &HttpClient,
balance_changes: &HashSet<BalanceChange>,
block_height: BlockHeight,
) -> anyhow::Result<Balances> {
Ok(futures::stream::iter(balance_changes)
.filter_map(|balance_change| async move {
Expand All @@ -86,14 +87,20 @@ pub async fn query_balance(
.context("Failed to parse token address")
.ok()?;

let amount = rpc::get_token_balance(client, &token, &owner)
.await
.unwrap_or_default();
let amount = rpc::get_token_balance(
client,
&token,
&owner,
Some(to_block_height(block_height)),
)
.await
.unwrap_or_default();

Some(Balance {
owner: Id::from(owner),
token: Id::from(token),
amount: Amount::from(amount),
height: block_height,
})
})
.map(futures::future::ready)
Expand All @@ -104,6 +111,7 @@ pub async fn query_balance(

pub async fn query_all_balances(
client: &HttpClient,
height: BlockHeight,
) -> anyhow::Result<Balances> {
let token_addr = RPC
.shell()
Expand All @@ -113,10 +121,13 @@ pub async fn query_all_balances(

let balance_prefix = namada_token::storage_key::balance_prefix(&token_addr);

let balances =
query_storage_prefix::<token::Amount>(client, &balance_prefix)
.await
.context("Failed to query all balances")?;
let balances = query_storage_prefix::<token::Amount>(
client,
&balance_prefix,
Some(height),
)
.await
.context("Failed to query all balances")?;

let mut all_balances: Balances = vec![];

Expand All @@ -133,6 +144,7 @@ pub async fn query_all_balances(
owner: Id::from(o),
token: Id::from(t),
amount: Amount::from(b),
height,
});
}
}
Expand Down Expand Up @@ -522,6 +534,8 @@ pub async fn query_all_votes(
anyhow::Ok(votes.iter().flatten().cloned().collect())
}

fn to_block_height(block_height: u32) -> NamadaSdkBlockHeight {
pub(super) fn to_block_height(
block_height: BlockHeight,
) -> NamadaSdkBlockHeight {
NamadaSdkBlockHeight::from(block_height as u64)
}
11 changes: 10 additions & 1 deletion chain/src/services/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,28 @@ use namada_sdk::queries::RPC;
use namada_sdk::storage::{self, PrefixValue};
use tendermint_rpc::HttpClient;

use shared::block::BlockHeight;

/// Query a range of storage values with a matching prefix and decode them with
/// [`BorshDeserialize`]. Returns an iterator of the storage keys paired with
/// their associated values.
pub async fn query_storage_prefix<T>(
client: &HttpClient,
key: &storage::Key,
height: Option<BlockHeight>,
) -> anyhow::Result<Option<impl Iterator<Item = (storage::Key, T)>>>
where
T: BorshDeserialize,
{
let values = RPC
.shell()
.storage_prefix(client, None, None, false, key)
.storage_prefix(
client,
None,
height.map(super::namada::to_block_height),
false,
key,
)
.await?;

let decode = |PrefixValue { key, value }: PrefixValue| {
Expand Down
18 changes: 18 additions & 0 deletions orm/migrations/2024-07-19-203433_add_height_to_balances/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-- This file should undo anything in `up.sql`
DROP VIEW balances;

CREATE INDEX index_balances_owner ON balance_changes (OWNER, token);

DROP INDEX index_balance_changes_owner_token_height;

ALTER TABLE balance_changes
DROP CONSTRAINT balance_changes_owner_token_height_key;

ALTER TABLE balance_changes RENAME TO balances;

ALTER TABLE balances
ADD CONSTRAINT balances_owner_token_key UNIQUE (OWNER, token);

ALTER TABLE balances
DROP COLUMN height;

40 changes: 40 additions & 0 deletions orm/migrations/2024-07-19-203433_add_height_to_balances/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
-- Your SQL goes here
ALTER TABLE balances
ADD COLUMN height integer NOT NULL DEFAULT 0;

ALTER TABLE balances
ALTER COLUMN height DROP DEFAULT;

ALTER TABLE balances
DROP CONSTRAINT balances_owner_token_key;

ALTER TABLE balances RENAME TO balance_changes;

ALTER TABLE balance_changes
ADD CONSTRAINT balance_changes_owner_token_height_key UNIQUE (OWNER, token, height);

CREATE INDEX index_balance_changes_owner_token_height ON balance_changes (OWNER, token, height);

DROP INDEX index_balances_owner;

CREATE VIEW balances AS
SELECT
bc.id,
bc.owner,
bc.token,
bc.raw_amount
FROM
balance_changes bc
JOIN (
SELECT
OWNER,
token,
MAX(height) AS max_height
FROM
balance_changes
GROUP BY
OWNER,
token) max_heights ON bc.owner = max_heights.owner
AND bc.token = max_heights.token
AND bc.height = max_heights.max_height;

22 changes: 17 additions & 5 deletions orm/src/balances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,38 @@ use bigdecimal::BigDecimal;
use diesel::{Insertable, Queryable, Selectable};
use shared::balance::Balance;

use crate::schema::balances;
use crate::schema::balance_changes;
use crate::views::balances;

#[derive(Insertable, Clone, Queryable, Selectable)]
#[diesel(table_name = balances)]
#[diesel(table_name = balance_changes)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct BalancesInsertDb {
pub struct BalanceChangesInsertDb {
pub owner: String,
pub token: String,
pub raw_amount: BigDecimal,
pub height: i32,
}

pub type BalanceDb = BalancesInsertDb;
pub type BalanceChangeDb = BalanceChangesInsertDb;

#[derive(Clone, Queryable, Selectable)]
#[diesel(table_name = balances)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct BalanceDb {
pub owner: String,
pub token: String,
pub raw_amount: BigDecimal,
}

impl BalancesInsertDb {
impl BalanceChangesInsertDb {
pub fn from_balance(balance: Balance) -> Self {
Self {
owner: balance.owner.to_string(),
token: balance.token.to_string(),
raw_amount: BigDecimal::from_str(&balance.amount.to_string())
.expect("Invalid amount"),
height: balance.height as i32,
}
}
}
1 change: 1 addition & 0 deletions orm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ pub mod schema;
pub mod transactions;
pub mod unbond;
pub mod validators;
pub mod views;
5 changes: 3 additions & 2 deletions orm/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@ pub mod sql_types {
}

diesel::table! {
balances (id) {
balance_changes (id) {
id -> Int4,
owner -> Varchar,
token -> Varchar,
raw_amount -> Numeric,
height -> Int4,
}
}

Expand Down Expand Up @@ -256,7 +257,7 @@ diesel::joinable!(pos_rewards -> validators (validator_id));
diesel::joinable!(unbonds -> validators (validator_id));

diesel::allow_tables_to_appear_in_same_query!(
balances,
balance_changes,
bonds,
chain_parameters,
crawler_state,
Expand Down
9 changes: 9 additions & 0 deletions orm/src/views.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Manually create schema for views - see also https://github.com/diesel-rs/diesel/issues/1482
diesel::table! {
balances (id) {
id -> Int4,
owner -> Varchar,
token -> Varchar,
raw_amount -> Numeric,
}
}
14 changes: 7 additions & 7 deletions seeder/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ use anyhow::Context;
use clap::Parser;
use clap_verbosity_flag::LevelFilter;
use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl, SelectableHelper};
use orm::balances::BalancesInsertDb;
use orm::balances::BalanceChangesInsertDb;
use orm::bond::BondInsertDb;
use orm::governance_proposal::{
GovernanceProposalInsertDb, GovernanceProposalUpdateStatusDb,
};
use orm::governance_votes::GovernanceProposalVoteInsertDb;
use orm::pos_rewards::PosRewardInsertDb;
use orm::schema::{
balances, bonds, governance_proposals, governance_votes, pos_rewards,
unbonds, validators,
balance_changes, bonds, governance_proposals, governance_votes,
pos_rewards, unbonds, validators,
};
use orm::unbond::UnbondInsertDb;
use orm::validators::{ValidatorDb, ValidatorInsertDb};
Expand Down Expand Up @@ -138,7 +138,7 @@ async fn main() -> anyhow::Result<(), MainError> {
.execute(transaction_conn)
.context("Failed to remove all validators")?;

diesel::delete(balances::table)
diesel::delete(balance_changes::table)
.execute(transaction_conn)
.context("Failed to remove all validators")?;

Expand Down Expand Up @@ -201,12 +201,12 @@ async fn main() -> anyhow::Result<(), MainError> {
.execute(transaction_conn)
.context("Failed to insert pos rewards in db")?;

diesel::insert_into(balances::table)
.values::<&Vec<BalancesInsertDb>>(
diesel::insert_into(balance_changes::table)
.values::<&Vec<BalanceChangesInsertDb>>(
&balances
.into_iter()
.map(|balance| {
BalancesInsertDb::from_balance(balance)
BalanceChangesInsertDb::from_balance(balance)
})
.collect::<Vec<_>>(),
)
Expand Down
Loading