Skip to content

Commit

Permalink
Rewrite sql tx to rust (#2327)
Browse files Browse the repository at this point in the history
Signed-off-by: Danil <[email protected]>

Signed-off-by: Danil <[email protected]>
  • Loading branch information
Deniallugo authored Jan 25, 2023
1 parent 2766f6b commit 5fd47c1
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 125 deletions.
14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ members = [
"core/bin/parse_pub_data",
"core/bin/block_revert",
"core/bin/remove_proofs",
"core/bin/tree_cache_updater",
"core/bin/tx_count_migration",

# Server micro-services
"core/bin/zksync_api",
Expand Down
75 changes: 0 additions & 75 deletions core/bin/tree_cache_updater/src/main.rs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "tree_cache_updater"
name = "tx_count_migration"
version = "1.0.0"
edition = "2018"
authors = ["The Matter Labs Team <[email protected]>"]
Expand Down
38 changes: 38 additions & 0 deletions core/bin/tx_count_migration/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use zksync_storage::StorageProcessor;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let mut storage = StorageProcessor::establish_connection().await?;
let mut first_account = None;
loop {
let mut transaction = storage.start_transaction().await?;
match transaction
.chain()
.operations_ext_schema()
.get_accounts_range(first_account, 10000)
.await
{
Some((start_account, final_account)) => {
transaction
.chain()
.operations_ext_schema()
.update_txs_count(start_account, final_account)
.await;
println!(
"Data for accounts from {:?} to {:?} has been updated",
&start_account, &final_account,
);
first_account = Some(final_account);
}
None => {
// We can forget about tx because we will close
// the connection without updating any data
println!("Finish");
break;
}
}
transaction.commit().await?;
}

Ok(())
}

This file was deleted.

This file was deleted.

65 changes: 65 additions & 0 deletions core/lib/storage/sqlx-data.json
Original file line number Diff line number Diff line change
Expand Up @@ -1926,6 +1926,24 @@
},
"query": "SELECT * FROM blocks WHERE number = $1"
},
"2ea6ab99824c0c546c099c42d972c2cf36e14147bd22f4c45cae3e0b3ef004d7": {
"describe": {
"columns": [
{
"name": "address",
"ordinal": 0,
"type_info": "Bytea"
}
],
"nullable": [
false
],
"parameters": {
"Left": []
}
},
"query": "\n SELECT DISTINCT address\n FROM tx_filters\n ORDER BY address\n LIMIT 1\n "
},
"2f260906b05f4d37fcc1396ded15aee2ea2f8298682e4b19bb8c234e0a66ad66": {
"describe": {
"columns": [
Expand Down Expand Up @@ -2052,6 +2070,27 @@
},
"query": "\n INSERT INTO tokens ( id, address, symbol, decimals, kind )\n VALUES ( $1, $2, $3, $4, $5 )\n ON CONFLICT (id)\n DO\n UPDATE SET address = $2, symbol = $3, decimals = $4, kind = $5\n "
},
"31bcd4f1f90659273976b4200cd2e32a96566b03562e5a7eef6ee2533f5a05b2": {
"describe": {
"columns": [
{
"name": "address",
"ordinal": 0,
"type_info": "Bytea"
}
],
"nullable": [
false
],
"parameters": {
"Left": [
"Bytea",
"Int8"
]
}
},
"query": "\n SELECT * FROM ( \n SELECT DISTINCT address\n FROM tx_filters\n WHERE address > $1\n ORDER BY address\n LIMIT $2\n ) AS a\n ORDER BY address DESC LIMIT 1\n "
},
"32534621f625f4eb72d416e0a35e01d32b322a7efe0c1b6f477e545a1ce25f9e": {
"describe": {
"columns": [
Expand Down Expand Up @@ -7443,6 +7482,19 @@
},
"query": "INSERT INTO mempool_txs (tx_hash, tx, created_at, eth_sign_data, batch_id)\n VALUES ($1, $2, $3, $4, $5)"
},
"cd6df068718c77ed95513af99496a5248eb8318493beaca056cb7ef002218abc": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": [
"Bytea",
"Bytea"
]
}
},
"query": "\n INSERT INTO txs_count (address, token, count)\n SELECT address, -1, COUNT(DISTINCT tx_hash)\n FROM tx_filters\n WHERE address > $1 AND address <= $2\n GROUP BY (address)\n ON CONFLICT( address, token) DO UPDATE SET count = EXCLUDED.count;\n "
},
"ceb8e4656aa76e1918a03707a1f047aed19ffcb3c70dbde61a6353b26b5a2493": {
"describe": {
"columns": [],
Expand Down Expand Up @@ -9279,6 +9331,19 @@
},
"query": "\n SELECT tx FROM executed_transactions WHERE tx->'type' = '\"MintNFT\"' AND success = true\n ORDER BY nonce\n "
},
"fa843a3e1dcf2b0b4c56effeca90f4b56ab0ffd3ee7bc0e80fe618d292d513c9": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": [
"Bytea",
"Bytea"
]
}
},
"query": "\n INSERT INTO txs_count (address, token, count)\n SELECT address,token, COUNT(DISTINCT tx_hash)\n FROM tx_filters\n WHERE address > $1 AND address <= $2\n GROUP BY (address, token)\n ON CONFLICT( address, token) DO UPDATE SET count = EXCLUDED.count;\n "
},
"fabb011dfd474fd56c71b7fb1707bbe586e66f9a45deac15b486845ba5c87979": {
"describe": {
"columns": [
Expand Down
79 changes: 79 additions & 0 deletions core/lib/storage/src/chain/operations_ext/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1400,6 +1400,85 @@ impl<'a, 'c> OperationsExtSchema<'a, 'c> {
Ok(record.map(|record| TxHash::from_slice(&record.tx_hash).unwrap()))
}

// TODO Remove it after migration is complete
pub async fn get_accounts_range(
&mut self,
start_account: Option<Address>,
limit: u32,
) -> Option<(Address, Address)> {
let start_account = match start_account {
None => {
let address = sqlx::query_scalar!(
r#"
SELECT DISTINCT address
FROM tx_filters
ORDER BY address
LIMIT 1
"#,
)
.fetch_one(self.0.conn())
.await
.unwrap();
Address::from_slice(&address)
}
Some(account) => account,
};

sqlx::query_scalar!(
r#"
SELECT * FROM (
SELECT DISTINCT address
FROM tx_filters
WHERE address > $1
ORDER BY address
LIMIT $2
) AS a
ORDER BY address DESC LIMIT 1
"#,
start_account.as_bytes(),
limit as i32
)
.fetch_optional(self.0.conn())
.await
.unwrap()
.map(|account| (start_account, Address::from_slice(&account)))
}

// TODO Remove it after migration is complete
pub async fn update_txs_count(&mut self, start_account: Address, finish_account: Address) {
sqlx::query!(
r#"
INSERT INTO txs_count (address, token, count)
SELECT address,token, COUNT(DISTINCT tx_hash)
FROM tx_filters
WHERE address > $1 AND address <= $2
GROUP BY (address, token)
ON CONFLICT( address, token) DO UPDATE SET count = EXCLUDED.count;
"#,
start_account.as_bytes(),
finish_account.as_bytes(),
)
.execute(self.0.conn())
.await
.unwrap();

sqlx::query!(
r#"
INSERT INTO txs_count (address, token, count)
SELECT address, -1, COUNT(DISTINCT tx_hash)
FROM tx_filters
WHERE address > $1 AND address <= $2
GROUP BY (address)
ON CONFLICT( address, token) DO UPDATE SET count = EXCLUDED.count;
"#,
start_account.as_bytes(),
finish_account.as_bytes(),
)
.execute(self.0.conn())
.await
.unwrap();
}

pub async fn get_account_transactions_count(
&mut self,
address: Address,
Expand Down

0 comments on commit 5fd47c1

Please sign in to comment.