Skip to content

Commit

Permalink
review: do some testing of parallel IO
Browse files Browse the repository at this point in the history
  • Loading branch information
VladLazar committed Nov 4, 2024
1 parent 657526a commit dba6968
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 37 deletions.
85 changes: 48 additions & 37 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5134,7 +5134,7 @@ mod tests {
use pageserver_api::value::Value;
use pageserver_compaction::helpers::overlaps_with;
use rand::{thread_rng, Rng};
use storage_layer::PersistentLayerKey;
use storage_layer::{PersistentLayerKey, SelectedIoConcurrency};
use tests::storage_layer::ValuesReconstructState;
use tests::timeline::{GetVectoredError, ShutdownMode};
use timeline::DeltaLayerTestDesc;
Expand Down Expand Up @@ -5970,48 +5970,59 @@ mod tests {
// Pick a big LSN such that we query over all the changes.
let reads_lsn = Lsn(u64::MAX - 1);

for read in reads {
info!("Doing vectored read on {:?}", read);
let io_concurrency_levels = vec![
SelectedIoConcurrency::Serial,
SelectedIoConcurrency::Parallel,
];

let vectored_res = tline
.get_vectored_impl(
read.clone(),
reads_lsn,
&mut ValuesReconstructState::new(),
&ctx,
)
.await;
for io_concurrency_level in io_concurrency_levels {
for read in reads.clone() {
info!(
"Doing vectored read on {:?} with IO concurrency {:?}",
read, io_concurrency_level
);

let mut expected_lsns: HashMap<Key, Lsn> = Default::default();
let mut expect_missing = false;
let mut key = read.start().unwrap();
while key != read.end().unwrap() {
if let Some(lsns) = inserted.get(&key) {
let expected_lsn = lsns.iter().rfind(|lsn| **lsn <= reads_lsn);
match expected_lsn {
Some(lsn) => {
expected_lsns.insert(key, *lsn);
}
None => {
expect_missing = true;
break;
let vectored_res = tline
.get_vectored_impl(
read.clone(),
reads_lsn,
&mut ValuesReconstructState::new_with_io_concurrency(io_concurrency_level),
&ctx,
)
.await;

let mut expected_lsns: HashMap<Key, Lsn> = Default::default();
let mut expect_missing = false;
let mut key = read.start().unwrap();
while key != read.end().unwrap() {
if let Some(lsns) = inserted.get(&key) {
let expected_lsn = lsns.iter().rfind(|lsn| **lsn <= reads_lsn);
match expected_lsn {
Some(lsn) => {
expected_lsns.insert(key, *lsn);
}
None => {
expect_missing = true;
break;
}
}
} else {
expect_missing = true;
break;
}
} else {
expect_missing = true;
break;
}

key = key.next();
}
key = key.next();
}

if expect_missing {
assert!(matches!(vectored_res, Err(GetVectoredError::MissingKey(_))));
} else {
for (key, image) in vectored_res? {
let expected_lsn = expected_lsns.get(&key).expect("determined above");
let expected_image = test_img(&format!("{} at {}", key.field6, expected_lsn));
assert_eq!(image?, expected_image);
if expect_missing {
assert!(matches!(vectored_res, Err(GetVectoredError::MissingKey(_))));
} else {
for (key, image) in vectored_res? {
let expected_lsn = expected_lsns.get(&key).expect("determined above");
let expected_image =
test_img(&format!("{} at {}", key.field6, expected_lsn));
assert_eq!(image?, expected_image);
}
}
}
}
Expand Down
22 changes: 22 additions & 0 deletions pageserver/src/tenant/storage_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,13 @@ impl IoConcurrency {
}
}

#[cfg(test)]
#[derive(Debug, Copy, Clone)]
pub(crate) enum SelectedIoConcurrency {
Serial,
Parallel,
}

impl ValuesReconstructState {
pub(crate) fn new() -> Self {
Self {
Expand All @@ -216,6 +223,21 @@ impl ValuesReconstructState {
}
}

#[cfg(test)]
pub(crate) fn new_with_io_concurrency(io_concurrency: SelectedIoConcurrency) -> Self {
Self {
keys: HashMap::new(),
keys_done: KeySpaceRandomAccum::new(),
keys_with_image_coverage: None,
layers_visited: 0,
delta_layers_visited: 0,
io_concurrency: match io_concurrency {
SelectedIoConcurrency::Serial => IoConcurrency::Serial { prev_io: None },
SelectedIoConcurrency::Parallel => IoConcurrency::Parallel,
},
}
}

pub(crate) fn spawn_io<F>(&mut self, fut: F)
where
F: std::future::Future<Output = ()> + Send + 'static,
Expand Down

0 comments on commit dba6968

Please sign in to comment.