diff --git a/core/bin/merkle_tree_consistency_checker/src/main.rs b/core/bin/merkle_tree_consistency_checker/src/main.rs
index b67415e8a6f..a4a4aeec0bf 100644
--- a/core/bin/merkle_tree_consistency_checker/src/main.rs
+++ b/core/bin/merkle_tree_consistency_checker/src/main.rs
@@ -27,7 +27,7 @@ impl Cli {
         let db_path = &config.merkle_tree.path;
         tracing::info!("Verifying consistency of Merkle tree at {db_path}");
         let start = Instant::now();
-        let db = RocksDB::new(Path::new(db_path), true);
+        let db = RocksDB::new(Path::new(db_path));
         let tree = ZkSyncTree::new_lightweight(db);
 
         let l1_batch_number = if let Some(number) = self.l1_batch {
diff --git a/core/lib/merkle_tree/examples/loadtest/main.rs b/core/lib/merkle_tree/examples/loadtest/main.rs
index ac2cdf0676c..75971fd26fb 100644
--- a/core/lib/merkle_tree/examples/loadtest/main.rs
+++ b/core/lib/merkle_tree/examples/loadtest/main.rs
@@ -82,7 +82,7 @@ impl Cli {
             dir.path().to_string_lossy()
         );
         rocksdb = if let Some(block_cache_capacity) = self.block_cache {
-            let db = RocksDB::with_cache(dir.path(), true, Some(block_cache_capacity));
+            let db = RocksDB::with_cache(dir.path(), Some(block_cache_capacity));
            RocksDBWrapper::from(db)
         } else {
             RocksDBWrapper::new(dir.path())
diff --git a/core/lib/merkle_tree/src/storage/rocksdb.rs b/core/lib/merkle_tree/src/storage/rocksdb.rs
index 543cd5da88f..b9aca28fd28 100644
--- a/core/lib/merkle_tree/src/storage/rocksdb.rs
+++ b/core/lib/merkle_tree/src/storage/rocksdb.rs
@@ -63,8 +63,7 @@ impl RocksDBWrapper {
     /// Creates a new wrapper, initializing RocksDB at the specified directory.
     pub fn new(path: &Path) -> Self {
-        let db = RocksDB::new(path, true);
-        Self::from(db)
+        Self::from(RocksDB::new(path))
     }
 
     /// Sets the chunk size for multi-get operations. The requested keys will be split
diff --git a/core/lib/merkle_tree/tests/integration/domain.rs b/core/lib/merkle_tree/tests/integration/domain.rs
index ec76ebd4b7e..d3b666c8849 100644
--- a/core/lib/merkle_tree/tests/integration/domain.rs
+++ b/core/lib/merkle_tree/tests/integration/domain.rs
@@ -42,7 +42,7 @@ fn basic_workflow() {
     let logs = gen_storage_logs();
 
     let (metadata, expected_root_hash) = {
-        let db = RocksDB::new(temp_dir.as_ref(), false);
+        let db = RocksDB::new(temp_dir.as_ref());
         let mut tree = ZkSyncTree::new_lightweight(db);
         let metadata = tree.process_l1_batch(&logs);
         tree.save();
@@ -66,7 +66,7 @@
         ]),
     );
 
-    let db = RocksDB::new(temp_dir.as_ref(), false);
+    let db = RocksDB::new(temp_dir.as_ref());
     let tree = ZkSyncTree::new_lightweight(db);
     tree.verify_consistency(L1BatchNumber(0));
     assert_eq!(tree.root_hash(), expected_root_hash);
@@ -80,7 +80,7 @@ fn basic_workflow_multiblock() {
     let blocks = logs.chunks(9);
 
     let expected_root_hash = {
-        let db = RocksDB::new(temp_dir.as_ref(), false);
+        let db = RocksDB::new(temp_dir.as_ref());
         let mut tree = ZkSyncTree::new_lightweight(db);
         tree.use_dedicated_thread_pool(2);
         for block in blocks {
@@ -98,7 +98,7 @@
         ]),
     );
 
-    let db = RocksDB::new(temp_dir.as_ref(), false);
+    let db = RocksDB::new(temp_dir.as_ref());
     let tree = ZkSyncTree::new_lightweight(db);
     assert_eq!(tree.root_hash(), expected_root_hash);
     assert_eq!(tree.next_l1_batch_number(), L1BatchNumber(12));
@@ -107,7 +107,7 @@
 #[test]
 fn filtering_out_no_op_writes() {
     let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB");
-    let db = RocksDB::new(temp_dir.as_ref(), false);
+    let db = RocksDB::new(temp_dir.as_ref());
     let mut tree = ZkSyncTree::new(db);
     let mut logs = gen_storage_logs();
     let root_hash = tree.process_l1_batch(&logs).root_hash;
@@ -142,7 +142,7 @@ fn filtering_out_no_op_writes() {
 #[test]
 fn revert_blocks() {
     let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB");
-    let storage = RocksDB::new(temp_dir.as_ref(), false);
+    let storage = RocksDB::new(temp_dir.as_ref());
 
     // Generate logs and save them to DB.
     // Produce 4 blocks with distinct values and 1 block with modified values from first block
@@ -198,7 +198,7 @@ fn revert_blocks() {
     }
 
     // Revert the last block.
-    let storage = RocksDB::new(temp_dir.as_ref(), false);
+    let storage = RocksDB::new(temp_dir.as_ref());
     {
         let mut tree = ZkSyncTree::new_lightweight(storage);
         assert_eq!(tree.root_hash(), tree_metadata.last().unwrap().root_hash);
@@ -208,7 +208,7 @@ fn revert_blocks() {
     }
 
     // Revert two more blocks.
-    let storage = RocksDB::new(temp_dir.as_ref(), false);
+    let storage = RocksDB::new(temp_dir.as_ref());
     {
         let mut tree = ZkSyncTree::new_lightweight(storage);
         tree.revert_logs(L1BatchNumber(1));
@@ -217,7 +217,7 @@ fn revert_blocks() {
     }
 
     // Revert two more blocks second time; the result should be the same
-    let storage = RocksDB::new(temp_dir.as_ref(), false);
+    let storage = RocksDB::new(temp_dir.as_ref());
     {
         let mut tree = ZkSyncTree::new_lightweight(storage);
         tree.revert_logs(L1BatchNumber(1));
@@ -226,7 +226,7 @@ fn revert_blocks() {
     }
 
     // Reapply one of the reverted logs
-    let storage = RocksDB::new(temp_dir.as_ref(), false);
+    let storage = RocksDB::new(temp_dir.as_ref());
     {
         let storage_log = mirror_logs.get(3 * block_size).unwrap();
         let mut tree = ZkSyncTree::new_lightweight(storage);
@@ -235,7 +235,7 @@ fn revert_blocks() {
     }
 
     // check saved block number
-    let storage = RocksDB::new(temp_dir.as_ref(), false);
+    let storage = RocksDB::new(temp_dir.as_ref());
     let tree = ZkSyncTree::new_lightweight(storage);
     assert_eq!(tree.next_l1_batch_number(), L1BatchNumber(3));
 }
@@ -243,7 +243,7 @@
 #[test]
 fn reset_tree() {
     let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB");
-    let storage = RocksDB::new(temp_dir.as_ref(), false);
+    let storage = RocksDB::new(temp_dir.as_ref());
     let logs = gen_storage_logs();
     let mut tree = ZkSyncTree::new_lightweight(storage);
     let empty_root_hash = tree.root_hash();
@@ -266,14 +266,14 @@ fn read_logs() {
     logs.truncate(5);
 
     let write_metadata = {
-        let db = RocksDB::new(temp_dir.as_ref(), false);
+        let db = RocksDB::new(temp_dir.as_ref());
         let mut tree = ZkSyncTree::new_lightweight(db);
         let metadata = tree.process_l1_batch(&logs);
         tree.save();
         metadata
     };
 
-    let db = RocksDB::new(temp_dir.as_ref(), false);
+    let db = RocksDB::new(temp_dir.as_ref());
     let mut tree = ZkSyncTree::new_lightweight(db);
     let read_logs: Vec<_> = logs
         .into_iter()
@@ -304,7 +304,7 @@ fn subtract_from_max_value(diff: u8) -> [u8; 32] {
 #[test]
 fn root_hash_compatibility() {
     let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB");
-    let db = RocksDB::new(temp_dir.as_ref(), false);
+    let db = RocksDB::new(temp_dir.as_ref());
     let mut tree = ZkSyncTree::new_lightweight(db);
     assert_eq!(
         tree.root_hash(),
@@ -356,7 +356,7 @@ fn root_hash_compatibility() {
 #[test]
 fn process_block_idempotency_check() {
     let temp_dir = TempDir::new().expect("failed to get temporary directory for RocksDB");
-    let rocks_db = RocksDB::new(temp_dir.as_ref(), false);
+    let rocks_db = RocksDB::new(temp_dir.as_ref());
     let mut tree = ZkSyncTree::new_lightweight(rocks_db);
     let logs = gen_storage_logs();
     let tree_metadata = tree.process_l1_batch(&logs);
@@ -419,7 +419,7 @@ fn witness_workflow() {
     let logs = gen_storage_logs();
     let (first_chunk, _) = logs.split_at(logs.len() / 2);
 
-    let db = RocksDB::new(temp_dir.as_ref(), false);
+    let db = RocksDB::new(temp_dir.as_ref());
     let mut tree = ZkSyncTree::new(db);
     let metadata = tree.process_l1_batch(first_chunk);
     let job = metadata.witness.unwrap();
@@ -449,7 +449,7 @@ fn witnesses_with_multiple_blocks() {
     let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB");
     let logs = gen_storage_logs();
 
-    let db = RocksDB::new(temp_dir.as_ref(), false);
+    let db = RocksDB::new(temp_dir.as_ref());
     let mut tree = ZkSyncTree::new(db);
     let empty_tree_hashes: Vec<_> = (0..256)
         .map(|i| Blake2Hasher.empty_subtree_hash(i))
diff --git a/core/lib/state/src/rocksdb/mod.rs b/core/lib/state/src/rocksdb/mod.rs
index 8723efd31d5..b871e9e9f03 100644
--- a/core/lib/state/src/rocksdb/mod.rs
+++ b/core/lib/state/src/rocksdb/mod.rs
@@ -114,9 +114,8 @@ impl RocksdbStorage {
     /// Creates a new storage with the provided RocksDB `path`.
     pub fn new(path: &Path) -> Self {
-        let db = RocksDB::new(path, true);
         Self {
-            db,
+            db: RocksDB::new(path),
             pending_patch: InMemoryStorage::default(),
             enum_index_migration_chunk_size: 0,
         }
diff --git a/core/lib/storage/src/db.rs b/core/lib/storage/src/db.rs
index 44808af9458..c6b75c6a25e 100644
--- a/core/lib/storage/src/db.rs
+++ b/core/lib/storage/src/db.rs
@@ -3,14 +3,16 @@ use rocksdb::{
     Direction, IteratorMode, Options, PrefixRange, ReadOptions, WriteOptions, DB,
 };
 
-use std::ffi::CStr;
 use std::{
     collections::HashSet,
+    ffi::CStr,
     fmt,
     marker::PhantomData,
     ops,
     path::Path,
     sync::{Arc, Condvar, Mutex},
+    thread,
+    time::Duration,
 };
 
 use crate::metrics::{RocksdbLabels, RocksdbSizeMetrics, METRICS};
@@ -139,6 +141,31 @@ impl RocksDBInner {
     }
 }
 
+#[derive(Debug, Clone, Copy)]
+struct StalledWritesRetries {
+    max_batch_size: usize,
+    retry_count: usize,
+    interval: Duration,
+}
+
+impl Default for StalledWritesRetries {
+    fn default() -> Self {
+        Self {
+            max_batch_size: 128 << 20, // 128 MiB
+            retry_count: 3,
+            interval: Duration::from_millis(100),
+        }
+    }
+}
+
+impl StalledWritesRetries {
+    // **NB.** The error message may change between RocksDB versions!
+    fn is_write_stall_error(error: &rocksdb::Error) -> bool {
+        matches!(error.kind(), rocksdb::ErrorKind::ShutdownInProgress)
+            && error.as_ref().ends_with("stalled writes")
+    }
+}
+
 /// Thin wrapper around a RocksDB instance.
 ///
 /// The wrapper is cheaply cloneable (internally, it wraps a DB instance in an [`Arc`]).
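A quick aside on the retry policy encoded by these defaults (my illustration, not part of the patch): with `retry_count = 3` and `interval = 100 ms`, a persistently stalling write adds at most 300 ms of sleep before the error is surfaced to the caller, since the backoff is fixed rather than exponential. A minimal sketch of that arithmetic:

```rust
use std::time::Duration;

/// Worst-case sleep added by the retry loop before giving up: the batch is
/// retried `retry_count` times, sleeping `interval` before each retry.
fn worst_case_retry_delay(retry_count: usize, interval: Duration) -> Duration {
    interval * retry_count as u32
}

fn main() {
    // Defaults from `StalledWritesRetries::default()` above.
    let delay = worst_case_retry_delay(3, Duration::from_millis(100));
    assert_eq!(delay, Duration::from_millis(300));
}
```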
@@ -146,21 +173,18 @@ impl RocksDBInner {
 pub struct RocksDB<CF> {
     inner: Arc<RocksDBInner>,
     sync_writes: bool,
+    stalled_writes_retries: StalledWritesRetries,
     _cf: PhantomData<CF>,
 }
 
 impl<CF: NamedColumnFamily> RocksDB<CF> {
-    pub fn new(path: &Path, tune_options: bool) -> Self {
-        Self::with_cache(path, tune_options, None)
+    pub fn new(path: &Path) -> Self {
+        Self::with_cache(path, None)
     }
 
-    pub fn with_cache(
-        path: &Path,
-        tune_options: bool,
-        block_cache_capacity: Option<usize>,
-    ) -> Self {
+    pub fn with_cache(path: &Path, block_cache_capacity: Option<usize>) -> Self {
         let caches = RocksDBCaches::new(block_cache_capacity);
-        let options = Self::rocksdb_options(tune_options, None);
+        let options = Self::rocksdb_options(None);
         let existing_cfs = DB::list_cf(&options, path).unwrap_or_else(|err| {
             tracing::warn!(
                 "Failed getting column families for RocksDB `{}` at `{}`, assuming CFs are empty; {err}",
@@ -195,13 +219,11 @@ impl<CF: NamedColumnFamily> RocksDB<CF> {
         let all_cf_names = cf_names.iter().copied().chain(obsolete_cfs);
         let cfs = all_cf_names.map(|cf_name| {
             let mut block_based_options = BlockBasedOptions::default();
-            if tune_options {
-                block_based_options.set_bloom_filter(10.0, false);
-            }
+            block_based_options.set_bloom_filter(10.0, false);
             if let Some(cache) = &caches.shared {
                 block_based_options.set_block_cache(cache);
             }
-            let cf_options = Self::rocksdb_options(tune_options, Some(block_based_options));
+            let cf_options = Self::rocksdb_options(Some(block_based_options));
             ColumnFamilyDescriptor::new(cf_name, cf_options)
         });
 
@@ -218,6 +240,7 @@ impl<CF: NamedColumnFamily> RocksDB<CF> {
         Self {
             inner,
             sync_writes: false,
+            stalled_writes_retries: StalledWritesRetries::default(),
             _cf: PhantomData,
         }
     }
@@ -230,16 +253,19 @@ impl<CF: NamedColumnFamily> RocksDB<CF> {
         self
     }
 
-    fn rocksdb_options(
-        tune_options: bool,
-        block_based_options: Option<BlockBasedOptions>,
-    ) -> Options {
+    fn rocksdb_options(block_based_options: Option<BlockBasedOptions>) -> Options {
         let mut options = Options::default();
         options.create_missing_column_families(true);
         options.create_if_missing(true);
-        if tune_options {
-            options.increase_parallelism(num_cpus::get() as i32);
-        }
+
+        let num_cpus = num_cpus::get() as i32;
+        options.increase_parallelism(num_cpus);
+        // Settings below are taken as per PingCAP recommendations:
+        // https://www.pingcap.com/blog/how-to-troubleshoot-rocksdb-write-stalls-in-tikv/
+        options.set_max_write_buffer_number(5);
+        let max_background_jobs = (num_cpus - 1).clamp(1, 8);
+        options.set_max_background_jobs(max_background_jobs);
+
         if let Some(block_based_options) = block_based_options {
             options.set_block_based_table_factory(&block_based_options);
         }
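A side note on the new tuning defaults (my illustration, not part of the patch): `max_background_jobs` is derived from the CPU count, leaving one core for foreground work and clamping the result to the `[1, 8]` range, so single-core machines still get one background job while large machines do not oversubscribe compaction. A sketch of that heuristic:

```rust
// Sketch of the background-jobs heuristic from `rocksdb_options` above:
// reserve one CPU for foreground work and clamp to RocksDB-friendly bounds.
fn max_background_jobs(num_cpus: i32) -> i32 {
    (num_cpus - 1).clamp(1, 8)
}

fn main() {
    assert_eq!(max_background_jobs(1), 1); // never drops below 1
    assert_eq!(max_background_jobs(4), 3);
    assert_eq!(max_background_jobs(16), 8); // capped at 8
}
```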
@@ -282,19 +308,47 @@ impl<CF: NamedColumnFamily> RocksDB<CF> {
     }
 
     pub fn write<'a>(&'a self, batch: WriteBatch<'a, CF>) -> Result<(), rocksdb::Error> {
-        let raw_batch = batch.inner;
+        let retries = &self.stalled_writes_retries;
+        let mut raw_batch = batch.inner;
         METRICS.report_batch_size(CF::DB_NAME, raw_batch.size_in_bytes());
 
+        if raw_batch.size_in_bytes() > retries.max_batch_size {
+            // The write batch is too large to duplicate in RAM.
+            return self.write_inner(raw_batch);
+        }
+
+        let raw_batch_bytes = raw_batch.data().to_vec();
+        let mut retry_count = 0;
+        loop {
+            match self.write_inner(raw_batch) {
+                Ok(()) => return Ok(()),
+                Err(err) => {
+                    let should_retry = StalledWritesRetries::is_write_stall_error(&err)
+                        && retry_count < retries.retry_count;
+                    if should_retry {
+                        tracing::warn!(
+                            "Writes stalled when writing to DB `{}`; will retry after a delay",
+                            CF::DB_NAME
+                        );
+                        thread::sleep(retries.interval);
+                        retry_count += 1;
+                        raw_batch = rocksdb::WriteBatch::from_data(&raw_batch_bytes);
+                    } else {
+                        return Err(err);
+                    }
+                }
+            }
+        }
+    }
+
+    fn write_inner(&self, raw_batch: rocksdb::WriteBatch) -> Result<(), rocksdb::Error> {
         if self.sync_writes {
             let mut options = WriteOptions::new();
             options.set_sync(true);
-            self.inner.db.write_opt(raw_batch, &options)?;
+            self.inner.db.write_opt(raw_batch, &options)
         } else {
-            self.inner.db.write(raw_batch)?;
+            self.inner.db.write(raw_batch)
         }
-
-        // Since getting size stats may take some time, we throttle their reporting.
-        Ok(())
     }
 
     fn column_family(&self, cf: CF) -> &ColumnFamily {
@@ -439,13 +493,13 @@ mod tests {
     #[test]
     fn changing_column_families() {
         let temp_dir = TempDir::new().unwrap();
-        let db = RocksDB::<OldColumnFamilies>::new(temp_dir.path(), true).with_sync_writes();
+        let db = RocksDB::<OldColumnFamilies>::new(temp_dir.path()).with_sync_writes();
         let mut batch = db.new_write_batch();
         batch.put_cf(OldColumnFamilies::Default, b"test", b"value");
         db.write(batch).unwrap();
         drop(db);
 
-        let db = RocksDB::<NewColumnFamilies>::new(temp_dir.path(), true);
+        let db = RocksDB::<NewColumnFamilies>::new(temp_dir.path());
         let value = db.get_cf(NewColumnFamilies::Default, b"test").unwrap();
         assert_eq!(value.unwrap(), b"value");
     }
@@ -465,14 +519,39 @@
     #[test]
     fn default_column_family_does_not_need_to_be_explicitly_opened() {
         let temp_dir = TempDir::new().unwrap();
-        let db = RocksDB::<OldColumnFamilies>::new(temp_dir.path(), true).with_sync_writes();
+        let db = RocksDB::<OldColumnFamilies>::new(temp_dir.path()).with_sync_writes();
         let mut batch = db.new_write_batch();
         batch.put_cf(OldColumnFamilies::Junk, b"test", b"value");
         db.write(batch).unwrap();
         drop(db);
 
-        let db = RocksDB::<JunkColumnFamily>::new(temp_dir.path(), true);
+        let db = RocksDB::<JunkColumnFamily>::new(temp_dir.path());
         let value = db.get_cf(JunkColumnFamily, b"test").unwrap();
         assert_eq!(value.unwrap(), b"value");
     }
+
+    #[test]
+    fn write_batch_can_be_restored_from_bytes() {
+        let temp_dir = TempDir::new().unwrap();
+        let db = RocksDB::<NewColumnFamilies>::new(temp_dir.path()).with_sync_writes();
+        let mut batch = db.new_write_batch();
+        batch.put_cf(NewColumnFamilies::Default, b"test", b"value");
+        batch.put_cf(NewColumnFamilies::Default, b"test2", b"value2");
+        let batch = WriteBatch {
+            db: &db,
+            inner: rocksdb::WriteBatch::from_data(batch.inner.data()),
+        };
+        db.write(batch).unwrap();
+
+        let value = db
+            .get_cf(NewColumnFamilies::Default, b"test")
+            .unwrap()
+            .unwrap();
+        assert_eq!(value, b"value");
+        let value = db
+            .get_cf(NewColumnFamilies::Default, b"test2")
+            .unwrap()
+            .unwrap();
+        assert_eq!(value, b"value2");
+    }
 }
diff --git a/core/lib/zksync_core/src/block_reverter/mod.rs b/core/lib/zksync_core/src/block_reverter/mod.rs
index 3a1c58d20cf..7888df4a0f3 100644
--- a/core/lib/zksync_core/src/block_reverter/mod.rs
+++ b/core/lib/zksync_core/src/block_reverter/mod.rs
@@ -190,7 +190,7 @@ impl BlockReverter {
         path: &Path,
         storage_root_hash: H256,
     ) {
-        let db = RocksDB::new(path, true);
+        let db = RocksDB::new(path);
         let mut tree = ZkSyncTree::new_lightweight(db);
 
         if tree.next_l1_batch_number() <= last_l1_batch_to_keep {
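One more aside (mine, not the author's): the retry path relies on `rocksdb::WriteBatch` surviving a byte-level round-trip, because a failed write consumes the batch, which then has to be rebuilt from its serialized form. The new `write_batch_can_be_restored_from_bytes` test above exercises this through the wrapper; a minimal standalone sketch against the raw `rocksdb` crate (with `tempfile` assumed for a scratch directory) could look like this:

```rust
use rocksdb::{WriteBatch, DB};

fn main() {
    let dir = tempfile::tempdir().unwrap();
    let db = DB::open_default(dir.path()).unwrap();

    // Serialize a batch to bytes before the first write attempt...
    let mut batch = WriteBatch::default();
    batch.put(b"test", b"value");
    let bytes = batch.data().to_vec();

    // ...and rebuild it from those bytes, as the retry loop does after
    // a stalled write has consumed the original batch.
    let restored = WriteBatch::from_data(&bytes);
    db.write(restored).unwrap();
    assert_eq!(db.get(b"test").unwrap().unwrap(), b"value");
}
```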
diff --git a/core/lib/zksync_core/src/metadata_calculator/helpers.rs b/core/lib/zksync_core/src/metadata_calculator/helpers.rs
index ad4dea3023b..3330633f8c1 100644
--- a/core/lib/zksync_core/src/metadata_calculator/helpers.rs
+++ b/core/lib/zksync_core/src/metadata_calculator/helpers.rs
@@ -86,7 +86,7 @@ impl AsyncTree {
     }
 
     fn create_db(path: &Path, block_cache_capacity: usize) -> RocksDB<MerkleTreeColumnFamily> {
-        let db = RocksDB::with_cache(path, true, Some(block_cache_capacity));
+        let db = RocksDB::with_cache(path, Some(block_cache_capacity));
         if cfg!(test) {
             // We need sync writes for the unit tests to execute reliably. With the default config,
             // some writes to RocksDB may occur, but not be visible to the test code.