diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs
index 893f883618e30..0737a9e21a247 100644
--- a/pageserver/src/tenant/ephemeral_file.rs
+++ b/pageserver/src/tenant/ephemeral_file.rs
@@ -4,9 +4,16 @@
 use crate::config::PageServerConf;
 use crate::context::RequestContext;
 use crate::page_cache;
-use crate::virtual_file::{self, VirtualFile};
+use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
+use crate::virtual_file::owned_buffers_io::util::size_tracking_writer;
+use crate::virtual_file::owned_buffers_io::write::Buffer;
+use crate::virtual_file::{self, owned_buffers_io, VirtualFile};
+use anyhow::Context;
+use bytes::BytesMut;
 use camino::Utf8PathBuf;
 use pageserver_api::shard::TenantShardId;
+use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice};
+use tracing::error;
 
 use std::io;
 use std::sync::atomic::AtomicU64;
@@ -15,14 +22,20 @@ use utils::id::TimelineId;
 pub struct EphemeralFile {
     _tenant_shard_id: TenantShardId,
     _timeline_id: TimelineId,
-
-    rw: page_caching::RW,
+    page_cache_file_id: page_cache::FileId,
+    buffered_writer: size_tracking_writer::Writer<
+        owned_buffers_io::write::BufferedWriter<
+            BytesMut,
+            size_tracking_writer::Writer<VirtualFile>,
+        >,
+    >,
+    /// Gate guard is held on as long as we need to do operations in the path (delete on drop)
+    _gate_guard: utils::sync::gate::GateGuard,
 }
 
-pub(super) mod page_caching;
-
 use super::storage_layer::inmemory_layer::InMemoryLayerIndexValue;
-mod zero_padded_read_write;
+
+const TAIL_SZ: usize = 64 * 1024;
 
 impl EphemeralFile {
     pub async fn create(
@@ -52,33 +65,157 @@ impl EphemeralFile {
         )
         .await?;
 
+        let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore
+
         Ok(EphemeralFile {
             _tenant_shard_id: tenant_shard_id,
             _timeline_id: timeline_id,
-            rw: page_caching::RW::new(file, gate_guard),
+            page_cache_file_id,
+            buffered_writer: size_tracking_writer::Writer::new(
+                owned_buffers_io::write::BufferedWriter::new(
+                    size_tracking_writer::Writer::new(file),
+                    BytesMut::with_capacity(TAIL_SZ),
+                ),
+            ),
+            _gate_guard: gate_guard,
         })
     }
+}
+
+impl Drop for EphemeralFile {
+    fn drop(&mut self) {
+        // unlink the file
+        // we are clear to do this, because we have entered a gate
+        let path = &self.buffered_writer.as_inner().as_inner().as_inner().path;
+        let res = std::fs::remove_file(path);
+        if let Err(e) = res {
+            if e.kind() != std::io::ErrorKind::NotFound {
+                // just never log the not found errors, we cannot do anything for them; on detach
+                // the tenant directory is already gone.
+                //
+                // not found files might also be related to https://github.com/neondatabase/neon/issues/2442
+                error!("could not remove ephemeral file '{path}': {e}");
+            }
+        }
+    }
+}
+
+impl EphemeralFile {
     pub(crate) fn len(&self) -> u32 {
-        self.rw.bytes_written()
+        u32::try_from(self.buffered_writer.bytes_written())
+            .expect("we don't allow writing more than u32::MAX bytes")
    }
 
     pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
-        self.rw.page_cache_file_id()
+        self.page_cache_file_id
    }
 
-    /// See [`self::page_caching::RW::load_to_vec`].
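+    /// Load all bytes written so far into one contiguous buffer: the prefix that has already
+    /// been flushed to disk is read back from the underlying `VirtualFile`, and the not-yet-flushed
+    /// tail is copied out of the buffered writer's in-memory buffer.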
     pub(crate) async fn load_to_vec(&self, ctx: &RequestContext) -> Result<Vec<u8>, io::Error> {
-        self.rw.load_to_vec(ctx).await
+        let size = usize::try_from(self.len()).unwrap();
+        let vec = Vec::with_capacity(size);
+
+        // read from disk what we've already flushed
+        let buffered_writer = self.buffered_writer.as_inner();
+        let file_size_tracking_writer = buffered_writer.as_inner();
+        let flushed_offset = usize::try_from(file_size_tracking_writer.bytes_written()).unwrap();
+        let flushed_range = 0..flushed_offset;
+        let file: &VirtualFile = file_size_tracking_writer.as_inner();
+        let mut vec = file
+            .read_exact_at(
+                vec.slice(0..(flushed_range.end - flushed_range.start)),
+                u64::try_from(flushed_range.start).unwrap(),
+                ctx,
+            )
+            .await?
+            .into_inner();
+
+        // copy from in-memory buffer what we haven't flushed yet but would return when accessed via read_blk
+        let buffer = buffered_writer.inspect_buffer();
+        let buffered = &buffer[0..buffer.pending()];
+        vec.extend_from_slice(buffered);
+        assert_eq!(vec.len(), size);
+        Ok(vec)
     }
 
-    pub(crate) async fn read_page(
+    /// Fill `dst` with `dst.bytes_total()` bytes from the bytes written to the buffered writer from offset `start` and later.
+    /// If `dst` is larger than the available bytes, the read will be short.
+    /// The read will never be short for other reasons.
+    /// The number of bytes read into `dst` is returned as part of the result tuple.
+    /// No guarantees are made about the remaining bytes in `dst`, i.e., assume their contents are random.
+    pub(crate) async fn read_at_to_end<B: IoBufMut + Send>(
         &self,
-        blknum: u32,
-        dst: page_caching::PageBuf,
+        start: u32,
+        dst: Slice<B>,
         ctx: &RequestContext,
-    ) -> Result<page_caching::ReadResult<'_>, io::Error> {
-        self.rw.read_page(blknum, dst, ctx).await
+    ) -> std::io::Result<(Slice<B>, usize)> {
+        let total_available = u32::try_from(self.buffered_writer.bytes_written())
+            .expect("we don't allow writing more than u32::MAX bytes");
+
+        let buffered_writer = self.buffered_writer.as_inner();
+        let file_size_tracking_writer = buffered_writer.as_inner();
+        let flushed_offset = u32::try_from(file_size_tracking_writer.bytes_written())
+            .expect("we don't allow writing more than u32::MAX bytes");
+
+        let buffer = buffered_writer.inspect_buffer();
+        let buffered = &buffer[0..buffer.pending()];
+
+        let dst_len = u32::try_from(dst.len())
+            .with_context(|| format!("read_aligned: dst.len() is too large: {}", dst.len()))
+            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
+        let end = {
+            let mut end = start
+                .checked_add(dst_len)
+                .with_context(|| {
+                    format!("read_aligned: offset + dst.len() is too large: {start} + {dst_len}",)
+                })
+                .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
+            if end > total_available {
+                end = total_available;
+            }
+            end
+        };
+
+        struct Range(u32, u32);
+        impl Range {
+            fn len(&self) -> u32 {
+                assert!(self.1 >= self.0);
+                self.1 - self.0
+            }
+        }
+        let written_range = Range(start, std::cmp::min(end, flushed_offset));
+        let buffered_range = Range(flushed_offset, std::cmp::max(flushed_offset, end));
+
+        let dst = if written_range.len() > 0 {
+            let file: &VirtualFile = file_size_tracking_writer.as_inner();
+            let bounds = dst.bounds();
+            let slice = file
+                .read_exact_at(
+                    dst.slice(0..written_range.len() as usize),
+                    u64::try_from(start).unwrap(),
+                    ctx,
+                )
+                .await?;
+            Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
+        } else {
+            dst
+        };
+
+        let dst = if buffered_range.len() > 0 {
+            let offset_in_buffer = (start + written_range.len() - flushed_offset) as usize;
+            let to_copy =
+                &buffered[offset_in_buffer..(offset_in_buffer + buffered_range.len() as usize)];
+            let bounds = dst.bounds();
+            let mut view = dst.slice(written_range.len() as usize..buffered_range.len() as usize);
+            view.as_mut_rust_slice_full_zeroed()
+                .copy_from_slice(to_copy);
+            Slice::from_buf_bounds(Slice::into_inner(view), bounds)
+        } else {
+            dst
+        };
+
+        // TODO: in debug mode, randomize the remaining bytes in `dst` to catch bugs
+
+        Ok((dst, (end - start) as usize))
     }
 
     pub(crate) async fn write_blob(
@@ -87,7 +224,8 @@ impl EphemeralFile {
         will_init: bool,
         ctx: &RequestContext,
     ) -> Result<InMemoryLayerIndexValue, io::Error> {
-        let pos = self.rw.bytes_written();
+        let pos = u32::try_from(self.buffered_writer.bytes_written())
+            .expect("this function enforces we never write more than u32::MAX bytes");
         let len = u32::try_from(buf.len()).map_err(|e| {
             std::io::Error::new(
                 std::io::ErrorKind::Other,
@@ -104,7 +242,10 @@ impl EphemeralFile {
             )
         })?;
 
-        self.rw.write_all_borrowed(buf, ctx).await?;
+        self.buffered_writer
+            .as_inner_mut()
+            .write_buffered_borrowed(buf, ctx)
+            .await?;
 
         Ok(InMemoryLayerIndexValue {
             pos,
diff --git a/pageserver/src/tenant/ephemeral_file/page_caching.rs b/pageserver/src/tenant/ephemeral_file/page_caching.rs
deleted file mode 100644
index d3e6f232cceef..0000000000000
--- a/pageserver/src/tenant/ephemeral_file/page_caching.rs
+++ /dev/null
@@ -1,265 +0,0 @@
-//! Wrapper around [`super::zero_padded_read_write::RW`] that uses the
-//! [`crate::page_cache`] to serve reads that need to go to the underlying [`VirtualFile`].
-
-use crate::context::RequestContext;
-use crate::page_cache::{self, PAGE_SZ};
-use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice;
-use crate::virtual_file::VirtualFile;
-
-use std::io::{self, ErrorKind};
-use std::ops::{Deref, Range};
-use tokio_epoll_uring::BoundedBuf;
-use tracing::*;
-
-use super::zero_padded_read_write;
-
-/// See module-level comment.
-pub struct RW {
-    page_cache_file_id: page_cache::FileId,
-    rw: super::zero_padded_read_write::RW<PreWarmingWriter>,
-    /// Gate guard is held on as long as we need to do operations in the path (delete on drop).
-    _gate_guard: utils::sync::gate::GateGuard,
-}
-
-/// Result of [`RW::read_page`].
-pub(crate) enum ReadResult<'a> {
-    EphemeralFileMutableTail(PageBuf, &'a [u8; PAGE_SZ]),
-    Owned(PageBuf),
-}
-
-impl ReadResult<'_> {
-    pub(crate) fn contents(&self) -> &[u8; PAGE_SZ] {
-        match self {
-            ReadResult::EphemeralFileMutableTail(_, buf) => buf,
-            ReadResult::Owned(buf) => buf.deref(),
-        }
-    }
-    pub(crate) fn into_page_buf(self) -> PageBuf {
-        match self {
-            ReadResult::EphemeralFileMutableTail(buf, _) => buf,
-            ReadResult::Owned(buf) => buf,
-        }
-    }
-}
-
-pub(crate) struct PageBuf(Box<[u8; PAGE_SZ]>);
-
-impl From<Box<[u8; PAGE_SZ]>> for PageBuf {
-    fn from(buf: Box<[u8; PAGE_SZ]>) -> Self {
-        Self(buf)
-    }
-}
-
-impl Deref for PageBuf {
-    type Target = [u8; PAGE_SZ];
-
-    fn deref(&self) -> &Self::Target {
-        &self.0
-    }
-}
-
-// Safety: `PageBuf` is a fixed-size buffer that is zero-initialized.
-unsafe impl tokio_epoll_uring::IoBuf for PageBuf {
-    fn stable_ptr(&self) -> *const u8 {
-        self.0.as_ptr()
-    }
-
-    fn bytes_init(&self) -> usize {
-        self.0.len()
-    }
-
-    fn bytes_total(&self) -> usize {
-        self.0.len()
-    }
-}
-
-// Safety: the `&mut self` guarantees no aliasing. `set_init` is safe
-// because the buffer is always fully initialized.
-unsafe impl tokio_epoll_uring::IoBufMut for PageBuf {
-    fn stable_mut_ptr(&mut self) -> *mut u8 {
-        self.0.as_mut_ptr()
-    }
-
-    unsafe fn set_init(&mut self, pos: usize) {
-        // this is a no-op because the buffer is always fully initialized
-        assert!(pos <= self.0.len());
-    }
-}
-
-impl RW {
-    pub fn new(file: VirtualFile, _gate_guard: utils::sync::gate::GateGuard) -> Self {
-        let page_cache_file_id = page_cache::next_file_id();
-        Self {
-            page_cache_file_id,
-            rw: super::zero_padded_read_write::RW::new(PreWarmingWriter::new(file)),
-            _gate_guard,
-        }
-    }
-
-    pub fn page_cache_file_id(&self) -> page_cache::FileId {
-        self.page_cache_file_id
-    }
-
-    pub(crate) async fn write_all_borrowed(
-        &mut self,
-        srcbuf: &[u8],
-        ctx: &RequestContext,
-    ) -> Result<(), io::Error> {
-        // It doesn't make sense to proactively fill the page cache on the Pageserver write path
-        // because Compute is unlikely to access recently written data.
-        self.rw.write_all_borrowed(srcbuf, ctx).await.map(|_| ())
-    }
-
-    pub(crate) fn bytes_written(&self) -> u32 {
-        self.rw.bytes_written()
-    }
-
-    /// Load all blocks that can be read via [`Self::read_page`] into a contiguous memory buffer.
-    ///
-    /// This includes the blocks that aren't yet flushed to disk by the internal buffered writer.
-    /// The last block is zero-padded to [`PAGE_SZ`], so, the returned buffer is always a multiple of [`PAGE_SZ`].
-    pub(super) async fn load_to_vec(&self, ctx: &RequestContext) -> Result<Vec<u8>, io::Error> {
-        // round up to the next PAGE_SZ multiple, required by blob_io
-        let size = {
-            let s = usize::try_from(self.bytes_written()).unwrap();
-            if s % PAGE_SZ == 0 {
-                s
-            } else {
-                s.checked_add(PAGE_SZ - (s % PAGE_SZ)).unwrap()
-            }
-        };
-        let vec = Vec::with_capacity(size);
-
-        // read from disk what we've already flushed
-        let writer = self.rw.as_writer();
-        let flushed_range = writer.written_range();
-        let mut vec = writer
-            .file
-            .read_exact_at(
-                vec.slice(0..(flushed_range.end - flushed_range.start)),
-                u64::try_from(flushed_range.start).unwrap(),
-                ctx,
-            )
-            .await?
-            .into_inner();
-
-        // copy from in-memory buffer what we haven't flushed yet but would return when accessed via read_blk
-        let buffered = self.rw.get_tail_zero_padded();
-        vec.extend_from_slice(buffered);
-        assert_eq!(vec.len(), size);
-        assert_eq!(vec.len() % PAGE_SZ, 0);
-        Ok(vec)
-    }
-
-    pub(crate) async fn read_page(
-        &self,
-        blknum: u32,
-        buf: PageBuf,
-        ctx: &RequestContext,
-    ) -> Result<ReadResult<'_>, io::Error> {
-        match self.rw.read_blk(blknum).await? {
-            zero_padded_read_write::ReadResult::NeedsReadFromWriter { writer } => {
-                let buf = writer
-                    .file
-                    .read_exact_at(buf.slice_full(), blknum as u64 * PAGE_SZ as u64, ctx)
-                    .await
-                    .map(|slice| slice.into_inner())?;
-                Ok(ReadResult::Owned(buf))
-            }
-            zero_padded_read_write::ReadResult::ServedFromZeroPaddedMutableTail {
-                buffer: tail_ref,
-            } => Ok(ReadResult::EphemeralFileMutableTail(buf, tail_ref)),
-        }
-    }
-}
-
-impl Drop for RW {
-    fn drop(&mut self) {
-        // There might still be pages in the [`crate::page_cache`] for this file.
-        // We leave them there, [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
-
-        // unlink the file
-        // we are clear to do this, because we have entered a gate
-        let res = std::fs::remove_file(&self.rw.as_writer().file.path);
-        if let Err(e) = res {
-            if e.kind() != std::io::ErrorKind::NotFound {
-                // just never log the not found errors, we cannot do anything for them; on detach
-                // the tenant directory is already gone.
-                //
-                // not found files might also be related to https://github.com/neondatabase/neon/issues/2442
-                error!(
-                    "could not remove ephemeral file '{}': {}",
-                    self.rw.as_writer().file.path,
-                    e
-                );
-            }
-        }
-    }
-}
-
-struct PreWarmingWriter {
-    nwritten_blocks: u32,
-    file: VirtualFile,
-}
-
-impl PreWarmingWriter {
-    fn new(file: VirtualFile) -> Self {
-        Self {
-            nwritten_blocks: 0,
-            file,
-        }
-    }
-
-    /// Return the byte range within `file` that has been written through `write_all`.
-    ///
-    /// The returned range would be invalidated by another `write_all`. To prevent that, we capture `&_`.
-    fn written_range(&self) -> (impl Deref<Target = Range<usize>> + '_) {
-        let nwritten_blocks = usize::try_from(self.nwritten_blocks).unwrap();
-        struct Wrapper(Range<usize>);
-        impl Deref for Wrapper {
-            type Target = Range<usize>;
-            fn deref(&self) -> &Range<usize> {
-                &self.0
-            }
-        }
-        Wrapper(0..nwritten_blocks * PAGE_SZ)
-    }
-}
-
-impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmingWriter {
-    async fn write_all<Buf: tokio_epoll_uring::IoBuf + Send>(
-        &mut self,
-        buf: FullSlice<Buf>,
-        ctx: &RequestContext,
-    ) -> std::io::Result<(usize, FullSlice<Buf>)> {
-        let buflen = buf.len();
-        assert_eq!(
-            buflen % PAGE_SZ,
-            0,
-            "{buflen} ; we know TAIL_SZ is a PAGE_SZ multiple, and write_buffered_borrowed is used"
-        );
-
-        // Do the IO.
-        let buf = match self.file.write_all(buf, ctx).await {
-            (buf, Ok(nwritten)) => {
-                assert_eq!(nwritten, buflen);
-                buf
-            }
-            (_, Err(e)) => {
-                return Err(std::io::Error::new(
-                    ErrorKind::Other,
-                    // order error before path because path is long and error is short
-                    format!(
-                        "ephemeral_file: write_blob: write-back tail self.nwritten_blocks={}, buflen={}, {:#}: {}",
-                        self.nwritten_blocks, buflen, e, self.file.path,
-                    ),
-                ));
-            }
-        };
-
-        let nblocks = buflen / PAGE_SZ;
-        let nblocks32 = u32::try_from(nblocks).unwrap();
-        self.nwritten_blocks = self.nwritten_blocks.checked_add(nblocks32).unwrap();
-        Ok((buflen, buf))
-    }
-}
diff --git a/pageserver/src/tenant/ephemeral_file/zero_padded_read_write.rs b/pageserver/src/tenant/ephemeral_file/zero_padded_read_write.rs
deleted file mode 100644
index 063fd854c5883..0000000000000
--- a/pageserver/src/tenant/ephemeral_file/zero_padded_read_write.rs
+++ /dev/null
@@ -1,151 +0,0 @@
-//! The heart of how [`super::EphemeralFile`] does its reads and writes.
-//!
-//! # Writes
-//!
-//! [`super::EphemeralFile`] writes small, borrowed buffers using [`RW::write_all_borrowed`].
-//! The [`RW`] batches these into [`TAIL_SZ`] bigger writes, using [`owned_buffers_io::write::BufferedWriter`].
-//!
-//! # Reads
-//!
-//! [`super::EphemeralFile`] always reads full [`PAGE_SZ`]ed blocks using [`RW::read_blk`].
-//!
-//! The [`RW`] serves these reads either from the buffered writer's in-memory buffer
-//! or redirects the caller to read from the underlying [`OwnedAsyncWriter`]
-//! if the read is for the prefix that has already been flushed.
-//!
-//! # Current Usage
-//!
-//! The current user of this module is [`super::page_caching::RW`].
-
-mod zero_padded;
-
-use anyhow::Context;
-
-use crate::{
-    context::RequestContext,
-    page_cache::PAGE_SZ,
-    virtual_file::owned_buffers_io::{
-        self,
-        write::{Buffer, OwnedAsyncWriter},
-    },
-};
-
-const TAIL_SZ: usize = 64 * 1024;
-
-/// See module-level comment.
-pub struct RW<W> {
-    buffered_writer: owned_buffers_io::write::BufferedWriter<
-        zero_padded::Buffer<TAIL_SZ>,
-        owned_buffers_io::util::size_tracking_writer::Writer<W>,
-    >,
-}
-
-pub enum ReadResult<'a, W> {
-    NeedsReadFromWriter { writer: &'a W },
-    ServedFromZeroPaddedMutableTail { buffer: &'a [u8; PAGE_SZ] },
-}
-
-impl<W> RW<W>
-where
-    W: OwnedAsyncWriter,
-{
-    pub fn new(writer: W) -> Self {
-        let bytes_flushed_tracker =
-            owned_buffers_io::util::size_tracking_writer::Writer::new(writer);
-        let buffered_writer = owned_buffers_io::write::BufferedWriter::new(
-            bytes_flushed_tracker,
-            zero_padded::Buffer::default(),
-        );
-        Self { buffered_writer }
-    }
-
-    pub(crate) fn as_writer(&self) -> &W {
-        self.buffered_writer.as_inner().as_inner()
-    }
-
-    pub async fn write_all_borrowed(
-        &mut self,
-        buf: &[u8],
-        ctx: &RequestContext,
-    ) -> std::io::Result<usize> {
-        self.buffered_writer.write_buffered_borrowed(buf, ctx).await
-    }
-
-    pub fn bytes_written(&self) -> u32 {
-        let flushed_offset = self.buffered_writer.as_inner().bytes_written();
-        let flushed_offset = u32::try_from(flushed_offset).with_context(|| format!("buffered_writer.write_buffered_borrowed() disallows sizes larger than u32::MAX: {flushed_offset}")).unwrap();
-        let buffer: &zero_padded::Buffer<TAIL_SZ> = self.buffered_writer.inspect_buffer();
-        let buffer_pending = u32::try_from(buffer.pending()).expect("TAIL_SZ is < u32::MAX");
-        flushed_offset.checked_add(buffer_pending).with_context(|| format!("buffered_writer.write_buffered_borrowed() disallows sizes larger than u32::MAX: {flushed_offset} + {buffer_pending}")).unwrap()
-    }
-
-    /// Get a slice of all blocks that [`Self::read_blk`] would return as [`ReadResult::ServedFromZeroPaddedMutableTail`].
-    pub fn get_tail_zero_padded(&self) -> &[u8] {
-        let buffer: &zero_padded::Buffer<TAIL_SZ> = self.buffered_writer.inspect_buffer();
-        let buffer_written_up_to = buffer.pending();
-        // pad to next page boundary
-        let read_up_to = if buffer_written_up_to % PAGE_SZ == 0 {
-            buffer_written_up_to
-        } else {
-            buffer_written_up_to
-                .checked_add(PAGE_SZ - (buffer_written_up_to % PAGE_SZ))
-                .unwrap()
-        };
-        &buffer.as_zero_padded_slice()[0..read_up_to]
-    }
-
-    pub(crate) async fn read_blk(&self, blknum: u32) -> Result<ReadResult<'_, W>, std::io::Error> {
-        let flushed_offset =
-            u32::try_from(self.buffered_writer.as_inner().bytes_written()).expect("");
-        let buffer: &zero_padded::Buffer<TAIL_SZ> = self.buffered_writer.inspect_buffer();
-        let buffered_offset = flushed_offset + u32::try_from(buffer.pending()).unwrap();
-        let page_sz = u32::try_from(PAGE_SZ).unwrap();
-        let read_offset = blknum.checked_mul(page_sz).unwrap();
-
-        // The trailing page ("block") might only be partially filled,
-        // yet the blob_io code relies on us to return a full PAGE_SZed slice anyway.
-        // Moreover, it has to be zero-padded, because when we still had
-        // a write-back page cache, it provided pre-zeroed pages, and blob_io came to rely on it.
-        // DeltaLayer probably has the same issue, not sure why it needs no special treatment.
-        // => check here that the read doesn't go beyond this potentially trailing
-        // => the zero-padding is done in the `else` branch below
-        let blocks_written = if buffered_offset % page_sz == 0 {
-            buffered_offset / page_sz
-        } else {
-            (buffered_offset / page_sz) + 1
-        };
-        if blknum >= blocks_written {
-            return Err(std::io::Error::new(std::io::ErrorKind::Other, anyhow::anyhow!("read past end of ephemeral_file: read=0x{read_offset:x} buffered=0x{buffered_offset:x} flushed=0x{flushed_offset}")));
-        }
-
-        // assertions for the `if-else` below
-        assert_eq!(
-            flushed_offset % (u32::try_from(TAIL_SZ).unwrap()), 0,
-            "we only use write_buffered_borrowed to write to the buffered writer, so it's guaranteed that flushes happen buffer.cap()-sized chunks"
-        );
-        assert_eq!(
-            flushed_offset % page_sz,
-            0,
-            "the logic below can't handle if the page is spread across the flushed part and the buffer"
-        );
-
-        if read_offset < flushed_offset {
-            assert!(read_offset + page_sz <= flushed_offset);
-            Ok(ReadResult::NeedsReadFromWriter {
-                writer: self.as_writer(),
-            })
-        } else {
-            let read_offset_in_buffer = read_offset
-                .checked_sub(flushed_offset)
-                .expect("would have taken `if` branch instead of this one");
-            let read_offset_in_buffer = usize::try_from(read_offset_in_buffer).unwrap();
-            let zero_padded_slice = buffer.as_zero_padded_slice();
-            let page = &zero_padded_slice[read_offset_in_buffer..(read_offset_in_buffer + PAGE_SZ)];
-            Ok(ReadResult::ServedFromZeroPaddedMutableTail {
-                buffer: page
-                    .try_into()
-                    .expect("the slice above got it as page-size slice"),
-            })
-        }
-    }
-}
diff --git a/pageserver/src/tenant/ephemeral_file/zero_padded_read_write/zero_padded.rs b/pageserver/src/tenant/ephemeral_file/zero_padded_read_write/zero_padded.rs
deleted file mode 100644
index 2dc0277638e78..0000000000000
--- a/pageserver/src/tenant/ephemeral_file/zero_padded_read_write/zero_padded.rs
+++ /dev/null
@@ -1,110 +0,0 @@
-//! A [`crate::virtual_file::owned_buffers_io::write::Buffer`] whose
-//! unwritten range is guaranteed to be zero-initialized.
-//! This is used by [`crate::tenant::ephemeral_file::zero_padded_read_write::RW::read_blk`]
-//! to serve page-sized reads of the trailing page when the trailing page has only been partially filled.
-
-use std::mem::MaybeUninit;
-
-use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice;
-
-/// See module-level comment.
-pub struct Buffer<const N: usize> {
-    allocation: Box<[u8; N]>,
-    written: usize,
-}
-
-impl<const N: usize> Default for Buffer<N> {
-    fn default() -> Self {
-        Self {
-            allocation: Box::new(
-                // SAFETY: zeroed memory is a valid [u8; N]
-                unsafe { MaybeUninit::zeroed().assume_init() },
-            ),
-            written: 0,
-        }
-    }
-}
-
-impl<const N: usize> Buffer<N> {
-    #[inline(always)]
-    fn invariants(&self) {
-        // don't check by default, unoptimized is too expensive even for debug mode
-        if false {
-            debug_assert!(self.written <= N, "{}", self.written);
-            debug_assert!(self.allocation[self.written..N].iter().all(|v| *v == 0));
-        }
-    }
-
-    pub fn as_zero_padded_slice(&self) -> &[u8; N] {
-        &self.allocation
-    }
-}
-
-impl<const N: usize> crate::virtual_file::owned_buffers_io::write::Buffer for Buffer<N> {
-    type IoBuf = Self;
-
-    fn cap(&self) -> usize {
-        self.allocation.len()
-    }
-
-    fn extend_from_slice(&mut self, other: &[u8]) {
-        self.invariants();
-        let remaining = self.allocation.len() - self.written;
-        if other.len() > remaining {
-            panic!("calling extend_from_slice() with insufficient remaining capacity");
-        }
-        self.allocation[self.written..(self.written + other.len())].copy_from_slice(other);
-        self.written += other.len();
-        self.invariants();
-    }
-
-    fn pending(&self) -> usize {
-        self.written
-    }
-
-    fn flush(self) -> FullSlice<Self> {
-        self.invariants();
-        let written = self.written;
-        FullSlice::must_new(tokio_epoll_uring::BoundedBuf::slice(self, 0..written))
-    }
-
-    fn reuse_after_flush(iobuf: Self::IoBuf) -> Self {
-        let Self {
-            mut allocation,
-            written,
-        } = iobuf;
-        allocation[0..written].fill(0);
-        let new = Self {
-            allocation,
-            written: 0,
-        };
-        new.invariants();
-        new
-    }
-}
-
-/// We have this trait impl so that the `flush` method in the `Buffer` impl above can produce a
-/// [`tokio_epoll_uring::BoundedBuf::slice`] of the [`Self::written`] range of the data.
-///
-/// Remember that bytes_init is generally _not_ a tracker of the amount
-/// of valid data in the io buffer; we use `Slice` for that.
-/// The `IoBuf` is _only_ for keeping track of uninitialized memory, a bit like MaybeUninit.
-///
-/// SAFETY:
-///
-/// The [`Self::allocation`] is stable because boxes are stable.
-/// The memory is zero-initialized, so, bytes_init is always N.
-unsafe impl<const N: usize> tokio_epoll_uring::IoBuf for Buffer<N> {
-    fn stable_ptr(&self) -> *const u8 {
-        self.allocation.as_ptr()
-    }
-
-    fn bytes_init(&self) -> usize {
-        // Yes, N, not self.written; Read the full comment of this impl block!
-        N
-    }
-
-    fn bytes_total(&self) -> usize {
-        N
-    }
-}
diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
index 3b7aa37c476b8..5b5c544caf10d 100644
--- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs
+++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
@@ -6,9 +6,7 @@
 //!
 use crate::config::PageServerConf;
 use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
-use crate::page_cache::PAGE_SZ;
 use crate::repository::{Key, Value};
-use crate::tenant::ephemeral_file::page_caching::PageBuf;
 use crate::tenant::ephemeral_file::EphemeralFile;
 use crate::tenant::timeline::GetVectoredError;
 use crate::tenant::PageReconstructError;
@@ -24,6 +22,7 @@ use pageserver_api::shard::TenantShardId;
 use std::collections::{BTreeMap, HashMap};
 use std::sync::{Arc, Mutex, OnceLock};
 use std::time::Instant;
+use tokio_epoll_uring::BoundedBuf;
 use tracing::*;
 use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
 // avoid binding to Write (conflicts with std::io::Write)
@@ -318,40 +317,45 @@ impl InMemoryLayer {
             }
         }
 
+        const DIO_CHUNK_SIZE: usize = 512;
+
         // Plan which parts of which pages need to be appended to which value_buf
-        struct PageReadDestination<'a> {
+        struct ChunkReadDestination<'a> {
             value_read: &'a ValueRead,
-            offset_in_page: u32,
+            offset_in_chunk: u32,
             len: u32,
         }
         // use of BTreeMap's sorted iterator is critical to ensure value_buf is filled in order
-        let mut page_reads: BTreeMap<u32, Vec<PageReadDestination>> = BTreeMap::new();
+        let mut chunk_reads: BTreeMap<u32, Vec<ChunkReadDestination>> = BTreeMap::new();
         for value_read in reads.iter().flat_map(|(_, v)| v.iter()) {
             let ValueRead { pos, len, .. } = value_read;
             let mut remaining = usize::try_from(*len).unwrap();
-            let mut page_no = *pos / (PAGE_SZ as u32);
-            let mut offset_in_page = usize::try_from(*pos % (PAGE_SZ as u32)).unwrap();
+            let mut chunk_no = *pos / (DIO_CHUNK_SIZE as u32);
+            let mut offset_in_chunk = usize::try_from(*pos % (DIO_CHUNK_SIZE as u32)).unwrap();
             while remaining > 0 {
-                let remaining_in_page = std::cmp::min(remaining, PAGE_SZ - offset_in_page);
-                page_reads
-                    .entry(page_no)
+                let remaining_in_chunk = std::cmp::min(remaining, DIO_CHUNK_SIZE - offset_in_chunk);
+                chunk_reads
+                    .entry(chunk_no)
                     .or_default()
-                    .push(PageReadDestination {
+                    .push(ChunkReadDestination {
                         value_read,
-                        offset_in_page: offset_in_page as u32,
-                        len: remaining_in_page as u32,
+                        offset_in_chunk: offset_in_chunk as u32,
+                        len: remaining_in_chunk as u32,
                     });
-                offset_in_page = 0;
-                page_no += 1;
-                remaining -= remaining_in_page;
+                offset_in_chunk = 0;
+                chunk_no += 1;
+                remaining -= remaining_in_chunk;
             }
         }
 
+        // TODO: merge adjacent chunk reads (merging pass on the BTreeMap iterator)
+
         // Execute reads and fill the destination
         // TODO: prefetch
-        let mut page_buf = PageBuf::from(Box::new([0u8; PAGE_SZ]));
-        for (page_no, dsts) in page_reads.into_iter() {
-            let all_done = dsts.iter().all(|PageReadDestination { value_read, .. }| {
+        let get_chunk_buf = || Vec::with_capacity(DIO_CHUNK_SIZE);
+        let mut chunk_buf = get_chunk_buf();
+        for (chunk_no, dsts) in chunk_reads.into_iter() {
+            let all_done = dsts.iter().all(|ChunkReadDestination { value_read, .. }| {
                 let value_buf = value_read.value_buf.lock().unwrap();
                 let Ok(buf) = &*value_buf else {
                     return true; // on Err() there's no need to read more
                 };
@@ -361,34 +365,42 @@ impl InMemoryLayer {
             if all_done {
                 continue;
            }
-            let read_result = match inner.file.read_page(page_no, page_buf, &ctx).await {
-                Ok(read_result) => read_result,
+            let (tmp, nread) = match inner
+                .file
+                .read_at_to_end(
+                    chunk_no * DIO_CHUNK_SIZE as u32,
+                    chunk_buf.slice_full(),
+                    &ctx,
+                )
+                .await
+            {
+                Ok(t) => t,
                 Err(e) => {
                     let e = Arc::new(e);
-                    for PageReadDestination { value_read, .. } in dsts {
+                    for ChunkReadDestination { value_read, .. } in dsts {
                         *value_read.value_buf.lock().unwrap() = Err(Arc::clone(&e)); // this will make later reads short-circuit, see top of loop body
                     }
-                    page_buf = PageBuf::from(Box::new([0u8; PAGE_SZ])); // TODO: change read_page API to return the buffer
+                    chunk_buf = get_chunk_buf(); // TODO: change API to return the buffer back on error
                     continue;
                 }
             };
-            let page_contents = read_result.contents();
-            for PageReadDestination {
+            chunk_buf = tmp.into_inner();
+            let contents = &chunk_buf[..nread];
+            for ChunkReadDestination {
                 value_read,
-                offset_in_page,
+                offset_in_chunk,
                 len,
             } in dsts
             {
                 if let Ok(buf) = &mut *value_read.value_buf.lock().unwrap() {
                     buf.extend_from_slice(
-                        &page_contents[offset_in_page as usize..(offset_in_page + len) as usize],
+                        &contents[offset_in_chunk as usize..(offset_in_chunk + len) as usize],
                     );
                 }
             }
-            page_buf = read_result.into_page_buf();
         }
-        drop(page_buf);
+        drop(chunk_buf);
 
         // Process results into the reconstruct state
         'next_key: for (key, value_reads) in reads {
@@ -651,19 +663,6 @@ impl InMemoryLayer {
         match l0_flush_global_state {
             l0_flush::Inner::Direct { .. } => {
                 let file_contents: Vec<u8> = inner.file.load_to_vec(ctx).await?;
-                assert_eq!(
-                    file_contents.len() % PAGE_SZ,
-                    0,
-                    "needed by BlockReaderRef::Slice"
-                );
-                assert_eq!(file_contents.len(), {
-                    let written = usize::try_from(inner.file.len()).unwrap();
-                    if written % PAGE_SZ == 0 {
-                        written
-                    } else {
-                        written.checked_add(PAGE_SZ - (written % PAGE_SZ)).unwrap()
-                    }
-                });
 
                 let file_contents = Bytes::from(file_contents);
 
@@ -675,7 +674,8 @@ impl InMemoryLayer {
                         len,
                         will_init,
                     } = entry;
-                    let buf = file_contents.slice(*pos as usize..(*pos + *len) as usize);
+                    let buf =
+                        Bytes::slice(&file_contents, *pos as usize..(*pos + *len) as usize);
                     let (_buf, res) = delta_layer_writer
                         .put_value_bytes(
                             Key::from_compact(*key),
diff --git a/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs b/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs
index efcb61ba6532a..d10e988f42294 100644
--- a/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs
+++ b/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs
@@ -25,6 +25,10 @@ impl<W> Writer<W> {
         &self.dst
     }
 
+    pub fn as_inner_mut(&mut self) -> &mut W {
+        &mut self.dst
+    }
+
     /// Returns the wrapped `VirtualFile` object as well as the number
     /// of bytes that were written to it through this object.
     #[cfg_attr(target_os = "macos", allow(dead_code))]