Skip to content

Commit

Permalink
feat(core/raw/oio): Add Writable Buf (#4410)
Browse files Browse the repository at this point in the history
  • Loading branch information
Xuanwo authored Mar 28, 2024
1 parent 017c267 commit 140b24d
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 5 deletions.
3 changes: 3 additions & 0 deletions core/src/raw/oio/buf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,6 @@ pub use buffer::Buffer;

mod readable_buf;
pub use readable_buf::ReadableBuf;

mod writable_buf;
pub use writable_buf::WritableBuf;
36 changes: 31 additions & 5 deletions core/src/raw/oio/buf/readable_buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ use bytes::Bytes;

/// ReadableBuf is the buf used in `oio::Write`.
///
/// This API is used internally by the `oio` crate. Users should never use it directly.
/// This API is used internally by the `oio` crate. Users SHOULD never use it in any way.
///
/// # Safety
///
/// Caller must make sure that input buffer lives longer than `ReadableBuf` otherwise `ReadableBuf`
/// might point to invalid memory.
/// - Caller MUST make sure that input buffer lives longer than `ReadableBuf`. Otherwise
/// `ReadableBuf` might point to invalid memory.
/// - Caller SHOULD NOT store `ReadableBuf` in anyway. The buf should only be passed to `oio::Write`
/// or been copied out by [`ReadableBuf::to_bytes`].
#[derive(Clone)]
pub struct ReadableBuf(Inner);

Expand Down Expand Up @@ -184,7 +186,7 @@ impl Buf for ReadableBuf {
"ptr of slice must be valid across the lifetime of ReadableBuf"
);
assert!(
*size > *offset + cnt,
*size >= *offset + cnt,
"cnt {cnt} exceeds the remaining size {}",
*size - *offset
);
Expand All @@ -202,7 +204,7 @@ impl Buf for ReadableBuf {
"ptr of slice must be valid across the lifetime of ReadableBuf"
);
assert!(
*size > *offset + len,
*size >= *offset + len,
"len {len} exceeds the remaining size {}",
*size - *offset
);
Expand Down Expand Up @@ -233,6 +235,12 @@ mod tests {
assert_eq!(buf.remaining(), 6);
assert_eq!(buf.chunk(), b" world");
assert_eq!(buf.to_bytes(), Bytes::from_static(b" world"));

let mut buf = ReadableBuf::from_slice(b"hello world");
assert_eq!(buf.copy_to_bytes(11), Bytes::from_static(b"hello world"));
assert_eq!(buf.remaining(), 0);
assert_eq!(buf.chunk(), b"");
assert_eq!(buf.to_bytes(), Bytes::from_static(b""));
}

#[test]
Expand All @@ -247,6 +255,12 @@ mod tests {
assert_eq!(buf.remaining(), 6);
assert_eq!(buf.chunk(), b" world");
assert_eq!(buf.to_bytes(), Bytes::from_static(b" world"));

let mut buf = ReadableBuf::from_bytes("hello world");
assert_eq!(buf.copy_to_bytes(11), Bytes::from_static(b"hello world"));
assert_eq!(buf.remaining(), 0);
assert_eq!(buf.chunk(), b"");
assert_eq!(buf.to_bytes(), Bytes::from_static(b""));
}

#[test]
Expand All @@ -262,6 +276,12 @@ mod tests {
assert_eq!(buf.remaining(), 6);
assert_eq!(buf.chunk(), b" world");
assert_eq!(buf.to_bytes(), Bytes::from_static(b" world"));

let mut buf = ReadableBuf::from_slice(bs);
assert_eq!(buf.copy_to_bytes(11), Bytes::from_static(b"hello world"));
assert_eq!(buf.remaining(), 0);
assert_eq!(buf.chunk(), b"");
assert_eq!(buf.to_bytes(), Bytes::from_static(b""));
}

#[test]
Expand All @@ -277,5 +297,11 @@ mod tests {
assert_eq!(buf.remaining(), 6);
assert_eq!(buf.chunk(), b" world");
assert_eq!(buf.to_bytes(), Bytes::from_static(b" world"));

let mut buf = ReadableBuf::from_slice(&bs);
assert_eq!(buf.copy_to_bytes(11), Bytes::from_static(b"hello world"));
assert_eq!(buf.remaining(), 0);
assert_eq!(buf.chunk(), b"");
assert_eq!(buf.to_bytes(), Bytes::from_static(b""));
}
}
146 changes: 146 additions & 0 deletions core/src/raw/oio/buf/writable_buf.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use bytes::buf::UninitSlice;
use bytes::BufMut;

