From c77e6e8c9667282d25a98a3570583da300b9291f Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 13 Aug 2024 19:54:46 +0200 Subject: [PATCH 1/6] refactor: VirtualFile write path: take Slice Need this so we can pass in Bytes to the l0 flush code --- pageserver/src/tenant/blob_io.rs | 51 ++++++++++--------- .../src/tenant/ephemeral_file/page_caching.rs | 32 ++++-------- .../src/tenant/storage_layer/delta_layer.rs | 20 +++++--- pageserver/src/virtual_file.rs | 36 +++++++------ .../util/size_tracking_writer.rs | 6 +-- .../virtual_file/owned_buffers_io/write.rs | 35 +++++-------- 6 files changed, 87 insertions(+), 93 deletions(-) diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 8e9d349ca88c..f1055ce099a9 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -186,11 +186,11 @@ impl BlobWriter { /// You need to make sure that the internal buffer is empty, otherwise /// data will be written in wrong order. #[inline(always)] - async fn write_all_unbuffered, Buf: IoBuf + Send>( + async fn write_all_unbuffered( &mut self, - src_buf: B, + src_buf: Slice, ctx: &RequestContext, - ) -> (B::Buf, Result<(), Error>) { + ) -> (Slice, Result<(), Error>) { let (src_buf, res) = self.inner.write_all(src_buf, ctx).await; let nbytes = match res { Ok(nbytes) => nbytes, @@ -204,8 +204,9 @@ impl BlobWriter { /// Flushes the internal buffer to the underlying `VirtualFile`. pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> { let buf = std::mem::take(&mut self.buf); - let (mut buf, res) = self.inner.write_all(buf, ctx).await; + let (mut slice, res) = self.inner.write_all(buf.slice(0..buf.len()), ctx).await; res?; + let mut buf = Slice::into_inner(slice); buf.clear(); self.buf = buf; Ok(()) @@ -222,11 +223,17 @@ impl BlobWriter { } /// Internal, possibly buffered, write function - async fn write_all, Buf: IoBuf + Send>( + async fn write_all( &mut self, - src_buf: B, + src_buf: Slice, ctx: &RequestContext, - ) -> (B::Buf, Result<(), Error>) { + ) -> (Slice, Result<(), Error>) { + assert!( + src_buf.bytes_init(), + src_buf.bytes_total(), + "caller likely meant to fill the buffer fully" + ); + if !BUFFERED { assert!(self.buf.is_empty()); return self.write_all_unbuffered(src_buf, ctx).await; @@ -287,19 +294,23 @@ impl BlobWriter { /// Write a blob of data. Returns the offset that it was written to, /// which can be used to retrieve the data later. - pub async fn write_blob_maybe_compressed, Buf: IoBuf + Send>( + pub async fn write_blob_maybe_compressed( &mut self, - srcbuf: B, + srcbuf: Slice, ctx: &RequestContext, algorithm: ImageCompressionAlgorithm, - ) -> (B::Buf, Result<(u64, CompressionInfo), Error>) { + ) -> (Slice, Result<(u64, CompressionInfo), Error>) { let offset = self.offset; let mut compression_info = CompressionInfo { written_compressed: false, compressed_size: None, }; - let len = srcbuf.bytes_init(); + assert!( + srcbuf.bytes_init(), + srcbuf.bytes_total(), + "caller likely meant to fill the buffer fully" + ); let mut io_buf = self.io_buf.take().expect("we always put it back below"); io_buf.clear(); @@ -308,10 +319,7 @@ impl BlobWriter { if len < 128 { // Short blob. Write a 1-byte length header io_buf.put_u8(len as u8); - ( - self.write_all(io_buf, ctx).await, - srcbuf.slice_full().into_inner(), - ) + (self.write_all(io_buf, ctx).await, srcbuf) } else { // Write a 4-byte length header if len > MAX_SUPPORTED_LEN { @@ -323,7 +331,7 @@ impl BlobWriter { format!("blob too large ({len} bytes)"), )), ), - srcbuf.slice_full().into_inner(), + srcbuf, ); } let (high_bit_mask, len_written, srcbuf) = match algorithm { @@ -336,8 +344,7 @@ impl BlobWriter { } else { async_compression::tokio::write::ZstdEncoder::new(Vec::new()) }; - let slice = srcbuf.slice_full(); - encoder.write_all(&slice[..]).await.unwrap(); + encoder.write_all(&srcbuf[..]).await.unwrap(); encoder.shutdown().await.unwrap(); let compressed = encoder.into_inner(); compression_info.compressed_size = Some(compressed.len()); @@ -345,14 +352,12 @@ impl BlobWriter { compression_info.written_compressed = true; let compressed_len = compressed.len(); compressed_buf = Some(compressed); - (BYTE_ZSTD, compressed_len, slice.into_inner()) + (BYTE_ZSTD, compressed_len, srcbuf) } else { - (BYTE_UNCOMPRESSED, len, slice.into_inner()) + (BYTE_UNCOMPRESSED, len, srcbuf) } } - ImageCompressionAlgorithm::Disabled => { - (BYTE_UNCOMPRESSED, len, srcbuf.slice_full().into_inner()) - } + ImageCompressionAlgorithm::Disabled => (BYTE_UNCOMPRESSED, len, srcbuf), }; let mut len_buf = (len_written as u32).to_be_bytes(); assert_eq!(len_buf[0] & 0xf0, 0); diff --git a/pageserver/src/tenant/ephemeral_file/page_caching.rs b/pageserver/src/tenant/ephemeral_file/page_caching.rs index 0a12b64a7cb0..c5823aa001e3 100644 --- a/pageserver/src/tenant/ephemeral_file/page_caching.rs +++ b/pageserver/src/tenant/ephemeral_file/page_caching.rs @@ -208,21 +208,13 @@ impl PreWarmingWriter { } impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmingWriter { - async fn write_all< - B: tokio_epoll_uring::BoundedBuf, - Buf: tokio_epoll_uring::IoBuf + Send, - >( + async fn write_all( &mut self, - buf: B, + buf: Slice, ctx: &RequestContext, - ) -> std::io::Result<(usize, B::Buf)> { - let buf = buf.slice(..); - let saved_bounds = buf.bounds(); // save for reconstructing the Slice from iobuf after the IO is done - let check_bounds_stuff_works = if cfg!(test) && cfg!(debug_assertions) { - Some(buf.to_vec()) - } else { - None - }; + ) -> std::io::Result<(usize, Slice)> { + assert_eq!(buf.bytes_init(), buf.bytes_total()); + let buflen = buf.len(); assert_eq!( buflen % PAGE_SZ, @@ -231,10 +223,10 @@ impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmi ); // Do the IO. - let iobuf = match self.file.write_all(buf, ctx).await { - (iobuf, Ok(nwritten)) => { + let buf = match self.file.write_all(buf, ctx).await { + (buf, Ok(nwritten)) => { assert_eq!(nwritten, buflen); - iobuf + buf } (_, Err(e)) => { return Err(std::io::Error::new( @@ -248,12 +240,6 @@ impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmi } }; - // Reconstruct the Slice (the write path consumed the Slice and returned us the underlying IoBuf) - let buf = tokio_epoll_uring::Slice::from_buf_bounds(iobuf, saved_bounds); - if let Some(check_bounds_stuff_works) = check_bounds_stuff_works { - assert_eq!(&check_bounds_stuff_works, &*buf); - } - let nblocks = buflen / PAGE_SZ; let nblocks32 = u32::try_from(nblocks).unwrap(); @@ -300,6 +286,6 @@ impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmi } self.nwritten_blocks = self.nwritten_blocks.checked_add(nblocks32).unwrap(); - Ok((buflen, buf.into_inner())) + Ok((buflen, buf)) } } diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index f4e965b99acc..6e0ae050d3d0 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -436,19 +436,22 @@ impl DeltaLayerWriterInner { ctx: &RequestContext, ) -> anyhow::Result<()> { let (_, res) = self - .put_value_bytes(key, lsn, Value::ser(&val)?, val.will_init(), ctx) + .put_value_bytes(key, lsn, Value::ser(&val)?.slice(..), val.will_init(), ctx) .await; res } - async fn put_value_bytes( + async fn put_value_bytes( &mut self, key: Key, lsn: Lsn, - val: Vec, + val: Slice, will_init: bool, ctx: &RequestContext, - ) -> (Vec, anyhow::Result<()>) { + ) -> (Buf, anyhow::Result<()>) + where + Buf: IoBufMut + Send, + { assert!( self.lsn_range.start <= lsn, "lsn_start={}, lsn={}", @@ -646,14 +649,17 @@ impl DeltaLayerWriter { .await } - pub async fn put_value_bytes( + pub async fn put_value_bytes( &mut self, key: Key, lsn: Lsn, - val: Vec, + val: Slice, will_init: bool, ctx: &RequestContext, - ) -> (Vec, anyhow::Result<()>) { + ) -> (Buf, anyhow::Result<()>) + where + Buf: IoBufMut + Send, + { self.inner .as_mut() .unwrap() diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 27f6fe90a498..390d1f9dd837 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -672,27 +672,33 @@ impl VirtualFile { (Slice::into_inner(buf), Ok(())) } - /// Writes `buf.slice(0..buf.bytes_init())`. - /// Returns the IoBuf that is underlying the BoundedBuf `buf`. - /// I.e., the returned value's `bytes_init()` method returns something different than the `bytes_init()` that was passed in. - /// It's quite brittle and easy to mis-use, so, we return the size in the Ok() variant. - pub async fn write_all, Buf: IoBuf + Send>( + /// Writes `buf` to the file at the current offset. + /// + /// Panics if there is an uninitialized range in `buf`, as that is most likely a bug in the caller. + pub async fn write_all( &mut self, - buf: B, + buf: Slice, ctx: &RequestContext, - ) -> (B::Buf, Result) { + ) -> (Slice, Result) { + assert!( + buf.bytes_init(), + buf.bytes_total(), + "caller likely meant to fill the buffer fully" + ); + let nbytes = buf.bytes_init(); if nbytes == 0 { - return (Slice::into_inner(buf.slice_full()), Ok(0)); + return (buf, Ok(0)); } - let mut buf = buf.slice(0..nbytes); + let bounds = buf.bounds(); + let mut buf = buf; while !buf.is_empty() { let res; (buf, res) = self.write(buf, ctx).await; match res { Ok(0) => { return ( - Slice::into_inner(buf), + Slice::from_buf_bounds(buf.into_inner(), bounds), Err(Error::new( std::io::ErrorKind::WriteZero, "failed to write whole buffer", @@ -703,10 +709,10 @@ impl VirtualFile { buf = buf.slice(n..); } Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} - Err(e) => return (Slice::into_inner(buf), Err(e)), + Err(e) => return (Slice::from_buf_bounds(buf.into_inner(), bounds), Err(e)), } } - (Slice::into_inner(buf), Ok(nbytes)) + (Slice::from_buf_bounds(buf.into_inner(), bounds), Ok(nbytes)) } async fn write( @@ -1093,11 +1099,11 @@ impl Drop for VirtualFile { impl OwnedAsyncWriter for VirtualFile { #[inline(always)] - async fn write_all, Buf: IoBuf + Send>( + async fn write_all( &mut self, - buf: B, + buf: Slice, ctx: &RequestContext, - ) -> std::io::Result<(usize, B::Buf)> { + ) -> std::io::Result<(usize, Slice)> { let (buf, res) = VirtualFile::write_all(self, buf, ctx).await; res.map(move |v| (v, buf)) } diff --git a/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs b/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs index 55b1d0b46bfe..0d8f854be9ca 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs @@ -35,11 +35,11 @@ where W: OwnedAsyncWriter, { #[inline(always)] - async fn write_all, Buf: IoBuf + Send>( + async fn write_all( &mut self, - buf: B, + buf: Slice, ctx: &RequestContext, - ) -> std::io::Result<(usize, B::Buf)> { + ) -> std::io::Result<(usize, Slice)> { let (nwritten, buf) = self.dst.write_all(buf, ctx).await?; self.bytes_amount += u64::try_from(nwritten).unwrap(); Ok((nwritten, buf)) diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs index 8599d95cdf9f..a761e86cd45d 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs @@ -6,11 +6,11 @@ use crate::context::RequestContext; /// A trait for doing owned-buffer write IO. /// Think [`tokio::io::AsyncWrite`] but with owned buffers. pub trait OwnedAsyncWriter { - async fn write_all, Buf: IoBuf + Send>( + async fn write_all( &mut self, - buf: B, + buf: Slice, ctx: &RequestContext, - ) -> std::io::Result<(usize, B::Buf)>; + ) -> std::io::Result<(usize, Slice)>; } /// A wrapper aorund an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch @@ -213,18 +213,14 @@ impl Buffer for BytesMut { } impl OwnedAsyncWriter for Vec { - async fn write_all, Buf: IoBuf + Send>( + async fn write_all( &mut self, - buf: B, + buf: Slice, _: &RequestContext, - ) -> std::io::Result<(usize, B::Buf)> { - let nbytes = buf.bytes_init(); - if nbytes == 0 { - return Ok((0, Slice::into_inner(buf.slice_full()))); - } - let buf = buf.slice(0..nbytes); + ) -> std::io::Result<(usize, Slice)> { + assert_eq!(buf.bytes_init(), buf.bytes_total()); self.extend_from_slice(&buf[..]); - Ok((buf.len(), Slice::into_inner(buf))) + Ok((buf.len(), buf)) } } @@ -241,19 +237,14 @@ mod tests { writes: Vec>, } impl OwnedAsyncWriter for RecorderWriter { - async fn write_all, Buf: IoBuf + Send>( + async fn write_all( &mut self, - buf: B, + buf: Slice, _: &RequestContext, - ) -> std::io::Result<(usize, B::Buf)> { - let nbytes = buf.bytes_init(); - if nbytes == 0 { - self.writes.push(vec![]); - return Ok((0, Slice::into_inner(buf.slice_full()))); - } - let buf = buf.slice(0..nbytes); + ) -> std::io::Result<(usize, Slice)> { + assert_eq!(buf.bytes_init(), buf.bytes_total()); self.writes.push(Vec::from(&buf[..])); - Ok((buf.len(), Slice::into_inner(buf))) + Ok((buf.len(), buf)) } } From b7f7d5c9d37a1b324e6f0d037adab301f06bd1d6 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 13 Aug 2024 20:55:13 +0200 Subject: [PATCH 2/6] all tests pass --- pageserver/src/tenant/blob_io.rs | 48 +++++++++++-------- .../src/tenant/ephemeral_file/page_caching.rs | 2 +- .../src/tenant/storage_layer/delta_layer.rs | 26 ++++++---- .../src/tenant/storage_layer/image_layer.rs | 9 ++-- .../tenant/storage_layer/inmemory_layer.rs | 25 +++++++--- pageserver/src/virtual_file.rs | 28 ++++++----- pageserver/src/virtual_file/io_engine.rs | 2 +- .../owned_buffers_io/io_buf_ext.rs | 39 +++++++++++++++ .../virtual_file/owned_buffers_io/slice.rs | 4 +- .../util/size_tracking_writer.rs | 2 +- .../virtual_file/owned_buffers_io/write.rs | 11 +++-- 11 files changed, 134 insertions(+), 62 deletions(-) create mode 100644 pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index f1055ce099a9..a01c18d7cef6 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -24,6 +24,7 @@ use tracing::warn; use crate::context::RequestContext; use crate::page_cache::PAGE_SZ; use crate::tenant::block_io::BlockCursor; +use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; use crate::virtual_file::VirtualFile; use std::cmp::min; use std::io::{Error, ErrorKind}; @@ -204,7 +205,7 @@ impl BlobWriter { /// Flushes the internal buffer to the underlying `VirtualFile`. pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> { let buf = std::mem::take(&mut self.buf); - let (mut slice, res) = self.inner.write_all(buf.slice(0..buf.len()), ctx).await; + let (slice, res) = self.inner.write_all(buf.slice_len(), ctx).await; res?; let mut buf = Slice::into_inner(slice); buf.clear(); @@ -228,11 +229,15 @@ impl BlobWriter { src_buf: Slice, ctx: &RequestContext, ) -> (Slice, Result<(), Error>) { - assert!( + assert_eq!( src_buf.bytes_init(), src_buf.bytes_total(), "caller likely meant to fill the buffer fully" ); + let src_buf_bounds = src_buf.bounds(); + let restore = move |src_buf_slice: Slice<_>| { + Slice::from_buf_bounds(src_buf_slice.into_inner(), src_buf_bounds) + }; if !BUFFERED { assert!(self.buf.is_empty()); @@ -241,7 +246,7 @@ impl BlobWriter { let remaining = Self::CAPACITY - self.buf.len(); let src_buf_len = src_buf.bytes_init(); if src_buf_len == 0 { - return (Slice::into_inner(src_buf.slice_full()), Ok(())); + return (restore(src_buf), Ok(())); } let mut src_buf = src_buf.slice(0..src_buf_len); // First try to copy as much as we can into the buffer @@ -252,7 +257,7 @@ impl BlobWriter { // Then, if the buffer is full, flush it out if self.buf.len() == Self::CAPACITY { if let Err(e) = self.flush_buffer(ctx).await { - return (Slice::into_inner(src_buf), Err(e)); + return (restore(src_buf), Err(e)); } } // Finally, write the tail of src_buf: @@ -265,7 +270,7 @@ impl BlobWriter { let copied = self.write_into_buffer(&src_buf); // We just verified above that src_buf fits into our internal buffer. assert_eq!(copied, src_buf.len()); - Slice::into_inner(src_buf) + restore(src_buf) } else { let (src_buf, res) = self.write_all_unbuffered(src_buf, ctx).await; if let Err(e) = res { @@ -274,18 +279,18 @@ impl BlobWriter { src_buf } } else { - Slice::into_inner(src_buf) + restore(src_buf) }; (src_buf, Ok(())) } /// Write a blob of data. Returns the offset that it was written to, /// which can be used to retrieve the data later. - pub async fn write_blob, Buf: IoBuf + Send>( + pub async fn write_blob( &mut self, - srcbuf: B, + srcbuf: Slice, ctx: &RequestContext, - ) -> (B::Buf, Result) { + ) -> (Slice, Result) { let (buf, res) = self .write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled) .await; @@ -306,26 +311,27 @@ impl BlobWriter { compressed_size: None, }; - assert!( + assert_eq!( srcbuf.bytes_init(), srcbuf.bytes_total(), "caller likely meant to fill the buffer fully" ); + let len = srcbuf.bytes_init(); let mut io_buf = self.io_buf.take().expect("we always put it back below"); io_buf.clear(); let mut compressed_buf = None; - let ((io_buf, hdr_res), srcbuf) = async { + let ((io_buf_slice, hdr_res), srcbuf) = async { if len < 128 { // Short blob. Write a 1-byte length header io_buf.put_u8(len as u8); - (self.write_all(io_buf, ctx).await, srcbuf) + (self.write_all(io_buf.slice_len(), ctx).await, srcbuf) } else { // Write a 4-byte length header if len > MAX_SUPPORTED_LEN { return ( ( - io_buf, + io_buf.slice_len(), Err(Error::new( ErrorKind::Other, format!("blob too large ({len} bytes)"), @@ -363,18 +369,18 @@ impl BlobWriter { assert_eq!(len_buf[0] & 0xf0, 0); len_buf[0] |= high_bit_mask; io_buf.extend_from_slice(&len_buf[..]); - (self.write_all(io_buf, ctx).await, srcbuf) + (self.write_all(io_buf.slice_len(), ctx).await, srcbuf) } } .await; - self.io_buf = Some(io_buf); + self.io_buf = Some(io_buf_slice.into_inner()); match hdr_res { Ok(_) => (), - Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)), + Err(e) => return (srcbuf, Err(e)), } let (srcbuf, res) = if let Some(compressed_buf) = compressed_buf { - let (_buf, res) = self.write_all(compressed_buf, ctx).await; - (Slice::into_inner(srcbuf.slice(..)), res) + let (_buf, res) = self.write_all(compressed_buf.slice_len(), ctx).await; + (srcbuf, res) } else { self.write_all(srcbuf, ctx).await }; @@ -437,21 +443,21 @@ pub(crate) mod tests { let (_, res) = if compression { let res = wtr .write_blob_maybe_compressed( - blob.clone(), + blob.clone().slice_len(), ctx, ImageCompressionAlgorithm::Zstd { level: Some(1) }, ) .await; (res.0, res.1.map(|(off, _)| off)) } else { - wtr.write_blob(blob.clone(), ctx).await + wtr.write_blob(blob.clone().slice_len(), ctx).await }; let offs = res?; offsets.push(offs); } // Write out one page worth of zeros so that we can // read again with read_blk - let (_, res) = wtr.write_blob(vec![0; PAGE_SZ], ctx).await; + let (_, res) = wtr.write_blob(vec![0; PAGE_SZ].slice_len(), ctx).await; let offs = res?; println!("Writing final blob at offs={offs}"); wtr.flush_buffer(ctx).await?; diff --git a/pageserver/src/tenant/ephemeral_file/page_caching.rs b/pageserver/src/tenant/ephemeral_file/page_caching.rs index c5823aa001e3..634a16bcd476 100644 --- a/pageserver/src/tenant/ephemeral_file/page_caching.rs +++ b/pageserver/src/tenant/ephemeral_file/page_caching.rs @@ -9,7 +9,7 @@ use crate::virtual_file::VirtualFile; use once_cell::sync::Lazy; use std::io::{self, ErrorKind}; use std::ops::{Deref, Range}; -use tokio_epoll_uring::BoundedBuf; +use tokio_epoll_uring::{BoundedBuf, Slice}; use tracing::*; use super::zero_padded_read_write; diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 6e0ae050d3d0..c5ea912e24d3 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -42,6 +42,7 @@ use crate::tenant::vectored_blob_io::{ VectoredReadPlanner, }; use crate::tenant::PageReconstructError; +use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; use crate::virtual_file::{self, VirtualFile}; use crate::{walrecord, TEMP_FILE_SUFFIX}; use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION}; @@ -63,6 +64,7 @@ use std::os::unix::fs::FileExt; use std::str::FromStr; use std::sync::Arc; use tokio::sync::OnceCell; +use tokio_epoll_uring::{IoBufMut, Slice}; use tracing::*; use utils::{ @@ -436,7 +438,13 @@ impl DeltaLayerWriterInner { ctx: &RequestContext, ) -> anyhow::Result<()> { let (_, res) = self - .put_value_bytes(key, lsn, Value::ser(&val)?.slice(..), val.will_init(), ctx) + .put_value_bytes( + key, + lsn, + Value::ser(&val)?.slice_len(), + val.will_init(), + ctx, + ) .await; res } @@ -448,7 +456,7 @@ impl DeltaLayerWriterInner { val: Slice, will_init: bool, ctx: &RequestContext, - ) -> (Buf, anyhow::Result<()>) + ) -> (Slice, anyhow::Result<()>) where Buf: IoBufMut + Send, { @@ -517,7 +525,7 @@ impl DeltaLayerWriterInner { file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64)) .await?; for buf in block_buf.blocks { - let (_buf, res) = file.write_all(buf, ctx).await; + let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; res?; } assert!(self.lsn_range.start < self.lsn_range.end); @@ -537,7 +545,7 @@ impl DeltaLayerWriterInner { // TODO: could use smallvec here but it's a pain with Slice Summary::ser_into(&summary, &mut buf)?; file.seek(SeekFrom::Start(0)).await?; - let (_buf, res) = file.write_all(buf, ctx).await; + let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; res?; let metadata = file @@ -656,7 +664,7 @@ impl DeltaLayerWriter { val: Slice, will_init: bool, ctx: &RequestContext, - ) -> (Buf, anyhow::Result<()>) + ) -> (Slice, anyhow::Result<()>) where Buf: IoBufMut + Send, { @@ -749,7 +757,7 @@ impl DeltaLayer { // TODO: could use smallvec here, but it's a pain with Slice Summary::ser_into(&new_summary, &mut buf).context("serialize")?; file.seek(SeekFrom::Start(0)).await?; - let (_buf, res) = file.write_all(buf, ctx).await; + let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; res?; Ok(()) } @@ -1297,12 +1305,12 @@ impl DeltaLayerInner { .put_value_bytes( key, lsn, - std::mem::take(&mut per_blob_copy), + std::mem::take(&mut per_blob_copy).slice_len(), will_init, ctx, ) .await; - per_blob_copy = tmp; + per_blob_copy = tmp.into_inner(); res?; @@ -1877,7 +1885,7 @@ pub(crate) mod test { for entry in entries { let (_, res) = writer - .put_value_bytes(entry.key, entry.lsn, entry.value, false, &ctx) + .put_value_bytes(entry.key, entry.lsn, entry.value.slice_len(), false, &ctx) .await; res?; } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index f9d3fdf18691..9a19e4e2c711 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -38,6 +38,7 @@ use crate::tenant::vectored_blob_io::{ VectoredReadPlanner, }; use crate::tenant::{PageReconstructError, Timeline}; +use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; use crate::virtual_file::{self, VirtualFile}; use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX}; use anyhow::{anyhow, bail, ensure, Context, Result}; @@ -354,7 +355,7 @@ impl ImageLayer { // TODO: could use smallvec here but it's a pain with Slice Summary::ser_into(&new_summary, &mut buf).context("serialize")?; file.seek(SeekFrom::Start(0)).await?; - let (_buf, res) = file.write_all(buf, ctx).await; + let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; res?; Ok(()) } @@ -786,7 +787,7 @@ impl ImageLayerWriterInner { self.num_keys += 1; let (_img, res) = self .blob_writer - .write_blob_maybe_compressed(img, ctx, compression) + .write_blob_maybe_compressed(img.slice_len(), ctx, compression) .await; // TODO: re-use the buffer for `img` further upstack let (off, compression_info) = res?; @@ -838,7 +839,7 @@ impl ImageLayerWriterInner { .await?; let (index_root_blk, block_buf) = self.tree.finish()?; for buf in block_buf.blocks { - let (_buf, res) = file.write_all(buf, ctx).await; + let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; res?; } @@ -858,7 +859,7 @@ impl ImageLayerWriterInner { // TODO: could use smallvec here but it's a pain with Slice Summary::ser_into(&summary, &mut buf)?; file.seek(SeekFrom::Start(0)).await?; - let (_buf, res) = file.write_all(buf, ctx).await; + let (_buf, res) = file.write_all(buf.slice_len(), ctx).await; res?; let metadata = file diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index fb15ddfba9ee..c16d111a2520 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -12,6 +12,7 @@ use crate::tenant::block_io::{BlockCursor, BlockReader, BlockReaderRef}; use crate::tenant::ephemeral_file::EphemeralFile; use crate::tenant::timeline::GetVectoredError; use crate::tenant::PageReconstructError; +use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; use crate::{l0_flush, page_cache, walrecord}; use anyhow::{anyhow, Result}; use camino::Utf8PathBuf; @@ -581,11 +582,17 @@ impl InMemoryLayer { for (lsn, pos) in vec_map.as_slice() { cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?; let will_init = Value::des(&buf)?.will_init(); - let res; - (buf, res) = delta_layer_writer - .put_value_bytes(Key::from_compact(*key), *lsn, buf, will_init, &ctx) + let (tmp, res) = delta_layer_writer + .put_value_bytes( + Key::from_compact(*key), + *lsn, + std::mem::take(&mut buf).slice_len(), + will_init, + &ctx, + ) .await; res?; + buf = tmp.into_inner(); } } } @@ -620,11 +627,17 @@ impl InMemoryLayer { // => https://github.com/neondatabase/neon/issues/8183 cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?; let will_init = Value::des(&buf)?.will_init(); - let res; - (buf, res) = delta_layer_writer - .put_value_bytes(Key::from_compact(*key), *lsn, buf, will_init, ctx) + let (tmp, res) = delta_layer_writer + .put_value_bytes( + Key::from_compact(*key), + *lsn, + std::mem::take(&mut buf).slice_len(), + will_init, + ctx, + ) .await; res?; + buf = tmp.into_inner(); } } } diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 390d1f9dd837..83f773c7cedd 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -50,6 +50,7 @@ pub(crate) mod owned_buffers_io { //! but for the time being we're proving out the primitives in the neon.git repo //! for faster iteration. + pub(crate) mod io_buf_ext; pub(crate) mod slice; pub(crate) mod write; pub(crate) mod util { @@ -680,7 +681,7 @@ impl VirtualFile { buf: Slice, ctx: &RequestContext, ) -> (Slice, Result) { - assert!( + assert_eq!( buf.bytes_init(), buf.bytes_total(), "caller likely meant to fill the buffer fully" @@ -1165,7 +1166,8 @@ mod tests { use crate::task_mgr::TaskKind; use super::*; - use owned_buffers_io::slice::SliceExt; + use owned_buffers_io::io_buf_ext::IoBufExt; + use owned_buffers_io::slice::SliceMutExt; use rand::seq::SliceRandom; use rand::thread_rng; use rand::Rng; @@ -1225,23 +1227,18 @@ mod tests { MaybeVirtualFile::File(file) => file.seek(pos), } } - async fn write_all, Buf: IoBuf + Send>( + async fn write_all( &mut self, - buf: B, + buf: Slice, ctx: &RequestContext, ) -> Result<(), Error> { + assert_eq!(buf.bytes_init(), buf.bytes_total()); match self { MaybeVirtualFile::VirtualFile(file) => { let (_buf, res) = file.write_all(buf, ctx).await; res.map(|_| ()) } - MaybeVirtualFile::File(file) => { - let buf_len = buf.bytes_init(); - if buf_len == 0 { - return Ok(()); - } - file.write_all(&buf.slice(0..buf_len)) - } + MaybeVirtualFile::File(file) => file.write_all(&buf[..]), } } @@ -1353,7 +1350,9 @@ mod tests { &ctx, ) .await?; - file_a.write_all(b"foobar".to_vec(), &ctx).await?; + file_a + .write_all(b"foobar".to_vec().slice_len(), &ctx) + .await?; // cannot read from a file opened in write-only mode let _ = file_a.read_string(&ctx).await.unwrap_err(); @@ -1362,7 +1361,10 @@ mod tests { let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?; // cannot write to a file opened in read-only mode - let _ = file_a.write_all(b"bar".to_vec(), &ctx).await.unwrap_err(); + let _ = file_a + .write_all(b"bar".to_vec().slice_len(), &ctx) + .await + .unwrap_err(); // Try simple read assert_eq!("foobar", file_a.read_string(&ctx).await?); diff --git a/pageserver/src/virtual_file/io_engine.rs b/pageserver/src/virtual_file/io_engine.rs index 0ffcd9fa0560..043c9d19a20c 100644 --- a/pageserver/src/virtual_file/io_engine.rs +++ b/pageserver/src/virtual_file/io_engine.rs @@ -107,7 +107,7 @@ use std::{ sync::atomic::{AtomicU8, Ordering}, }; -use super::{owned_buffers_io::slice::SliceExt, FileGuard, Metadata}; +use super::{owned_buffers_io::slice::SliceMutExt, FileGuard, Metadata}; #[cfg(target_os = "linux")] fn epoll_uring_error_to_std(e: tokio_epoll_uring::Error) -> std::io::Error { diff --git a/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs b/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs new file mode 100644 index 000000000000..ad2c076e7baa --- /dev/null +++ b/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs @@ -0,0 +1,39 @@ +use bytes::{Bytes, BytesMut}; +use std::ops::Range; +use tokio_epoll_uring::{BoundedBuf, Slice}; + +pub(crate) trait IoBufExt { + /// Get a [`Slice`] covering the range `[0..self.len()]`. + /// It is guaranteed that the resulting slice has [`Slice::bytes_init`] equal to [`Slice::bytes_total`]. + fn slice_len(self) -> Slice + where + Self: Sized; +} + +macro_rules! impl_io_buf_ext { + ($($t:ty),*) => { + $( + impl IoBufExt for $t { + #[inline(always)] + fn slice_len(self) -> Slice { + let len = self.len(); + let s = if len == 0 { + // paper over the incorrect assertion + // https://github.com/neondatabase/tokio-epoll-uring/issues/46 + let slice = self.slice_full(); + let mut bounds: Range<_> = slice.bounds(); + bounds.end = bounds.start; + // from_buf_bounds has the correct assertion + Slice::from_buf_bounds(slice.into_inner(), bounds) + } else { + self.slice(0..len) + }; + assert_eq!(s.bytes_init(), s.bytes_total()); + s + } + } + )* + }; +} + +impl_io_buf_ext!(Bytes, BytesMut, Vec); diff --git a/pageserver/src/virtual_file/owned_buffers_io/slice.rs b/pageserver/src/virtual_file/owned_buffers_io/slice.rs index d19e5ddffefb..610059366319 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/slice.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/slice.rs @@ -3,14 +3,14 @@ use tokio_epoll_uring::BoundedBufMut; use tokio_epoll_uring::IoBufMut; use tokio_epoll_uring::Slice; -pub(crate) trait SliceExt { +pub(crate) trait SliceMutExt { /// Get a `&mut[0..self.bytes_total()`] slice, for when you need to do borrow-based IO. /// /// See the test case `test_slice_full_zeroed` for the difference to just doing `&slice[..]` fn as_mut_rust_slice_full_zeroed(&mut self) -> &mut [u8]; } -impl SliceExt for Slice +impl SliceMutExt for Slice where B: IoBufMut, { diff --git a/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs b/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs index 0d8f854be9ca..483d0100020d 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs @@ -1,5 +1,5 @@ use crate::{context::RequestContext, virtual_file::owned_buffers_io::write::OwnedAsyncWriter}; -use tokio_epoll_uring::{BoundedBuf, IoBuf}; +use tokio_epoll_uring::{IoBuf, Slice}; pub struct Writer { dst: W, diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs index a761e86cd45d..14e8eeef6ada 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs @@ -81,7 +81,9 @@ where &mut self, chunk: Slice, ctx: &RequestContext, - ) -> std::io::Result<(usize, S)> { + ) -> std::io::Result<(usize, Slice)> { + assert_eq!(chunk.bytes_init(), chunk.bytes_total()); + let chunk_len = chunk.len(); // avoid memcpy for the middle of the chunk if chunk.len() >= self.buf().cap() { @@ -114,7 +116,7 @@ where } } assert!(slice.is_empty(), "by now we should have drained the chunk"); - Ok((chunk_len, chunk.into_inner())) + Ok((chunk_len, chunk)) } /// Strictly less performant variant of [`Self::write_buffered`] that allows writing borrowed data. @@ -150,9 +152,10 @@ where self.buf = Some(buf); return Ok(()); } - let (nwritten, io_buf) = self.writer.write_all(buf.flush(), ctx).await?; + let slice = buf.flush(); + let (nwritten, slice) = self.writer.write_all(slice, ctx).await?; assert_eq!(nwritten, buf_len); - self.buf = Some(Buffer::reuse_after_flush(io_buf)); + self.buf = Some(Buffer::reuse_after_flush(slice.into_inner())); Ok(()) } } From 52dedffa15d56a7839e035a166e8700188b930de Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 13 Aug 2024 20:59:03 +0200 Subject: [PATCH 3/6] remaining --- pageserver/src/virtual_file.rs | 42 +++++++++---------- .../owned_buffers_io/io_buf_ext.rs | 2 + 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 83f773c7cedd..a636471a54c2 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -638,24 +638,23 @@ impl VirtualFile { } // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235 - pub async fn write_all_at, Buf: IoBuf + Send>( + pub async fn write_all_at( &self, - buf: B, + buf: Slice, mut offset: u64, ctx: &RequestContext, - ) -> (B::Buf, Result<(), Error>) { - let buf_len = buf.bytes_init(); - if buf_len == 0 { - return (Slice::into_inner(buf.slice_full()), Ok(())); - } - let mut buf = buf.slice(0..buf_len); + ) -> (Slice, Result<(), Error>) { + assert_eq!(buf.bytes_init(), buf.bytes_total()); + let bounds = buf.bounds(); + let restore = |buf: Slice<_>| Slice::from_buf_bounds(buf.into_inner(), bounds); + let mut buf = buf; while !buf.is_empty() { let res; (buf, res) = self.write_at(buf, offset, ctx).await; match res { Ok(0) => { return ( - Slice::into_inner(buf), + restore(buf), Err(Error::new( std::io::ErrorKind::WriteZero, "failed to write whole buffer", @@ -667,10 +666,10 @@ impl VirtualFile { offset += n as u64; } Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {} - Err(e) => return (Slice::into_inner(buf), Err(e)), + Err(e) => return (restore(buf), Err(e)), } } - (Slice::into_inner(buf), Ok(())) + (restore(buf), Ok(())) } /// Writes `buf` to the file at the current offset. @@ -1201,24 +1200,19 @@ mod tests { } } } - async fn write_all_at, Buf: IoBuf + Send>( + async fn write_all_at( &self, - buf: B, + buf: Slice, offset: u64, ctx: &RequestContext, ) -> Result<(), Error> { + assert_eq!(buf.bytes_init(), buf.bytes_total()); match self { MaybeVirtualFile::VirtualFile(file) => { let (_buf, res) = file.write_all_at(buf, offset, ctx).await; res } - MaybeVirtualFile::File(file) => { - let buf_len = buf.bytes_init(); - if buf_len == 0 { - return Ok(()); - } - file.write_all_at(&buf.slice(0..buf_len), offset) - } + MaybeVirtualFile::File(file) => file.write_all_at(&buf[..], offset), } } async fn seek(&mut self, pos: SeekFrom) -> Result { @@ -1407,8 +1401,12 @@ mod tests { &ctx, ) .await?; - file_b.write_all_at(b"BAR".to_vec(), 3, &ctx).await?; - file_b.write_all_at(b"FOO".to_vec(), 0, &ctx).await?; + file_b + .write_all_at(b"BAR".to_vec().slice_len(), 3, &ctx) + .await?; + file_b + .write_all_at(b"FOO".to_vec().slice_len(), 0, &ctx) + .await?; assert_eq!(file_b.read_string_at(2, 3, &ctx).await?, "OBA"); diff --git a/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs b/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs index ad2c076e7baa..c68dc8f557d7 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs @@ -5,6 +5,8 @@ use tokio_epoll_uring::{BoundedBuf, Slice}; pub(crate) trait IoBufExt { /// Get a [`Slice`] covering the range `[0..self.len()]`. /// It is guaranteed that the resulting slice has [`Slice::bytes_init`] equal to [`Slice::bytes_total`]. + /// + /// This is for use on the write path. fn slice_len(self) -> Slice where Self: Sized; From 21573e3892be7a228ec2f347e21fe423110f6dca Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 14 Aug 2024 13:30:26 +0000 Subject: [PATCH 4/6] add type FullSlice (good suggestion, Vlad!) --- pageserver/src/tenant/blob_io.rs | 50 +++++------ .../src/tenant/ephemeral_file/page_caching.rs | 9 +- .../zero_padded_read_write/zero_padded.rs | 6 +- .../tenant/remote_timeline_client/download.rs | 5 +- .../src/tenant/storage_layer/delta_layer.rs | 14 +-- .../tenant/storage_layer/inmemory_layer.rs | 4 +- pageserver/src/virtual_file.rs | 60 ++++++------- pageserver/src/virtual_file/io_engine.rs | 19 ++-- .../owned_buffers_io/io_buf_ext.rs | 87 +++++++++++++------ .../util/size_tracking_writer.rs | 11 ++- .../virtual_file/owned_buffers_io/write.rs | 49 ++++++----- 11 files changed, 177 insertions(+), 137 deletions(-) diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index a01c18d7cef6..a245c99a88fa 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -24,7 +24,7 @@ use tracing::warn; use crate::context::RequestContext; use crate::page_cache::PAGE_SZ; use crate::tenant::block_io::BlockCursor; -use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; +use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt}; use crate::virtual_file::VirtualFile; use std::cmp::min; use std::io::{Error, ErrorKind}; @@ -189,9 +189,9 @@ impl BlobWriter { #[inline(always)] async fn write_all_unbuffered( &mut self, - src_buf: Slice, + src_buf: FullSlice, ctx: &RequestContext, - ) -> (Slice, Result<(), Error>) { + ) -> (FullSlice, Result<(), Error>) { let (src_buf, res) = self.inner.write_all(src_buf, ctx).await; let nbytes = match res { Ok(nbytes) => nbytes, @@ -207,7 +207,7 @@ impl BlobWriter { let buf = std::mem::take(&mut self.buf); let (slice, res) = self.inner.write_all(buf.slice_len(), ctx).await; res?; - let mut buf = Slice::into_inner(slice); + let mut buf = slice.into_raw_slice().into_inner(); buf.clear(); self.buf = buf; Ok(()) @@ -226,22 +226,23 @@ impl BlobWriter { /// Internal, possibly buffered, write function async fn write_all( &mut self, - src_buf: Slice, + src_buf: FullSlice, ctx: &RequestContext, - ) -> (Slice, Result<(), Error>) { - assert_eq!( - src_buf.bytes_init(), - src_buf.bytes_total(), - "caller likely meant to fill the buffer fully" - ); + ) -> (FullSlice, Result<(), Error>) { + let src_buf = src_buf.into_raw_slice(); let src_buf_bounds = src_buf.bounds(); let restore = move |src_buf_slice: Slice<_>| { - Slice::from_buf_bounds(src_buf_slice.into_inner(), src_buf_bounds) + FullSlice::must_new(Slice::from_buf_bounds( + src_buf_slice.into_inner(), + src_buf_bounds, + )) }; if !BUFFERED { assert!(self.buf.is_empty()); - return self.write_all_unbuffered(src_buf, ctx).await; + return self + .write_all_unbuffered(FullSlice::must_new(src_buf), ctx) + .await; } let remaining = Self::CAPACITY - self.buf.len(); let src_buf_len = src_buf.bytes_init(); @@ -272,7 +273,9 @@ impl BlobWriter { assert_eq!(copied, src_buf.len()); restore(src_buf) } else { - let (src_buf, res) = self.write_all_unbuffered(src_buf, ctx).await; + let (src_buf, res) = self + .write_all_unbuffered(FullSlice::must_new(src_buf), ctx) + .await; if let Err(e) = res { return (src_buf, Err(e)); } @@ -288,9 +291,9 @@ impl BlobWriter { /// which can be used to retrieve the data later. pub async fn write_blob( &mut self, - srcbuf: Slice, + srcbuf: FullSlice, ctx: &RequestContext, - ) -> (Slice, Result) { + ) -> (FullSlice, Result) { let (buf, res) = self .write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled) .await; @@ -299,24 +302,19 @@ impl BlobWriter { /// Write a blob of data. Returns the offset that it was written to, /// which can be used to retrieve the data later. - pub async fn write_blob_maybe_compressed( + pub(crate) async fn write_blob_maybe_compressed( &mut self, - srcbuf: Slice, + srcbuf: FullSlice, ctx: &RequestContext, algorithm: ImageCompressionAlgorithm, - ) -> (Slice, Result<(u64, CompressionInfo), Error>) { + ) -> (FullSlice, Result<(u64, CompressionInfo), Error>) { let offset = self.offset; let mut compression_info = CompressionInfo { written_compressed: false, compressed_size: None, }; - assert_eq!( - srcbuf.bytes_init(), - srcbuf.bytes_total(), - "caller likely meant to fill the buffer fully" - ); - let len = srcbuf.bytes_init(); + let len = srcbuf.len(); let mut io_buf = self.io_buf.take().expect("we always put it back below"); io_buf.clear(); @@ -373,7 +371,7 @@ impl BlobWriter { } } .await; - self.io_buf = Some(io_buf_slice.into_inner()); + self.io_buf = Some(io_buf_slice.into_raw_slice().into_inner()); match hdr_res { Ok(_) => (), Err(e) => return (srcbuf, Err(e)), diff --git a/pageserver/src/tenant/ephemeral_file/page_caching.rs b/pageserver/src/tenant/ephemeral_file/page_caching.rs index 634a16bcd476..7355b3b5a37d 100644 --- a/pageserver/src/tenant/ephemeral_file/page_caching.rs +++ b/pageserver/src/tenant/ephemeral_file/page_caching.rs @@ -4,12 +4,13 @@ use crate::context::RequestContext; use crate::page_cache::{self, PAGE_SZ}; use crate::tenant::block_io::BlockLease; +use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice; use crate::virtual_file::VirtualFile; use once_cell::sync::Lazy; use std::io::{self, ErrorKind}; use std::ops::{Deref, Range}; -use tokio_epoll_uring::{BoundedBuf, Slice}; +use tokio_epoll_uring::BoundedBuf; use tracing::*; use super::zero_padded_read_write; @@ -210,11 +211,9 @@ impl PreWarmingWriter { impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmingWriter { async fn write_all( &mut self, - buf: Slice, + buf: FullSlice, ctx: &RequestContext, - ) -> std::io::Result<(usize, Slice)> { - assert_eq!(buf.bytes_init(), buf.bytes_total()); - + ) -> std::io::Result<(usize, FullSlice)> { let buflen = buf.len(); assert_eq!( buflen % PAGE_SZ, diff --git a/pageserver/src/tenant/ephemeral_file/zero_padded_read_write/zero_padded.rs b/pageserver/src/tenant/ephemeral_file/zero_padded_read_write/zero_padded.rs index f90291bbf814..2dc0277638e7 100644 --- a/pageserver/src/tenant/ephemeral_file/zero_padded_read_write/zero_padded.rs +++ b/pageserver/src/tenant/ephemeral_file/zero_padded_read_write/zero_padded.rs @@ -5,6 +5,8 @@ use std::mem::MaybeUninit; +use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice; + /// See module-level comment. pub struct Buffer { allocation: Box<[u8; N]>, @@ -60,10 +62,10 @@ impl crate::virtual_file::owned_buffers_io::write::Buffer for Bu self.written } - fn flush(self) -> tokio_epoll_uring::Slice { + fn flush(self) -> FullSlice { self.invariants(); let written = self.written; - tokio_epoll_uring::BoundedBuf::slice(self, 0..written) + FullSlice::must_new(tokio_epoll_uring::BoundedBuf::slice(self, 0..written)) } fn reuse_after_flush(iobuf: Self::IoBuf) -> Self { diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index a17b32c98375..8199218c3c61 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -23,6 +23,7 @@ use crate::span::debug_assert_current_span_has_tenant_and_timeline_id; use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path}; use crate::tenant::storage_layer::LayerName; use crate::tenant::Generation; +use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile}; use crate::TEMP_FILE_SUFFIX; use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode, RemotePath}; @@ -219,9 +220,7 @@ async fn download_object<'a>( Ok(chunk) => chunk, Err(e) => return Err(e), }; - buffered - .write_buffered(tokio_epoll_uring::BoundedBuf::slice_full(chunk), ctx) - .await?; + buffered.write_buffered(chunk.slice_len(), ctx).await?; } let size_tracking = buffered.flush_and_into_inner(ctx).await?; Ok(size_tracking.into_inner()) diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index c5ea912e24d3..d5b40c74f2be 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -42,7 +42,7 @@ use crate::tenant::vectored_blob_io::{ VectoredReadPlanner, }; use crate::tenant::PageReconstructError; -use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; +use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt}; use crate::virtual_file::{self, VirtualFile}; use crate::{walrecord, TEMP_FILE_SUFFIX}; use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION}; @@ -64,7 +64,7 @@ use std::os::unix::fs::FileExt; use std::str::FromStr; use std::sync::Arc; use tokio::sync::OnceCell; -use tokio_epoll_uring::{IoBufMut, Slice}; +use tokio_epoll_uring::IoBufMut; use tracing::*; use utils::{ @@ -453,10 +453,10 @@ impl DeltaLayerWriterInner { &mut self, key: Key, lsn: Lsn, - val: Slice, + val: FullSlice, will_init: bool, ctx: &RequestContext, - ) -> (Slice, anyhow::Result<()>) + ) -> (FullSlice, anyhow::Result<()>) where Buf: IoBufMut + Send, { @@ -661,10 +661,10 @@ impl DeltaLayerWriter { &mut self, key: Key, lsn: Lsn, - val: Slice, + val: FullSlice, will_init: bool, ctx: &RequestContext, - ) -> (Slice, anyhow::Result<()>) + ) -> (FullSlice, anyhow::Result<()>) where Buf: IoBufMut + Send, { @@ -1310,7 +1310,7 @@ impl DeltaLayerInner { ctx, ) .await; - per_blob_copy = tmp.into_inner(); + per_blob_copy = tmp.into_raw_slice().into_inner(); res?; diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index c16d111a2520..53d855450f8a 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -592,7 +592,7 @@ impl InMemoryLayer { ) .await; res?; - buf = tmp.into_inner(); + buf = tmp.into_raw_slice().into_inner(); } } } @@ -637,7 +637,7 @@ impl InMemoryLayer { ) .await; res?; - buf = tmp.into_inner(); + buf = tmp.into_raw_slice().into_inner(); } } } diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index a636471a54c2..b4695e5f4013 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -17,6 +17,7 @@ use crate::page_cache::{PageWriteGuard, PAGE_SZ}; use crate::tenant::TENANTS_SEGMENT_NAME; use camino::{Utf8Path, Utf8PathBuf}; use once_cell::sync::OnceCell; +use owned_buffers_io::io_buf_ext::FullSlice; use pageserver_api::shard::TenantShardId; use std::fs::File; use std::io::{Error, ErrorKind, Seek, SeekFrom}; @@ -640,17 +641,18 @@ impl VirtualFile { // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235 pub async fn write_all_at( &self, - buf: Slice, + buf: FullSlice, mut offset: u64, ctx: &RequestContext, - ) -> (Slice, Result<(), Error>) { - assert_eq!(buf.bytes_init(), buf.bytes_total()); + ) -> (FullSlice, Result<(), Error>) { + let buf = buf.into_raw_slice(); let bounds = buf.bounds(); - let restore = |buf: Slice<_>| Slice::from_buf_bounds(buf.into_inner(), bounds); + let restore = + |buf: Slice<_>| FullSlice::must_new(Slice::from_buf_bounds(buf.into_inner(), bounds)); let mut buf = buf; while !buf.is_empty() { - let res; - (buf, res) = self.write_at(buf, offset, ctx).await; + let (tmp, res) = self.write_at(FullSlice::must_new(buf), offset, ctx).await; + buf = tmp.into_raw_slice(); match res { Ok(0) => { return ( @@ -677,28 +679,22 @@ impl VirtualFile { /// Panics if there is an uninitialized range in `buf`, as that is most likely a bug in the caller. pub async fn write_all( &mut self, - buf: Slice, + buf: FullSlice, ctx: &RequestContext, - ) -> (Slice, Result) { - assert_eq!( - buf.bytes_init(), - buf.bytes_total(), - "caller likely meant to fill the buffer fully" - ); - - let nbytes = buf.bytes_init(); - if nbytes == 0 { - return (buf, Ok(0)); - } + ) -> (FullSlice, Result) { + let buf = buf.into_raw_slice(); let bounds = buf.bounds(); + let restore = + |buf: Slice<_>| FullSlice::must_new(Slice::from_buf_bounds(buf.into_inner(), bounds)); + let nbytes = buf.len(); let mut buf = buf; while !buf.is_empty() { - let res; - (buf, res) = self.write(buf, ctx).await; + let (tmp, res) = self.write(FullSlice::must_new(buf), ctx).await; + buf = tmp.into_raw_slice(); match res { Ok(0) => { return ( - Slice::from_buf_bounds(buf.into_inner(), bounds), + restore(buf), Err(Error::new( std::io::ErrorKind::WriteZero, "failed to write whole buffer", @@ -709,17 +705,17 @@ impl VirtualFile { buf = buf.slice(n..); } Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} - Err(e) => return (Slice::from_buf_bounds(buf.into_inner(), bounds), Err(e)), + Err(e) => return (restore(buf), Err(e)), } } - (Slice::from_buf_bounds(buf.into_inner(), bounds), Ok(nbytes)) + (restore(buf), Ok(nbytes)) } async fn write( &mut self, - buf: Slice, + buf: FullSlice, ctx: &RequestContext, - ) -> (Slice, Result) { + ) -> (FullSlice, Result) { let pos = self.pos; let (buf, res) = self.write_at(buf, pos, ctx).await; let n = match res { @@ -762,10 +758,10 @@ impl VirtualFile { async fn write_at( &self, - buf: Slice, + buf: FullSlice, offset: u64, _ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */ - ) -> (Slice, Result) { + ) -> (FullSlice, Result) { let file_guard = match self.lock_file().await { Ok(file_guard) => file_guard, Err(e) => return (buf, Err(e)), @@ -1101,9 +1097,9 @@ impl OwnedAsyncWriter for VirtualFile { #[inline(always)] async fn write_all( &mut self, - buf: Slice, + buf: FullSlice, ctx: &RequestContext, - ) -> std::io::Result<(usize, Slice)> { + ) -> std::io::Result<(usize, FullSlice)> { let (buf, res) = VirtualFile::write_all(self, buf, ctx).await; res.map(move |v| (v, buf)) } @@ -1202,11 +1198,10 @@ mod tests { } async fn write_all_at( &self, - buf: Slice, + buf: FullSlice, offset: u64, ctx: &RequestContext, ) -> Result<(), Error> { - assert_eq!(buf.bytes_init(), buf.bytes_total()); match self { MaybeVirtualFile::VirtualFile(file) => { let (_buf, res) = file.write_all_at(buf, offset, ctx).await; @@ -1223,10 +1218,9 @@ mod tests { } async fn write_all( &mut self, - buf: Slice, + buf: FullSlice, ctx: &RequestContext, ) -> Result<(), Error> { - assert_eq!(buf.bytes_init(), buf.bytes_total()); match self { MaybeVirtualFile::VirtualFile(file) => { let (_buf, res) = file.write_all(buf, ctx).await; diff --git a/pageserver/src/virtual_file/io_engine.rs b/pageserver/src/virtual_file/io_engine.rs index 043c9d19a20c..faef1ba9ff54 100644 --- a/pageserver/src/virtual_file/io_engine.rs +++ b/pageserver/src/virtual_file/io_engine.rs @@ -12,7 +12,7 @@ #[cfg(target_os = "linux")] pub(super) mod tokio_epoll_uring_ext; -use tokio_epoll_uring::{IoBuf, Slice}; +use tokio_epoll_uring::IoBuf; use tracing::Instrument; pub(crate) use super::api::IoEngineKind; @@ -107,7 +107,10 @@ use std::{ sync::atomic::{AtomicU8, Ordering}, }; -use super::{owned_buffers_io::slice::SliceMutExt, FileGuard, Metadata}; +use super::{ + owned_buffers_io::{io_buf_ext::FullSlice, slice::SliceMutExt}, + FileGuard, Metadata, +}; #[cfg(target_os = "linux")] fn epoll_uring_error_to_std(e: tokio_epoll_uring::Error) -> std::io::Error { @@ -206,8 +209,8 @@ impl IoEngine { &self, file_guard: FileGuard, offset: u64, - buf: Slice, - ) -> ((FileGuard, Slice), std::io::Result) { + buf: FullSlice, + ) -> ((FileGuard, FullSlice), std::io::Result) { match self { IoEngine::NotSet => panic!("not initialized"), IoEngine::StdFs => { @@ -217,8 +220,12 @@ impl IoEngine { #[cfg(target_os = "linux")] IoEngine::TokioEpollUring => { let system = tokio_epoll_uring_ext::thread_local_system().await; - let (resources, res) = system.write(file_guard, offset, buf).await; - (resources, res.map_err(epoll_uring_error_to_std)) + let ((file_guard, slice), res) = + system.write(file_guard, offset, buf.into_raw_slice()).await; + ( + (file_guard, FullSlice::must_new(slice)), + res.map_err(epoll_uring_error_to_std), + ) } } } diff --git a/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs b/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs index c68dc8f557d7..3b2fa92f5e24 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs @@ -1,41 +1,78 @@ use bytes::{Bytes, BytesMut}; -use std::ops::Range; -use tokio_epoll_uring::{BoundedBuf, Slice}; +use std::ops::{Deref, Range}; +use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice}; + +/// The true owned equivalent for Rust [`slice`]. Use this for the write path. +/// +/// Unlike [`tokio_epoll_uring::Slice`], which we unfortunately inherited from `tokio-uring`, +/// [`FullSlice`] is guaranteed to have all its bytes initialized. This means that +/// [`>::len`] is equal to [`Slice::bytes_init`] and [`Slice::bytes_total`]. +/// +pub struct FullSlice { + slice: Slice, +} + +impl FullSlice +where + B: IoBuf, +{ + pub(crate) fn must_new(slice: Slice) -> Self { + assert_eq!(slice.bytes_init(), slice.bytes_total()); + FullSlice { slice } + } + pub(crate) fn into_raw_slice(self) -> Slice { + let FullSlice { slice: s } = self; + s + } +} + +impl Deref for FullSlice +where + B: IoBuf, +{ + type Target = [u8]; + + fn deref(&self) -> &[u8] { + let rust_slice = &self.slice[..]; + assert_eq!(rust_slice.len(), self.slice.bytes_init()); + assert_eq!(rust_slice.len(), self.slice.bytes_total()); + rust_slice + } +} pub(crate) trait IoBufExt { /// Get a [`Slice`] covering the range `[0..self.len()]`. /// It is guaranteed that the resulting slice has [`Slice::bytes_init`] equal to [`Slice::bytes_total`]. /// /// This is for use on the write path. - fn slice_len(self) -> Slice + fn slice_len(self) -> FullSlice where Self: Sized; } macro_rules! impl_io_buf_ext { - ($($t:ty),*) => { - $( - impl IoBufExt for $t { - #[inline(always)] - fn slice_len(self) -> Slice { - let len = self.len(); - let s = if len == 0 { - // paper over the incorrect assertion - // https://github.com/neondatabase/tokio-epoll-uring/issues/46 - let slice = self.slice_full(); - let mut bounds: Range<_> = slice.bounds(); - bounds.end = bounds.start; - // from_buf_bounds has the correct assertion - Slice::from_buf_bounds(slice.into_inner(), bounds) - } else { - self.slice(0..len) - }; - assert_eq!(s.bytes_init(), s.bytes_total()); - s - } + ($T:ty) => { + impl IoBufExt for $T { + #[inline(always)] + fn slice_len(self) -> FullSlice { + let len = self.len(); + let s = if len == 0 { + // paper over the incorrect assertion + // https://github.com/neondatabase/tokio-epoll-uring/issues/46 + let slice = self.slice_full(); + let mut bounds: Range<_> = slice.bounds(); + bounds.end = bounds.start; + // from_buf_bounds has the correct assertion + Slice::from_buf_bounds(slice.into_inner(), bounds) + } else { + self.slice(0..len) + }; + FullSlice::must_new(s) } - )* + } }; } -impl_io_buf_ext!(Bytes, BytesMut, Vec); +impl_io_buf_ext!(Bytes); +impl_io_buf_ext!(BytesMut); +impl_io_buf_ext!(Vec); diff --git a/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs b/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs index 483d0100020d..efcb61ba6532 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs @@ -1,5 +1,8 @@ -use crate::{context::RequestContext, virtual_file::owned_buffers_io::write::OwnedAsyncWriter}; -use tokio_epoll_uring::{IoBuf, Slice}; +use crate::{ + context::RequestContext, + virtual_file::owned_buffers_io::{io_buf_ext::FullSlice, write::OwnedAsyncWriter}, +}; +use tokio_epoll_uring::IoBuf; pub struct Writer { dst: W, @@ -37,9 +40,9 @@ where #[inline(always)] async fn write_all( &mut self, - buf: Slice, + buf: FullSlice, ctx: &RequestContext, - ) -> std::io::Result<(usize, Slice)> { + ) -> std::io::Result<(usize, FullSlice)> { let (nwritten, buf) = self.dst.write_all(buf, ctx).await?; self.bytes_amount += u64::try_from(nwritten).unwrap(); Ok((nwritten, buf)) diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs index 14e8eeef6ada..f8f37b17e33f 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs @@ -1,16 +1,18 @@ use bytes::BytesMut; -use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice}; +use tokio_epoll_uring::IoBuf; use crate::context::RequestContext; +use super::io_buf_ext::{FullSlice, IoBufExt}; + /// A trait for doing owned-buffer write IO. /// Think [`tokio::io::AsyncWrite`] but with owned buffers. pub trait OwnedAsyncWriter { async fn write_all( &mut self, - buf: Slice, + buf: FullSlice, ctx: &RequestContext, - ) -> std::io::Result<(usize, Slice)>; + ) -> std::io::Result<(usize, FullSlice)>; } /// A wrapper aorund an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch @@ -79,10 +81,10 @@ where #[cfg_attr(target_os = "macos", allow(dead_code))] pub async fn write_buffered( &mut self, - chunk: Slice, + chunk: FullSlice, ctx: &RequestContext, - ) -> std::io::Result<(usize, Slice)> { - assert_eq!(chunk.bytes_init(), chunk.bytes_total()); + ) -> std::io::Result<(usize, FullSlice)> { + let chunk = chunk.into_raw_slice(); let chunk_len = chunk.len(); // avoid memcpy for the middle of the chunk @@ -96,7 +98,10 @@ where .pending(), 0 ); - let (nwritten, chunk) = self.writer.write_all(chunk, ctx).await?; + let (nwritten, chunk) = self + .writer + .write_all(FullSlice::must_new(chunk), ctx) + .await?; assert_eq!(nwritten, chunk_len); return Ok((nwritten, chunk)); } @@ -116,7 +121,7 @@ where } } assert!(slice.is_empty(), "by now we should have drained the chunk"); - Ok((chunk_len, chunk)) + Ok((chunk_len, FullSlice::must_new(chunk))) } /// Strictly less performant variant of [`Self::write_buffered`] that allows writing borrowed data. @@ -155,7 +160,9 @@ where let slice = buf.flush(); let (nwritten, slice) = self.writer.write_all(slice, ctx).await?; assert_eq!(nwritten, buf_len); - self.buf = Some(Buffer::reuse_after_flush(slice.into_inner())); + self.buf = Some(Buffer::reuse_after_flush( + slice.into_raw_slice().into_inner(), + )); Ok(()) } } @@ -175,9 +182,9 @@ pub trait Buffer { /// Number of bytes in the buffer. fn pending(&self) -> usize; - /// Turns `self` into a [`tokio_epoll_uring::Slice`] of the pending data + /// Turns `self` into a [`FullSlice`] of the pending data /// so we can use [`tokio_epoll_uring`] to write it to disk. - fn flush(self) -> Slice; + fn flush(self) -> FullSlice; /// After the write to disk is done and we have gotten back the slice, /// [`BufferedWriter`] uses this method to re-use the io buffer. @@ -201,12 +208,8 @@ impl Buffer for BytesMut { self.len() } - fn flush(self) -> Slice { - if self.is_empty() { - return self.slice_full(); - } - let len = self.len(); - self.slice(0..len) + fn flush(self) -> FullSlice { + self.slice_len() } fn reuse_after_flush(mut iobuf: BytesMut) -> Self { @@ -218,10 +221,9 @@ impl Buffer for BytesMut { impl OwnedAsyncWriter for Vec { async fn write_all( &mut self, - buf: Slice, + buf: FullSlice, _: &RequestContext, - ) -> std::io::Result<(usize, Slice)> { - assert_eq!(buf.bytes_init(), buf.bytes_total()); + ) -> std::io::Result<(usize, FullSlice)> { self.extend_from_slice(&buf[..]); Ok((buf.len(), buf)) } @@ -242,10 +244,9 @@ mod tests { impl OwnedAsyncWriter for RecorderWriter { async fn write_all( &mut self, - buf: Slice, + buf: FullSlice, _: &RequestContext, - ) -> std::io::Result<(usize, Slice)> { - assert_eq!(buf.bytes_init(), buf.bytes_total()); + ) -> std::io::Result<(usize, FullSlice)> { self.writes.push(Vec::from(&buf[..])); Ok((buf.len(), buf)) } @@ -258,7 +259,7 @@ mod tests { macro_rules! write { ($writer:ident, $data:literal) => {{ $writer - .write_buffered(::bytes::Bytes::from_static($data).slice_full(), &test_ctx()) + .write_buffered(::bytes::Bytes::from_static($data).slice_len(), &test_ctx()) .await?; }}; } From a5c062c33619d19b58dbf89517d021199bfd772c Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 14 Aug 2024 13:52:27 +0000 Subject: [PATCH 5/6] improved doc comments --- .../virtual_file/owned_buffers_io/io_buf_ext.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs b/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs index 3b2fa92f5e24..7c773b6b2103 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/io_buf_ext.rs @@ -1,3 +1,5 @@ +//! See [`FullSlice`]. + use bytes::{Bytes, BytesMut}; use std::ops::{Deref, Range}; use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice}; @@ -41,10 +43,7 @@ where } pub(crate) trait IoBufExt { - /// Get a [`Slice`] covering the range `[0..self.len()]`. - /// It is guaranteed that the resulting slice has [`Slice::bytes_init`] equal to [`Slice::bytes_total`]. - /// - /// This is for use on the write path. + /// Get a [`FullSlice`] for the entire buffer, i.e., `self[..]` or `self[0..self.len()]`. fn slice_len(self) -> FullSlice where Self: Sized; @@ -57,12 +56,13 @@ macro_rules! impl_io_buf_ext { fn slice_len(self) -> FullSlice { let len = self.len(); let s = if len == 0 { - // paper over the incorrect assertion - // https://github.com/neondatabase/tokio-epoll-uring/issues/46 + // `BoundedBuf::slice(0..len)` or `BoundedBuf::slice(..)` has an incorrect assertion, + // causing a panic if len == 0. + // The Slice::from_buf_bounds has the correct assertion (<= instead of <). + // => https://github.com/neondatabase/tokio-epoll-uring/issues/46 let slice = self.slice_full(); let mut bounds: Range<_> = slice.bounds(); bounds.end = bounds.start; - // from_buf_bounds has the correct assertion Slice::from_buf_bounds(slice.into_inner(), bounds) } else { self.slice(0..len) From b55a70ab1b0538d9f05654a898afe155235eae45 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 14 Aug 2024 13:57:37 +0000 Subject: [PATCH 6/6] no need for std::mem::take; https://github.com/neondatabase/neon/pull/8717#discussion_r1716749014 --- pageserver/src/tenant/storage_layer/inmemory_layer.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 53d855450f8a..748d79c14969 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -586,7 +586,7 @@ impl InMemoryLayer { .put_value_bytes( Key::from_compact(*key), *lsn, - std::mem::take(&mut buf).slice_len(), + buf.slice_len(), will_init, &ctx, ) @@ -631,7 +631,7 @@ impl InMemoryLayer { .put_value_bytes( Key::from_compact(*key), *lsn, - std::mem::take(&mut buf).slice_len(), + buf.slice_len(), will_init, ctx, )