diff --git a/mm2src/coins/eth.rs b/mm2src/coins/eth.rs index 068c750d37..170091a453 100644 --- a/mm2src/coins/eth.rs +++ b/mm2src/coins/eth.rs @@ -157,6 +157,7 @@ mod eip1559_gas_fee; pub(crate) use eip1559_gas_fee::FeePerGasEstimated; use eip1559_gas_fee::{BlocknativeGasApiCaller, FeePerGasSimpleEstimator, GasApiConfig, GasApiProvider, InfuraGasApiCaller}; + pub(crate) mod eth_swap_v2; /// https://github.com/artemii235/etomic-swap/blob/master/contracts/EtomicSwap.sol @@ -912,20 +913,20 @@ pub async fn withdraw_erc1155(ctx: MmArc, withdraw_type: WithdrawErc1155) -> Wit get_valid_nft_addr_to_withdraw(coin, &withdraw_type.to, &withdraw_type.token_address)?; let token_id_str = &withdraw_type.token_id.to_string(); - let wallet_amount = eth_coin.erc1155_balance(token_addr, token_id_str).await?; + let wallet_erc1155_amount = eth_coin.erc1155_balance(token_addr, token_id_str).await?; - let amount_dec = if withdraw_type.max { - wallet_amount.clone() + let amount_uint = if withdraw_type.max { + wallet_erc1155_amount.clone() } else { - withdraw_type.amount.unwrap_or_else(|| 1.into()) + withdraw_type.amount.unwrap_or_else(|| BigUint::from(1u32)) }; - if amount_dec > wallet_amount { + if amount_uint > wallet_erc1155_amount { return MmError::err(WithdrawError::NotEnoughNftsAmount { token_address: withdraw_type.token_address, token_id: withdraw_type.token_id.to_string(), - available: wallet_amount, - required: amount_dec, + available: wallet_erc1155_amount, + required: amount_uint, }); } @@ -936,7 +937,7 @@ pub async fn withdraw_erc1155(ctx: MmArc, withdraw_type: WithdrawErc1155) -> Wit let token_id_u256 = U256::from_dec_str(token_id_str).map_to_mm(|e| NumConversError::new(format!("{:?}", e)))?; let amount_u256 = - U256::from_dec_str(&amount_dec.to_string()).map_to_mm(|e| NumConversError::new(format!("{:?}", e)))?; + U256::from_dec_str(&amount_uint.to_string()).map_to_mm(|e| NumConversError::new(format!("{:?}", e)))?; let data = function.encode_input(&[ 
Token::Address(my_address), Token::Address(to_addr), @@ -995,7 +996,7 @@ pub async fn withdraw_erc1155(ctx: MmArc, withdraw_type: WithdrawErc1155) -> Wit contract_type: ContractType::Erc1155, token_address: withdraw_type.token_address, token_id: withdraw_type.token_id, - amount: amount_dec, + amount: amount_uint, fee_details: Some(fee_details.into()), coin: eth_coin.ticker.clone(), block_height: 0, @@ -1086,7 +1087,7 @@ pub async fn withdraw_erc721(ctx: MmArc, withdraw_type: WithdrawErc721) -> Withd contract_type: ContractType::Erc721, token_address: withdraw_type.token_address, token_id: withdraw_type.token_id, - amount: 1.into(), + amount: BigUint::from(1u8), fee_details: Some(fee_details.into()), coin: eth_coin.ticker.clone(), block_height: 0, @@ -4412,7 +4413,7 @@ impl EthCoin { self.get_token_balance_for_address(my_address, token_address).await } - async fn erc1155_balance(&self, token_addr: Address, token_id: &str) -> MmResult<BigDecimal, BalanceError> { + async fn erc1155_balance(&self, token_addr: Address, token_id: &str) -> MmResult<BigUint, BalanceError> { let wallet_amount_uint = match self.coin_type { EthCoinType::Eth | EthCoinType::Nft { ..
} => { let function = ERC1155_CONTRACT.function("balanceOf")?; @@ -4438,7 +4439,8 @@ impl EthCoin { )) }, }; - let wallet_amount = u256_to_big_decimal(wallet_amount_uint, self.decimals)?; + // The "balanceOf" function in ERC1155 standard returns the exact count of tokens held by address without any decimals or scaling factors + let wallet_amount = wallet_amount_uint.to_string().parse::<BigUint>()?; Ok(wallet_amount) } diff --git a/mm2src/coins/lp_coins.rs b/mm2src/coins/lp_coins.rs index 6c7fea7b26..cce84e99af 100644 --- a/mm2src/coins/lp_coins.rs +++ b/mm2src/coins/lp_coins.rs @@ -66,7 +66,7 @@ use mm2_core::mm_ctx::{from_ctx, MmArc}; use mm2_err_handle::prelude::*; use mm2_metrics::MetricsWeak; use mm2_number::{bigdecimal::{BigDecimal, ParseBigDecimalError, Zero}, - MmNumber}; + BigUint, MmNumber, ParseBigIntError}; use mm2_rpc::data::legacy::{EnabledCoin, GetEnabledResponse, Mm2RpcResult}; use parking_lot::Mutex as PaMutex; use rpc::v1::types::{Bytes as BytesJson, H256 as H256Json}; @@ -2642,7 +2642,7 @@ pub enum BalanceError { UnexpectedDerivationMethod(UnexpectedDerivationMethod), #[display(fmt = "Wallet storage error: {}", _0)] WalletStorageError(String), - #[from_stringify("Bip32Error", "NumConversError")] + #[from_stringify("Bip32Error", "NumConversError", "ParseBigIntError")] #[display(fmt = "Internal: {}", _0)] Internal(String), } @@ -2994,8 +2994,8 @@ pub enum WithdrawError { NotEnoughNftsAmount { token_address: String, token_id: String, - available: BigDecimal, - required: BigDecimal, + available: BigUint, + required: BigUint, }, #[display(fmt = "DB error {}", _0)] DbError(String), diff --git a/mm2src/coins/nft.rs b/mm2src/coins/nft.rs index 2298ae9648..9fd796e60a 100644 --- a/mm2src/coins/nft.rs +++ b/mm2src/coins/nft.rs @@ -25,6 +25,8 @@ use crate::nft::nft_errors::{ClearNftDbError, MetaFromUrlError, ProtectFromSpamE use crate::nft::nft_structs::{build_nft_with_empty_meta, BuildNftFields, ClearNftDbReq, NftCommon, NftCtx, NftInfo, NftTransferCommon,
PhishingDomainReq, PhishingDomainRes, RefreshMetadataReq, SpamContractReq, SpamContractRes, TransferMeta, TransferStatus, UriMeta}; +#[cfg(not(target_arch = "wasm32"))] +use crate::nft::storage::NftMigrationOps; use crate::nft::storage::{NftListStorageOps, NftTransferHistoryStorageOps}; use common::log::error; use common::parse_rfc3339_to_timestamp; @@ -155,6 +157,9 @@ pub async fn get_nft_transfers(ctx: MmArc, req: NftTransfersReq) -> MmResult MmResult<(), UpdateNft let transfer_history_initialized = NftTransferHistoryStorageOps::is_initialized(&storage, chain).await?; let from_block = if transfer_history_initialized { + #[cfg(not(target_arch = "wasm32"))] + NftMigrationOps::migrate_tx_history_if_needed(&storage, chain).await?; let last_transfer_block = NftTransferHistoryStorageOps::get_last_block_number(&storage, chain).await?; last_transfer_block.map(|b| b + 1) } else { diff --git a/mm2src/coins/nft/nft_structs.rs b/mm2src/coins/nft/nft_structs.rs index f772b92f56..b10f68d05d 100644 --- a/mm2src/coins/nft/nft_structs.rs +++ b/mm2src/coins/nft/nft_structs.rs @@ -21,6 +21,9 @@ use crate::nft::nft_errors::{LockDBError, ParseChainTypeError, ParseContractType use crate::nft::storage::{NftListStorageOps, NftTransferHistoryStorageOps}; use crate::{TransactionType, TxFeeDetails, WithdrawFee}; +#[cfg(not(target_arch = "wasm32"))] +use crate::nft::storage::NftMigrationOps; + cfg_native! { use db_common::async_sql_conn::AsyncConnection; use futures::lock::Mutex as AsyncMutex; @@ -438,7 +441,8 @@ pub struct WithdrawErc1155 { #[serde(deserialize_with = "deserialize_token_id")] pub(crate) token_id: BigUint, /// Optional amount of the token to withdraw. Defaults to 1 if not specified. - pub(crate) amount: Option<BigDecimal>, + #[serde(deserialize_with = "deserialize_opt_biguint")] + pub(crate) amount: Option<BigUint>, /// If set to `true`, withdraws the maximum amount available. Overrides the `amount` field.
#[serde(default)] pub(crate) max: bool, @@ -489,7 +493,7 @@ pub struct TransactionNftDetails { pub(crate) token_address: String, #[serde(serialize_with = "serialize_token_id")] pub(crate) token_id: BigUint, - pub(crate) amount: BigDecimal, + pub(crate) amount: BigUint, pub(crate) fee_details: Option<TxFeeDetails>, /// The coin transaction belongs to pub(crate) coin: String, @@ -753,7 +757,7 @@ impl NftCtx { #[cfg(not(target_arch = "wasm32"))] pub(crate) async fn lock_db( &self, - ) -> MmResult { + ) -> MmResult { Ok(self.nft_cache_db.lock().await) } @@ -806,6 +810,19 @@ where BigUint::from_str(&s).map_err(serde::de::Error::custom) } +/// Custom deserialization function for optional BigUint. +fn deserialize_opt_biguint<'de, D>(deserializer: D) -> Result<Option<BigUint>, D::Error> +where + D: Deserializer<'de>, +{ + let opt: Option<String> = Option::deserialize(deserializer)?; + if let Some(s) = opt { + BigUint::from_str(&s).map(Some).map_err(serde::de::Error::custom) + } else { + Ok(None) + } +} + /// Request parameters for clearing NFT data from the database.
#[derive(Debug, Deserialize)] pub struct ClearNftDbReq { diff --git a/mm2src/coins/nft/nft_tests.rs b/mm2src/coins/nft/nft_tests.rs index 05f732a9ee..71001d8f21 100644 --- a/mm2src/coins/nft/nft_tests.rs +++ b/mm2src/coins/nft/nft_tests.rs @@ -463,7 +463,12 @@ cross_test!(test_add_get_transfers, { .clone(); assert_eq!(transfer1.block_number, 28056721); let transfer2 = storage - .get_transfer_by_tx_hash_and_log_index(&chain, TX_HASH.to_string(), LOG_INDEX) + .get_transfer_by_tx_hash_log_index_token_id( + &chain, + TX_HASH.to_string(), + LOG_INDEX, + BigUint::from_str("214300047253").unwrap(), + ) .await .unwrap() .unwrap(); diff --git a/mm2src/coins/nft/storage/db_test_helpers.rs b/mm2src/coins/nft/storage/db_test_helpers.rs index d59b845661..2565be8f2e 100644 --- a/mm2src/coins/nft/storage/db_test_helpers.rs +++ b/mm2src/coins/nft/storage/db_test_helpers.rs @@ -258,7 +258,7 @@ pub(crate) fn nft_transfer_history() -> Vec { transaction_index: Some(198), log_index: 495, value: Default::default(), - transaction_type: Some("Single".to_string()), + transaction_type: Some("Batch".to_string()), token_address: Address::from_str("0xfd913a305d70a60aac4faac70c739563738e1f81").unwrap(), from_address: Address::from_str("0x6fad0ec6bb76914b2a2a800686acc22970645820").unwrap(), to_address: Address::from_str("0xf622a6c52c94b500542e2ae6bcad24c53bc5b6a2").unwrap(), @@ -284,15 +284,15 @@ pub(crate) fn nft_transfer_history() -> Vec { confirmations: 0, }; - // Same as transfer1 but with different log_index, meaning that transfer1 and transfer2 are part of one batch/multi token transaction + // Same as transfer1 (identical tx hash and log index) but with different token_id, meaning that transfer1 and transfer2 are part of one batch/multi token transaction let transfer2 = NftTransferHistory { common: NftTransferCommon { block_hash: Some("0x3d68b78391fb3cf8570df27036214f7e9a5a6a45d309197936f51d826041bfe7".to_string()), transaction_hash: 
"0x1e9f04e9b571b283bde02c98c2a97da39b2bb665b57c1f2b0b733f9b681debbe".to_string(), transaction_index: Some(198), - log_index: 496, + log_index: 495, value: Default::default(), - transaction_type: Some("Single".to_string()), + transaction_type: Some("Batch".to_string()), token_address: Address::from_str("0xfd913a305d70a60aac4faac70c739563738e1f81").unwrap(), from_address: Address::from_str("0x6fad0ec6bb76914b2a2a800686acc22970645820").unwrap(), to_address: Address::from_str("0xf622a6c52c94b500542e2ae6bcad24c53bc5b6a2").unwrap(), diff --git a/mm2src/coins/nft/storage/mod.rs b/mm2src/coins/nft/storage/mod.rs index ad255100c3..3eed941b5d 100644 --- a/mm2src/coins/nft/storage/mod.rs +++ b/mm2src/coins/nft/storage/mod.rs @@ -156,11 +156,12 @@ pub trait NftTransferHistoryStorageOps { token_id: BigUint, ) -> MmResult, Self::Error>; - async fn get_transfer_by_tx_hash_and_log_index( + async fn get_transfer_by_tx_hash_log_index_token_id( &self, chain: &Chain, transaction_hash: String, log_index: u32, + token_id: BigUint, ) -> MmResult, Self::Error>; /// Updates the metadata for NFT transfers identified by their token address and ID. 
@@ -243,3 +244,10 @@ pub(crate) struct TransferDetailsJson { pub(crate) to_address: Address, pub(crate) fee_details: Option, } + +#[async_trait] +pub trait NftMigrationOps { + type Error: NftStorageError; + + async fn migrate_tx_history_if_needed(&self, chain: &Chain) -> MmResult<(), Self::Error>; +} diff --git a/mm2src/coins/nft/storage/sql_storage.rs b/mm2src/coins/nft/storage/sql_storage.rs index 6844b261d9..bffdaef27b 100644 --- a/mm2src/coins/nft/storage/sql_storage.rs +++ b/mm2src/coins/nft/storage/sql_storage.rs @@ -2,10 +2,10 @@ use crate::nft::eth_addr_to_hex; use crate::nft::nft_structs::{Chain, ContractType, ConvertChain, Nft, NftCommon, NftList, NftListFilters, NftTokenAddrId, NftTransferCommon, NftTransferHistory, NftTransferHistoryFilters, NftsTransferHistoryList, TransferMeta, UriMeta}; -use crate::nft::storage::{get_offset_limit, NftDetailsJson, NftListStorageOps, NftStorageError, +use crate::nft::storage::{get_offset_limit, NftDetailsJson, NftListStorageOps, NftMigrationOps, NftStorageError, NftTransferHistoryStorageOps, RemoveNftResult, TransferDetailsJson}; use async_trait::async_trait; -use db_common::async_sql_conn::{AsyncConnError, AsyncConnection}; +use db_common::async_sql_conn::{AsyncConnError, AsyncConnection, InternalError}; use db_common::sql_build::{SqlCondition, SqlQuery}; use db_common::sqlite::rusqlite::types::{FromSqlError, Type}; use db_common::sqlite::rusqlite::{Connection, Error as SqlError, Result as SqlResult, Row, Statement}; @@ -22,6 +22,8 @@ use std::convert::TryInto; use std::num::NonZeroUsize; use std::str::FromStr; +const CURRENT_SCHEMA_VERSION_TX_HISTORY: i32 = 2; + impl Chain { fn nft_list_table_name(&self) -> SqlResult { let name = self.to_ticker().to_owned() + "_nft_list"; @@ -42,6 +44,12 @@ fn scanned_nft_blocks_table_name() -> SqlResult { Ok(safe_name) } +fn schema_versions_table_name() -> SqlResult { + let name = "schema_versions".to_string(); + let safe_name = SafeTableName::new(&name)?; + Ok(safe_name) +} + fn 
create_nft_list_table_sql(chain: &Chain) -> MmResult { let safe_table_name = chain.nft_list_table_name()?; let sql = format!( @@ -82,6 +90,11 @@ fn create_nft_list_table_sql(chain: &Chain) -> MmResult { fn create_transfer_history_table_sql(chain: &Chain) -> Result { let safe_table_name = chain.transfer_history_table_name()?; + create_transfer_history_table_sql_custom_name(&safe_table_name) +} + +/// Supports [CURRENT_SCHEMA_VERSION_TX_HISTORY] +fn create_transfer_history_table_sql_custom_name(safe_table_name: &SafeTableName) -> Result { let sql = format!( "CREATE TABLE IF NOT EXISTS {} ( transaction_hash VARCHAR(256) NOT NULL, @@ -103,7 +116,7 @@ fn create_transfer_history_table_sql(chain: &Chain) -> Result image_domain TEXT, token_name TEXT, details_json TEXT, - PRIMARY KEY (transaction_hash, log_index) + PRIMARY KEY (transaction_hash, log_index, token_id) );", safe_table_name.inner() ); @@ -122,6 +135,18 @@ fn create_scanned_nft_blocks_sql() -> Result { Ok(sql) } +fn create_schema_versions_sql() -> Result { + let safe_table_name = schema_versions_table_name()?; + let sql = format!( + "CREATE TABLE IF NOT EXISTS {} ( + table_name TEXT PRIMARY KEY, + version INTEGER NOT NULL + );", + safe_table_name.inner() + ); + Ok(sql) +} + impl NftStorageError for AsyncConnError {} fn get_nft_list_builder_preimage(chains: Vec, filters: Option) -> Result { @@ -432,6 +457,15 @@ fn upsert_last_scanned_block_sql() -> Result { Ok(sql) } +fn insert_schema_version_sql() -> Result { + let schema_table = schema_versions_table_name()?; + let sql = format!( + "INSERT INTO {} (table_name, version) VALUES (?1, ?2) ON CONFLICT(table_name) DO NOTHING;", + schema_table.inner() + ); + Ok(sql) +} + fn refresh_nft_metadata_sql(chain: &Chain) -> Result { let safe_table_name = chain.nft_list_table_name()?; let sql = format!( @@ -462,12 +496,25 @@ fn update_transfer_spam_by_token_addr_id(chain: &Chain) -> Result Result { - let sql = format!( +/// Generates the SQL command to insert or update the 
schema version in the `schema_versions` table. +/// +/// This function creates an SQL command that attempts to insert a new row with the specified +/// `table_name` and `version`. If a row with the same `table_name` already exists, the `version` +/// field is updated to the new value provided. +fn update_schema_version_sql(schema_versions: &SafeTableName) -> String { + format!( + "INSERT INTO {} (table_name, version) + VALUES (?1, ?2) + ON CONFLICT(table_name) DO UPDATE SET version = excluded.version;", + schema_versions.inner() + ) +} + +fn select_last_block_number_sql(safe_table_name: SafeTableName) -> String { + format!( "SELECT block_number FROM {} ORDER BY block_number DESC LIMIT 1", safe_table_name.inner() - ); - Ok(sql) + ) } fn select_last_scanned_block_sql() -> MmResult { @@ -540,6 +587,13 @@ fn get_transfers_with_empty_meta_builder<'a>(conn: &'a Connection, chain: &'a Ch Ok(sql_builder) } +fn get_schema_version_stmt(conn: &Connection) -> Result { + let table_name = schema_versions_table_name()?; + let sql = format!("SELECT version FROM {} WHERE table_name = ?1;", table_name.inner()); + let stmt = conn.prepare(&sql)?; + Ok(stmt) +} + fn is_table_empty(conn: &Connection, safe_table_name: SafeTableName) -> Result { let query = format!("SELECT COUNT(*) FROM {}", safe_table_name.inner()); conn.query_row(&query, [], |row| row.get::<_, i64>(0)) @@ -777,7 +831,7 @@ impl NftListStorageOps for AsyncMutexGuard<'_, AsyncConnection> { async fn get_last_block_number(&self, chain: &Chain) -> MmResult, Self::Error> { let table_name = chain.nft_list_table_name()?; - let sql = select_last_block_number_sql(table_name)?; + let sql = select_last_block_number_sql(table_name); self.call(move |conn| { let block_number = query_single_row(conn, &sql, [], block_number_from_row)?; Ok(block_number) @@ -969,8 +1023,15 @@ impl NftTransferHistoryStorageOps for AsyncMutexGuard<'_, AsyncConnection> { async fn init(&self, chain: &Chain) -> MmResult<(), Self::Error> { let 
sql_transfer_history = create_transfer_history_table_sql(chain)?; + let table_name = chain.transfer_history_table_name()?; self.call(move |conn| { conn.execute(&sql_transfer_history, []).map(|_| ())?; + conn.execute(&create_schema_versions_sql()?, []).map(|_| ())?; + conn.execute(&insert_schema_version_sql()?, [ + table_name.inner(), + &CURRENT_SCHEMA_VERSION_TX_HISTORY.to_string(), + ]) + .map(|_| ())?; Ok(()) }) .await @@ -978,11 +1039,10 @@ impl NftTransferHistoryStorageOps for AsyncMutexGuard<'_, AsyncConnection> { } async fn is_initialized(&self, chain: &Chain) -> MmResult { - let table_name = chain.transfer_history_table_name()?; + let table = chain.transfer_history_table_name()?; self.call(move |conn| { - let nft_list_initialized = - query_single_row(conn, CHECK_TABLE_EXISTS_SQL, [table_name.inner()], string_from_row)?; - Ok(nft_list_initialized.is_some()) + let table_exists = query_single_row(conn, CHECK_TABLE_EXISTS_SQL, [table.inner()], string_from_row)?; + Ok(table_exists.is_some()) }) .await .map_to_mm(AsyncConnError::from) @@ -1077,7 +1137,7 @@ impl NftTransferHistoryStorageOps for AsyncMutexGuard<'_, AsyncConnection> { async fn get_last_block_number(&self, chain: &Chain) -> MmResult, Self::Error> { let table_name = chain.transfer_history_table_name()?; - let sql = select_last_block_number_sql(table_name)?; + let sql = select_last_block_number_sql(table_name); self.call(move |conn| { let block_number = query_single_row(conn, &sql, [], block_number_from_row)?; Ok(block_number) @@ -1121,22 +1181,23 @@ impl NftTransferHistoryStorageOps for AsyncMutexGuard<'_, AsyncConnection> { .map_to_mm(AsyncConnError::from) } - async fn get_transfer_by_tx_hash_and_log_index( + async fn get_transfer_by_tx_hash_log_index_token_id( &self, chain: &Chain, transaction_hash: String, log_index: u32, + token_id: BigUint, ) -> MmResult, Self::Error> { let table_name = chain.transfer_history_table_name()?; let sql = format!( - "SELECT * FROM {} WHERE transaction_hash=?1 AND 
log_index = ?2", + "SELECT * FROM {} WHERE transaction_hash=?1 AND log_index = ?2 AND token_id = ?3", table_name.inner() ); self.call(move |conn| { let transfer = query_single_row( conn, &sql, - [transaction_hash, log_index.to_string()], + [transaction_hash, log_index.to_string(), token_id.to_string()], transfer_history_from_row, )?; Ok(transfer) @@ -1285,11 +1346,18 @@ impl NftTransferHistoryStorageOps for AsyncMutexGuard<'_, AsyncConnection> { } async fn clear_history_data(&self, chain: &Chain) -> MmResult<(), Self::Error> { - let table_name = chain.transfer_history_table_name()?; + let history_table_name = chain.transfer_history_table_name()?; + let schema_table_name = schema_versions_table_name()?; + let dlt_schema_sql = format!("DELETE from {} where table_name=?1", schema_table_name.inner()); self.call(move |conn| { let sql_transaction = conn.transaction()?; - sql_transaction.execute(&format!("DROP TABLE IF EXISTS {};", table_name.inner()), [])?; + sql_transaction.execute(&format!("DROP TABLE IF EXISTS {};", history_table_name.inner()), [])?; + sql_transaction.execute(&dlt_schema_sql, [history_table_name.inner()])?; sql_transaction.commit()?; + if is_table_empty(conn, schema_table_name.clone())? 
{ + conn.execute(&format!("DROP TABLE IF EXISTS {};", schema_table_name.inner()), []) + .map(|_| ())?; + } Ok(()) }) .await @@ -1297,12 +1365,14 @@ impl NftTransferHistoryStorageOps for AsyncMutexGuard<'_, AsyncConnection> { } async fn clear_all_history_data(&self) -> MmResult<(), Self::Error> { + let schema_table = schema_versions_table_name()?; self.call(move |conn| { let sql_transaction = conn.transaction()?; for chain in Chain::variant_list().into_iter() { let table_name = chain.transfer_history_table_name()?; sql_transaction.execute(&format!("DROP TABLE IF EXISTS {};", table_name.inner()), [])?; } + sql_transaction.execute(&format!("DROP TABLE IF EXISTS {};", schema_table.inner()), [])?; sql_transaction.commit()?; Ok(()) }) @@ -1310,3 +1380,112 @@ impl NftTransferHistoryStorageOps for AsyncMutexGuard<'_, AsyncConnection> { .map_to_mm(AsyncConnError::from) } } + +fn migrate_tx_history_table_from_schema_0_to_2( + conn: &mut Connection, + history_table: &SafeTableName, + schema_table: &SafeTableName, +) -> Result<(), AsyncConnError> { + if has_primary_key_duplication(conn, history_table)? { + return Err(AsyncConnError::Internal(InternalError( + "Primary key duplication occurred in old nft tx history table".to_string(), + ))); + } + + // Start a transaction to ensure all operations are atomic + let sql_tx = conn.transaction()?; + + // Create the temporary table with the new schema + let temp_table_name = SafeTableName::new(format!("{}_temp", history_table.inner()).as_str())?; + sql_tx.execute(&create_transfer_history_table_sql_custom_name(&temp_table_name)?, [])?; + + // I don't think we need to batch the data copy process here. + // It's unlikely that the table will grow to 1 million+ rows (as an example). 
+ let copy_data_sql = format!( + "INSERT INTO {} SELECT * FROM {};", + temp_table_name.inner(), + history_table.inner() + ); + sql_tx.execute(&copy_data_sql, [])?; + + let drop_old_table_sql = format!("DROP TABLE IF EXISTS {};", history_table.inner()); + sql_tx.execute(&drop_old_table_sql, [])?; + + let rename_table_sql = format!( + "ALTER TABLE {} RENAME TO {};", + temp_table_name.inner(), + history_table.inner() + ); + sql_tx.execute(&rename_table_sql, [])?; + + sql_tx.execute(&update_schema_version_sql(schema_table), [ + history_table.inner().to_string(), + CURRENT_SCHEMA_VERSION_TX_HISTORY.to_string(), + ])?; + + sql_tx.commit()?; + + Ok(()) +} + +/// Query to check for duplicates based on the primary key columns from tx history table version 2 +fn has_primary_key_duplication(conn: &Connection, safe_table_name: &SafeTableName) -> Result<bool, SqlError> { + let query = format!( + "SELECT EXISTS ( + SELECT 1 + FROM {} + GROUP BY transaction_hash, log_index, token_id + HAVING COUNT(*) > 1 + );", + safe_table_name.inner() + ); + // return true if duplicates exist, false otherwise + conn.query_row(&query, [], |row| row.get::<_, i32>(0)) + .map(|exists| exists == 1) +} + +#[async_trait] +impl NftMigrationOps for AsyncMutexGuard<'_, AsyncConnection> { + type Error = AsyncConnError; + + async fn migrate_tx_history_if_needed(&self, chain: &Chain) -> MmResult<(), Self::Error> { + let history_table = chain.transfer_history_table_name()?; + let schema_table = schema_versions_table_name()?; + self.call(move |conn| { + let schema_table_exists = + query_single_row(conn, CHECK_TABLE_EXISTS_SQL, [schema_table.inner()], string_from_row)?; + + let mut version = if schema_table_exists.is_some() { + get_schema_version_stmt(conn)?
+ .query_row([history_table.inner()], |row| row.get(0)) + .unwrap_or(0) + } else { + conn.execute(&create_schema_versions_sql()?, []).map(|_| ())?; + 0 + }; + + while version < CURRENT_SCHEMA_VERSION_TX_HISTORY { + match version { + 0 => { + migrate_tx_history_table_from_schema_0_to_2(conn, &history_table, &schema_table)?; + }, + 1 => { + // The Tx History SQL schema didn't have version 1, but let's handle this case + // for consistency with IndexedDB versioning, where the current Tx History schema is at version 2. + }, + unsupported_version => { + return Err(AsyncConnError::Internal(InternalError(format!( + "Unsupported schema version {}", + unsupported_version + )))); + }, + } + version += 1; + } + + Ok(()) + }) + .await + .map_to_mm(AsyncConnError::from) + } +} diff --git a/mm2src/coins/nft/storage/wasm/nft_idb.rs b/mm2src/coins/nft/storage/wasm/nft_idb.rs index 054f1c058e..775a871589 100644 --- a/mm2src/coins/nft/storage/wasm/nft_idb.rs +++ b/mm2src/coins/nft/storage/wasm/nft_idb.rs @@ -3,7 +3,8 @@ use async_trait::async_trait; use mm2_db::indexed_db::InitDbResult; use mm2_db::indexed_db::{DbIdentifier, DbInstance, DbLocked, IndexedDb, IndexedDbBuilder}; -const DB_VERSION: u32 = 1; +/// prim key was changed in NftTransferHistoryTable, schemas of the other tables remain the same. +const DB_VERSION: u32 = 2; /// Represents a locked instance of the `NftCacheIDB` database. 
/// diff --git a/mm2src/coins/nft/storage/wasm/wasm_storage.rs b/mm2src/coins/nft/storage/wasm/wasm_storage.rs index e5ea955918..99e76ba04f 100644 --- a/mm2src/coins/nft/storage/wasm/wasm_storage.rs +++ b/mm2src/coins/nft/storage/wasm/wasm_storage.rs @@ -6,11 +6,10 @@ use crate::nft::storage::wasm::{WasmNftCacheError, WasmNftCacheResult}; use crate::nft::storage::{get_offset_limit, NftListStorageOps, NftTokenAddrId, NftTransferHistoryFilters, NftTransferHistoryStorageOps, RemoveNftResult}; use async_trait::async_trait; -use common::is_initial_upgrade; use ethereum_types::Address; -use mm2_db::indexed_db::{BeBigUint, DbTable, DbUpgrader, MultiIndex, OnUpgradeResult, TableSignature}; +use mm2_db::indexed_db::{BeBigUint, DbTable, DbUpgrader, MultiIndex, OnUpgradeError, OnUpgradeResult, TableSignature}; use mm2_err_handle::map_to_mm::MapToMmResult; -use mm2_err_handle::prelude::MmResult; +use mm2_err_handle::prelude::{MmError, MmResult}; use mm2_number::BigUint; use num_traits::ToPrimitive; use serde_json::{self as json, Value as Json}; @@ -547,18 +546,20 @@ impl NftTransferHistoryStorageOps for NftCacheIDBLocked<'_> { .collect() } - async fn get_transfer_by_tx_hash_and_log_index( + async fn get_transfer_by_tx_hash_log_index_token_id( &self, chain: &Chain, transaction_hash: String, log_index: u32, + token_id: BigUint, ) -> MmResult, Self::Error> { let db_transaction = self.get_inner().transaction().await?; let table = db_transaction.table::().await?; - let index_keys = MultiIndex::new(NftTransferHistoryTable::CHAIN_TX_HASH_LOG_INDEX_INDEX) + let index_keys = MultiIndex::new(NftTransferHistoryTable::CHAIN_TX_HASH_LOG_INDEX_TOKEN_ID_INDEX) .with_value(chain.to_string())? .with_value(&transaction_hash)? - .with_value(log_index)?; + .with_value(log_index)? + .with_value(BeBigUint::from(token_id))?; if let Some((_item_id, item)) = table.get_item_by_unique_multi_index(index_keys).await? 
{ Ok(Some(transfer_details_from_item(item)?)) @@ -602,10 +603,11 @@ impl NftTransferHistoryStorageOps for NftCacheIDBLocked<'_> { } drop_mutability!(transfer); - let index_keys = MultiIndex::new(NftTransferHistoryTable::CHAIN_TX_HASH_LOG_INDEX_INDEX) + let index_keys = MultiIndex::new(NftTransferHistoryTable::CHAIN_TX_HASH_LOG_INDEX_TOKEN_ID_INDEX) .with_value(&chain_str)? .with_value(&transfer.common.transaction_hash)? - .with_value(transfer.common.log_index)?; + .with_value(transfer.common.log_index)? + .with_value(BeBigUint::from(transfer.token_id.clone()))?; let item = NftTransferHistoryTable::from_transfer_history(&transfer)?; table.replace_item_by_unique_multi_index(index_keys, &item).await?; @@ -691,10 +693,11 @@ impl NftTransferHistoryStorageOps for NftCacheIDBLocked<'_> { transfer.common.possible_spam = possible_spam; drop_mutability!(transfer); - let index_keys = MultiIndex::new(NftTransferHistoryTable::CHAIN_TX_HASH_LOG_INDEX_INDEX) + let index_keys = MultiIndex::new(NftTransferHistoryTable::CHAIN_TX_HASH_LOG_INDEX_TOKEN_ID_INDEX) .with_value(&chain_str)? .with_value(&transfer.common.transaction_hash)? - .with_value(transfer.common.log_index)?; + .with_value(transfer.common.log_index)? + .with_value(BeBigUint::from(transfer.token_id.clone()))?; let item = NftTransferHistoryTable::from_transfer_history(&transfer)?; table.replace_item_by_unique_multi_index(index_keys, &item).await?; @@ -777,10 +780,11 @@ async fn update_transfer_phishing_for_index( transfer.possible_phishing = possible_phishing; drop_mutability!(transfer); let transfer_item = NftTransferHistoryTable::from_transfer_history(&transfer)?; - let index_keys = MultiIndex::new(NftTransferHistoryTable::CHAIN_TX_HASH_LOG_INDEX_INDEX) + let index_keys = MultiIndex::new(NftTransferHistoryTable::CHAIN_TX_HASH_LOG_INDEX_TOKEN_ID_INDEX) .with_value(chain)? .with_value(&transfer.common.transaction_hash)? - .with_value(transfer.common.log_index)?; + .with_value(transfer.common.log_index)? 
+ .with_value(BeBigUint::from(transfer.token_id))?; table .replace_item_by_unique_multi_index(index_keys, &transfer_item) .await?; @@ -876,8 +880,8 @@ pub(crate) struct NftListTable { } impl NftListTable { - const CHAIN_ANIMATION_DOMAIN_INDEX: &str = "chain_animation_domain_index"; - const CHAIN_EXTERNAL_DOMAIN_INDEX: &str = "chain_external_domain_index"; + const CHAIN_ANIMATION_DOMAIN_INDEX: &'static str = "chain_animation_domain_index"; + const CHAIN_EXTERNAL_DOMAIN_INDEX: &'static str = "chain_external_domain_index"; fn from_nft(nft: &Nft) -> WasmNftCacheResult { let details_json = json::to_value(nft).map_to_mm(|e| WasmNftCacheError::ErrorSerializing(e.to_string()))?; @@ -902,26 +906,45 @@ impl NftListTable { impl TableSignature for NftListTable { const TABLE_NAME: &'static str = "nft_list_cache_table"; - fn on_upgrade_needed(upgrader: &DbUpgrader, old_version: u32, new_version: u32) -> OnUpgradeResult<()> { - if is_initial_upgrade(old_version, new_version) { - let table = upgrader.create_table(Self::TABLE_NAME)?; - table.create_multi_index( - CHAIN_TOKEN_ADD_TOKEN_ID_INDEX, - &["chain", "token_address", "token_id"], - true, - )?; - table.create_multi_index(CHAIN_BLOCK_NUMBER_INDEX, &["chain", "block_number"], false)?; - table.create_multi_index(CHAIN_TOKEN_ADD_INDEX, &["chain", "token_address"], false)?; - table.create_multi_index(CHAIN_TOKEN_DOMAIN_INDEX, &["chain", "token_domain"], false)?; - table.create_multi_index(CHAIN_IMAGE_DOMAIN_INDEX, &["chain", "image_domain"], false)?; - table.create_multi_index( - Self::CHAIN_ANIMATION_DOMAIN_INDEX, - &["chain", "animation_domain"], - false, - )?; - table.create_multi_index(Self::CHAIN_EXTERNAL_DOMAIN_INDEX, &["chain", "external_domain"], false)?; - table.create_index("chain", false)?; - table.create_index("block_number", false)?; + fn on_upgrade_needed(upgrader: &DbUpgrader, mut old_version: u32, new_version: u32) -> OnUpgradeResult<()> { + while old_version < new_version { + match old_version { + 0 => { + let 
table = upgrader.create_table(Self::TABLE_NAME)?; + table.create_multi_index( + CHAIN_TOKEN_ADD_TOKEN_ID_INDEX, + &["chain", "token_address", "token_id"], + true, + )?; + table.create_multi_index(CHAIN_BLOCK_NUMBER_INDEX, &["chain", "block_number"], false)?; + table.create_multi_index(CHAIN_TOKEN_ADD_INDEX, &["chain", "token_address"], false)?; + table.create_multi_index(CHAIN_TOKEN_DOMAIN_INDEX, &["chain", "token_domain"], false)?; + table.create_multi_index(CHAIN_IMAGE_DOMAIN_INDEX, &["chain", "image_domain"], false)?; + table.create_multi_index( + Self::CHAIN_ANIMATION_DOMAIN_INDEX, + &["chain", "animation_domain"], + false, + )?; + table.create_multi_index( + Self::CHAIN_EXTERNAL_DOMAIN_INDEX, + &["chain", "external_domain"], + false, + )?; + table.create_index("chain", false)?; + table.create_index("block_number", false)?; + }, + 1 => { + // nothing to change + }, + unsupported_version => { + return MmError::err(OnUpgradeError::UnsupportedVersion { + unsupported_version, + old_version, + new_version, + }) + }, + } + old_version += 1; } Ok(()) } @@ -951,7 +974,10 @@ pub(crate) struct NftTransferHistoryTable { } impl NftTransferHistoryTable { - const CHAIN_TX_HASH_LOG_INDEX_INDEX: &str = "chain_tx_hash_log_index_index"; + // old prim key index for DB_VERSION = 1 + const CHAIN_TX_HASH_LOG_INDEX_INDEX: &'static str = "chain_tx_hash_log_index_index"; + // prim key multi index for DB_VERSION = 2 + const CHAIN_TX_HASH_LOG_INDEX_TOKEN_ID_INDEX: &'static str = "chain_tx_hash_log_index_token_id_index"; fn from_transfer_history(transfer: &NftTransferHistory) -> WasmNftCacheResult { let details_json = @@ -983,25 +1009,47 @@ impl NftTransferHistoryTable { impl TableSignature for NftTransferHistoryTable { const TABLE_NAME: &'static str = "nft_transfer_history_cache_table"; - fn on_upgrade_needed(upgrader: &DbUpgrader, old_version: u32, new_version: u32) -> OnUpgradeResult<()> { - if is_initial_upgrade(old_version, new_version) { - let table = 
upgrader.create_table(Self::TABLE_NAME)?; - table.create_multi_index( - CHAIN_TOKEN_ADD_TOKEN_ID_INDEX, - &["chain", "token_address", "token_id"], - false, - )?; - table.create_multi_index( - Self::CHAIN_TX_HASH_LOG_INDEX_INDEX, - &["chain", "transaction_hash", "log_index"], - true, - )?; - table.create_multi_index(CHAIN_BLOCK_NUMBER_INDEX, &["chain", "block_number"], false)?; - table.create_multi_index(CHAIN_TOKEN_ADD_INDEX, &["chain", "token_address"], false)?; - table.create_multi_index(CHAIN_TOKEN_DOMAIN_INDEX, &["chain", "token_domain"], false)?; - table.create_multi_index(CHAIN_IMAGE_DOMAIN_INDEX, &["chain", "image_domain"], false)?; - table.create_index("block_number", false)?; - table.create_index("chain", false)?; + fn on_upgrade_needed(upgrader: &DbUpgrader, mut old_version: u32, new_version: u32) -> OnUpgradeResult<()> { + while old_version < new_version { + match old_version { + 0 => { + let table = upgrader.create_table(Self::TABLE_NAME)?; + table.create_multi_index( + Self::CHAIN_TX_HASH_LOG_INDEX_INDEX, + &["chain", "transaction_hash", "log_index"], + true, + )?; + table.create_multi_index( + CHAIN_TOKEN_ADD_TOKEN_ID_INDEX, + &["chain", "token_address", "token_id"], + false, + )?; + table.create_multi_index(CHAIN_BLOCK_NUMBER_INDEX, &["chain", "block_number"], false)?; + table.create_multi_index(CHAIN_TOKEN_ADD_INDEX, &["chain", "token_address"], false)?; + table.create_multi_index(CHAIN_TOKEN_DOMAIN_INDEX, &["chain", "token_domain"], false)?; + table.create_multi_index(CHAIN_IMAGE_DOMAIN_INDEX, &["chain", "image_domain"], false)?; + table.create_index("block_number", false)?; + table.create_index("chain", false)?; + }, + 1 => { + let table = upgrader.open_table(Self::TABLE_NAME)?; + // When we change indexes during `onupgradeneeded`, IndexedDB automatically updates it with the existing records + table.create_multi_index( + Self::CHAIN_TX_HASH_LOG_INDEX_TOKEN_ID_INDEX, + &["chain", "transaction_hash", "log_index", "token_id"], + true, + )?; + 
table.delete_index(Self::CHAIN_TX_HASH_LOG_INDEX_INDEX)?; + }, + unsupported_version => { + return MmError::err(OnUpgradeError::UnsupportedVersion { + unsupported_version, + old_version, + new_version, + }) + }, + } + old_version += 1; } Ok(()) } @@ -1016,10 +1064,25 @@ pub(crate) struct LastScannedBlockTable { impl TableSignature for LastScannedBlockTable { const TABLE_NAME: &'static str = "last_scanned_block_table"; - fn on_upgrade_needed(upgrader: &DbUpgrader, old_version: u32, new_version: u32) -> OnUpgradeResult<()> { - if is_initial_upgrade(old_version, new_version) { - let table = upgrader.create_table(Self::TABLE_NAME)?; - table.create_index("chain", true)?; + fn on_upgrade_needed(upgrader: &DbUpgrader, mut old_version: u32, new_version: u32) -> OnUpgradeResult<()> { + while old_version < new_version { + match old_version { + 0 => { + let table = upgrader.create_table(Self::TABLE_NAME)?; + table.create_index("chain", true)?; + }, + 1 => { + // nothing to change + }, + unsupported_version => { + return MmError::err(OnUpgradeError::UnsupportedVersion { + unsupported_version, + old_version, + new_version, + }) + }, + } + old_version += 1; } Ok(()) } diff --git a/mm2src/coins/z_coin/storage/walletdb/wasm/storage.rs b/mm2src/coins/z_coin/storage/walletdb/wasm/storage.rs index 9dbaf389a3..d9ac5ec322 100644 --- a/mm2src/coins/z_coin/storage/walletdb/wasm/storage.rs +++ b/mm2src/coins/z_coin/storage/walletdb/wasm/storage.rs @@ -808,7 +808,7 @@ impl WalletRead for WalletIndexedDb { let locked_db = self.lock_db().await?; let db_transaction = locked_db.get_inner().transaction().await?; let block_headers_db = db_transaction.table::().await?; - let earlist_block = block_headers_db + let earliest_block = block_headers_db .cursor_builder() .only("ticker", &self.ticker)? 
.bound("height", 0u32, u32::MAX) @@ -830,7 +830,7 @@ impl WalletRead for WalletIndexedDb { .next() .await?; - if let (Some(min), Some(max)) = (earlist_block, latest_block) { + if let (Some(min), Some(max)) = (earliest_block, latest_block) { Ok(Some((BlockHeight::from(min.1.height), BlockHeight::from(max.1.height)))) } else { Ok(None) diff --git a/mm2src/mm2_db/src/indexed_db/drivers/upgrader.rs b/mm2src/mm2_db/src/indexed_db/drivers/upgrader.rs index 8f51a220c1..fa55d87d8c 100644 --- a/mm2src/mm2_db/src/indexed_db/drivers/upgrader.rs +++ b/mm2src/mm2_db/src/indexed_db/drivers/upgrader.rs @@ -29,6 +29,8 @@ pub enum OnUpgradeError { old_version: u32, new_version: u32, }, + #[display(fmt = "Error occurred due to deleting the '{}' index: {}", index, description)] + ErrorDeletingIndex { index: String, description: String }, } pub struct DbUpgrader { @@ -108,4 +110,18 @@ impl TableUpgrader { description: stringify_js_error(&e), }) } + + /// Deletes an index. + /// Regardless of whether the index is created using one or multiple fields, the deleteIndex() + /// method is used to delete any type of index, and it works in the same way for both. + /// https://developer.mozilla.org/en-US/docs/Web/API/IDBObjectStore/deleteIndex + pub fn delete_index(&self, index: &str) -> OnUpgradeResult<()> { + self.object_store + .delete_index(index) + .map(|_| ()) + .map_to_mm(|e| OnUpgradeError::ErrorDeletingIndex { + index: index.to_owned(), + description: stringify_js_error(&e), + }) + } } diff --git a/mm2src/mm2_main/src/lp_swap/saved_swap.rs b/mm2src/mm2_main/src/lp_swap/saved_swap.rs index de6d379b5c..c471f7eacb 100644 --- a/mm2src/mm2_main/src/lp_swap/saved_swap.rs +++ b/mm2src/mm2_main/src/lp_swap/saved_swap.rs @@ -278,13 +278,13 @@ mod wasm_impl { .cursor_builder() .bound("migration", 0, u32::MAX) .reverse() + .where_first() .open_cursor("migration") .await? 
- // TODO refactor when "closure invoked recursively or after being dropped" is fixed - .collect() + .next() .await?; - Ok(migrations.first().map(|(_, m)| m.migration).unwrap_or_default()) + Ok(migrations.map(|(_, m)| m.migration).unwrap_or_default()) } pub async fn migrate_swaps_data(ctx: &MmArc) -> MmResult<(), SavedSwapError> { diff --git a/mm2src/mm2_main/tests/docker_tests/eth_docker_tests.rs b/mm2src/mm2_main/tests/docker_tests/eth_docker_tests.rs index 293610e102..5fc3b7ea81 100644 --- a/mm2src/mm2_main/tests/docker_tests/eth_docker_tests.rs +++ b/mm2src/mm2_main/tests/docker_tests/eth_docker_tests.rs @@ -171,7 +171,7 @@ fn geth_erc712_owner(token_id: U256) -> Address { block_on(erc721_contract.query("ownerOf", Token::Uint(token_id), None, Options::default(), None)).unwrap() } -fn mint_erc1155(to_addr: Address, token_id: U256, amount: U256) { +fn mint_erc1155(to_addr: Address, token_id: U256, amount: u32) { let _guard = GETH_NONCE_LOCK.lock().unwrap(); let erc1155_contract = Contract::from_json(GETH_WEB3.eth(), geth_erc1155_contract(), ERC1155_TEST_ABI.as_bytes()).unwrap(); @@ -181,7 +181,7 @@ fn mint_erc1155(to_addr: Address, token_id: U256, amount: U256) { ( Token::Address(to_addr), Token::Uint(token_id), - Token::Uint(amount), + Token::Uint(U256::from(amount)), Token::Bytes("".into()), ), geth_account(), @@ -200,10 +200,15 @@ fn mint_erc1155(to_addr: Address, token_id: U256, amount: U256) { )) .unwrap(); + // check that "balanceOf" from ERC1155 returns the exact amount of token without any decimals or scaling factors + let balance_dec = balance.to_string().parse::<BigDecimal>().unwrap(); assert_eq!( - balance, amount, + balance_dec, + BigDecimal::from(amount), "The balance of tokenId {:?} for address {:?} does not match the expected amount {:?}.", - token_id, to_addr, amount + token_id, + to_addr, + amount ); } @@ -381,7 +386,7 @@ fn global_nft_with_random_privkey( if let Some(nft_type) = nft_type { match nft_type { TestNftType::Erc1155 { token_id, amount } => { -
mint_erc1155(my_address, U256::from(token_id), U256::from(amount)); + mint_erc1155(my_address, U256::from(token_id), amount); block_on(fill_erc1155_info( &global_nft, geth_erc1155_contract(), diff --git a/mm2src/mm2_number/src/lib.rs b/mm2src/mm2_number/src/lib.rs index 6d1d20f1ca..ba8b520ec0 100644 --- a/mm2src/mm2_number/src/lib.rs +++ b/mm2src/mm2_number/src/lib.rs @@ -13,7 +13,7 @@ pub use num_bigint; pub use num_rational; pub use bigdecimal::BigDecimal; -pub use num_bigint::{BigInt, BigUint}; +pub use num_bigint::{BigInt, BigUint, ParseBigIntError}; pub use num_rational::BigRational; pub use paste::paste;