pageserver: remove AdjacentVectoredReadBuilder and bump minimum io_buffer_alignment to 512 #9175

Merged 4 commits on Sep 27, 2024
4 changes: 1 addition & 3 deletions .github/workflows/_build-and-test-locally.yml
@@ -236,9 +236,7 @@ jobs:

# run pageserver tests with different settings
for io_engine in std-fs tokio-epoll-uring ; do
- for io_buffer_alignment in 0 1 512 ; do
- NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine NEON_PAGESERVER_UNIT_TEST_IO_BUFFER_ALIGNMENT=$io_buffer_alignment ${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(pageserver)'
- done
+ NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine ${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(pageserver)'
done

# Run separate tests for real S3
12 changes: 6 additions & 6 deletions pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -40,7 +40,7 @@ use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
- VectoredReadCoalesceMode, VectoredReadPlanner,
+ VectoredReadPlanner,
};
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
@@ -1133,7 +1133,7 @@ impl DeltaLayerInner {
ctx: &RequestContext,
) -> anyhow::Result<usize> {
use crate::tenant::vectored_blob_io::{
- BlobMeta, VectoredReadBuilder, VectoredReadExtended,
+ BlobMeta, ChunkedVectoredReadBuilder, VectoredReadExtended,
};
use futures::stream::TryStreamExt;

@@ -1183,8 +1183,8 @@

let mut prev: Option<(Key, Lsn, BlobRef)> = None;

- let mut read_builder: Option<VectoredReadBuilder> = None;
- let read_mode = VectoredReadCoalesceMode::get();
+ let mut read_builder: Option<ChunkedVectoredReadBuilder> = None;
+ let align = virtual_file::get_io_buffer_alignment();

let max_read_size = self
.max_vectored_read_bytes
@@ -1228,12 +1228,12 @@
{
None
} else {
- read_builder.replace(VectoredReadBuilder::new(
+ read_builder.replace(ChunkedVectoredReadBuilder::new(
offsets.start.pos(),
offsets.end.pos(),
meta,
max_read_size,
- read_mode,
+ align,
))
}
} else {
216 changes: 36 additions & 180 deletions pageserver/src/tenant/vectored_blob_io.rs
@@ -185,171 +185,7 @@ pub(crate) enum VectoredReadExtended {
No,
}

- #[derive(Copy, Clone, Debug, PartialEq, Eq)]
- pub enum VectoredReadCoalesceMode {
- /// Only coalesce exactly adjacent reads.
- AdjacentOnly,
- /// In addition to adjacent reads, also consider reads whose corresponding
- /// `end` and `start` offsets reside at the same chunk.
- Chunked(usize),
- }
-
- impl VectoredReadCoalesceMode {
- /// [`AdjacentVectoredReadBuilder`] is used if alignment requirement is 0,
- /// whereas [`ChunkedVectoredReadBuilder`] is used for alignment requirement 1 and higher.
- pub(crate) fn get() -> Self {
- let align = virtual_file::get_io_buffer_alignment_raw();
- if align == 0 {
- VectoredReadCoalesceMode::AdjacentOnly
- } else {
- VectoredReadCoalesceMode::Chunked(align)
- }
- }
- }
-
- pub(crate) enum VectoredReadBuilder {
- Adjacent(AdjacentVectoredReadBuilder),
- Chunked(ChunkedVectoredReadBuilder),
- }
-
- impl VectoredReadBuilder {
- fn new_impl(
- start_offset: u64,
- end_offset: u64,
- meta: BlobMeta,
- max_read_size: Option<usize>,
- mode: VectoredReadCoalesceMode,
- ) -> Self {
- match mode {
- VectoredReadCoalesceMode::AdjacentOnly => Self::Adjacent(
- AdjacentVectoredReadBuilder::new(start_offset, end_offset, meta, max_read_size),
- ),
- VectoredReadCoalesceMode::Chunked(chunk_size) => {
- Self::Chunked(ChunkedVectoredReadBuilder::new(
- start_offset,
- end_offset,
- meta,
- max_read_size,
- chunk_size,
- ))
- }
- }
- }
-
- pub(crate) fn new(
- start_offset: u64,
- end_offset: u64,
- meta: BlobMeta,
- max_read_size: usize,
- mode: VectoredReadCoalesceMode,
- ) -> Self {
- Self::new_impl(start_offset, end_offset, meta, Some(max_read_size), mode)
- }
-
- pub(crate) fn new_streaming(
- start_offset: u64,
- end_offset: u64,
- meta: BlobMeta,
- mode: VectoredReadCoalesceMode,
- ) -> Self {
- Self::new_impl(start_offset, end_offset, meta, None, mode)
- }
-
- pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
- match self {
- VectoredReadBuilder::Adjacent(builder) => builder.extend(start, end, meta),
- VectoredReadBuilder::Chunked(builder) => builder.extend(start, end, meta),
- }
- }
-
- pub(crate) fn build(self) -> VectoredRead {
- match self {
- VectoredReadBuilder::Adjacent(builder) => builder.build(),
- VectoredReadBuilder::Chunked(builder) => builder.build(),
- }
- }
-
- pub(crate) fn size(&self) -> usize {
- match self {
- VectoredReadBuilder::Adjacent(builder) => builder.size(),
- VectoredReadBuilder::Chunked(builder) => builder.size(),
- }
- }
- }
-
- pub(crate) struct AdjacentVectoredReadBuilder {
- /// Start offset of the read.
- start: u64,
- // End offset of the read.
- end: u64,
- /// Start offset and metadata for each blob in this read
- blobs_at: VecMap<u64, BlobMeta>,
- max_read_size: Option<usize>,
- }
-
- impl AdjacentVectoredReadBuilder {
- /// Start building a new vectored read.
- ///
- /// Note that by design, this does not check against reading more than `max_read_size` to
- /// support reading larger blobs than the configuration value. The builder will be single use
- /// however after that.
- pub(crate) fn new(
- start_offset: u64,
- end_offset: u64,
- meta: BlobMeta,
- max_read_size: Option<usize>,
- ) -> Self {
- let mut blobs_at = VecMap::default();
- blobs_at
- .append(start_offset, meta)
- .expect("First insertion always succeeds");
-
- Self {
- start: start_offset,
- end: end_offset,
- blobs_at,
- max_read_size,
- }
- }
- /// Attempt to extend the current read with a new blob if the start
- /// offset matches with the current end of the vectored read
- /// and the resulting size is below the max read size
- pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
- tracing::trace!(start, end, "trying to extend");
- let size = (end - start) as usize;
- let not_limited_by_max_read_size = {
- if let Some(max_read_size) = self.max_read_size {
- self.size() + size <= max_read_size
- } else {
- true
- }
- };
-
- if self.end == start && not_limited_by_max_read_size {
- self.end = end;
- self.blobs_at
- .append(start, meta)
- .expect("LSNs are ordered within vectored reads");
-
- return VectoredReadExtended::Yes;
- }
-
- VectoredReadExtended::No
- }
-
- pub(crate) fn size(&self) -> usize {
- (self.end - self.start) as usize
- }
-
- pub(crate) fn build(self) -> VectoredRead {
- VectoredRead {
- start: self.start,
- end: self.end,
- blobs_at: self.blobs_at,
- }
- }
- }

/// A vectored read builder that tries to coalesce all reads that fit in a chunk.
pub(crate) struct ChunkedVectoredReadBuilder {
/// Start block number
start_blk_no: usize,
@@ -373,7 +209,7 @@ impl ChunkedVectoredReadBuilder {
/// Note that by design, this does not check against reading more than `max_read_size` to
/// support reading larger blobs than the configuration value. The builder will be single use
/// however after that.
- pub(crate) fn new(
+ fn new_impl(
start_offset: u64,
end_offset: u64,
meta: BlobMeta,
@@ -396,6 +232,25 @@
}
}

+ pub(crate) fn new(
+ start_offset: u64,
+ end_offset: u64,
+ meta: BlobMeta,
+ max_read_size: usize,
+ align: usize,
+ ) -> Self {
+ Self::new_impl(start_offset, end_offset, meta, Some(max_read_size), align)
+ }
+
+ pub(crate) fn new_streaming(
+ start_offset: u64,
+ end_offset: u64,
+ meta: BlobMeta,
+ align: usize,
+ ) -> Self {
+ Self::new_impl(start_offset, end_offset, meta, None, align)
+ }
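
The body of `new_impl` is collapsed in this diff; a rough sketch of the chunk bookkeeping it implies, assuming byte offsets are mapped to align-sized chunk numbers as the `start_blk_no` field (and a presumed end counterpart) suggests. This is our reconstruction, not the exact source.

// Hypothetical reconstruction: the start chunk contains the first byte;
// the end chunk number is rounded up, so it is exclusive and still covers
// a trailing partial chunk.
fn chunk_bounds(start_offset: u64, end_offset: u64, align: usize) -> (usize, usize) {
    let start_blk_no = start_offset as usize / align;
    let end_blk_no = (end_offset as usize + align - 1) / align;
    (start_blk_no, end_blk_no)
}

fn main() {
    // With align = 512, a read of bytes [100, 1000) occupies chunks 0 and 1,
    // i.e. block numbers [0, 2).
    assert_eq!(chunk_bounds(100, 1000, 512), (0, 2));
}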

/// Attempts to extend the current read with a new blob if the new blob resides in the same or the immediate next chunk.
///
/// The resulting size also must be below the max read size.
@@ -474,17 +329,17 @@ pub struct VectoredReadPlanner {

max_read_size: usize,

- mode: VectoredReadCoalesceMode,
+ align: usize,
}

impl VectoredReadPlanner {
pub fn new(max_read_size: usize) -> Self {
- let mode = VectoredReadCoalesceMode::get();
+ let align = virtual_file::get_io_buffer_alignment();
Self {
blobs: BTreeMap::new(),
prev: None,
max_read_size,
- mode,
+ align,
}
}

@@ -545,7 +400,7 @@ impl VectoredReadPlanner {
}

pub fn finish(self) -> Vec<VectoredRead> {
- let mut current_read_builder: Option<VectoredReadBuilder> = None;
+ let mut current_read_builder: Option<ChunkedVectoredReadBuilder> = None;
let mut reads = Vec::new();

for (key, blobs_for_key) in self.blobs {
@@ -558,12 +413,12 @@
};

if extended == VectoredReadExtended::No {
- let next_read_builder = VectoredReadBuilder::new(
+ let next_read_builder = ChunkedVectoredReadBuilder::new(
start_offset,
end_offset,
BlobMeta { key, lsn },
self.max_read_size,
- self.mode,
+ self.align,
);

let prev_read_builder = current_read_builder.replace(next_read_builder);
@@ -688,7 +543,7 @@ impl<'a> VectoredBlobReader<'a> {
/// `handle` gets called and when the current key would just exceed the read_size and
/// max_cnt constraints.
pub struct StreamingVectoredReadPlanner {
- read_builder: Option<VectoredReadBuilder>,
+ read_builder: Option<ChunkedVectoredReadBuilder>,
// Arguments for previous blob passed into [`StreamingVectoredReadPlanner::handle`]
prev: Option<(Key, Lsn, u64)>,
/// Max read size per batch. This is not a strict limit. If there are [0, 100) and [100, 200), while the `max_read_size` is 150,
@@ -699,21 +554,21 @@ pub struct StreamingVectoredReadPlanner {
/// Size of the current batch
cnt: usize,

- mode: VectoredReadCoalesceMode,
+ align: usize,
}

impl StreamingVectoredReadPlanner {
pub fn new(max_read_size: u64, max_cnt: usize) -> Self {
assert!(max_cnt > 0);
assert!(max_read_size > 0);
- let mode = VectoredReadCoalesceMode::get();
+ let align = virtual_file::get_io_buffer_alignment();
Self {
read_builder: None,
prev: None,
max_cnt,
max_read_size,
cnt: 0,
- mode,
+ align,
}
}
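
Given the `max_read_size` and `max_cnt` fields above, a hedged sketch of the flush rule they imply; the function and parameter names here are illustrative, and the real `handle` body is collapsed below.

// Illustrative only: neither limit is a strict byte cap, because a blob
// joins the current batch before the counters are checked, so a batch can
// overshoot max_read_size once.
fn should_start_new_batch(cur_size: u64, cnt: usize, max_read_size: u64, max_cnt: usize) -> bool {
    cur_size >= max_read_size || cnt >= max_cnt
}

fn main() {
    // Blobs [0, 100) and [100, 200) with max_read_size = 150: the first blob
    // does not trip the limit, so both end up in a single 200-byte batch.
    assert!(!should_start_new_batch(100, 1, 150, 10));
    assert!(should_start_new_batch(200, 2, 150, 10));
}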

@@ -762,11 +617,11 @@ impl StreamingVectoredReadPlanner {
}
None => {
self.read_builder = {
- Some(VectoredReadBuilder::new_streaming(
+ Some(ChunkedVectoredReadBuilder::new_streaming(
start_offset,
end_offset,
BlobMeta { key, lsn },
- self.mode,
+ self.align,
))
};
}
@@ -1092,7 +947,7 @@ mod tests {
let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16;
let mut buf = BytesMut::with_capacity(reserved_bytes);

- let mode = VectoredReadCoalesceMode::get();
+ let align = virtual_file::get_io_buffer_alignment();
let vectored_blob_reader = VectoredBlobReader::new(&file);
let meta = BlobMeta {
key: Key::MIN,
@@ -1104,7 +959,8 @@
if idx + 1 == offsets.len() {
continue;
}
- let read_builder = VectoredReadBuilder::new(*offset, *end, meta, 16 * 4096, mode);
+ let read_builder =
+ ChunkedVectoredReadBuilder::new(*offset, *end, meta, 16 * 4096, align);
let read = read_builder.build();
let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?;
assert_eq!(result.blobs.len(), 1);