Skip to content

Commit

Permalink
batch insert coins test
Browse files Browse the repository at this point in the history
  • Loading branch information
0o-de-lally committed Oct 24, 2024
1 parent 9f84641 commit e0b7d3e
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 52 deletions.
7 changes: 4 additions & 3 deletions warehouse/src/extract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use anyhow::Result;
use libra_backwards_compatibility::version_five::state_snapshot_v5::v5_accounts_from_manifest_path;
use libra_types::exports::AccountAddress;

use crate::table_structs::{WarehouseAccount, WarehouseState};
use crate::table_structs::{WarehouseAccount, WarehouseRecord, WarehouseTime};

pub async fn extract_v5_snapshot(v5_manifest_path: &Path) -> Result<Vec<WarehouseState>> {
pub async fn extract_v5_snapshot(v5_manifest_path: &Path) -> Result<Vec<WarehouseRecord>> {
let account_blobs = v5_accounts_from_manifest_path(&v5_manifest_path).await?;
dbg!(&account_blobs.len());
let mut warehouse_state = vec![];
Expand All @@ -17,10 +17,11 @@ pub async fn extract_v5_snapshot(v5_manifest_path: &Path) -> Result<Vec<Warehous
Ok(a) => {
let address_literal = a.to_hex_literal();
let cast_address = AccountAddress::from_hex_literal(&address_literal)?;
let s = WarehouseState {
let s = WarehouseRecord {
account: WarehouseAccount {
address: cast_address,
},
time: WarehouseTime::default(),
balance: None,
};
warehouse_state.push(s);
Expand Down
12 changes: 6 additions & 6 deletions warehouse/src/load_account.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::table_structs::{WarehouseAccount, WarehouseState};
use crate::table_structs::{WarehouseAccount, WarehouseRecord};
use anyhow::Result;
use sqlx::{sqlite::SqliteQueryResult, QueryBuilder, Sqlite, SqlitePool};

pub async fn load_account_state(pool: &SqlitePool, accounts: &Vec<WarehouseState>) -> Result<i64> {
pub async fn load_account_state(pool: &SqlitePool, accounts: &Vec<WarehouseRecord>) -> Result<i64> {
let mut rows = 0;
// insert missing accounts
for ws in accounts.iter() {
Expand All @@ -24,7 +24,7 @@ pub async fn insert_one_account(
VALUES ($1,$2)
"#,
)
.bind(acc.address.to_string())
.bind(acc.address.to_hex_literal())
.bind(true)
.execute(pool)
.await?;
Expand All @@ -47,7 +47,7 @@ pub async fn batch_insert_account(
}

// TODO: return specific commit errors for this batch
pub async fn impl_batch_insert(pool: &SqlitePool, batch_accounts: &[WarehouseAccount]) -> Result<()> {
pub async fn impl_batch_insert(pool: &SqlitePool, batch_accounts: &[WarehouseAccount]) -> Result<SqliteQueryResult> {
let mut query_builder: QueryBuilder<Sqlite> = QueryBuilder::new(
// Note the trailing space; most calls to `QueryBuilder` don't automatically insert
"INSERT INTO users (account_address, is_legacy) ",
Expand All @@ -61,7 +61,7 @@ pub async fn impl_batch_insert(pool: &SqlitePool, batch_accounts: &[WarehouseAcc
query_builder.push("ON CONFLICT (account_address) DO NOTHING");

let query = query_builder.build();
let _res = query.execute(pool).await?;
let res = query.execute(pool).await?;

Ok(())
Ok(res)
}
24 changes: 12 additions & 12 deletions warehouse/src/load_coin.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use crate::table_structs::WarehouseState;
use crate::table_structs::WarehouseRecord;
use anyhow::Result;
use sqlx::{QueryBuilder, Sqlite, SqlitePool};
use sqlx::{sqlite::SqliteQueryResult, QueryBuilder, Sqlite, SqlitePool};


pub async fn batch_insert_account(
pool: &SqlitePool,
acc: &[WarehouseState],
acc: &[WarehouseRecord],
batch_len: usize,
) -> Result<()> {
let chunks: Vec<&[WarehouseState]> = acc.chunks(batch_len).collect();
let chunks: Vec<&[WarehouseRecord]> = acc.chunks(batch_len).collect();

for c in chunks {
impl_batch_coin_insert(pool, c).await?;
Expand All @@ -18,7 +18,7 @@ pub async fn batch_insert_account(
}

// TODO: return specific commit errors for this batch
pub async fn impl_batch_coin_insert(pool: &SqlitePool, batch_accounts: &[WarehouseState]) -> Result<()> {
pub async fn impl_batch_coin_insert(pool: &SqlitePool, batch_accounts: &[WarehouseRecord]) -> Result<SqliteQueryResult> {
let filtered = batch_accounts.iter().filter(|el| {
el.balance.is_some()
});
Expand All @@ -30,15 +30,15 @@ pub async fn impl_batch_coin_insert(pool: &SqlitePool, batch_accounts: &[Warehou
);

query_builder.push_values(filtered, |mut b, acc| {
b.push_bind(acc.account.address.to_hex_literal()).push_bind(true)
.push_bind(acc.balance.as_ref().unwrap().legacy_balance.unwrap() as i64).push_bind(true)
.push_bind(0) // todo
.push_bind(0) // todo
.push_bind(0); // todo
b.push_bind(acc.account.address.to_hex_literal())
.push_bind(acc.balance.as_ref().unwrap().legacy_balance.unwrap() as i64)
.push_bind(acc.time.timestamp as i64) // todo
.push_bind(acc.time.version as i64) // todo
.push_bind(acc.time.epoch as i64); // todo
});

let query = query_builder.build();
let _res = query.execute(pool).await?;
let res = query.execute(pool).await?;

Ok(())
Ok(res)
}
30 changes: 28 additions & 2 deletions warehouse/src/table_structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,43 @@ use libra_types::exports::AccountAddress;

#[derive(Debug, Clone)]
/// The basic information for an account
pub struct WarehouseState {
pub struct WarehouseRecord {
pub account: WarehouseAccount,
pub time: WarehouseTime,
pub balance: Option<WarehouseBalance>,
}

#[allow(dead_code)]
impl WarehouseRecord {
fn new(address: AccountAddress) -> Self {
Self {
account: WarehouseAccount {
address,
},
time: WarehouseTime::default(),
balance: Some(WarehouseBalance::default()),
}
}
fn set_time(&mut self, timestamp: u64, version: u64, epoch: u64) {
self.time.timestamp = timestamp;
self.time.version = version;
self.time.epoch = epoch;
}
}

// holds timestamp, chain height, and epoch
#[derive(Debug, Clone, Default)]
pub struct WarehouseTime {
pub timestamp: u64,
pub version: u64,
pub epoch: u64,
}
#[derive(Debug, Clone)]
pub struct WarehouseAccount {
pub address: AccountAddress,
}

#[derive(Debug, Clone)]
#[derive(Debug, Default, Clone)]
pub struct WarehouseBalance {
// balances in v6+ terms
pub balance: u64,
Expand Down
95 changes: 66 additions & 29 deletions warehouse/tests/test_load.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::path::PathBuf;

use libra_types::exports::AccountAddress;
use libra_warehouse::table_structs::WarehouseAccount;
use libra_warehouse::extract::extract_v5_snapshot;
use libra_warehouse::table_structs::{WarehouseAccount, WarehouseBalance, WarehouseRecord, WarehouseTime};

use sqlx::SqlitePool;

Expand All @@ -12,62 +12,66 @@ fn v5_state_manifest_fixtures_path() -> PathBuf {
project_root.join("compatibility/fixtures/v5/state_ver_119757649.17a8/state.manifest")
}


#[sqlx::test]
async fn insert_one_account(pool: SqlitePool) -> anyhow::Result<()> {
libra_warehouse::migrate::maybe_init(&pool).await?;
let marlon = AccountAddress::random();
let acc = WarehouseAccount {
address: marlon
};
let acc = WarehouseAccount { address: marlon };

libra_warehouse::load_account::insert_one_account(&pool, &acc).await?;
let res = libra_warehouse::load_account::insert_one_account(&pool, &acc).await?;
assert!(res.rows_affected() == 1);

// second time should error if we are using the same account
assert!(libra_warehouse::load_account::insert_one_account(&pool, &acc).await.is_err());
assert!(
libra_warehouse::load_account::insert_one_account(&pool, &acc)
.await
.is_err()
);

Ok(())
}

#[sqlx::test]
async fn batch_insert(pool: SqlitePool) -> anyhow::Result<()>{
libra_warehouse::migrate::maybe_init(&pool).await?;
async fn batch_insert_account(pool: SqlitePool) -> anyhow::Result<()> {
libra_warehouse::migrate::maybe_init(&pool).await?;
let mut vec_acct: Vec<WarehouseAccount> = vec![];

for _i in [..3] {
let acc = WarehouseAccount {
// uniques
address: AccountAddress::random()
};
vec_acct.push(acc);
for _i in 0..3 {
let acc = WarehouseAccount {
// uniques
address: AccountAddress::random(),
};
vec_acct.push(acc);
}

libra_warehouse::load_account::impl_batch_insert(&pool, &vec_acct).await?;
Ok(())
let res = libra_warehouse::load_account::impl_batch_insert(&pool, &vec_acct).await?;
assert!(res.rows_affected() == 3);

Ok(())
}

#[sqlx::test]
async fn batch_duplicates_fail_gracefully(pool: SqlitePool) -> anyhow::Result<()>{
libra_warehouse::migrate::maybe_init(&pool).await?;
async fn batch_duplicates_fail_gracefully(pool: SqlitePool) -> anyhow::Result<()> {
libra_warehouse::migrate::maybe_init(&pool).await?;
let mut vec_acct: Vec<WarehouseAccount> = vec![];

// will create duplicates
let marlon = AccountAddress::random();

for _i in [..3] {
let acc = WarehouseAccount {
address: marlon
};
vec_acct.push(acc);
for _i in 0..3 {
let acc = WarehouseAccount { address: marlon };
vec_acct.push(acc);
}

// should not fail if duplicates exists on same batch
libra_warehouse::load_account::impl_batch_insert(&pool, &vec_acct).await?;
// should not fail if duplicates exists on same batch
let res = libra_warehouse::load_account::impl_batch_insert(&pool, &vec_acct).await?;
assert!(res.rows_affected() == 1);
// also should not fail if duplicates are on separate batches
let res = libra_warehouse::load_account::impl_batch_insert(&pool, &vec_acct).await?;
assert!(res.rows_affected() == 0);

// also should not fail if duplicates are on separate batches
libra_warehouse::load_account::impl_batch_insert(&pool, &vec_acct).await?;

Ok(())
Ok(())
}

#[sqlx::test]
Expand All @@ -86,3 +90,36 @@ async fn test_e2e_load_v5_snapshot(pool: SqlitePool) -> anyhow::Result<()> {
assert!(res == 17338);
Ok(())
}

#[sqlx::test]
async fn batch_insert_coin(pool: SqlitePool) -> anyhow::Result<()> {
libra_warehouse::migrate::maybe_init(&pool).await?;
let mut vec_state: Vec<WarehouseRecord> = vec![];

for _i in 0..3 {
let state = WarehouseRecord {
account: WarehouseAccount {
// uniques
address: AccountAddress::random(),
},
time: WarehouseTime::default(),
balance: Some(WarehouseBalance {
balance: 0,
legacy_balance: Some(10),
}),
};

vec_state.push(state);
}

// fist must load accounts
let res = libra_warehouse::load_account::load_account_state(&pool, &vec_state).await?;

assert!(res == 3);

let res = libra_warehouse::load_coin::impl_batch_coin_insert(&pool, &vec_state).await?;

assert!(res.rows_affected() == 3);

Ok(())
}

0 comments on commit e0b7d3e

Please sign in to comment.