diff --git a/Cargo.lock b/Cargo.lock index 6af6b18a13..4b3f1d95ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5304,7 +5304,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "efd1f82c56340fdf16f2a953d7bda4f8fdffba13d93b00844c25572110b26079" [[package]] -name = "tree_cache_updater" +name = "try-lock" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" + +[[package]] +name = "tx_count_migration" version = "1.0.0" dependencies = [ "anyhow", @@ -5317,12 +5323,6 @@ dependencies = [ "zksync_types", ] -[[package]] -name = "try-lock" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" - [[package]] name = "typeable" version = "0.1.2" diff --git a/Cargo.toml b/Cargo.toml index 8b207df65f..5d6e3a2f19 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ "core/bin/parse_pub_data", "core/bin/block_revert", "core/bin/remove_proofs", - "core/bin/tree_cache_updater", + "core/bin/tx_count_migration", # Server micro-services "core/bin/zksync_api", diff --git a/core/bin/tree_cache_updater/src/main.rs b/core/bin/tree_cache_updater/src/main.rs deleted file mode 100644 index 471a05b831..0000000000 --- a/core/bin/tree_cache_updater/src/main.rs +++ /dev/null @@ -1,75 +0,0 @@ -use structopt::StructOpt; -use zksync_config::DBConfig; -use zksync_crypto::merkle_tree::parallel_smt::SparseMerkleTreeSerializableCacheBN256; -use zksync_storage::StorageProcessor; -use zksync_types::BlockNumber; - -#[derive(Debug, StructOpt)] -#[structopt( - name = "tree_cache_updater", - about = "Tool to migrate server tree caches to the binary format." -)] -struct Opt { - /// Maximum amount of blocks to convert. - #[structopt(long)] - max_blocks: usize, -} - -#[tokio::main] -async fn main() -> anyhow::Result<()> { - let opt = Opt::from_args(); - - let mut storage = StorageProcessor::establish_connection().await?; - let mut transaction = storage.start_transaction().await?; - - let max_block = transaction - .chain() - .block_schema() - .get_last_saved_block() - .await?; - - let min_block = std::cmp::max(max_block.0.saturating_sub(opt.max_blocks as u32), 1); // We can't go below the 1st block. - - println!( - "I'm going to convert caches for blocks from {} to {}", - min_block, max_block.0 - ); - println!("Database URL is {}", DBConfig::from_env().url); - println!("Proceed? [y/n]"); - - let mut input = String::new(); - std::io::stdin().read_line(&mut input).unwrap(); - - if input.trim().to_lowercase() == "y" { - println!("OK. Starting!"); - } else { - println!("Quitting"); - return Ok(()); - } - - // Go through the suggested blocks range. For each block in this range, if the cache exists, we will load it, convert to the bincode cache, - // and store to the binary schema. - for block in min_block..(max_block.0) { - if let Some(cache) = transaction - .chain() - .tree_cache_schema_json() - .get_account_tree_cache_block(BlockNumber(block)) - .await? - { - let cache: SparseMerkleTreeSerializableCacheBN256 = serde_json::from_value(cache)?; - let binary_cache = cache.encode_bincode(); - transaction - .chain() - .tree_cache_schema_bincode() - .store_account_tree_cache(BlockNumber(block), binary_cache) - .await?; - } - println!("Block {} processed", block); - } - - transaction.commit().await?; - - println!("Done"); - - Ok(()) -} diff --git a/core/bin/tree_cache_updater/Cargo.toml b/core/bin/tx_count_migration/Cargo.toml similarity index 96% rename from core/bin/tree_cache_updater/Cargo.toml rename to core/bin/tx_count_migration/Cargo.toml index 66dca49cec..e59043e812 100644 --- a/core/bin/tree_cache_updater/Cargo.toml +++ b/core/bin/tx_count_migration/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "tree_cache_updater" +name = "tx_count_migration" version = "1.0.0" edition = "2018" authors = ["The Matter Labs Team "] diff --git a/core/bin/tx_count_migration/src/main.rs b/core/bin/tx_count_migration/src/main.rs new file mode 100644 index 0000000000..dc136f9389 --- /dev/null +++ b/core/bin/tx_count_migration/src/main.rs @@ -0,0 +1,38 @@ +use zksync_storage::StorageProcessor; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let mut storage = StorageProcessor::establish_connection().await?; + let mut first_account = None; + loop { + let mut transaction = storage.start_transaction().await?; + match transaction + .chain() + .operations_ext_schema() + .get_accounts_range(first_account, 10000) + .await + { + Some((start_account, final_account)) => { + transaction + .chain() + .operations_ext_schema() + .update_txs_count(start_account, final_account) + .await; + println!( + "Data for accounts from {:?} to {:?} has been updated", + &start_account, &final_account, + ); + first_account = Some(final_account); + } + None => { + // We can forget about tx because we will close + // the connection without updating any data + println!("Finish"); + break; + } + } + transaction.commit().await?; + } + + Ok(()) +} diff --git a/core/lib/storage/migrations/2023-01-19-190417_fill-tx_count-with-data/down.sql b/core/lib/storage/migrations/2023-01-19-190417_fill-tx_count-with-data/down.sql deleted file mode 100644 index 291a97c5ce..0000000000 --- a/core/lib/storage/migrations/2023-01-19-190417_fill-tx_count-with-data/down.sql +++ /dev/null @@ -1 +0,0 @@ --- This file should undo anything in `up.sql` \ No newline at end of file diff --git a/core/lib/storage/migrations/2023-01-19-190417_fill-tx_count-with-data/up.sql b/core/lib/storage/migrations/2023-01-19-190417_fill-tx_count-with-data/up.sql deleted file mode 100644 index 2173484eae..0000000000 --- a/core/lib/storage/migrations/2023-01-19-190417_fill-tx_count-with-data/up.sql +++ /dev/null @@ -1,40 +0,0 @@ -DO -$$ - DECLARE - a1 BYTEA; - a2 BYTEA; - rows BIGINT = 10000; - BEGIN - LOOP - SELECT address INTO a2 - FROM - ( - SELECT DISTINCT address - FROM tx_filters - WHERE ( a1 IS NULL OR address > a1 ) - ORDER BY address - LIMIT rows - ) AS a - ORDER BY address DESC - LIMIT 1; - - IF NOT found THEN EXIT; END IF; - - INSERT INTO txs_count (address, token, count) - SELECT address,token, COUNT(DISTINCT tx_hash) - FROM tx_filters - WHERE ( a1 IS NULL OR address > a1 ) AND address <= a2 - GROUP BY (address, token) - ON CONFLICT( address, token) DO UPDATE SET count = EXCLUDED.count; - - INSERT INTO txs_count (address, token, count) - SELECT address, -1, COUNT(DISTINCT tx_hash) - FROM tx_filters - WHERE ( a1 IS NULL OR address > a1 ) AND address <= a2 - GROUP BY (address) - ON CONFLICT( address, token) DO UPDATE SET count = EXCLUDED.count; - raise info 'from %: to %', a1, a2; - a1 = a2; - END LOOP; - END -$$ \ No newline at end of file diff --git a/core/lib/storage/sqlx-data.json b/core/lib/storage/sqlx-data.json index 0240c51391..de7212ce84 100644 --- a/core/lib/storage/sqlx-data.json +++ b/core/lib/storage/sqlx-data.json @@ -1926,6 +1926,24 @@ }, "query": "SELECT * FROM blocks WHERE number = $1" }, + "2ea6ab99824c0c546c099c42d972c2cf36e14147bd22f4c45cae3e0b3ef004d7": { + "describe": { + "columns": [ + { + "name": "address", + "ordinal": 0, + "type_info": "Bytea" + } + ], + "nullable": [ + false + ], + "parameters": { + "Left": [] + } + }, + "query": "\n SELECT DISTINCT address\n FROM tx_filters\n ORDER BY address\n LIMIT 1\n " + }, "2f260906b05f4d37fcc1396ded15aee2ea2f8298682e4b19bb8c234e0a66ad66": { "describe": { "columns": [ @@ -2052,6 +2070,27 @@ }, "query": "\n INSERT INTO tokens ( id, address, symbol, decimals, kind )\n VALUES ( $1, $2, $3, $4, $5 )\n ON CONFLICT (id)\n DO\n UPDATE SET address = $2, symbol = $3, decimals = $4, kind = $5\n " }, + "31bcd4f1f90659273976b4200cd2e32a96566b03562e5a7eef6ee2533f5a05b2": { + "describe": { + "columns": [ + { + "name": "address", + "ordinal": 0, + "type_info": "Bytea" + } + ], + "nullable": [ + false + ], + "parameters": { + "Left": [ + "Bytea", + "Int8" + ] + } + }, + "query": "\n SELECT * FROM ( \n SELECT DISTINCT address\n FROM tx_filters\n WHERE address > $1\n ORDER BY address\n LIMIT $2\n ) AS a\n ORDER BY address DESC LIMIT 1\n " + }, "32534621f625f4eb72d416e0a35e01d32b322a7efe0c1b6f477e545a1ce25f9e": { "describe": { "columns": [ @@ -7443,6 +7482,19 @@ }, "query": "INSERT INTO mempool_txs (tx_hash, tx, created_at, eth_sign_data, batch_id)\n VALUES ($1, $2, $3, $4, $5)" }, + "cd6df068718c77ed95513af99496a5248eb8318493beaca056cb7ef002218abc": { + "describe": { + "columns": [], + "nullable": [], + "parameters": { + "Left": [ + "Bytea", + "Bytea" + ] + } + }, + "query": "\n INSERT INTO txs_count (address, token, count)\n SELECT address, -1, COUNT(DISTINCT tx_hash)\n FROM tx_filters\n WHERE address > $1 AND address <= $2\n GROUP BY (address)\n ON CONFLICT( address, token) DO UPDATE SET count = EXCLUDED.count;\n " + }, "ceb8e4656aa76e1918a03707a1f047aed19ffcb3c70dbde61a6353b26b5a2493": { "describe": { "columns": [], @@ -9279,6 +9331,19 @@ }, "query": "\n SELECT tx FROM executed_transactions WHERE tx->'type' = '\"MintNFT\"' AND success = true\n ORDER BY nonce\n " }, + "fa843a3e1dcf2b0b4c56effeca90f4b56ab0ffd3ee7bc0e80fe618d292d513c9": { + "describe": { + "columns": [], + "nullable": [], + "parameters": { + "Left": [ + "Bytea", + "Bytea" + ] + } + }, + "query": "\n INSERT INTO txs_count (address, token, count)\n SELECT address,token, COUNT(DISTINCT tx_hash)\n FROM tx_filters\n WHERE address > $1 AND address <= $2\n GROUP BY (address, token)\n ON CONFLICT( address, token) DO UPDATE SET count = EXCLUDED.count;\n " + }, "fabb011dfd474fd56c71b7fb1707bbe586e66f9a45deac15b486845ba5c87979": { "describe": { "columns": [ diff --git a/core/lib/storage/src/chain/operations_ext/mod.rs b/core/lib/storage/src/chain/operations_ext/mod.rs index 18b10960f1..7a921e3e56 100644 --- a/core/lib/storage/src/chain/operations_ext/mod.rs +++ b/core/lib/storage/src/chain/operations_ext/mod.rs @@ -1400,6 +1400,85 @@ impl<'a, 'c> OperationsExtSchema<'a, 'c> { Ok(record.map(|record| TxHash::from_slice(&record.tx_hash).unwrap())) } + // TODO Remove it after migration is complete + pub async fn get_accounts_range( + &mut self, + start_account: Option
, + limit: u32, + ) -> Option<(Address, Address)> { + let start_account = match start_account { + None => { + let address = sqlx::query_scalar!( + r#" + SELECT DISTINCT address + FROM tx_filters + ORDER BY address + LIMIT 1 + "#, + ) + .fetch_one(self.0.conn()) + .await + .unwrap(); + Address::from_slice(&address) + } + Some(account) => account, + }; + + sqlx::query_scalar!( + r#" + SELECT * FROM ( + SELECT DISTINCT address + FROM tx_filters + WHERE address > $1 + ORDER BY address + LIMIT $2 + ) AS a + ORDER BY address DESC LIMIT 1 + "#, + start_account.as_bytes(), + limit as i32 + ) + .fetch_optional(self.0.conn()) + .await + .unwrap() + .map(|account| (start_account, Address::from_slice(&account))) + } + + // TODO Remove it after migration is complete + pub async fn update_txs_count(&mut self, start_account: Address, finish_account: Address) { + sqlx::query!( + r#" + INSERT INTO txs_count (address, token, count) + SELECT address,token, COUNT(DISTINCT tx_hash) + FROM tx_filters + WHERE address > $1 AND address <= $2 + GROUP BY (address, token) + ON CONFLICT( address, token) DO UPDATE SET count = EXCLUDED.count; + "#, + start_account.as_bytes(), + finish_account.as_bytes(), + ) + .execute(self.0.conn()) + .await + .unwrap(); + + sqlx::query!( + r#" + INSERT INTO txs_count (address, token, count) + SELECT address, -1, COUNT(DISTINCT tx_hash) + FROM tx_filters + WHERE address > $1 AND address <= $2 + GROUP BY (address) + ON CONFLICT( address, token) DO UPDATE SET count = EXCLUDED.count; + "#, + start_account.as_bytes(), + finish_account.as_bytes(), + ) + .execute(self.0.conn()) + .await + .unwrap(); + } + pub async fn get_account_transactions_count( &mut self, address: Address,