From 140b24d2ce1be337844240b511f563c011659837 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 28 Mar 2024 23:26:41 +0800 Subject: [PATCH] feat(core/raw/oio): Add Writable Buf (#4410) --- core/src/raw/oio/buf/mod.rs | 3 + core/src/raw/oio/buf/readable_buf.rs | 36 ++++++- core/src/raw/oio/buf/writable_buf.rs | 146 +++++++++++++++++++++++++++ 3 files changed, 180 insertions(+), 5 deletions(-) create mode 100644 core/src/raw/oio/buf/writable_buf.rs diff --git a/core/src/raw/oio/buf/mod.rs b/core/src/raw/oio/buf/mod.rs index d4f29d37159b..b5849c2a282b 100644 --- a/core/src/raw/oio/buf/mod.rs +++ b/core/src/raw/oio/buf/mod.rs @@ -23,3 +23,6 @@ pub use buffer::Buffer; mod readable_buf; pub use readable_buf::ReadableBuf; + +mod writable_buf; +pub use writable_buf::WritableBuf; diff --git a/core/src/raw/oio/buf/readable_buf.rs b/core/src/raw/oio/buf/readable_buf.rs index f885115e0453..e142ef03b8ec 100644 --- a/core/src/raw/oio/buf/readable_buf.rs +++ b/core/src/raw/oio/buf/readable_buf.rs @@ -22,12 +22,14 @@ use bytes::Bytes; /// ReadableBuf is the buf used in `oio::Write`. /// -/// This API is used internally by the `oio` crate. Users should never use it directly. +/// This API is used internally by the `oio` crate. Users SHOULD never use it in any way. /// /// # Safety /// -/// Caller must make sure that input buffer lives longer than `ReadableBuf` otherwise `ReadableBuf` -/// might point to invalid memory. +/// - Caller MUST make sure that input buffer lives longer than `ReadableBuf`. Otherwise +/// `ReadableBuf` might point to invalid memory. +/// - Caller SHOULD NOT store `ReadableBuf` in anyway. The buf should only be passed to `oio::Write` +/// or been copied out by [`ReadableBuf::to_bytes`]. #[derive(Clone)] pub struct ReadableBuf(Inner); @@ -184,7 +186,7 @@ impl Buf for ReadableBuf { "ptr of slice must be valid across the lifetime of ReadableBuf" ); assert!( - *size > *offset + cnt, + *size >= *offset + cnt, "cnt {cnt} exceeds the remaining size {}", *size - *offset ); @@ -202,7 +204,7 @@ impl Buf for ReadableBuf { "ptr of slice must be valid across the lifetime of ReadableBuf" ); assert!( - *size > *offset + len, + *size >= *offset + len, "len {len} exceeds the remaining size {}", *size - *offset ); @@ -233,6 +235,12 @@ mod tests { assert_eq!(buf.remaining(), 6); assert_eq!(buf.chunk(), b" world"); assert_eq!(buf.to_bytes(), Bytes::from_static(b" world")); + + let mut buf = ReadableBuf::from_slice(b"hello world"); + assert_eq!(buf.copy_to_bytes(11), Bytes::from_static(b"hello world")); + assert_eq!(buf.remaining(), 0); + assert_eq!(buf.chunk(), b""); + assert_eq!(buf.to_bytes(), Bytes::from_static(b"")); } #[test] @@ -247,6 +255,12 @@ mod tests { assert_eq!(buf.remaining(), 6); assert_eq!(buf.chunk(), b" world"); assert_eq!(buf.to_bytes(), Bytes::from_static(b" world")); + + let mut buf = ReadableBuf::from_bytes("hello world"); + assert_eq!(buf.copy_to_bytes(11), Bytes::from_static(b"hello world")); + assert_eq!(buf.remaining(), 0); + assert_eq!(buf.chunk(), b""); + assert_eq!(buf.to_bytes(), Bytes::from_static(b"")); } #[test] @@ -262,6 +276,12 @@ mod tests { assert_eq!(buf.remaining(), 6); assert_eq!(buf.chunk(), b" world"); assert_eq!(buf.to_bytes(), Bytes::from_static(b" world")); + + let mut buf = ReadableBuf::from_slice(bs); + assert_eq!(buf.copy_to_bytes(11), Bytes::from_static(b"hello world")); + assert_eq!(buf.remaining(), 0); + assert_eq!(buf.chunk(), b""); + assert_eq!(buf.to_bytes(), Bytes::from_static(b"")); } #[test] @@ -277,5 +297,11 @@ mod tests { assert_eq!(buf.remaining(), 6); assert_eq!(buf.chunk(), b" world"); assert_eq!(buf.to_bytes(), Bytes::from_static(b" world")); + + let mut buf = ReadableBuf::from_slice(&bs); + assert_eq!(buf.copy_to_bytes(11), Bytes::from_static(b"hello world")); + assert_eq!(buf.remaining(), 0); + assert_eq!(buf.chunk(), b""); + assert_eq!(buf.to_bytes(), Bytes::from_static(b"")); } } diff --git a/core/src/raw/oio/buf/writable_buf.rs b/core/src/raw/oio/buf/writable_buf.rs new file mode 100644 index 000000000000..5b88648d57c0 --- /dev/null +++ b/core/src/raw/oio/buf/writable_buf.rs @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use bytes::buf::UninitSlice; +use bytes::BufMut; + +/// WritableBuf is the buf used in `oio::Read`. +/// +/// This API is used internally by the `oio` crate. Users SHOULD never use it in any way. +/// +/// # Safety +/// +/// - Caller MUST make sure that input buffer lives longer than `WritableBuf`. Otherwise +/// `WritableBuf` might point to invalid memory. +/// - Caller MUST not mutate the original buffer in any way out of `WritableBuf`. +/// - Caller SHOULD NOT remote from `WritableBuf` in anyway. +pub struct WritableBuf { + ptr: *mut u8, + size: usize, + offset: usize, +} + +/// # Safety +/// +/// We make sure that `ptr` itself will never be changed. +unsafe impl Send for WritableBuf {} + +impl WritableBuf { + /// Build a WritableBuf from slice. + pub fn from_slice(slice: &mut [u8]) -> Self { + Self { + ptr: slice.as_mut_ptr(), + size: slice.len(), + offset: 0, + } + } + + /// Build a WritableBuf from mutable BufMut. + pub fn from_buf_mut(buf: &mut impl BufMut) -> Self { + let slice = buf.chunk_mut(); + Self { + ptr: slice.as_mut_ptr(), + size: slice.len(), + offset: 0, + } + } +} + +unsafe impl BufMut for WritableBuf { + fn remaining_mut(&self) -> usize { + assert!( + !self.ptr.is_null(), + "ptr of slice must be valid across the lifetime of WritableBuf" + ); + self.size - self.offset + } + + unsafe fn advance_mut(&mut self, cnt: usize) { + assert!( + !self.ptr.is_null(), + "ptr of slice must be valid across the lifetime of WritableBuf" + ); + assert!( + self.size >= self.offset + cnt, + "cnt {cnt} exceeds the remaining size {}", + self.size - self.offset + ); + self.offset += cnt; + } + + fn chunk_mut(&mut self) -> &mut UninitSlice { + assert!( + !self.ptr.is_null(), + "ptr of slice must be valid across the lifetime of WritableBuf" + ); + + unsafe { + UninitSlice::from_raw_parts_mut(self.ptr.add(self.offset), self.size - self.offset) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use bytes::BytesMut; + + #[test] + fn test_writable_buf_from_slice() { + let mut buf = [0u8; 10]; + let mut writable_buf = WritableBuf::from_slice(&mut buf); + assert_eq!(writable_buf.remaining_mut(), 10); + assert_eq!(writable_buf.chunk_mut().len(), 10); + + writable_buf.put_slice(b"hello"); + + assert_eq!(writable_buf.remaining_mut(), 5); + assert_eq!(writable_buf.chunk_mut().len(), 5); + assert_eq!(&buf[..5], b"hello"); + + writable_buf.put_slice(b"world"); + assert_eq!(writable_buf.remaining_mut(), 0); + assert_eq!(writable_buf.chunk_mut().len(), 0); + assert_eq!(&buf[..], b"helloworld"); + } + + #[test] + fn test_writable_buf_from_bytes_mut() { + let mut buf = BytesMut::with_capacity(10); + let mut writable_buf = WritableBuf::from_buf_mut(&mut buf); + assert_eq!(writable_buf.remaining_mut(), 10); + assert_eq!(writable_buf.chunk_mut().len(), 10); + + writable_buf.put_slice(b"hello"); + assert_eq!(writable_buf.remaining_mut(), 5); + assert_eq!(writable_buf.chunk_mut().len(), 5); + + writable_buf.put_slice(b"world"); + assert_eq!(writable_buf.remaining_mut(), 0); + assert_eq!(writable_buf.chunk_mut().len(), 0); + + unsafe { + buf.advance_mut(5); + } + assert_eq!(&buf[..5], b"hello"); + + unsafe { + buf.advance_mut(5); + } + assert_eq!(&buf[..], b"helloworld"); + } +}