diff --git a/crutest/src/cli.rs b/crutest/src/cli.rs
index c56c7d190..e71bd6393 100644
--- a/crutest/src/cli.rs
+++ b/crutest/src/cli.rs
@@ -128,6 +128,8 @@ enum CliCommand {
         #[clap(long, action)]
         skip_verify: bool,
     },
+    /// Run the sparse fill test. Write to a block in each extent.
+    FillSparse,
     /// Flush
     Flush,
     /// Run Generic workload
@@ -234,7 +236,8 @@ async fn cli_read(
      * Convert offset to its byte value.
      */
     let offset = BlockIndex(block_index as u64);
-    let mut data = crucible::Buffer::repeat(255, size, ri.block_size as usize);
+    let mut data =
+        crucible::Buffer::repeat(255, size, ri.volume_info.block_size as usize);

     println!("Read at block {:5}, len:{:7}", offset.0, data.len());
     volume.read(offset, &mut data).await?;
@@ -244,7 +247,7 @@
         dl.clone(),
         block_index,
         &mut ri.write_log,
-        ri.block_size,
+        ri.volume_info.block_size,
         false,
     ) {
         ValidateStatus::Bad => {
@@ -311,13 +314,13 @@ async fn cli_write(
     let data = if block_index + size > ri.total_blocks {
         println!("Skip write log for invalid size {}", ri.total_blocks);
         let mut out = BytesMut::new();
-        out.resize(size * ri.block_size as usize, 0);
+        out.resize(size * ri.volume_info.block_size as usize, 0);
         out
     } else {
         for bi in block_index..block_index + size {
             ri.write_log.update_wc(bi);
         }
-        fill_vec(block_index, size, &ri.write_log, ri.block_size)
+        fill_vec(block_index, size, &ri.write_log, ri.volume_info.block_size)
     };

     println!("Write at block {:5}, len:{:7}", offset.0, data.len());
@@ -352,14 +355,18 @@ async fn cli_write_unwritten(
         // like normal and update our internal counter to reflect that.
         ri.write_log.update_wc(block_index);

-        fill_vec(block_index, 1, &ri.write_log, ri.block_size)
+        fill_vec(block_index, 1, &ri.write_log, ri.volume_info.block_size)
     } else {
         println!("This block has been written");
         // Fill the write buffer with random data. We don't expect this
         // to actually make it to disk.
-        let mut data = BytesMut::with_capacity(ri.block_size as usize);
-        data.extend((0..ri.block_size).map(|_| rand::thread_rng().gen::<u8>()));
+        let mut data =
+            BytesMut::with_capacity(ri.volume_info.block_size as usize);
+        data.extend(
+            (0..ri.volume_info.block_size)
+                .map(|_| rand::thread_rng().gen::<u8>()),
+        );
         data
     };
@@ -499,6 +506,9 @@ async fn cmd_to_msg(
         CliCommand::Fill { skip_verify } => {
             fw.send(CliMessage::Fill(skip_verify)).await?;
         }
+        CliCommand::FillSparse => {
+            fw.send(CliMessage::FillSparse).await?;
+        }
         CliCommand::Flush => {
             fw.send(CliMessage::Flush).await?;
         }
@@ -571,8 +581,8 @@
         Some(CliMessage::MyUuid(uuid)) => {
             println!("uuid: {}", uuid);
         }
-        Some(CliMessage::Info(es, bs, bl)) => {
-            println!("Got info: {} {} {}", es, bs, bl);
+        Some(CliMessage::Info(vi, bs, bl)) => {
+            println!("Got info: {:?} {} {}", vi, bs, bl);
         }
         Some(CliMessage::DoneOk) => {
             println!("Ok");
         }
@@ -871,6 +881,23 @@ async fn process_cli_command(
                 }
             }
         }
+        CliMessage::FillSparse => {
+            if ri.write_log.is_empty() {
+                fw.send(CliMessage::Error(CrucibleError::GenericError(
+                    "Info not initialized".to_string(),
+                )))
+                .await
+            } else {
+                match fill_sparse_workload(volume, ri).await {
+                    Ok(_) => fw.send(CliMessage::DoneOk).await,
+                    Err(e) => {
+                        let msg = format!("FillSparse failed with {}", e);
+                        let e = CrucibleError::GenericError(msg);
+                        fw.send(CliMessage::Error(e)).await
+                    }
+                }
+            }
+        }
         CliMessage::Flush => {
             println!("Flush");
             match volume.flush(None).await {
@@ -886,10 +913,7 @@
             let new_ri = get_region_info(volume).await;
             match new_ri {
                 Ok(new_ri) => {
-                    let bs = new_ri.block_size;
-                    let es = new_ri.extent_size.value;
-                    let ts = new_ri.total_size;
-                    *ri = new_ri;
+                    *ri = new_ri.clone();
                     /*
                      * We may only want to read input from the file once.
                      * Maybe make a command to specifically do it, but it
@@ -902,7 +926,12 @@
                         *wc_filled = true;
                     }
                 }
-                fw.send(CliMessage::Info(bs, es, ts)).await
+                fw.send(CliMessage::Info(
+                    new_ri.volume_info,
+                    new_ri.total_size,
+                    new_ri.total_blocks,
+                ))
+                .await
             }
             Err(e) => fw.send(CliMessage::Error(e)).await,
         }
@@ -1071,9 +1100,12 @@ pub async fn start_cli_server(
      * If write_log len is zero, then the RegionInfo has
      * not been filled.
      */
+    let volume_info = VolumeInfo {
+        block_size: 512,
+        volumes: Vec::new(),
+    };
     let mut ri: RegionInfo = RegionInfo {
-        block_size: 0,
-        extent_size: Block::new_512(0),
+        volume_info,
         total_size: 0,
         total_blocks: 0,
         write_log: WriteLog::new(0),
diff --git a/crutest/src/main.rs b/crutest/src/main.rs
index 3c6570745..31face715 100644
--- a/crutest/src/main.rs
+++ b/crutest/src/main.rs
@@ -1,15 +1,4 @@
 // Copyright 2023 Oxide Computer Company
-use std::fmt;
-use std::fs::File;
-use std::io::Write;
-use std::net::{IpAddr, SocketAddr};
-use std::num::NonZeroU64;
-use std::path::PathBuf;
-use std::sync::{
-    atomic::{AtomicBool, AtomicUsize, Ordering},
-    Arc,
-};
-
 use anyhow::{anyhow, bail, Result};
 use bytes::Bytes;
 use clap::Parser;
@@ -25,6 +14,16 @@
 use serde::{Deserialize, Serialize};
 use signal_hook::consts::signal::*;
 use signal_hook_tokio::Signals;
 use slog::{info, o, warn, Logger};
+use std::fmt;
+use std::fs::File;
+use std::io::Write;
+use std::net::{IpAddr, SocketAddr};
+use std::num::NonZeroU64;
+use std::path::PathBuf;
+use std::sync::{
+    atomic::{AtomicBool, AtomicUsize, Ordering},
+    Arc,
+};
 use tokio::sync::mpsc;
 use tokio::time::{Duration, Instant};
 use tokio_util::sync::CancellationToken;
@@ -35,7 +34,7 @@
 mod protocol;
 mod stats;
 pub use stats::*;

-use crucible::volume::VolumeBuilder;
+use crucible::volume::{VolumeBuilder, VolumeInfo};
 use crucible::*;
 use crucible_client_types::RegionExtentInfo;
 use crucible_protocol::CRUCIBLE_MESSAGE_VERSION;
@@ -418,8 +417,7 @@ impl BufferbloatConfig {
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
 pub struct RegionInfo {
-    block_size: u64,
-    extent_size: Block,
+    volume_info: VolumeInfo,
     total_size: u64,
     total_blocks: usize,
     write_log: WriteLog,
@@ -434,30 +432,28 @@ async fn get_region_info(volume: &Volume) -> Result<RegionInfo> {
      * These query requests have the side effect of preventing the test from
      * starting before the upstairs is ready.
      */
-    let block_size = volume.get_block_size().await?;
-    let extent_size = volume.query_extent_size().await?;
+    let volume_info = volume.volume_extent_info().await?;
     let total_size = volume.total_size().await?;
-    let total_blocks = (total_size / block_size) as usize;
+    let total_blocks = (total_size / volume_info.block_size) as usize;

     /*
      * Limit the max IO size (in blocks) to be 1MiB or the size
      * of the volume, whichever is smaller
      */
     const MAX_IO_BYTES: usize = 1024 * 1024;
-    let mut max_block_io = MAX_IO_BYTES / block_size as usize;
+    let mut max_block_io = MAX_IO_BYTES / volume_info.block_size as usize;
     if total_blocks < max_block_io {
         max_block_io = total_blocks;
     }

     println!(
-        "Region: es:{} ec:{} bs:{} ts:{} tb:{} max_io:{} or {}",
-        extent_size.value,
-        total_blocks as u64 / extent_size.value,
-        block_size,
+        "Region: sv:{} bs:{} ts:{} tb:{} max_io:{} or {}",
+        volume_info.volumes.len(),
+        volume_info.block_size,
         total_size,
         total_blocks,
         max_block_io,
-        (max_block_io as u64 * block_size),
+        (max_block_io as u64 * volume_info.block_size),
     );

     /*
@@ -467,8 +463,7 @@
     let write_log = WriteLog::new(total_blocks);

     Ok(RegionInfo {
-        block_size,
-        extent_size,
+        volume_info,
         total_size,
         total_blocks,
         write_log,
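The size bookkeeping in `get_region_info()` above is worth pinning down. A minimal, self-contained sketch of the same math, using made-up geometry (512-byte blocks, one sub volume of 10 extents of 100 blocks) rather than anything queried from a live upstairs:

```rust
// Standalone sketch of get_region_info()'s size math. The struct shapes
// mirror the VolumeInfo/SubVolumeInfo added in this diff; the numbers
// are hypothetical, for illustration only.
struct SubVolumeInfo {
    blocks_per_extent: u64,
    extent_count: u32,
}

struct VolumeInfo {
    block_size: u64,
    volumes: Vec<SubVolumeInfo>,
}

fn main() {
    let vi = VolumeInfo {
        block_size: 512,
        volumes: vec![SubVolumeInfo { blocks_per_extent: 100, extent_count: 10 }],
    };

    // Total size is the sum over sub volumes.
    let total_size: u64 = vi
        .volumes
        .iter()
        .map(|sv| sv.blocks_per_extent * sv.extent_count as u64 * vi.block_size)
        .sum();
    let total_blocks = (total_size / vi.block_size) as usize;

    // Cap a single IO at 1 MiB, or the whole volume if it is smaller.
    const MAX_IO_BYTES: usize = 1024 * 1024;
    let max_block_io = (MAX_IO_BYTES / vi.block_size as usize).min(total_blocks);

    assert_eq!(total_blocks, 1000);
    // 2048 blocks of 512 bytes would fit in 1 MiB, but the volume is smaller.
    assert_eq!(max_block_io, 1000);
    println!("tb:{} max_io:{}", total_blocks, max_block_io);
}
```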
@@ -804,7 +799,7 @@ async fn make_a_volume(
             bail!("Failed to get region info from {:?}: {}", dsc_url, e);
         }
     };
-    info!(test_log, "use region info: {:?}", ri);
+    info!(test_log, "Use this region info from dsc: {:?}", ri);
     let extent_info = RegionExtentInfo {
         block_size: ri.block_size,
         blocks_per_extent: ri.blocks_per_extent,
@@ -1577,7 +1572,7 @@ async fn verify_volume(
         let write_log = write_log.clone();
         let blocks_done = blocks_done.clone();
         let total_blocks = ri.total_blocks;
-        let block_size = ri.block_size;
+        let block_size = ri.volume_info.block_size;
         let pb = pb.clone();
         tasks.push(tokio::task::spawn(async move {
             let mut result = Ok(());
@@ -1911,8 +1906,12 @@ async fn balloon_workload(volume: &Volume, ri: &mut RegionInfo) -> Result<()> {
             ri.write_log.update_wc(block_index + i);
         }

-        let data =
-            fill_vec(block_index, size, &ri.write_log, ri.block_size);
+        let data = fill_vec(
+            block_index,
+            size,
+            &ri.write_log,
+            ri.volume_info.block_size,
+        );

         /*
          * Convert block_index to its byte value.
          */
@@ -1922,8 +1921,11 @@ async fn balloon_workload(volume: &Volume, ri: &mut RegionInfo) -> Result<()> {
         volume.write(offset, data).await?;
         volume.flush(None).await?;

-        let mut data =
-            crucible::Buffer::repeat(255, size, ri.block_size as usize);
+        let mut data = crucible::Buffer::repeat(
+            255,
+            size,
+            ri.volume_info.block_size as usize,
+        );
         volume.read(offset, &mut data).await?;

         let dl = data.into_bytes();
@@ -1931,7 +1933,7 @@
             dl,
             block_index,
             &mut ri.write_log,
-            ri.block_size,
+            ri.volume_info.block_size,
             false,
         ) {
             ValidateStatus::Bad | ValidateStatus::InRange => {
@@ -1976,7 +1978,7 @@ async fn fill_workload(
         let write_log = write_log.clone();
         let blocks_done = blocks_done.clone();
         let total_blocks = ri.total_blocks;
-        let block_size = ri.block_size;
+        let block_size = ri.volume_info.block_size;
         let pb = pb.clone();
         tasks.push(tokio::task::spawn(async move {
             while block_index < total_blocks {
@@ -2045,24 +2047,32 @@ async fn fill_sparse_workload(
 ) -> Result<()> {
     let mut rng = rand_chacha::ChaCha8Rng::from_entropy();

-    // Figure out how many extents we have
-    let extents = ri.total_blocks / (ri.extent_size.value as usize);
-    let extent_size = ri.extent_size.value as usize;
-
-    // Do one write to each extent.
-    for extent in 0..extents {
-        let mut block_index: usize = extent * extent_size;
-        let random_offset: usize = rng.gen_range(0..extent_size);
-        block_index += random_offset;
+    // We loop over all sub volumes, doing one write to each extent.
+    for (index, sv) in ri.volume_info.volumes.iter().enumerate() {
+        let extents = sv.extent_count as usize;
+        let extent_size = sv.blocks_per_extent as usize;
+        for extent in 0..extents {
+            let mut block_index: usize = extent * extent_size;
+            let random_offset = rng.gen_range(0..extent_size);
+            block_index += random_offset;

-        let offset = BlockIndex(block_index as u64);
+            let offset = BlockIndex(block_index.try_into().unwrap());

-        ri.write_log.update_wc(block_index);
+            ri.write_log.update_wc(block_index);

-        let data = fill_vec(block_index, 1, &ri.write_log, ri.block_size);
+            let data = fill_vec(
+                block_index,
+                1,
+                &ri.write_log,
+                ri.volume_info.block_size,
+            );

-        println!("[{extent}/{extents}] Write to block {}", block_index);
-        volume.write(offset, data).await?;
+            println!(
+                "[{index}][{extent}/{extents}] Write to block {}",
+                block_index
+            );
+            volume.write(offset, data).await?;
+        }
     }

     volume.flush(None).await?;
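The indexing in the rewritten loop above is easy to misread, so here is a stripped-down sketch of the per-extent selection it performs for one sub volume: one random block inside each extent. The geometry and RNG below are illustrative, not taken from a real region:

```rust
use rand::Rng; // assumes the rand crate, which crutest already depends on

/// Pick one random block inside each extent of a sub volume that has
/// `extent_count` extents of `blocks_per_extent` blocks each.
fn sparse_targets(
    rng: &mut impl Rng,
    extent_count: usize,
    blocks_per_extent: usize,
) -> Vec<usize> {
    (0..extent_count)
        .map(|extent| {
            extent * blocks_per_extent + rng.gen_range(0..blocks_per_extent)
        })
        .collect()
}

fn main() {
    let mut rng = rand::thread_rng();
    // Hypothetical geometry: 4 extents, 8 blocks per extent.
    let targets = sparse_targets(&mut rng, 4, 8);
    for (extent, block) in targets.iter().enumerate() {
        // Each chosen block falls inside its own extent.
        assert!((extent * 8..(extent + 1) * 8).contains(block));
        println!("[{extent}/4] would write block {block}");
    }
}
```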
@@ -2090,7 +2100,7 @@ async fn generic_workload(
         _ => 5,
     };
     let block_width = ri.total_blocks.to_string().len();
-    let size_width = (10 * ri.block_size).to_string().len();
+    let size_width = (10 * ri.volume_info.block_size).to_string().len();

     let max_io_size = std::cmp::min(10, ri.total_blocks);
@@ -2135,8 +2145,12 @@
             ri.write_log.update_wc(block_index + i);
         }

-        let data =
-            fill_vec(block_index, size, &ri.write_log, ri.block_size);
+        let data = fill_vec(
+            block_index,
+            size,
+            &ri.write_log,
+            ri.volume_info.block_size,
+        );

         if !quiet {
             match wtq {
@@ -2174,8 +2188,11 @@
             volume.write(offset, data).await?;
         } else {
             // Read (+ verify)
-            let mut data =
-                crucible::Buffer::repeat(255, size, ri.block_size as usize);
+            let mut data = crucible::Buffer::repeat(
+                255,
+                size,
+                ri.volume_info.block_size as usize,
+            );
             if !quiet {
                 match wtq {
                     WhenToQuit::Count { count } => {
@@ -2206,7 +2223,7 @@
                 dl,
                 block_index,
                 &mut ri.write_log,
-                ri.block_size,
+                ri.volume_info.block_size,
                 false,
             ) {
                 ValidateStatus::Bad | ValidateStatus::InRange => {
@@ -2343,7 +2360,7 @@ async fn replace_while_reconcile(
     info!(log, "Begin replacement while reconciliation test");
     loop {
         info!(log, "[{c}] Touch every extent part 1");
-        fill_sparse_workload(&volume, ri).await?;
+        fill_sparse_workload(volume, ri).await?;

         info!(log, "[{c}] Stop a downstairs");
         // Stop a downstairs, wait for dsc to confirm it is stopped.
@@ -2357,7 +2374,7 @@
             tokio::time::sleep(tokio::time::Duration::from_secs(4)).await;
         }
         info!(log, "[{c}] Touch every extent part 2");
-        fill_sparse_workload(&volume, ri).await?;
+        fill_sparse_workload(volume, ri).await?;

         info!(log, "[{c}] Deactivate");
         volume.deactivate().await.unwrap();
@@ -2541,7 +2558,7 @@ async fn replace_before_active(
     let mut new_ds = targets.len() - 1;
     for c in 1.. {
         info!(log, "[{c}] Touch every extent");
-        fill_sparse_workload(&volume, ri).await?;
+        fill_sparse_workload(volume, ri).await?;

         volume.deactivate().await.unwrap();
         loop {
@@ -2692,7 +2709,7 @@ async fn replace_workload(
     let ds_total = targets.len() - 1;

     if fill {
-        fill_sparse_workload(&volume, ri).await?;
+        fill_sparse_workload(volume, ri).await?;
     }
     // Make a copy of the stop at counter if one was provided so the
     // IO task and the replace task don't have to share wtq
@@ -2874,7 +2891,12 @@ async fn dirty_workload(
      */
     ri.write_log.update_wc(block_index);

-    let data = fill_vec(block_index, size, &ri.write_log, ri.block_size);
+    let data = fill_vec(
+        block_index,
+        size,
+        &ri.write_log,
+        ri.volume_info.block_size,
+    );

     println!(
         "[{:>0width$}/{:>0width$}] Write at block {}, len:{}",
@@ -3075,7 +3097,7 @@ async fn perf_workload(
     }

     let mut rng = rand::thread_rng();
-    let io_size = blocks_per_io * ri.block_size as usize;
+    let io_size = blocks_per_io * ri.volume_info.block_size as usize;

     let write_buffers: Vec<BytesMut> = (0..io_depth)
@@ -3089,10 +3111,13 @@
         .collect();

     let mut read_buffers: Vec<Buffer> = (0..io_depth)
-        .map(|_| Buffer::new(blocks_per_io, ri.block_size as usize))
+        .map(|_| Buffer::new(blocks_per_io, ri.volume_info.block_size as usize))
         .collect();

-    let es = ri.extent_size.value;
+    let mut es = 0;
+    for sv in ri.volume_info.volumes.iter() {
+        es += sv.blocks_per_extent;
+    }
     let ec = ri.total_blocks as u64 / es;

     // To make a random block offset, we take the total block count and subtract
@@ -3108,7 +3133,7 @@
         for write_buffer in write_buffers.iter().take(io_depth) {
             let offset: u64 = rng.gen::<u64>() % offset_mod;
             let future = volume.write_to_byte_offset(
-                offset * ri.block_size,
+                offset * ri.volume_info.block_size,
                 write_buffer.clone(),
             );
             write_futures.push_back(future);
@@ -3158,7 +3183,7 @@
             let offset: u64 = rng.gen::<u64>() % offset_mod;
             let future = {
                 let volume = volume.clone();
-                let bs = ri.block_size;
+                let bs = ri.volume_info.block_size;
                 tokio::spawn(async move {
                     volume
                         .read_from_byte_offset(
@@ -3215,20 +3240,26 @@
 /// Prints a pleasant summary of the given region
 fn print_region_description(ri: &RegionInfo, encrypted: bool) {
     println!("region info:");
-    println!("  block size:      {} bytes", ri.block_size);
-    println!("  blocks / extent: {}", ri.extent_size.value);
-    println!(
-        "  extent size:     {}",
-        human_bytes((ri.block_size * ri.extent_size.value) as f64)
-    );
+    let block_size = ri.volume_info.block_size;
+    println!("  block size:      {} bytes", block_size);
+    for (index, sv) in ri.volume_info.volumes.iter().enumerate() {
+        println!(
+            "  sub_volume {index} blocks / extent: {}",
+            sv.blocks_per_extent
+        );
+        println!("  sub_volume {index} extent count: {}", sv.extent_count);
+        println!(
+            "  sub_volume {index} extent size: {}",
+            human_bytes((block_size * sv.blocks_per_extent) as f64)
+        );
+    }
+    println!("  total blocks:    {}", ri.total_blocks);
     println!(
-        "  extent count:    {}",
-        ri.total_blocks as u64 / ri.extent_size.value
+        "  total size:      {}",
+        human_bytes(ri.total_size as f64)
     );
-    println!("  total blocks:    {}", ri.total_blocks);
-    println!("  total size:      {}", human_bytes(ri.total_size as f64));
     println!(
-        "  encryption:      {}",
+        "  encryption:      {}",
         if encrypted { "yes" } else { "no" }
     );
 }
@@ -3261,10 +3292,11 @@ async fn rand_read_write_workload(
         bail!("too many blocks per IO; can't exceed {}", ri.total_blocks);
     }

+    let block_size = ri.volume_info.block_size as usize;
     println!(
         "\n----------------------------------------------\
         \nrandom {desc} with {} chunks ({} block{})",
-        human_bytes((cfg.blocks_per_io as u64 * ri.block_size) as f64),
+        human_bytes((cfg.blocks_per_io as u64 * block_size as u64) as f64),
         cfg.blocks_per_io,
         if cfg.blocks_per_io > 1 { "s" } else { "" },
     );
@@ -3274,7 +3306,6 @@
     let stop = Arc::new(AtomicBool::new(false));
     let byte_count = Arc::new(AtomicUsize::new(0));
-    let block_size = ri.block_size as usize;
     let total_blocks = ri.total_blocks;
     let mut workers = vec![];
     for _ in 0..cfg.io_depth {
@@ -3424,13 +3455,14 @@ async fn bufferbloat_workload(
         bail!("too many blocks per IO; can't exceed {}", ri.total_blocks);
     }

+    let block_size = ri.volume_info.block_size as usize;
     println!(
         "\n----------------------------------------------\
         \nbufferbloat test\
         \n----------------------------------------------\
         \ninitial {:?} sec random write with {} chunks ({} block{})",
         cfg.time_secs,
-        human_bytes((cfg.blocks_per_io as u64 * ri.block_size) as f64),
+        human_bytes((cfg.blocks_per_io as u64 * block_size as u64) as f64),
         cfg.blocks_per_io,
         if cfg.blocks_per_io > 1 { "s" } else { "" },
     );
@@ -3440,7 +3472,6 @@
     let stop = Arc::new(AtomicBool::new(false));
     let byte_count = Arc::new(AtomicUsize::new(0));
-    let block_size = ri.block_size as usize;
     let total_blocks = ri.total_blocks;
     let mut workers = vec![];
     for _ in 0..cfg.io_depth {
@@ -3548,20 +3579,20 @@ async fn one_workload(volume: &Volume, ri: &mut RegionInfo) -> Result<()> {
      */
     ri.write_log.update_wc(block_index);

-    let data = fill_vec(block_index, size, &ri.write_log, ri.block_size);
+    let block_size = ri.volume_info.block_size;
+    let data = fill_vec(block_index, size, &ri.write_log, block_size);

     println!("Write at block {:5}, len:{:7}", offset.0, data.len());

     volume.write(offset, data).await?;

-    let mut data = crucible::Buffer::repeat(255, size, ri.block_size as usize);
+    let mut data = crucible::Buffer::repeat(255, size, block_size as usize);

     println!("Read at block {:5}, len:{:7}", offset.0, data.len());
     volume.read(offset, &mut data).await?;

     let dl = data.into_bytes();
-    match validate_vec(dl, block_index, &mut ri.write_log, ri.block_size, false)
-    {
+    match validate_vec(dl, block_index, &mut ri.write_log, block_size, false) {
         ValidateStatus::Bad | ValidateStatus::InRange => {
             bail!("Error at {}", block_index);
         }
@@ -3664,6 +3695,7 @@ async fn write_flush_read_workload(
     let mut rng = rand_chacha::ChaCha8Rng::from_entropy();

     let count_width = count.to_string().len();
+    let block_size = ri.volume_info.block_size;
     for c in 1..=count {
         /*
          * Pick a random size (in blocks) for the IO, up to the size of the
@@ -3691,7 +3723,7 @@ async fn write_flush_read_workload(
             ri.write_log.update_wc(block_index + i);
         }

-        let data = fill_vec(block_index, size, &ri.write_log, ri.block_size);
+        let data = fill_vec(block_index, size, &ri.write_log, block_size);

         println!(
             "{:>0width$}/{:>0width$} IO at block {:5}, len:{:7}",
@@ -3705,8 +3737,7 @@
         volume.flush(None).await?;

-        let mut data =
-            crucible::Buffer::repeat(255, size, ri.block_size as usize);
+        let mut data = crucible::Buffer::repeat(255, size, block_size as usize);
         volume.read(offset, &mut data).await?;

         let dl = data.into_bytes();
@@ -3714,7 +3745,7 @@
             dl,
             block_index,
             &mut ri.write_log,
-            ri.block_size,
+            block_size,
             false,
         ) {
             ValidateStatus::Bad | ValidateStatus::InRange => {
@@ -3795,7 +3826,8 @@ async fn repair_workload(
     // These help the printlns use the minimum white space
     let count_width = count.to_string().len();
     let block_width = ri.total_blocks.to_string().len();
-    let size_width = (10 * ri.block_size).to_string().len();
+    let block_size = ri.volume_info.block_size;
+    let size_width = (10 * block_size).to_string().len();
     for c in 1..=count {
         let op = rng.gen_range(0..10);
         // Make sure the last few commands are not a flush
@@ -3841,7 +3873,7 @@
                 }

                 let data =
-                    fill_vec(block_index, size, &ri.write_log, ri.block_size);
+                    fill_vec(block_index, size, &ri.write_log, block_size);

                 print!(
                     "{:>0width$}/{:>0width$} Write \
@@ -3864,7 +3896,7 @@
             } else {
                 // Read
                 let mut data =
-                    crucible::Buffer::repeat(255, size, ri.block_size as usize);
+                    crucible::Buffer::repeat(255, size, block_size as usize);
                 println!(
                     "{:>0width$}/{:>0width$} Read \
                     block {:>bw$} len {:>sw$}",
@@ -3936,14 +3968,18 @@ async fn demo_workload(
             ri.write_log.update_wc(block_index + i);
         }

-            let data =
-                fill_vec(block_index, size, &ri.write_log, ri.block_size);
+            let data = fill_vec(
+                block_index,
+                size,
+                &ri.write_log,
+                ri.volume_info.block_size,
+            );

             let future = volume.write(offset, data);
             write_futures.push_back(future);
         } else {
             // Read
-            let block_size = ri.block_size as usize;
+            let block_size = ri.volume_info.block_size as usize;
             let future = {
                 let volume = volume.clone();
                 tokio::spawn(async move {
@@ -3993,41 +4029,100 @@
 /*
  * This is a test workload that generates a single write spanning an extent
- * then will try to read the same.
+ * then will try to read the same. When multiple sub_volumes are present we
+ * also issue a Write/Flush/Read that will span the two sub_volumes.
  */
 async fn span_workload(volume: &Volume, ri: &mut RegionInfo) -> Result<()> {
-    /*
-     * Pick the last block in the first extent
-     */
-    let block_index = (ri.extent_size.value - 1) as usize;
+    let mut extent_block_start = 0;
+    let last_sub_volume = ri.volume_info.volumes.len() - 1;
+    let block_size = ri.volume_info.block_size;
+    for (index, sv) in ri.volume_info.volumes.iter().enumerate() {
+        // Pick the last block in the first extent
+        let extent_size = sv.blocks_per_extent as usize;
+        let block_index = extent_block_start + extent_size - 1;
+
+        // Update the counter for the blocks we are about to write.
+        ri.write_log.update_wc(block_index);
+        ri.write_log.update_wc(block_index + 1);

-    /*
-     * Update the counter for the blocks we are about to write.
-     */
-    ri.write_log.update_wc(block_index);
-    ri.write_log.update_wc(block_index + 1);
+        let offset = BlockIndex(block_index as u64);
+        let data = fill_vec(block_index, 2, &ri.write_log, block_size);

-    let offset = BlockIndex(block_index as u64);
-    let data = fill_vec(block_index, 2, &ri.write_log, ri.block_size);
+        println!(
+            "sub_volume:{} Block:{} Send a write spanning two extents",
+            index, block_index
+        );
+        volume.write(offset, data).await?;

-    println!("Sending a write spanning two extents");
-    volume.write(offset, data).await?;
+        println!("sub_volume:{index} Send a flush");
+        volume.flush(None).await?;

-    println!("Sending a flush");
-    volume.flush(None).await?;
+        println!(
+            "sub_volume:{} Block:{} Send a read spanning two extents",
+            index, block_index
+        );
+        let mut data = crucible::Buffer::repeat(99, 2, block_size as usize);
+        volume.read(offset, &mut data).await?;
+
+        let dl = data.into_bytes();
+        match validate_vec(
+            dl,
+            block_index,
+            &mut ri.write_log,
+            block_size,
+            false,
+        ) {
+            ValidateStatus::Bad | ValidateStatus::InRange => {
+                bail!("Span read verify failed");
+            }
+            ValidateStatus::Good => {}
+        }

-    let mut data = crucible::Buffer::repeat(99, 2, ri.block_size as usize);
+        // Move to the start of the next extent.
+        extent_block_start += extent_size * sv.extent_count as usize;

-    println!("Sending a read spanning two extents");
-    volume.read(offset, &mut data).await?;
+        // If our sub volume is not the last sub volume, do an IO that will
+        // span this one and the next.
+        if index < last_sub_volume {
+            let block_index = extent_block_start - 1;

-    let dl = data.into_bytes();
-    match validate_vec(dl, block_index, &mut ri.write_log, ri.block_size, false)
-    {
-        ValidateStatus::Bad | ValidateStatus::InRange => {
-            bail!("Span read verify failed");
+            // Update the counter for the blocks we are about to write.
+            ri.write_log.update_wc(block_index);
+            ri.write_log.update_wc(block_index + 1);
+
+            let offset = BlockIndex(block_index as u64);
+            let data = fill_vec(block_index, 2, &ri.write_log, block_size);
+
+            println!(
+                "sub_volume:{} Block:{} Send a write spanning two sub_volumes",
+                index, block_index
+            );
+            volume.write(offset, data).await?;
+
+            println!("sub_volume:{index} Send a flush");
+            volume.flush(None).await?;
+
+            println!(
+                "sub_volume:{} Block:{} Send a read spanning two sub volumes",
+                index, block_index
+            );
+            let mut data = crucible::Buffer::repeat(99, 2, block_size as usize);
+            volume.read(offset, &mut data).await?;
+
+            let dl = data.into_bytes();
+            match validate_vec(
+                dl,
+                block_index,
+                &mut ri.write_log,
+                block_size,
+                false,
+            ) {
+                ValidateStatus::Bad | ValidateStatus::InRange => {
+                    bail!("Span read verify failed");
+                }
+                ValidateStatus::Good => {}
+            }
         }
-        ValidateStatus::Good => {}
     }
     Ok(())
 }
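The boundary arithmetic in the loop above rewards a closer look: `extent_block_start` accumulates `blocks_per_extent * extent_count` per sub volume, so `extent_block_start - 1` is the last block of the current sub volume, and a two-block IO starting there crosses into the next one. A small sketch with made-up geometry:

```rust
// Illustrative only: two hypothetical sub volumes, expressed as
// (blocks_per_extent, extent_count) pairs.
fn main() {
    let volumes = [(100u64, 10u32), (200u64, 5u32)];

    let mut extent_block_start = 0u64;
    for (index, (blocks_per_extent, extent_count)) in volumes.iter().enumerate() {
        // Last block of this sub volume's first extent: a two-block IO
        // starting here spans an extent boundary.
        let extent_span = extent_block_start + blocks_per_extent - 1;

        extent_block_start += blocks_per_extent * *extent_count as u64;

        // Last block of the sub volume itself: a two-block IO starting
        // here spans into the next sub volume (when one exists).
        let sv_span = extent_block_start - 1;
        println!(
            "sv:{index} extent-span IO at {extent_span}, sub-volume-span IO at {sv_span}"
        );
    }
    // For this geometry: sv 0 covers blocks 0..1000 and sv 1 covers
    // 1000..2000, so the sub-volume-spanning write hits blocks 999-1000.
    assert_eq!(extent_block_start, 2000);
}
```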
@@ -4043,7 +4138,8 @@ async fn big_workload(volume: &Volume, ri: &mut RegionInfo) -> Result<()> {
      */
     ri.write_log.update_wc(block_index);

-    let data = fill_vec(block_index, 1, &ri.write_log, ri.block_size);
+    let data =
+        fill_vec(block_index, 1, &ri.write_log, ri.volume_info.block_size);

     /*
      * Convert block_index to its byte value.
      */
@@ -4053,7 +4149,11 @@
     volume.flush(None).await?;

-    let mut data = crucible::Buffer::repeat(255, 1, ri.block_size as usize);
+    let mut data = crucible::Buffer::repeat(
+        255,
+        1,
+        ri.volume_info.block_size as usize,
+    );
     volume.read(offset, &mut data).await?;

     let dl = data.into_bytes();
@@ -4061,7 +4161,7 @@
         dl,
         block_index,
         &mut ri.write_log,
-        ri.block_size,
+        ri.volume_info.block_size,
         false,
     ) {
         ValidateStatus::Bad | ValidateStatus::InRange => {
@@ -4088,7 +4188,7 @@ async fn biggest_io_workload(
     let biggest_io_in_blocks = {
         let crucible_max_io =
             crucible_protocol::CrucibleEncoder::max_io_blocks(
-                ri.block_size as usize,
+                ri.volume_info.block_size as usize,
             )?;

         if crucible_max_io < ri.total_blocks {
@@ -4121,8 +4221,12 @@
             ri.write_log.update_wc(block_index + i);
         }

-        let data =
-            fill_vec(block_index, next_io_blocks, &ri.write_log, ri.block_size);
+        let data = fill_vec(
+            block_index,
+            next_io_blocks,
+            &ri.write_log,
+            ri.volume_info.block_size,
+        );

         println!(
             "IO at block:{} size in blocks:{}",
@@ -4144,27 +4248,25 @@
  * TODO: Make this test use the global write count, but remember, async.
  */
 async fn dep_workload(volume: &Volume, ri: &mut RegionInfo) -> Result<()> {
-    let final_offset = ri.total_size - ri.block_size;
+    let final_offset = ri.total_size - ri.volume_info.block_size;

     let mut my_offset: u64 = 0;
     for my_count in 1..150 {
         let mut write_futures = FuturesOrdered::new();
         let mut read_futures = FuturesOrdered::new();
-
+        let block_size = ri.volume_info.block_size;
         /*
          * Generate some number of operations
          */
         for ioc in 0..200 {
-            my_offset = (my_offset + ri.block_size) % final_offset;
+            my_offset = (my_offset + block_size) % final_offset;
             if random() {
                 /*
                  * Generate a write buffer with a locally unique value.
                  */
-                let mut data = BytesMut::with_capacity(ri.block_size as usize);
+                let mut data = BytesMut::with_capacity(block_size as usize);
                 let seed = ((my_offset % 254) + 1) as u8;
-                data.extend(
-                    std::iter::repeat(seed).take(ri.block_size as usize),
-                );
+                data.extend(std::iter::repeat(seed).take(block_size as usize));

                 println!(
                     "Loop:{} send write {} @ offset:{} len:{}",
@@ -4177,7 +4279,7 @@
                 write_futures.push_back(future);
             } else {
                 let mut data =
-                    crucible::Buffer::repeat(0, 1, ri.block_size as usize);
+                    crucible::Buffer::repeat(0, 1, block_size as usize);

                 println!(
                     "Loop:{} send read {} @ offset:{} len:{}",
diff --git a/crutest/src/protocol.rs b/crutest/src/protocol.rs
index 291b07320..fdeb765fa 100644
--- a/crutest/src/protocol.rs
+++ b/crutest/src/protocol.rs
@@ -34,9 +34,11 @@ pub enum CliMessage {
     Export,
     // Run the fill test.
     Fill(bool),
+    // Run the sparse fill test.
+    FillSparse,
     Flush,
     Generic(usize, bool),
-    Info(u64, u64, u64),
+    Info(VolumeInfo, u64, usize),
     InfoPlease,
     IsActive,
     MyUuid(Uuid),
@@ -219,7 +221,11 @@ mod tests {

     #[test]
     fn rt_info() -> Result<()> {
-        let input = CliMessage::Info(1, 2, 99);
+        let vi = VolumeInfo {
+            block_size: 512,
+            volumes: Vec::new(),
+        };
+        let input = CliMessage::Info(vi, 2, 99);
         assert_eq!(input, round_trip(&input)?);
         Ok(())
     }
diff --git a/upstairs/src/block_io.rs b/upstairs/src/block_io.rs
index 739dcb4c6..a455f60d1 100644
--- a/upstairs/src/block_io.rs
+++ b/upstairs/src/block_io.rs
@@ -48,8 +48,10 @@ impl BlockIO for FileBlockIO {
         Ok(())
     }

-    async fn query_extent_size(&self) -> Result<Block, CrucibleError> {
-        crucible_bail!(Unsupported, "query_extent_size unsupported",)
+    async fn query_extent_info(
+        &self,
+    ) -> Result<Option<RegionExtentInfo>, CrucibleError> {
+        Ok(None)
     }

     async fn query_work_queue(&self) -> Result<WQCounts, CrucibleError> {
@@ -216,10 +218,10 @@ impl BlockIO for ReqwestBlockIO {
         })
     }

-    async fn query_extent_size(&self) -> Result<Block, CrucibleError> {
-        Err(CrucibleError::Unsupported(
-            "query_extent_size unsupported".to_string(),
-        ))
+    async fn query_extent_info(
+        &self,
+    ) -> Result<Option<RegionExtentInfo>, CrucibleError> {
+        Ok(None)
     }

     async fn query_is_active(&self) -> Result<bool, CrucibleError> {
diff --git a/upstairs/src/guest.rs b/upstairs/src/guest.rs
index e01aee223..a3e13498b 100644
--- a/upstairs/src/guest.rs
+++ b/upstairs/src/guest.rs
@@ -13,7 +13,8 @@ use crate::{
     BlockIO, BlockOp, BlockOpWaiter, BlockRes, Buffer, JobId, RawReadResponse,
     ReplaceResult, UpstairsAction,
 };
-use crucible_common::{build_logger, Block, BlockIndex, CrucibleError};
+use crucible_client_types::RegionExtentInfo;
+use crucible_common::{build_logger, BlockIndex, CrucibleError};
 use crucible_protocol::SnapshotDetails;

 use async_trait::async_trait;
@@ -342,9 +343,13 @@ impl Guest {
         rx.wait().await
     }

-    pub async fn query_extent_size(&self) -> Result<Block, CrucibleError> {
-        self.send_and_wait(|done| BlockOp::QueryExtentSize { done })
-            .await
+    pub async fn query_extent_info(
+        &self,
+    ) -> Result<Option<RegionExtentInfo>, CrucibleError> {
+        let ei = self
+            .send_and_wait(|done| BlockOp::QueryExtentInfo { done })
+            .await?;
+        Ok(Some(ei))
     }

     pub async fn query_work_queue(&self) -> Result<WQCounts, CrucibleError> {
@@ -463,10 +468,13 @@ impl BlockIO for Guest {
         self.send_and_wait(|done| BlockOp::QueryWorkQueue { done })
             .await
     }
-
-    async fn query_extent_size(&self) -> Result<Block, CrucibleError> {
-        self.send_and_wait(|done| BlockOp::QueryExtentSize { done })
-            .await
+    async fn query_extent_info(
+        &self,
+    ) -> Result<Option<RegionExtentInfo>, CrucibleError> {
+        let ei = self
+            .send_and_wait(|done| BlockOp::QueryExtentInfo { done })
+            .await?;
+        Ok(Some(ei))
     }

     async fn total_size(&self) -> Result<u64, CrucibleError> {
diff --git a/upstairs/src/in_memory.rs b/upstairs/src/in_memory.rs
index bdda4bd6f..ff3e2165f 100644
--- a/upstairs/src/in_memory.rs
+++ b/upstairs/src/in_memory.rs
@@ -41,8 +41,10 @@ impl BlockIO for InMemoryBlockIO {
         Ok(())
     }

-    async fn query_extent_size(&self) -> Result<Block, CrucibleError> {
-        crucible_bail!(Unsupported, "query_extent_size unsupported",)
+    async fn query_extent_info(
+        &self,
+    ) -> Result<Option<RegionExtentInfo>, CrucibleError> {
+        Ok(None)
     }

     async fn query_work_queue(&self) -> Result<WQCounts, CrucibleError> {
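Across block_io.rs, guest.rs, and in_memory.rs above the pattern is consistent: backends with no region geometry (file, HTTP, in-memory) now answer `Ok(None)` instead of an `Unsupported` error, and only `Guest` answers `Some`; the trait itself changes in lib.rs below. A toy sketch of that contract, with a simplified synchronous trait standing in for `BlockIO`:

```rust
// Simplified stand-in for the BlockIO trait change. RegionExtentInfo's
// fields mirror crucible_client_types; everything else is hypothetical.
#[derive(Debug)]
struct RegionExtentInfo {
    block_size: u64,
    blocks_per_extent: u64,
    extent_count: u32,
}

trait ExtentQuery {
    // None means "this backend has no extents" - something callers can
    // skip over, rather than an error they must special-case.
    fn query_extent_info(&self) -> Option<RegionExtentInfo>;
}

struct FileBacked;
impl ExtentQuery for FileBacked {
    fn query_extent_info(&self) -> Option<RegionExtentInfo> {
        None // a flat file has no extent geometry
    }
}

struct RegionBacked;
impl ExtentQuery for RegionBacked {
    fn query_extent_info(&self) -> Option<RegionExtentInfo> {
        Some(RegionExtentInfo {
            block_size: 512,
            blocks_per_extent: 100,
            extent_count: 10,
        })
    }
}

fn main() {
    let backends: Vec<Box<dyn ExtentQuery>> =
        vec![Box::new(FileBacked), Box::new(RegionBacked)];
    // Collect geometry only from backends that have it, the same way
    // Volume::volume_extent_info() walks its sub volumes.
    for ei in backends.iter().filter_map(|b| b.query_extent_info()) {
        println!("{ei:?}");
    }
}
```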
diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs
index 81488f2e5..54e5af7b1 100644
--- a/upstairs/src/lib.rs
+++ b/upstairs/src/lib.rs
@@ -12,7 +12,7 @@ use std::sync::Arc;
 use std::time::Duration;

 pub use crucible_client_types::{
-    CrucibleOpts, ReplaceResult, VolumeConstructionRequest,
+    CrucibleOpts, RegionExtentInfo, ReplaceResult, VolumeConstructionRequest,
 };
 pub use crucible_common::*;
 pub use crucible_protocol::*;
@@ -109,8 +109,9 @@ pub trait BlockIO: Sync {
     async fn deactivate(&self) -> Result<(), CrucibleError>;

     async fn query_is_active(&self) -> Result<bool, CrucibleError>;
-
-    async fn query_extent_size(&self) -> Result<Block, CrucibleError>;
+    async fn query_extent_info(
+        &self,
+    ) -> Result<Option<RegionExtentInfo>, CrucibleError>;

     async fn query_work_queue(&self) -> Result<WQCounts, CrucibleError>;

     // Total bytes of Volume
@@ -1595,8 +1596,8 @@ pub(crate) enum BlockOp {
         done: BlockRes,
     },
     // Begin testing options.
-    QueryExtentSize {
-        done: BlockRes<Block>,
-    },
+    QueryExtentInfo {
+        done: BlockRes<RegionExtentInfo>,
+    },
     QueryWorkQueue {
         done: BlockRes<WQCounts>,
     },
diff --git a/upstairs/src/upstairs.rs b/upstairs/src/upstairs.rs
index 8ebccc5c3..1d1422559 100644
--- a/upstairs/src/upstairs.rs
+++ b/upstairs/src/upstairs.rs
@@ -17,6 +17,7 @@ use crate::{
     EncryptionContext, GuestIoHandle, Message, RegionDefinition,
     RegionDefinitionStatus, SnapshotDetails, WQCounts,
 };
+use crucible_client_types::RegionExtentInfo;
 use crucible_common::{BlockIndex, CrucibleError};

 use serde::{Deserialize, Serialize};
@@ -1049,11 +1050,16 @@ impl Upstairs {
                 };
             }
             // Testing options
-            BlockOp::QueryExtentSize { done } => {
+            BlockOp::QueryExtentInfo { done } => {
                 // Yes, test only
                 match self.ddef.get_def() {
                     Some(rd) => {
-                        done.send_ok(rd.extent_size());
+                        let ei = RegionExtentInfo {
+                            block_size: rd.block_size(),
+                            blocks_per_extent: rd.extent_size().value,
+                            extent_count: rd.extent_count(),
+                        };
+                        done.send_ok(ei);
                     }
                     None => {
                         warn!(
diff --git a/upstairs/src/volume.rs b/upstairs/src/volume.rs
index b58d055d9..2c9b964be 100644
--- a/upstairs/src/volume.rs
+++ b/upstairs/src/volume.rs
@@ -611,6 +611,36 @@ impl Volume {
         Ok(())
     }
+
+    pub async fn volume_extent_info(
+        &self,
+    ) -> Result<VolumeInfo, CrucibleError> {
+        // A volume has multiple levels of extent info, not just one.
+        let mut volumes = Vec::new();
+
+        for sub_volume in &self.sub_volumes {
+            match sub_volume.query_extent_info().await? {
+                Some(ei) => {
+                    assert_eq!(self.block_size, ei.block_size);
+                    let svi = SubVolumeInfo {
+                        blocks_per_extent: ei.blocks_per_extent,
+                        extent_count: ei.extent_count,
+                    };
+                    volumes.push(svi);
+                }
+                None => {
+                    continue;
+                }
+            }
+        }
+
+        Ok(VolumeInfo {
+            block_size: self.block_size,
+            volumes,
+        })
+
+        // TODO: add support for read only parents
+    }
 }

 #[async_trait]
@@ -682,22 +712,12 @@ impl BlockIO for VolumeInner {

     // Return a vec of these?
     // Return a struct with a vec for SV and Some/None for ROP?
-    async fn query_extent_size(&self) -> Result<Block, CrucibleError> {
-        // ZZZ this needs more info, what if ROP and SV differ?
-        for sub_volume in &self.sub_volumes {
-            match sub_volume.query_extent_size().await {
-                Ok(es) => {
-                    return Ok(es);
-                }
-                _ => {
-                    continue;
-                }
-            }
-        }
-        if let Some(ref read_only_parent) = &self.read_only_parent {
-            return read_only_parent.query_extent_size().await;
-        }
-        crucible_bail!(IoError, "Cannot determine extent size");
+    async fn query_extent_info(
+        &self,
+    ) -> Result<Option<RegionExtentInfo>, CrucibleError> {
+        // A volume has multiple levels of extent info, not just one.
+        // To get the same info in the proper form, use volume_extent_info().
+        Ok(None)
     }

     async fn deactivate(&self) -> Result<(), CrucibleError> {
@@ -964,6 +984,17 @@ impl BlockIO for VolumeInner {
     }
 }

+#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
+pub struct VolumeInfo {
+    pub block_size: u64,
+    pub volumes: Vec<SubVolumeInfo>,
+}
+
+#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
+pub struct SubVolumeInfo {
+    pub blocks_per_extent: u64,
+    pub extent_count: u32,
+}
+
 // Traditional subvolume is just one region set
 impl SubVolume {
     // Compute sub volume LBA from total volume LBA.
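`VolumeInfo` and `SubVolumeInfo` derive Serialize/Deserialize so they can ride inside `CliMessage::Info`, as the `rt_info` test earlier exercises. A round-trip sketch of that property, using serde_json purely for brevity (the actual crutest CLI framing may use a different codec):

```rust
// Assumes serde (with the derive feature) and serde_json as dependencies.
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
struct VolumeInfo {
    block_size: u64,
    volumes: Vec<SubVolumeInfo>,
}

#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
struct SubVolumeInfo {
    blocks_per_extent: u64,
    extent_count: u32,
}

fn main() -> Result<(), serde_json::Error> {
    let vi = VolumeInfo {
        block_size: 512,
        volumes: vec![SubVolumeInfo { blocks_per_extent: 100, extent_count: 10 }],
    };

    // The same shape the rt_info test checks: out to the wire and back.
    let wire = serde_json::to_string(&vi)?;
    let back: VolumeInfo = serde_json::from_str(&wire)?;
    assert_eq!(vi, back);
    println!("{wire}");
    Ok(())
}
```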
@@ -1061,8 +1092,10 @@ impl BlockIO for SubVolume {
     async fn query_work_queue(&self) -> Result<WQCounts, CrucibleError> {
         self.block_io.query_work_queue().await
     }
-    async fn query_extent_size(&self) -> Result<Block, CrucibleError> {
-        self.block_io.query_extent_size().await
+    async fn query_extent_info(
+        &self,
+    ) -> Result<Option<RegionExtentInfo>, CrucibleError> {
+        self.block_io.query_extent_info().await
     }

     async fn deactivate(&self) -> Result<(), CrucibleError> {