/// WritableBuf is the buf used in `oio::Read`.
///
/// This API is used internally by the `oio` crate. Users SHOULD never use it in any way.
///
/// # Safety
///
/// - Caller MUST make sure that input buffer lives longer than `WritableBuf`. Otherwise
/// `WritableBuf` might point to invalid memory.
/// - Caller MUST not mutate the original buffer in any way out of `WritableBuf`.
/// - Caller SHOULD NOT remote from `WritableBuf` in anyway.
pub struct WritableBuf {
ptr: *mut u8,
size: usize,
offset: usize,
}

/// # Safety
///
/// We make sure that `ptr` itself will never be changed.
unsafe impl Send for WritableBuf {}

impl WritableBuf {
/// Build a WritableBuf from slice.
pub fn from_slice(slice: &mut [u8]) -> Self {
Self {
ptr: slice.as_mut_ptr(),
size: slice.len(),
offset: 0,
}
}

/// Build a WritableBuf from mutable BufMut.
pub fn from_buf_mut(buf: &mut impl BufMut) -> Self {
let slice = buf.chunk_mut();
Self {
ptr: slice.as_mut_ptr(),
size: slice.len(),
offset: 0,
}
}
}

unsafe impl BufMut for WritableBuf {
fn remaining_mut(&self) -> usize {
assert!(
!self.ptr.is_null(),
"ptr of slice must be valid across the lifetime of WritableBuf"
);
self.size - self.offset
}

unsafe fn advance_mut(&mut self, cnt: usize) {
assert!(
!self.ptr.is_null(),
"ptr of slice must be valid across the lifetime of WritableBuf"
);
assert!(
self.size >= self.offset + cnt,
"cnt {cnt} exceeds the remaining size {}",
self.size - self.offset
);
self.offset += cnt;
}

fn chunk_mut(&mut self) -> &mut UninitSlice {
assert!(
!self.ptr.is_null(),
"ptr of slice must be valid across the lifetime of WritableBuf"
);

unsafe {
UninitSlice::from_raw_parts_mut(self.ptr.add(self.offset), self.size - self.offset)
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use bytes::BytesMut;

#[test]
fn test_writable_buf_from_slice() {
let mut buf = [0u8; 10];
let mut writable_buf = WritableBuf::from_slice(&mut buf);
assert_eq!(writable_buf.remaining_mut(), 10);
assert_eq!(writable_buf.chunk_mut().len(), 10);

writable_buf.put_slice(b"hello");

assert_eq!(writable_buf.remaining_mut(), 5);
assert_eq!(writable_buf.chunk_mut().len(), 5);
assert_eq!(&buf[..5], b"hello");

writable_buf.put_slice(b"world");
assert_eq!(writable_buf.remaining_mut(), 0);
assert_eq!(writable_buf.chunk_mut().len(), 0);
assert_eq!(&buf[..], b"helloworld");
}

#[test]
fn test_writable_buf_from_bytes_mut() {
let mut buf = BytesMut::with_capacity(10);
let mut writable_buf = WritableBuf::from_buf_mut(&mut buf);
assert_eq!(writable_buf.remaining_mut(), 10);
assert_eq!(writable_buf.chunk_mut().len(), 10);

writable_buf.put_slice(b"hello");
assert_eq!(writable_buf.remaining_mut(), 5);
assert_eq!(writable_buf.chunk_mut().len(), 5);

writable_buf.put_slice(b"world");
assert_eq!(writable_buf.remaining_mut(), 0);
assert_eq!(writable_buf.chunk_mut().len(), 0);

unsafe {
buf.advance_mut(5);
}
assert_eq!(&buf[..5], b"hello");

unsafe {
buf.advance_mut(5);
}
assert_eq!(&buf[..], b"helloworld");
}
}

0 comments on commit 140b24d

Please sign in to comment.