feat(storage): save enum indices in RocksDB (#162)
# What ❔

Enumeration indices are now saved along with values in the same column
family. Indices are added gradually for old DB entries; the number of keys
processed per L1 batch is configurable.
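
For intuition, a combined entry can be pictured as the 32-byte value followed by an 8-byte index, with pre-migration entries lacking the suffix. The sketch below is illustrative only — the layout, endianness, and helper names are assumptions, not this PR's actual encoding:

```rust
// Illustrative only: how a value and its enumeration index might share one
// column-family entry. Layout and names are assumptions, not zksync_state code.
fn encode_entry(value: [u8; 32], enum_index: u64) -> Vec<u8> {
    let mut buf = Vec::with_capacity(40);
    buf.extend_from_slice(&value); // 32-byte storage value
    buf.extend_from_slice(&enum_index.to_be_bytes()); // 8-byte index suffix
    buf
}

/// Pre-migration entries are bare 32-byte values, so the index is optional.
fn decode_entry(bytes: &[u8]) -> ([u8; 32], Option<u64>) {
    let value: [u8; 32] = bytes[..32].try_into().expect("entry shorter than 32 bytes");
    let index = (bytes.len() >= 40)
        .then(|| u64::from_be_bytes(bytes[32..40].try_into().unwrap()));
    (value, index)
}
```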

## Why ❔

Enumeration indices in storage are necessary for the boojum upgrade.

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
perekopskiy authored Oct 10, 2023
1 parent 5352b49 commit bab099d
Showing 15 changed files with 461 additions and 83 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

7 changes: 7 additions & 0 deletions core/bin/external_node/src/config/mod.rs
@@ -197,6 +197,9 @@ pub struct OptionalENConfig {
/// Whether to try running EN with MultiVM.
#[serde(default)]
pub experimental_multivm_support: bool,
/// Number of keys processed by the enum_index migration in the State Keeper per L1 batch.
#[serde(default = "OptionalENConfig::default_enum_index_migration_chunk_size")]
pub enum_index_migration_chunk_size: usize,
}

impl OptionalENConfig {
@@ -283,6 +286,10 @@ impl OptionalENConfig {
10
}

const fn default_enum_index_migration_chunk_size() -> usize {
1000
}

pub fn polling_interval(&self) -> Duration {
Duration::from_millis(self.polling_interval)
}
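
The `#[serde(default = "...")]` attribute keeps the new field backward compatible: configs written before this change still deserialize, falling back to the 1000-key default. A self-contained illustration of the pattern (simplified stand-in names, not the EN's real config struct):

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct Config {
    // Falls back to the default when the field is absent from the input.
    #[serde(default = "Config::default_chunk_size")]
    enum_index_migration_chunk_size: usize,
}

impl Config {
    const fn default_chunk_size() -> usize {
        1000
    }
}

fn main() {
    // An old config without the new field still deserializes successfully.
    let config: Config = serde_json::from_str("{}").unwrap();
    assert_eq!(config.enum_index_migration_chunk_size, 1000);
}
```
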
1 change: 1 addition & 0 deletions core/bin/external_node/src/main.rs
@@ -76,6 +76,7 @@ async fn build_state_keeper(
max_allowed_l2_tx_gas_limit,
save_call_traces,
false,
config.optional.enum_index_migration_chunk_size,
));

let io = Box::new(
9 changes: 9 additions & 0 deletions core/lib/config/src/configs/chain.rs
@@ -109,6 +109,9 @@ pub struct StateKeeperConfig {
/// Flag which will enable storage to cache witness_inputs during State Keeper's run.
/// NOTE: This will slow down StateKeeper, to be used in non-production environments!
pub upload_witness_inputs_to_gcs: bool,

/// Number of keys processed by the enum_index migration in the State Keeper per L1 batch.
pub enum_index_migration_chunk_size: Option<usize>,
}

impl StateKeeperConfig {
@@ -122,6 +125,10 @@
default_aa: self.default_aa_hash,
}
}

pub fn enum_index_migration_chunk_size(&self) -> usize {
self.enum_index_migration_chunk_size.unwrap_or(1_000)
}
}

#[derive(Debug, Deserialize, Clone, PartialEq)]
@@ -226,6 +233,7 @@ mod tests {
virtual_blocks_interval: 1,
virtual_blocks_per_miniblock: 1,
upload_witness_inputs_to_gcs: false,
enum_index_migration_chunk_size: Some(2_000),
},
operations_manager: OperationsManagerConfig {
delay_interval: 100,
@@ -273,6 +281,7 @@
CHAIN_STATE_KEEPER_VALIDATION_COMPUTATIONAL_GAS_LIMIT="10000000"
CHAIN_STATE_KEEPER_SAVE_CALL_TRACES="false"
CHAIN_STATE_KEEPER_UPLOAD_WITNESS_INPUTS_TO_GCS="false"
CHAIN_STATE_KEEPER_ENUM_INDEX_MIGRATION_CHUNK_SIZE="2000"
CHAIN_OPERATIONS_MANAGER_DELAY_INTERVAL="100"
CHAIN_MEMPOOL_SYNC_INTERVAL_MS="10"
CHAIN_MEMPOOL_SYNC_BATCH_SIZE="1000"
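
The chunk size bounds how much backfill work happens per sealed L1 batch, so the migration amortizes over time instead of stalling the State Keeper. A rough, self-contained model of that loop (the names and structure here are invented for illustration; the real code lives in the RocksDB storage layer):

```rust
use std::collections::HashMap;

/// Toy model of the state column family: value plus an optional enum index.
struct MockState {
    entries: HashMap<[u8; 32], ([u8; 32], Option<u64>)>,
    /// Indices as resolved from Postgres `initial_writes`.
    known_indices: HashMap<[u8; 32], u64>,
}

impl MockState {
    /// Backfills indices for at most `chunk_size` old entries; in the real
    /// system this runs once per processed L1 batch.
    fn migrate_chunk(&mut self, chunk_size: usize) {
        let keys: Vec<[u8; 32]> = self
            .entries
            .iter()
            .filter(|(_, (_, index))| index.is_none())
            .map(|(key, _)| *key)
            .take(chunk_size)
            .collect();
        for key in keys {
            if let Some(&index) = self.known_indices.get(&key) {
                self.entries.get_mut(&key).unwrap().1 = Some(index);
            }
        }
    }
}
```
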
58 changes: 32 additions & 26 deletions core/lib/dal/sqlx-data.json
@@ -9099,6 +9099,38 @@
},
"query": "\n UPDATE prover_jobs_fri\n SET status = 'in_progress', attempts = attempts + 1,\n updated_at = now(), processing_started_at = now(),\n picked_by = $2\n WHERE id = (\n SELECT id\n FROM prover_jobs_fri\n WHERE status = 'queued'\n AND protocol_version = ANY($1)\n ORDER BY aggregation_round DESC, l1_batch_number ASC, id ASC\n LIMIT 1\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING prover_jobs_fri.id, prover_jobs_fri.l1_batch_number, prover_jobs_fri.circuit_id,\n prover_jobs_fri.aggregation_round, prover_jobs_fri.sequence_number, prover_jobs_fri.depth,\n prover_jobs_fri.is_node_final_proof\n "
},
"d1c82bd0b3c010569937ad7600760fa0c3aca7c9585bbf9598a5c0515b431b26": {
"describe": {
"columns": [
{
"name": "hashed_key",
"ordinal": 0,
"type_info": "Bytea"
},
{
"name": "l1_batch_number",
"ordinal": 1,
"type_info": "Int8"
},
{
"name": "index",
"ordinal": 2,
"type_info": "Int8"
}
],
"nullable": [
false,
false,
false
],
"parameters": {
"Left": [
"ByteaArray"
]
}
},
"query": "SELECT hashed_key, l1_batch_number, index FROM initial_writes WHERE hashed_key = ANY($1::bytea[])"
},
"d5dea31f2a325bb44e8ef2cbbabbeb73fd6996a3e6cb99d62c6b97a4aa49c1ca": {
"describe": {
"columns": [
@@ -9394,32 +9426,6 @@
},
"query": "UPDATE l1_batches SET skip_proof = TRUE WHERE number = $1"
},
"da01d59119023c822cffa5dc226e82b2abd4cbd46d3856d7db16289868a27fa1": {
"describe": {
"columns": [
{
"name": "hashed_key",
"ordinal": 0,
"type_info": "Bytea"
},
{
"name": "l1_batch_number",
"ordinal": 1,
"type_info": "Int8"
}
],
"nullable": [
false,
false
],
"parameters": {
"Left": [
"ByteaArray"
]
}
},
"query": "SELECT hashed_key, l1_batch_number FROM initial_writes WHERE hashed_key = ANY($1::bytea[])"
},
"dc16d0fac093a52480b66dfcb5976fb01e6629e8c982c265f2af1d5000090572": {
"describe": {
"columns": [
36 changes: 23 additions & 13 deletions core/lib/dal/src/storage_logs_dal.rs
@@ -3,7 +3,7 @@ use sqlx::Row;

use std::{collections::HashMap, time::Instant};

use crate::StorageProcessor;
use crate::{instrument::InstrumentExt, StorageProcessor};
use zksync_types::{
get_code_key, AccountTreeId, Address, L1BatchNumber, MiniblockNumber, StorageKey, StorageLog,
FAILED_CONTRACT_DEPLOYMENT_BYTECODE_HASH, H256,
@@ -244,7 +244,7 @@ impl StorageLogsDal<'_, '_> {
pub async fn get_storage_logs_for_revert(
&mut self,
l1_batch_number: L1BatchNumber,
) -> HashMap<H256, Option<H256>> {
) -> HashMap<H256, Option<(H256, u64)>> {
let miniblock_range = self
.storage
.blocks_dal()
@@ -268,7 +268,9 @@
// as per `initial_writes`, so if we return such keys from this method, it will lead to
// the incorrect state after revert.
let stage_start = Instant::now();
let l1_batch_by_key = self.get_l1_batches_for_initial_writes(&modified_keys).await;
let l1_batch_and_index_by_key = self
.get_l1_batches_and_indices_for_initial_writes(&modified_keys)
.await;
tracing::info!(
"Loaded initial write info for modified keys in {:?}",
stage_start.elapsed()
@@ -277,12 +279,12 @@
let stage_start = Instant::now();
let mut output = HashMap::with_capacity(modified_keys.len());
modified_keys.retain(|key| {
match l1_batch_by_key.get(key) {
match l1_batch_and_index_by_key.get(key) {
None => {
// Key is completely deduped. It should not be present in the output map.
false
}
Some(write_batch) if *write_batch > l1_batch_number => {
Some((write_batch, _)) if *write_batch > l1_batch_number => {
// Key was initially written to after the specified L1 batch.
output.insert(*key, None);
false
@@ -295,18 +297,24 @@
stage_start.elapsed()
);

let deduped_count = modified_keys_count - l1_batch_by_key.len();
let deduped_count = modified_keys_count - l1_batch_and_index_by_key.len();
tracing::info!(
"Keys to update: {update_count}, to delete: {delete_count}; {deduped_count} modified keys \
are deduped and will be ignored",
update_count = modified_keys.len(),
delete_count = l1_batch_by_key.len() - modified_keys.len()
delete_count = l1_batch_and_index_by_key.len() - modified_keys.len()
);

let stage_start = Instant::now();
let prev_values_for_updated_keys = self
.get_storage_values(&modified_keys, last_miniblock)
.await;
.await
.into_iter()
.map(|(key, value)| {
let value = value.unwrap(); // We already filtered out keys that weren't touched.
let index = l1_batch_and_index_by_key[&key].1;
(key, Some((value, index)))
});
tracing::info!(
"Loaded previous values for {} keys in {:?}",
prev_values_for_updated_keys.len(),
@@ -316,20 +324,22 @@
output
}

pub async fn get_l1_batches_for_initial_writes(
pub async fn get_l1_batches_and_indices_for_initial_writes(
&mut self,
hashed_keys: &[H256],
) -> HashMap<H256, L1BatchNumber> {
) -> HashMap<H256, (L1BatchNumber, u64)> {
if hashed_keys.is_empty() {
return HashMap::new(); // Shortcut to save time on communication with DB in the common case
}

let hashed_keys: Vec<_> = hashed_keys.iter().map(H256::as_bytes).collect();
let rows = sqlx::query!(
"SELECT hashed_key, l1_batch_number FROM initial_writes \
"SELECT hashed_key, l1_batch_number, index FROM initial_writes \
WHERE hashed_key = ANY($1::bytea[])",
&hashed_keys as &[&[u8]],
)
.instrument("get_l1_batches_and_indices_for_initial_writes")
.report_latency()
.fetch_all(self.storage.conn())
.await
.unwrap();
@@ -338,7 +348,7 @@
.map(|row| {
(
H256::from_slice(&row.hashed_key),
L1BatchNumber(row.l1_batch_number as u32),
(L1BatchNumber(row.l1_batch_number as u32), row.index as u64),
)
})
.collect()
@@ -696,7 +706,7 @@ mod tests {
.await;
assert_eq!(logs_for_revert.len(), 15); // 5 updated + 10 new keys
for log in &logs[5..] {
let prev_value = logs_for_revert[&log.key.hashed_key()].unwrap();
let prev_value = logs_for_revert[&log.key.hashed_key()].unwrap().0;
assert_eq!(prev_value, log.value);
}
for log in &new_logs[5..] {
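
After this change, `get_storage_logs_for_revert` distinguishes keys to delete (`None`: the key was first written after the target batch, so its value and index must go) from keys to restore (`Some((value, index))`). A hedged sketch of how a caller might apply that map to indexed state (stand-in types; the real consumer is the RocksDB storage layer):

```rust
use std::collections::HashMap;

type H256 = [u8; 32]; // stand-in for zksync_types::H256

/// Applies revert info to an indexed state map: `None` deletes the key,
/// `Some((value, index))` restores its pre-revert value and enum index.
fn apply_revert_logs(
    state: &mut HashMap<H256, (H256, u64)>,
    logs_for_revert: HashMap<H256, Option<(H256, u64)>>,
) {
    for (hashed_key, update) in logs_for_revert {
        match update {
            None => {
                state.remove(&hashed_key);
            }
            Some((value, enum_index)) => {
                state.insert(hashed_key, (value, enum_index));
            }
        }
    }
}
```
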
1 change: 1 addition & 0 deletions core/lib/state/Cargo.toml
@@ -20,6 +20,7 @@ anyhow = "1.0"
mini-moka = "0.10.0"
tokio = { version = "1", features = ["rt"] }
tracing = "0.1"
itertools = "0.10.3"

[dev-dependencies]
db_test_macro = { path = "../db_test_macro" }
31 changes: 26 additions & 5 deletions core/lib/state/src/in_memory.rs
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::collections::{hash_map::Entry, BTreeMap, HashMap};

use crate::ReadStorage;
use zksync_types::{
@@ -14,8 +14,9 @@ pub const IN_MEMORY_STORAGE_DEFAULT_NETWORK_ID: u32 = 270;
/// In-memory storage.
#[derive(Debug, Default)]
pub struct InMemoryStorage {
pub(crate) state: HashMap<StorageKey, StorageValue>,
pub(crate) state: HashMap<StorageKey, (StorageValue, u64)>,
pub(crate) factory_deps: HashMap<H256, Vec<u8>>,
last_enum_index_set: u64,
}

impl InMemoryStorage {
@@ -47,7 +48,7 @@
) -> Self {
let system_context_init_log = get_system_context_init_logs(chain_id);

let state = contracts
let state_without_indices: BTreeMap<_, _> = contracts
.iter()
.flat_map(|contract| {
let bytecode_hash = bytecode_hasher(&contract.bytecode);
@@ -63,20 +64,36 @@
.chain(system_context_init_log)
.filter_map(|log| (log.kind == StorageLogKind::Write).then_some((log.key, log.value)))
.collect();
let state: HashMap<_, _> = state_without_indices
.into_iter()
.enumerate()
.map(|(idx, (key, value))| (key, (value, idx as u64 + 1)))
.collect();

let factory_deps = contracts
.into_iter()
.map(|contract| (bytecode_hasher(&contract.bytecode), contract.bytecode))
.collect();

let last_enum_index_set = state.len() as u64;
Self {
state,
factory_deps,
last_enum_index_set,
}
}

/// Sets the storage `value` at the specified `key`.
pub fn set_value(&mut self, key: StorageKey, value: StorageValue) {
self.state.insert(key, value);
match self.state.entry(key) {
Entry::Occupied(mut entry) => {
entry.get_mut().0 = value;
}
Entry::Vacant(entry) => {
self.last_enum_index_set += 1;
entry.insert((value, self.last_enum_index_set));
}
}
}

/// Stores a factory dependency with the specified `hash` and `bytecode`.
@@ -87,7 +104,11 @@

impl ReadStorage for &InMemoryStorage {
fn read_value(&mut self, key: &StorageKey) -> StorageValue {
self.state.get(key).copied().unwrap_or_default()
self.state
.get(key)
.map(|(value, _)| value)
.copied()
.unwrap_or_default()
}

fn is_write_initial(&mut self, key: &StorageKey) -> bool {
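
The updated `set_value` keeps enum indices stable: the first write to a key takes the next fresh index, and later overwrites change only the value. A self-contained analog of that behavior (simplified key/value types in place of `StorageKey`/`StorageValue`):

```rust
use std::collections::{hash_map::Entry, HashMap};

/// Simplified analog of the new `InMemoryStorage` index bookkeeping.
#[derive(Default)]
struct IndexedState {
    state: HashMap<u32, (u32, u64)>,
    last_enum_index_set: u64,
}

impl IndexedState {
    fn set_value(&mut self, key: u32, value: u32) {
        match self.state.entry(key) {
            // Overwrite: keep the index assigned on first write.
            Entry::Occupied(mut entry) => entry.get_mut().0 = value,
            // First write: assign the next fresh index.
            Entry::Vacant(entry) => {
                self.last_enum_index_set += 1;
                entry.insert((value, self.last_enum_index_set));
            }
        }
    }
}

fn main() {
    let mut storage = IndexedState::default();
    storage.set_value(1, 10);
    storage.set_value(1, 20); // overwrite keeps index 1
    storage.set_value(2, 30); // new key gets index 2
    assert_eq!(storage.state[&1], (20, 1));
    assert_eq!(storage.state[&2], (30, 2));
}
```
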
(Diffs for the remaining 7 changed files are not rendered.)
