Skip to content

Commit

Permalink
batch insert coins and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
0o-de-lally committed Oct 24, 2024
1 parent 9f84641 commit 82a348f
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 68 deletions.
7 changes: 4 additions & 3 deletions warehouse/src/extract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use anyhow::Result;
use libra_backwards_compatibility::version_five::state_snapshot_v5::v5_accounts_from_manifest_path;
use libra_types::exports::AccountAddress;

use crate::table_structs::{WarehouseAccount, WarehouseState};
use crate::table_structs::{WarehouseAccount, WarehouseRecord, WarehouseTime};

pub async fn extract_v5_snapshot(v5_manifest_path: &Path) -> Result<Vec<WarehouseState>> {
pub async fn extract_v5_snapshot(v5_manifest_path: &Path) -> Result<Vec<WarehouseRecord>> {
let account_blobs = v5_accounts_from_manifest_path(&v5_manifest_path).await?;
dbg!(&account_blobs.len());
let mut warehouse_state = vec![];
Expand All @@ -17,10 +17,11 @@ pub async fn extract_v5_snapshot(v5_manifest_path: &Path) -> Result<Vec<Warehous
Ok(a) => {
let address_literal = a.to_hex_literal();
let cast_address = AccountAddress::from_hex_literal(&address_literal)?;
let s = WarehouseState {
let s = WarehouseRecord {
account: WarehouseAccount {
address: cast_address,
},
time: WarehouseTime::default(),
balance: None,
};
warehouse_state.push(s);
Expand Down
27 changes: 14 additions & 13 deletions warehouse/src/load_account.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::table_structs::{WarehouseAccount, WarehouseState};
use crate::table_structs::{WarehouseAccount, WarehouseRecord};
use anyhow::Result;
use sqlx::{sqlite::SqliteQueryResult, QueryBuilder, Sqlite, SqlitePool};

pub async fn load_account_state(pool: &SqlitePool, accounts: &Vec<WarehouseState>) -> Result<i64> {
pub async fn load_account_state(pool: &SqlitePool, accounts: &Vec<WarehouseRecord>) -> Result<i64> {
let mut rows = 0;
// insert missing accounts
for ws in accounts.iter() {
Expand All @@ -24,7 +24,7 @@ pub async fn insert_one_account(
VALUES ($1,$2)
"#,
)
.bind(acc.address.to_string())
.bind(acc.address.to_hex_literal())
.bind(true)
.execute(pool)
.await?;
Expand All @@ -34,34 +34,35 @@ pub async fn insert_one_account(

pub async fn batch_insert_account(
pool: &SqlitePool,
acc: &[WarehouseAccount],
acc: &[WarehouseRecord],
batch_len: usize,
) -> Result<()> {
let chunks: Vec<&[WarehouseAccount]> = acc.chunks(batch_len).collect();

) -> Result<u64> {
let chunks: Vec<&[WarehouseRecord]> = acc.chunks(batch_len).collect();
let mut rows = 0;
for c in chunks {
impl_batch_insert(pool, c).await?;
let res = impl_batch_insert(pool, c).await?;
rows += res.rows_affected();
}

Ok(())
Ok(rows)
}

// TODO: return specific commit errors for this batch
pub async fn impl_batch_insert(pool: &SqlitePool, batch_accounts: &[WarehouseAccount]) -> Result<()> {
pub async fn impl_batch_insert(pool: &SqlitePool, batch_accounts: &[WarehouseRecord]) -> Result<SqliteQueryResult> {
let mut query_builder: QueryBuilder<Sqlite> = QueryBuilder::new(
// Note the trailing space; most calls to `QueryBuilder` don't automatically insert
"INSERT INTO users (account_address, is_legacy) ",
);

query_builder.push_values(batch_accounts, |mut b, acc| {
b.push_bind(acc.address.to_hex_literal()).push_bind(true);
b.push_bind(acc.account.address.to_hex_literal()).push_bind(true);
});

// makes sure the txs don't fail on repeated attempts to add users
query_builder.push("ON CONFLICT (account_address) DO NOTHING");

let query = query_builder.build();
let _res = query.execute(pool).await?;
let res = query.execute(pool).await?;

Ok(())
Ok(res)
}
95 changes: 78 additions & 17 deletions warehouse/src/load_coin.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use crate::table_structs::WarehouseState;
use crate::table_structs::WarehouseRecord;
use anyhow::Result;
use sqlx::{QueryBuilder, Sqlite, SqlitePool};

use sqlx::{sqlite::SqliteQueryResult, QueryBuilder, Sqlite, SqlitePool};

pub async fn batch_insert_account(
pool: &SqlitePool,
acc: &[WarehouseState],
acc: &[WarehouseRecord],
batch_len: usize,
) -> Result<()> {
let chunks: Vec<&[WarehouseState]> = acc.chunks(batch_len).collect();
let chunks: Vec<&[WarehouseRecord]> = acc.chunks(batch_len).collect();

for c in chunks {
impl_batch_coin_insert(pool, c).await?;
Expand All @@ -18,27 +17,89 @@ pub async fn batch_insert_account(
}

// TODO: return specific commit errors for this batch
pub async fn impl_batch_coin_insert(pool: &SqlitePool, batch_accounts: &[WarehouseState]) -> Result<()> {
let filtered = batch_accounts.iter().filter(|el| {
el.balance.is_some()
});
pub async fn impl_batch_coin_insert(
pool: &SqlitePool,
batch_accounts: &[WarehouseRecord],
) -> Result<SqliteQueryResult> {
let filtered = batch_accounts.iter().filter(|el| el.balance.is_some());

let mut query_builder: QueryBuilder<Sqlite> = QueryBuilder::new(
r#"
r#"
INSERT INTO balance (account_address, balance, chain_timestamp, db_version, epoch_number)
"#,
);

query_builder.push_values(filtered, |mut b, acc| {
b.push_bind(acc.account.address.to_hex_literal()).push_bind(true)
.push_bind(acc.balance.as_ref().unwrap().legacy_balance.unwrap() as i64).push_bind(true)
.push_bind(0) // todo
.push_bind(0) // todo
.push_bind(0); // todo
let this_account = acc.account.address.to_hex_literal();
let this_balance = acc.balance.as_ref().unwrap().legacy_balance.unwrap() as i64;
let this_timestamp = acc.time.timestamp as i64;
b.push_bind(this_account)
.push_bind(this_balance)
.push_bind(this_timestamp) // todo
.push_bind(acc.time.version as i64) // todo
.push_bind(acc.time.epoch as i64); // todo;
});

let query = query_builder.build();
let _res = query.execute(pool).await?;
let res = query.execute(pool).await?;

Ok(())
Ok(res)
}

pub async fn alt_increment_one_balance(
pool: &SqlitePool,
record: &WarehouseRecord,
) -> Result<SqliteQueryResult> {
// let filtered = batch_accounts.iter().filter(|el| el.balance.is_some());

let mut query_builder: QueryBuilder<Sqlite> =
QueryBuilder::new(increment_balance_template(record));
let query = query_builder.build();
let res = query.execute(pool).await?;

Ok(res)
}

fn increment_balance_template(record: &WarehouseRecord) -> String {
let this_account = record.account.address.to_hex_literal();
let this_balance = record.balance.as_ref().unwrap().legacy_balance.unwrap() as i64;
let this_timestamp = record.time.timestamp as i64;
let this_version = record.time.version as i64;
let this_epoch = record.time.epoch as i64;

let query_template = format!(
r#"
INSERT INTO balance (account_address, balance, chain_timestamp, db_version, epoch_number)
SELECT '{this_account}', {this_balance}, {this_timestamp}, {this_version}, {this_epoch}
WHERE NOT EXISTS (
SELECT 1
FROM balance
WHERE account_address = '{this_account}'
AND balance = {this_balance}
ORDER BY chain_timestamp DESC
LIMIT 1
);"#
);

query_template
}

#[test]
fn test_format() {
use crate::table_structs::{WarehouseAccount, WarehouseBalance, WarehouseTime};
use libra_types::exports::AccountAddress;

let record = WarehouseRecord {
account: WarehouseAccount {
// uniques
address: AccountAddress::random(),
},
time: WarehouseTime::default(),
balance: Some(WarehouseBalance {
balance: 0,
legacy_balance: Some(10),
}),
};
let s = increment_balance_template(&record);
dbg!(&s);
}
29 changes: 27 additions & 2 deletions warehouse/src/table_structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,42 @@ use libra_types::exports::AccountAddress;

#[derive(Debug, Clone)]
/// The basic information for an account
pub struct WarehouseState {
pub struct WarehouseRecord {
pub account: WarehouseAccount,
pub time: WarehouseTime,
pub balance: Option<WarehouseBalance>,
}

impl WarehouseRecord {
pub fn new(address: AccountAddress) -> Self {
Self {
account: WarehouseAccount {
address,
},
time: WarehouseTime::default(),
balance: Some(WarehouseBalance::default()),
}
}
pub fn set_time(&mut self, timestamp: u64, version: u64, epoch: u64) {
self.time.timestamp = timestamp;
self.time.version = version;
self.time.epoch = epoch;
}
}

// holds timestamp, chain height, and epoch
#[derive(Debug, Clone, Default)]
pub struct WarehouseTime {
pub timestamp: u64,
pub version: u64,
pub epoch: u64,
}
#[derive(Debug, Clone)]
pub struct WarehouseAccount {
pub address: AccountAddress,
}

#[derive(Debug, Clone)]
#[derive(Debug, Default, Clone)]
pub struct WarehouseBalance {
// balances in v6+ terms
pub balance: u64,
Expand Down
Loading

0 comments on commit 82a348f

Please sign in to comment.